Skip to content

Commit cf36e62

Browse files
barry3406claude
andcommitted
fix: complete streaming event coverage + generator cleanup
- yield tool_call_start before tool execution (was defined but never yielded) - yield skill_activated when activate_skill tool succeeds - try/finally guarantees run_end on early consumer break - tool_call_end.durationMs made optional (no per-tool timing data available) - 6 new tests: tool_call_start ordering, skill_activated, fatal/pause run_end, run()/stream() equivalence, concurrent tool events Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 88dbe93 commit cf36e62

3 files changed

Lines changed: 149 additions & 12 deletions

File tree

packages/ai/src/loop/engine.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,14 @@ export async function* runSmartLoopStream(
215215
/* Phase 1: Initialization */
216216
/* ================================================================ */
217217

218+
const runStartTime = Date.now();
219+
let finalResultYielded = false;
220+
218221
// 1a. Create engine state from config (or checkpoint for resume)
219222
const state = createEngineState(config, goal, checkpoint, resumeMessage);
220223

224+
try {
225+
221226
// 1b. Build tool catalog (inline or deferred)
222227
const catalog = createToolCatalog(state.tools, config.toolCatalog);
223228
const allTools = catalog.discoverTool
@@ -399,7 +404,7 @@ export async function* runSmartLoopStream(
399404
status: "paused",
400405
checkpoint: buildCheckpoint(state, "paused"),
401406
};
402-
yield { type: "run_end" as const, result: pausedResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
407+
finalResultYielded = true; yield { type: "run_end" as const, result: pausedResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
403408
return pausedResult;
404409
}
405410
if (decision.action === "cancel") {
@@ -411,7 +416,7 @@ export async function* runSmartLoopStream(
411416
status: "canceled",
412417
checkpoint: buildCheckpoint(state, "canceled"),
413418
};
414-
yield { type: "run_end" as const, result: canceledResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
419+
finalResultYielded = true; yield { type: "run_end" as const, result: canceledResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
415420
return canceledResult;
416421
}
417422
} catch {
@@ -485,7 +490,7 @@ export async function* runSmartLoopStream(
485490
error: `Primary LLM failed: ${primaryMsg}. Fallback LLM also failed: ${fallbackMsg}`,
486491
checkpoint: buildCheckpoint(state),
487492
};
488-
yield { type: "run_end" as const, result: fatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
493+
finalResultYielded = true; yield { type: "run_end" as const, result: fatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
489494
return fatalResult;
490495
}
491496
// If we reach here, fallback succeeded — skip the context_limit path
@@ -562,7 +567,7 @@ export async function* runSmartLoopStream(
562567
error: "Context overflow unrecoverable: autocompact and reactive compact both exhausted",
563568
checkpoint: buildCheckpoint(state),
564569
};
565-
yield { type: "run_end" as const, result: contextFatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
570+
finalResultYielded = true; yield { type: "run_end" as const, result: contextFatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
566571
return contextFatalResult;
567572
} else {
568573
// Non-context error with no fallback configured — use decideContinuation
@@ -583,7 +588,7 @@ export async function* runSmartLoopStream(
583588
error: errorMsg || contAction.error,
584589
checkpoint: buildCheckpoint(state),
585590
};
586-
yield { type: "run_end" as const, result: errFatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
591+
finalResultYielded = true; yield { type: "run_end" as const, result: errFatalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
587592
return errFatalResult;
588593
}
589594
// Should not reach here for "error" finishReason (decideContinuation returns fatal)
@@ -645,7 +650,7 @@ export async function* runSmartLoopStream(
645650
status: "completed",
646651
checkpoint: buildCheckpoint(state, "completed"),
647652
};
648-
yield { type: "run_end" as const, result: budgetResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
653+
finalResultYielded = true; yield { type: "run_end" as const, result: budgetResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
649654
return budgetResult;
650655
}
651656
}
@@ -671,15 +676,26 @@ export async function* runSmartLoopStream(
671676
pendingApproval: blockedApproval,
672677
checkpoint: cp,
673678
};
674-
yield { type: "run_end" as const, result: approvalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
679+
finalResultYielded = true; yield { type: "run_end" as const, result: approvalResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
675680
return approvalResult;
676681
}
677682

678683
// Tool calls were made
679684
if (toolRecords.length > 0) {
685+
// Yield tool_call_start for each tool that was requested
686+
for (const req of outcome.toolRequests) {
687+
yield { type: "tool_call_start" as const, tool: req.name, args: req.args, iteration: state.iterations, timestamp: Date.now() };
688+
}
689+
680690
// Yield tool_call_end events for each tool execution
681691
for (const record of toolRecords) {
682-
yield { type: "tool_call_end" as const, tool: record.tool, args: record.args, result: record.result, status: (record.status ?? "success") as "success" | "error", durationMs: 0, iteration: state.iterations, timestamp: Date.now() };
692+
// Detect skill activation
693+
if (record.tool === "activate_skill" && record.status !== "error") {
694+
const skillResult = record.result as Record<string, unknown>;
695+
yield { type: "skill_activated" as const, skill: (skillResult?.skill as string) ?? "unknown", iteration: state.iterations, timestamp: Date.now() };
696+
}
697+
698+
yield { type: "tool_call_end" as const, tool: record.tool, args: record.args, result: record.result, status: (record.status ?? "success") as "success" | "error", iteration: state.iterations, timestamp: Date.now() };
683699
}
684700

685701
// Append assistant message
@@ -938,7 +954,7 @@ export async function* runSmartLoopStream(
938954
status: "completed",
939955
checkpoint: buildCheckpoint(state, "completed"),
940956
};
941-
yield { type: "run_end" as const, result: completedResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
957+
finalResultYielded = true; yield { type: "run_end" as const, result: completedResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
942958
return completedResult;
943959
}
944960

@@ -956,7 +972,7 @@ export async function* runSmartLoopStream(
956972
status: "completed",
957973
checkpoint: buildCheckpoint(state, "completed"),
958974
};
959-
yield { type: "run_end" as const, result: forceResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
975+
finalResultYielded = true; yield { type: "run_end" as const, result: forceResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
960976
return forceResult;
961977
}
962978

@@ -970,6 +986,12 @@ export async function* runSmartLoopStream(
970986
status: "max_iterations",
971987
checkpoint: buildCheckpoint(state, "max_iterations"),
972988
};
973-
yield { type: "run_end" as const, result: maxIterResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
989+
finalResultYielded = true; yield { type: "run_end" as const, result: maxIterResult, durationMs: Date.now() - state.runStartTime, timestamp: Date.now() };
974990
return maxIterResult;
991+
992+
} finally {
993+
if (!finalResultYielded) {
994+
yield { type: "run_end" as const, result: { result: null, iterations: 0, toolCalls: [], taskCalls: [], status: "fatal" as const, error: "Stream terminated early" }, durationMs: Date.now() - runStartTime, timestamp: Date.now() };
995+
}
996+
}
975997
}

packages/ai/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ export type AgentEvent =
284284
| { type: "llm_call_start"; iteration: number; messageCount: number; timestamp: number }
285285
| { type: "llm_call_end"; iteration: number; content: string; finishReason: string; tokensUsed?: { input: number; output: number } | undefined; durationMs: number; timestamp: number }
286286
| { type: "tool_call_start"; tool: string; args: unknown; iteration: number; timestamp: number }
287-
| { type: "tool_call_end"; tool: string; args: unknown; result: unknown; status: "success" | "error"; durationMs: number; iteration: number; timestamp: number }
287+
| { type: "tool_call_end"; tool: string; args: unknown; result: unknown; status: "success" | "error"; durationMs?: number; iteration: number; timestamp: number }
288288
| { type: "skill_activated"; skill: string; iteration: number; timestamp: number }
289289
| { type: "compression"; strategy: "snip" | "microcompact" | "autocompact" | "reactive"; tokensBefore: number; tokensAfter: number; timestamp: number }
290290
| { type: "memory_enrichment"; memoriesInjected: number; iteration: number; timestamp: number }

tests/unit/agent-streaming.test.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,119 @@ describe("Agent Streaming", () => {
213213
expect(llmEnd).toBeDefined();
214214
expect((llmEnd as any).content.length).toBeLessThanOrEqual(200);
215215
});
216+
217+
it("yields tool_call_start before tool_call_end", async () => {
218+
const agent = createSmartAgent({
219+
llm: mockLLM([JSON.stringify({ tool: "t", arguments: { x: 1 } }), "Done."]),
220+
tools: [{ name: "t", description: "test", async execute(a) { return a.x; } }],
221+
});
222+
const events: AgentEvent[] = [];
223+
const stream = agent.stream("Go");
224+
let r = await stream.next();
225+
while (!r.done) { events.push(r.value); r = await stream.next(); }
226+
227+
const startIdx = events.findIndex(e => e.type === "tool_call_start");
228+
const endIdx = events.findIndex(e => e.type === "tool_call_end");
229+
expect(startIdx).toBeGreaterThanOrEqual(0);
230+
expect(endIdx).toBeGreaterThan(startIdx);
231+
});
232+
233+
it("yields skill_activated when activate_skill is called", async () => {
234+
const agent = createSmartAgent({
235+
llm: mockLLM([JSON.stringify({ tool: "activate_skill", arguments: { skill_name: "debug" } }), "Done."]),
236+
tools: [],
237+
skills: [{ name: "debug", description: "Debug", trigger: "on bug", prompt: "Step 1: investigate" }],
238+
});
239+
const events: AgentEvent[] = [];
240+
const stream = agent.stream("Debug this");
241+
let r = await stream.next();
242+
while (!r.done) { events.push(r.value); r = await stream.next(); }
243+
244+
const skillEvent = events.find(e => e.type === "skill_activated");
245+
expect(skillEvent).toBeDefined();
246+
expect((skillEvent as any).skill).toBe("debug");
247+
});
248+
249+
it("yields run_end on fatal error", async () => {
250+
const failLlm: LLMProvider = {
251+
name: "fail",
252+
async chat() { throw new Error("Permanent failure"); },
253+
};
254+
const agent = createSmartAgent({ llm: failLlm, tools: [], maxIterations: 2 });
255+
const events: AgentEvent[] = [];
256+
const stream = agent.stream("Go");
257+
let r = await stream.next();
258+
while (!r.done) { events.push(r.value); r = await stream.next(); }
259+
const finalResult = r.value;
260+
261+
expect(finalResult.status).toBe("fatal");
262+
const runEnd = events.find(e => e.type === "run_end");
263+
expect(runEnd).toBeDefined();
264+
expect((runEnd as any).result.status).toBe("fatal");
265+
});
266+
267+
it("yields run_end on pause", async () => {
268+
let callCount = 0;
269+
const agent = createSmartAgent({
270+
llm: { name: "mock", async chat() { callCount++; return { content: "Hi", model: "m" }; } },
271+
tools: [],
272+
hooks: {
273+
async getControlState() {
274+
return callCount === 0 ? { action: "pause" as const } : { action: "continue" as const };
275+
},
276+
},
277+
});
278+
const events: AgentEvent[] = [];
279+
const stream = agent.stream("Go");
280+
let r = await stream.next();
281+
while (!r.done) { events.push(r.value); r = await stream.next(); }
282+
283+
expect(r.value.status).toBe("paused");
284+
const runEnd = events.find(e => e.type === "run_end");
285+
expect(runEnd).toBeDefined();
286+
});
287+
288+
it("run() and stream() produce same final result", async () => {
289+
const makeAgent = () => createSmartAgent({
290+
llm: mockLLM(["The answer is 42."]),
291+
tools: [],
292+
});
293+
294+
const runResult = await makeAgent().run("What is the answer?");
295+
296+
const streamAgent = makeAgent();
297+
const stream = streamAgent.stream("What is the answer?");
298+
let r = await stream.next();
299+
while (!r.done) { r = await stream.next(); }
300+
const streamResult = r.value;
301+
302+
expect(streamResult.status).toBe(runResult.status);
303+
expect(streamResult.result).toBe(runResult.result);
304+
expect(streamResult.iterations).toBe(runResult.iterations);
305+
});
306+
307+
it("concurrent tool calls produce multiple tool_call events", async () => {
308+
const agent = createSmartAgent({
309+
llm: mockLLM([
310+
JSON.stringify([
311+
{ tool: "a", arguments: {} },
312+
{ tool: "b", arguments: {} },
313+
]),
314+
"Done.",
315+
]),
316+
tools: [
317+
{ name: "a", description: "A", isConcurrencySafe: true, async execute() { return 1; } },
318+
{ name: "b", description: "B", isConcurrencySafe: true, async execute() { return 2; } },
319+
],
320+
});
321+
const events: AgentEvent[] = [];
322+
const stream = agent.stream("Go");
323+
let r = await stream.next();
324+
while (!r.done) { events.push(r.value); r = await stream.next(); }
325+
326+
const toolStarts = events.filter(e => e.type === "tool_call_start");
327+
const toolEnds = events.filter(e => e.type === "tool_call_end");
328+
expect(toolStarts.length).toBeGreaterThanOrEqual(2);
329+
expect(toolEnds.length).toBeGreaterThanOrEqual(2);
330+
});
216331
});

0 commit comments

Comments
 (0)