Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 287 additions & 49 deletions packages/alchemy/src/Cloudflare/Workers/Workflow.ts

Large diffs are not rendered by default.

55 changes: 48 additions & 7 deletions packages/alchemy/src/Cloudflare/Workers/WorkflowBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import {
type WorkflowExport,
type WorkflowImpl,
WorkflowStep,
WorkflowStepContext,
type WorkflowStepConfig,
type WorkflowTaskOptions,
} from "./Workflow.ts";

/**
Expand Down Expand Up @@ -105,22 +108,51 @@ export const makeWorkflowBridge =
}
};

const wrapWorkflowEvent = (
event: any,
): { payload: unknown; timestamp: Date; instanceId: string } => ({
const wrapWorkflowEvent = (event: any): WorkflowEventService["Service"] => ({
payload: event.payload,
timestamp:
event.timestamp instanceof Date
? event.timestamp
: new Date(event.timestamp),
instanceId: event.instanceId ?? "",
workflowName: event.workflowName ?? "",
schedule: event.schedule,
});

const wrapWorkflowStep = (step: any): WorkflowStep["Service"] => ({
do: <T>(name: string, effect: Effect.Effect<T>): Effect.Effect<T> =>
Effect.tryPromise(
() => step.do(name, () => Effect.runPromise(effect)) as Promise<T>,
),
do: <T>(options: WorkflowTaskOptions<T, any, any>): Effect.Effect<T> => {
const { name, effect } = options;
const config = toWorkflowStepConfig(options);
const rollbackEffect = options.rollback;
const callback = (context: any) =>
Effect.runPromise(
effect.pipe(
Effect.provideService(WorkflowStepContext, {
step: context.step,
attempt: context.attempt,
config: context.config,
}),
),
);
const rollback = rollbackEffect
? {
rollback: (context: any) =>
Effect.runPromise(
rollbackEffect({
error: context.error,
output: context.output,
}),
),
rollbackConfig: options.rollbackConfig,
}
: undefined;
return Effect.tryPromise(() => {
if (config && rollback) return step.do(name, config, callback, rollback);
if (config) return step.do(name, config, callback);
if (rollback) return step.do(name, callback, rollback);
return step.do(name, callback);
});
},
sleep: (name: string, duration: string | number): Effect.Effect<void> =>
Effect.tryPromise(() => step.sleep(name, duration)),
sleepUntil: (name: string, timestamp: Date | number): Effect.Effect<void> =>
Expand All @@ -130,4 +162,13 @@ const wrapWorkflowStep = (step: any): WorkflowStep["Service"] => ({
timestamp instanceof Date ? timestamp.toISOString() : timestamp,
),
),
waitForEvent: <T>(name: string, options: any): Effect.Effect<T> =>
Effect.tryPromise(() => step.waitForEvent(name, options) as Promise<T>),
});

const toWorkflowStepConfig = (
options: WorkflowTaskOptions,
): WorkflowStepConfig | undefined => {
if (!options.retries && !options.timeout) return undefined;
return { retries: options.retries, timeout: options.timeout };
};
91 changes: 78 additions & 13 deletions packages/alchemy/test/Cloudflare/Workers/Workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,41 @@ afterAll.skipIf(!!process.env.NO_DESTROY)(destroy(Stack));

interface WorkflowStatus {
status: string;
output?: { greeting: string; envBindingCount: number };
output?: {
greeting: string;
envBindingCount: number;
workflowName: string;
stepAttempt: number;
instanceId: string;
};
error?: { message?: string } | null;
rollback?: {
outcome: "complete" | "failed";
error: { message?: string } | null;
} | null;
}

const isTerminal = (status: WorkflowStatus) =>
status.status === "complete" ||
status.status === "errored" ||
status.status === "terminated";

const waitForStatus = (
client: HttpClient.HttpClient,
url: string,
id: string,
until: (status: WorkflowStatus) => boolean = isTerminal,
) =>
client.get(`${url}/workflow/status/${id}`).pipe(
Effect.flatMap((res) => res.json),
Effect.map((json) => json as unknown as WorkflowStatus),
Effect.repeat({
schedule: Schedule.spaced("2 seconds"),
until,
times: 30,
}),
);

// Start a fresh workflow instance and poll until it reaches a terminal state.
// A transient `errored` during edge/binding propagation fails this effect so
// the caller can retry with a brand-new instance.
Expand All @@ -58,17 +89,7 @@ const runWorkflowToCompletion = (url: string) =>
const { instanceId } = (yield* startRes.json) as { instanceId: string };
expect(instanceId).toBeTypeOf("string");

const lastStatus = yield* client
.get(`${url}/workflow/status/${instanceId}`)
.pipe(
Effect.flatMap((res) => res.json),
Effect.map((json) => json as unknown as WorkflowStatus),
Effect.repeat({
schedule: Schedule.spaced("2 seconds"),
until: (s) => s.status === "complete" || s.status === "errored",
times: 12,
}),
);
const lastStatus = yield* waitForStatus(client, url, instanceId);

// Surface a non-complete terminal state as a failure so the outer retry
// can take another swing (a fresh worker occasionally errors a step while
Expand Down Expand Up @@ -97,11 +118,55 @@ test(
expect(lastStatus.status).toBe("complete");
expect(lastStatus.error).toBeFalsy();
expect(lastStatus.output?.greeting).toBe("Hello, world!");
expect(lastStatus.output?.workflowName).toBe("TestWorkflow");
expect(lastStatus.output?.stepAttempt).toBe(1);
expect(lastStatus.rollback).toBeNull();
// The body yields `WorkerEnvironment` — if the regression from PR #71 ever
// returns, the body dies on the first yield and `output` is undefined.
expect(lastStatus.output?.envBindingCount).toBeGreaterThan(0);
}).pipe(logLevel),
{ timeout: 30_000 },
{ timeout: 180_000 },
);

test(
"workflow can wait for and receive external events",
Effect.gen(function* () {
const { url } = yield* stack;
const client = yield* HttpClient.HttpClient;

const startRes = yield* client.post(`${url}/workflow/wait/world`).pipe(
Effect.flatMap((res) =>
res.status === 200
? Effect.succeed(res)
: Effect.fail(new Error(`Worker not ready: ${res.status}`)),
),
Effect.retry({
schedule: Schedule.exponential("500 millis"),
times: 15,
}),
);
const { instanceId } = (yield* startRes.json) as { instanceId: string };

const waitingStatus = yield* waitForStatus(
client,
url,
instanceId,
(status) => status.status === "waiting" || isTerminal(status),
);
expect(waitingStatus.status).toBe("waiting");

const sendRes = yield* client.post(
`${url}/workflow/send/${instanceId}/external-ok`,
);
expect(sendRes.status).toBe(200);

const lastStatus = yield* waitForStatus(client, url, instanceId);
expect(lastStatus.status).toBe("complete");
expect(lastStatus.error).toBeFalsy();
expect(lastStatus.output?.greeting).toBe("external-ok");
expect(lastStatus.output?.instanceId).toBe(instanceId);
}).pipe(logLevel),
{ timeout: 180_000 },
);

// Canonical `list()` test (account collection): deploy the worker+workflow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export default class DrizzleWorkflowWorker extends Cloudflare.Worker<DrizzleWork

if (request.url.startsWith("/workflow/start/")) {
const id = Number(request.url.split("/workflow/start/")[1] ?? "1");
const instance = yield* workflow.create({ id, name: `widget-${id}` });
const instance = yield* workflow.create({
params: { id, name: `widget-${id}` },
});
return yield* HttpServerResponse.json({ instanceId: instance.id });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ export default class DrizzleWorkflow extends Cloudflare.Workflow<DrizzleWorkflow
const db = yield* Drizzle.postgres(conn.connectionString, { relations });

return Effect.fn(function* (input: { id: number; name: string }) {
const inserted = yield* Cloudflare.task(
"insert-widget",
Effect.gen(function* () {
const inserted = yield* Cloudflare.task({
name: "insert-widget",
effect: Effect.gen(function* () {
const [row] = yield* db
.insert(Widgets)
.values({ id: input.id, name: input.name })
Expand All @@ -40,19 +40,19 @@ export default class DrizzleWorkflow extends Cloudflare.Workflow<DrizzleWorkflow
.returning();
return row;
}).pipe(Effect.orDie),
);
});

yield* Cloudflare.sleep("settle", "1 second");

const rows = yield* Cloudflare.task(
"select-widget",
Effect.gen(function* () {
const rows = yield* Cloudflare.task({
name: "select-widget",
effect: Effect.gen(function* () {
return yield* db
.select()
.from(Widgets)
.where(eq(Widgets.id, input.id));
}).pipe(Effect.orDie),
);
});

// Step results are JSON-serialized by Cloudflare — return plain rows.
return { inserted, rowCount: rows.length, widget: rows[0] ?? null };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,61 @@ import * as Effect from "effect/Effect";
* Fixture workflow used by `Workflow.test.ts`.
*
* Exercises:
* - `Cloudflare.task` durable steps (greet + finalize)
* - `Cloudflare.task` durable steps with retry config and step context
* - `Cloudflare.sleep` between steps
* - `Cloudflare.waitForEvent` external event delivery
* - `Cloudflare.WorkerEnvironment` access from inside the body — regression
* guard for https://github.com/alchemy-run/alchemy-effect/pull/71
*/
export default class TestWorkflow extends Cloudflare.Workflow<TestWorkflow>()(
"TestWorkflow",
Effect.gen(function* () {
return Effect.fn(function* (input: { value: string }) {
console.log("greeted");
return Effect.fn(function* (input: { value: string; wait?: boolean }) {
const env = yield* Cloudflare.WorkerEnvironment;
const event = yield* Cloudflare.WorkflowEvent;

const greeted = yield* Cloudflare.task(
"greet",
Effect.succeed(`Hello, ${input.value}!`),
);
const greeted = yield* Cloudflare.task({
name: "greet",
retries: { limit: 3, delay: "1 second", backoff: "linear" },
timeout: "1 minute",
effect: Effect.gen(function* () {
const context = yield* Cloudflare.WorkflowStepContext;
return {
text: `Hello, ${input.value}!`,
attempt: context.attempt,
};
}),
});

if (input.wait) {
const external = yield* Cloudflare.waitForEvent<{ message: string }>(
"external-event",
{ type: "test-event", timeout: "5 minutes" },
);

return {
greeting: external.message,
envBindingCount: Object.keys(env).length,
workflowName: event.workflowName,
stepAttempt: greeted.attempt,
instanceId: event.instanceId,
};
}

yield* Cloudflare.sleep("cooldown", "1 second");

const finalized = yield* Cloudflare.task(
"finalize",
Effect.succeed({
greeting: greeted,
const finalized = yield* Cloudflare.task({
name: "finalize",
effect: Effect.succeed({
greeting: greeted.text,
envBindingCount: Object.keys(env).length,
workflowName: event.workflowName,
stepAttempt: greeted.attempt,
instanceId: event.instanceId,
}),
);
rollback: () => Effect.void,
rollbackConfig: { retries: { limit: 1, delay: "1 second" } },
});

return finalized;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@ export default class WorkflowTestWorker extends Cloudflare.Worker<WorkflowTestWo

if (request.url.startsWith("/workflow/start/")) {
const value = request.url.split("/workflow/start/")[1] ?? "world";
const instance = yield* workflow.create({ value });
const instance = yield* workflow.create({ params: { value } });
return yield* HttpServerResponse.json({ instanceId: instance.id });
}

if (request.url.startsWith("/workflow/wait/")) {
const value = request.url.split("/workflow/wait/")[1] ?? "world";
const instance = yield* workflow.create({
params: { value, wait: true },
});
return yield* HttpServerResponse.json({ instanceId: instance.id });
}

if (request.url.startsWith("/workflow/send/")) {
const [, rest = ""] = request.url.split("/workflow/send/");
const [instanceId = "", message = "received"] = rest.split("/");
const instance = yield* workflow.get(instanceId);
yield* instance.sendEvent({
type: "test-event",
payload: { message },
});
return yield* HttpServerResponse.json({ ok: true });
}

if (request.url.startsWith("/workflow/status/")) {
const instanceId = request.url.split("/workflow/status/")[1] ?? "";
const instance = yield* workflow.get(instanceId);
Expand Down
5 changes: 3 additions & 2 deletions scripts/generate-api-reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ function findPrimaryJSDoc(sourceFile: SourceFile): ParsedJSDoc {
}
}

let firstWithSummary: ParsedJSDoc | undefined;

for (const cls of sourceFile.getClasses()) {
if (!cls.isExported()) continue;
const jsdoc = parseJSDoc(cls);
Expand All @@ -245,10 +247,9 @@ function findPrimaryJSDoc(sourceFile: SourceFile): ParsedJSDoc {
jsdoc.sections.length > 0
)
return jsdoc;
if (jsdoc.summary) return jsdoc;
if (!firstWithSummary && jsdoc.summary) firstWithSummary = jsdoc;
}

let firstWithSummary: ParsedJSDoc | undefined;
for (const stmt of sourceFile.getStatements()) {
if (Node.isExportable(stmt) && stmt.isExported()) {
const jsdoc = parseJSDoc(stmt);
Expand Down