diff --git a/packages/alchemy/src/Cloudflare/Workers/Workflow.ts b/packages/alchemy/src/Cloudflare/Workers/Workflow.ts index fab05122b..7b1129c6f 100644 --- a/packages/alchemy/src/Cloudflare/Workers/Workflow.ts +++ b/packages/alchemy/src/Cloudflare/Workers/Workflow.ts @@ -30,19 +30,99 @@ export class WorkflowEvent extends Context.Service< payload: unknown; timestamp: Date; instanceId: string; + workflowName: string; + schedule?: WorkflowCronSchedule; } >()("Cloudflare.WorkflowEvent") {} +export interface WorkflowCronSchedule { + cron: string; + scheduledTime: number; +} + +export type WorkflowBackoff = "constant" | "linear" | "exponential"; + +export interface WorkflowStepConfig { + retries?: { + limit: number; + delay: string | number; + backoff?: WorkflowBackoff; + }; + timeout?: string | number; +} + +export interface WorkflowStepContextData { + step: { + name: string; + count: number; + }; + attempt: number; + config: WorkflowStepConfig; +} + +/** + * Runtime information for the current `task` attempt. + */ +export class WorkflowStepContext extends Context.Service< + WorkflowStepContext, + WorkflowStepContextData +>()("Cloudflare.WorkflowStepContext") {} + +export interface WorkflowRollbackContext { + error: Error; + output: Output | undefined; +} + +export interface WorkflowRollbackOptions { + rollback: ( + context: WorkflowRollbackContext, + ) => Effect.Effect; + rollbackConfig?: WorkflowStepConfig; +} + +/** + * A durable workflow step. Keep the name, Effect, retry config, timeout, and + * rollback handler together in one object. + */ +export interface WorkflowTaskOptions< + Output = unknown, + R = never, + RollbackReq = never, +> extends WorkflowStepConfig { + name: string; + effect: Effect.Effect; + rollback?: ( + context: WorkflowRollbackContext, + ) => Effect.Effect; + rollbackConfig?: WorkflowStepConfig; +} + +export interface WorkflowWaitForEventOptions { + type: string; + timeout?: string | number; +} + +type ExcludeWorkflowStepContext = R extends { + readonly key: "Cloudflare.WorkflowStepContext"; +} + ? never + : R; + /** * Internal service that wraps the Cloudflare `WorkflowStep` object. - * Not accessed directly by users -- use `task`, `sleep`, `sleepUntil` instead. + * Not accessed directly by users -- use `task`, `sleep`, `sleepUntil`, and + * `waitForEvent` instead. */ export class WorkflowStep extends Context.Service< WorkflowStep, { - do(name: string, effect: Effect.Effect): Effect.Effect; + do(options: WorkflowTaskOptions): Effect.Effect; sleep(name: string, duration: string | number): Effect.Effect; sleepUntil(name: string, timestamp: Date | number): Effect.Effect; + waitForEvent( + name: string, + options: WorkflowWaitForEventOptions, + ): Effect.Effect; } >()("Cloudflare.WorkflowStep") {} @@ -59,16 +139,34 @@ export class WorkflowStep extends Context.Service< * binding like `kv.put` / `kv.get`) are threaded through automatically by * capturing the surrounding workflow body's context and providing it to * the inner effect before it runs inside `step.do`. + * + * `task` takes one options object so the step name, Effect, retry config, and + * rollback handler stay together. */ -export const task = ( - name: string, - effect: Effect.Effect, -): Effect.Effect => - Effect.gen(function* () { +export function task( + options: WorkflowTaskOptions, +): Effect.Effect< + T, + never, + WorkflowStep | ExcludeWorkflowStepContext +> { + return Effect.gen(function* () { const step = yield* WorkflowStep; - const context = yield* Effect.context(); - return yield* step.do(name, effect.pipe(Effect.provide(context))); + const context = + yield* Effect.context>(); + const rollbackEffect = options.rollback; + const rollbackConfig = options.rollbackConfig; + return yield* step.do({ + ...options, + effect: options.effect.pipe(Effect.provide(context)), + rollback: rollbackEffect + ? (rollbackContext: WorkflowRollbackContext) => + rollbackEffect(rollbackContext).pipe(Effect.provide(context)) + : undefined, + rollbackConfig, + } as WorkflowTaskOptions); }); +} /** * Pause the workflow for the given duration. @@ -77,10 +175,10 @@ export const sleep = ( name: string, duration: string | number, ): Effect.Effect => - WorkflowStep.pipe( - Effect.flatMap((step) => step.sleep(name, duration)), - Effect.orDie, - ); + Effect.gen(function* () { + const step = yield* WorkflowStep; + yield* step.sleep(name, duration); + }).pipe(Effect.orDie); /** * Pause the workflow until the given timestamp. @@ -89,10 +187,23 @@ export const sleepUntil = ( name: string, timestamp: Date | number, ): Effect.Effect => - WorkflowStep.pipe( - Effect.flatMap((step) => step.sleepUntil(name, timestamp)), - Effect.orDie, - ); + Effect.gen(function* () { + const step = yield* WorkflowStep; + yield* step.sleepUntil(name, timestamp); + }).pipe(Effect.orDie); + +/** + * Pause the workflow until an external event is delivered with + * `WorkflowInstance.sendEvent`. + */ +export const waitForEvent = ( + name: string, + options: WorkflowWaitForEventOptions, +): Effect.Effect => + Effect.gen(function* () { + const step = yield* WorkflowStep; + return yield* step.waitForEvent(name, options); + }).pipe(Effect.orDie); /** * The services available inside a workflow run body. @@ -161,22 +272,75 @@ export const isWorkflowBinding = (binding: { export interface WorkflowHandle { Type: WorkflowTypeId; name: string; - create(input: Input): Effect.Effect>; + /** + * Start a workflow instance. Pass payload through `params`; omit `id` to let + * Cloudflare generate an instance ID. + */ + create( + options?: WorkflowInstanceCreateOptions, + ): Effect.Effect>; + createBatch( + batch: WorkflowInstanceCreateOptions[], + ): Effect.Effect[]>; get(instanceId: string): Effect.Effect>; } +/** Options for starting a workflow instance. */ +export interface WorkflowInstanceCreateOptions { + id?: string; + params?: Input; + retention?: WorkflowInstanceRetention; +} + +export interface WorkflowInstanceRetention { + successRetention?: string | number; + errorRetention?: string | number; +} + +/** Handle for a single Cloudflare workflow instance. */ export interface WorkflowInstance { id: string; status(): Effect.Effect>; pause(): Effect.Effect; resume(): Effect.Effect; + restart(options?: WorkflowInstanceRestartOptions): Effect.Effect; terminate(): Effect.Effect; + sendEvent( + event: WorkflowInstanceEvent, + ): Effect.Effect; +} + +export interface WorkflowInstanceRestartOptions { + from?: { + name: string; + count?: number; + type?: "do" | "sleep" | "waitForEvent"; + }; +} + +export interface WorkflowInstanceEvent { + type: string; + payload?: Payload; } export interface WorkflowInstanceStatus { - status: string; + status: + | "queued" + | "running" + | "paused" + | "errored" + | "terminated" + | "complete" + | "waiting" + | "waitingForPause" + | "unknown" + | (string & {}); output?: Result; error?: { name: string; message: string } | null; + rollback?: { + outcome: "complete" | "failed"; + error: { name: string; message: string } | null; + } | null; } export interface WorkflowClass extends Effect.Effect< @@ -219,7 +383,8 @@ export class WorkflowScope extends Context.Service< * Objects. The outer `Effect.gen` resolves shared dependencies. The inner * `Effect.fn` is the workflow body — a function from a typed `input` * payload to an Effect that runs steps using `task`, `sleep`, and - * `sleepUntil`. + * `sleepUntil`. `task` keeps the step name, Effect, retry config, timeout, + * and rollback handler in one object. * * ```typescript * Effect.gen(function* () { @@ -228,7 +393,10 @@ export class WorkflowScope extends Context.Service< * * return Effect.fn(function* (input: { orderId: string }) { * // Phase 2: workflow body (durable steps) - * const result = yield* Cloudflare.task("process", doWork(input.orderId)); + * const result = yield* Cloudflare.task({ + * name: "process", + * effect: doWork(input.orderId), + * }); * yield* Cloudflare.sleep("cooldown", "10 seconds"); * return result; * }); @@ -255,10 +423,33 @@ export class WorkflowScope extends Context.Service< * @section Step Primitives * @example Running a named task * ```typescript - * const result = yield* Cloudflare.task( - * "process-order", - * Effect.succeed({ orderId: "abc", total: 42 }), - * ); + * const result = yield* Cloudflare.task({ + * name: "process-order", + * effect: Effect.succeed({ orderId: "abc", total: 42 }), + * }); + * ``` + * + * @example Configuring retries and reading step context + * ```typescript + * const result = yield* Cloudflare.task({ + * name: "call-api", + * retries: { limit: 3, delay: "5 seconds", backoff: "linear" }, + * effect: Effect.gen(function* () { + * const context = yield* Cloudflare.WorkflowStepContext; + * return { attempt: context.attempt }; + * }), + * }); + * ``` + * + * @example Registering rollback + * ```typescript + * yield* Cloudflare.task({ + * name: "reserve-inventory", + * effect: reserveInventory, + * rollback: ({ output }) => + * output ? releaseInventory(output.reservationId) : Effect.void, + * rollbackConfig: { retries: { limit: 3, delay: "10 seconds" } }, + * }); * ``` * * @example Sleeping between steps @@ -266,6 +457,14 @@ export class WorkflowScope extends Context.Service< * yield* Cloudflare.sleep("cooldown", "30 seconds"); * ``` * + * @example Waiting for an external event + * ```typescript + * const event = yield* Cloudflare.waitForEvent<{ approved: boolean }>( + * "approval", + * { type: "approval", timeout: "1 day" }, + * ); + * ``` + * * @example Accessing env bindings inside a task * Bind a resource (e.g. `KVNamespace`, `R2Bucket`) in the workflow's * outer init phase to get a typed Effect-native client, then use it @@ -280,14 +479,14 @@ export class WorkflowScope extends Context.Service< * return Effect.fn(function* (input: { roomId: string; message: string }) { * const { roomId, message } = input; * - * const stored = yield* Cloudflare.task( - * "kv-roundtrip", - * Effect.gen(function* () { + * const stored = yield* Cloudflare.task({ + * name: "kv-roundtrip", + * effect: Effect.gen(function* () { * const key = `workflow:${roomId}`; * yield* kv.put(key, message); * return yield* kv.get(key); * }).pipe(Effect.orDie), - * ); + * }); * * return stored; * }); @@ -295,10 +494,31 @@ export class WorkflowScope extends Context.Service< * ``` * * @section Starting and Monitoring Instances + * `create` mirrors Cloudflare's native Workflow API: pass workflow input in + * `params`, pass `id` only when you need a deterministic instance ID, and omit + * `id` to let Cloudflare generate one. + * * @example Creating an instance from a Worker * ```typescript * const workflow = yield* MyWorkflow; - * const instance = yield* workflow.create({ orderId: "abc" }); + * const instance = yield* workflow.create({ params: { orderId: "abc" } }); + * ``` + * + * @example Creating an instance with id and retention + * ```typescript + * const instance = yield* workflow.create({ + * id: "order-abc", + * params: { orderId: "abc" }, + * retention: { successRetention: "1 day", errorRetention: "7 days" }, + * }); + * ``` + * + * @example Creating a batch + * ```typescript + * const instances = yield* workflow.createBatch([ + * { id: "order-a", params: { orderId: "a" } }, + * { id: "order-b", params: { orderId: "b" } }, + * ]); * ``` * * @example Checking instance status @@ -308,6 +528,13 @@ export class WorkflowScope extends Context.Service< * const status = yield* handle.status(); * ``` * + * @example Sending events and restarting instances + * ```typescript + * const instance = yield* workflow.get(instanceId); + * yield* instance.sendEvent({ type: "approval", payload: { approved: true } }); + * yield* instance.restart({ from: { name: "approval", type: "waitForEvent" } }); + * ``` + * * @section Triggering from a Worker * Wire the workflow into HTTP routes so callers can fire instances * and poll for completion. @@ -323,7 +550,7 @@ export class WorkflowScope extends Context.Service< * * if (request.url.startsWith("/workflow/start/")) { * const id = request.url.split("/").pop()!; - * const instance = yield* notifier.create({ id }); + * const instance = yield* notifier.create({ params: { orderId: id } }); * return HttpServerResponse.json({ instanceId: instance.id }); * } * @@ -339,9 +566,8 @@ export class WorkflowScope extends Context.Service< * ``` * * @section Testing Workflows - * Workflows run asynchronously, so tests start an instance and - * poll until it reaches a terminal status. A simple recipe with - * `alchemy/Test/Bun`: + * Workflows run asynchronously, so tests start an instance and poll until it + * reaches a terminal status. Keep polling bounded with `Effect.repeat`. * * @example Polling for workflow completion * ```typescript @@ -353,20 +579,20 @@ export class WorkflowScope extends Context.Service< * const start = yield* HttpClient.post(`${url}/workflow/start/x`); * const { instanceId } = (yield* start.json) as { instanceId: string }; * - * let status: { status: string } | undefined; - * const deadline = Date.now() + 60_000; - * while (Date.now() < deadline) { - * const res = yield* HttpClient.get( - * `${url}/workflow/status/${instanceId}`, - * ); - * status = (yield* res.json) as { status: string }; - * if (status.status === "complete" || status.status === "errored") { - * break; - * } - * yield* Effect.sleep("2 seconds"); - * } + * const status = yield* HttpClient.get( + * `${url}/workflow/status/${instanceId}`, + * ).pipe( + * Effect.flatMap((res) => res.json), + * Effect.map((json) => json as { status: string }), + * Effect.repeat({ + * schedule: Schedule.spaced("2 seconds"), + * until: (status) => + * status.status === "complete" || status.status === "errored", + * times: 30, + * }), + * ); * - * expect(status?.status).toBe("complete"); + * expect(status.status).toBe("complete"); * }), * { timeout: 120_000 }, * ); @@ -425,11 +651,18 @@ export const Workflow: WorkflowClass = taggedFunction(WorkflowScope, (( const self: WorkflowHandle = { Type: WorkflowTypeId, name, - create: (input: unknown) => - Effect.tryPromise(() => binding.create({ params: input })).pipe( + create: (options?: WorkflowInstanceCreateOptions) => + Effect.tryPromise(() => binding.create(options)).pipe( Effect.map(wrapInstance), Effect.orDie, ), + createBatch: (batch: WorkflowInstanceCreateOptions[]) => + Effect.tryPromise( + () => binding.createBatch(batch) as Promise, + ).pipe( + Effect.map((instances: any[]) => instances.map(wrapInstance)), + Effect.orDie, + ), get: (instanceId: string) => Effect.tryPromise(() => binding.get(instanceId)).pipe( Effect.map(wrapInstance), @@ -588,10 +821,15 @@ const wrapInstance = (raw: any): WorkflowInstance => ({ status: s.status as string, output: s.output as Result, error: s.error, + rollback: s.rollback, })), Effect.orDie, ), pause: () => Effect.tryPromise(() => raw.pause()).pipe(Effect.orDie), resume: () => Effect.tryPromise(() => raw.resume()).pipe(Effect.orDie), + restart: (options?: WorkflowInstanceRestartOptions) => + Effect.tryPromise(() => raw.restart(options)).pipe(Effect.orDie), terminate: () => Effect.tryPromise(() => raw.terminate()).pipe(Effect.orDie), + sendEvent: (event: WorkflowInstanceEvent) => + Effect.tryPromise(() => raw.sendEvent(event)).pipe(Effect.orDie), }); diff --git a/packages/alchemy/src/Cloudflare/Workers/WorkflowBridge.ts b/packages/alchemy/src/Cloudflare/Workers/WorkflowBridge.ts index 7a3000881..5ca87e352 100644 --- a/packages/alchemy/src/Cloudflare/Workers/WorkflowBridge.ts +++ b/packages/alchemy/src/Cloudflare/Workers/WorkflowBridge.ts @@ -11,6 +11,9 @@ import { type WorkflowExport, type WorkflowImpl, WorkflowStep, + WorkflowStepContext, + type WorkflowStepConfig, + type WorkflowTaskOptions, } from "./Workflow.ts"; /** @@ -105,22 +108,51 @@ export const makeWorkflowBridge = } }; -const wrapWorkflowEvent = ( - event: any, -): { payload: unknown; timestamp: Date; instanceId: string } => ({ +const wrapWorkflowEvent = (event: any): WorkflowEventService["Service"] => ({ payload: event.payload, timestamp: event.timestamp instanceof Date ? event.timestamp : new Date(event.timestamp), instanceId: event.instanceId ?? "", + workflowName: event.workflowName ?? "", + schedule: event.schedule, }); const wrapWorkflowStep = (step: any): WorkflowStep["Service"] => ({ - do: (name: string, effect: Effect.Effect): Effect.Effect => - Effect.tryPromise( - () => step.do(name, () => Effect.runPromise(effect)) as Promise, - ), + do: (options: WorkflowTaskOptions): Effect.Effect => { + const { name, effect } = options; + const config = toWorkflowStepConfig(options); + const rollbackEffect = options.rollback; + const callback = (context: any) => + Effect.runPromise( + effect.pipe( + Effect.provideService(WorkflowStepContext, { + step: context.step, + attempt: context.attempt, + config: context.config, + }), + ), + ); + const rollback = rollbackEffect + ? { + rollback: (context: any) => + Effect.runPromise( + rollbackEffect({ + error: context.error, + output: context.output, + }), + ), + rollbackConfig: options.rollbackConfig, + } + : undefined; + return Effect.tryPromise(() => { + if (config && rollback) return step.do(name, config, callback, rollback); + if (config) return step.do(name, config, callback); + if (rollback) return step.do(name, callback, rollback); + return step.do(name, callback); + }); + }, sleep: (name: string, duration: string | number): Effect.Effect => Effect.tryPromise(() => step.sleep(name, duration)), sleepUntil: (name: string, timestamp: Date | number): Effect.Effect => @@ -130,4 +162,13 @@ const wrapWorkflowStep = (step: any): WorkflowStep["Service"] => ({ timestamp instanceof Date ? timestamp.toISOString() : timestamp, ), ), + waitForEvent: (name: string, options: any): Effect.Effect => + Effect.tryPromise(() => step.waitForEvent(name, options) as Promise), }); + +const toWorkflowStepConfig = ( + options: WorkflowTaskOptions, +): WorkflowStepConfig | undefined => { + if (!options.retries && !options.timeout) return undefined; + return { retries: options.retries, timeout: options.timeout }; +}; diff --git a/packages/alchemy/test/Cloudflare/Workers/Workflow.test.ts b/packages/alchemy/test/Cloudflare/Workers/Workflow.test.ts index 356c97ef6..a60998c69 100644 --- a/packages/alchemy/test/Cloudflare/Workers/Workflow.test.ts +++ b/packages/alchemy/test/Cloudflare/Workers/Workflow.test.ts @@ -29,10 +29,41 @@ afterAll.skipIf(!!process.env.NO_DESTROY)(destroy(Stack)); interface WorkflowStatus { status: string; - output?: { greeting: string; envBindingCount: number }; + output?: { + greeting: string; + envBindingCount: number; + workflowName: string; + stepAttempt: number; + instanceId: string; + }; error?: { message?: string } | null; + rollback?: { + outcome: "complete" | "failed"; + error: { message?: string } | null; + } | null; } +const isTerminal = (status: WorkflowStatus) => + status.status === "complete" || + status.status === "errored" || + status.status === "terminated"; + +const waitForStatus = ( + client: HttpClient.HttpClient, + url: string, + id: string, + until: (status: WorkflowStatus) => boolean = isTerminal, +) => + client.get(`${url}/workflow/status/${id}`).pipe( + Effect.flatMap((res) => res.json), + Effect.map((json) => json as unknown as WorkflowStatus), + Effect.repeat({ + schedule: Schedule.spaced("2 seconds"), + until, + times: 30, + }), + ); + // Start a fresh workflow instance and poll until it reaches a terminal state. // A transient `errored` during edge/binding propagation fails this effect so // the caller can retry with a brand-new instance. @@ -58,17 +89,7 @@ const runWorkflowToCompletion = (url: string) => const { instanceId } = (yield* startRes.json) as { instanceId: string }; expect(instanceId).toBeTypeOf("string"); - const lastStatus = yield* client - .get(`${url}/workflow/status/${instanceId}`) - .pipe( - Effect.flatMap((res) => res.json), - Effect.map((json) => json as unknown as WorkflowStatus), - Effect.repeat({ - schedule: Schedule.spaced("2 seconds"), - until: (s) => s.status === "complete" || s.status === "errored", - times: 12, - }), - ); + const lastStatus = yield* waitForStatus(client, url, instanceId); // Surface a non-complete terminal state as a failure so the outer retry // can take another swing (a fresh worker occasionally errors a step while @@ -97,11 +118,55 @@ test( expect(lastStatus.status).toBe("complete"); expect(lastStatus.error).toBeFalsy(); expect(lastStatus.output?.greeting).toBe("Hello, world!"); + expect(lastStatus.output?.workflowName).toBe("TestWorkflow"); + expect(lastStatus.output?.stepAttempt).toBe(1); + expect(lastStatus.rollback).toBeNull(); // The body yields `WorkerEnvironment` — if the regression from PR #71 ever // returns, the body dies on the first yield and `output` is undefined. expect(lastStatus.output?.envBindingCount).toBeGreaterThan(0); }).pipe(logLevel), - { timeout: 30_000 }, + { timeout: 180_000 }, +); + +test( + "workflow can wait for and receive external events", + Effect.gen(function* () { + const { url } = yield* stack; + const client = yield* HttpClient.HttpClient; + + const startRes = yield* client.post(`${url}/workflow/wait/world`).pipe( + Effect.flatMap((res) => + res.status === 200 + ? Effect.succeed(res) + : Effect.fail(new Error(`Worker not ready: ${res.status}`)), + ), + Effect.retry({ + schedule: Schedule.exponential("500 millis"), + times: 15, + }), + ); + const { instanceId } = (yield* startRes.json) as { instanceId: string }; + + const waitingStatus = yield* waitForStatus( + client, + url, + instanceId, + (status) => status.status === "waiting" || isTerminal(status), + ); + expect(waitingStatus.status).toBe("waiting"); + + const sendRes = yield* client.post( + `${url}/workflow/send/${instanceId}/external-ok`, + ); + expect(sendRes.status).toBe(200); + + const lastStatus = yield* waitForStatus(client, url, instanceId); + expect(lastStatus.status).toBe("complete"); + expect(lastStatus.error).toBeFalsy(); + expect(lastStatus.output?.greeting).toBe("external-ok"); + expect(lastStatus.output?.instanceId).toBe(instanceId); + }).pipe(logLevel), + { timeout: 180_000 }, ); // Canonical `list()` test (account collection): deploy the worker+workflow diff --git a/packages/alchemy/test/Cloudflare/Workers/fixtures/drizzle-workflow/worker.ts b/packages/alchemy/test/Cloudflare/Workers/fixtures/drizzle-workflow/worker.ts index a92dd508e..50dbce98c 100644 --- a/packages/alchemy/test/Cloudflare/Workers/fixtures/drizzle-workflow/worker.ts +++ b/packages/alchemy/test/Cloudflare/Workers/fixtures/drizzle-workflow/worker.ts @@ -25,7 +25,9 @@ export default class DrizzleWorkflowWorker extends Cloudflare.Worker()( "TestWorkflow", Effect.gen(function* () { - return Effect.fn(function* (input: { value: string }) { - console.log("greeted"); + return Effect.fn(function* (input: { value: string; wait?: boolean }) { const env = yield* Cloudflare.WorkerEnvironment; + const event = yield* Cloudflare.WorkflowEvent; - const greeted = yield* Cloudflare.task( - "greet", - Effect.succeed(`Hello, ${input.value}!`), - ); + const greeted = yield* Cloudflare.task({ + name: "greet", + retries: { limit: 3, delay: "1 second", backoff: "linear" }, + timeout: "1 minute", + effect: Effect.gen(function* () { + const context = yield* Cloudflare.WorkflowStepContext; + return { + text: `Hello, ${input.value}!`, + attempt: context.attempt, + }; + }), + }); + + if (input.wait) { + const external = yield* Cloudflare.waitForEvent<{ message: string }>( + "external-event", + { type: "test-event", timeout: "5 minutes" }, + ); + + return { + greeting: external.message, + envBindingCount: Object.keys(env).length, + workflowName: event.workflowName, + stepAttempt: greeted.attempt, + instanceId: event.instanceId, + }; + } yield* Cloudflare.sleep("cooldown", "1 second"); - const finalized = yield* Cloudflare.task( - "finalize", - Effect.succeed({ - greeting: greeted, + const finalized = yield* Cloudflare.task({ + name: "finalize", + effect: Effect.succeed({ + greeting: greeted.text, envBindingCount: Object.keys(env).length, + workflowName: event.workflowName, + stepAttempt: greeted.attempt, + instanceId: event.instanceId, }), - ); + rollback: () => Effect.void, + rollbackConfig: { retries: { limit: 1, delay: "1 second" } }, + }); return finalized; }); diff --git a/packages/alchemy/test/Cloudflare/Workers/fixtures/workflow/workflow-worker.ts b/packages/alchemy/test/Cloudflare/Workers/fixtures/workflow/workflow-worker.ts index 420a1566d..3480e3632 100644 --- a/packages/alchemy/test/Cloudflare/Workers/fixtures/workflow/workflow-worker.ts +++ b/packages/alchemy/test/Cloudflare/Workers/fixtures/workflow/workflow-worker.ts @@ -18,10 +18,29 @@ export default class WorkflowTestWorker extends Cloudflare.Worker