Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cbor-storage-world-local.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Switch filesystem entity storage from JSON to CBOR with backwards-compatible legacy JSON read and in-place migration on writes.
7 changes: 7 additions & 0 deletions .changeset/cbor-x-catalog-version.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@workflow/world-postgres': patch
'@workflow/world-vercel': patch
'@workflow/web': patch
---

Update `cbor-x` to v1.6.4 via workspace catalog.
6 changes: 3 additions & 3 deletions docs/content/docs/changelog/resilient-start.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ runs stuck at `pending` (no `run_started` event), and Windows CI showed
"Unconsumed event in event log" errors from duplicate `run_created` events.

**Root cause:** A TOCTOU race between the normal `run_created` path and the
resilient start path. Both used `writeJSON` which checks existence with
resilient start path. Both used `writeEntity` (formerly `writeJSON`) which checks existence with
`fs.access()` (non-atomic), so both could pass the check and write separate
`run_created` events with different event IDs. Fixed by switching both paths to
`writeExclusive` (O_CREAT|O_EXCL) — see retrospective items 12 and 16.
Expand Down Expand Up @@ -243,7 +243,7 @@ violating repo rules ("all changes should be patch"). Changed after discussion.
### 12. world-local TOCTOU race causing duplicate `run_created` events (Windows CI)

The resilient start path AND the normal `run_created` path in `world-local/events-storage.ts`
both used `writeJSON` to create the run entity. `writeJSON` checks file existence with
both used `writeEntity` (formerly `writeJSON`) to create the run entity. `writeEntity` checks file existence with
`fs.access()` then writes via temp+rename — a classic TOCTOU race. On the local world,
the queue delivers via an async IIFE in the same event loop, so `events.create(run_created)`
and `events.create(run_started)` (with resilient start) run concurrently:
Expand Down Expand Up @@ -298,7 +298,7 @@ the pre-refactor behavior where eventData was explicitly stripped from the resul
### 16. Normal `run_created` path also needed `writeExclusive` (Windows CI)

The initial TOCTOU fix (item 12) only changed the resilient start path to use
`writeExclusive`. The normal `run_created` entity write still used `writeJSON` which
`writeExclusive`. The normal `run_created` entity write still used `writeEntity` (formerly `writeJSON`) which
checks existence with `fs.access()` then writes via temp+rename — not atomic. On
Windows CI, the local queue's async IIFE delivered fast enough for both paths to pass
their existence checks simultaneously, producing two `run_created` events with different
Expand Down
2 changes: 1 addition & 1 deletion packages/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"@workflow/world": "workspace:*",
"@workflow/world-vercel": "workspace:*",
"@xyflow/react": "12.10.1",
"cbor-x": "^1",
"cbor-x": "catalog:",
"class-variance-authority": "0.7.1",
"clsx": "2.1.1",
"cross-env": "^7.0.3",
Expand Down
3 changes: 1 addition & 2 deletions packages/world-local/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

Filesystem-based workflow backend for local development and testing.

Stores workflow data as JSON files on disk and provides in-memory queuing. Automatically detects development server port for queue transport.
Stores workflow data as CBOR files on disk (with legacy JSON read compatibility) and provides in-memory queuing. Automatically detects development server port for queue transport.

Used by default on `next dev` and `next start`.

