Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/cbor-transport-compat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/world-vercel": patch
"@workflow/world": patch
"@workflow/core": patch
---

Bump specVersion to 3 and gate CBOR queue transport on spec version. Old deployments (specVersion < 3) receive JSON queue messages; new deployments receive CBOR. Handler uses dual transport to deserialize both formats. Fixes replay/reenqueue from dashboard to older deployments.
26 changes: 15 additions & 11 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
WorkflowRunFailedError,
WorkflowWorldError,
} from '@workflow/errors';
import type { World } from '@workflow/world';
import { SPEC_VERSION_CURRENT, type World } from '@workflow/world';
import {
afterAll,
assert,
Expand Down Expand Up @@ -1544,11 +1544,13 @@ describe('e2e', () => {
headers: getProtectionBypassHeaders(),
});
expect(flowRes.status).toBe(200);
expect(flowRes.headers.get('Content-Type')).toBe('text/plain');
const flowBody = await flowRes.text();
expect(flowBody).toBe(
'Workflow SDK "/.well-known/workflow/v1/flow" endpoint is healthy'
);
expect(flowRes.headers.get('Content-Type')).toBe('application/json');
const flowBody = await flowRes.json();
expect(flowBody).toEqual({
healthy: true,
Comment thread
vercel[bot] marked this conversation as resolved.
endpoint: '/.well-known/workflow/v1/flow',
specVersion: SPEC_VERSION_CURRENT,
});

// Test the step endpoint health check
const stepHealthUrl = new URL(
Expand All @@ -1560,11 +1562,13 @@ describe('e2e', () => {
headers: getProtectionBypassHeaders(),
});
expect(stepRes.status).toBe(200);
expect(stepRes.headers.get('Content-Type')).toBe('text/plain');
const stepBody = await stepRes.text();
expect(stepBody).toBe(
'Workflow SDK "/.well-known/workflow/v1/step" endpoint is healthy'
);
expect(stepRes.headers.get('Content-Type')).toBe('application/json');
const stepBody = await stepRes.json();
expect(stepBody).toEqual({
healthy: true,
endpoint: '/.well-known/workflow/v1/step',
specVersion: SPEC_VERSION_CURRENT,
});
}
);

Expand Down
22 changes: 18 additions & 4 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {
ValidQueueName,
World,
} from '@workflow/world';
import { HealthCheckPayloadSchema } from '@workflow/world';
import { HealthCheckPayloadSchema, SPEC_VERSION_CURRENT } from '@workflow/world';
import { monotonicFactory } from 'ulid';

import { runtimeLogger } from '../logger.js';
Expand Down Expand Up @@ -54,6 +54,8 @@ export interface HealthCheckResult {
error?: string;
/** Latency if the health check was successful */
latencyMs?: number;
/** Spec version of the responding deployment */
specVersion?: number;
}

