@@ -704,6 +704,7 @@ export class CompactionService {
spanText: string
): Promise<string> {
const prompt = this.buildSummaryPrompt(previousSummary, spanText)
await this.llmProviderPresenter.executeWithRateLimit(model.providerId)
const response = await this.llmProviderPresenter.generateText(
model.providerId,
prompt,
73 changes: 72 additions & 1 deletion src/main/presenter/deepchatAgentPresenter/index.ts
@@ -15,7 +15,12 @@ import type {
} from '@shared/types/agent-interface'
import type { MCPToolCall, MCPToolResponse } from '@shared/types/core/mcp'
import type { ChatMessage } from '@shared/types/core/chat-message'
import type { IConfigPresenter, ILlmProviderPresenter, ModelConfig } from '@shared/presenter'
import type {
IConfigPresenter,
ILlmProviderPresenter,
ModelConfig,
RateLimitQueueSnapshot
} from '@shared/presenter'
import type { MCPToolDefinition } from '@shared/types/core/mcp'
import type { IToolPresenter } from '@shared/types/presenters/tool.presenter'
import type { ReasoningPortrait } from '@shared/types/model-db'
@@ -114,6 +119,8 @@ const isReasoningEffort = (value: unknown): value is 'minimal' | 'low' | 'medium
const isVerbosity = (value: unknown): value is 'low' | 'medium' | 'high' =>
value === 'low' || value === 'medium' || value === 'high'

const RATE_LIMIT_STREAM_MESSAGE_PREFIX = '__rate_limit__:'

const createAbortError = (): Error => {
if (typeof DOMException !== 'undefined') {
return new DOMException('Aborted', 'AbortError')
@@ -1342,6 +1349,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}

const traceEnabled = this.configPresenter.getSetting<boolean>('traceDebugEnabled') === true
const llmProviderPresenter = this.llmProviderPresenter
const pendingInputCoordinator = this.pendingInputCoordinator
const injectSteerInputsIntoRequest = this.injectSteerInputsIntoRequest.bind(this)
const persistMessageTrace = this.persistMessageTrace.bind(this)
@@ -1374,6 +1382,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation {

const abortController = new AbortController()
const activeGeneration = this.registerActiveGeneration(sessionId, messageId, abortController)
const rateLimitMessageId = this.buildRateLimitStreamMessageId(activeGeneration.runId)
const emitRateLimitWaitingMessage = this.emitRateLimitWaitingMessage.bind(this)
const clearRateLimitWaitingMessage = this.clearRateLimitWaitingMessage.bind(this)

try {
this.dispatchHook('SessionStart', {
@@ -1407,8 +1418,21 @@
)

let didConsumeSteerBatch = false
let queuedForRateLimit = false

try {
await llmProviderPresenter.executeWithRateLimit(state.providerId, {
signal: abortController.signal,
onQueued: (snapshot) => {
queuedForRateLimit = true
emitRateLimitWaitingMessage(sessionId, rateLimitMessageId, snapshot)
}
})
if (queuedForRateLimit) {
clearRateLimitWaitingMessage(sessionId, rateLimitMessageId)
queuedForRateLimit = false
}
Comment on lines +1484 to +1494
⚠️ Potential issue | 🟠 Major

Re-check cancellation after the rate-limit wait.

After executeWithRateLimit(...) resolves, this path goes straight into provider.coreStream(...). If the user cancels in that handoff window, one more provider request can still start because the signal is not checked again here.

💡 Suggested fix
             await llmProviderPresenter.executeWithRateLimit(state.providerId, {
               signal: abortController.signal,
               onQueued: (snapshot) => {
                 queuedForRateLimit = true
                 emitRateLimitWaitingMessage(sessionId, rateLimitMessageId, snapshot)
               }
             })
+            this.throwIfAbortRequested(abortController.signal)
             if (queuedForRateLimit) {
               clearRateLimitWaitingMessage(sessionId, rateLimitMessageId)
               queuedForRateLimit = false
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/presenter/deepchatAgentPresenter/index.ts` around lines 1484 - 1494,
After executeWithRateLimit(...) completes and before calling
provider.coreStream(...), re-check the abortController.signal to avoid starting
a provider request if the user cancelled during the rate-limit wait: if
abortController.signal.aborted then clear the rate-limit waiting UI
(clearRateLimitWaitingMessage(sessionId, rateLimitMessageId) if
queuedForRateLimit) and abort the flow (throw or return) instead of proceeding
to provider.coreStream; ensure the same abortController.signal is passed into
provider.coreStream when allowed, and keep handling of queuedForRateLimit and
emitRateLimitWaitingMessage(sessionId, rateLimitMessageId) consistent.
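
Note: the suggested fix calls this.throwIfAbortRequested(...), which is not defined anywhere in this diff. A minimal sketch of such a helper, assuming it reuses the same AbortError shape as the createAbortError() this PR adds in the rate-limit manager (so the isAbortError() check in process.ts recognizes the rejection):

```typescript
// Hypothetical helper assumed by the suggested fix above; not part of this PR.
const createAbortError = (): Error => {
  // Mirrors the createAbortError() the PR adds in the rate-limit manager below.
  if (typeof DOMException !== 'undefined') {
    return new DOMException('Aborted', 'AbortError')
  }
  const error = new Error('Aborted')
  error.name = 'AbortError'
  return error
}

function throwIfAbortRequested(signal: AbortSignal): void {
  // Closes the handoff window: executeWithRateLimit() has resolved, but the
  // user may have cancelled before provider.coreStream(...) starts.
  if (signal.aborted) {
    throw createAbortError()
  }
}
```

Throwing here lands in the surrounding catch block, which already clears the rate-limit waiting message while queuedForRateLimit is still set.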


for await (const event of provider.coreStream(
injectedMessages,
requestModelId,
@@ -1428,6 +1452,9 @@
pendingInputCoordinator.consumeClaimedSteerBatch(sessionId)
}
} catch (error) {
if (queuedForRateLimit) {
clearRateLimitWaitingMessage(sessionId, rateLimitMessageId)
}
if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) {
pendingInputCoordinator.releaseClaimedInputs(sessionId)
}
@@ -1669,6 +1696,47 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
return this.activeGenerations.get(sessionId)?.runId === runId
}

private buildRateLimitStreamMessageId(runId: string): string {
return `${RATE_LIMIT_STREAM_MESSAGE_PREFIX}${runId}`
}

private emitRateLimitWaitingMessage(
sessionId: string,
messageId: string,
snapshot: RateLimitQueueSnapshot
): void {
const block: AssistantMessageBlock = {
type: 'action',
action_type: 'rate_limit',
content: '',
status: 'pending',
timestamp: Date.now(),
extra: {
providerId: snapshot.providerId,
qpsLimit: snapshot.qpsLimit,
currentQps: snapshot.currentQps,
queueLength: snapshot.queueLength,
estimatedWaitTime: snapshot.estimatedWaitTime
}
}

eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, {
conversationId: sessionId,
eventId: messageId,
messageId,
blocks: [block]
})
}

private clearRateLimitWaitingMessage(sessionId: string, messageId: string): void {
eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, {
conversationId: sessionId,
eventId: messageId,
messageId,
blocks: []
})
}

private applyProcessResultStatus(
sessionId: string,
result: ProcessResult | null | undefined,
@@ -3207,6 +3275,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
visionModel.modelId,
visionModel.providerId
)
await this.llmProviderPresenter.executeWithRateLimit(visionModel.providerId, {
signal: abortSignal
})
const response = await this.llmProviderPresenter.generateCompletionStandalone(
visionModel.providerId,
messages,
13 changes: 13 additions & 0 deletions src/main/presenter/deepchatAgentPresenter/process.ts
@@ -18,6 +18,10 @@ const CONTEXT_WINDOW_ERROR_PATTERNS = [
const USER_CANCELED_GENERATION_ERROR = 'common.error.userCanceledGeneration'
const NO_MODEL_RESPONSE_ERROR = 'common.error.noModelResponse'

function isAbortError(error: unknown): boolean {
return error instanceof Error && (error.name === 'AbortError' || error.name === 'CanceledError')
}

function isContextWindowErrorMessage(message: string): boolean {
const normalized = message.toLowerCase()
return CONTEXT_WINDOW_ERROR_PATTERNS.some((pattern) => normalized.includes(pattern))
@@ -268,6 +272,15 @@ export async function processStream(params: ProcessParams): Promise<ProcessResult>
usage: buildUsageSnapshot(state)
}
} catch (err) {
if (io.abortSignal.aborted || isAbortError(err)) {
console.log(`[ProcessStream] aborted via exception after ${eventCount} events`)
return {
status: 'aborted' as const,
stopReason: 'user_stop',
errorMessage: USER_CANCELED_GENERATION_ERROR,
usage: buildUsageSnapshot(state)
}
}
console.error(`[ProcessStream] exception after ${eventCount} events:`, err)
finalizeError(state, io, err)
return {
2 changes: 2 additions & 0 deletions src/main/presenter/index.ts
@@ -337,6 +337,8 @@ export class Presenter implements IPresenter {
this.filePresenter.prepareFileCompletely(absPath, typeInfo, contentType)
}),
getLlmProviderPresenter: () => ({
executeWithRateLimit: (providerId, options) =>
this.llmproviderPresenter.executeWithRateLimit(providerId, options),
generateCompletionStandalone: (providerId, messages, modelId, temperature, maxTokens) =>
this.llmproviderPresenter.generateCompletionStandalone(
providerId,
11 changes: 11 additions & 0 deletions src/main/presenter/llmProviderPresenter/index.ts
@@ -12,6 +12,7 @@ import {
IConfigPresenter,
ISQLitePresenter,
AcpConfigState,
RateLimitQueueSnapshot,
AcpWorkdirInfo,
AcpDebugRequest,
AcpDebugRunResult
@@ -203,6 +204,16 @@ export class LLMProviderPresenter implements ILlmProviderPresenter {
return this.rateLimitManager.getAllProviderRateLimitStatus()
}

async executeWithRateLimit(
providerId: string,
options?: {
signal?: AbortSignal
onQueued?: (snapshot: RateLimitQueueSnapshot) => void
}
): Promise<void> {
await this.rateLimitManager.executeWithRateLimit(providerId, options)
}

isGenerating(eventId: string): boolean {
return this.activeStreams.has(eventId)
}
@@ -1,7 +1,23 @@
import { RATE_LIMIT_EVENTS } from '@/events'
import { eventBus, SendTarget } from '@/eventbus'
import { IConfigPresenter, LLM_PROVIDER } from '@shared/presenter'
import { ProviderRateLimitState, QueueItem, RateLimitConfig } from '../types'
import {
ExecuteWithRateLimitOptions,
ProviderRateLimitState,
QueueItem,
RateLimitConfig,
RateLimitQueueSnapshot
} from '../types'

const createAbortError = (): Error => {
if (typeof DOMException !== 'undefined') {
return new DOMException('Aborted', 'AbortError')
}

const error = new Error('Aborted')
error.name = 'AbortError'
return error
}

export class RateLimitManager {
private readonly providerRateLimitStates: Map<string, ProviderRateLimitState> = new Map()
@@ -97,8 +113,14 @@ export class RateLimitManager {
return status
}

async executeWithRateLimit(providerId: string): Promise<void> {
async executeWithRateLimit(
providerId: string,
options?: ExecuteWithRateLimitOptions
): Promise<void> {
const state = this.getOrCreateRateLimitState(providerId)
if (options?.signal?.aborted) {
throw createAbortError()
}
if (!state.config.enabled) {
this.recordRequest(providerId)
return Promise.resolve()
@@ -108,14 +130,27 @@
return Promise.resolve()
}
return new Promise<void>((resolve, reject) => {
let settled = false
let abortCleanup: (() => void) | null = null
const settle = (callback: () => void) => {
if (settled) {
return
}
settled = true
abortCleanup?.()
abortCleanup = null
callback()
}

const queueItem: QueueItem = {
id: `${providerId}-${Date.now()}-${Math.random()}`,
timestamp: Date.now(),
resolve,
reject
resolve: () => settle(resolve),
reject: (error) => settle(() => reject(error))
}

state.queue.push(queueItem)
const snapshot = this.buildQueueSnapshot(providerId, state)
console.log(
`[RateLimitManager] Request queued for ${providerId}, queue length: ${state.queue.length}`
)
@@ -124,6 +159,29 @@
queueLength: state.queue.length,
requestId: queueItem.id
})
try {
options?.onQueued?.(snapshot)
} catch (error) {
console.warn(`[RateLimitManager] onQueued callback failed for ${providerId}:`, error)
}

const signal = options?.signal
if (signal) {
const onAbort = () => {
const removed = this.removeQueueItem(providerId, queueItem.id)
if (removed) {
console.log(`[RateLimitManager] Request aborted while queued for ${providerId}`)
}
queueItem.reject(createAbortError())
}
signal.addEventListener('abort', onAbort, { once: true })
abortCleanup = () => signal.removeEventListener('abort', onAbort)
if (signal.aborted) {
onAbort()
return
}
}

this.processRateLimitQueue(providerId)
})
}
@@ -281,6 +339,39 @@ export class RateLimitManager {
return state?.queue.length || 0
}

private removeQueueItem(providerId: string, queueItemId: string): boolean {
const state = this.providerRateLimitStates.get(providerId)
if (!state) {
return false
}

const index = state.queue.findIndex((item) => item.id === queueItemId)
if (index === -1) {
return false
}

state.queue.splice(index, 1)
return true
}

private buildQueueSnapshot(
providerId: string,
state: ProviderRateLimitState
): RateLimitQueueSnapshot {
const intervalMs = (1 / state.config.qpsLimit) * 1000
const nextAllowedTime = state.lastRequestTime + intervalMs
const baseWaitTime = Math.max(0, nextAllowedTime - Date.now())
const additionalQueuedIntervals = Math.max(0, state.queue.length - 1) * intervalMs

return {
providerId,
qpsLimit: state.config.qpsLimit,
currentQps: this.getCurrentQps(providerId),
queueLength: state.queue.length,
estimatedWaitTime: Math.max(0, baseWaitTime + additionalQueuedIntervals)
}
}

private getLastRequestTime(providerId: string): number {
const state = this.providerRateLimitStates.get(providerId)
return state?.lastRequestTime || 0
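For reference, the estimate buildQueueSnapshot produces can be traced with a small worked example (all numbers illustrative):

```typescript
// Worked example of the estimatedWaitTime formula above; numbers illustrative.
// With qpsLimit = 2, requests must be spaced (1 / 2) * 1000 = 500 ms apart.
const qpsLimit = 2
const intervalMs = (1 / qpsLimit) * 1000 // 500 ms per slot

const now = Date.now()
const lastRequestTime = now - 200 // the last request fired 200 ms ago
const queueLength = 3 // includes the item that was just pushed

const baseWaitTime = Math.max(0, lastRequestTime + intervalMs - now) // 300 ms to the next slot
const additionalQueuedIntervals = Math.max(0, queueLength - 1) * intervalMs // 1000 ms for the 2 items ahead
const estimatedWaitTime = Math.max(0, baseWaitTime + additionalQueuedIntervals)
console.log(estimatedWaitTime) // 1300 (ms)
```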
13 changes: 13 additions & 0 deletions src/main/presenter/llmProviderPresenter/types.ts
@@ -5,6 +5,19 @@ export interface RateLimitConfig {
enabled: boolean
}

export interface RateLimitQueueSnapshot {
providerId: string
qpsLimit: number
currentQps: number
queueLength: number
estimatedWaitTime: number
}

export interface ExecuteWithRateLimitOptions {
signal?: AbortSignal
onQueued?: (snapshot: RateLimitQueueSnapshot) => void
}

export interface QueueItem {
id: string
timestamp: number
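To make the new options concrete, a minimal caller sketch (the provider id, the presenter handle, and the import path are illustrative, not taken from this PR):

```typescript
import type { ExecuteWithRateLimitOptions } from './types' // path assumed

// Assumed handle; in the real code this is an ILlmProviderPresenter.
declare const llmProviderPresenter: {
  executeWithRateLimit(providerId: string, options?: ExecuteWithRateLimitOptions): Promise<void>
}

const controller = new AbortController()
await llmProviderPresenter.executeWithRateLimit('some-provider', {
  signal: controller.signal,
  onQueued: (snapshot) => {
    // Only fires when the request actually has to wait in the queue.
    console.log(
      `queued behind ${snapshot.queueLength - 1} request(s), ~${snapshot.estimatedWaitTime} ms`
    )
  }
})
// Resolves once a slot is granted; rejects with an AbortError if
// controller.abort() fires while the request is still queued.
```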
@@ -1237,6 +1237,7 @@ export class AgentToolManager {
visionTarget.modelId,
visionTarget.providerId
)
await this.getLlmProviderPresenter().executeWithRateLimit(visionTarget.providerId)
const response = await this.getLlmProviderPresenter().generateCompletionStandalone(
visionTarget.providerId,
messages,
5 changes: 4 additions & 1 deletion src/main/presenter/toolPresenter/runtimePorts.ts
@@ -61,7 +61,10 @@ export interface AgentToolRuntimePort {
getSkillPresenter(): ISkillPresenter
getYoBrowserToolHandler(): IYoBrowserPresenter['toolHandler']
getFilePresenter(): Pick<IFilePresenter, 'getMimeType' | 'prepareFileCompletely'>
getLlmProviderPresenter(): Pick<ILlmProviderPresenter, 'generateCompletionStandalone'>
getLlmProviderPresenter(): Pick<
ILlmProviderPresenter,
'executeWithRateLimit' | 'generateCompletionStandalone'
>
createSettingsWindow(): ReturnType<IWindowPresenter['createSettingsWindow']>
sendToWindow(
windowId: number,