diff --git a/.changeset/cbor-transport-compat.md b/.changeset/cbor-transport-compat.md new file mode 100644 index 0000000000..43f5988631 --- /dev/null +++ b/.changeset/cbor-transport-compat.md @@ -0,0 +1,7 @@ +--- +"@workflow/world-vercel": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Bump specVersion to 3 and gate CBOR queue transport on spec version. Old deployments (specVersion < 3) receive JSON queue messages; new deployments receive CBOR. Handler uses dual transport to deserialize both formats. Fixes replay/reenqueue from dashboard to older deployments. diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index bb326364fb..fc1c486a88 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -6,7 +6,7 @@ import { WorkflowRunFailedError, WorkflowWorldError, } from '@workflow/errors'; -import type { World } from '@workflow/world'; +import { SPEC_VERSION_CURRENT, type World } from '@workflow/world'; import { afterAll, assert, @@ -1544,11 +1544,13 @@ describe('e2e', () => { headers: getProtectionBypassHeaders(), }); expect(flowRes.status).toBe(200); - expect(flowRes.headers.get('Content-Type')).toBe('text/plain'); - const flowBody = await flowRes.text(); - expect(flowBody).toBe( - 'Workflow SDK "/.well-known/workflow/v1/flow" endpoint is healthy' - ); + expect(flowRes.headers.get('Content-Type')).toBe('application/json'); + const flowBody = await flowRes.json(); + expect(flowBody).toEqual({ + healthy: true, + endpoint: '/.well-known/workflow/v1/flow', + specVersion: SPEC_VERSION_CURRENT, + }); // Test the step endpoint health check const stepHealthUrl = new URL( @@ -1560,11 +1562,13 @@ describe('e2e', () => { headers: getProtectionBypassHeaders(), }); expect(stepRes.status).toBe(200); - expect(stepRes.headers.get('Content-Type')).toBe('text/plain'); - const stepBody = await stepRes.text(); - expect(stepBody).toBe( - 'Workflow SDK "/.well-known/workflow/v1/step" endpoint is healthy' - ); + expect(stepRes.headers.get('Content-Type')).toBe('application/json'); + const stepBody = await stepRes.json(); + expect(stepBody).toEqual({ + healthy: true, + endpoint: '/.well-known/workflow/v1/step', + specVersion: SPEC_VERSION_CURRENT, + }); } ); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 88ed83bff1..fbabe993eb 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -4,7 +4,7 @@ import type { ValidQueueName, World, } from '@workflow/world'; -import { HealthCheckPayloadSchema } from '@workflow/world'; +import { HealthCheckPayloadSchema, SPEC_VERSION_CURRENT } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { runtimeLogger } from '../logger.js'; @@ -54,6 +54,8 @@ export interface HealthCheckResult { error?: string; /** Latency if the health check was successful */ latencyMs?: number; + /** Spec version of the responding deployment */ + specVersion?: number; } /** @@ -96,6 +98,7 @@ export async function handleHealthCheckMessage( healthy: true, endpoint, correlationId: healthCheck.correlationId, + specVersion: SPEC_VERSION_CURRENT, timestamp: Date.now(), }); // Use a fake runId that passes validation. @@ -195,7 +198,14 @@ function parseHealthCheckResponse( return null; } - return { healthy: (response as { healthy: boolean }).healthy }; + const r = response as Record; + const parsed: { healthy: boolean; specVersion?: number } = { + healthy: r.healthy as boolean, + }; + if (typeof r.specVersion === 'number') { + parsed.specVersion = r.specVersion; + } + return parsed; } export async function healthCheck( @@ -361,11 +371,15 @@ export function withHealthCheck( }); } return new Response( - `Workflow SDK "${url.pathname}" endpoint is healthy`, + JSON.stringify({ + healthy: true, + endpoint: url.pathname, + specVersion: SPEC_VERSION_CURRENT, + }), { status: 200, headers: { - 'Content-Type': 'text/plain', + 'Content-Type': 'application/json', ...HEALTH_CHECK_CORS_HEADERS, }, } diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 16a9ac5899..f894e55a1e 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -8,6 +8,7 @@ import { type Hook, isLegacySpecVersion, SPEC_VERSION_CURRENT, + SPEC_VERSION_LEGACY, type WorkflowInvokePayload, type WorkflowRun, } from '@workflow/world'; @@ -194,6 +195,7 @@ export async function resumeHook( } satisfies WorkflowInvokePayload, { deploymentId: workflowRun.deploymentId, + specVersion: workflowRun.specVersion ?? SPEC_VERSION_LEGACY, } ); diff --git a/packages/core/src/runtime/runs.ts b/packages/core/src/runtime/runs.ts index 0c8a9ef8fd..3d6844a64b 100644 --- a/packages/core/src/runtime/runs.ts +++ b/packages/core/src/runtime/runs.ts @@ -117,6 +117,7 @@ export async function reenqueueRun(world: World, runId: string): Promise { }, { deploymentId: run.deploymentId, + specVersion: run.specVersion ?? SPEC_VERSION_LEGACY, } ); } catch (err) { @@ -209,6 +210,7 @@ export async function wakeUpRun( }, { deploymentId: run.deploymentId, + specVersion: run.specVersion ?? SPEC_VERSION_LEGACY, } ); } diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index c1e1d6eddb..f0423db343 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -280,7 +280,7 @@ describe('start', () => { expect(mockQueue).toHaveBeenCalledWith( expect.any(String), expect.any(Object), - { deploymentId: 'dpl_resolved_abc123' } + expect.objectContaining({ deploymentId: 'dpl_resolved_abc123' }) ); }); diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 59fc6db56e..ffe7d49981 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -6,7 +6,11 @@ import { WorkflowWorldError, } from '@workflow/errors'; import type { WorkflowInvokePayload, World } from '@workflow/world'; -import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; +import { + isLegacySpecVersion, + SPEC_VERSION_CURRENT, + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, +} from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; import { runtimeLogger } from '../logger.js'; @@ -220,16 +224,21 @@ export async function start( { runId, traceCarrier, - runInput: { - input: workflowArguments, - deploymentId, - workflowName, - specVersion, - executionContext, - }, + ...(specVersion >= SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT + ? { + runInput: { + input: workflowArguments, + deploymentId, + workflowName, + specVersion, + executionContext, + }, + } + : {}), } satisfies WorkflowInvokePayload, { deploymentId, + specVersion, } ), ]); diff --git a/packages/world-postgres/test/storage.test.ts b/packages/world-postgres/test/storage.test.ts index 424c4b14c9..8903c4bb63 100644 --- a/packages/world-postgres/test/storage.test.ts +++ b/packages/world-postgres/test/storage.test.ts @@ -1,6 +1,7 @@ import { execSync } from 'node:child_process'; import { PostgreSqlContainer } from '@testcontainers/postgresql'; import type { Hook, Step, WorkflowRun } from '@workflow/world'; +import { SPEC_VERSION_CURRENT } from '@workflow/world'; import { encode } from 'cbor-x'; import { Pool } from 'pg'; import { @@ -385,7 +386,7 @@ describe('Storage (Postgres integration)', () => { completedAt: undefined, createdAt: expect.any(Date), updatedAt: expect.any(Date), - specVersion: 2, + specVersion: SPEC_VERSION_CURRENT, }); }); }); diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index d11e2e0f7e..530062f677 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -7,6 +7,8 @@ import { type QueueOptions, type QueuePayload, QueuePayloadSchema, + SPEC_VERSION_CURRENT, + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, ValidQueueName, } from '@workflow/world'; import { decode, encode } from 'cbor-x'; @@ -18,6 +20,8 @@ import { type APIConfig, getHeaders, getHttpUrl } from './utils.js'; * CBOR-based queue transport. Encodes values with cbor-x on send and * decodes on receive, preserving Uint8Array values natively (workflow * input is a Uint8Array in specVersion >= 2). + * + * Used for specVersion >= SPEC_VERSION_CURRENT (3). */ class CborTransport implements Transport { readonly contentType = 'application/cbor'; @@ -38,6 +42,58 @@ class CborTransport implements Transport { } } +/** + * JSON-based queue transport. Used for specVersion < SPEC_VERSION_CURRENT + * to maintain compatibility with older deployments that expect JSON messages. + */ +class JsonTransport implements Transport { + readonly contentType = 'application/json'; + + serialize(value: unknown): Buffer { + return Buffer.from(JSON.stringify(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return JSON.parse(Buffer.concat(chunks).toString()); + } +} + +/** + * Dual transport for the queue handler. Serializes with CBOR (handler + * re-enqueues target the same new deployment) but deserializes with + * CBOR-first, falling back to JSON for messages from older deployments. + */ +class DualTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + const buffer = Buffer.concat(chunks); + try { + return decode(buffer); + } catch { + return JSON.parse(buffer.toString()); + } + } +} + const requestIdStorage = new AsyncLocalStorage(); const MessageWrapper = z.object({ @@ -113,11 +169,13 @@ export function createQueue(config?: APIConfig): Queue { const region = 'iad1'; const cborTransport = new CborTransport(); + const jsonTransport = new JsonTransport(); + const dualTransport = new DualTransport(); const clientOptions = { region, dispatcher: getDispatcher(), - transport: cborTransport, + transport: dualTransport, ...(usingProxy && { // final path will be /queues-proxy/api/v3/topic/... // and the proxy will strip the /queues-proxy prefix before forwarding to VQS @@ -142,9 +200,17 @@ export function createQueue(config?: APIConfig): Queue { ); } + // Select transport based on the target run's specVersion: + // CBOR for specVersion >= 3 (CBOR transport), JSON for older ones. + const useCbor = + (opts?.specVersion ?? SPEC_VERSION_CURRENT) >= + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT; + const transport = useCbor ? cborTransport : jsonTransport; + const client = new QueueClient({ ...clientOptions, deploymentId, + transport, }); // The CborTransport handles CBOR encoding inside serialize(), diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index e3a574e3bf..06ce262df3 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -50,6 +50,8 @@ export { requiresNewerWorld, SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY, + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + SPEC_VERSION_SUPPORTS_EVENT_SOURCING, } from './spec-version.js'; export type * from './steps.js'; export { StepSchema, StepStatusSchema } from './steps.js'; diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 59f467b463..409d00dd0c 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -81,6 +81,8 @@ export interface QueueOptions { headers?: Record; /** Delay message delivery by this many seconds */ delaySeconds?: number; + /** Spec version of the target run. Used to select the queue transport format. */ + specVersion?: number; } export interface Queue { diff --git a/packages/world/src/spec-version.ts b/packages/world/src/spec-version.ts index 50c5b28d72..56e9755fee 100644 --- a/packages/world/src/spec-version.ts +++ b/packages/world/src/spec-version.ts @@ -15,23 +15,33 @@ export type SpecVersion = number & { readonly [SpecVersionBrand]: typeof SpecVersionBrand; }; -/** Legacy spec version (pre-event-sourcing). Also used for runs without specVersion. */ +/** + * Legacy spec version (pre-event-sourcing). Also used for runs without specVersion. + * This is the only true legacy version — specVersion 2+ all use the event-sourced model. + */ export const SPEC_VERSION_LEGACY = 1 as SpecVersion; -/** Current spec version (event-sourced architecture). */ -export const SPEC_VERSION_CURRENT = 2 as SpecVersion; +export const SPEC_VERSION_SUPPORTS_EVENT_SOURCING = 2 as SpecVersion; +export const SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT = 3 as SpecVersion; + +/** Current spec version (event-sourced architecture with CBOR queue transport). */ +export const SPEC_VERSION_CURRENT = + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT as SpecVersion; /** - * Check if a spec version is legacy (< SPEC_VERSION_CURRENT or undefined). + * Check if a spec version is legacy (<= SPEC_VERSION_LEGACY or undefined). * Legacy runs require different handling - they use direct entity mutation * instead of the event-sourced model. * + * Checks against SPEC_VERSION_LEGACY (1), not SPEC_VERSION_CURRENT, so that + * intermediate versions (e.g. 2) are not incorrectly treated as legacy when + * SPEC_VERSION_CURRENT is bumped. + * * @param v - The spec version number, or undefined/null for legacy runs * @returns true if the run is a legacy run */ export function isLegacySpecVersion(v: number | undefined | null): boolean { - if (v === undefined || v === null) return true; - return v < SPEC_VERSION_CURRENT; + return v === undefined || v === null || v <= SPEC_VERSION_LEGACY; } /**