Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/**
* Ingest planner graph (#952) — research subgraph wiring + routing tests.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

const {
prepareIngest,
planIngest,
planQueries,
webSearch,
wikiSearch,
fetchArticles,
evaluateSufficiency,
refineQueries,
compileBatch,
} = vi.hoisted(() => ({
prepareIngest: vi.fn(),
planIngest: vi.fn(),
planQueries: vi.fn(),
webSearch: vi.fn(),
wikiSearch: vi.fn(),
fetchArticles: vi.fn(),
evaluateSufficiency: vi.fn(),
refineQueries: vi.fn(),
compileBatch: vi.fn(),
}));

vi.mock("../../../../agents/graphs/ingest/nodes/index.js", async () => {
const real = await vi.importActual<
typeof import("../../../../agents/graphs/ingest/nodes/index.js")
>("../../../../agents/graphs/ingest/nodes/index.js");
return { ...real, prepareIngest, planIngest };
});

vi.mock("../../../../agents/subgraphs/research/nodes/index.js", async () => {
const real = await vi.importActual<
typeof import("../../../../agents/subgraphs/research/nodes/index.js")
>("../../../../agents/subgraphs/research/nodes/index.js");
return {
...real,
planQueries,
webSearch,
wikiSearch,
fetchArticles,
evaluateSufficiency,
refineQueries,
compileBatch,
};
});

import { GraphRunner } from "../../../../agents/runner/graphRunner.js";
import { __resetRegistryForTests } from "../../../../agents/registry/graphRegistry.js";
import {
INGEST_PLANNER_GRAPH_ID,
registerIngestPlannerGraph,
} from "../../../../agents/graphs/ingest/index.js";
import type { GraphContext } from "../../../../agents/core/types/graphContext.js";
import type { Database } from "../../../../types/index.js";
import { MemorySaver } from "@langchain/langgraph";

function fakeContext(threadId: string): GraphContext {
return {
threadId,
sessionId: threadId,
userId: "user-1",
pageId: "",
graphId: INGEST_PLANNER_GRAPH_ID,
backend: "zedi_managed",
tier: "free",
db: {} as Database,
feature: "ingest_graph:test",
userEmail: null,
};
}

const articleInput = {
title: "Test Article",
url: "https://example.com/a",
excerpt: "Body text about testing.",
};

const candidatesInput = [{ id: "page-1", title: "Existing", excerpt: "Old content" }];

function defaultMocks() {
prepareIngest.mockImplementation(async () => ({
article: articleInput,
candidates: candidatesInput,
phase: "ingest:prepare",
}));

planQueries.mockImplementation(async () => ({
queries: [{ id: "q1", query: "topic", channels: ["web"] }],
maxIterations: 3,
iteration: 0,
phase: "research:plan",
}));
webSearch.mockImplementation(async () => ({
pendingSources: [{ id: "src:1", kind: "web", title: "Hit", url: "https://hit/" }],
}));
wikiSearch.mockImplementation(async () => ({ pendingSources: [] }));
fetchArticles.mockImplementation(async () => ({ pendingSources: [] }));
evaluateSufficiency.mockImplementation(async (state: { iteration: number }) => ({
lastEvaluation: { score: 0.9, rationale: "ok", missingAspects: [] },
iteration: state.iteration + 1,
phase: "research:evaluated",
}));
compileBatch.mockImplementation(
async (state: {
iteration: number;
queries: unknown[];
pendingSources: unknown[];
lastEvaluation: unknown;
}) => ({
batches: [
{
id: "batch-1",
iteration: state.iteration,
queries: state.queries,
sources: state.pendingSources,
evaluation: state.lastEvaluation,
createdAt: "2026-01-01T00:00:00.000Z",
},
],
exitReason: "score_threshold",
phase: "research:compile",
}),
);

planIngest.mockImplementation(async () => ({
ingestPlan: {
action: "merge",
reason: "Same topic",
targetPageId: "page-1",
},
phase: "ingest:planned",
}));
}

describe("ingestPlannerGraph — research subgraph connection", () => {
beforeEach(() => {
__resetRegistryForTests();
registerIngestPlannerGraph();
prepareIngest.mockReset();
planIngest.mockReset();
planQueries.mockReset();
webSearch.mockReset();
wikiSearch.mockReset();
fetchArticles.mockReset();
evaluateSufficiency.mockReset();
refineQueries.mockReset();
compileBatch.mockReset();
defaultMocks();
});

afterEach(() => {
__resetRegistryForTests();
});

it("runs prepare_ingest then research nodes before halting at human_review_research", async () => {
const runner = new GraphRunner();
const result = await runner.invoke(
{
graphId: INGEST_PLANNER_GRAPH_ID,
context: fakeContext("thread-ingest-1"),
checkpointer: new MemorySaver(),
recursionLimit: 60,
},
{
kind: "input",
value: { article: articleInput, candidates: candidatesInput },
},
);

expect(result.status).toBe("interrupted");
expect(prepareIngest).toHaveBeenCalledTimes(1);
expect(planQueries).toHaveBeenCalledTimes(1);
expect(compileBatch).toHaveBeenCalledTimes(1);
expect(planIngest).not.toHaveBeenCalled();
});

it("reaches plan_ingest after research HITL resume", async () => {
const checkpointer = new MemorySaver();
const runner = new GraphRunner();
const ctx = fakeContext("thread-ingest-2");

await runner.invoke(
{
graphId: INGEST_PLANNER_GRAPH_ID,
context: ctx,
checkpointer,
recursionLimit: 60,
},
{ kind: "input", value: { article: articleInput, candidates: candidatesInput } },
);

const resumed = await runner.resume(
{
graphId: INGEST_PLANNER_GRAPH_ID,
context: ctx,
checkpointer,
recursionLimit: 60,
},
{ approvedSourceIds: ["src:1"] },
);

expect(resumed.status).toBe("completed");
expect(planIngest).toHaveBeenCalledTimes(1);
const output = resumed.output as { ingestPlan?: { action?: string } };
expect(output.ingestPlan?.action).toBe("merge");
});
});
37 changes: 37 additions & 0 deletions server/api/src/__tests__/agents/graphs/ingest/planIngest.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* `plan_ingest` prompt helpers (#952).
*/
import { describe, expect, it } from "vitest";
import { appendApprovedResearchToPlannerMessages } from "../../../../agents/graphs/ingest/nodes/planIngest.js";
import { buildIngestPlannerPrompt } from "../../../../services/ingestPlanner.js";

