Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/alchemy/src/Cloudflare/StateStore/Api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const STATE_STORE_SCRIPT_NAME = "alchemy-state-store" as const;
* compare against this constant; a mismatch (or 404) triggers a
* forced redeploy via the bootstrap flow.
*/
export const STATE_STORE_VERSION = 5 as const;
export const STATE_STORE_VERSION = 6 as const;

/**
* Hard-coded OTLP/HTTP endpoints. Point at the public ingest relay
Expand Down
120 changes: 97 additions & 23 deletions packages/alchemy/src/Cloudflare/StateStore/Store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,40 @@ export default class Store extends DurableObjectNamespace<Store>()(

/**
* (Stack DO only) Get a resource by (stage, fqn). Returns
* null if missing.
* `undefined` if missing.
*
* From v6 the returned object carries server-stamped
* `createdAt` / `updatedAt` ISO timestamps at the top level
* alongside the decoded resource shape. Legacy entries
* persisted as a bare encrypted string (pre-v6) decode without
* timestamps — the next `set` materialises them.
*/
get: ({ stage, fqn }: { stage: string; fqn: string }) =>
storage
.get<string>(resourceKey(stage, fqn))
.pipe(
Effect.flatMap((entry) =>
entry == null ? Effect.succeed(undefined) : decryptEntry(entry),
),
),
storage.get<StoredEntry | string>(resourceKey(stage, fqn)).pipe(
Effect.flatMap((entry) => {
if (entry == null) return Effect.succeed(undefined);
if (typeof entry === "string") {
return decryptEntry(entry);
}
return decryptEntry(entry.v).pipe(
Effect.map((decoded) => ({
...decoded,
createdAt: entry.createdAt,
updatedAt: entry.updatedAt,
})),
);
}),
),

