diff --git a/.changeset/cbor-storage-world-local.md b/.changeset/cbor-storage-world-local.md new file mode 100644 index 0000000000..37e712a453 --- /dev/null +++ b/.changeset/cbor-storage-world-local.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-local': patch +--- + +Switch filesystem entity storage from JSON to CBOR with backwards-compatible legacy JSON read and in-place migration on writes. diff --git a/.changeset/cbor-x-catalog-version.md b/.changeset/cbor-x-catalog-version.md new file mode 100644 index 0000000000..99ec75db77 --- /dev/null +++ b/.changeset/cbor-x-catalog-version.md @@ -0,0 +1,7 @@ +--- +'@workflow/world-postgres': patch +'@workflow/world-vercel': patch +'@workflow/web': patch +--- + +Update `cbor-x` to v1.6.4 via workspace catalog. diff --git a/docs/content/docs/changelog/resilient-start.mdx b/docs/content/docs/changelog/resilient-start.mdx index 653c439e0f..78f7ee4231 100644 --- a/docs/content/docs/changelog/resilient-start.mdx +++ b/docs/content/docs/changelog/resilient-start.mdx @@ -121,7 +121,7 @@ runs stuck at `pending` (no `run_started` event), and Windows CI showed "Unconsumed event in event log" errors from duplicate `run_created` events. **Root cause:** A TOCTOU race between the normal `run_created` path and the -resilient start path. Both used `writeJSON` which checks existence with +resilient start path. Both used `writeEntity` (formerly `writeJSON`) which checks existence with `fs.access()` (non-atomic), so both could pass the check and write separate `run_created` events with different event IDs. Fixed by switching both paths to `writeExclusive` (O_CREAT|O_EXCL) — see retrospective items 12 and 16. @@ -243,7 +243,7 @@ violating repo rules ("all changes should be patch"). Changed after discussion. ### 12. world-local TOCTOU race causing duplicate `run_created` events (Windows CI) The resilient start path AND the normal `run_created` path in `world-local/events-storage.ts` -both used `writeJSON` to create the run entity. `writeJSON` checks file existence with +both used `writeEntity` (formerly `writeJSON`) to create the run entity. `writeEntity` checks file existence with `fs.access()` then writes via temp+rename — a classic TOCTOU race. On the local world, the queue delivers via an async IIFE in the same event loop, so `events.create(run_created)` and `events.create(run_started)` (with resilient start) run concurrently: @@ -298,7 +298,7 @@ the pre-refactor behavior where eventData was explicitly stripped from the resul ### 16. Normal `run_created` path also needed `writeExclusive` (Windows CI) The initial TOCTOU fix (item 12) only changed the resilient start path to use -`writeExclusive`. The normal `run_created` entity write still used `writeJSON` which +`writeExclusive`. The normal `run_created` entity write still used `writeEntity` (formerly `writeJSON`) which checks existence with `fs.access()` then writes via temp+rename — not atomic. On Windows CI, the local queue's async IIFE delivered fast enough for both paths to pass their existence checks simultaneously, producing two `run_created` events with different diff --git a/packages/web/package.json b/packages/web/package.json index 460630bc08..2701dadb43 100644 --- a/packages/web/package.json +++ b/packages/web/package.json @@ -66,7 +66,7 @@ "@workflow/world": "workspace:*", "@workflow/world-vercel": "workspace:*", "@xyflow/react": "12.10.1", - "cbor-x": "^1", + "cbor-x": "catalog:", "class-variance-authority": "0.7.1", "clsx": "2.1.1", "cross-env": "^7.0.3", diff --git a/packages/world-local/README.md b/packages/world-local/README.md index 9e3f0d95cc..bddaa5efa2 100644 --- a/packages/world-local/README.md +++ b/packages/world-local/README.md @@ -2,7 +2,6 @@ Filesystem-based workflow backend for local development and testing. -Stores workflow data as JSON files on disk and provides in-memory queuing. Automatically detects development server port for queue transport. +Stores workflow data as CBOR files on disk (with legacy JSON read compatibility) and provides in-memory queuing. Automatically detects development server port for queue transport. Used by default on `next dev` and `next start`. - diff --git a/packages/world-local/package.json b/packages/world-local/package.json index 0a8bd37fbe..5dd01d8622 100644 --- a/packages/world-local/package.json +++ b/packages/world-local/package.json @@ -35,6 +35,7 @@ "@workflow/utils": "workspace:*", "@workflow/world": "workspace:*", "async-sema": "3.1.1", + "cbor-x": "catalog:", "ulid": "catalog:", "undici": "catalog:", "zod": "catalog:" diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 99431be3e1..6e7a2463d6 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -17,7 +17,7 @@ export type Config = { baseUrl?: string; /** * Optional tag to scope filesystem operations. - * When set, files are written as `{id}.{tag}.json` and `clear()` only deletes + * When set, files are written as `{id}.{tag}.cbor` and `clear()` only deletes * files matching this tag. Used by vitest to isolate test data in the shared * `.workflow-data` directory. */ diff --git a/packages/world-local/src/fs.test.ts b/packages/world-local/src/fs.test.ts index 2293633801..967f092e73 100644 --- a/packages/world-local/src/fs.test.ts +++ b/packages/world-local/src/fs.test.ts @@ -14,7 +14,12 @@ import { vi, } from 'vitest'; import { z } from 'zod'; -import { paginatedFileSystemQuery, ulidToDate, writeJSON } from './fs.js'; +import { + paginatedFileSystemQuery, + readEntity, + ulidToDate, + writeEntity, +} from './fs.js'; // Create a new monotonic ULID factory for each test to avoid state pollution let ulid = monotonicFactory(() => Math.random()); @@ -87,10 +92,72 @@ describe('fs utilities', () => { }); }); + describe('cbor entity storage', () => { + const BinaryEntitySchema = z.object({ + id: z.string(), + payload: z.instanceof(Uint8Array), + createdAt: z.coerce.date(), + }); + + it('should write and read CBOR entities with Uint8Array payloads', async () => { + const filePath = path.join(testDir, 'entity.cbor'); + const entity = { + id: 'entity-1', + payload: new Uint8Array([1, 2, 3]), + createdAt: new Date(), + }; + + await writeEntity(filePath, entity); + + const readByPath = await readEntity(filePath, BinaryEntitySchema); + expect(readByPath).not.toBeNull(); + expect(readByPath?.payload).toBeInstanceOf(Uint8Array); + expect(readByPath?.payload).toEqual(new Uint8Array([1, 2, 3])); + + const readByBasePath = await readEntity( + path.join(testDir, 'entity'), + BinaryEntitySchema + ); + expect(readByBasePath).not.toBeNull(); + expect(readByBasePath?.payload).toEqual(new Uint8Array([1, 2, 3])); + }); + + it('should remove legacy JSON sibling when writing CBOR', async () => { + const cborPath = path.join(testDir, 'migrate.cbor'); + const legacyPath = path.join(testDir, 'migrate.json'); + const entity = { + id: 'entity-2', + payload: new Uint8Array([4, 5, 6]), + createdAt: new Date(), + }; + + await writeEntity(legacyPath, entity); + const legacyBefore = await fs + .access(legacyPath) + .then(() => true) + .catch(() => false); + expect(legacyBefore).toBe(true); + + await writeEntity(cborPath, entity, { overwrite: true }); + + const cborExists = await fs + .access(cborPath) + .then(() => true) + .catch(() => false); + const legacyAfter = await fs + .access(legacyPath) + .then(() => true) + .catch(() => false); + + expect(cborExists).toBe(true); + expect(legacyAfter).toBe(false); + }); + }); + describe('paginatedFileSystemQuery', () => { - // Simple getCreatedAt function that strips .json and tries to parse as ULID + // Simple getCreatedAt function that strips entity extension and parses ULIDs const getCreatedAt = (filename: string): Date | null => { - const name = filename.replace('.json', ''); + const name = filename.replace(/\.(json|cbor)$/, ''); return ulidToDate(name); }; @@ -529,7 +596,7 @@ describe('fs utilities', () => { // Custom getCreatedAt for prefix tests that handles prefix_ULID pattern const getPrefixCreatedAt = (filename: string): Date | null => { - const name = filename.replace('.json', ''); + const name = filename.replace(/\.(json|cbor)$/, ''); if (name.includes('_')) { const parts = name.split('_'); const lastPart = parts[parts.length - 1]; @@ -760,12 +827,12 @@ describe('fs utilities', () => { const secondUlid = ulid(); await Promise.all([ - writeJSON(filePath, { + writeEntity(filePath, { id: firstUlid, name: 'test-item-1', createdAt: testTime, }), - writeJSON(filePath, { + writeEntity(filePath, { id: secondUlid, name: 'test-item-2', createdAt: testTime, diff --git a/packages/world-local/src/fs.ts b/packages/world-local/src/fs.ts index 1b7e4fd505..f5091ed544 100644 --- a/packages/world-local/src/fs.ts +++ b/packages/world-local/src/fs.ts @@ -2,12 +2,15 @@ import { promises as fs } from 'node:fs'; import path from 'node:path'; import { EntityConflictError } from '@workflow/errors'; import type { PaginatedResponse } from '@workflow/world'; +import { decode, encode } from 'cbor-x'; import { monotonicFactory } from 'ulid'; import { z } from 'zod'; const ulid = monotonicFactory(() => Math.random()); const isWindows = process.platform === 'win32'; +const ENTITY_FILE_EXTENSION = '.cbor'; +const LEGACY_ENTITY_FILE_EXTENSION = '.json'; /** * Execute a filesystem operation with retry logic on Windows. @@ -54,7 +57,7 @@ export function clearCreatedFilesCache(): void { export { ulidToDate } from '@workflow/world'; /** - * Regex matching a tag suffix on a fileId (after `.json` has been stripped). + * Regex matching a tag suffix on a fileId (after extension has been stripped). * E.g., `wrun_ABC.vitest-0` → the `.vitest-0` part. * Tags start with a letter and contain alphanumeric chars and hyphens. * Entity IDs (ULIDs, step_N, etc.) never contain dots, so the first dot @@ -73,8 +76,8 @@ export function stripTag(fileId: string): string { /** * Build the file path for an entity, with optional tag embedded in the filename. - * `taggedPath('runs', 'wrun_ABC', 'vitest-0')` → `runs/wrun_ABC.vitest-0.json` - * `taggedPath('runs', 'wrun_ABC')` → `runs/wrun_ABC.json` + * `taggedPath('runs', 'wrun_ABC', 'vitest-0')` → `runs/wrun_ABC.vitest-0.cbor` + * `taggedPath('runs', 'wrun_ABC')` → `runs/wrun_ABC.cbor` */ export function taggedPath( basedir: string, @@ -82,16 +85,18 @@ export function taggedPath( fileId: string, tag?: string ): string { - const filename = tag ? `${fileId}.${tag}.json` : `${fileId}.json`; + const filename = tag + ? `${fileId}.${tag}${ENTITY_FILE_EXTENSION}` + : `${fileId}${ENTITY_FILE_EXTENSION}`; return path.join(basedir, entityDir, filename); } /** - * Read a JSON entity with tagged fallback. + * Read an entity with tagged fallback. * When a tag is set, tries the tagged path first, then falls back to the * untagged path (so a tagged world can read entities written without a tag). */ -export async function readJSONWithFallback( +export async function readEntityWithFallback( basedir: string, entityDir: string, fileId: string, @@ -99,27 +104,28 @@ export async function readJSONWithFallback( tag?: string ): Promise { if (tag) { - const result = await readJSON( - path.join(basedir, entityDir, `${fileId}.${tag}.json`), - schema - ); + const taggedBasePath = path.join(basedir, entityDir, `${fileId}.${tag}`); + const result = await readEntity(taggedBasePath, schema); if (result !== null) return result; } - return readJSON(path.join(basedir, entityDir, `${fileId}.json`), schema); + return readEntity(path.join(basedir, entityDir, fileId), schema); } /** * List all filenames in a directory that have a specific tag. - * Returns full filenames (e.g., `wrun_ABC.vitest-0.json`). + * Returns full filenames (e.g., `wrun_ABC.vitest-0.cbor`, `wrun_ABC.vitest-0.json`). */ export async function listTaggedFiles( dirPath: string, tag: string ): Promise { - const suffix = `.${tag}.json`; + const cborSuffix = `.${tag}${ENTITY_FILE_EXTENSION}`; + const jsonSuffix = `.${tag}${LEGACY_ENTITY_FILE_EXTENSION}`; try { const files = await fs.readdir(dirPath); - return files.filter((f) => f.endsWith(suffix)); + return files.filter( + (f) => f.endsWith(cborSuffix) || f.endsWith(jsonSuffix) + ); } catch (error) { if ((error as any).code === 'ENOENT') return []; throw error; @@ -158,7 +164,8 @@ interface WriteOptions { } /** - * Custom JSON replacer that encodes Uint8Array as base64 strings. + * Custom JSON replacer used for legacy .json files. + * Encodes Uint8Array as base64 strings. * Format: { __type: 'Uint8Array', data: '' } */ export function jsonReplacer(_key: string, value: unknown): unknown { @@ -172,7 +179,8 @@ export function jsonReplacer(_key: string, value: unknown): unknown { } /** - * Custom JSON reviver that decodes base64 strings back to Uint8Array. + * Custom JSON reviver used for legacy .json files. + * Decodes base64 strings back to Uint8Array. */ export function jsonReviver(_key: string, value: unknown): unknown { if ( @@ -186,12 +194,83 @@ export function jsonReviver(_key: string, value: unknown): unknown { return value; } -export async function writeJSON( +function toLegacyEntityPath(filePath: string): string | null { + if (!filePath.endsWith(ENTITY_FILE_EXTENSION)) { + return null; + } + return ( + filePath.slice(0, -ENTITY_FILE_EXTENSION.length) + + LEGACY_ENTITY_FILE_EXTENSION + ); +} + +function isPlainObject(value: unknown): value is Record { + if (value === null || typeof value !== 'object') { + return false; + } + const prototype = Object.getPrototypeOf(value); + return prototype === Object.prototype || prototype === null; +} + +/** + * cbor-x may deserialize binary values as Node.js Buffers in some environments. + * Convert those values to Uint8Array so world schemas and tests stay consistent. + */ +function normalizeDecodedBinary(value: unknown): unknown { + if (Buffer.isBuffer(value)) { + return new Uint8Array(value.buffer, value.byteOffset, value.byteLength); + } + if (Array.isArray(value)) { + return value.map((item) => normalizeDecodedBinary(item)); + } + if (isPlainObject(value)) { + const normalized: Record = {}; + for (const [key, entry] of Object.entries(value)) { + normalized[key] = normalizeDecodedBinary(entry); + } + return normalized; + } + return value; +} + +export async function writeEntity( filePath: string, data: any, opts?: WriteOptions ): Promise { - return write(filePath, JSON.stringify(data, jsonReplacer, 2), opts); + if (filePath.endsWith(LEGACY_ENTITY_FILE_EXTENSION)) { + return write(filePath, JSON.stringify(data, jsonReplacer, 2), opts); + } + + if (!filePath.endsWith(ENTITY_FILE_EXTENSION)) { + throw new Error(`Unsupported storage file extension for path: ${filePath}`); + } + + if (!opts?.overwrite) { + const legacyPath = toLegacyEntityPath(filePath); + if (legacyPath) { + try { + await fs.access(legacyPath); + throw new EntityConflictError( + `File ${legacyPath} already exists and 'overwrite' is false` + ); + } catch (error: any) { + if (error instanceof EntityConflictError) { + throw error; + } + if (error.code !== 'ENOENT') { + throw error; + } + } + } + } + + await write(filePath, Buffer.from(encode(data)), opts); + + const legacyPath = toLegacyEntityPath(filePath); + if (legacyPath) { + await deleteFile(legacyPath); + } } /** @@ -251,11 +330,29 @@ export async function write( } } -export async function readJSON( +export async function readEntity( filePath: string, decoder: z.ZodType ): Promise { + if ( + !filePath.endsWith(ENTITY_FILE_EXTENSION) && + !filePath.endsWith(LEGACY_ENTITY_FILE_EXTENSION) + ) { + const cborResult = await readEntity( + `${filePath}${ENTITY_FILE_EXTENSION}`, + decoder + ); + if (cborResult !== null) { + return cborResult; + } + return readEntity(`${filePath}${LEGACY_ENTITY_FILE_EXTENSION}`, decoder); + } + try { + if (filePath.endsWith(ENTITY_FILE_EXTENSION)) { + const content = await fs.readFile(filePath); + return decoder.parse(normalizeDecodedBinary(decode(content))); + } const content = await fs.readFile(filePath, 'utf-8'); return decoder.parse(JSON.parse(content, jsonReviver)); } catch (error) { @@ -269,7 +366,7 @@ export async function readBuffer(filePath: string): Promise { return content; } -export async function deleteJSON(filePath: string): Promise { +export async function deleteFile(filePath: string): Promise { try { await fs.unlink(filePath); } catch (error) { @@ -277,6 +374,17 @@ export async function deleteJSON(filePath: string): Promise { } } +/** + * Delete an entity by base path (without extension), removing both + * `.cbor` and legacy `.json` variants. + */ +export async function deleteEntity(basePath: string): Promise { + await Promise.all([ + deleteFile(`${basePath}${ENTITY_FILE_EXTENSION}`), + deleteFile(`${basePath}${LEGACY_ENTITY_FILE_EXTENSION}`), + ]); +} + /** * Atomically create a file using O_CREAT | O_EXCL flags. * Returns true if the file was created, false if it already exists. @@ -284,7 +392,7 @@ export async function deleteJSON(filePath: string): Promise { */ export async function writeExclusive( filePath: string, - data: string + data: string | Buffer ): Promise { await ensureDir(path.dirname(filePath)); try { @@ -298,8 +406,66 @@ export async function writeExclusive( } } -export async function listJSONFiles(dirPath: string): Promise { - return listFilesByExtension(dirPath, '.json'); +/** + * Like writeExclusive, but encodes entity data using the correct format + * (CBOR for `.cbor` paths, JSON for `.json` paths) before writing. + * Also treats an existing legacy `.json` sibling as a conflict (returns false), + * and removes the legacy sibling on success to complete migration. + */ +export async function writeEntityExclusive( + filePath: string, + data: any +): Promise { + const legacyPath = toLegacyEntityPath(filePath); + if (legacyPath) { + try { + await fs.access(legacyPath); + // Legacy sibling exists — treat as conflict + return false; + } catch (error: any) { + if (error.code !== 'ENOENT') throw error; + } + } + + const encoded = filePath.endsWith(ENTITY_FILE_EXTENSION) + ? Buffer.from(encode(data)) + : JSON.stringify(data, jsonReplacer, 2); + const created = await writeExclusive(filePath, encoded); + + if (created && legacyPath) { + await deleteFile(legacyPath); + } + + return created; +} + +export async function listEntityFiles(dirPath: string): Promise { + try { + const files = await fs.readdir(dirPath); + const fileIds = new Map(); + + for (const file of files) { + if (file.endsWith(ENTITY_FILE_EXTENSION)) { + fileIds.set( + file.slice(0, -ENTITY_FILE_EXTENSION.length), + ENTITY_FILE_EXTENSION + ); + continue; + } + + if (file.endsWith(LEGACY_ENTITY_FILE_EXTENSION)) { + const fileId = file.slice(0, -LEGACY_ENTITY_FILE_EXTENSION.length); + if (!fileIds.has(fileId)) { + fileIds.set(fileId, LEGACY_ENTITY_FILE_EXTENSION); + } + } + } + + return [...fileIds.keys()]; + } catch (error) { + if ((error as any).code === 'ENOENT') return []; + throw error; + } } export async function listFilesByExtension( @@ -363,21 +529,21 @@ export async function paginatedFileSystemQuery( getId, } = config; - // 1. Get all JSON files in directory - const fileIds = await listJSONFiles(directory); + // 1. Get all entity files in directory + const fileIds = await listEntityFiles(directory); // 2. Filter by prefix if provided const relevantFileIds = filePrefix ? fileIds.filter((fileId) => fileId.startsWith(filePrefix)) : fileIds; - // 3. ULID Optimization: Filter by cursor using filename timestamps before loading JSON + // 3. ULID optimization: Filter by cursor using filename timestamps before loading payloads const parsedCursor = parseCursor(cursor); let candidateFileIds = relevantFileIds; if (parsedCursor) { candidateFileIds = relevantFileIds.filter((fileId) => { - const filenameDate = getCreatedAt(`${fileId}.json`); + const filenameDate = getCreatedAt(`${fileId}${ENTITY_FILE_EXTENSION}`); if (filenameDate) { // Use filename timestamp for cursor filtering // We need to be careful here: if parsedCursor has an ID (for tie-breaking), @@ -399,7 +565,7 @@ export async function paginatedFileSystemQuery( } } // Can't extract timestamp from filename (e.g., steps use sequential IDs). - // Include the file and defer to JSON-based filtering below. + // Include the file and defer to payload-based filtering below. return true; }); } @@ -408,18 +574,18 @@ export async function paginatedFileSystemQuery( const validItems: T[] = []; for (const fileId of candidateFileIds) { - const filePath = path.join(directory, `${fileId}.json`); + const filePath = path.join(directory, fileId); let item: T | null = null; try { - item = await readJSON(filePath, schema); + item = await readEntity(filePath, schema); } catch (error: unknown) { - // We don't expect zod errors to happen, but if the JSON does get malformed, + // We don't expect zod errors to happen, but if a payload does get malformed, // we skip the item. Preferably, we'd have a way to mark items as malformed, // so that the UI can display them as such, with richer messaging. In the meantime, // we just log a warning and skip the item. if (error instanceof z.ZodError) { console.warn( - `Skipping item ${fileId} due to malformed JSON: ${error.message}` + `Skipping item ${fileId} due to malformed storage payload: ${error.message}` ); continue; } @@ -430,7 +596,7 @@ export async function paginatedFileSystemQuery( // Apply custom filter early if provided if (filter && !filter(item)) continue; - // Double-check cursor filtering with actual createdAt from JSON + // Double-check cursor filtering with actual createdAt from payload // (in case ULID timestamp differs from stored createdAt) if (parsedCursor) { const itemTime = item.createdAt.getTime(); diff --git a/packages/world-local/src/index.ts b/packages/world-local/src/index.ts index ccc28f8188..33b595f378 100644 --- a/packages/world-local/src/index.ts +++ b/packages/world-local/src/index.ts @@ -7,10 +7,10 @@ import type { Config } from './config.js'; import { config } from './config.js'; import { clearCreatedFilesCache, - deleteJSON, + deleteFile, listTaggedFiles, listTaggedFilesByExtension, - readJSON, + readEntity, } from './fs.js'; import { initDataDir } from './init.js'; import { instrumentObject } from './instrumentObject.js'; @@ -29,7 +29,7 @@ export { parseVersion, } from './init.js'; -export { type DirectHandler } from './queue.js'; +export type { DirectHandler } from './queue.js'; export type LocalWorld = World & { /** Register a direct in-process handler for a queue prefix, bypassing HTTP. */ @@ -49,7 +49,7 @@ export type LocalWorld = World & { * @param args.port - Port override for queue transport (default: auto-detected) * @param args.baseUrl - Full base URL override for queue transport (default: `http://localhost:{port}`) * @param args.tag - Optional tag to scope files (e.g., `vitest-0`). When set, files are written - * as `{id}.{tag}.json` and `clear()` only deletes files matching this tag. + * as `{id}.{tag}.cbor` and `clear()` only deletes files matching this tag. * @throws {DataDirAccessError} If the data directory cannot be created or accessed * @throws {DataDirVersionError} If the data directory version is incompatible */ @@ -93,12 +93,12 @@ export function createLocalWorld(args?: Partial): LocalWorld { const { HookSchema } = await import('@workflow/world'); await Promise.all( taggedHookFiles.map(async (hookFile) => { - const hook = await readJSON( + const hook = await readEntity( path.join(hooksDir, hookFile), HookSchema ); if (hook?.token) { - await deleteJSON( + await deleteFile( path.join(hooksDir, 'tokens', `${hashToken(hook.token)}.json`) ); } @@ -119,7 +119,7 @@ export function createLocalWorld(args?: Partial): LocalWorld { const fullDir = path.join(basedir, dir); const files = await listTaggedFiles(fullDir, tag); await Promise.all( - files.map((f) => deleteJSON(path.join(fullDir, f))) + files.map((f) => deleteFile(path.join(fullDir, f))) ); }) ); diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 71632f98f6..67bc318735 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -6,7 +6,7 @@ import type { Event, Storage } from '@workflow/world'; import { stripEventDataRefs } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { writeJSON } from './fs.js'; +import { writeEntity } from './fs.js'; import { createStorage } from './storage.js'; import { completeWait, @@ -178,7 +178,7 @@ describe('Storage', () => { expect(run.updatedAt).toBeInstanceOf(Date); // Verify file was created - const filePath = path.join(testDir, 'runs', `${run.runId}.json`); + const filePath = path.join(testDir, 'runs', `${run.runId}.cbor`); const fileExists = await fs .access(filePath) .then(() => true) @@ -390,7 +390,7 @@ describe('Storage', () => { const filePath = path.join( testDir, 'steps', - `${testRunId}-step_123.json` + `${testRunId}-step_123.cbor` ); const fileExists = await fs .access(filePath) @@ -791,7 +791,7 @@ describe('Storage', () => { const filePath = path.join( testDir, 'events', - `${testRunId}-${event.eventId}.json` + `${testRunId}-${event.eventId}.cbor` ); const fileExists = await fs .access(filePath) @@ -1311,7 +1311,7 @@ describe('Storage', () => { expect(hook.createdAt).toBeInstanceOf(Date); // Verify file was created - const filePath = path.join(testDir, 'hooks', 'hook_123.json'); + const filePath = path.join(testDir, 'hooks', 'hook_123.cbor'); const fileExists = await fs .access(filePath) .then(() => true) @@ -2650,10 +2650,63 @@ describe('Storage', () => { createdAt: new Date(), updatedAt: new Date(), }; - await writeJSON(path.join(runsDir, `${runId}.json`), run); + await writeEntity(path.join(runsDir, `${runId}.json`), run); return run; } + describe('json entity migration', () => { + it('should read run entities from legacy .json files', async () => { + const run = await createRun(storage, { + deploymentId: 'legacy-json-read', + workflowName: 'legacy-json-read', + input: new Uint8Array([1, 2, 3]), + }); + + const cborPath = path.join(testDir, 'runs', `${run.runId}.cbor`); + const legacyPath = path.join(testDir, 'runs', `${run.runId}.json`); + + await fs.unlink(cborPath); + await writeEntity(legacyPath, run, { overwrite: true }); + + const fetched = await storage.runs.get(run.runId); + expect(fetched.workflowName).toBe('legacy-json-read'); + expect(fetched.input).toEqual(new Uint8Array([1, 2, 3])); + }); + + it('should migrate legacy .json runs to .cbor on update writes', async () => { + const run = await createRun(storage, { + deploymentId: 'legacy-json-migrate', + workflowName: 'legacy-json-migrate', + input: new Uint8Array([4, 5, 6]), + }); + + const cborPath = path.join(testDir, 'runs', `${run.runId}.cbor`); + const legacyPath = path.join(testDir, 'runs', `${run.runId}.json`); + + await fs.unlink(cborPath); + await writeEntity(legacyPath, run, { overwrite: true }); + + await storage.events.create(run.runId, { + eventType: 'run_started', + }); + + const cborExists = await fs + .access(cborPath) + .then(() => true) + .catch(() => false); + const legacyExists = await fs + .access(legacyPath) + .then(() => true) + .catch(() => false); + + expect(cborExists).toBe(true); + expect(legacyExists).toBe(false); + + const updated = await storage.runs.get(run.runId); + expect(updated.status).toBe('running'); + }); + }); + describe('legacy runs (specVersion < 2 or undefined)', () => { it('should handle run_cancelled on legacy run with specVersion=1', async () => { const runId = 'wrun_legacy_v1'; diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 63df4a81c0..530763aa39 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -31,14 +31,15 @@ import { } from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; import { - deleteJSON, - jsonReplacer, - listJSONFiles, + deleteEntity, + deleteFile, + listEntityFiles, paginatedFileSystemQuery, - readJSONWithFallback, + readEntityWithFallback, taggedPath, + writeEntity, + writeEntityExclusive, writeExclusive, - writeJSON, } from '../fs.js'; import { stripEventDataRefs } from './filters.js'; import { getObjectCreatedAt, hashToken, monotonicUlid } from './helpers.js'; @@ -54,14 +55,13 @@ async function deleteAllWaitsForRun( runId: string ): Promise { const waitsDir = path.join(basedir, 'waits'); - const files = await listJSONFiles(waitsDir); + const files = await listEntityFiles(waitsDir); for (const file of files) { // fileIds may contain tag suffixes (e.g., "wrun_ABC-corrId.vitest-0") // but startsWith still matches correctly since the tag is a suffix. if (file.startsWith(`${runId}-`)) { - const waitPath = path.join(waitsDir, `${file}.json`); - await deleteJSON(waitPath); + await deleteEntity(path.join(waitsDir, file)); } } } @@ -118,7 +118,7 @@ export function createEventsStorage( data.eventType !== 'run_created' && !skipRunValidationEvents.includes(data.eventType) ) { - currentRun = await readJSONWithFallback( + currentRun = await readEntityWithFallback( basedir, 'runs', effectiveRunId, @@ -167,10 +167,7 @@ export function createEventsStorage( updatedAt: now, }; const runPath = taggedPath(basedir, 'runs', effectiveRunId, tag); - const created = await writeExclusive( - runPath, - JSON.stringify(createdRun, jsonReplacer) - ); + const created = await writeEntityExclusive(runPath, createdRun); if (created) { // We created the run — also write the run_created event. @@ -189,7 +186,7 @@ export function createEventsStorage( }, }; const createdCompositeKey = `${effectiveRunId}-${runCreatedEventId}`; - await writeJSON( + await writeEntity( taggedPath(basedir, 'events', createdCompositeKey, tag), runCreatedEvent ); @@ -197,7 +194,7 @@ export function createEventsStorage( } else { // Run already exists (concurrent run_created won the // race). Re-read it so downstream logic sees the real state. - currentRun = await readJSONWithFallback( + currentRun = await readEntityWithFallback( basedir, 'runs', effectiveRunId, @@ -261,7 +258,7 @@ export function createEventsStorage( specVersion: effectiveSpecVersion, }; const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON( + await writeEntity( taggedPath(basedir, 'events', compositeKey, tag), event ); @@ -314,7 +311,7 @@ export function createEventsStorage( ]; if (stepEvents.includes(data.eventType) && data.correlationId) { const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`; - validatedStep = await readJSONWithFallback( + validatedStep = await readEntityWithFallback( basedir, 'steps', stepCompositeKey, @@ -352,7 +349,7 @@ export function createEventsStorage( hookEventsRequiringExistence.includes(data.eventType) && data.correlationId ) { - const existingHook = await readJSONWithFallback( + const existingHook = await readEntityWithFallback( basedir, 'hooks', data.correlationId, @@ -412,10 +409,7 @@ export function createEventsStorage( // start path (run_started on non-existent run) that could result // in duplicate run_created events in the event log. const runPath = taggedPath(basedir, 'runs', effectiveRunId, tag); - const created = await writeExclusive( - runPath, - JSON.stringify(run, jsonReplacer, 2) - ); + const created = await writeEntityExclusive(runPath, run); if (!created) { throw new EntityConflictError( `Workflow run "${effectiveRunId}" already exists` @@ -449,7 +443,7 @@ export function createEventsStorage( startedAt: currentRun.startedAt ?? now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'runs', effectiveRunId, tag), run, { overwrite: true } @@ -475,7 +469,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'runs', effectiveRunId, tag), run, { overwrite: true } @@ -515,7 +509,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'runs', effectiveRunId, tag), run, { overwrite: true } @@ -544,7 +538,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'runs', effectiveRunId, tag), run, { overwrite: true } @@ -581,7 +575,7 @@ export function createEventsStorage( specVersion: effectiveSpecVersion, }; const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`; - await writeJSON( + await writeEntity( taggedPath(basedir, 'steps', stepCompositeKey, tag), step ); @@ -611,7 +605,7 @@ export function createEventsStorage( // (the local world is single-process / dev-only; the postgres // world uses SQL-level atomic guards for production). const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`; - const freshStep = await readJSONWithFallback( + const freshStep = await readEntityWithFallback( basedir, 'steps', stepCompositeKey, @@ -635,7 +629,7 @@ export function createEventsStorage( retryAfter: undefined, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'steps', stepCompositeKey, tag), step, { overwrite: true } @@ -670,7 +664,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'steps', stepCompositeKey, tag), step, { overwrite: true } @@ -715,7 +709,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'steps', stepCompositeKey, tag), step, { overwrite: true } @@ -744,7 +738,7 @@ export function createEventsStorage( retryAfter: retryData.retryAfter, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'steps', stepCompositeKey, tag), step, { overwrite: true } @@ -795,7 +789,7 @@ export function createEventsStorage( // Store the conflict event const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON( + await writeEntity( taggedPath(basedir, 'events', compositeKey, tag), conflictEvent ); @@ -826,7 +820,7 @@ export function createEventsStorage( specVersion: effectiveSpecVersion, isWebhook: hookData.isWebhook ?? false, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'hooks', data.correlationId, tag), hook ); @@ -845,8 +839,7 @@ export function createEventsStorage( ); } // Read the hook to get its token before deleting - const hookPath = taggedPath(basedir, 'hooks', data.correlationId, tag); - const existingHook = await readJSONWithFallback( + const existingHook = await readEntityWithFallback( basedir, 'hooks', data.correlationId, @@ -861,16 +854,26 @@ export function createEventsStorage( 'tokens', `${hashToken(existingHook.token)}.json` ); - await deleteJSON(disposedConstraintPath); + await deleteFile(disposedConstraintPath); + } + // Delete both tagged and untagged entity variants in both formats + const hookBasePath = path.join( + basedir, + 'hooks', + tag ? `${data.correlationId}.${tag}` : data.correlationId + ); + await deleteEntity(hookBasePath); + if (tag) { + // Also delete untagged variant in case it was written without a tag + await deleteEntity(path.join(basedir, 'hooks', data.correlationId)); } - await deleteJSON(hookPath); } else if (data.eventType === 'wait_created' && 'eventData' in data) { // wait_created: Creates wait entity with status 'waiting' const waitData = data.eventData as { resumeAt?: Date; }; const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`; - const existingWait = await readJSONWithFallback( + const existingWait = await readEntityWithFallback( basedir, 'waits', waitCompositeKey, @@ -892,7 +895,7 @@ export function createEventsStorage( updatedAt: now, specVersion: effectiveSpecVersion, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'waits', waitCompositeKey, tag), wait ); @@ -911,7 +914,7 @@ export function createEventsStorage( `Wait "${data.correlationId}" already completed` ); } - const existingWait = await readJSONWithFallback( + const existingWait = await readEntityWithFallback( basedir, 'waits', waitCompositeKey, @@ -933,7 +936,7 @@ export function createEventsStorage( completedAt: now, updatedAt: now, }; - await writeJSON( + await writeEntity( taggedPath(basedir, 'waits', waitCompositeKey, tag), wait, { overwrite: true } @@ -944,7 +947,10 @@ export function createEventsStorage( // Store event using composite key {runId}-{eventId} const compositeKey = `${effectiveRunId}-${eventId}`; - await writeJSON(taggedPath(basedir, 'events', compositeKey, tag), event); + await writeEntity( + taggedPath(basedir, 'events', compositeKey, tag), + event + ); const resolveData = params?.resolveData ?? DEFAULT_RESOLVE_DATA_OPTION; const filteredEvent = stripEventDataRefs(event, resolveData); @@ -977,7 +983,7 @@ export function createEventsStorage( async get(runId, eventId, params) { const compositeKey = `${runId}-${eventId}`; - const event = await readJSONWithFallback( + const event = await readEntityWithFallback( basedir, 'events', compositeKey, diff --git a/packages/world-local/src/storage/helpers.ts b/packages/world-local/src/storage/helpers.ts index ffdda0447a..733f52d6c1 100644 --- a/packages/world-local/src/storage/helpers.ts +++ b/packages/world-local/src/storage/helpers.ts @@ -26,27 +26,28 @@ export const getObjectCreatedAt = (idPrefix: string) => (filename: string): Date | null => { // Strip tag suffix before ULID extraction - // e.g., "wrun_ABC.vitest-0.json" → "wrun_ABC.json" - const cleanName = stripTag(filename.replace(/\.json$/, '')) + '.json'; + // e.g., "wrun_ABC.vitest-0.cbor" → "wrun_ABC.cbor" + const cleanName = + stripTag(filename.replace(/\.(json|cbor)$/, '')) + '.cbor'; const replaceRegex = new RegExp(`^${idPrefix}_`, 'g'); const dashIndex = cleanName.indexOf('-'); if (dashIndex === -1) { - // No dash - extract ULID from the filename (e.g., wrun_ULID.json, evnt_ULID.json) - const ulid = cleanName.replace(/\.json$/, '').replace(replaceRegex, ''); + // No dash - extract ULID from the filename (e.g., wrun_ULID.cbor, evnt_ULID.cbor) + const ulid = cleanName.replace(/\.cbor$/, '').replace(replaceRegex, ''); return ulidToDate(ulid); } // For composite keys like {runId}-{stepId}, extract from the appropriate part if (idPrefix === 'step') { // Steps use sequential IDs (step_0, step_1, etc.) - no timestamp in filename. - // Return null to skip filename-based optimization and defer to JSON-based filtering. + // Return null to skip filename-based optimization and defer to payload-based filtering. return null; } - // For events: wrun_ULID-evnt_ULID.json - extract from the eventId part - const id = cleanName.substring(dashIndex + 1).replace(/\.json$/, ''); + // For events: wrun_ULID-evnt_ULID.cbor - extract from the eventId part + const id = cleanName.substring(dashIndex + 1).replace(/\.cbor$/, ''); const ulid = id.replace(replaceRegex, ''); return ulidToDate(ulid); }; diff --git a/packages/world-local/src/storage/hooks-storage.ts b/packages/world-local/src/storage/hooks-storage.ts index 8992b19db0..938b940062 100644 --- a/packages/world-local/src/storage/hooks-storage.ts +++ b/packages/world-local/src/storage/hooks-storage.ts @@ -10,11 +10,12 @@ import type { import { HookSchema } from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; import { - deleteJSON, - listJSONFiles, + deleteEntity, + deleteFile, + listEntityFiles, paginatedFileSystemQuery, - readJSON, - readJSONWithFallback, + readEntity, + readEntityWithFallback, } from '../fs.js'; import { filterHookData } from './filters.js'; import { hashToken } from './helpers.js'; @@ -30,11 +31,11 @@ export function createHooksStorage( // Helper function to find a hook by token (shared between getByToken) async function findHookByToken(token: string): Promise { const hooksDir = path.join(basedir, 'hooks'); - const files = await listJSONFiles(hooksDir); + const files = await listEntityFiles(hooksDir); for (const file of files) { - const hookPath = path.join(hooksDir, `${file}.json`); - const hook = await readJSON(hookPath, HookSchema); + const hookBasePath = path.join(hooksDir, file); + const hook = await readEntity(hookBasePath, HookSchema); if (hook && hook.token === token) { return { ...hook, isWebhook: hook.isWebhook ?? true }; } @@ -44,7 +45,7 @@ export function createHooksStorage( } async function get(hookId: string, params?: GetHookParams): Promise { - const hook = await readJSONWithFallback( + const hook = await readEntityWithFallback( basedir, 'hooks', hookId, @@ -91,7 +92,7 @@ export function createHooksStorage( }, getCreatedAt: () => { // Hook files don't have ULID timestamps in filename, so return null - // to skip the filename-based optimization and defer to JSON-based + // to skip the filename-based optimization and defer to payload-based // cursor filtering which uses the actual createdAt from the file. return null; }, @@ -117,11 +118,11 @@ export async function deleteAllHooksForRun( runId: string ): Promise { const hooksDir = path.join(basedir, 'hooks'); - const files = await listJSONFiles(hooksDir); + const files = await listEntityFiles(hooksDir); for (const file of files) { - const hookPath = path.join(hooksDir, `${file}.json`); - const hook = await readJSON(hookPath, HookSchema); + const hookBasePath = path.join(hooksDir, file); + const hook = await readEntity(hookBasePath, HookSchema); if (hook && hook.runId === runId) { // Delete the token constraint file to free up the token const constraintPath = path.join( @@ -129,8 +130,8 @@ export async function deleteAllHooksForRun( 'tokens', `${hashToken(hook.token)}.json` ); - await deleteJSON(constraintPath); - await deleteJSON(hookPath); + await deleteFile(constraintPath); + await deleteEntity(hookBasePath); } } } diff --git a/packages/world-local/src/storage/legacy.ts b/packages/world-local/src/storage/legacy.ts index 0ca6004040..5385030e12 100644 --- a/packages/world-local/src/storage/legacy.ts +++ b/packages/world-local/src/storage/legacy.ts @@ -2,7 +2,7 @@ import path from 'node:path'; import type { Event, EventResult, WorkflowRun } from '@workflow/world'; import { SPEC_VERSION_CURRENT } from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; -import { writeJSON } from '../fs.js'; +import { writeEntity } from '../fs.js'; import { filterRunData, stripEventDataRefs } from './filters.js'; import { monotonicUlid } from './helpers.js'; import { deleteAllHooksForRun } from './hooks-storage.js'; @@ -44,8 +44,8 @@ export async function handleLegacyEvent( completedAt: now, updatedAt: now, }; - const runPath = path.join(basedir, 'runs', `${runId}.json`); - await writeJSON(runPath, run, { overwrite: true }); + const runPath = path.join(basedir, 'runs', `${runId}.cbor`); + await writeEntity(runPath, run, { overwrite: true }); await deleteAllHooksForRun(basedir, runId); // Return without event (legacy behavior skips event storage) // Type assertion: EventResult expects WorkflowRun, filterRunData may return WorkflowRunWithoutData @@ -70,8 +70,8 @@ export async function handleLegacyEvent( specVersion: SPEC_VERSION_CURRENT, }; const compositeKey = `${runId}-${eventId}`; - const eventPath = path.join(basedir, 'events', `${compositeKey}.json`); - await writeJSON(eventPath, event); + const eventPath = path.join(basedir, 'events', `${compositeKey}.cbor`); + await writeEntity(eventPath, event); return { event: stripEventDataRefs(event, resolveData) }; } diff --git a/packages/world-local/src/storage/runs-storage.ts b/packages/world-local/src/storage/runs-storage.ts index 967d6f4f11..72498cc115 100644 --- a/packages/world-local/src/storage/runs-storage.ts +++ b/packages/world-local/src/storage/runs-storage.ts @@ -3,7 +3,7 @@ import { WorkflowRunNotFoundError } from '@workflow/errors'; import type { Storage, WorkflowRunWithoutData } from '@workflow/world'; import { WorkflowRunSchema } from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; -import { paginatedFileSystemQuery, readJSONWithFallback } from '../fs.js'; +import { paginatedFileSystemQuery, readEntityWithFallback } from '../fs.js'; import { filterRunData } from './filters.js'; import { getObjectCreatedAt } from './helpers.js'; @@ -17,7 +17,7 @@ export function createRunsStorage( ): Storage['runs'] { return { get: (async (id: string, params?: any) => { - const run = await readJSONWithFallback( + const run = await readEntityWithFallback( basedir, 'runs', id, diff --git a/packages/world-local/src/storage/steps-storage.ts b/packages/world-local/src/storage/steps-storage.ts index 50dfb41c81..e2cd5bd7cb 100644 --- a/packages/world-local/src/storage/steps-storage.ts +++ b/packages/world-local/src/storage/steps-storage.ts @@ -3,9 +3,9 @@ import type { StepWithoutData, Storage } from '@workflow/world'; import { StepSchema } from '@workflow/world'; import { DEFAULT_RESOLVE_DATA_OPTION } from '../config.js'; import { - listJSONFiles, + listEntityFiles, paginatedFileSystemQuery, - readJSONWithFallback, + readEntityWithFallback, stripTag, } from '../fs.js'; import { filterStepData } from './filters.js'; @@ -22,7 +22,7 @@ export function createStepsStorage( return { get: (async (runId: string | undefined, stepId: string, params?: any) => { if (!runId) { - const fileIds = await listJSONFiles(path.join(basedir, 'steps')); + const fileIds = await listEntityFiles(path.join(basedir, 'steps')); const fileId = fileIds.find((fid) => stripTag(fid).endsWith(`-${stepId}`) ); @@ -32,7 +32,7 @@ export function createStepsStorage( runId = stripTag(fileId).split('-')[0]; } const compositeKey = `${runId}-${stepId}`; - const step = await readJSONWithFallback( + const step = await readEntityWithFallback( basedir, 'steps', compositeKey, diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index 9a55fabb04..ff5acd0186 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -11,10 +11,10 @@ import { z } from 'zod'; import { listFilesByExtension, readBuffer, - readJSONWithFallback, + readEntityWithFallback, taggedPath, write, - writeJSON, + writeEntity, } from './fs.js'; // Create a monotonic ULID factory that ensures ULIDs are always increasing @@ -125,7 +125,7 @@ export function createStreamer(basedir: string, tag?: string): Streamer { const runStreamsPath = taggedPath(basedir, 'streams/runs', runId, tag); // Read existing streams for this run (try tagged first, fall back to untagged) - const existing = await readJSONWithFallback( + const existing = await readEntityWithFallback( basedir, 'streams/runs', runId, @@ -137,7 +137,7 @@ export function createStreamer(basedir: string, tag?: string): Streamer { // Add stream if not already present if (!streams.includes(streamName)) { streams.push(streamName); - await writeJSON(runStreamsPath, { streams }, { overwrite: true }); + await writeEntity(runStreamsPath, { streams }, { overwrite: true }); } registeredStreams.add(cacheKey); @@ -281,7 +281,7 @@ export function createStreamer(basedir: string, tag?: string): Streamer { }, async listStreamsByRunId(runId: string) { - const data = await readJSONWithFallback( + const data = await readEntityWithFallback( basedir, 'streams/runs', runId, diff --git a/packages/world-local/src/tag.test.ts b/packages/world-local/src/tag.test.ts index 6376ae9032..0c1780255f 100644 --- a/packages/world-local/src/tag.test.ts +++ b/packages/world-local/src/tag.test.ts @@ -2,7 +2,7 @@ import { promises as fs } from 'node:fs'; import os from 'node:os'; import path from 'node:path'; import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { listJSONFiles, stripTag } from './fs.js'; +import { listEntityFiles, stripTag } from './fs.js'; import { createStorage } from './storage.js'; import { createHook, @@ -56,7 +56,7 @@ describe('File tagging', () => { const runsDir = path.join(testDir, 'runs'); const files = await fs.readdir(runsDir); expect(files).toHaveLength(1); - expect(files[0]).toMatch(/\.vitest-0\.json$/); + expect(files[0]).toMatch(/\.vitest-0\.cbor$/); expect(files[0]).toContain(run.runId); }); @@ -71,7 +71,7 @@ describe('File tagging', () => { const eventsDir = path.join(testDir, 'events'); const files = await fs.readdir(eventsDir); expect(files).toHaveLength(1); - expect(files[0]).toMatch(/\.vitest-0\.json$/); + expect(files[0]).toMatch(/\.vitest-0\.cbor$/); }); it('should write step files with tag suffix', async () => { @@ -91,7 +91,7 @@ describe('File tagging', () => { const stepsDir = path.join(testDir, 'steps'); const files = await fs.readdir(stepsDir); expect(files).toHaveLength(1); - expect(files[0]).toMatch(/\.vitest-0\.json$/); + expect(files[0]).toMatch(/\.vitest-0\.cbor$/); }); }); @@ -333,16 +333,16 @@ describe('File tagging', () => { // Verify hook file was created with tag const hooksDir = path.join(testDir, 'hooks'); const hookFiles = (await fs.readdir(hooksDir)).filter((f) => - f.endsWith('.json') + f.endsWith('.cbor') ); expect(hookFiles).toHaveLength(1); - expect(hookFiles[0]).toMatch(/\.vitest-0\.json$/); + expect(hookFiles[0]).toMatch(/\.vitest-0\.cbor$/); await world.clear(); // Both the tagged hook file and the untagged constraint file should be gone const hookFilesAfter = (await fs.readdir(hooksDir)).filter((f) => - f.endsWith('.json') + f.endsWith('.cbor') ); expect(hookFilesAfter).toHaveLength(0); @@ -399,30 +399,36 @@ describe('File tagging', () => { for (const dir of [runsDir, eventsDir, stepsDir]) { const files = await fs.readdir(dir); for (const file of files) { - expect(file).toMatch(/\.vitest-0\.json$/); + expect(file).toMatch(/\.vitest-0\.cbor$/); } } }); }); - describe('listJSONFiles with tagged files', () => { - it('should return fileIds including tag for correct path construction', async () => { + describe('listEntityFiles with tagged files', () => { + it('should return fileIds for both cbor and legacy json files', async () => { const dir = path.join(testDir, 'runs'); await fs.mkdir(dir, { recursive: true }); - // Write tagged and untagged files + // Write tagged and untagged files in both cbor and json formats. + // listEntityFiles should dedupe by fileId and prefer cbor when both exist. + await fs.writeFile(path.join(dir, 'wrun_ABC.cbor'), Buffer.from([0xa0])); await fs.writeFile( path.join(dir, 'wrun_ABC.json'), JSON.stringify({ id: 'wrun_ABC' }) ); + await fs.writeFile( + path.join(dir, 'wrun_DEF.vitest-0.cbor'), + Buffer.from([0xa0]) + ); await fs.writeFile( path.join(dir, 'wrun_DEF.vitest-0.json'), JSON.stringify({ id: 'wrun_DEF' }) ); - const fileIds = await listJSONFiles(dir); + const fileIds = await listEntityFiles(dir); expect(fileIds).toHaveLength(2); - // fileIds include the tag so paginatedFileSystemQuery can construct correct paths + // fileIds include the tag so paginatedFileSystemQuery can construct correct paths. expect(fileIds.sort()).toEqual(['wrun_ABC', 'wrun_DEF.vitest-0']); }); }); diff --git a/packages/world-postgres/package.json b/packages/world-postgres/package.json index fd4cbc00a6..ad44de1180 100644 --- a/packages/world-postgres/package.json +++ b/packages/world-postgres/package.json @@ -50,7 +50,7 @@ "@workflow/utils": "workspace:*", "@workflow/world": "workspace:*", "@workflow/world-local": "workspace:*", - "cbor-x": "1.6.0", + "cbor-x": "catalog:", "dotenv": "17.3.1", "drizzle-orm": "0.45.1", "graphile-worker": "0.16.6", diff --git a/packages/world-vercel/package.json b/packages/world-vercel/package.json index a57a57617c..c09d1c31be 100644 --- a/packages/world-vercel/package.json +++ b/packages/world-vercel/package.json @@ -34,7 +34,7 @@ "@vercel/queue": "catalog:", "@workflow/errors": "workspace:*", "@workflow/world": "workspace:*", - "cbor-x": "1.6.0", + "cbor-x": "catalog:", "undici": "catalog:", "zod": "catalog:" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 47678829b6..7bf78ecef1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,6 +30,9 @@ catalogs: ai: specifier: 6.0.116 version: 6.0.116 + cbor-x: + specifier: 1.6.4 + version: 1.6.4 esbuild: specifier: ^0.27.3 version: 0.27.3 @@ -819,7 +822,7 @@ importers: devDependencies: '@nuxt/module-builder': specifier: 1.0.2 - version: 1.0.2(@nuxt/cli@3.34.0(@nuxt/schema@4.4.2)(cac@6.7.14)(magicast@0.5.2))(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(typescript@5.9.3)(vue@3.5.30(typescript@5.9.3)) + version: 1.0.2(@nuxt/cli@3.34.0(@nuxt/schema@4.4.2)(cac@6.7.14)(magicast@0.5.2))(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(typescript@5.9.3)(vue@3.5.30(typescript@5.9.3)) '@nuxt/schema': specifier: 4.4.2 version: 4.4.2 @@ -1092,8 +1095,8 @@ importers: specifier: 12.10.1 version: 12.10.1(@types/react@19.1.13)(react-dom@19.1.0(react@19.1.0))(react@19.1.0) cbor-x: - specifier: ^1 - version: 1.6.0 + specifier: 'catalog:' + version: 1.6.4 class-variance-authority: specifier: 0.7.1 version: 0.7.1 @@ -1319,6 +1322,9 @@ importers: async-sema: specifier: 3.1.1 version: 3.1.1 + cbor-x: + specifier: 'catalog:' + version: 1.6.4 ulid: specifier: 'catalog:' version: 3.0.1 @@ -1366,8 +1372,8 @@ importers: specifier: workspace:* version: link:../world-local cbor-x: - specifier: 1.6.0 - version: 1.6.0 + specifier: 'catalog:' + version: 1.6.4 dotenv: specifier: 17.3.1 version: 17.3.1 @@ -1467,8 +1473,8 @@ importers: specifier: workspace:* version: link:../world cbor-x: - specifier: 1.6.0 - version: 1.6.0 + specifier: 'catalog:' + version: 1.6.4 undici: specifier: 'catalog:' version: 7.22.0 @@ -2799,33 +2805,33 @@ packages: resolution: {integrity: sha512-VERIM64vtTP1C4mxQ5thVT9fK0apjPFobqybMtA1UdUujWka24ERHbRHFGmpbbhp73MhV+KSsHQH9C6uOTdEQA==} engines: {node: '>=18'} - '@cbor-extract/cbor-extract-darwin-arm64@2.2.0': - resolution: {integrity: sha512-P7swiOAdF7aSi0H+tHtHtr6zrpF3aAq/W9FXx5HektRvLTM2O89xCyXF3pk7pLc7QpaY7AoaE8UowVf9QBdh3w==} + '@cbor-extract/cbor-extract-darwin-arm64@2.2.2': + resolution: {integrity: sha512-ZKZ/F8US7JR92J4DMct6cLW/Y66o2K576+zjlEN/MevH70bFIsB10wkZEQPLzl2oNh2SMGy55xpJ9JoBRl5DOA==} cpu: [arm64] os: [darwin] - '@cbor-extract/cbor-extract-darwin-x64@2.2.0': - resolution: {integrity: sha512-1liF6fgowph0JxBbYnAS7ZlqNYLf000Qnj4KjqPNW4GViKrEql2MgZnAsExhY9LSy8dnvA4C0qHEBgPrll0z0w==} + '@cbor-extract/cbor-extract-darwin-x64@2.2.2': + resolution: {integrity: sha512-32b1mgc+P61Js+KW9VZv/c+xRw5EfmOcPx990JbCBSkYJFY0l25VinvyyWfl+3KjibQmAcYwmyzKF9J4DyKP/Q==} cpu: [x64] os: [darwin] - '@cbor-extract/cbor-extract-linux-arm64@2.2.0': - resolution: {integrity: sha512-rQvhNmDuhjTVXSPFLolmQ47/ydGOFXtbR7+wgkSY0bdOxCFept1hvg59uiLPT2fVDuJFuEy16EImo5tE2x3RsQ==} + '@cbor-extract/cbor-extract-linux-arm64@2.2.2': + resolution: {integrity: sha512-wfqgzqCAy/Vn8i6WVIh7qZd0DdBFaWBjPdB6ma+Wihcjv0gHqD/mw3ouVv7kbbUNrab6dKEx/w3xQZEdeXIlzg==} cpu: [arm64] os: [linux] - '@cbor-extract/cbor-extract-linux-arm@2.2.0': - resolution: {integrity: sha512-QeBcBXk964zOytiedMPQNZr7sg0TNavZeuUCD6ON4vEOU/25+pLhNN6EDIKJ9VLTKaZ7K7EaAriyYQ1NQ05s/Q==} + '@cbor-extract/cbor-extract-linux-arm@2.2.2': + resolution: {integrity: sha512-tNg0za41TpQfkhWjptD+0gSD2fggMiDCSacuIeELyb2xZhr7PrhPe5h66Jc67B/5dmpIhI2QOUtv4SBsricyYQ==} cpu: [arm] os: [linux] - '@cbor-extract/cbor-extract-linux-x64@2.2.0': - resolution: {integrity: sha512-cWLAWtT3kNLHSvP4RKDzSTX9o0wvQEEAj4SKvhWuOVZxiDAeQazr9A+PSiRILK1VYMLeDml89ohxCnUNQNQNCw==} + '@cbor-extract/cbor-extract-linux-x64@2.2.2': + resolution: {integrity: sha512-rpiLnVEsqtPJ+mXTdx1rfz4RtUGYIUg2rUAZgd1KjiC1SehYUSkJN7Yh+aVfSjvCGtVP0/bfkQkXpPXKbmSUaA==} cpu: [x64] os: [linux] - '@cbor-extract/cbor-extract-win32-x64@2.2.0': - resolution: {integrity: sha512-l2M+Z8DO2vbvADOBNLbbh9y5ST1RY5sqkWOg/58GkUPBYou/cuNZ68SGQ644f1CvZ8kcOxyZtw06+dxWHIoN/w==} + '@cbor-extract/cbor-extract-win32-x64@2.2.2': + resolution: {integrity: sha512-dI+9P7cfWxkTQ+oE+7Aa6onEn92PHgfWXZivjNheCRmTBDBf2fx6RyTi0cmgpYLnD1KLZK9ZYrMxaPZ4oiXhGA==} cpu: [x64] os: [win32] @@ -8949,12 +8955,12 @@ packages: castable-video@1.1.11: resolution: {integrity: sha512-LCRTK6oe7SB1SiUQFzZCo6D6gcEzijqBTVIuj3smKpQdesXM18QTbCVqWgh9MfOeQgTx/i9ji5jGcdqNPeWg2g==} - cbor-extract@2.2.0: - resolution: {integrity: sha512-Ig1zM66BjLfTXpNgKpvBePq271BPOvu8MR0Jl080yG7Jsl+wAZunfrwiwA+9ruzm/WEdIV5QF/bjDZTqyAIVHA==} + cbor-extract@2.2.2: + resolution: {integrity: sha512-hlSxxI9XO2yQfe9g6msd3g4xCfDqK5T5P0fRMLuaLHhxn4ViPrm+a+MUfhrvH2W962RGxcBwEGzLQyjbDG1gng==} hasBin: true - cbor-x@1.6.0: - resolution: {integrity: sha512-0kareyRwHSkL6ws5VXHEf8uY1liitysCVJjlmhaLG+IXLqhSaOO+t63coaso7yjwEzWZzLy8fJo06gZDVQM9Qg==} + cbor-x@1.6.4: + resolution: {integrity: sha512-UGKHjp6RHC6QuZ2yy5LCKm7MojM4716DwoSaqwQpaH4DvZvbBTGcoDNTiG9Y2lByXZYFEs9WRkS5tLl96IrF1Q==} ccount@2.0.1: resolution: {integrity: sha512-eyrF0jiFpY+3drT6383f1qhkbGsLSifNAjA61IUjZjmLCWjItY6LB9ft9YhoDgwfmclB2zhu51Lc7+95b8NRAg==} @@ -16268,22 +16274,22 @@ snapshots: dependencies: fontkitten: 1.0.2 - '@cbor-extract/cbor-extract-darwin-arm64@2.2.0': + '@cbor-extract/cbor-extract-darwin-arm64@2.2.2': optional: true - '@cbor-extract/cbor-extract-darwin-x64@2.2.0': + '@cbor-extract/cbor-extract-darwin-x64@2.2.2': optional: true - '@cbor-extract/cbor-extract-linux-arm64@2.2.0': + '@cbor-extract/cbor-extract-linux-arm64@2.2.2': optional: true - '@cbor-extract/cbor-extract-linux-arm@2.2.0': + '@cbor-extract/cbor-extract-linux-arm@2.2.2': optional: true - '@cbor-extract/cbor-extract-linux-x64@2.2.0': + '@cbor-extract/cbor-extract-linux-x64@2.2.2': optional: true - '@cbor-extract/cbor-extract-win32-x64@2.2.0': + '@cbor-extract/cbor-extract-win32-x64@2.2.2': optional: true '@changesets/apply-release-plan@7.0.14': @@ -17967,7 +17973,7 @@ snapshots: transitivePeerDependencies: - magicast - '@nuxt/module-builder@1.0.2(@nuxt/cli@3.34.0(@nuxt/schema@4.4.2)(cac@6.7.14)(magicast@0.5.2))(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(typescript@5.9.3)(vue@3.5.30(typescript@5.9.3))': + '@nuxt/module-builder@1.0.2(@nuxt/cli@3.34.0(@nuxt/schema@4.4.2)(cac@6.7.14)(magicast@0.5.2))(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(typescript@5.9.3)(vue@3.5.30(typescript@5.9.3))': dependencies: '@nuxt/cli': 3.34.0(@nuxt/schema@4.4.2)(cac@6.7.14)(magicast@0.5.2) citty: 0.1.6 @@ -17975,14 +17981,14 @@ snapshots: defu: 6.1.4 jiti: 2.6.1 magic-regexp: 0.10.0 - mkdist: 2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) + mkdist: 2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) mlly: 1.8.0 pathe: 2.0.3 pkg-types: 2.3.0 tsconfck: 3.1.6(typescript@5.9.3) typescript: 5.9.3 - unbuild: 3.6.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) - vue-sfc-transformer: 0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)) + unbuild: 3.6.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) + vue-sfc-transformer: 0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)) transitivePeerDependencies: - '@vue/compiler-core' - esbuild @@ -23899,21 +23905,21 @@ snapshots: dependencies: custom-media-element: 1.4.5 - cbor-extract@2.2.0: + cbor-extract@2.2.2: dependencies: node-gyp-build-optional-packages: 5.1.1 optionalDependencies: - '@cbor-extract/cbor-extract-darwin-arm64': 2.2.0 - '@cbor-extract/cbor-extract-darwin-x64': 2.2.0 - '@cbor-extract/cbor-extract-linux-arm': 2.2.0 - '@cbor-extract/cbor-extract-linux-arm64': 2.2.0 - '@cbor-extract/cbor-extract-linux-x64': 2.2.0 - '@cbor-extract/cbor-extract-win32-x64': 2.2.0 + '@cbor-extract/cbor-extract-darwin-arm64': 2.2.2 + '@cbor-extract/cbor-extract-darwin-x64': 2.2.2 + '@cbor-extract/cbor-extract-linux-arm': 2.2.2 + '@cbor-extract/cbor-extract-linux-arm64': 2.2.2 + '@cbor-extract/cbor-extract-linux-x64': 2.2.2 + '@cbor-extract/cbor-extract-win32-x64': 2.2.2 optional: true - cbor-x@1.6.0: + cbor-x@1.6.4: optionalDependencies: - cbor-extract: 2.2.0 + cbor-extract: 2.2.2 ccount@2.0.1: {} @@ -27443,7 +27449,7 @@ snapshots: mkdirp@3.0.1: {} - mkdist@2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)): + mkdist@2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)): dependencies: autoprefixer: 10.4.21(postcss@8.5.6) citty: 0.1.6 @@ -27461,7 +27467,7 @@ snapshots: optionalDependencies: typescript: 5.9.3 vue: 3.5.30(typescript@5.9.3) - vue-sfc-transformer: 0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)) + vue-sfc-transformer: 0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)) mlly@1.8.0: dependencies: @@ -31166,7 +31172,7 @@ snapshots: ultrahtml@1.6.0: {} - unbuild@3.6.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)): + unbuild@3.6.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)): dependencies: '@rollup/plugin-alias': 5.1.1(rollup@4.53.2) '@rollup/plugin-commonjs': 28.0.9(rollup@4.53.2) @@ -31182,7 +31188,7 @@ snapshots: hookable: 5.5.3 jiti: 2.6.1 magic-string: 0.30.21 - mkdist: 2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) + mkdist: 2.4.1(typescript@5.9.3)(vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)))(vue@3.5.30(typescript@5.9.3)) mlly: 1.8.0 pathe: 2.0.3 pkg-types: 2.3.0 @@ -31978,11 +31984,11 @@ snapshots: optionalDependencies: '@vue/compiler-sfc': 3.5.30 - vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.4)(vue@3.5.30(typescript@5.9.3)): + vue-sfc-transformer@0.1.17(@vue/compiler-core@3.5.30)(esbuild@0.27.3)(vue@3.5.30(typescript@5.9.3)): dependencies: '@babel/parser': 7.28.5 '@vue/compiler-core': 3.5.30 - esbuild: 0.27.4 + esbuild: 0.27.3 vue: 3.5.30(typescript@5.9.3) vue@3.5.30(typescript@5.9.3): diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 667f4878f3..5ca8710999 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -14,6 +14,7 @@ catalog: "@vercel/queue": 0.1.4 "@vitest/coverage-v8": ^4.0.18 ai: 6.0.116 + cbor-x: 1.6.4 esbuild: ^0.27.3 nitro: 3.0.1-alpha.1 semver: 7.7.4