Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions server/api/src/routes/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import type { IngestArticleSummary } from "../services/ingestPlanner.js";
import { assertSupportedComposeBackend } from "../agents/core/llm/modelFactory.js";
import { assertComposeBackendReady } from "../agents/core/composeBackendValidation.js";
import { resolveCheckpointerForRun } from "../agents/core/checkpoint/index.js";
import { getRegisteredGraph } from "../agents/registry/graphRegistry.js";
import type { BaseCheckpointSaver } from "@langchain/langgraph";
import type { ExecutionBackend } from "../agents/core/types/executionBackend.js";

const app = new Hono<AppEnv>();
Expand Down Expand Up @@ -95,6 +97,45 @@ function normalizeGraphCandidates(raw: CandidatePage[] | undefined): CandidatePa
return out;
}

/**
* Read `userId` stored in an ingest-planner checkpoint, if any.
* ingest-planner checkpoint に保存された `userId` を読む(あれば)。
*/
async function readIngestCheckpointUserId(
threadId: string,
checkpointer: BaseCheckpointSaver,
): Promise<string | null> {
const registered = getRegisteredGraph(INGEST_PLANNER_GRAPH_ID);
if (!registered) return null;
const graph = registered.factory({ checkpointer }) as {
getState?: (config: unknown) => Promise<{ values?: Record<string, unknown> } | undefined>;
};
if (typeof graph.getState !== "function") return null;
try {
const snap = await graph.getState({ configurable: { thread_id: threadId } });
const owner = snap?.values?.userId;
return typeof owner === "string" && owner.length > 0 ? owner : null;
} catch {
return null;
}
}

/**
* Reject cross-user access when reusing a `threadId` tied to another user's checkpoint.
* 他ユーザーの checkpoint に紐づく `threadId` 再利用を拒否する。
*/
async function assertIngestThreadAccessible(
threadId: string,
userId: string,
checkpointer: BaseCheckpointSaver | false,
): Promise<void> {
if (checkpointer === false) return;
const owner = await readIngestCheckpointUserId(threadId, checkpointer);
if (owner !== null && owner !== userId) {
throw new HTTPException(403, { message: "threadId is not accessible" });
}
}

/**
* リクエストボディ。
* Request body for POST /api/ingest/plan.
Expand Down Expand Up @@ -411,6 +452,7 @@ app.post("/graph/run", authRequired, rateLimit(), async (c) => {
});

const checkpointer = await resolveCheckpointerForRun();
await assertIngestThreadAccessible(threadId, userId, checkpointer);
const runner = new GraphRunner();
const result = await runner.invoke(
{
Expand Down Expand Up @@ -510,6 +552,7 @@ app.post("/graph/resume", authRequired, rateLimit(), async (c) => {
});
}

await assertIngestThreadAccessible(threadId, userId, checkpointer);
const runner = new GraphRunner();
const result = await runner.resume(
{
Expand Down
Loading