Skip to content

feat(cloudflare/pipelines): Stream, Sink, Pipeline resources#532

Open
sam-goodwin wants to merge 2 commits into
mainfrom
feat/cloudflare-pipelines
Open

feat(cloudflare/pipelines): Stream, Sink, Pipeline resources#532
sam-goodwin wants to merge 2 commits into
mainfrom
feat/cloudflare-pipelines

Conversation

@sam-goodwin

Copy link
Copy Markdown
Contributor

Effect-native support for Cloudflare Pipelines (open-beta v1 model): Stream for write-only event ingestion, Sink for R2 / R2 Data Catalog destinations, and Pipeline for the SQL job that wires them together. Schema authoring lives in alchemy/Cloudflare/SQL (re-exports effect/Schema + Cloudflare precision/temporal brand helpers).

Define a stream, sink, and pipeline

schema accepts a plain record of Schema values — wrapped in Schema.Struct(...) for you:

import * as Cloudflare from "alchemy/Cloudflare";
import * as SQL from "alchemy/Cloudflare/SQL";

const lake = yield* Cloudflare.R2Bucket("Lake");

const events = yield* Cloudflare.Stream("Events", {
  schema: {
    user_id: SQL.String,
    amount: SQL.Float64,
    at: SQL.Timestamp,
  },
});

// Auto-mints a scoped AccountApiToken from the R2Bucket resource —
// pass `credentials: { ... }` (Redacted) alongside a bucket name to skip.
const sink = yield* Cloudflare.Sink("Lakehouse", {
  type: "r2_data_catalog",
  bucket: lake,
  namespace: "analytics",
  tableName: "events",
});

yield* Cloudflare.Pipeline("Ingest", {
  // `pipelineSql` interpolates Stream/Sink by name and registers the
  // dependency edges, so the engine provisions stream + sink first.
  sql: Cloudflare.pipelineSql`
    INSERT INTO ${sink}
    SELECT user_id, amount FROM ${events} WHERE amount > 100`,
});

Produce from a Worker

import * as Cloudflare from "alchemy/Cloudflare";

export default class Ingestor extends Cloudflare.Worker<Ingestor>()(
  "Ingestor",
  { main: import.meta.filename },
  Effect.gen(function* () {
    const events = yield* Cloudflare.Stream.bind(Events);
    return {
      fetch: Effect.gen(function* () {
        const body = (yield* HttpServerRequest.json) as Record<string, unknown>[];
        yield* events.sendBatch(body);
        return HttpServerResponse.empty({ status: 202 });
      }),
    };
  }).pipe(Effect.provide(Cloudflare.StreamBindingLive)),
) {}

What gets created on Sink

+ Cloudflare.R2Bucket("Lake")
+ Cloudflare.AccountApiToken("Lakehouse/Token")   ← auto-provisioned, scoped to R2 perms
+ Cloudflare.PipelinesSink("Lakehouse/Sink")

Credentials are derived from the token's outputs (accessKeyId = tokenId, secretAccessKey = sha256(token.value) for r2; token for r2_data_catalog) and carried as Redacted<string> end-to-end.

Types Cloudflare distinguishes that Schema.Number / Schema.Date can't

SQL.Int32 / SQL.Int64 / SQL.Float32 / SQL.Float64
SQL.Timestamp                  // millisecond
SQL.TimestampSeconds / SQL.TimestampMicroseconds / SQL.TimestampNanoseconds

Bare Schema.Number compiles to float64; Schema.Date to timestamp:millisecond. Schema.Array(x) becomes list<x>, nested Schema.Struct becomes struct, Schema.Unknown becomes json. Optional fields (SQL.optional(...)) emit required: false. schema also accepts a Schema.Struct(...) value or a raw { fields: [...] } payload as escape hatches.

Add Effect-native support for Cloudflare Pipelines (open-beta v1 model)
plus an `alchemy/Cloudflare/SQL` namespace for schema authoring.

Co-authored-by: Cursor <cursoragent@cursor.com>
@alchemy-version-bot

alchemy-version-bot Bot commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Install the packages built from this commit:

alchemy

bun add alchemy@https://pkg.ing/alchemy/c024b17

@alchemy.run/better-auth

bun add @alchemy.run/better-auth@https://pkg.ing/@alchemy.run/better-auth/c024b17

@alchemy.run/pr-package

bun add @alchemy.run/pr-package@https://pkg.ing/@alchemy.run/pr-package/c024b17

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant