Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/cbor-transport-compat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/world-vercel": patch
"@workflow/world": patch
"@workflow/core": patch
---

Bump specVersion to 3 and gate CBOR queue transport on spec version. Old deployments (specVersion < 3) receive JSON queue messages; new deployments receive CBOR. Handler uses dual transport to deserialize both formats. Fixes replay/reenqueue from dashboard to older deployments.
26 changes: 15 additions & 11 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
WorkflowRunFailedError,
WorkflowWorldError,
} from '@workflow/errors';
import type { World } from '@workflow/world';
import { SPEC_VERSION_CURRENT, type World } from '@workflow/world';
import {
afterAll,
assert,
Expand Down Expand Up @@ -1544,11 +1544,13 @@ describe('e2e', () => {
headers: getProtectionBypassHeaders(),
});
expect(flowRes.status).toBe(200);
expect(flowRes.headers.get('Content-Type')).toBe('text/plain');
const flowBody = await flowRes.text();
expect(flowBody).toBe(
'Workflow SDK "/.well-known/workflow/v1/flow" endpoint is healthy'
);
expect(flowRes.headers.get('Content-Type')).toBe('application/json');
const flowBody = await flowRes.json();
expect(flowBody).toEqual({
healthy: true,
endpoint: '/.well-known/workflow/v1/flow',
specVersion: SPEC_VERSION_CURRENT,
});

// Test the step endpoint health check
const stepHealthUrl = new URL(
Expand All @@ -1560,11 +1562,13 @@ describe('e2e', () => {
headers: getProtectionBypassHeaders(),
});
expect(stepRes.status).toBe(200);
expect(stepRes.headers.get('Content-Type')).toBe('text/plain');
const stepBody = await stepRes.text();
expect(stepBody).toBe(
'Workflow SDK "/.well-known/workflow/v1/step" endpoint is healthy'
);
expect(stepRes.headers.get('Content-Type')).toBe('application/json');
const stepBody = await stepRes.json();
expect(stepBody).toEqual({
healthy: true,
endpoint: '/.well-known/workflow/v1/step',
specVersion: SPEC_VERSION_CURRENT,
});
}
);

Expand Down
10 changes: 7 additions & 3 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {
ValidQueueName,
World,
} from '@workflow/world';
import { HealthCheckPayloadSchema } from '@workflow/world';
import { HealthCheckPayloadSchema, SPEC_VERSION_CURRENT } from '@workflow/world';
import { monotonicFactory } from 'ulid';

import { runtimeLogger } from '../logger.js';
Expand Down Expand Up @@ -96,6 +96,7 @@ export async function handleHealthCheckMessage(
healthy: true,
endpoint,
correlationId: healthCheck.correlationId,
specVersion: SPEC_VERSION_CURRENT,
timestamp: Date.now(),
});
// Use a fake runId that passes validation.
Expand Down Expand Up @@ -361,11 +362,14 @@ export function withHealthCheck(
});
}
return new Response(
`Workflow SDK "${url.pathname}" endpoint is healthy`,
JSON.stringify({
healthy: true,
specVersion: SPEC_VERSION_CURRENT,
}),
{
status: 200,
headers: {
'Content-Type': 'text/plain',
'Content-Type': 'application/json',
...HEALTH_CHECK_CORS_HEADERS,
},
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type Hook,
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
type WorkflowInvokePayload,
type WorkflowRun,
} from '@workflow/world';
Expand Down Expand Up @@ -194,6 +195,7 @@ export async function resumeHook<T = any>(
} satisfies WorkflowInvokePayload,
{
deploymentId: workflowRun.deploymentId,
specVersion: workflowRun.specVersion ?? SPEC_VERSION_LEGACY,
}
);

Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export async function reenqueueRun(world: World, runId: string): Promise<void> {
},
{
deploymentId: run.deploymentId,
specVersion: run.specVersion ?? SPEC_VERSION_LEGACY,
}
);
} catch (err) {
Expand Down Expand Up @@ -209,6 +210,7 @@ export async function wakeUpRun(
},
{
deploymentId: run.deploymentId,
specVersion: run.specVersion ?? SPEC_VERSION_LEGACY,
}
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ describe('start', () => {
expect(mockQueue).toHaveBeenCalledWith(
expect.any(String),
expect.any(Object),
{ deploymentId: 'dpl_resolved_abc123' }
expect.objectContaining({ deploymentId: 'dpl_resolved_abc123' })
);
});

Expand Down
25 changes: 17 additions & 8 deletions packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
WorkflowWorldError,
} from '@workflow/errors';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world';
import {
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
import { runtimeLogger } from '../logger.js';
Expand Down Expand Up @@ -220,16 +224,21 @@ export async function start<TArgs extends unknown[], TResult>(
{
runId,
traceCarrier,
runInput: {
input: workflowArguments,
deploymentId,
workflowName,
specVersion,
executionContext,
},
...(specVersion >= SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT
? {
runInput: {
input: workflowArguments,
deploymentId,
workflowName,
specVersion,
executionContext,
},
}
: {}),
} satisfies WorkflowInvokePayload,
{
deploymentId,
specVersion,
}
),
]);
Expand Down
3 changes: 2 additions & 1 deletion packages/world-postgres/test/storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { execSync } from 'node:child_process';
import { PostgreSqlContainer } from '@testcontainers/postgresql';
import type { Hook, Step, WorkflowRun } from '@workflow/world';
import { SPEC_VERSION_CURRENT } from '@workflow/world';
import { encode } from 'cbor-x';
import { Pool } from 'pg';
import {
Expand Down Expand Up @@ -385,7 +386,7 @@ describe('Storage (Postgres integration)', () => {
completedAt: undefined,
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
specVersion: 2,
specVersion: SPEC_VERSION_CURRENT,
});
});
});
Expand Down
68 changes: 67 additions & 1 deletion packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
type QueueOptions,
type QueuePayload,
QueuePayloadSchema,
SPEC_VERSION_CURRENT,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
ValidQueueName,
} from '@workflow/world';
import { decode, encode } from 'cbor-x';
Expand All @@ -18,6 +20,8 @@ import { type APIConfig, getHeaders, getHttpUrl } from './utils.js';
* CBOR-based queue transport. Encodes values with cbor-x on send and
* decodes on receive, preserving Uint8Array values natively (workflow
* input is a Uint8Array in specVersion >= 2).
*
* Used for specVersion >= SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT (3).
*/
class CborTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';
Expand All @@ -38,6 +42,58 @@ class CborTransport implements Transport<unknown> {
}
}

/**
 * Queue transport that encodes messages as JSON text.
 *
 * createQueue selects this transport when the target run's specVersion is
 * below SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, so that older deployments
 * which expect JSON queue messages keep working.
 */
class JsonTransport implements Transport<unknown> {
  readonly contentType = 'application/json';

  /** Stringifies `value` as JSON and wraps the resulting text in a Buffer. */
  serialize(value: unknown): Buffer {
    const text = JSON.stringify(value);
    return Buffer.from(text);
  }

  /** Reads the stream to its end, then parses the collected bytes as JSON. */
  async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
    const parts: Uint8Array[] = [];
    const reader = stream.getReader();
    for (
      let step = await reader.read();
      !step.done;
      step = await reader.read()
    ) {
      // A read may complete without a chunk; only keep real data.
      if (step.value) parts.push(step.value);
    }
    // Concatenate before decoding so multi-byte characters that straddle
    // chunk boundaries are decoded intact.
    const body = Buffer.concat(parts).toString();
    return JSON.parse(body);
  }
}

/**
* Dual transport for the queue handler. Serializes with CBOR (handler
* re-enqueues target the same new deployment), but deserializes by trying
* CBOR first and falling back to JSON for messages from older deployments.
*/
class DualTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
const buffer = Buffer.concat(chunks);
try {
return decode(buffer);
} catch {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking observation: The CBOR-first, JSON-fallback strategy is safe because CBOR's binary encoding always starts with a type-length byte (e.g. 0xA2 for a 2-entry map) that is never valid as the first byte of a JSON string (which must start with {, [, ", digit, t, f, n, or whitespace in ASCII). So there's no ambiguity — cbor-x.decode() will throw on JSON input, and JSON.parse() will succeed.

The only theoretical concern is a corrupted message that happens to be valid CBOR but not valid JSON — but that's a data integrity issue unrelated to the dual transport.

return JSON.parse(buffer.toString());
}
}
}

