diff --git a/.changeset/strong-guests-study.md b/.changeset/strong-guests-study.md new file mode 100644 index 0000000000..67e7dfa815 --- /dev/null +++ b/.changeset/strong-guests-study.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-vercel': patch +--- + +Update Vercel queue max delay for longer sleeps without having to re-enqueue as frequently diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b9097ef1f2..64e150eb29 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -44,7 +44,7 @@ vi.mock('./utils.js', () => ({ getHeaders: vi.fn().mockReturnValue(new Map()), })); -import { createQueue } from './queue.js'; +import { createQueue, MAX_DELAY_SECONDS } from './queue.js'; describe('createQueue', () => { beforeEach(() => { @@ -404,7 +404,7 @@ describe('createQueue', () => { } }); - it('should clamp delaySeconds to max 23 hours for long sleeps', async () => { + it('should clamp delaySeconds to 1 hour less than 7 days for long sleeps', async () => { mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); let capturedHandler: ( @@ -422,7 +422,7 @@ describe('createQueue', () => { try { const queue = createQueue(); queue.createQueueHandler('__wkf_workflow_', async () => ({ - timeoutSeconds: 100000, + timeoutSeconds: 700000, })); await capturedHandler!( @@ -443,7 +443,7 @@ describe('createQueue', () => { expect(mockSend).toHaveBeenCalledTimes(1); // send(topicName, payload, options) const sendOpts = mockSend.mock.calls[0][2]; - expect(sendOpts.delaySeconds).toBe(82800); // MAX_DELAY_SECONDS + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); } finally { if (originalEnv !== undefined) { process.env.VERCEL_DEPLOYMENT_ID = originalEnv; @@ -453,6 +453,116 @@ describe('createQueue', () => { } }); + it('should fall back to the default max delay when the env override is invalid', async () => { + mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + const originalMaxDelay = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = 'not-a-number'; + + try { + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 700000, + })); + + await capturedHandler!( + { + payload: { runId: 'run-123' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { + messageId: 'msg-123', + deliveryCount: 1, + createdAt: new Date(), + topicName: '__wkf_workflow_test', + consumerGroup: 'test', + } + ); + + const sendOpts = mockSend.mock.calls[0][2]; + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); + } finally { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + + if (originalMaxDelay !== undefined) { + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = originalMaxDelay; + } else { + delete process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + } + } + }); + + it('should clamp oversized env overrides to the default max delay', async () => { + mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + const originalMaxDelay = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = `${MAX_DELAY_SECONDS + 1}`; + + try { + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 700000, + })); + + await capturedHandler!( + { + payload: { runId: 'run-123' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { + messageId: 'msg-123', + deliveryCount: 1, + createdAt: new Date(), + topicName: '__wkf_workflow_test', + consumerGroup: 'test', + } + ); + + const sendOpts = mockSend.mock.calls[0][2]; + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); + } finally { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + + if (originalMaxDelay !== undefined) { + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = originalMaxDelay; + } else { + delete process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + } + } + }); + it('should send new message without delaySeconds when handler returns timeoutSeconds: 0', async () => { mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 7e82527b01..91bd986058 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -32,12 +32,12 @@ const MessageWrapper = z.object({ * rather than using visibility timeouts on the same message. * * Benefits of this approach: - * - Fresh 24-hour lifetime with each message (no message age tracking needed) + * - Fresh delay window with each message (no message age tracking needed) * - Messages fire at the scheduled time (no short-circuit + recheck pattern) * - Simpler conceptual model: messages are triggers with delivery schedules * - * For sleeps > 24 hours (max delay), we use chaining: - * 1. Schedule message with max delay (~23h, leaving buffer) + * For sleeps > 7 days (max delay), we use chaining: + * 1. Schedule message with max delay (~6d, leaving a 24h re-enqueue margin) * 2. When it fires, workflow checks if sleep is complete * 3. If not, another delayed message is queued for remaining time * 4. Process repeats until the full sleep duration has elapsed @@ -48,9 +48,28 @@ const MessageWrapper = z.object({ * * These constants can be overridden via environment variables for testing. */ -const MAX_DELAY_SECONDS = Number( - process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit -); +const SECONDS_PER_HOUR = 60 * 60; +export const RE_ENQUEUE_MARGIN_SECONDS = 24 * SECONDS_PER_HOUR; // 24 hours +export const MAX_QUEUE_DELAY_WINDOW_SECONDS = 7 * 24 * SECONDS_PER_HOUR; // 7 days +export const MAX_DELAY_SECONDS = + MAX_QUEUE_DELAY_WINDOW_SECONDS - RE_ENQUEUE_MARGIN_SECONDS; + +function getMaxDelaySeconds(): number { + const rawMaxDelaySeconds = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + if ( + rawMaxDelaySeconds === undefined || + rawMaxDelaySeconds.trim().length === 0 + ) { + return MAX_DELAY_SECONDS; + } + + const parsedMaxDelaySeconds = Number(rawMaxDelaySeconds); + if (!Number.isFinite(parsedMaxDelaySeconds) || parsedMaxDelaySeconds < 0) { + return MAX_DELAY_SECONDS; + } + + return Math.min(Math.floor(parsedMaxDelaySeconds), MAX_DELAY_SECONDS); +} /** * Extract known identifiers from a queue payload and return them as VQS headers. @@ -191,11 +210,12 @@ export function createQueue(config?: APIConfig): Queue { if (typeof result?.timeoutSeconds === 'number') { // When timeoutSeconds is 0, skip delaySeconds entirely for immediate re-enqueue. - // Otherwise, clamp to max delay (23h) - for longer sleeps, the workflow will chain - // multiple delayed messages until the full sleep duration has elapsed. + // Otherwise, clamp to the queue delay window minus a 24h buffer (6d). + // For longer sleeps, the workflow will chain multiple delayed messages until + // the full sleep duration has elapsed. const delaySeconds = result.timeoutSeconds > 0 - ? Math.min(result.timeoutSeconds, MAX_DELAY_SECONDS) + ? Math.min(result.timeoutSeconds, getMaxDelaySeconds()) : undefined; // Send new message BEFORE acknowledging current message. diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 47678829b6..16a6c4cd03 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -22,8 +22,8 @@ catalogs: specifier: 3.2.0 version: 3.2.0 '@vercel/queue': - specifier: 0.1.4 - version: 0.1.4 + specifier: 0.1.6 + version: 0.1.6 '@vitest/coverage-v8': specifier: ^4.0.18 version: 4.0.18 @@ -1306,7 +1306,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1352,7 +1352,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1459,7 +1459,7 @@ importers: version: 3.2.0 '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -8109,8 +8109,8 @@ packages: '@opentelemetry/sdk-metrics': '>=1.19.0 <2.0.0' '@opentelemetry/sdk-trace-base': '>=1.19.0 <2.0.0' - '@vercel/queue@0.1.4': - resolution: {integrity: sha512-wo+jCycmCX078vQSbkX+RcLvySONDCK0f9aQp5UMKQD1+B+xKt3YVbIYbZukvoHQpbm5nnk6If+ADSeK/PmCgQ==} + '@vercel/queue@0.1.6': + resolution: {integrity: sha512-FUQ0ySYNm31ZO709lg6a2NPamzH5LpfU9QZwZVduxnOH0N/aNp+8rjKmYLDWQpdA/S+ihNexJV+NhbV3GFaumQ==} engines: {node: '>=20.0.0'} '@vercel/routing-utils@5.3.0': @@ -22614,7 +22614,7 @@ snapshots: '@opentelemetry/sdk-metrics': 1.30.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-trace-base': 1.30.1(@opentelemetry/api@1.9.0) - '@vercel/queue@0.1.4': + '@vercel/queue@0.1.6': dependencies: '@vercel/oidc': 3.2.0 minimatch: 10.2.4 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 667f4878f3..12c5396e6a 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -11,7 +11,7 @@ catalog: "@types/node": 22.19.0 "@vercel/functions": ^3.4.3 "@vercel/oidc": 3.2.0 - "@vercel/queue": 0.1.4 + "@vercel/queue": 0.1.6 "@vitest/coverage-v8": ^4.0.18 ai: 6.0.116 esbuild: ^0.27.3