/**
Expand Down Expand Up @@ -96,6 +98,7 @@ export async function handleHealthCheckMessage(
healthy: true,
endpoint,
correlationId: healthCheck.correlationId,
specVersion: SPEC_VERSION_CURRENT,
timestamp: Date.now(),
});
// Use a fake runId that passes validation.
Expand Down Expand Up @@ -195,7 +198,14 @@ function parseHealthCheckResponse(
return null;
}

return { healthy: (response as { healthy: boolean }).healthy };
const r = response as Record<string, unknown>;
const parsed: { healthy: boolean; specVersion?: number } = {
healthy: r.healthy as boolean,
};
if (typeof r.specVersion === 'number') {
parsed.specVersion = r.specVersion;
}
return parsed;
}

export async function healthCheck(
Expand Down Expand Up @@ -361,11 +371,15 @@ export function withHealthCheck(
});
}
return new Response(
`Workflow SDK "${url.pathname}" endpoint is healthy`,
JSON.stringify({
healthy: true,
endpoint: url.pathname,
specVersion: SPEC_VERSION_CURRENT,
}),
{
status: 200,
headers: {
'Content-Type': 'text/plain',
'Content-Type': 'application/json',
...HEALTH_CHECK_CORS_HEADERS,
},
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type Hook,
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
type WorkflowInvokePayload,
type WorkflowRun,
} from '@workflow/world';
Expand Down Expand Up @@ -194,6 +195,7 @@ export async function resumeHook<T = any>(
} satisfies WorkflowInvokePayload,
{
deploymentId: workflowRun.deploymentId,
specVersion: workflowRun.specVersion ?? SPEC_VERSION_LEGACY,
}
);

Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export async function reenqueueRun(world: World, runId: string): Promise<void> {
},
{
deploymentId: run.deploymentId,
specVersion: run.specVersion ?? SPEC_VERSION_LEGACY,
}
);
} catch (err) {
Expand Down Expand Up @@ -209,6 +210,7 @@ export async function wakeUpRun(
},
{
deploymentId: run.deploymentId,
specVersion: run.specVersion ?? SPEC_VERSION_LEGACY,
}
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/start.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ describe('start', () => {
expect(mockQueue).toHaveBeenCalledWith(
expect.any(String),
expect.any(Object),
{ deploymentId: 'dpl_resolved_abc123' }
expect.objectContaining({ deploymentId: 'dpl_resolved_abc123' })
);
});

Expand Down
25 changes: 17 additions & 8 deletions packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
WorkflowWorldError,
} from '@workflow/errors';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world';
import {
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
import { runtimeLogger } from '../logger.js';
Expand Down Expand Up @@ -220,16 +224,21 @@ export async function start<TArgs extends unknown[], TResult>(
{
runId,
traceCarrier,
runInput: {
input: workflowArguments,
deploymentId,
workflowName,
specVersion,
executionContext,
},
...(specVersion >= SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT
? {
runInput: {
input: workflowArguments,
deploymentId,
workflowName,
specVersion,
executionContext,
},
}
: {}),
} satisfies WorkflowInvokePayload,
{
deploymentId,
specVersion,
}
),
]);
Expand Down
3 changes: 2 additions & 1 deletion packages/world-postgres/test/storage.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { execSync } from 'node:child_process';
import { PostgreSqlContainer } from '@testcontainers/postgresql';
import type { Hook, Step, WorkflowRun } from '@workflow/world';
import { SPEC_VERSION_CURRENT } from '@workflow/world';
import { encode } from 'cbor-x';
import { Pool } from 'pg';
import {
Expand Down Expand Up @@ -385,7 +386,7 @@ describe('Storage (Postgres integration)', () => {
completedAt: undefined,
createdAt: expect.any(Date),
updatedAt: expect.any(Date),
specVersion: 2,
specVersion: SPEC_VERSION_CURRENT,
});
});
});
Expand Down
68 changes: 67 additions & 1 deletion packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
type QueueOptions,
type QueuePayload,
QueuePayloadSchema,
SPEC_VERSION_CURRENT,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
ValidQueueName,
} from '@workflow/world';
import { decode, encode } from 'cbor-x';
Expand All @@ -18,6 +20,8 @@ import { type APIConfig, getHeaders, getHttpUrl } from './utils.js';
* CBOR-based queue transport. Encodes values with cbor-x on send and
* decodes on receive, preserving Uint8Array values natively (workflow
* input is a Uint8Array in specVersion >= 2).
*
* Used for specVersion >= SPEC_VERSION_CURRENT (3).
*/
class CborTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';
Expand All @@ -38,6 +42,58 @@ class CborTransport implements Transport<unknown> {
}
}

/**
* JSON-based queue transport. Used for specVersion < SPEC_VERSION_CURRENT
* to maintain compatibility with older deployments that expect JSON messages.
*/
class JsonTransport implements Transport<unknown> {
readonly contentType = 'application/json';

serialize(value: unknown): Buffer {
return Buffer.from(JSON.stringify(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
return JSON.parse(Buffer.concat(chunks).toString());
}
}

/**
* Dual transport for the queue handler. Serializes with CBOR (handler
* re-enqueues target the same new deployment) but deserializes with
* CBOR-first, falling back to JSON for messages from older deployments.
*/
class DualTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
const buffer = Buffer.concat(chunks);
try {
return decode(buffer);
} catch {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking observation: The CBOR-first, JSON-fallback strategy is safe because CBOR's binary encoding always starts with a type-length byte (e.g. 0xA2 for a 2-entry map) that is never valid as the first byte of a JSON string (which must start with {, [, ", digit, t, f, n, or whitespace in ASCII). So there's no ambiguity — cbor-x.decode() will throw on JSON input, and JSON.parse() will succeed.

The only theoretical concern is a corrupted message that happens to be valid CBOR but not valid JSON — but that's a data integrity issue unrelated to the dual transport.

return JSON.parse(buffer.toString());
}
}
}

