Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion packages/alchemy/src/Cloudflare/Workers/WorkerRuntimeContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,25 @@ export const makeWorkerRuntimeContext = (id: string): WorkerRuntimeContext => {
env,
context,
};
const effects: Effect.Effect<unknown>[] = [];
for (const handler of handlers) {
const eff = handler(event);
if (Effect.isEffect(eff)) {
return [eff, services];
effects.push(eff);
}
}
if (effects.length === 1) {
return [effects[0], services];
}
if (effects.length > 1) {
return [
Effect.all(effects, {
concurrency: "unbounded",
discard: true,
}),
services,
];
}
return [
Effect.die(
new Error(`No event handler found for event type '${type}'`),
Expand Down
115 changes: 81 additions & 34 deletions packages/alchemy/test/Cloudflare/Queue/RoundTrip.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import { MinimumLogLevel } from "effect/References";
import * as Schedule from "effect/Schedule";
import * as HttpClient from "effect/unstable/http/HttpClient";
import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest";
import QueueWorker, { Counter, RoundTripQueue } from "./round-trip-worker.ts";
import QueueWorker, {
Counter,
RoundTripQueue,
SecondaryRoundTripQueue,
} from "./round-trip-worker.ts";

const { test } = Test.make({ providers: Cloudflare.providers() });

Expand All @@ -28,11 +32,14 @@ class CountMismatch extends Data.TaggedError("CountMismatch")<{
* Stack:
*
* - `Counter` Durable Object (per-key count + last-bodies tail).
* - `RoundTripQueue` (Cloudflare.Queue).
* - `RoundTripQueue` and `SecondaryRoundTripQueue`
* (Cloudflare.Queue).
* - `QueueRoundTripWorker` — exposes:
* - `POST /send?name=K` → enqueues a message via the
* `Cloudflare.QueueBinding` producer.
* - subscribe handler → increments the named Counter DO
* - `POST /send-secondary?name=K` → enqueues to a second
* Queue bound to the same Worker.
* - subscribe handlers → increment the named Counter DO
* and stores the body, via
* `Cloudflare.messages(RoundTripQueue).subscribe(...)`.
* - `GET /count?name=K` → reads the DO snapshot.
Expand All @@ -46,19 +53,25 @@ class CountMismatch extends Data.TaggedError("CountMismatch")<{
* to the registered consumer, the subscribe handler runs, the DO
* RPC stub from inside the queue handler works, and the test
* client can read the resulting DO state.
*
* The second queue catches regressions where Worker dispatch stops
* after the first registered queue listener. The listener generated
* by `messages().subscribe(...)` performs its queue-name check inside
* the returned Effect, so dispatch must invoke every listener for
* the event type.
*/
test.provider(
"send → subscribe handler → DO state → polled by test client",
"send → subscribe handlers → DO state → polled by test client",
(stack) =>
Effect.gen(function* () {
yield* stack.destroy();

const out = yield* stack.deploy(
Effect.gen(function* () {
// The Worker's init body yields Counter and
// RoundTripQueue internally — yielding QueueWorker is
// enough to bring the whole stack (Queue +
// QueueConsumer + Counter DO + Worker) into the plan.
// both Queue resources internally — yielding QueueWorker
// is enough to bring the whole stack (Queues +
// QueueConsumers + Counter DO + Worker) into the plan.
const worker = yield* QueueWorker;
return { url: worker.url };
}),
Expand All @@ -71,14 +84,16 @@ test.provider(
// accumulate state from prior runs (the DO survives across
// deploys when the namespace logical id is stable).
const name = `roundtrip-${Math.random().toString(36).slice(2, 8)}`;
const secondaryName = `roundtrip-secondary-${Math.random()
.toString(36)
.slice(2, 8)}`;
const messages = ["alpha", "beta", "gamma", "delta"];
const secondaryMessages = ["one", "two"];

for (const text of messages) {
// Cloudflare's edge takes a few seconds to start serving a fresh
// workers.dev URL — retry until the worker returns 202.
const sendResponse = yield* HttpClient.execute(
const sendMessage = (pathname: string, counterName: string, text: string) =>
HttpClient.execute(
HttpClientRequest.post(
`${baseUrl}/send?name=${encodeURIComponent(name)}`,
`${baseUrl}${pathname}?name=${encodeURIComponent(counterName)}`,
).pipe(HttpClientRequest.bodyText(text)),
).pipe(
Effect.flatMap((res) =>
Expand All @@ -92,6 +107,11 @@ test.provider(
),
}),
);

for (const text of messages) {
// Cloudflare's edge takes a few seconds to start serving a fresh
// workers.dev URL — retry until the worker returns 202.
const sendResponse = yield* sendMessage("/send", name, text);
expect(sendResponse.status).toBe(202);
const sent = (yield* sendResponse.json) as {
sent: { name: string; text: string };
Expand All @@ -100,38 +120,65 @@ test.provider(
expect(sent.sent.text).toBe(text);
}

// Poll the DO snapshot until the consumer has caught up. The
for (const text of secondaryMessages) {
const sendResponse = yield* sendMessage(
"/send-secondary",
secondaryName,
text,
);
expect(sendResponse.status).toBe(202);
const sent = (yield* sendResponse.json) as {
sent: { name: string; text: string };
};
expect(sent.sent.name).toBe(secondaryName);
expect(sent.sent.text).toBe(text);
}

const readSnapshot = (counterName: string, expected: number) =>
HttpClient.get(
`${baseUrl}/count?name=${encodeURIComponent(counterName)}`,
).pipe(
Effect.flatMap((res) => res.json),
Effect.flatMap((body) => {
const snap = body as { count: number; lastBodies: string[] };
return snap.count >= expected
? Effect.succeed(snap)
: Effect.fail(
new CountMismatch({
expected,
actual: snap.count,
}),
);
}),
Effect.retry({
while: (e): e is CountMismatch => e instanceof CountMismatch,
schedule: Schedule.exponential("500 millis").pipe(
Schedule.both(Schedule.recurs(40)),
),
}),
);

// Poll the DO snapshot until each consumer has caught up. The
// exponential schedule + recurs cap gives Cloudflare ~60s to
// dispatch and ack — comfortably above the typical 1–5s
// dispatch latency we saw in practice without flaking.
const snapshot = yield* HttpClient.get(
`${baseUrl}/count?name=${encodeURIComponent(name)}`,
).pipe(
Effect.flatMap((res) => res.json),
Effect.flatMap((body) => {
const snap = body as { count: number; lastBodies: string[] };
return snap.count >= messages.length
? Effect.succeed(snap)
: Effect.fail(
new CountMismatch({
expected: messages.length,
actual: snap.count,
}),
);
}),
Effect.retry({
while: (e): e is CountMismatch => e instanceof CountMismatch,
schedule: Schedule.exponential("500 millis").pipe(
Schedule.both(Schedule.recurs(40)),
),
}),
const snapshot = yield* readSnapshot(name, messages.length);
const secondarySnapshot = yield* readSnapshot(
secondaryName,
secondaryMessages.length,
);

// The DO observed every message. The order is best-effort
// (Cloudflare may dispatch batches in parallel) so we
// compare as a multiset.
expect(snapshot.count).toBeGreaterThanOrEqual(messages.length);
expect([...snapshot.lastBodies].sort()).toEqual([...messages].sort());
expect(secondarySnapshot.count).toBeGreaterThanOrEqual(
secondaryMessages.length,
);
expect([...secondarySnapshot.lastBodies].sort()).toEqual(
[...secondaryMessages].sort(),
);

yield* stack.destroy();
}).pipe(logLevel),
Expand Down
35 changes: 34 additions & 1 deletion packages/alchemy/test/Cloudflare/Queue/round-trip-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class Counter extends Cloudflare.DurableObjectNamespace<Counter>()(
) {}

/**
* The queue resource the worker produces to and consumes from. The
* The queue resources the worker produces to and consumes from. The
* test deploys this stack, sends N messages via `POST /send?name=K`,
* waits for the consumer to push them through to the Counter DO,
* and polls `GET /count?name=K` until the count matches.
Expand All @@ -47,6 +47,9 @@ export class Counter extends Cloudflare.DurableObjectNamespace<Counter>()(
* deploy time, so this fixture has no separate consumer wiring.
*/
export const RoundTripQueue = Cloudflare.Queue("RoundTripQueue");
export const SecondaryRoundTripQueue = Cloudflare.Queue(
"SecondaryRoundTripQueue",
);

interface QueueMessageBody {
name: string;
Expand All @@ -64,6 +67,10 @@ export default class QueueWorker extends Cloudflare.Worker<QueueWorker>()(
const counters = yield* Counter;
const queueResource = yield* RoundTripQueue;
const queue = yield* Cloudflare.QueueBinding.bind(queueResource);
const secondaryQueueResource = yield* SecondaryRoundTripQueue;
const secondaryQueue = yield* Cloudflare.QueueBinding.bind(
secondaryQueueResource,
);

// Effect-style queue consumer. The handler delegates to the
// Counter DO so the test can verify the message landed by
Expand All @@ -87,6 +94,19 @@ export default class QueueWorker extends Cloudflare.Worker<QueueWorker>()(
),
);

// A second queue subscription on the same Worker verifies that
// dispatch reaches every listener for the `queue` event. Each
// listener still scopes itself by queue name internally.
yield* Cloudflare.messages<QueueMessageBody>(secondaryQueueResource, {
batchSize: 1,
maxRetries: 2,
retryDelay: "2 seconds",
}).subscribe((stream) =>
Stream.runForEach(stream, (msg) =>
counters.getByName(msg.body.name).record(msg.body.text),
),
);

return {
fetch: Effect.gen(function* () {
const request = yield* HttpServerRequest;
Expand All @@ -102,6 +122,19 @@ export default class QueueWorker extends Cloudflare.Worker<QueueWorker>()(
);
}

if (
request.method === "POST" &&
url.pathname === "/send-secondary"
) {
const name = url.searchParams.get("name") ?? "default";
const text = yield* request.text;
yield* secondaryQueue.send({ name, text }).pipe(Effect.orDie);
return yield* HttpServerResponse.json(
{ sent: { name, text } },
{ status: 202 },
);
}

if (request.method === "GET" && url.pathname === "/count") {
const name = url.searchParams.get("name") ?? "default";
const snapshot = yield* counters.getByName(name).snapshot();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { makeWorkerRuntimeContext } from "@/Cloudflare/Workers/WorkerRuntimeContext.ts";
import * as Effect from "effect/Effect";
import { describe, expect, it } from "vitest";

describe("WorkerRuntimeContext", () => {
it("dispatches an event to every listener for that event type", async () => {
const ctx = makeWorkerRuntimeContext("test-worker");
const observed: string[] = [];

await Effect.runPromise(
Effect.gen(function* () {
yield* ctx.listen((event) => {
if (event.type !== "queue") return;
return Effect.sync(() => {
observed.push("first");
});
});
yield* ctx.listen((event) => {
if (event.type !== "queue") return;
return Effect.sync(() => {
observed.push("second");
});
});
}),
);

const exports = await Effect.runPromise(ctx.exports);
const [program, services] = exports.default.queue(
{ queue: "queue-a", messages: [] },
{},
{} as ExecutionContext,
);

await Effect.runPromise(program.pipe(Effect.provide(services)));

expect(observed).toEqual(["first", "second"]);
});
});