diff --git a/packages/alchemy/src/Cloudflare/Workers/WorkerRuntimeContext.ts b/packages/alchemy/src/Cloudflare/Workers/WorkerRuntimeContext.ts index 54e95f76a..52cfffa94 100644 --- a/packages/alchemy/src/Cloudflare/Workers/WorkerRuntimeContext.ts +++ b/packages/alchemy/src/Cloudflare/Workers/WorkerRuntimeContext.ts @@ -130,12 +130,25 @@ export const makeWorkerRuntimeContext = (id: string): WorkerRuntimeContext => { env, context, }; + const effects: Effect.Effect[] = []; for (const handler of handlers) { const eff = handler(event); if (Effect.isEffect(eff)) { - return [eff, services]; + effects.push(eff); } } + if (effects.length === 1) { + return [effects[0], services]; + } + if (effects.length > 1) { + return [ + Effect.all(effects, { + concurrency: "unbounded", + discard: true, + }), + services, + ]; + } return [ Effect.die( new Error(`No event handler found for event type '${type}'`), diff --git a/packages/alchemy/test/Cloudflare/Queue/RoundTrip.test.ts b/packages/alchemy/test/Cloudflare/Queue/RoundTrip.test.ts index 2be41dc3b..64eff3d8c 100644 --- a/packages/alchemy/test/Cloudflare/Queue/RoundTrip.test.ts +++ b/packages/alchemy/test/Cloudflare/Queue/RoundTrip.test.ts @@ -7,7 +7,11 @@ import { MinimumLogLevel } from "effect/References"; import * as Schedule from "effect/Schedule"; import * as HttpClient from "effect/unstable/http/HttpClient"; import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest"; -import QueueWorker, { Counter, RoundTripQueue } from "./round-trip-worker.ts"; +import QueueWorker, { + Counter, + RoundTripQueue, + SecondaryRoundTripQueue, +} from "./round-trip-worker.ts"; const { test } = Test.make({ providers: Cloudflare.providers() }); @@ -28,11 +32,14 @@ class CountMismatch extends Data.TaggedError("CountMismatch")<{ * Stack: * * - `Counter` Durable Object (per-key count + last-bodies tail). - * - `RoundTripQueue` (Cloudflare.Queue). + * - `RoundTripQueue` and `SecondaryRoundTripQueue` + * (Cloudflare.Queue). * - `QueueRoundTripWorker` — exposes: * - `POST /send?name=K` → enqueues a message via the * `Cloudflare.QueueBinding` producer. - * - subscribe handler → increments the named Counter DO + * - `POST /send-secondary?name=K` → enqueues to a second + * Queue bound to the same Worker. + * - subscribe handlers → increment the named Counter DO * and stores the body, via * `Cloudflare.messages(RoundTripQueue).subscribe(...)`. * - `GET /count?name=K` → reads the DO snapshot. @@ -46,9 +53,15 @@ class CountMismatch extends Data.TaggedError("CountMismatch")<{ * to the registered consumer, the subscribe handler runs, the DO * RPC stub from inside the queue handler works, and the test * client can read the resulting DO state. + * + * The second queue catches regressions where Worker dispatch stops + * after the first registered queue listener. The listener generated + * by `messages().subscribe(...)` performs its queue-name check inside + * the returned Effect, so dispatch must invoke every listener for + * the event type. */ test.provider( - "send → subscribe handler → DO state → polled by test client", + "send → subscribe handlers → DO state → polled by test client", (stack) => Effect.gen(function* () { yield* stack.destroy(); @@ -56,9 +69,9 @@ test.provider( const out = yield* stack.deploy( Effect.gen(function* () { // The Worker's init body yields Counter and - // RoundTripQueue internally — yielding QueueWorker is - // enough to bring the whole stack (Queue + - // QueueConsumer + Counter DO + Worker) into the plan. + // both Queue resources internally — yielding QueueWorker + // is enough to bring the whole stack (Queues + + // QueueConsumers + Counter DO + Worker) into the plan. const worker = yield* QueueWorker; return { url: worker.url }; }), @@ -71,14 +84,16 @@ test.provider( // accumulate state from prior runs (the DO survives across // deploys when the namespace logical id is stable). const name = `roundtrip-${Math.random().toString(36).slice(2, 8)}`; + const secondaryName = `roundtrip-secondary-${Math.random() + .toString(36) + .slice(2, 8)}`; const messages = ["alpha", "beta", "gamma", "delta"]; + const secondaryMessages = ["one", "two"]; - for (const text of messages) { - // Cloudflare's edge takes a few seconds to start serving a fresh - // workers.dev URL — retry until the worker returns 202. - const sendResponse = yield* HttpClient.execute( + const sendMessage = (pathname: string, counterName: string, text: string) => + HttpClient.execute( HttpClientRequest.post( - `${baseUrl}/send?name=${encodeURIComponent(name)}`, + `${baseUrl}${pathname}?name=${encodeURIComponent(counterName)}`, ).pipe(HttpClientRequest.bodyText(text)), ).pipe( Effect.flatMap((res) => @@ -92,6 +107,11 @@ test.provider( ), }), ); + + for (const text of messages) { + // Cloudflare's edge takes a few seconds to start serving a fresh + // workers.dev URL — retry until the worker returns 202. + const sendResponse = yield* sendMessage("/send", name, text); expect(sendResponse.status).toBe(202); const sent = (yield* sendResponse.json) as { sent: { name: string; text: string }; @@ -100,31 +120,52 @@ test.provider( expect(sent.sent.text).toBe(text); } - // Poll the DO snapshot until the consumer has caught up. The + for (const text of secondaryMessages) { + const sendResponse = yield* sendMessage( + "/send-secondary", + secondaryName, + text, + ); + expect(sendResponse.status).toBe(202); + const sent = (yield* sendResponse.json) as { + sent: { name: string; text: string }; + }; + expect(sent.sent.name).toBe(secondaryName); + expect(sent.sent.text).toBe(text); + } + + const readSnapshot = (counterName: string, expected: number) => + HttpClient.get( + `${baseUrl}/count?name=${encodeURIComponent(counterName)}`, + ).pipe( + Effect.flatMap((res) => res.json), + Effect.flatMap((body) => { + const snap = body as { count: number; lastBodies: string[] }; + return snap.count >= expected + ? Effect.succeed(snap) + : Effect.fail( + new CountMismatch({ + expected, + actual: snap.count, + }), + ); + }), + Effect.retry({ + while: (e): e is CountMismatch => e instanceof CountMismatch, + schedule: Schedule.exponential("500 millis").pipe( + Schedule.both(Schedule.recurs(40)), + ), + }), + ); + + // Poll the DO snapshot until each consumer has caught up. The // exponential schedule + recurs cap gives Cloudflare ~60s to // dispatch and ack — comfortably above the typical 1–5s // dispatch latency we saw in practice without flaking. - const snapshot = yield* HttpClient.get( - `${baseUrl}/count?name=${encodeURIComponent(name)}`, - ).pipe( - Effect.flatMap((res) => res.json), - Effect.flatMap((body) => { - const snap = body as { count: number; lastBodies: string[] }; - return snap.count >= messages.length - ? Effect.succeed(snap) - : Effect.fail( - new CountMismatch({ - expected: messages.length, - actual: snap.count, - }), - ); - }), - Effect.retry({ - while: (e): e is CountMismatch => e instanceof CountMismatch, - schedule: Schedule.exponential("500 millis").pipe( - Schedule.both(Schedule.recurs(40)), - ), - }), + const snapshot = yield* readSnapshot(name, messages.length); + const secondarySnapshot = yield* readSnapshot( + secondaryName, + secondaryMessages.length, ); // The DO observed every message. The order is best-effort @@ -132,6 +173,12 @@ test.provider( // compare as a multiset. expect(snapshot.count).toBeGreaterThanOrEqual(messages.length); expect([...snapshot.lastBodies].sort()).toEqual([...messages].sort()); + expect(secondarySnapshot.count).toBeGreaterThanOrEqual( + secondaryMessages.length, + ); + expect([...secondarySnapshot.lastBodies].sort()).toEqual( + [...secondaryMessages].sort(), + ); yield* stack.destroy(); }).pipe(logLevel), diff --git a/packages/alchemy/test/Cloudflare/Queue/round-trip-worker.ts b/packages/alchemy/test/Cloudflare/Queue/round-trip-worker.ts index e369ba9e8..266eb331d 100644 --- a/packages/alchemy/test/Cloudflare/Queue/round-trip-worker.ts +++ b/packages/alchemy/test/Cloudflare/Queue/round-trip-worker.ts @@ -37,7 +37,7 @@ export class Counter extends Cloudflare.DurableObjectNamespace()( ) {} /** - * The queue resource the worker produces to and consumes from. The + * The queue resources the worker produces to and consumes from. The * test deploys this stack, sends N messages via `POST /send?name=K`, * waits for the consumer to push them through to the Counter DO, * and polls `GET /count?name=K` until the count matches. @@ -47,6 +47,9 @@ export class Counter extends Cloudflare.DurableObjectNamespace()( * deploy time, so this fixture has no separate consumer wiring. */ export const RoundTripQueue = Cloudflare.Queue("RoundTripQueue"); +export const SecondaryRoundTripQueue = Cloudflare.Queue( + "SecondaryRoundTripQueue", +); interface QueueMessageBody { name: string; @@ -64,6 +67,10 @@ export default class QueueWorker extends Cloudflare.Worker()( const counters = yield* Counter; const queueResource = yield* RoundTripQueue; const queue = yield* Cloudflare.QueueBinding.bind(queueResource); + const secondaryQueueResource = yield* SecondaryRoundTripQueue; + const secondaryQueue = yield* Cloudflare.QueueBinding.bind( + secondaryQueueResource, + ); // Effect-style queue consumer. The handler delegates to the // Counter DO so the test can verify the message landed by @@ -87,6 +94,19 @@ export default class QueueWorker extends Cloudflare.Worker()( ), ); + // A second queue subscription on the same Worker verifies that + // dispatch reaches every listener for the `queue` event. Each + // listener still scopes itself by queue name internally. + yield* Cloudflare.messages(secondaryQueueResource, { + batchSize: 1, + maxRetries: 2, + retryDelay: "2 seconds", + }).subscribe((stream) => + Stream.runForEach(stream, (msg) => + counters.getByName(msg.body.name).record(msg.body.text), + ), + ); + return { fetch: Effect.gen(function* () { const request = yield* HttpServerRequest; @@ -102,6 +122,19 @@ export default class QueueWorker extends Cloudflare.Worker()( ); } + if ( + request.method === "POST" && + url.pathname === "/send-secondary" + ) { + const name = url.searchParams.get("name") ?? "default"; + const text = yield* request.text; + yield* secondaryQueue.send({ name, text }).pipe(Effect.orDie); + return yield* HttpServerResponse.json( + { sent: { name, text } }, + { status: 202 }, + ); + } + if (request.method === "GET" && url.pathname === "/count") { const name = url.searchParams.get("name") ?? "default"; const snapshot = yield* counters.getByName(name).snapshot(); diff --git a/packages/alchemy/test/Cloudflare/Workers/WorkerRuntimeContext.test.ts b/packages/alchemy/test/Cloudflare/Workers/WorkerRuntimeContext.test.ts new file mode 100644 index 000000000..bb66c949a --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Workers/WorkerRuntimeContext.test.ts @@ -0,0 +1,38 @@ +import { makeWorkerRuntimeContext } from "@/Cloudflare/Workers/WorkerRuntimeContext.ts"; +import * as Effect from "effect/Effect"; +import { describe, expect, it } from "vitest"; + +describe("WorkerRuntimeContext", () => { + it("dispatches an event to every listener for that event type", async () => { + const ctx = makeWorkerRuntimeContext("test-worker"); + const observed: string[] = []; + + await Effect.runPromise( + Effect.gen(function* () { + yield* ctx.listen((event) => { + if (event.type !== "queue") return; + return Effect.sync(() => { + observed.push("first"); + }); + }); + yield* ctx.listen((event) => { + if (event.type !== "queue") return; + return Effect.sync(() => { + observed.push("second"); + }); + }); + }), + ); + + const exports = await Effect.runPromise(ctx.exports); + const [program, services] = exports.default.queue( + { queue: "queue-a", messages: [] }, + {}, + {} as ExecutionContext, + ); + + await Effect.runPromise(program.pipe(Effect.provide(services))); + + expect(observed).toEqual(["first", "second"]); + }); +});