const requestIdStorage = new AsyncLocalStorage<string | undefined>();

const MessageWrapper = z.object({
Expand Down Expand Up @@ -113,11 +169,13 @@ export function createQueue(config?: APIConfig): Queue {
const region = 'iad1';

const cborTransport = new CborTransport();
const jsonTransport = new JsonTransport();
const dualTransport = new DualTransport();

const clientOptions = {
region,
dispatcher: getDispatcher(),
transport: cborTransport,
transport: dualTransport,
...(usingProxy && {
// final path will be /queues-proxy/api/v3/topic/...
// and the proxy will strip the /queues-proxy prefix before forwarding to VQS
Expand All @@ -142,9 +200,17 @@ export function createQueue(config?: APIConfig): Queue {
);
}

// Select transport based on the target run's specVersion:
// CBOR for specVersion >= 3 (CBOR transport), JSON for older ones.
const useCbor =
(opts?.specVersion ?? SPEC_VERSION_CURRENT) >=
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT;
const transport = useCbor ? cborTransport : jsonTransport;

const client = new QueueClient({
...clientOptions,
deploymentId,
transport,
});

// The CborTransport handles CBOR encoding inside serialize(),
Expand Down
2 changes: 2 additions & 0 deletions packages/world/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ export {
requiresNewerWorld,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
} from './spec-version.js';
export type * from './steps.js';
export { StepSchema, StepStatusSchema } from './steps.js';
Expand Down
2 changes: 2 additions & 0 deletions packages/world/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export interface QueueOptions {
headers?: Record<string, string>;
/** Delay message delivery by this many seconds */
delaySeconds?: number;
/** Spec version of the target run. Used to select the queue transport format. */
specVersion?: number;
}

export interface Queue {
Expand Down
22 changes: 16 additions & 6 deletions packages/world/src/spec-version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,33 @@ export type SpecVersion = number & {
readonly [SpecVersionBrand]: typeof SpecVersionBrand;
};

/** Legacy spec version (pre-event-sourcing). Also used for runs without specVersion. */
/**
* Legacy spec version (pre-event-sourcing). Also used for runs without specVersion.
* This is the only true legacy version — specVersion 2+ all use the event-sourced model.
*/
export const SPEC_VERSION_LEGACY = 1 as SpecVersion;

/** Current spec version (event-sourced architecture). */
export const SPEC_VERSION_CURRENT = 2 as SpecVersion;
export const SPEC_VERSION_SUPPORTS_EVENT_SOURCING = 2 as SpecVersion;
export const SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT = 3 as SpecVersion;

/** Current spec version (event-sourced architecture with CBOR queue transport). */
export const SPEC_VERSION_CURRENT =
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT as SpecVersion;

/**
* Check if a spec version is legacy (< SPEC_VERSION_CURRENT or undefined).
* Check if a spec version is legacy (<= SPEC_VERSION_LEGACY or undefined).
* Legacy runs require different handling - they use direct entity mutation
* instead of the event-sourced model.
*
* Checks against SPEC_VERSION_LEGACY (1), not SPEC_VERSION_CURRENT, so that
* intermediate versions (e.g. 2) are not incorrectly treated as legacy when
* SPEC_VERSION_CURRENT is bumped.
*
* @param v - The spec version number, or undefined/null for legacy runs
* @returns true if the run is a legacy run
*/
export function isLegacySpecVersion(v: number | undefined | null): boolean {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: This is the key correctness fix. The old v < SPEC_VERSION_CURRENT would have made specVersion 2 runs legacy after the bump to 3, breaking event-sourced replay for all existing v2 runs. v <= SPEC_VERSION_LEGACY is the right semantic — only version 1 is truly legacy (direct entity mutation).

if (v === undefined || v === null) return true;
return v < SPEC_VERSION_CURRENT;
return v === undefined || v === null || v <= SPEC_VERSION_LEGACY;
}

/**
Expand Down
Loading