diff --git a/packages/alchemy/package.json b/packages/alchemy/package.json index 70697ea0c..99a927374 100644 --- a/packages/alchemy/package.json +++ b/packages/alchemy/package.json @@ -142,6 +142,12 @@ "worker": "./src/Cloudflare/Live.ts", "import": "./lib/Cloudflare/Live.js" }, + "./Cloudflare/SQL": { + "types": "./lib/Cloudflare/SQL.d.ts", + "bun": "./src/Cloudflare/SQL.ts", + "worker": "./src/Cloudflare/SQL.ts", + "import": "./lib/Cloudflare/SQL.js" + }, "./Cloudflare/*": { "types": "./lib/Cloudflare/*/index.d.ts", "bun": "./src/Cloudflare/*/index.ts", diff --git a/packages/alchemy/src/Cloudflare/Pipelines/Pipeline.ts b/packages/alchemy/src/Cloudflare/Pipelines/Pipeline.ts new file mode 100644 index 000000000..a5e946e37 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/Pipeline.ts @@ -0,0 +1,355 @@ +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as StreamE from "effect/Stream"; +import { isResolved } from "../../Diff.ts"; +import type { Input } from "../../Input.ts"; +import * as Output from "../../Output.ts"; +import { createPhysicalName } from "../../PhysicalName.ts"; +import * as Provider from "../../Provider.ts"; +import { Resource } from "../../Resource.ts"; +import { CloudflareEnvironment } from "../CloudflareEnvironment.ts"; +import type { Providers } from "../Providers.ts"; +import { isSink, type Sink } from "./Sink.ts"; +import { isStream, type Stream } from "./Stream.ts"; + +export const isPipeline = (value: unknown): value is Pipeline => + typeof value === "object" && + (value as any)?.Type === "Cloudflare.PipelinesPipeline"; + +export type PipelineTableInfo = { + id: string; + name: string; + type: "stream" | "sink"; + version: number; + latest: number; +}; + +export type PipelineProps = { + /** + * Pipeline name. If omitted, a unique name is generated. Must match + * Cloudflare's pipeline-name rules (lowercase letters, digits, underscores). + * @default ${app}_${stage}_${id} + */ + name?: string; + /** + * SQL that defines the pipeline. Most callers should build this with the + * {@link pipelineSql} tagged template, which interpolates `Stream` and + * `Sink` resources by name so the engine wires up dependencies for you. + */ + sql: Input; + /** + * Validate the SQL against the configured stream and sink before sending + * the create request. Saves a noisy `InvalidSql`/`TableNotFound` failure + * after deployment has already started. + * @default true + */ + validate?: boolean; +}; + +export type Pipeline = Resource< + "Cloudflare.PipelinesPipeline", + PipelineProps, + { + pipelineId: string; + pipelineName: string; + sql: string; + status: string; + tables: PipelineTableInfo[]; + createdAt: string; + accountId: string; + }, + never, + Providers +>; + +/** + * A Cloudflare Pipelines Pipeline — the SQL job that reads events from a + * {@link Stream} and writes them to a {@link Sink}. + * + * The SQL is immutable in Cloudflare's API: any change to `sql` triggers a + * replace. Use {@link pipelineSql} to build the statement so stream and sink + * names are interpolated lazily, and the engine sees the resource dependencies. + * + * @section Creating a Pipeline + * @example Insert all events from a stream into a sink + * ```typescript + * const events = yield* Cloudflare.Stream("Events"); + * const sink = yield* Cloudflare.Sink("Lake", { type: "r2", bucket }); + * + * yield* Cloudflare.Pipeline("Ingest", { + * sql: Cloudflare.pipelineSql` + * INSERT INTO ${sink} + * SELECT * FROM ${events}`, + * }); + * ``` + * + * @example Transform on the way through + * ```typescript + * yield* Cloudflare.Pipeline("HighValue", { + * sql: Cloudflare.pipelineSql` + * INSERT INTO ${sink} + * SELECT user_id, amount FROM ${events} + * WHERE amount > 100`, + * }); + * ``` + */ +export const Pipeline = Resource("Cloudflare.PipelinesPipeline"); + +/** + * Tagged-template helper for building pipeline SQL. + * + * Interpolates {@link Stream} / {@link Sink} resources (or `Effect`s that + * yield them) by name. Effect args are resolved eagerly so call sites can + * inline a constructor in the template: + * + * ```typescript + * Cloudflare.Pipeline("Ingest", { + * sql: Cloudflare.pipelineSql` + * INSERT INTO ${Cloudflare.Sink("Lakehouse", { type: "r2", bucket })} + * SELECT * FROM ${Events}`, + * }); + * ``` + * + * The interpolated names become real dependency edges, so the engine + * provisions streams and sinks before the pipeline. + * + * @example Pre-yielded resources + * ```typescript + * const stream = yield* Cloudflare.Stream("Events"); + * const sink = yield* Cloudflare.Sink("Lake", { type: "r2", bucket }); + * + * const sql = Cloudflare.pipelineSql` + * INSERT INTO ${sink} + * SELECT * FROM ${stream} WHERE amount > 100`; + * ``` + */ +export const pipelineSql = ( + template: TemplateStringsArray, + ...args: ReadonlyArray< + Stream | Sink | Effect.Effect | Input + > +): Input => { + const resolved = args.map((arg) => { + if (isStream(arg)) return arg.streamName; + if (isSink(arg)) return arg.sinkName; + // Effects (e.g. an inline `Cloudflare.Sink(...)` call) are lifted into + // `Output` so the engine resolves them lazily; we map the result to + // `.streamName` / `.sinkName` once the resource materializes. Outputs + // are passed straight through — `Output.interpolate` folds them itself. + if (Output.isOutput(arg)) return arg; + if (Effect.isEffect(arg)) { + return Output.asOutput(arg as Effect.Effect).pipe( + Output.map((v: unknown) => + isStream(v) ? v.streamName : isSink(v) ? v.sinkName : v, + ), + ); + } + return arg; + }); + return Output.interpolate(template, ...(resolved as any[])); +}; + +const createPipelineName = (id: string, name: string | undefined) => + Effect.gen(function* () { + if (name) return name; + return (yield* createPhysicalName({ id, maxLength: 63 })) + .toLowerCase() + .replace(/-/g, "_"); + }); + +const findPipelineByName = (accountId: string, name: string) => + pipelines.listV1Pipeline.items({ accountId }).pipe( + StreamE.filter((p) => p.name === name), + StreamE.runHead, + Effect.map(Option.getOrUndefined), + ); + +const toTables = ( + tables: pipelines.GetV1PipelineResponse["tables"], +): PipelineTableInfo[] => + tables.map((t) => ({ + id: t.id, + name: t.name, + type: (t.type === "stream" ? "stream" : "sink") as "stream" | "sink", + version: t.version, + latest: t.latest, + })); + +export const PipelineProvider = () => + Provider.effect( + Pipeline, + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + const createV1Pipeline = yield* pipelines.createV1Pipeline; + const getV1Pipeline = yield* pipelines.getV1Pipeline; + const deleteV1Pipeline = yield* pipelines.deleteV1Pipeline; + const validateSqlPipeline = yield* pipelines.validateSqlPipeline; + + return { + stables: ["pipelineId", "pipelineName", "accountId"], + diff: Effect.fn(function* ({ id, olds, news, output }) { + if (!isResolved(news)) return undefined; + if ((output?.accountId ?? accountId) !== accountId) { + return { action: "replace" } as const; + } + const newName = yield* createPipelineName(id, news.name); + const oldName = + output?.pipelineName ?? (yield* createPipelineName(id, olds.name)); + if (newName !== oldName) { + return { action: "replace" } as const; + } + // SQL is the body of the pipeline; Cloudflare cannot update it in + // place. Replace on any change. + if (olds.sql !== news.sql) { + return { action: "replace" } as const; + } + }), + reconcile: Effect.fn(function* ({ id, news, output }) { + const acct = output?.accountId ?? accountId; + const name = yield* createPipelineName(id, news.name); + const sql = news.sql as string; + + // Observe — cached id, then name scan as a fallback. + let observed: pipelines.GetV1PipelineResponse | undefined; + if (output?.pipelineId) { + observed = yield* getV1Pipeline({ + accountId: acct, + pipelineId: output.pipelineId, + }).pipe( + Effect.catchTag("PipelineNotExists", () => + Effect.succeed(undefined), + ), + ); + } + if (!observed) { + const match = yield* findPipelineByName(acct, name); + if (match) { + observed = yield* getV1Pipeline({ + accountId: acct, + pipelineId: match.id, + }).pipe( + Effect.catchTag("PipelineNotExists", () => + Effect.succeed(undefined), + ), + ); + } + } + + if (observed) { + return { + pipelineId: observed.id, + pipelineName: observed.name, + sql: observed.sql, + status: observed.status, + tables: toTables(observed.tables), + createdAt: observed.createdAt, + accountId: acct, + }; + } + + // Validate before create when enabled — surfacing `InvalidSql` / + // `TableNotFound` here is cheaper than after a partial deploy. + if (news.validate !== false) { + yield* validateSqlPipeline({ accountId: acct, sql }); + } + + // Ensure — create. Tolerate the AlreadyExists race by re-resolving + // by name. + const created = yield* createV1Pipeline({ + accountId: acct, + name, + sql, + }).pipe( + Effect.catchTag("PipelineAlreadyExists", () => + Effect.gen(function* () { + const match = yield* findPipelineByName(acct, name); + if (!match) { + return yield* Effect.die( + `Cloudflare reported pipeline "${name}" already exists ` + + `but listV1Pipeline returned none. Retry the deploy.`, + ); + } + return yield* getV1Pipeline({ + accountId: acct, + pipelineId: match.id, + }); + }), + ), + ); + + const full = yield* getV1Pipeline({ + accountId: acct, + pipelineId: created.id, + }).pipe( + Effect.catchTag("PipelineNotExists", () => + Effect.succeed({ + ...created, + tables: [] as pipelines.GetV1PipelineResponse["tables"], + }), + ), + ); + + return { + pipelineId: full.id, + pipelineName: full.name, + sql: full.sql, + status: full.status, + tables: toTables(full.tables), + createdAt: full.createdAt, + accountId: acct, + }; + }), + delete: Effect.fn(function* ({ output }) { + yield* deleteV1Pipeline({ + accountId: output.accountId, + pipelineId: output.pipelineId, + }).pipe(Effect.catchTag("PipelineNotExists", () => Effect.void)); + }), + read: Effect.fn(function* ({ id, olds, output }) { + const acct = output?.accountId ?? accountId; + if (output?.pipelineId) { + return yield* getV1Pipeline({ + accountId: acct, + pipelineId: output.pipelineId, + }).pipe( + Effect.map((p) => ({ + pipelineId: p.id, + pipelineName: p.name, + sql: p.sql, + status: p.status, + tables: toTables(p.tables), + createdAt: p.createdAt, + accountId: acct, + })), + Effect.catchTag("PipelineNotExists", () => + Effect.succeed(undefined), + ), + ); + } + const name = yield* createPipelineName(id, olds?.name); + const match = yield* findPipelineByName(acct, name); + if (!match) return undefined; + const full = yield* getV1Pipeline({ + accountId: acct, + pipelineId: match.id, + }).pipe( + Effect.catchTag("PipelineNotExists", () => + Effect.succeed(undefined), + ), + ); + return full + ? { + pipelineId: full.id, + pipelineName: full.name, + sql: full.sql, + status: full.status, + tables: toTables(full.tables), + createdAt: full.createdAt, + accountId: acct, + } + : undefined; + }), + }; + }), + ); diff --git a/packages/alchemy/src/Cloudflare/Pipelines/Sink.ts b/packages/alchemy/src/Cloudflare/Pipelines/Sink.ts new file mode 100644 index 000000000..99e589511 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/Sink.ts @@ -0,0 +1,553 @@ +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Redacted from "effect/Redacted"; +import * as StreamE from "effect/Stream"; +import * as crypto from "node:crypto"; +import { deepEqual, isResolved } from "../../Diff.ts"; +import * as Output from "../../Output.ts"; +import { createPhysicalName } from "../../PhysicalName.ts"; +import type { InputProps } from "../../Input.ts"; +import * as Namespace from "../../Namespace.ts"; +import * as Provider from "../../Provider.ts"; +import { Resource } from "../../Resource.ts"; +import { + AccountApiToken, + type AccountApiToken as AccountApiTokenT, +} from "../ApiToken/AccountApiToken.ts"; +import type { ApiTokenPolicy } from "../ApiToken/Common.ts"; +import { CloudflareEnvironment } from "../CloudflareEnvironment.ts"; +import type { Providers } from "../Providers.ts"; +import { isR2Bucket, type R2Bucket } from "../R2/R2Bucket.ts"; + +// The `Input` wrapping (`Output | Effect | T`) is applied automatically by +// the `Resource(...)(props)` constructor — props here are declared with +// plain values, and the engine resolves them before they reach `reconcile`. + +export const isSink = (value: unknown): value is Sink => + typeof value === "object" && + (value as any)?.Type === "Cloudflare.PipelinesSink"; + +/** A Cloudflare R2 bucket reference accepted by {@link SinkProps}. */ +export type SinkBucketRef = R2Bucket | string; + +/** Output format for files written by an R2 sink. */ +export type SinkR2Format = + | { + type: "json"; + decimalEncoding?: "number" | "string" | "bytes"; + timestampFormat?: "rfc3339" | "unix_millis"; + unstructured?: boolean; + } + | { + type: "parquet"; + compression?: "uncompressed" | "snappy" | "gzip" | "zstd" | "lz4"; + rowGroupBytes?: number; + }; + +/** Output format for files written by an R2 Data Catalog sink (parquet only). */ +export type SinkDataCatalogFormat = { + type: "parquet"; + compression?: "uncompressed" | "snappy" | "gzip" | "zstd" | "lz4"; + rowGroupBytes?: number; +}; + +/** R2 sink file-naming policy. */ +export type SinkFileNaming = { + prefix?: string; + strategy?: "serial" | "uuid" | "uuid_v7" | "ulid"; + suffix?: string; +}; + +/** R2 sink time-bucket partitioning. */ +export type SinkPartitioning = { + /** strftime-style time pattern, e.g. `year=%Y/month=%m/day=%d`. */ + timePattern?: string; +}; + +/** Rolling policy controlling when a new file is written. */ +export type SinkRollingPolicy = { + fileSizeBytes?: number; + inactivitySeconds?: number; + intervalSeconds?: number; +}; + +/** Credentials accepted by an R2 sink — auto-provisioned when omitted. */ +export type SinkR2Credentials = { + /** + * S3-style access key id. The access-key portion is technically not + * sensitive on its own, but we still wrap it in `Redacted` to keep both + * halves of the credential out of plaintext logs / state. + */ + accessKeyId: Redacted.Redacted; + /** S3-style secret access key. */ + secretAccessKey: Redacted.Redacted; +}; + +/** Credentials accepted by an R2 Data Catalog sink — auto-provisioned when omitted. */ +export type SinkDataCatalogCredentials = { + /** Bearer token for the catalog API. */ + token: Redacted.Redacted; +}; + +export type R2SinkProps = { + type: "r2"; + /** Target R2 bucket — pass an {@link R2Bucket} resource or a bucket name. */ + bucket: SinkBucketRef; + /** + * Sink name. If omitted, a unique name is generated. + * @default ${app}_${stage}_${id} + */ + name?: string; + /** R2 jurisdiction (defaults to `default`). */ + jurisdiction?: "default" | "eu" | "fedramp"; + /** Path prefix written under the bucket. */ + path?: string; + /** File naming policy. */ + fileNaming?: SinkFileNaming; + /** Time-bucket partitioning. */ + partitioning?: SinkPartitioning; + /** Rolling policy controlling when a new file is written. */ + rollingPolicy?: SinkRollingPolicy; + /** Output format. */ + format?: SinkR2Format; + /** + * Explicit S3-style credentials. Required when `bucket` is a bucket name. + * When `bucket` is an {@link R2Bucket} resource and these are omitted, an + * {@link AccountApiToken} is auto-provisioned and the derived S3 creds + * (access key = token id, secret = sha256(token value)) are used. + */ + credentials?: SinkR2Credentials; +}; + +export type R2DataCatalogSinkProps = { + type: "r2_data_catalog"; + /** Target R2 bucket — pass an {@link R2Bucket} resource or a bucket name. */ + bucket: SinkBucketRef; + /** Iceberg table name. */ + tableName: string; + /** Iceberg namespace. */ + namespace?: string; + /** + * Sink name. If omitted, a unique name is generated. + * @default ${app}_${stage}_${id} + */ + name?: string; + /** Rolling policy controlling when a new file is written. */ + rollingPolicy?: SinkRollingPolicy; + /** Output format. R2 Data Catalog only supports `parquet`. */ + format?: SinkDataCatalogFormat; + /** + * Explicit catalog API token. Required when `bucket` is a bucket name. + * When `bucket` is an {@link R2Bucket} resource and this is omitted, an + * {@link AccountApiToken} with `Workers R2 Data Catalog Write` is + * auto-provisioned. + */ + credentials?: SinkDataCatalogCredentials; +}; + +/** + * Discriminated props for a Cloudflare Pipelines {@link Sink}. Sinks are + * **immutable** in Cloudflare's API — any prop change triggers a replace. + */ +export type SinkProps = R2SinkProps | R2DataCatalogSinkProps; + +export type Sink = Resource< + "Cloudflare.PipelinesSink", + SinkProps, + { + sinkId: string; + sinkName: string; + /** `"r2"` for a raw-files sink, `"r2_data_catalog"` for an Iceberg sink. */ + sinkType: "r2" | "r2_data_catalog"; + createdAt: string; + accountId: string; + }, + never, + Providers +>; + +const SinkResource = Resource("Cloudflare.PipelinesSink"); + +/** + * A Cloudflare Pipelines Sink — the destination a {@link Pipeline} writes to. + * Supports two flavors: raw files (`r2`) written to an R2 bucket, and Iceberg + * tables (`r2_data_catalog`) in R2 Data Catalog. + * + * Sinks are immutable: any prop change triggers a replace. + * + * When `bucket` is an {@link R2Bucket} resource and no `credentials` are + * supplied, a scoped {@link AccountApiToken} is auto-provisioned and the + * sink's credentials are derived from it (S3 creds for `r2`, bearer token + * for `r2_data_catalog`). + * + * @section Creating a Sink + * @example R2 sink (auto-provisioned credentials) + * ```typescript + * const bucket = yield* Cloudflare.R2Bucket("Lake"); + * const sink = yield* Cloudflare.Sink("Files", { + * type: "r2", + * bucket, + * format: { type: "parquet", compression: "zstd" }, + * }); + * ``` + * + * @example R2 Data Catalog sink (Apache Iceberg) + * ```typescript + * const sink = yield* Cloudflare.Sink("Lakehouse", { + * type: "r2_data_catalog", + * bucket, + * namespace: "analytics", + * tableName: "events", + * }); + * ``` + * + * @example Explicit credentials + * Pass `credentials` alongside a bucket-name string to skip auto-provisioning. + * Wrap both halves in `Redacted` so the secret never leaks to logs. + * ```typescript + * const sink = yield* Cloudflare.Sink("Files", { + * type: "r2", + * bucket: "my-bucket", + * credentials: { + * accessKeyId: Redacted.make(env.ACCESS_KEY), + * secretAccessKey: Redacted.make(env.SECRET), + * }, + * }); + * ``` + */ +export const Sink: { + ( + id: string, + props: + | InputProps + | Effect.Effect, never, Req>, + ): Effect.Effect; + /** @internal — exposed for `Provider.collection` registration only. */ + Resource: typeof SinkResource; +} = Object.assign( + ((id: string, propsEff: any) => + Effect.gen(function* () { + const props = Effect.isEffect(propsEff) + ? ((yield* propsEff) as InputProps) + : (propsEff as InputProps); + + if (props.credentials) { + return yield* SinkResource("Sink", props as any); + } + if (typeof props.bucket === "string" || !isR2Bucket(props.bucket)) { + return yield* Effect.die( + `Cloudflare.Sink("${id}"): explicit credentials are required when ` + + `bucket is a plain string. Pass an R2Bucket resource to ` + + `auto-provision an AccountApiToken instead.`, + ); + } + const { accountId } = yield* CloudflareEnvironment; + if (props.type === "r2") { + const token = yield* AccountApiToken("Token", { + policies: r2StoragePolicies(accountId), + }); + return yield* SinkResource("Sink", { + ...props, + credentials: deriveR2Credentials(token), + } as any); + } + const token = yield* AccountApiToken("Token", { + policies: dataCatalogPolicies(accountId), + }); + return yield* SinkResource("Sink", { + ...props, + credentials: { token: token.value }, + } as any); + }).pipe(Namespace.push(id))) as any, + { Resource: SinkResource }, +); + +const createSinkName = (id: string, name: string | undefined) => + Effect.gen(function* () { + if (name) return name; + return (yield* createPhysicalName({ id, maxLength: 63 })) + .toLowerCase() + .replace(/-/g, "_"); + }); + +const findSinkByName = (accountId: string, name: string) => + pipelines.listSinks.items({ accountId }).pipe( + StreamE.filter((s) => s.name === name), + StreamE.runHead, + Effect.map(Option.getOrUndefined), + ); + +const sha256Hex = (input: string): string => + crypto.createHash("sha256").update(input).digest("hex"); + +const r2StoragePolicies = (accountId: string): ApiTokenPolicy[] => [ + { + effect: "allow", + permissionGroups: ["Workers R2 Storage Write", "Workers R2 Storage Read"], + resources: { [`com.cloudflare.api.account.${accountId}`]: "*" }, + }, +]; + +const dataCatalogPolicies = (accountId: string): ApiTokenPolicy[] => [ + { + effect: "allow", + permissionGroups: [ + "Workers R2 Data Catalog Write", + "Workers R2 Data Catalog Read", + "Workers R2 Storage Write", + "Workers R2 Storage Read", + ], + resources: { [`com.cloudflare.api.account.${accountId}`]: "*" }, + }, +]; + +/** + * Derive S3-style credentials from an {@link AccountApiToken}'s outputs: + * `accessKeyId = token.tokenId`, `secretAccessKey = sha256(token.value)`. + * Both halves are wrapped in {@link Redacted} so the values never leak to + * logs or alchemy state outside the runtime context. + * + * The returned values are `Output>` — valid `Input<>` + * values that the Resource constructor unwraps before they reach the + * provider's `reconcile` hook. + */ +const deriveR2Credentials = ( + token: AccountApiTokenT, +): InputProps => ({ + accessKeyId: token.tokenId.pipe(Output.map((id) => Redacted.make(id))), + secretAccessKey: token.value.pipe( + Output.map((value) => Redacted.make(sha256Hex(Redacted.value(value)))), + ), +}); + +const resolveBucketName = (bucket: SinkBucketRef): string => { + if (typeof bucket === "string") return bucket; + return (bucket as any).bucketName as string; +}; + +const resolveJurisdiction = ( + bucket: SinkBucketRef, + override: string | undefined, +): string | undefined => { + if (override) return override; + if (typeof bucket === "string") return undefined; + const j = (bucket as any).jurisdiction as string | undefined; + return j === "default" ? undefined : j; +}; + +const buildCreateRequest = ( + accountId: string, + name: string, + props: SinkProps, +): pipelines.CreateSinkRequest => { + if (props.type === "r2") { + if (!props.credentials) { + throw new Error( + `Cloudflare.Sink("${name}"): unresolved credentials at create time`, + ); + } + return { + accountId, + name, + type: "r2", + config: { + accountId, + bucket: resolveBucketName(props.bucket), + credentials: { + accessKeyId: Redacted.value(props.credentials.accessKeyId), + secretAccessKey: Redacted.value(props.credentials.secretAccessKey), + }, + jurisdiction: resolveJurisdiction(props.bucket, props.jurisdiction), + path: props.path, + fileNaming: props.fileNaming, + partitioning: props.partitioning, + rollingPolicy: props.rollingPolicy, + }, + format: props.format as pipelines.CreateSinkRequest["format"], + }; + } + // r2_data_catalog + if (!props.credentials) { + throw new Error( + `Cloudflare.Sink("${name}"): unresolved catalog token at create time`, + ); + } + return { + accountId, + name, + type: "r2_data_catalog", + config: { + accountId, + bucket: resolveBucketName(props.bucket), + tableName: props.tableName, + namespace: props.namespace, + token: Redacted.value(props.credentials.token), + rollingPolicy: props.rollingPolicy, + }, + format: props.format as pipelines.CreateSinkRequest["format"], + }; +}; + +const sinkPropsFingerprint = (props: SinkProps | undefined): string => + JSON.stringify({ + type: props?.type, + // Bucket may be a resource ref or string; serialize by bucketName. + bucket: + typeof (props as any)?.bucket === "string" + ? (props as any).bucket + : (props as any)?.bucket?.bucketName, + rest: + props?.type === "r2" + ? { + jurisdiction: props.jurisdiction, + path: props.path, + fileNaming: props.fileNaming, + partitioning: props.partitioning, + rollingPolicy: props.rollingPolicy, + format: props.format, + } + : props?.type === "r2_data_catalog" + ? { + tableName: props.tableName, + namespace: props.namespace, + rollingPolicy: props.rollingPolicy, + format: props.format, + } + : {}, + }); + +export const SinkProvider = () => + Provider.effect( + SinkResource, + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + const createSink = yield* pipelines.createSink; + const getSink = yield* pipelines.getSink; + const deleteSink = yield* pipelines.deleteSink; + + return { + stables: ["sinkId", "sinkName", "sinkType", "accountId"], + diff: Effect.fn(function* ({ id, olds, news, output }) { + if (!isResolved(news)) return undefined; + if ((output?.accountId ?? accountId) !== accountId) { + return { action: "replace" } as const; + } + const newName = yield* createSinkName(id, (news as SinkProps).name); + const oldName = + output?.sinkName ?? + (yield* createSinkName(id, (olds as SinkProps).name)); + if (newName !== oldName) { + return { action: "replace" } as const; + } + if ( + !deepEqual( + sinkPropsFingerprint(olds as SinkProps), + sinkPropsFingerprint(news as SinkProps), + ) + ) { + return { action: "replace" } as const; + } + }), + reconcile: Effect.fn(function* ({ id, news, output }) { + const acct = output?.accountId ?? accountId; + const props = news as SinkProps; + const name = yield* createSinkName(id, props.name); + + // Observe — cached id, fall back to name scan. + let observed: pipelines.GetSinkResponse | undefined; + if (output?.sinkId) { + observed = yield* getSink({ + accountId: acct, + sinkId: output.sinkId, + }).pipe( + Effect.catchTag("SinkNotFound", () => Effect.succeed(undefined)), + Effect.catchTag("InvalidSinkId", () => Effect.succeed(undefined)), + ); + } + if (!observed) { + const match = yield* findSinkByName(acct, name); + if (match) { + observed = yield* getSink({ + accountId: acct, + sinkId: match.id, + }).pipe( + Effect.catchTag("SinkNotFound", () => + Effect.succeed(undefined), + ), + ); + } + } + + // Ensure — sinks are immutable. The diff function flagged any + // mutation as a replace, so we only land here on first create or + // crash-recovery from a peer reconciler. + if (!observed) { + const created = yield* createSink( + buildCreateRequest(acct, name, props), + ).pipe( + Effect.catchTag("SinkAlreadyExists", () => + Effect.gen(function* () { + const match = yield* findSinkByName(acct, name); + if (!match) { + return yield* Effect.die( + `Cloudflare reported sink "${name}" already exists ` + + `but listSinks returned none. Retry the deploy.`, + ); + } + return yield* getSink({ + accountId: acct, + sinkId: match.id, + }); + }), + ), + ); + observed = yield* getSink({ + accountId: acct, + sinkId: created.id, + }).pipe( + Effect.catchTag("SinkNotFound", () => Effect.succeed(created)), + ); + } + + return { + sinkId: observed.id, + sinkName: observed.name, + sinkType: (observed.type === "r2_data_catalog" + ? "r2_data_catalog" + : "r2") as "r2" | "r2_data_catalog", + createdAt: observed.createdAt, + accountId: acct, + }; + }), + delete: Effect.fn(function* ({ output }) { + // Sinks are idempotent on delete (Cloudflare returns success for + // a missing id). The engine deletes dependent pipelines first via + // the dependency graph, so we never need `force:"true"`. + yield* deleteSink({ + accountId: output.accountId, + sinkId: output.sinkId, + }); + }), + read: Effect.fn(function* ({ output }) { + if (!output?.sinkId) return undefined; + return yield* getSink({ + accountId: output.accountId, + sinkId: output.sinkId, + }).pipe( + Effect.map((s) => ({ + sinkId: s.id, + sinkName: s.name, + sinkType: (s.type === "r2_data_catalog" + ? "r2_data_catalog" + : "r2") as "r2" | "r2_data_catalog", + createdAt: s.createdAt, + accountId: output.accountId, + })), + Effect.catchTag("SinkNotFound", () => Effect.succeed(undefined)), + Effect.catchTag("InvalidSinkId", () => Effect.succeed(undefined)), + ); + }), + }; + }), + ); diff --git a/packages/alchemy/src/Cloudflare/Pipelines/Stream.ts b/packages/alchemy/src/Cloudflare/Pipelines/Stream.ts new file mode 100644 index 000000000..18bec9fb1 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/Stream.ts @@ -0,0 +1,467 @@ +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as StreamE from "effect/Stream"; +import { deepEqual, isResolved } from "../../Diff.ts"; +import { createPhysicalName } from "../../PhysicalName.ts"; +import * as Provider from "../../Provider.ts"; +import { Resource } from "../../Resource.ts"; +import { CloudflareEnvironment } from "../CloudflareEnvironment.ts"; +import type { Providers } from "../Providers.ts"; +import { StreamBinding } from "./StreamBinding.ts"; +import { + compileStreamSchema, + type StreamSchemaFieldList, + type StreamSchemaInput, +} from "./StreamSchema.ts"; + +export const isStream = (value: unknown): value is Stream => + typeof value === "object" && + (value as any)?.Type === "Cloudflare.PipelinesStream"; + +/** + * Output / wire-format of the stream's structured schema. Cloudflare uses the + * same `{ fields: [...] }` shape on read as on create. + */ +export type StreamSchema = StreamSchemaFieldList; + +/** + * Output format of records written to the stream. + */ +export type StreamFormat = + | { + type: "json"; + decimalEncoding?: "number" | "string" | "bytes"; + timestampFormat?: "rfc3339" | "unix_millis"; + unstructured?: boolean; + } + | { + type: "parquet"; + compression?: "uncompressed" | "snappy" | "gzip" | "zstd" | "lz4"; + rowGroupBytes?: number; + }; + +export type StreamHttpSettings = { + /** + * Whether HTTP ingestion is enabled. + * @default true + */ + enabled?: boolean; + /** + * Whether HTTP ingest requires the `Authorization: Bearer ` header. + * @default false + */ + authentication?: boolean; + /** + * Optional CORS allowlist for browser-based ingestion. + */ + cors?: { + origins?: string[]; + }; +}; + +export type StreamWorkerBindingSettings = { + /** + * Whether the Worker `pipelines` binding is enabled for this stream. + * @default true + */ + enabled?: boolean; +}; + +export type StreamProps = { + /** + * Stream name. If omitted, a unique name is generated. Must match + * Cloudflare's stream-name rules (lowercase letters, digits, underscores). + * @default ${app}_${stage}_${id} + */ + name?: string; + /** + * Schema for incoming events. Pass an `effect/Schema` struct (compiled to + * Cloudflare's field list via {@link compileStreamSchema}) or the raw + * `{ fields: [...] }` payload as an escape hatch. Omit for an + * **unstructured** stream that stores arbitrary JSON in a single `value` + * column. + */ + schema?: StreamSchemaInput; + /** + * Output format used by downstream pipelines. + */ + format?: StreamFormat; + /** + * HTTP ingest configuration. + */ + http?: StreamHttpSettings; + /** + * Worker binding configuration. + * @default { enabled: true } + */ + workerBinding?: StreamWorkerBindingSettings; +}; + +export type Stream = Resource< + "Cloudflare.PipelinesStream", + StreamProps, + { + /** The stream's public Cloudflare id (used by the `pipelines` Worker binding). */ + streamId: string; + /** The stream's name. */ + streamName: string; + /** The HTTP ingest endpoint, when enabled. */ + endpoint: string | undefined; + /** Cloudflare-reported config version. */ + version: number; + http: { + enabled: boolean; + authentication: boolean; + cors: { origins: string[] | undefined } | undefined; + }; + workerBinding: { enabled: boolean }; + /** Resolved schema fields (or undefined for an unstructured stream). */ + schema: StreamSchema | undefined; + format: StreamFormat | undefined; + accountId: string; + }, + never, + Providers +>; + +/** + * A Cloudflare Pipelines Stream — a durable, buffered queue that receives + * events via HTTP ingest, Worker bindings, or Logpush, for downstream + * consumption by a {@link Pipeline}. + * + * Streams are **write-only**: producers write into them and a SQL Pipeline is + * the only consumer. + * + * @section Creating a Stream + * @example Unstructured stream + * ```typescript + * const events = yield* Cloudflare.Stream("Events"); + * ``` + * + * @example Structured stream defined with the SQL kit + * Pass fields directly — `schema` accepts a plain record of `Schema` values + * (it's wrapped in `Schema.Struct(...)` internally), a full `Schema.Struct` + * value, or the raw `{ fields: [...] }` escape hatch. + * ```typescript + * import * as SQL from "alchemy/Cloudflare/SQL"; + * + * const events = yield* Cloudflare.Stream("Events", { + * schema: { + * user_id: SQL.String, + * amount: SQL.optional(SQL.Number), + * at: SQL.Timestamp, + * }, + * }); + * ``` + * + * @section Binding to a Worker + * @example Sending records from a Worker + * ```typescript + * const events = yield* Cloudflare.Stream.bind(Events); + * yield* events.send({ user_id: "u1", amount: 42 }); + * ``` + */ +export const Stream = Resource("Cloudflare.PipelinesStream")({ + bind: StreamBinding.bind, +}); + +const createStreamName = (id: string, name: string | undefined) => + Effect.gen(function* () { + if (name) return name; + // Cloudflare requires lowercase + underscores for stream names; mirror + // the convention so generated names always parse. + return (yield* createPhysicalName({ id, maxLength: 63 })) + .toLowerCase() + .replace(/-/g, "_"); + }); + +type ObservedStream = NonNullable; + +const findStreamByName = (accountId: string, name: string) => + pipelines.listStreams.items({ accountId }).pipe( + StreamE.filter((s) => s.name === name), + StreamE.runHead, + Effect.map(Option.getOrUndefined), + ); + +const normalizeFields = ( + fields: NonNullable["fields"] | null | undefined, +): StreamSchemaFieldList["fields"] | undefined => { + if (!fields) return undefined; + // Strip `null` from the optional fields on the response shape so equality + // against compiled (no-null) inputs works. + return fields.map((f) => { + if (!f || typeof f !== "object") return f; + const out: Record = {}; + for (const [k, v] of Object.entries(f as Record)) { + if (v != null) out[k] = v; + } + return out; + }) as StreamSchemaFieldList["fields"]; +}; + +const observedHttp = (http: ObservedStream["http"]) => ({ + enabled: http.enabled, + authentication: http.authentication, + cors: http.cors ? { origins: http.cors.origins ?? undefined } : undefined, +}); + +const observedFormat = ( + format: ObservedStream["format"], +): StreamFormat | undefined => { + if (!format) return undefined; + if (format.type === "json") { + return { + type: "json", + decimalEncoding: + (format.decimalEncoding as StreamFormat extends infer F + ? F extends { type: "json"; decimalEncoding?: infer D } + ? D + : never + : never) ?? undefined, + timestampFormat: (format.timestampFormat as any) ?? undefined, + unstructured: format.unstructured ?? undefined, + } as StreamFormat; + } + if (format.type === "parquet") { + return { + type: "parquet", + compression: (format.compression as any) ?? undefined, + rowGroupBytes: format.rowGroupBytes ?? undefined, + } as StreamFormat; + } + return undefined; +}; + +const observedSchema = ( + schema: ObservedStream["schema"], +): StreamSchema | undefined => { + if (!schema || !schema.fields) return undefined; + return { fields: normalizeFields(schema.fields) }; +}; + +const desiredHttp = ( + http: StreamHttpSettings | undefined, +): { + authentication: boolean; + enabled: boolean; + cors?: { origins?: string[] }; +} => ({ + authentication: http?.authentication ?? false, + enabled: http?.enabled ?? true, + cors: http?.cors?.origins ? { origins: http.cors.origins } : undefined, +}); + +const desiredWorkerBinding = ( + workerBinding: StreamWorkerBindingSettings | undefined, +): { enabled: boolean } => ({ + enabled: workerBinding?.enabled ?? true, +}); + +const toAttrs = ( + observed: ObservedStream, + accountId: string, +): Stream["Attributes"] => ({ + streamId: observed.id, + streamName: observed.name, + endpoint: observed.endpoint ?? undefined, + version: observed.version, + http: observedHttp(observed.http), + workerBinding: { enabled: observed.workerBinding.enabled }, + schema: observedSchema(observed.schema), + format: observedFormat(observed.format), + accountId, +}); + +export const StreamProvider = () => + Provider.effect( + Stream, + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + const createStream = yield* pipelines.createStream; + const getStream = yield* pipelines.getStream; + const patchStream = yield* pipelines.patchStream; + const deleteStream = yield* pipelines.deleteStream; + + return { + stables: ["streamId", "streamName", "accountId"], + diff: Effect.fn(function* ({ id, olds = {}, news = {}, output }) { + if (!isResolved(news)) return undefined; + if ((output?.accountId ?? accountId) !== accountId) { + return { action: "replace" } as const; + } + const newName = yield* createStreamName(id, news.name); + const oldName = + output?.streamName ?? (yield* createStreamName(id, olds.name)); + if (newName !== oldName) { + return { action: "replace" } as const; + } + // Format and schema are baked at creation time and cannot be + // patched in place; treat any change as a replace. + if (!deepEqual(olds.format, news.format, { stripNullish: true })) { + return { action: "replace" } as const; + } + if (!deepEqual(olds.schema, news.schema, { stripNullish: true })) { + return { action: "replace" } as const; + } + }), + reconcile: Effect.fn(function* ({ id, news = {}, output }) { + const acct = output?.accountId ?? accountId; + const name = yield* createStreamName(id, news.name); + + // Observe — fetch cached, fall back to a name scan if the cached + // id is stale. listStreams paginates and has no name filter, so + // we use the items stream. + let observed: ObservedStream | undefined; + if (output?.streamId) { + observed = yield* getStream({ + accountId: acct, + streamId: output.streamId, + }).pipe( + Effect.catchTag("StreamNotFound", () => + Effect.succeed(undefined), + ), + Effect.catchTag("InvalidStreamId", () => + Effect.succeed(undefined), + ), + ); + } + if (!observed) { + const match = yield* findStreamByName(acct, name); + if (match) { + observed = yield* getStream({ + accountId: acct, + streamId: match.id, + }).pipe( + Effect.catchTag("StreamNotFound", () => + Effect.succeed(undefined), + ), + ); + } + } + + // Ensure — create if missing. Compile the user-facing + // effect/Schema (if any) just-in-time so plan-time refs resolve. + if (!observed) { + const schema = news.schema + ? yield* compileStreamSchema(news.schema).pipe( + Effect.mapError( + (e) => + new Error(`Cloudflare.Stream("${id}"): ${e.message}`), + ), + ) + : undefined; + + const created = yield* createStream({ + accountId: acct, + name, + format: news.format, + http: desiredHttp(news.http), + schema, + workerBinding: desiredWorkerBinding(news.workerBinding), + }).pipe( + // Race: a peer reconciler beat us to the create. Re-resolve + // by name and continue the sync path so observed != undefined. + Effect.catchTag("StreamAlreadyExists", () => + Effect.gen(function* () { + const match = yield* findStreamByName(acct, name); + if (!match) { + return yield* Effect.die( + `Cloudflare reported stream "${name}" already exists ` + + `but listStreams returned none. Retry the deploy; ` + + `if this persists, the stream is in an inconsistent state.`, + ); + } + return yield* getStream({ + accountId: acct, + streamId: match.id, + }); + }), + ), + ); + + // Probed: createStream → getStream returns success on the very + // next call, so no consistency retry is needed here. Catch + // StreamNotFound defensively in case the create result is stale. + observed = yield* getStream({ + accountId: acct, + streamId: created.id, + }).pipe( + Effect.catchTag("StreamNotFound", () => Effect.succeed(created)), + ); + } + + // Sync — http + workerBinding are the only mutable settings. + // Diff against observed cloud state, not olds, so adoption and + // out-of-band drift converge. + const desiredHttpSettings = desiredHttp(news.http); + const observedHttpSettings = observedHttp(observed.http); + const desiredWb = desiredWorkerBinding(news.workerBinding); + + const httpDrift = + observedHttpSettings.enabled !== desiredHttpSettings.enabled || + observedHttpSettings.authentication !== + desiredHttpSettings.authentication || + !deepEqual( + observedHttpSettings.cors?.origins, + desiredHttpSettings.cors?.origins, + { stripNullish: true }, + ); + + const wbDrift = observed.workerBinding.enabled !== desiredWb.enabled; + + if (httpDrift || wbDrift) { + observed = yield* patchStream({ + accountId: acct, + streamId: observed.id, + http: httpDrift ? desiredHttpSettings : undefined, + workerBinding: wbDrift ? desiredWb : undefined, + }); + } + + return toAttrs(observed, acct); + }), + delete: Effect.fn(function* ({ output }) { + // deleteStream is idempotent on a missing id and returns success + // — the distilled error union for this op only includes + // `PipelineNotExists`, which Cloudflare raises when `force:"true"` + // is passed against a stream that has no dependent pipeline. + // We do NOT pass `force`, so the only error path we catch is the + // dependency one (in case the engine ever calls delete in a + // different order than the dependency graph dictates). + yield* deleteStream({ + accountId: output.accountId, + streamId: output.streamId, + }).pipe(Effect.catchTag("PipelineNotExists", () => Effect.void)); + }), + read: Effect.fn(function* ({ id, olds, output }) { + const acct = output?.accountId ?? accountId; + if (output?.streamId) { + return yield* getStream({ + accountId: acct, + streamId: output.streamId, + }).pipe( + Effect.map((s) => toAttrs(s, acct)), + Effect.catchTag("StreamNotFound", () => + Effect.succeed(undefined), + ), + Effect.catchTag("InvalidStreamId", () => + Effect.succeed(undefined), + ), + ); + } + const name = yield* createStreamName(id, olds?.name); + const match = yield* findStreamByName(acct, name); + if (!match) return undefined; + const full = yield* getStream({ + accountId: acct, + streamId: match.id, + }).pipe( + Effect.catchTag("StreamNotFound", () => Effect.succeed(undefined)), + ); + return full ? toAttrs(full, acct) : undefined; + }), + }; + }), + ); diff --git a/packages/alchemy/src/Cloudflare/Pipelines/StreamBinding.ts b/packages/alchemy/src/Cloudflare/Pipelines/StreamBinding.ts new file mode 100644 index 000000000..1b2765843 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/StreamBinding.ts @@ -0,0 +1,124 @@ +import * as Data from "effect/Data"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Binding from "../../Binding.ts"; +import type { ResourceLike } from "../../Resource.ts"; +import type { RuntimeContext } from "../../RuntimeContext.ts"; +import { isWorker, WorkerEnvironment } from "../Workers/Worker.ts"; +import type { Stream } from "./Stream.ts"; + +/** + * The runtime shape of a Pipelines `pipelines` binding (Cloudflare exposes + * `env.X.send(records)` where each record is a JSON-serializable value). + */ +export interface PipelinesSendBinding { + send(records: ReadonlyArray): Promise; +} + +/** Raised when `env.X.send(records)` rejects at runtime. */ +export class StreamSendError extends Data.TaggedError("StreamSendError")<{ + message: string; + cause: unknown; +}> {} + +/** + * Runtime client returned by `Stream.bind(stream)`. Schema-typed when the + * underlying stream was defined with an `effect/Schema` struct; falls back + * to `unknown` when the schema is opaque. + */ +export interface StreamSender { + /** The raw Cloudflare `pipelines` binding from `env`. */ + raw: Effect.Effect; + /** Send one record. */ + send(record: Record): Effect.Effect; + /** Send a batch of records. */ + sendBatch( + records: ReadonlyArray, + ): Effect.Effect; +} + +/** + * Runtime binding service for {@link Stream}. Resolves the native Cloudflare + * `pipelines` binding from the Worker's `env` and wraps it with `send` / + * `sendBatch` Effect callables. + * + * Not part of the public surface — callers reach this via `Stream.bind(stream)`, + * which the {@link Stream} resource wires through `Resource(...)({ bind })`. + */ +export class StreamBinding extends Binding.Service< + StreamBinding, + (stream: Stream) => Effect.Effect +>()("Cloudflare.PipelinesStream") {} + +/** + * Runtime layer for {@link StreamBinding}. Provide this in your Worker's + * runtime layer so `Stream.bind(stream)` resolves at request time. + */ +export const StreamBindingLive = Layer.effect( + StreamBinding, + Effect.gen(function* () { + const bind = yield* StreamBindingPolicy; + const env = yield* WorkerEnvironment; + + return Effect.fn(function* (stream: Stream) { + yield* bind(stream); + const raw = Effect.sync( + () => (env as Record)[stream.LogicalId]!, + ); + + const tryPromise = (fn: () => Promise) => + Effect.tryPromise({ + try: fn, + catch: (error: any) => + new StreamSendError({ + message: error?.message ?? "Unknown error sending to stream", + cause: error, + }), + }); + + return { + raw, + send: (record: unknown) => + raw.pipe(Effect.flatMap((b) => tryPromise(() => b.send([record])))), + sendBatch: (records: ReadonlyArray) => + raw.pipe( + Effect.flatMap((b) => + tryPromise(() => b.send(records as ReadonlyArray)), + ), + ), + } satisfies StreamSender as StreamSender; + }); + }), +); + +/** + * Deploy-time policy that records the native `pipelines` binding on the host + * Worker. The `pipeline` field carries the Cloudflare-side **stream id** + * (per Cloudflare's `pipelines` binding contract). + */ +export class StreamBindingPolicy extends Binding.Policy< + StreamBindingPolicy, + (stream: Stream) => Effect.Effect +>()("Cloudflare.PipelinesStream") {} + +export const StreamBindingPolicyLive = StreamBindingPolicy.layer.succeed( + Effect.fnUntraced(function* (host: ResourceLike, stream: Stream) { + if (isWorker(host)) { + yield* host.bind`${stream}`({ + bindings: [ + { + type: "pipelines", + name: stream.LogicalId, + pipeline: stream.streamId, + }, + ], + }); + } else { + return yield* Effect.die( + new Error( + `StreamBindingPolicy does not support runtime '${host.Type}'`, + ), + ); + } + }), +); diff --git a/packages/alchemy/src/Cloudflare/Pipelines/StreamSchema.ts b/packages/alchemy/src/Cloudflare/Pipelines/StreamSchema.ts new file mode 100644 index 000000000..192b5fefe --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/StreamSchema.ts @@ -0,0 +1,368 @@ +import type * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import * as Data from "effect/Data"; +import * as Effect from "effect/Effect"; +import * as S from "effect/Schema"; +import * as AST from "effect/SchemaAST"; + +/** + * The shape Cloudflare expects for a stream's structured-event schema. We + * compile an `effect/Schema` struct down to this list. + */ +export type StreamSchemaFieldList = NonNullable< + pipelines.CreateStreamRequest["schema"] +>; + +/** + * A single field entry inside {@link StreamSchemaFieldList}. The distilled + * union admits the `unknown` escape hatch for forward-compat types; we always + * emit a concrete tag. + */ +export type StreamSchemaField = NonNullable< + StreamSchemaFieldList["fields"] +>[number]; + +/** + * Accepted shapes for {@link compileStreamSchema}: + * + * - A plain field record (`{ user_id: SQL.String, ... }`) — the + * ergonomic default; internally wrapped in `Schema.Struct(...)`. + * - A `Schema.Struct(...)` value — useful when the schema is reused + * across resources. + * - A raw `{ fields: [...] }` payload — escape hatch for types not yet + * handled by the compiler. + */ +export type StreamSchemaInput = S.Struct.Fields | S.Top | StreamSchemaFieldList; + +/** Raised when {@link compileStreamSchema} hits an AST node it can't map. */ +export class UnsupportedStreamSchemaNode extends Data.TaggedError( + "UnsupportedStreamSchemaNode", +)<{ + message: string; + path: ReadonlyArray; + tag: string; +}> {} + +// --------------------------------------------------------------------------- +// Brand helpers — Cloudflare distinguishes int/float widths and timestamp +// units that plain `Schema.Number` / `Schema.Date` cannot express. We carry +// the precision as an AST annotation and read it back in the compiler. +// --------------------------------------------------------------------------- + +const CloudflareTypeKey = "alchemy.cloudflare.pipelines.streamType"; + +type CloudflareNumericTag = + | "int32" + | "int64" + | "float32" + | "float64" + | "timestamp_ms" + | "timestamp_s" + | "timestamp_us" + | "timestamp_ns"; + +const numericBrand = (tag: CloudflareNumericTag) => + S.Number.annotate({ [CloudflareTypeKey]: tag }); + +const timestampBrand = ( + unit: "second" | "millisecond" | "microsecond" | "nanosecond", +) => + S.Date.annotate({ + [CloudflareTypeKey]: + unit === "second" + ? "timestamp_s" + : unit === "millisecond" + ? "timestamp_ms" + : unit === "microsecond" + ? "timestamp_us" + : "timestamp_ns", + }); + +const readCloudflareTag = (ast: AST.AST): CloudflareNumericTag | undefined => { + const annotations = (ast as { annotations?: Record }) + .annotations; + const tag = annotations?.[CloudflareTypeKey]; + return typeof tag === "string" ? (tag as CloudflareNumericTag) : undefined; +}; + +/** + * A 32-bit signed integer field. Compiles to `{ type: "int32" }`. + */ +export const Int32 = numericBrand("int32"); + +/** + * A 64-bit signed integer field. Compiles to `{ type: "int64" }`. + */ +export const Int64 = numericBrand("int64"); + +/** + * A 32-bit float field. Compiles to `{ type: "float32" }`. + */ +export const Float32 = numericBrand("float32"); + +/** + * A 64-bit float field. Compiles to `{ type: "float64" }`. + * + * Use this when you want to be explicit; bare `Schema.Number` already + * compiles to `float64`. + */ +export const Float64 = numericBrand("float64"); + +/** + * A timestamp field with millisecond precision (the most common shape). + * Compiles to `{ type: "timestamp", unit: "millisecond" }`. + */ +export const Timestamp = timestampBrand("millisecond"); + +/** + * A timestamp field with second precision. + */ +export const TimestampSeconds = timestampBrand("second"); + +/** + * A timestamp field with microsecond precision. + */ +export const TimestampMicroseconds = timestampBrand("microsecond"); + +/** + * A timestamp field with nanosecond precision. + */ +export const TimestampNanoseconds = timestampBrand("nanosecond"); + +// --------------------------------------------------------------------------- +// Compiler +// --------------------------------------------------------------------------- + +const isFieldList = ( + input: StreamSchemaInput, +): input is StreamSchemaFieldList => + typeof input === "object" && + input !== null && + !(input as { ast?: unknown }).ast && + Array.isArray((input as StreamSchemaFieldList).fields); + +const isSchema = (input: unknown): input is S.Top => + typeof input === "object" && + input !== null && + (input as { ast?: unknown }).ast !== undefined; + +const isFieldRecord = (input: StreamSchemaInput): input is S.Struct.Fields => { + if (typeof input !== "object" || input === null) return false; + if (isFieldList(input)) return false; + if (isSchema(input)) return false; + // Every value must itself be a Schema (carry an `ast` property). The + // raw field-list and Schema shapes are excluded above; this leaves the + // ergonomic shape `{ user_id: SQL.String, ... }`. + for (const key of Object.keys(input)) { + if (!isSchema((input as Record)[key])) { + return false; + } + } + return true; +}; + +const decl = (ast: AST.AST) => + AST.isDeclaration(ast) + ? ((ast.annotations as { typeConstructor?: { _tag?: string } }) + ?.typeConstructor?._tag ?? undefined) + : undefined; + +interface CompileCtx { + path: ReadonlyArray; +} + +const fail = (ctx: CompileCtx, tag: string, message: string) => + Effect.fail( + new UnsupportedStreamSchemaNode({ message, path: ctx.path, tag }), + ); + +const compileType = ( + ast: AST.AST, + ctx: CompileCtx, +): Effect.Effect => + Effect.gen(function* () { + // Unions: tolerate `T | undefined` (the shape Schema.optional produces on + // the value side). Anything else with multiple non-undefined members is + // genuinely ambiguous for the Cloudflare wire format. + if (AST.isUnion(ast)) { + const nonUndefined = ast.types.filter((t) => !AST.isUndefined(t)); + if (nonUndefined.length === 1) { + return yield* compileType(nonUndefined[0]!, ctx); + } + return yield* fail( + ctx, + "Union", + `Unsupported union at ${ctx.path.join(".") || ""}: ` + + `Cloudflare stream schemas don't have a tagged-union representation. ` + + `Use Schema.optional for nullable fields.`, + ); + } + + const explicitTag = readCloudflareTag(ast); + if (explicitTag) { + switch (explicitTag) { + case "int32": + case "int64": + case "float32": + case "float64": + return { type: explicitTag }; + case "timestamp_ms": + return { type: "timestamp", unit: "millisecond" }; + case "timestamp_s": + return { type: "timestamp", unit: "second" }; + case "timestamp_us": + return { type: "timestamp", unit: "microsecond" }; + case "timestamp_ns": + return { type: "timestamp", unit: "nanosecond" }; + } + } + + if (AST.isString(ast)) return { type: "string" }; + if (AST.isBoolean(ast)) return { type: "bool" }; + if (AST.isNumber(ast)) return { type: "float64" }; + + if (AST.isArrays(ast)) { + // `Arrays` AST has a `rest` element list — the array's element type lives + // at index 0 for a homogeneous Schema.Array. + const rest = (ast as unknown as { rest?: ReadonlyArray }).rest; + const elementAst = rest && rest.length > 0 ? rest[0] : undefined; + if (!elementAst) { + return yield* fail( + ctx, + "Arrays", + `Empty Schema.Array at ${ctx.path.join(".") || ""}: ` + + `Cloudflare list fields require a single homogeneous element type.`, + ); + } + const inner = yield* compileType(elementAst, { + path: [...ctx.path, "[]"], + }); + return { + type: "list", + // The Cloudflare wire format nests the element under `items`; the + // distilled type still permits `unknown` here, so a structural cast + // is unavoidable. + items: inner as unknown, + } as StreamSchemaField; + } + + if (AST.isObjects(ast)) { + // Treat objects with index signatures or no properties as opaque JSON. + const obj = ast as AST.Objects; + if ( + obj.indexSignatures.length > 0 || + obj.propertySignatures.length === 0 + ) { + return { type: "json" }; + } + const fields: StreamSchemaField[] = []; + for (const ps of obj.propertySignatures) { + fields.push( + yield* compileProperty(ps, { path: [...ctx.path, ps.name] }), + ); + } + return { type: "struct", fields } as unknown as StreamSchemaField; + } + + // Effect schemas surface `Date` / `Uint8Array` as declarations; map them + // to Cloudflare's native primitives so callers don't need to brand + // bytes/dates explicitly. + const tc = decl(ast); + if (tc === "Date") return { type: "timestamp", unit: "millisecond" }; + if (tc === "Uint8Array") return { type: "binary" }; + + // Schema.Unknown / Schema.Any / Schema.Object → wire as JSON blob. + // Cloudflare keeps these as the `value` column when the stream is + // unstructured, but inside a struct field they round-trip as JSON. + const tag = (ast as { _tag?: string })._tag; + if (tag === "Unknown" || tag === "Any" || tag === "ObjectKeyword") { + return { type: "json" }; + } + + return yield* fail( + ctx, + (ast as { _tag?: string })._tag ?? "Unknown", + `Unsupported AST node ${(ast as { _tag?: string })._tag ?? ""} ` + + `at ${ctx.path.join(".") || ""}. Use Schema.Struct, ` + + `Schema.String/Number/Boolean, Schema.Array, Schema.Date, ` + + `Schema.optional, or the Cloudflare brand helpers ` + + `(Int32/Int64/Float32/Float64/Timestamp).`, + ); + }); + +const compileProperty = ( + ps: AST.PropertySignature, + ctx: CompileCtx, +): Effect.Effect => + Effect.gen(function* () { + if (typeof ps.name !== "string") { + return yield* fail( + ctx, + "PropertySignature", + `Cloudflare stream fields require string property keys; got ${String(ps.name)} ` + + `at ${ctx.path.join(".") || ""}.`, + ); + } + const compiled = (yield* compileType(ps.type, ctx)) as Record< + string, + unknown + >; + const required = !AST.isOptional(ps.type); + return { + ...compiled, + name: ps.name, + sqlName: ps.name, + required, + } as StreamSchemaField; + }); + +/** + * Compile an `effect/Schema` struct (or a plain record of `Schema` values) + * into Cloudflare's stream-schema field list. Callers can also pass a raw + * `{ fields: [...] }` payload to bypass the compiler. + * + * @example Compile from a field record (most ergonomic) + * ```typescript + * const fields = yield* compileStreamSchema({ + * user_id: SQL.String, + * amount: SQL.optional(SQL.Number), + * }); + * ``` + * + * @example Compile from a Schema.Struct value + * ```typescript + * const fields = yield* compileStreamSchema( + * SQL.Struct({ + * user_id: SQL.String, + * amount: SQL.optional(SQL.Number), + * }), + * ); + * ``` + */ +export const compileStreamSchema = ( + input: StreamSchemaInput, +): Effect.Effect => + Effect.gen(function* () { + if (isFieldList(input)) { + return input; + } + // Accept a plain field record (`{ user_id: SQL.String, ... }`) by + // wrapping it in Schema.Struct(...) so the user never has to write + // it themselves. + const schema: S.Top = isFieldRecord(input) ? S.Struct(input) : input; + const ast = schema.ast; + if (!AST.isObjects(ast)) { + return yield* fail( + { path: [] }, + (ast as { _tag?: string })._tag ?? "NonObject", + `Top-level stream schema must be a struct (a Schema.Struct or a ` + + `plain field record); got ` + + `${(ast as { _tag?: string })._tag ?? ""}.`, + ); + } + const obj = ast as AST.Objects; + const fields: StreamSchemaField[] = []; + for (const ps of obj.propertySignatures) { + fields.push(yield* compileProperty(ps, { path: [ps.name] })); + } + return { fields }; + }); diff --git a/packages/alchemy/src/Cloudflare/Pipelines/index.ts b/packages/alchemy/src/Cloudflare/Pipelines/index.ts new file mode 100644 index 000000000..a8709a912 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/Pipelines/index.ts @@ -0,0 +1,58 @@ +// Re-export the user-facing Pipelines surface flat from `Cloudflare/index.ts`. +// `SinkResource` is intentionally excluded — `Sink.Resource` is reachable for +// internal `Provider.collection` registration via the explicit file import in +// `Cloudflare/Providers.ts`. + +export { + Pipeline, + PipelineProvider, + pipelineSql, + isPipeline, + type PipelineProps, + type PipelineTableInfo, +} from "./Pipeline.ts"; +export { + Sink, + SinkProvider, + isSink, + type R2DataCatalogSinkProps, + type R2SinkProps, + type SinkBucketRef, + type SinkDataCatalogCredentials, + type SinkDataCatalogFormat, + type SinkFileNaming, + type SinkPartitioning, + type SinkProps, + type SinkR2Credentials, + type SinkR2Format, + type SinkRollingPolicy, +} from "./Sink.ts"; +export { + Stream, + StreamProvider, + isStream, + type StreamFormat, + type StreamHttpSettings, + type StreamProps, + type StreamSchema, + type StreamWorkerBindingSettings, +} from "./Stream.ts"; +export { + StreamBinding, + StreamBindingLive, + StreamBindingPolicy, + StreamBindingPolicyLive, + StreamSendError, + type PipelinesSendBinding, + type StreamSender, +} from "./StreamBinding.ts"; +// Schema brand helpers (`Int32` / `Int64` / `Float32` / `Float64` / `Timestamp`) +// live in `Cloudflare/SQL.ts` — re-import them as +// `import * as SQL from "alchemy/Cloudflare/SQL"`. +export { + UnsupportedStreamSchemaNode, + compileStreamSchema, + type StreamSchemaField, + type StreamSchemaFieldList, + type StreamSchemaInput, +} from "./StreamSchema.ts"; diff --git a/packages/alchemy/src/Cloudflare/Providers.ts b/packages/alchemy/src/Cloudflare/Providers.ts index d260ef007..eabe83783 100644 --- a/packages/alchemy/src/Cloudflare/Providers.ts +++ b/packages/alchemy/src/Cloudflare/Providers.ts @@ -28,6 +28,7 @@ import * as Email from "./Email/index.ts"; import * as Hyperdrive from "./Hyperdrive/index.ts"; import * as Images from "./Images/index.ts"; import * as KV from "./KV/index.ts"; +import * as Pipelines from "./Pipelines/index.ts"; import * as Queue from "./Queue/index.ts"; import * as R2 from "./R2/index.ts"; import * as RateLimit from "./RateLimit/index.ts"; @@ -79,6 +80,10 @@ export const providers = () => Images.ImagesBindingPolicy, KV.KVNamespace, KV.KVNamespaceBindingPolicy, + Pipelines.Pipeline, + Pipelines.Sink.Resource, + Pipelines.Stream, + Pipelines.StreamBindingPolicy, Queue.Queue, Queue.QueueBindingPolicy, Queue.QueueConsumer, @@ -134,6 +139,10 @@ export const providers = () => Images.ImagesBindingPolicyLive, KV.KVNamespaceBindingPolicyLive, KV.KVNamespaceProvider(), + Pipelines.PipelineProvider(), + Pipelines.SinkProvider(), + Pipelines.StreamProvider(), + Pipelines.StreamBindingPolicyLive, Queue.QueueBindingPolicyLive, Queue.QueueEventSourcePolicyLive, Queue.QueueProvider(), diff --git a/packages/alchemy/src/Cloudflare/SQL.ts b/packages/alchemy/src/Cloudflare/SQL.ts new file mode 100644 index 000000000..e9ec21c96 --- /dev/null +++ b/packages/alchemy/src/Cloudflare/SQL.ts @@ -0,0 +1,33 @@ +/** + * Schema authoring kit for Cloudflare Pipelines streams and sinks. + * + * Re-exports `effect/Schema` so a single import covers every type Cloudflare + * understands, and layers on the precision/temporal brand helpers Cloudflare + * distinguishes that plain `Schema.Number` / `Schema.Date` cannot express. + * + * @example Defining a structured stream schema + * ```typescript + * import * as SQL from "alchemy/Cloudflare/SQL"; + * + * const Events = SQL.Struct({ + * user_id: SQL.String, + * count: SQL.Int64, + * at: SQL.Timestamp, + * tags: SQL.Array(SQL.String), + * meta: SQL.optional(SQL.Struct({ source: SQL.String })), + * }); + * + * const events = yield* Cloudflare.Stream("Events", { schema: Events }); + * ``` + */ +export * from "effect/Schema"; +export { + Float32, + Float64, + Int32, + Int64, + Timestamp, + TimestampMicroseconds, + TimestampNanoseconds, + TimestampSeconds, +} from "./Pipelines/StreamSchema.ts"; diff --git a/packages/alchemy/src/Cloudflare/Workers/WorkerAsyncBindings.ts b/packages/alchemy/src/Cloudflare/Workers/WorkerAsyncBindings.ts index b0027542d..4a2a7b6dd 100644 --- a/packages/alchemy/src/Cloudflare/Workers/WorkerAsyncBindings.ts +++ b/packages/alchemy/src/Cloudflare/Workers/WorkerAsyncBindings.ts @@ -17,6 +17,7 @@ import { isHyperdrive } from "../Hyperdrive/Hyperdrive.ts"; import { getHyperdriveDevOrigin } from "../Hyperdrive/HyperdriveBinding.ts"; import { isImages } from "../Images/Images.ts"; import { isKVNamespace } from "../KV/KVNamespace.ts"; +import { isStream as isPipelinesStream } from "../Pipelines/Stream.ts"; import { isQueue } from "../Queue/Queue.ts"; import { isR2Bucket } from "../R2/R2Bucket.ts"; import { isRateLimit } from "../RateLimit/RateLimit.ts"; @@ -189,6 +190,12 @@ const toBinding = ( name: bindingName, queueName: binding.queueName, }; + } else if (isPipelinesStream(binding)) { + return { + type: "pipelines", + name: bindingName, + pipeline: binding.streamId, + }; } else if (isAiGateway(binding)) { return { type: "ai", diff --git a/packages/alchemy/src/Cloudflare/Workers/WorkerBinding.ts b/packages/alchemy/src/Cloudflare/Workers/WorkerBinding.ts index d9c384a07..f41ab7d5d 100644 --- a/packages/alchemy/src/Cloudflare/Workers/WorkerBinding.ts +++ b/packages/alchemy/src/Cloudflare/Workers/WorkerBinding.ts @@ -15,6 +15,7 @@ import { SendEmail } from "../Email/SendEmail.ts"; import { Hyperdrive } from "../Hyperdrive/Hyperdrive.ts"; import { Images } from "../Images/Images.ts"; import type { KVNamespace } from "../KV/KVNamespace.ts"; +import type { Stream as PipelinesStream } from "../Pipelines/Stream.ts"; import type { Queue } from "../Queue/Queue.ts"; import type { R2Bucket } from "../R2/R2Bucket.ts"; import type { RateLimit } from "../RateLimit/RateLimit.ts"; @@ -47,6 +48,7 @@ export type WorkerBindingResource = | R2Bucket | D1Database | KVNamespace + | PipelinesStream | Queue | AiGateway | AnalyticsEngineDataset diff --git a/packages/alchemy/src/Cloudflare/index.ts b/packages/alchemy/src/Cloudflare/index.ts index bbb358131..fe753d01e 100644 --- a/packages/alchemy/src/Cloudflare/index.ts +++ b/packages/alchemy/src/Cloudflare/index.ts @@ -13,6 +13,7 @@ export * from "./Email/index.ts"; export * from "./Hyperdrive/index.ts"; export * from "./Images/index.ts"; export * from "./KV/index.ts"; +export * from "./Pipelines/index.ts"; export * from "./Providers.ts"; export * from "./Queue/index.ts"; export * from "./R2/index.ts"; diff --git a/packages/alchemy/test/Cloudflare/Pipelines/Pipeline.test.ts b/packages/alchemy/test/Cloudflare/Pipelines/Pipeline.test.ts new file mode 100644 index 000000000..7a7b4d9aa --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Pipelines/Pipeline.test.ts @@ -0,0 +1,118 @@ +import * as Cloudflare from "@/Cloudflare"; +import { CloudflareEnvironment } from "@/Cloudflare/CloudflareEnvironment"; +import * as SQL from "@/Cloudflare/SQL"; +import * as Test from "@/Test/Vitest"; +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import { expect } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import { MinimumLogLevel } from "effect/References"; + +const { test } = Test.make({ providers: Cloudflare.providers() }); + +const logLevel = Effect.provideService( + MinimumLogLevel, + process.env.DEBUG ? "Debug" : "Info", +); + +test.provider( + "stream + r2 sink + pipeline smoke test", + (stack) => + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + + yield* stack.destroy(); + + const result = yield* stack.deploy( + Effect.gen(function* () { + const bucket = yield* Cloudflare.R2Bucket("Lake"); + + const events = yield* Cloudflare.Stream("Events", { + schema: { + user_id: SQL.String, + amount: SQL.Float64, + }, + }); + + const sink = yield* Cloudflare.Sink("Lakehouse", { + type: "r2", + bucket, + format: { type: "parquet", compression: "zstd" }, + }); + + const pipeline = yield* Cloudflare.Pipeline("Ingest", { + sql: Cloudflare.pipelineSql` + INSERT INTO ${sink} + SELECT user_id, amount FROM ${events}`, + }); + + return { bucket, events, sink, pipeline }; + }), + ); + + expect(result.events.streamId).toBeDefined(); + expect(result.sink.sinkId).toBeDefined(); + expect(result.sink.sinkType).toBe("r2"); + expect(result.pipeline.pipelineId).toBeDefined(); + // Pipeline.sql is the rendered, interpolated SQL. + expect(result.pipeline.sql).toContain(result.sink.sinkName); + expect(result.pipeline.sql).toContain(result.events.streamName); + + const observed = yield* pipelines.getV1Pipeline({ + accountId, + pipelineId: result.pipeline.pipelineId, + }); + expect(observed.id).toBe(result.pipeline.pipelineId); + const tableNames = observed.tables.map((t) => t.name); + expect(tableNames).toContain(result.events.streamName); + expect(tableNames).toContain(result.sink.sinkName); + + yield* stack.destroy(); + }).pipe(logLevel), + { timeout: 240_000 }, +); + +test.provider( + "sql change replaces the pipeline", + (stack) => + Effect.gen(function* () { + yield* stack.destroy(); + + const initial = yield* stack.deploy( + Effect.gen(function* () { + const bucket = yield* Cloudflare.R2Bucket("Lake"); + const events = yield* Cloudflare.Stream("Events"); + const sink = yield* Cloudflare.Sink("Lakehouse", { + type: "r2", + bucket, + }); + const pipeline = yield* Cloudflare.Pipeline("Ingest", { + sql: Cloudflare.pipelineSql`INSERT INTO ${sink} SELECT * FROM ${events}`, + }); + return { pipeline }; + }), + ); + + const replaced = yield* stack.deploy( + Effect.gen(function* () { + const bucket = yield* Cloudflare.R2Bucket("Lake"); + const events = yield* Cloudflare.Stream("Events"); + const sink = yield* Cloudflare.Sink("Lakehouse", { + type: "r2", + bucket, + }); + const pipeline = yield* Cloudflare.Pipeline("Ingest", { + sql: Cloudflare.pipelineSql` + INSERT INTO ${sink} SELECT * FROM ${events} WHERE user_id IS NOT NULL`, + }); + return { pipeline }; + }), + ); + + expect(replaced.pipeline.pipelineId).not.toBe( + initial.pipeline.pipelineId, + ); + + yield* stack.destroy(); + }).pipe(logLevel), + { timeout: 240_000 }, +); diff --git a/packages/alchemy/test/Cloudflare/Pipelines/RoundTrip.test.ts b/packages/alchemy/test/Cloudflare/Pipelines/RoundTrip.test.ts new file mode 100644 index 000000000..178770c4f --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Pipelines/RoundTrip.test.ts @@ -0,0 +1,133 @@ +import * as Cloudflare from "@/Cloudflare"; +import { CloudflareEnvironment } from "@/Cloudflare/CloudflareEnvironment"; +import * as Test from "@/Test/Vitest"; +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import { expect } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import { MinimumLogLevel } from "effect/References"; +import * as Schedule from "effect/Schedule"; +import * as HttpClient from "effect/unstable/http/HttpClient"; +import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest"; +import PipelinesRoundTripWorker, { Events } from "./round-trip-worker.ts"; + +const { test } = Test.make({ providers: Cloudflare.providers() }); + +const logLevel = Effect.provideService( + MinimumLogLevel, + process.env.DEBUG ? "Debug" : "Info", +); + +/** + * End-to-end Pipelines round-trip via a deployed Cloudflare Worker. + * + * Stack (defined in {@link "./round-trip-worker.ts"}): + * + * - `Lake` — `Cloudflare.R2Bucket`. + * - `Events` — `Cloudflare.Stream` with an `effect/Schema` struct. + * - `Lakehouse` — `Cloudflare.Sink` (R2) with auto-provisioned credentials. + * - `Ingest` — `Cloudflare.Pipeline` wiring Events → Lakehouse via SQL. + * - `PipelinesRoundTripWorker` — binds `Events` via `Stream.bind(...)` and + * exposes `POST /send` so the test can produce records, plus + * `GET /health` so we can confirm the worker came up. + * + * The test deploys, then redeploys without changes to prove the create → + * update path is idempotent for every resource in the graph, then exercises + * the producer binding via HTTP and confirms the live stream survives. + */ +test.provider( + "deploy → redeploy → send via Stream.bind → tear down", + (stack) => + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + + yield* stack.destroy(); + + // First deploy — brings up R2, Stream, Sink (+ auto AccountApiToken), + // Pipeline, and the Worker. + const first = yield* stack.deploy( + Effect.gen(function* () { + const worker = yield* PipelinesRoundTripWorker; + const events = yield* Events; + return { + url: worker.url, + streamId: events.streamId, + streamName: events.streamName, + }; + }), + ); + + expect(first.url).toBeTypeOf("string"); + expect(first.streamId).toBeTypeOf("string"); + + // Second deploy with the same inputs — every resource should + // converge to a no-op (or in-place update), and ids must stay the + // same. This is the create/update side of the matrix. + const second = yield* stack.deploy( + Effect.gen(function* () { + const worker = yield* PipelinesRoundTripWorker; + const events = yield* Events; + return { + url: worker.url, + streamId: events.streamId, + }; + }), + ); + + expect(second.url).toEqual(first.url); + expect(second.streamId).toEqual(first.streamId); + + // Confirm the live stream actually exists in Cloudflare. + const observedStream = yield* pipelines.getStream({ + accountId, + streamId: first.streamId, + }); + expect(observedStream.id).toEqual(first.streamId); + expect(observedStream.workerBinding.enabled).toBe(true); + expect(observedStream.name).toEqual(first.streamName); + + // Locate the pipeline by stream and confirm its `tables` list + // resolves the stream + sink ends. + const pipelineList = yield* pipelines.listV1Pipeline({ accountId }); + const ingest = pipelineList.result.find((p) => + p.sql.includes(first.streamName!), + ); + expect(ingest).toBeDefined(); + const observedPipeline = yield* pipelines.getV1Pipeline({ + accountId, + pipelineId: ingest!.id, + }); + const tableNames = observedPipeline.tables.map((t) => t.name); + expect(tableNames).toContain(first.streamName); + + // Drive the runtime path: produce a few records via the deployed + // worker's Stream binding. The first call is retried while + // workers.dev propagates the new URL. + const baseUrl = first.url as string; + const records = [ + { user_id: "alpha", amount: 11.5 }, + { user_id: "beta", amount: 42 }, + { user_id: "gamma", amount: 1234.56 }, + ]; + const sendResponse = yield* HttpClient.execute( + HttpClientRequest.post(`${baseUrl}/send`).pipe( + HttpClientRequest.bodyJsonUnsafe(records), + ), + ).pipe( + Effect.flatMap((res) => + res.status === 202 + ? Effect.succeed(res) + : Effect.fail(new Error(`Worker /send not ready: ${res.status}`)), + ), + Effect.retry({ + schedule: Schedule.exponential("500 millis").pipe( + Schedule.both(Schedule.recurs(20)), + ), + }), + ); + const body = (yield* sendResponse.json) as { sent: number }; + expect(body.sent).toBe(records.length); + + yield* stack.destroy(); + }).pipe(logLevel), + { timeout: 360_000 }, +); diff --git a/packages/alchemy/test/Cloudflare/Pipelines/Stream.test.ts b/packages/alchemy/test/Cloudflare/Pipelines/Stream.test.ts new file mode 100644 index 000000000..f9a3a0add --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Pipelines/Stream.test.ts @@ -0,0 +1,114 @@ +import * as Cloudflare from "@/Cloudflare"; +import { CloudflareEnvironment } from "@/Cloudflare/CloudflareEnvironment"; +import * as SQL from "@/Cloudflare/SQL"; +import * as Test from "@/Test/Vitest"; +import * as pipelines from "@distilled.cloud/cloudflare/pipelines"; +import { expect } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import { MinimumLogLevel } from "effect/References"; + +const { test } = Test.make({ providers: Cloudflare.providers() }); + +const logLevel = Effect.provideService( + MinimumLogLevel, + process.env.DEBUG ? "Debug" : "Info", +); + +test.provider("create and delete an unstructured stream", (stack) => + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + + yield* stack.destroy(); + + const stream = yield* stack.deploy( + Effect.gen(function* () { + return yield* Cloudflare.Stream("Events"); + }), + ); + + expect(stream.streamId).toBeDefined(); + expect(stream.streamName).toContain("events"); + expect(stream.http.enabled).toBe(true); + expect(stream.workerBinding.enabled).toBe(true); + + const observed = yield* pipelines.getStream({ + accountId, + streamId: stream.streamId, + }); + expect(observed.id).toBe(stream.streamId); + expect(observed.name).toBe(stream.streamName); + + yield* stack.destroy(); + }).pipe(logLevel), +); + +test.provider("patches http auth + workerBinding without replace", (stack) => + Effect.gen(function* () { + const { accountId } = yield* CloudflareEnvironment; + + yield* stack.destroy(); + + const initial = yield* stack.deploy( + Effect.gen(function* () { + return yield* Cloudflare.Stream("Events", { + http: { enabled: true, authentication: false }, + workerBinding: { enabled: true }, + }); + }), + ); + + expect(initial.http.authentication).toBe(false); + + const updated = yield* stack.deploy( + Effect.gen(function* () { + return yield* Cloudflare.Stream("Events", { + http: { enabled: true, authentication: true }, + workerBinding: { enabled: false }, + }); + }), + ); + + expect(updated.streamId).toBe(initial.streamId); + expect(updated.http.authentication).toBe(true); + expect(updated.workerBinding.enabled).toBe(false); + + const observed = yield* pipelines.getStream({ + accountId, + streamId: updated.streamId, + }); + expect(observed.http.authentication).toBe(true); + expect(observed.workerBinding.enabled).toBe(false); + + yield* stack.destroy(); + }).pipe(logLevel), +); + +test.provider("schema change triggers replace", (stack) => + Effect.gen(function* () { + yield* stack.destroy(); + + const initial = yield* stack.deploy( + Effect.gen(function* () { + return yield* Cloudflare.Stream("Events", { + schema: { user_id: SQL.String }, + }); + }), + ); + + const replaced = yield* stack.deploy( + Effect.gen(function* () { + return yield* Cloudflare.Stream("Events", { + schema: { + user_id: SQL.String, + amount: SQL.Float64, + }, + }); + }), + ); + + // Schema is immutable, so the stream id must change on replace. + expect(replaced.streamId).not.toBe(initial.streamId); + + yield* stack.destroy(); + }).pipe(logLevel), +); diff --git a/packages/alchemy/test/Cloudflare/Pipelines/StreamSchema.test.ts b/packages/alchemy/test/Cloudflare/Pipelines/StreamSchema.test.ts new file mode 100644 index 000000000..d407ca0d4 --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Pipelines/StreamSchema.test.ts @@ -0,0 +1,204 @@ +import { compileStreamSchema } from "@/Cloudflare/Pipelines/StreamSchema"; +import * as SQL from "@/Cloudflare/SQL"; +import { describe, expect, test } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import * as Schema from "effect/Schema"; + +const { + Float32, + Float64, + Int32, + Int64, + Timestamp, + TimestampMicroseconds, + TimestampNanoseconds, + TimestampSeconds, +} = SQL; + +const run = (effect: Effect.Effect) => + Effect.runPromise(effect as Effect.Effect); + +describe("compileStreamSchema", () => { + test("compiles scalars from a plain field record", async () => { + const fields = await run( + compileStreamSchema({ + s: Schema.String, + b: Schema.Boolean, + n: Schema.Number, + }), + ); + expect(fields).toEqual({ + fields: [ + { type: "string", name: "s", sqlName: "s", required: true }, + { type: "bool", name: "b", sqlName: "b", required: true }, + { type: "float64", name: "n", sqlName: "n", required: true }, + ], + }); + }); + + test("accepts a Schema.Struct value too", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + s: Schema.String, + }), + ), + ); + expect(fields.fields).toEqual([ + { type: "string", name: "s", sqlName: "s", required: true }, + ]); + }); + + test("optional fields produce required: false", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + required: Schema.String, + maybe: Schema.optional(Schema.String), + }), + ), + ); + expect(fields.fields).toEqual([ + { type: "string", name: "required", sqlName: "required", required: true }, + { type: "string", name: "maybe", sqlName: "maybe", required: false }, + ]); + }); + + test("compiles Schema.Array as list with element type", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + tags: Schema.Array(Schema.String), + }), + ), + ); + expect(fields.fields?.[0]).toMatchObject({ + type: "list", + name: "tags", + sqlName: "tags", + required: true, + items: { type: "string" }, + }); + }); + + test("compiles nested Schema.Struct as struct with recursive fields", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + meta: Schema.Struct({ + source: Schema.String, + priority: Schema.optional(Schema.Number), + }), + }), + ), + ); + expect(fields.fields?.[0]).toMatchObject({ + type: "struct", + name: "meta", + sqlName: "meta", + required: true, + fields: [ + { type: "string", name: "source", sqlName: "source", required: true }, + { + type: "float64", + name: "priority", + sqlName: "priority", + required: false, + }, + ], + }); + }); + + test("int/float brand helpers emit precision-specific types", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + i32: Int32, + i64: Int64, + f32: Float32, + f64: Float64, + }), + ), + ); + const byName = Object.fromEntries( + (fields.fields ?? []).map((f: any) => [f.name, f.type]), + ); + expect(byName).toEqual({ + i32: "int32", + i64: "int64", + f32: "float32", + f64: "float64", + }); + }); + + test("Timestamp brands emit unit-specific timestamps", async () => { + const fields = await run( + compileStreamSchema( + Schema.Struct({ + ms: Timestamp, + s: TimestampSeconds, + us: TimestampMicroseconds, + ns: TimestampNanoseconds, + }), + ), + ); + const byName = Object.fromEntries( + (fields.fields ?? []).map((f: any) => [f.name, f]), + ); + expect(byName.ms).toMatchObject({ type: "timestamp", unit: "millisecond" }); + expect(byName.s).toMatchObject({ type: "timestamp", unit: "second" }); + expect(byName.us).toMatchObject({ type: "timestamp", unit: "microsecond" }); + expect(byName.ns).toMatchObject({ type: "timestamp", unit: "nanosecond" }); + }); + + test("plain Schema.Date compiles to millisecond timestamp", async () => { + const fields = await run( + compileStreamSchema(Schema.Struct({ at: Schema.Date })), + ); + expect(fields.fields?.[0]).toMatchObject({ + type: "timestamp", + unit: "millisecond", + name: "at", + sqlName: "at", + required: true, + }); + }); + + test("Schema.Unknown compiles to json", async () => { + const fields = await run( + compileStreamSchema(Schema.Struct({ blob: Schema.Unknown })), + ); + expect(fields.fields?.[0]).toMatchObject({ + type: "json", + name: "blob", + sqlName: "blob", + required: true, + }); + }); + + test("raw field-list payload passes through unchanged", async () => { + const input = { + fields: [{ type: "string" as const, name: "x", required: true }], + }; + const out = await run(compileStreamSchema(input)); + expect(out).toBe(input); + }); + + test("non-struct top-level fails with a tagged error", async () => { + const result = await Effect.runPromiseExit( + compileStreamSchema(Schema.String as any), + ); + expect(result._tag).toBe("Failure"); + }); + + test("multi-branch union (not just T | undefined) fails with a tagged error", async () => { + const result = await Effect.runPromiseExit( + compileStreamSchema( + Schema.Struct({ + ambiguous: Schema.Union([Schema.String, Schema.Number]), + }) as any, + ), + ); + expect(result._tag).toBe("Failure"); + }); +}); diff --git a/packages/alchemy/test/Cloudflare/Pipelines/round-trip-worker.ts b/packages/alchemy/test/Cloudflare/Pipelines/round-trip-worker.ts new file mode 100644 index 000000000..68b240a50 --- /dev/null +++ b/packages/alchemy/test/Cloudflare/Pipelines/round-trip-worker.ts @@ -0,0 +1,73 @@ +import * as Cloudflare from "@/Cloudflare/index.ts"; +import * as SQL from "@/Cloudflare/SQL.ts"; +import * as Effect from "effect/Effect"; +import { HttpServerRequest } from "effect/unstable/http/HttpServerRequest"; +import * as HttpServerResponse from "effect/unstable/http/HttpServerResponse"; + +/** + * R2 bucket used as the sink target. Declared once at the module level so + * every yield reuses the same logical id across stack-deploy iterations. + */ +export const Lake = Cloudflare.R2Bucket("Lake"); + +/** + * Structured event stream the worker writes to. The schema is what + * `Stream.bind(...).send(...)` typechecks `records` against in user code. + */ +export const Events = Cloudflare.Stream("Events", { + schema: { + user_id: SQL.String, + amount: SQL.Float64, + }, +}); + +export const Sink = Cloudflare.Sink("Lakehouse", { + type: "r2", + bucket: Lake, + format: { type: "parquet", compression: "zstd" }, +}); + +export const Ingest = Cloudflare.Pipeline("Ingest", { + sql: Cloudflare.pipelineSql` + INSERT INTO ${Sink} + SELECT user_id, amount FROM ${Events}`, +}); + +/** + * End-to-end test fixture: a Worker that binds the {@link Events} stream + * via `Cloudflare.Stream.bind(...)`, declares the sink + pipeline siblings + * inline so the engine pulls in the whole graph, and exposes `POST /send` + * for the test to produce records. + */ +export default class PipelinesRoundTripWorker extends Cloudflare.Worker()( + "PipelinesRoundTripWorker", + { main: import.meta.filename }, + Effect.gen(function* () { + const send = yield* Cloudflare.Stream.bind(Events); + + return { + fetch: Effect.gen(function* () { + const request = yield* HttpServerRequest; + const url = new URL(request.url, "http://x"); + + if (request.method === "POST" && url.pathname === "/send") { + const body = (yield* request.json) as + | { user_id: string; amount: number } + | ReadonlyArray<{ user_id: string; amount: number }>; + const records = Array.isArray(body) ? body : [body]; + yield* send.sendBatch(records).pipe(Effect.orDie); + return yield* HttpServerResponse.json( + { sent: records.length }, + { status: 202 }, + ); + } + + if (request.method === "GET" && url.pathname === "/health") { + return yield* HttpServerResponse.json({ ok: true }); + } + + return HttpServerResponse.text("Not Found", { status: 404 }); + }), + }; + }).pipe(Effect.provide(Cloudflare.StreamBindingLive)), +) {}