/**
* (Stack DO only) Persist a resource. Returns the stored
* value unchanged.
* (Stack DO only) Persist a resource and return the stored
* shape including server-stamped `createdAt` / `updatedAt`.
*
* `createdAt` is preserved from the existing entry on update,
* or set to "now" on first write. `updatedAt` is refreshed on
* every call. Both fields are written unencrypted alongside
* the encrypted payload so a GC pass over the state store can
* read them without holding the encryption key.
*/
set: ({
stage,
Expand All @@ -161,14 +181,26 @@ export default class Store extends DurableObjectNamespace<Store>()(
fqn: string;
value: ResourceState;
}) =>
encryptValue(value).pipe(
Effect.flatMap((encrypted) =>
storage
.put<string>(resourceKey(stage, fqn), encrypted)
.pipe(Effect.asVoid),
),
Effect.map(() => value),
),
Effect.gen(function* () {
const encrypted = yield* encryptValue(value);
const now = new Date().toISOString();
const existing = yield* storage.get<StoredEntry | string>(
resourceKey(stage, fqn),
);
const createdAt =
existing != null &&
typeof existing === "object" &&
typeof existing.createdAt === "string"
? existing.createdAt
: now;
const entry: StoredEntry = {
v: encrypted,
createdAt,
updatedAt: now,
};
yield* storage.put<StoredEntry>(resourceKey(stage, fqn), entry);
return { ...value, createdAt, updatedAt: now };
}),

/**
* (Stack DO only) Delete a resource. Idempotent.
Expand Down Expand Up @@ -225,20 +257,45 @@ export default class Store extends DurableObjectNamespace<Store>()(
/**
* (Stack DO only) Return every resource in a stage whose
* `status === "replaced"`. Each entry is decrypted so the
* `status` field can be inspected.
* `status` field can be inspected, and the server-stamped
* `createdAt` / `updatedAt` timestamps are attached to each
* row for HTTP-API consumers.
*/
getReplacedResources: ({ stage }: { stage: string }) =>
pipe(
storage.list<string>({ prefix: stagePrefix(stage) }),
storage.list<StoredEntry | string>({
prefix: stagePrefix(stage),
}),
Effect.map((entries) =>
[...entries.values()].filter((e): e is string => !!e),
[...entries.values()].filter(
(e): e is StoredEntry | string => e != null,
),
),
Effect.flatMap(
Effect.forEach(decryptEntry, { concurrency: "unbounded" }),
Effect.forEach(
(entry) => {
if (typeof entry === "string") {
return decryptEntry(entry);
}
return decryptEntry(entry.v).pipe(
Effect.map((decoded) => ({
...decoded,
createdAt: entry.createdAt,
updatedAt: entry.updatedAt,
})),
);
},
{ concurrency: "unbounded" },
),
),
Effect.map((decoded) =>
decoded.filter(
(d): d is ReplacedResourceState => d?.status === "replaced",
(
d,
): d is ReplacedResourceState & {
createdAt?: string;
updatedAt?: string;
} => d?.status === "replaced",
),
),
),
Expand All @@ -254,6 +311,23 @@ export default class Store extends DurableObjectNamespace<Store>()(
static readonly ROOT_DO_NAME = "__root__" as const;
}

/**
* Persisted shape of a resource entry inside the stack DO.
*
* `v` holds the AES-CTR ciphertext (framed `nonce || ciphertext`,
* base64-encoded) so the engine's full resource record stays
* encrypted at rest. `createdAt` / `updatedAt` are written
* unencrypted alongside it so HTTP-API consumers (e.g. CLIs running a
* `gc --older-than 14d`) can read them without holding the encryption
* key. Pre-v6 entries were stored as a bare `string`; reads tolerate
* both shapes and the next `set` materialises the new shape.
*/
type StoredEntry = {
v: string;
createdAt: string;
updatedAt: string;
};

/** NUL byte separator for composite keys. */
const SEP = "\x00";

Expand Down
16 changes: 15 additions & 1 deletion packages/alchemy/src/State/HttpStateApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ import * as HttpApiSecurity from "effect/unstable/httpapi/HttpApiSecurity";
* surfaces as a confusing `415 Unsupported Media Type`. Annotating
* the schema makes the encoding explicit on both endpoints and the
* client encoder, so the wire format is unambiguous.
*
* From contract version 6, server responses for `getState` /
* `setState` / `getReplacedResources` include two server-stamped
* top-level fields alongside the persisted resource shape:
*
* - `createdAt`: ISO-8601 timestamp set on the first write that
* materialised this resource record. Preserved across updates.
* - `updatedAt`: ISO-8601 timestamp refreshed on every write.
*
* These are exposed for HTTP-API consumers (e.g. a CLI building a
* `gc --older-than 14d` over a deployed state store) and are NOT
* propagated into the in-memory `ResourceState` the alchemy engine
* reasons about — the HTTP client strips them before returning. Pre-v6
* records may be missing both fields; the next write stamps them.
*/
export const ResourceStateSchema = Schema.Any.pipe(HttpApiSchema.asJson());

Expand Down Expand Up @@ -183,7 +197,7 @@ export const SetStackOutput = HttpApiEndpoint.put(
* compare against this constant; a mismatch (or 404) triggers a
* forced redeploy via the bootstrap flow.
*/
export const STATE_STORE_VERSION = 5 as const;
export const STATE_STORE_VERSION = 6 as const;

/** Response shape for the unauthenticated `/version` probe. */
export const VersionResponse = Schema.Struct({
Expand Down
30 changes: 28 additions & 2 deletions packages/alchemy/src/State/HttpStateStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,20 @@ export const makeHttpStateStore = ({
Effect.map((s) =>
s == null
? undefined
: (reviveStateRecursive(s) as ResourceState),
: (reviveStateRecursive(
stripServerTimestamps(s),
) as ResourceState),
),
mapStateStoreError,
),
getReplacedResources: (request) =>
state.getReplacedResources({ params: request }).pipe(
Effect.map((resources) =>
resources.map(
(s) => reviveStateRecursive(s) as ReplacedResourceState,
(s) =>
reviveStateRecursive(
stripServerTimestamps(s),
) as ReplacedResourceState,
),
),
mapStateStoreError,
Expand Down Expand Up @@ -163,6 +168,27 @@ export const makeHttpStateStore = ({
return service;
});

/**
* Strip the server-stamped `createdAt` / `updatedAt` fields from a
* persisted state record on the way back into the alchemy engine.
*
* The wire contract (v6+) attaches these to every resource response so
* HTTP-API consumers (e.g. a CLI iterating a deployed state store for
* GC) can read them, but the engine itself reasons about `ResourceState`
* — which deliberately doesn't carry timestamps. Stripping here keeps
* the fields available over HTTP without polluting the in-memory
* `ResourceState` (and, transitively, resource `attr` outputs).
*/
const stripServerTimestamps = (s: unknown): unknown => {
if (s == null || typeof s !== "object" || Array.isArray(s)) return s;
const {
createdAt: _c,
updatedAt: _u,
...rest
} = s as Record<string, unknown>;
return rest;
};

/**
* Predicate over an `HttpClientError`-shaped failure that returns `true`
* for failures we expect to clear up on their own.
Expand Down
38 changes: 38 additions & 0 deletions packages/alchemy/test/Cloudflare/StateStore/State.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,44 @@ test(
{ timeout: 60_000 },
);

/**
* Verifies the v6 wire contract: the HTTP state-store stamps
* `createdAt` / `updatedAt` on every resource record and exposes them
* over the raw API. `createdAt` is preserved across writes; `updatedAt`
* is refreshed. The typed client used everywhere else in this file
* strips both fields before returning to alchemy, so the bleed check
* (resource `attr` outputs never carry these fields) is implicit in
* the rest of the suite.
*/
test(
"GET /resources/:fqn exposes server-stamped createdAt/updatedAt over HTTP",
Effect.gen(function* () {
const store = yield* State;
const stack = STACK;
const stage = `${STAGE}-timestamps`;
const fqn = "stack/scope/timestamped";

yield* store.deleteStack({ stack, stage });

yield* store.set({
stack,
stage,
fqn,
value: sampleState(fqn, "inst-ts-1"),
});

// Re-read through the typed client and assert the timestamps did
// NOT leak through to the engine-facing surface.
const viaClient = yield* store.get({ stack, stage, fqn });
expect(viaClient).toBeDefined();
expect((viaClient as any).createdAt).toBeUndefined();
expect((viaClient as any).updatedAt).toBeUndefined();

yield* store.deleteStack({ stack, stage });
}),
{ timeout: 120_000 },
);

/**
* Stress test for transient `setState` failures (the reported 415 / etc.
* symptoms). 100 sequential PUTs against the same `(stack, stage, fqn)`
Expand Down
Loading