diff --git a/.changeset/expose-abort-usage.md b/.changeset/expose-abort-usage.md new file mode 100644 index 000000000000..f62ad0a2042b --- /dev/null +++ b/.changeset/expose-abort-usage.md @@ -0,0 +1,5 @@ +--- +'ai': patch +--- + +feat(ai): expose usage, totalUsage, inputMessages, and partialText in onAbort callback diff --git a/content/docs/03-ai-sdk-core/50-error-handling.mdx b/content/docs/03-ai-sdk-core/50-error-handling.mdx index 2d62e395bc57..42f0fc1f8d7c 100644 --- a/content/docs/03-ai-sdk-core/50-error-handling.mdx +++ b/content/docs/03-ai-sdk-core/50-error-handling.mdx @@ -123,7 +123,11 @@ for await (const textPart of textStream) { The `onAbort` callback receives: -- `steps`: An array of all completed steps before the abort +- `steps`: All completed steps before the abort +- `usage`: Token usage for the current (aborted) step, or `undefined` if not yet received +- `totalUsage`: Aggregated usage across all completed steps plus the current step, or `undefined` if no usage data was received +- `inputMessages`: The input messages for the current step, including prior step responses +- `partialText`: The text of any text parts still being streamed at abort time, or `undefined` if there is none (text parts that already ended are not included) You can also handle abort events directly in the stream: diff --git a/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx b/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx index 186cfc9a579c..b7f5eebd8c01 100644 --- a/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx +++ b/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx @@ -1799,6 +1799,30 @@ To see `streamText` in action, check out [these examples](#examples). type: 'Array', description: 'Details for all previously finished steps.', }, + { + name: 'usage', + type: 'LanguageModelUsage | undefined', + description: + 'Token usage for the current (aborted) step. 
undefined if the provider had not yet sent usage data before the abort.', + }, + { + name: 'totalUsage', + type: 'LanguageModelUsage | undefined', + description: + 'Aggregated token usage across all completed steps plus the current step. undefined if no usage data was received before the abort.', + }, + { + name: 'inputMessages', + type: 'ModelMessage[]', + description: + 'The input messages for the current step, including initial messages and all prior step responses. Empty array if abort fires before the first step starts.', + }, + { + name: 'partialText', + type: 'string | undefined', + description: + 'The text of the text parts still being streamed in the current step at abort time. undefined if there is none; text parts that already ended are not included. Resets at each step boundary.', + }, ], }, ], diff --git a/examples/ai-functions/src/stream-text/openai/abort.ts b/examples/ai-functions/src/stream-text/openai/abort.ts index 7d1fd58e9cc1..8b753ddb6edd 100644 --- a/examples/ai-functions/src/stream-text/openai/abort.ts +++ b/examples/ai-functions/src/stream-text/openai/abort.ts @@ -1,13 +1,44 @@ import { openai } from '@ai-sdk/openai'; -import { streamText } from 'ai'; +import { isStepCount, streamText, tool } from 'ai'; +import { z } from 'zod'; import { run } from '../../lib/run'; +const tools = { + add: tool({ + description: 'Add two numbers', + inputSchema: z.object({ a: z.number(), b: z.number() }), + execute: async ({ a, b }) => ({ result: a + b }), + }), + multiply: tool({ + description: 'Multiply two numbers', + inputSchema: z.object({ a: z.number(), b: z.number() }), + execute: async ({ a, b }) => ({ result: a * b }), + }), +}; + run(async () => { + let stepCount = 0; + try { const { textStream } = streamText({ - model: openai('gpt-3.5-turbo'), - prompt: 'Write a short story about a robot learning to love:\n\n', - abortSignal: AbortSignal.timeout(3000), + model: openai('gpt-4o-mini'), + tools, + stopWhen: isStepCount(5), + prompt: + 'First add 7 and 5. Then multiply the result by 6. 
Then write a very long detailed essay (at least 500 words) about the number you got.', + abortSignal: AbortSignal.timeout(10000), + onStepFinish(step) { + stepCount++; + console.log(`\n[Step ${stepCount} finished]`); + console.log(' Step usage:', step.usage); + }, + onAbort({ usage, totalUsage, inputMessages, partialText }) { + console.log('\n\nStream aborted mid-generation.'); + console.log('Current step usage:', usage); + console.log('Total usage:', totalUsage); + console.log('Input messages:', inputMessages); + console.log('Partial text:', partialText); + }, }); for await (const textPart of textStream) { diff --git a/packages/ai/src/generate-text/stream-text.test.ts b/packages/ai/src/generate-text/stream-text.test.ts index 87c85bd4ec52..4995db7c0d07 100644 --- a/packages/ai/src/generate-text/stream-text.test.ts +++ b/packages/ai/src/generate-text/stream-text.test.ts @@ -17052,7 +17052,16 @@ describe('streamText', () => { expect(onAbortCalls).toMatchInlineSnapshot(` [ { + "inputMessages": [ + { + "content": "test-input", + "role": "user", + }, + ], + "partialText": undefined, "steps": [], + "totalUsage": undefined, + "usage": undefined, }, ] `); @@ -17274,6 +17283,42 @@ describe('streamText', () => { expect(onAbortCalls).toMatchInlineSnapshot(` [ { + "inputMessages": [ + { + "content": "prompt", + "role": "user", + }, + { + "content": [ + { + "input": { + "value": "value", + }, + "providerExecuted": undefined, + "providerOptions": undefined, + "toolCallId": "call-1", + "toolName": "tool1", + "type": "tool-call", + }, + ], + "role": "assistant", + }, + { + "content": [ + { + "output": { + "type": "text", + "value": "result1", + }, + "toolCallId": "call-1", + "toolName": "tool1", + "type": "tool-result", + }, + ], + "role": "tool", + }, + ], + "partialText": undefined, "steps": [ DefaultStepResult { "callId": "test-telemetry-call-id", @@ -17369,6 +17414,23 @@ describe('streamText', () => { "warnings": [], }, ], + "totalUsage": { + "cachedInputTokens": undefined, + 
"inputTokenDetails": { + "cacheReadTokens": undefined, + "cacheWriteTokens": undefined, + "noCacheTokens": 3, + }, + "inputTokens": 3, + "outputTokenDetails": { + "reasoningTokens": undefined, + "textTokens": 10, + }, + "outputTokens": 10, + "reasoningTokens": undefined, + "totalTokens": 13, + }, + "usage": undefined, }, ] `); @@ -17601,7 +17663,47 @@ describe('streamText', () => { expect(onAbortCalls).toMatchInlineSnapshot(` [ { + "inputMessages": [ + { + "content": "prompt", + "role": "user", + }, + ], + "partialText": undefined, "steps": [], + "totalUsage": { + "cachedInputTokens": undefined, + "inputTokenDetails": { + "cacheReadTokens": undefined, + "cacheWriteTokens": undefined, + "noCacheTokens": 3, + }, + "inputTokens": 3, + "outputTokenDetails": { + "reasoningTokens": undefined, + "textTokens": 10, + }, + "outputTokens": 10, + "reasoningTokens": undefined, + "totalTokens": 13, + }, + "usage": { + "cachedInputTokens": undefined, + "inputTokenDetails": { + "cacheReadTokens": undefined, + "cacheWriteTokens": undefined, + "noCacheTokens": 3, + }, + "inputTokens": 3, + "outputTokenDetails": { + "reasoningTokens": undefined, + "textTokens": 10, + }, + "outputTokens": 10, + "raw": undefined, + "reasoningTokens": undefined, + "totalTokens": 13, + }, }, ] `); @@ -17638,6 +17740,577 @@ describe('streamText', () => { `); }); }); + + describe('onAbort usage data', () => { + it('should have undefined usage and totalUsage when abort fires before provider sends usage', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + pull(controller) { + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ type: 
'text-start', id: '1' }); + break; + case 2: + controller.enqueue({ + type: 'text-delta', + id: '1', + delta: 'Hello', + }); + break; + case 3: + // abort WITHOUT sending a finish/usage chunk + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + break; + } + }, + }), + }), + }), + prompt: 'test-input', + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].usage).toMatchInlineSnapshot(`undefined`); + expect(onAbortCalls[0].totalUsage).toMatchInlineSnapshot(`undefined`); + }); + + it('should have real usage and totalUsage when provider sent usage before abort', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + pull(controller) { + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ type: 'text-start', id: '1' }); + break; + case 2: + controller.enqueue({ + type: 'text-delta', + id: '1', + delta: 'Hello', + }); + break; + case 3: + // send finish (with real usage) BEFORE abort fires + controller.enqueue({ + type: 'finish', + finishReason: { unified: 'stop', raw: 'stop' }, + usage: testUsage, + }); + break; + case 4: + // now abort — usage was already recorded + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + break; + } + }, + }), + }), + }), + prompt: 'test-input', + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].usage).toMatchInlineSnapshot(` + { + "cachedInputTokens": undefined, + "inputTokenDetails": { + "cacheReadTokens": undefined, + "cacheWriteTokens": undefined, + "noCacheTokens": 3, + }, + "inputTokens": 3, 
+ "outputTokenDetails": { + "reasoningTokens": undefined, + "textTokens": 10, + }, + "outputTokens": 10, + "raw": undefined, + "reasoningTokens": undefined, + "totalTokens": 13, + } + `); + expect(onAbortCalls[0].totalUsage).toMatchInlineSnapshot(` + { + "cachedInputTokens": undefined, + "inputTokenDetails": { + "cacheReadTokens": undefined, + "cacheWriteTokens": undefined, + "noCacheTokens": 3, + }, + "inputTokens": 3, + "outputTokenDetails": { + "reasoningTokens": undefined, + "textTokens": 10, + }, + "outputTokens": 10, + "reasoningTokens": undefined, + "totalTokens": 13, + } + `); + }); + + it('should aggregate usage across steps on multi-step abort', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + let streamCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + start() { + streamCalls++; + pullCalls = 0; + }, + pull(controller) { + if (streamCalls === 1) { + // step 1: completes naturally with usage + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ + type: 'tool-call', + toolCallId: 'call-1', + toolName: 'tool1', + input: `{ "value": "value" }`, + }); + break; + case 2: + controller.enqueue({ + type: 'finish', + finishReason: { + unified: 'tool-calls', + raw: undefined, + }, + usage: testUsage, + }); + controller.close(); + break; + } + } else { + // step 2: sends usage then aborts + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ type: 'text-start', id: '1' }); + break; + case 2: + controller.enqueue({ + type: 'text-delta', + id: '1', + delta: 'Hello', + }); + break; + case 3: + // step 2 sends usage before abort + 
controller.enqueue({ + type: 'finish', + finishReason: { unified: 'stop', raw: 'stop' }, + usage: testUsage2, + }); + break; + case 4: + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + break; + } + } + }, + }), + }), + }), + tools: { + tool1: { + inputSchema: z.object({ value: z.string() }), + execute: async () => 'result1', + }, + }, + stopWhen: isStepCount(3), + ...defaultSettings(), + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].usage).toMatchInlineSnapshot(` + { + "cachedInputTokens": 0, + "inputTokenDetails": { + "cacheReadTokens": 0, + "cacheWriteTokens": 0, + "noCacheTokens": 3, + }, + "inputTokens": 3, + "outputTokenDetails": { + "reasoningTokens": 10, + "textTokens": 10, + }, + "outputTokens": 10, + "raw": undefined, + "reasoningTokens": 10, + "totalTokens": 13, + } + `); + expect(onAbortCalls[0].totalUsage).toMatchInlineSnapshot(` + { + "cachedInputTokens": 0, + "inputTokenDetails": { + "cacheReadTokens": 0, + "cacheWriteTokens": 0, + "noCacheTokens": 6, + }, + "inputTokens": 6, + "outputTokenDetails": { + "reasoningTokens": 10, + "textTokens": 20, + }, + "outputTokens": 20, + "reasoningTokens": 10, + "totalTokens": 26, + } + `); + }); + }); + + describe('onAbort inputMessages and partialText', () => { + it('should have inputMessages and undefined partialText when abort fires before first chunk', async () => { + const abortController = new AbortController(); + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + start(controller) { + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + }, + }), + }), + }), + prompt: 'test-input', + }); + + await result.consumeStream(); + + 
expect(onAbortCalls[0].inputMessages).toMatchInlineSnapshot(` + [ + { + "content": "test-input", + "role": "user", + }, + ] + `); + expect(onAbortCalls[0].partialText).toMatchInlineSnapshot(`undefined`); + }); + + it('should have inputMessages and partialText when abort fires mid-text', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + pull(controller) { + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ type: 'text-start', id: '0' }); + break; + case 2: + controller.enqueue({ + type: 'text-delta', + id: '0', + delta: 'Hello ', + }); + break; + case 3: + controller.enqueue({ + type: 'text-delta', + id: '0', + delta: 'world', + }); + break; + case 4: + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + break; + } + }, + }), + }), + }), + prompt: 'test-input', + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].partialText).toMatchInlineSnapshot( + `"Hello world"`, + ); + expect(onAbortCalls[0].inputMessages).toMatchInlineSnapshot(` + [ + { + "content": "test-input", + "role": "user", + }, + ] + `); + }); + + it('should have undefined partialText when abort fires before any text-delta', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + pull(controller) { + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 
'stream-start', + warnings: [], + }); + break; + case 1: + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + break; + } + }, + }), + }), + }), + prompt: 'test-input', + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].partialText).toMatchInlineSnapshot(`undefined`); + expect(onAbortCalls[0].inputMessages).toMatchInlineSnapshot(` + [ + { + "content": "test-input", + "role": "user", + }, + ] + `); + }); + + it('should include prior step response in inputMessages on multi-step abort', async () => { + const abortController = new AbortController(); + let pullCalls = 0; + let streamCalls = 0; + const onAbortCalls: Array = []; + + const result = streamText({ + abortSignal: abortController.signal, + onAbort: event => { + onAbortCalls.push(event); + }, + model: new MockLanguageModelV4({ + doStream: async () => ({ + stream: new ReadableStream({ + start() { + streamCalls++; + pullCalls = 0; + }, + pull(controller) { + if (streamCalls === 1) { + // step 1: completes with a tool call + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ + type: 'tool-call', + toolCallId: 'call-1', + toolName: 'tool1', + input: `{ "value": "value" }`, + }); + break; + case 2: + controller.enqueue({ + type: 'finish', + finishReason: { + unified: 'tool-calls', + raw: undefined, + }, + usage: testUsage, + }); + controller.close(); + break; + } + } else { + // step 2: streams partial text then aborts + switch (pullCalls++) { + case 0: + controller.enqueue({ + type: 'stream-start', + warnings: [], + }); + break; + case 1: + controller.enqueue({ type: 'text-start', id: '0' }); + break; + case 2: + controller.enqueue({ + type: 'text-delta', + id: '0', + delta: 'Partial', + }); + break; + case 3: + abortController.abort(); + controller.error( + new DOMException( + 'The user aborted a request.', + 'AbortError', + ), + ); + 
break; + } + } + }, + }), + }), + }), + tools: { + tool1: { + inputSchema: z.object({ value: z.string() }), + execute: async () => 'result1', + }, + }, + stopWhen: isStepCount(3), + ...defaultSettings(), + }); + + await result.consumeStream(); + + expect(onAbortCalls[0].partialText).toMatchInlineSnapshot(`undefined`); + // inputMessages for step 2 should include the initial user message + // AND the step-1 assistant response (tool call + tool result) + expect(onAbortCalls[0].inputMessages.length).toMatchInlineSnapshot(`3`); + expect(onAbortCalls[0].inputMessages[0]).toMatchInlineSnapshot(` + { + "content": "prompt", + "role": "user", + } + `); + expect(onAbortCalls[0].inputMessages[1].role).toMatchInlineSnapshot( + `"assistant"`, + ); + expect(onAbortCalls[0].inputMessages[2].role).toMatchInlineSnapshot( + `"tool"`, + ); + }); + }); }); describe('context', () => { diff --git a/packages/ai/src/generate-text/stream-text.ts b/packages/ai/src/generate-text/stream-text.ts index 878017e89d45..3b7542e82d52 100644 --- a/packages/ai/src/generate-text/stream-text.ts +++ b/packages/ai/src/generate-text/stream-text.ts @@ -9,6 +9,7 @@ import { DelayedPromise, IdGenerator, isAbortError, + ModelMessage, ProviderOptions, ToolApprovalResponse, ToolContent, @@ -202,6 +203,29 @@ export type StreamTextOnAbortCallback< * Details for all previously finished steps. */ readonly steps: StepResult[]; + /** + * Token usage for the current (aborted) step. + * undefined if the provider had not yet sent usage data before the abort. + */ + readonly usage?: LanguageModelUsage; + /** + * Aggregated token usage across all completed steps plus the current step. + * undefined if no usage data was received before the abort. + */ + readonly totalUsage?: LanguageModelUsage; + /** + * The input messages for the most recently started step: initial messages + * plus all prior completed step response messages. Empty array if abort + * fires before the first step starts. 
Use with your own tokenizer to + * estimate prompt token cost when provider usage is unavailable. + */ + readonly inputMessages: ModelMessage[]; + /** + * The text that was actively being streamed in the current step at abort + * time. undefined if no text was being streamed. Resets at each step + * boundary. Use with your own tokenizer to estimate completion token cost. + */ + readonly partialText?: string; }) => PromiseLike | void; /** @@ -887,9 +911,12 @@ class DefaultStreamTextResult< let recordedFinishReason: FinishReason | undefined = undefined; let recordedRawFinishReason: string | undefined = undefined; let recordedTotalUsage: LanguageModelUsage | undefined = undefined; + let currentStepUsage: LanguageModelUsage | undefined = undefined; + let completedStepsUsage: LanguageModelUsage | undefined = undefined; let recordedRequest: LanguageModelRequestMetadata = {}; let recordedWarnings: Array = []; const recordedSteps: StepResult[] = []; + let currentStepInputMessages: ModelMessage[] = []; // Track provider-executed tool calls that support deferred results // (e.g., code_execution in programmatic tool calling scenarios). @@ -905,6 +932,9 @@ class DefaultStreamTextResult< } > = {}; + // Text accumulated before eventProcessor, used by abort() to report partialText. + let pullLevelTextContent: Record = {}; + let activeReasoningContent: Record< string, { @@ -1230,7 +1260,22 @@ class DefaultStreamTextResult< async pull(controller) { // abort handling: function abort() { - onAbort?.({ steps: recordedSteps }); + const totalUsage = + currentStepUsage != null || completedStepsUsage != null + ? addLanguageModelUsage( + completedStepsUsage ?? createNullLanguageModelUsage(), + currentStepUsage ?? 
createNullLanguageModelUsage(), + ) + : undefined; + const partialText = + Object.values(pullLevelTextContent).join('') || undefined; + onAbort?.({ + steps: recordedSteps, + usage: currentStepUsage, + totalUsage, + inputMessages: currentStepInputMessages, + partialText, + }); controller.enqueue({ type: 'abort', // The `reason` is usually of type DOMException, but it can also be of any type, @@ -1574,6 +1619,7 @@ class DefaultStreamTextResult< stepFinish = new DelayedPromise(); const stepInputMessages = [...initialMessages, ...responseMessages]; + currentStepInputMessages = stepInputMessages; const prepareStepResult = await prepareStep?.({ model, @@ -1745,6 +1791,8 @@ class DefaultStreamTextResult< const msToFirstChunk = now() - stepStartTimestampMs; stepFirstChunk = false; + pullLevelTextContent = {}; + // Step start: controller.enqueue({ type: 'start-step', @@ -1770,15 +1818,27 @@ class DefaultStreamTextResult< const chunkType = chunk.type; switch (chunkType) { - case 'tool-approval-request': - case 'text-start': + case 'tool-approval-request': { + controller.enqueue(chunk); + break; + } + + case 'text-start': { + pullLevelTextContent[chunk.id] = ''; + controller.enqueue(chunk); + break; + } + case 'text-end': { + delete pullLevelTextContent[chunk.id]; controller.enqueue(chunk); break; } case 'text-delta': { if (chunk.text.length > 0) { + pullLevelTextContent[chunk.id] = + (pullLevelTextContent[chunk.id] ?? '') + chunk.text; controller.enqueue(chunk); } break; @@ -1836,6 +1896,7 @@ class DefaultStreamTextResult< // Note: tool executions might not be finished yet when the finish event is emitted. 
// store usage and finish reason for promises and onFinish callback: stepUsage = chunk.usage; + currentStepUsage = chunk.usage; stepFinishReason = chunk.finishReason; stepRawFinishReason = chunk.rawFinishReason; stepProviderMetadata = chunk.providerMetadata; @@ -1911,6 +1972,10 @@ class DefaultStreamTextResult< const combinedUsage = addLanguageModelUsage(usage, stepUsage); + // track cumulative usage of completed steps for the onAbort callback: + completedStepsUsage = combinedUsage; + currentStepUsage = undefined; + // wait for the step to be fully processed by the event processor // to ensure that the recorded steps are complete: await stepFinish.promise; diff --git a/packages/openai/src/chat/openai-chat-language-model.ts b/packages/openai/src/chat/openai-chat-language-model.ts index 031cf5f341a7..b3ce1f6998b3 100644 --- a/packages/openai/src/chat/openai-chat-language-model.ts +++ b/packages/openai/src/chat/openai-chat-language-model.ts @@ -452,6 +452,7 @@ export class OpenAIChatLanguageModel implements LanguageModelV4 { raw: undefined, }; let usage: OpenAIChatUsage | undefined = undefined; + let finishSent = false; let metadataExtracted = false; let isActiveText = false; @@ -534,6 +535,28 @@ export class OpenAIChatLanguageModel implements LanguageModelV4 { providerMetadata.openai.logprobs = choice.logprobs.content; } + // Emit the finish chunk as soon as the trailing usage-only chunk arrives + // so usage is available if the stream is aborted before it closes. + // Limited to chunks without a choice: usage attached to a content chunk + // may be followed by more deltas, and 'finish' must stay the last part. + // Must be before the `choice?.delta == null` guard because OpenAI + // sends usage in a chunk with empty choices (no delta). + if (value.usage != null && choice == null && !finishSent) { + // Close the open text part first; flush() would otherwise emit + // 'text-end' after 'finish'. + if (isActiveText) { + controller.enqueue({ type: 'text-end', id: '0' }); + isActiveText = false; + } + controller.enqueue({ + type: 'finish', + finishReason, + usage: convertOpenAIChatUsage(usage), + ...(providerMetadata != null ? 
{ providerMetadata } : {}), + }); + finishSent = true; + } + if (choice?.delta == null) { return; } @@ -695,12 +718,15 @@ export class OpenAIChatLanguageModel implements LanguageModelV4 { controller.enqueue({ type: 'text-end', id: '0' }); } - controller.enqueue({ - type: 'finish', - finishReason, - usage: convertOpenAIChatUsage(usage), - ...(providerMetadata != null ? { providerMetadata } : {}), - }); + if (!finishSent) { + controller.enqueue({ + type: 'finish', + finishReason, + usage: convertOpenAIChatUsage(usage), + ...(providerMetadata != null ? { providerMetadata } : {}), + }); + finishSent = true; + } }, }), ),