const requestIdStorage = new AsyncLocalStorage<string | undefined>();

const MessageWrapper = z.object({
Expand Down Expand Up @@ -113,11 +169,13 @@ export function createQueue(config?: APIConfig): Queue {
const region = 'iad1';

const cborTransport = new CborTransport();
const jsonTransport = new JsonTransport();
const dualTransport = new DualTransport();

const clientOptions = {
region,
dispatcher: getDispatcher(),
transport: cborTransport,
transport: dualTransport,
...(usingProxy && {
// final path will be /queues-proxy/api/v3/topic/...
// and the proxy will strip the /queues-proxy prefix before forwarding to VQS
Expand All @@ -142,9 +200,17 @@ export function createQueue(config?: APIConfig): Queue {
);
}

// Select transport based on the target run's specVersion:
// CBOR for specVersion >= 3 (CBOR transport), JSON for older ones.
const useCbor =
(opts?.specVersion ?? SPEC_VERSION_CURRENT) >=
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT;
const transport = useCbor ? cborTransport : jsonTransport;

const client = new QueueClient({
...clientOptions,
deploymentId,
transport,
});

// The CborTransport handles CBOR encoding inside serialize(),
Expand Down
2 changes: 2 additions & 0 deletions packages/world/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ export {
requiresNewerWorld,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
} from './spec-version.js';
export type * from './steps.js';
export { StepSchema, StepStatusSchema } from './steps.js';
Expand Down
2 changes: 2 additions & 0 deletions packages/world/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export interface QueueOptions {
headers?: Record<string, string>;
/** Delay message delivery by this many seconds */
delaySeconds?: number;
/** Spec version of the target run. Used to select the queue transport format. */
specVersion?: number;
}

export interface Queue {
Expand Down
22 changes: 16 additions & 6 deletions packages/world/src/spec-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,33 @@ export type SpecVersion = number & {
readonly [SpecVersionBrand]: typeof SpecVersionBrand;
};

/** Legacy spec version (pre-event-sourcing). Also used for runs without specVersion. */
/**
* Legacy spec version (pre-event-sourcing). Also used for runs without specVersion.
* This is the only true legacy version — specVersion 2+ all use the event-sourced model.
*/
export const SPEC_VERSION_LEGACY = 1 as SpecVersion;

/** Current spec version (event-sourced architecture). */
export const SPEC_VERSION_CURRENT = 2 as SpecVersion;
export const SPEC_VERSION_SUPPORTS_EVENT_SOURCING = 2 as SpecVersion;
export const SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT = 3 as SpecVersion;

/** Current spec version (event-sourced architecture with CBOR queue transport). */
export const SPEC_VERSION_CURRENT =
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT as SpecVersion;

/**
* Check if a spec version is legacy (< SPEC_VERSION_CURRENT or undefined).
* Check if a spec version is legacy (<= SPEC_VERSION_LEGACY or undefined).
* Legacy runs require different handling - they use direct entity mutation
* instead of the event-sourced model.
*
* Checks against SPEC_VERSION_LEGACY (1), not SPEC_VERSION_CURRENT, so that
* intermediate versions (e.g. 2) are not incorrectly treated as legacy when
* SPEC_VERSION_CURRENT is bumped.
*
* @param v - The spec version number, or undefined/null for legacy runs
* @returns true if the run is a legacy run
*/
export function isLegacySpecVersion(v: number | undefined | null): boolean {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: This is the key correctness fix. The old v < SPEC_VERSION_CURRENT would have made specVersion 2 runs legacy after the bump to 3, breaking event-sourced replay for all existing v2 runs. v <= SPEC_VERSION_LEGACY is the right semantic — only version 1 is truly legacy (direct entity mutation).

if (v === undefined || v === null) return true;
return v < SPEC_VERSION_CURRENT;
return v === undefined || v === null || v <= SPEC_VERSION_LEGACY;
}

/**
Expand Down
Loading