1 change: 1 addition & 0 deletions packages/world-local/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"@workflow/utils": "workspace:*",
"@workflow/world": "workspace:*",
"async-sema": "3.1.1",
"cbor-x": "catalog:",
"ulid": "catalog:",
"undici": "catalog:",
"zod": "catalog:"
Expand Down
2 changes: 1 addition & 1 deletion packages/world-local/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export type Config = {
baseUrl?: string;
/**
* Optional tag to scope filesystem operations.
* When set, files are written as `{id}.{tag}.json` and `clear()` only deletes
* When set, files are written as `{id}.{tag}.cbor` and `clear()` only deletes
* files matching this tag. Used by vitest to isolate test data in the shared
* `.workflow-data` directory.
*/
Expand Down
79 changes: 73 additions & 6 deletions packages/world-local/src/fs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import {
vi,
} from 'vitest';
import { z } from 'zod';
import { paginatedFileSystemQuery, ulidToDate, writeJSON } from './fs.js';
import {
paginatedFileSystemQuery,
readEntity,
ulidToDate,
writeEntity,
} from './fs.js';

// Create a new monotonic ULID factory for each test to avoid state pollution
let ulid = monotonicFactory(() => Math.random());
Expand Down Expand Up @@ -87,10 +92,72 @@ describe('fs utilities', () => {
});
});

describe('cbor entity storage', () => {
const BinaryEntitySchema = z.object({
id: z.string(),
payload: z.instanceof(Uint8Array),
createdAt: z.coerce.date(),
});

it('should write and read CBOR entities with Uint8Array payloads', async () => {
const filePath = path.join(testDir, 'entity.cbor');
const entity = {
id: 'entity-1',
payload: new Uint8Array([1, 2, 3]),
createdAt: new Date(),
};

await writeEntity(filePath, entity);

const readByPath = await readEntity(filePath, BinaryEntitySchema);
expect(readByPath).not.toBeNull();
expect(readByPath?.payload).toBeInstanceOf(Uint8Array);
expect(readByPath?.payload).toEqual(new Uint8Array([1, 2, 3]));

const readByBasePath = await readEntity(
path.join(testDir, 'entity'),
BinaryEntitySchema
);
expect(readByBasePath).not.toBeNull();
expect(readByBasePath?.payload).toEqual(new Uint8Array([1, 2, 3]));
});

it('should remove legacy JSON sibling when writing CBOR', async () => {
const cborPath = path.join(testDir, 'migrate.cbor');
const legacyPath = path.join(testDir, 'migrate.json');
const entity = {
id: 'entity-2',
payload: new Uint8Array([4, 5, 6]),
createdAt: new Date(),
};

await writeEntity(legacyPath, entity);
const legacyBefore = await fs
.access(legacyPath)
.then(() => true)
.catch(() => false);
expect(legacyBefore).toBe(true);

await writeEntity(cborPath, entity, { overwrite: true });

const cborExists = await fs
.access(cborPath)
.then(() => true)
.catch(() => false);
const legacyAfter = await fs
.access(legacyPath)
.then(() => true)
.catch(() => false);

expect(cborExists).toBe(true);
expect(legacyAfter).toBe(false);
});
});

describe('paginatedFileSystemQuery', () => {
// Simple getCreatedAt function that strips .json and tries to parse as ULID
// Simple getCreatedAt function that strips entity extension and parses ULIDs
const getCreatedAt = (filename: string): Date | null => {
const name = filename.replace('.json', '');
const name = filename.replace(/\.(json|cbor)$/, '');
return ulidToDate(name);
};

Expand Down Expand Up @@ -529,7 +596,7 @@ describe('fs utilities', () => {

// Custom getCreatedAt for prefix tests that handles prefix_ULID pattern
const getPrefixCreatedAt = (filename: string): Date | null => {
const name = filename.replace('.json', '');
const name = filename.replace(/\.(json|cbor)$/, '');
if (name.includes('_')) {
const parts = name.split('_');
const lastPart = parts[parts.length - 1];
Expand Down Expand Up @@ -760,12 +827,12 @@ describe('fs utilities', () => {
const secondUlid = ulid();

await Promise.all([
writeJSON(filePath, {
writeEntity(filePath, {
id: firstUlid,
name: 'test-item-1',
createdAt: testTime,
}),
writeJSON(filePath, {
writeEntity(filePath, {
id: secondUlid,
name: 'test-item-2',
createdAt: testTime,
Expand Down
Loading
Loading