const article = {
title: "Test",
url: "https://example.com/a",
excerpt: "Body",
};

describe("appendApprovedResearchToPlannerMessages", () => {
it("appends approved source titles and excerpts to the user message", () => {
const base = buildIngestPlannerPrompt({ article, candidates: [] });
const enriched = appendApprovedResearchToPlannerMessages(base, [
{
id: "src:1",
kind: "fetched",
title: "Background article",
excerpt: "Important context for merge decision.",
},
]);

const user = enriched.at(-1);
expect(user?.role).toBe("user");
expect(user?.content).toContain("## APPROVED RESEARCH");
expect(user?.content).toContain("Background article");
expect(user?.content).toContain("Important context");
});

it("returns messages unchanged when no approved sources", () => {
const base = buildIngestPlannerPrompt({ article, candidates: [] });
expect(appendApprovedResearchToPlannerMessages(base, [])).toEqual(base);
});
});
3 changes: 2 additions & 1 deletion server/api/src/agents/core/composeModelConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import { WIKI_COMPOSE_GRAPH_ID } from "../graphs/wikiCompose/index.js";
import { getOrchestratorModelId } from "../subgraphs/research/nodes/planQueries.js";
import { RESEARCH_GRAPH_ID } from "../subgraphs/research/index.js";
import { INGEST_PLANNER_GRAPH_ID } from "../graphs/ingest/index.js";

const DRAFT_MODEL_ENV = "WIKI_COMPOSE_DRAFT_MODEL_ID";
const DRAFT_MODEL_FALLBACK = "claude-3-5-sonnet";
Expand All @@ -23,7 +24,7 @@ export function getComposeModelIdsForGraph(graphId: string): string[] {
const draft = getDraftModelId();
return orchestrator === draft ? [orchestrator] : [orchestrator, draft];
}
if (graphId === RESEARCH_GRAPH_ID) {
if (graphId === RESEARCH_GRAPH_ID || graphId === INGEST_PLANNER_GRAPH_ID) {
return [getOrchestratorModelId()];
}
return [getOrchestratorModelId()];
Expand Down
17 changes: 17 additions & 0 deletions server/api/src/agents/graphs/ingest/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export {
INGEST_PLANNER_GRAPH_ID,
INGEST_PLANNER_GRAPH_VERSION,
registerIngestPlannerGraph,
} from "./ingestPlannerGraph.js";
export {
IngestPlannerState,
type IngestPlannerStateType,
type IngestPlannerStateUpdate,
} from "./state.js";
export type {
IngestAction,
IngestPlan,
IngestConflict,
CandidatePage,
IngestArticleSummary,
} from "./types.js";
79 changes: 79 additions & 0 deletions server/api/src/agents/graphs/ingest/ingestPlannerGraph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Wiki Compose P4 — `ingestPlannerGraph` (issue #952).
*
* 記事クリップ ingest フロー。`prepare_ingest` の後に P1 調査ループ
* (`researchLoopSubgraph` と同じノード / tools / `shouldRefine`)を組み込み、
* `human_review_research` のあと `plan_ingest` で merge / create / skip を決定する。
*
* Ingest planner graph: seeds clip context, runs the shared research loop
* (same nodes/tools as Compose), then emits an ingest plan via ZediChatModel.
*/
import { END, START, StateGraph } from "@langchain/langgraph";
import { registerGraph, type GraphFactory } from "../../registry/graphRegistry.js";
import { shouldRefine } from "../../subgraphs/research/researchGraph.js";
import {
planQueries,
webSearch,
wikiSearch,
fetchArticles,
evaluateSufficiency,
refineQueries,
compileBatch,
humanReviewResearch,
} from "../../subgraphs/research/nodes/index.js";
import { IngestPlannerState } from "./state.js";
import { prepareIngest, planIngest } from "./nodes/index.js";

/** Registered graph id. */
export const INGEST_PLANNER_GRAPH_ID = "ingest-planner" as const;
/** Registered graph version. */
export const INGEST_PLANNER_GRAPH_VERSION = "1.0.0";

const factory: GraphFactory = ({ checkpointer }) => {
const builder = new StateGraph(IngestPlannerState)
.addNode("prepare_ingest", prepareIngest)
.addNode("plan_ingest", planIngest)
.addEdge(START, "prepare_ingest")
// Research loop (same wiring as `researchLoopSubgraph` / `wireResearchLoopSubgraph`).
.addNode("plan_queries", planQueries)
.addNode("web_search", webSearch)
.addNode("wiki_search", wikiSearch)
.addNode("fetch_articles", fetchArticles)
.addNode("evaluate_sufficiency", evaluateSufficiency)
.addNode("refine_queries", refineQueries)
.addNode("compile_batch", compileBatch)
.addNode("human_review_research", humanReviewResearch)
.addEdge("prepare_ingest", "plan_queries")
.addEdge("plan_queries", "web_search")
.addEdge("plan_queries", "wiki_search")
.addEdge("web_search", "fetch_articles")
.addEdge("wiki_search", "fetch_articles")
.addEdge("fetch_articles", "evaluate_sufficiency")
.addConditionalEdges("evaluate_sufficiency", shouldRefine, {
refine: "refine_queries",
compile: "compile_batch",
})
.addEdge("refine_queries", "web_search")
.addEdge("refine_queries", "wiki_search")
.addEdge("compile_batch", "human_review_research")
.addEdge("human_review_research", "plan_ingest")
.addEdge("plan_ingest", END);

return checkpointer ? builder.compile({ checkpointer }) : builder.compile();
};

/**
* Register the ingest planner graph. Idempotent; call from `app.ts` bootstrap.
*/
export function registerIngestPlannerGraph(): void {
registerGraph({
id: INGEST_PLANNER_GRAPH_ID,
version: INGEST_PLANNER_GRAPH_VERSION,
phase: "ingest",
description:
"Wiki Compose P4: ingest clip planner. Runs the P1 research loop (shared nodes/tools) " +
"then plans merge/create/skip via ZediChatModel. Interrupt at human_review_research; " +
"resume payload matches wiki-compose-research. Coexists with POST /api/ingest/plan (#595).",
factory,
});
}
2 changes: 2 additions & 0 deletions server/api/src/agents/graphs/ingest/nodes/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { prepareIngest } from "./prepareIngest.js";
export { planIngest } from "./planIngest.js";
Loading
Loading