From a1d8d7b1d5be70fc8a66d686bdaa2e42ff991bda Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 10:46:01 -0700 Subject: [PATCH 1/9] feat(moq-data): add metadata-over-MoQ helpers (set + json) Add a new `moq-data` crate and `@moq/data` package providing helpers for sending metadata over MoQ tracks, mirroring the snapshot/delta machinery of moq-json. - `set`: a HashSet-like collection synced over a track with `+`/`-` delta encoding. Generic over any binary item via a small `Item` trait (Rust) or `Codec` (JS); `String`/`Vec`/`Bytes` and `stringCodec`/`bytesCodec` are provided. Each group is self-contained: frame 0 is an atomic snapshot (u32 count + length-prefixed items) and later frames are single ops (`+`/`-` byte then item bytes), so a late joiner reconstructs from the newest group alone. Deltas are on by default. The wire format uses big-endian u32 length prefixes so Rust and JS stay byte-compatible. - `json`: re-exports moq-json for now; JSON will migrate here over time. The motivating use case is a `tracks.set` track listing a broadcast's track names. Wiring that into the hang catalog is a follow-up (a catalog/wire change targeting `dev`); this crate is the generic, additive building block. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 1 + Cargo.lock | 11 + Cargo.toml | 3 + bun.lock | 30 +- js/data/README.md | 39 +++ js/data/package.json | 32 +++ js/data/src/index.ts | 2 + js/data/src/json.ts | 3 + js/data/src/set/codec.ts | 26 ++ js/data/src/set/consumer.ts | 77 +++++ js/data/src/set/index.ts | 3 + js/data/src/set/producer.ts | 121 ++++++++ js/data/src/set/set.test.ts | 104 +++++++ js/data/src/set/wire.ts | 67 +++++ js/data/tsconfig.json | 9 + package.json | 1 + rs/moq-data/Cargo.toml | 30 ++ rs/moq-data/README.md | 46 +++ rs/moq-data/src/lib.rs | 16 ++ rs/moq-data/src/set.rs | 549 ++++++++++++++++++++++++++++++++++++ 20 files changed, 1164 insertions(+), 6 deletions(-) create mode 100644 js/data/README.md create mode 100644 js/data/package.json create mode 100644 js/data/src/index.ts create mode 100644 js/data/src/json.ts create mode 100644 js/data/src/set/codec.ts create mode 100644 js/data/src/set/consumer.ts create mode 100644 js/data/src/set/index.ts create mode 100644 js/data/src/set/producer.ts create mode 100644 js/data/src/set/set.test.ts create mode 100644 js/data/src/set/wire.ts create mode 100644 js/data/tsconfig.json create mode 100644 rs/moq-data/Cargo.toml create mode 100644 rs/moq-data/README.md create mode 100644 rs/moq-data/src/lib.rs create mode 100644 rs/moq-data/src/set.rs diff --git a/CLAUDE.md b/CLAUDE.md index c7d143859..7eed1e7d5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -189,6 +189,7 @@ Changes in one area usually need matching updates elsewhere, including docs. If | `rs/moq-net` wire/API | `js/net`, `doc/concept` | | `rs/hang` catalog/container | `js/hang`, `doc/concept` | | `rs/moq-token` | `js/token` | +| `rs/moq-data` set wire/API | `js/data` (shared wire format, must stay byte-compatible) | | `rs/moq-relay` config/behavior | `doc/bin/relay/` | | `rs/moq-cli` | `doc/bin/cli.md` | | `rs/moq-gst` | `doc/bin/gstreamer.md` | diff --git a/Cargo.lock b/Cargo.lock index 8163bcb40..1ebc8624b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3845,6 +3845,17 @@ dependencies = [ "url", ] +[[package]] +name = "moq-data" +version = "0.0.1" +dependencies = [ + "bytes", + "kio", + "moq-json", + "moq-net", + "thiserror 2.0.18", +] + [[package]] name = "moq-ffi" version = "0.2.20" diff --git a/Cargo.toml b/Cargo.toml index 26676fb85..ae06bbd69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "rs/moq-bench", "rs/moq-boy", "rs/moq-cli", + "rs/moq-data", "rs/moq-ffi", "rs/moq-gst", "rs/moq-json", @@ -28,6 +29,7 @@ default-members = [ "rs/moq-audio", "rs/moq-bench", "rs/moq-cli", + "rs/moq-data", # "rs/moq-ffi", # requires Python/maturin # "rs/moq-gst", # requires GStreamer "rs/moq-json", @@ -50,6 +52,7 @@ rust-version = "1.85" hang = { version = "0.19", path = "rs/hang" } kio = { version = "0.3", path = "rs/kio" } moq-audio = { version = "0.0.3", path = "rs/moq-audio" } +moq-data = { version = "0.0.1", path = "rs/moq-data" } moq-json = { version = "0.0.2", path = "rs/moq-json" } moq-loc = { version = "0.1", path = "rs/moq-loc" } moq-msf = { version = "0.2", path = "rs/moq-msf" } diff --git a/bun.lock b/bun.lock index f3cefb272..7cbc5a8ab 100644 --- a/bun.lock +++ b/bun.lock @@ -77,9 +77,25 @@ "typescript": "^6.0.3", }, }, + "js/data": { + "name": "@moq/data", + "version": "0.0.1", + "dependencies": { + "@moq/json": "workspace:^", + "@moq/net": "workspace:^", + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "rimraf": "^6.1.3", + "typescript": "^6.0.3", + }, + "peerDependencies": { + "zod": "^4.0.0", + }, + }, "js/hang": { "name": "@moq/hang", - "version": "0.2.7", + "version": "0.2.9", "dependencies": { "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", @@ -100,7 +116,7 @@ }, "js/json": { "name": "@moq/json", - "version": "0.0.1", + "version": "0.0.2", "dependencies": { "@moq/net": "workspace:^", "@moq/signals": "workspace:^", @@ -159,7 +175,7 @@ }, "js/net": { "name": "@moq/net", - "version": "0.1.2", + "version": "0.1.4", "dependencies": { "@moq/qmux": "^0.0.6", "@moq/signals": "workspace:*", @@ -179,7 +195,7 @@ }, "js/publish": { "name": "@moq/publish", - "version": "0.2.10", + "version": "0.2.14", "dependencies": { "@moq/hang": "workspace:^", "@moq/json": "workspace:^", @@ -198,7 +214,7 @@ }, "js/signals": { "name": "@moq/signals", - "version": "0.1.7", + "version": "0.1.8", "devDependencies": { "@types/bun": "^1.3.11", "@types/react": "^19.2.15", @@ -238,7 +254,7 @@ }, "js/watch": { "name": "@moq/watch", - "version": "0.2.14", + "version": "0.2.16", "dependencies": { "@moq/hang": "workspace:^", "@moq/json": "workspace:^", @@ -517,6 +533,8 @@ "@moq/clock": ["@moq/clock@workspace:js/clock"], + "@moq/data": ["@moq/data@workspace:js/data"], + "@moq/demo": ["@moq/demo@workspace:demo/web"], "@moq/demo-boy": ["@moq/demo-boy@workspace:demo/boy"], diff --git a/js/data/README.md b/js/data/README.md new file mode 100644 index 000000000..a05a669e9 --- /dev/null +++ b/js/data/README.md @@ -0,0 +1,39 @@ +

+ Media over QUIC +

+ +# @moq/data + +[![npm version](https://img.shields.io/npm/v/@moq/data)](https://www.npmjs.com/package/@moq/data) +[![TypeScript](https://img.shields.io/badge/TypeScript-ready-blue.svg)](https://www.typescriptlang.org/) + +Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks. + +Each helper maps an application data structure onto a [`@moq/net`](../net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone. + +- **`@moq/data/set`** syncs a `Set`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas. +- **`@moq/data/json`** re-exports [`@moq/json`](../json) for snapshot/delta JSON publishing. It lives in its own package today and will migrate here over time. + +## Set + +```ts +import { Producer, Consumer, stringCodec } from "@moq/data/set"; + +// Publish the set of track names in a broadcast. +const producer = new Producer(track, { codec: stringCodec }); +producer.insert("video"); +producer.insert("audio"); +producer.remove("audio"); + +// Consume: yields the full set after each change. +const consumer = new Consumer(track.subscribe(), { codec: stringCodec }); +for await (const names of consumer) { + console.log(names); // Set +} +``` + +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. + +Items are arbitrary binary data via a `Codec` (`encode`/`decode` to `Uint8Array`). `stringCodec` and `bytesCodec` are provided; supply your own for richer types. Items dedupe by their encoded bytes, so two values with the same encoding are the same member. + +Deltas are on by default (`deltaRatio: 2`); a delta is appended while the group stays within `deltaRatio` times the size of a fresh snapshot, otherwise a new snapshot group is started. diff --git a/js/data/package.json b/js/data/package.json new file mode 100644 index 000000000..faf6a0196 --- /dev/null +++ b/js/data/package.json @@ -0,0 +1,32 @@ +{ + "name": "@moq/data", + "type": "module", + "version": "0.0.1", + "description": "Helpers for sending metadata (sets, JSON) over MoQ tracks.", + "license": "(MIT OR Apache-2.0)", + "repository": "github:moq-dev/moq", + "sideEffects": false, + "exports": { + ".": "./src/index.ts", + "./set": "./src/set/index.ts", + "./json": "./src/json.ts" + }, + "scripts": { + "build": "rimraf dist && tsc -b && bun ../common/package.ts", + "check": "tsc --noEmit", + "test": "bun test --only-failures", + "release": "bun ../common/release.ts" + }, + "dependencies": { + "@moq/json": "workspace:^", + "@moq/net": "workspace:^" + }, + "peerDependencies": { + "zod": "^4.0.0" + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "rimraf": "^6.1.3", + "typescript": "^6.0.3" + } +} diff --git a/js/data/src/index.ts b/js/data/src/index.ts new file mode 100644 index 000000000..60637782c --- /dev/null +++ b/js/data/src/index.ts @@ -0,0 +1,2 @@ +export * as Json from "@moq/json"; +export * as Set from "./set/index.ts"; diff --git a/js/data/src/json.ts b/js/data/src/json.ts new file mode 100644 index 000000000..882e72b9a --- /dev/null +++ b/js/data/src/json.ts @@ -0,0 +1,3 @@ +// Snapshot/delta JSON publishing, re-exported from @moq/json. JSON lives in its own package today +// and will migrate here over time. +export * from "@moq/json"; diff --git a/js/data/src/set/codec.ts b/js/data/src/set/codec.ts new file mode 100644 index 000000000..243c12392 --- /dev/null +++ b/js/data/src/set/codec.ts @@ -0,0 +1,26 @@ +/** + * Encodes a set item to and from its wire bytes. + * + * Encoding must be deterministic and round-trip: `codec.decode(codec.encode(value))` must equal + * `value`. Two items are the same set member iff they encode to the same bytes, so distinct items + * must encode distinctly. + */ +export interface Codec { + encode(value: T): Uint8Array; + decode(bytes: Uint8Array): T; +} + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +/** A codec for UTF-8 strings, e.g. a set of track names. */ +export const stringCodec: Codec = { + encode: (value) => textEncoder.encode(value), + decode: (bytes) => textDecoder.decode(bytes), +}; + +/** A codec for raw binary items, passed through untouched. */ +export const bytesCodec: Codec = { + encode: (value) => value, + decode: (bytes) => bytes, +}; diff --git a/js/data/src/set/consumer.ts b/js/data/src/set/consumer.ts new file mode 100644 index 000000000..99cc2310b --- /dev/null +++ b/js/data/src/set/consumer.ts @@ -0,0 +1,77 @@ +import type * as Moq from "@moq/net"; + +import type { Config } from "./producer.ts"; +import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; + +/** + * Consumes a set from a track, reconstructing it from snapshots and deltas. + * + * Reads each group's snapshot (frame 0) and applies the following frames as insert/remove deltas, + * yielding the reconstructed set after each one. + */ +export class Consumer { + #track: Moq.Track; + #codec: Config["codec"]; + + #group?: Moq.Group; + // Keyed by encoded bytes so items dedupe by value, not reference. + #current = new Map(); + #framesRead = 0; + + constructor(track: Moq.Track, config: Config) { + this.#track = track; + this.#codec = config.codec; + } + + /** Get the set after the next change, or undefined once the track ends. */ + async next(): Promise | undefined> { + for (;;) { + if (!this.#group) { + // Advance to the next group with a higher sequence number (skipping late arrivals). + this.#group = await this.#track.nextGroupOrdered(); + if (!this.#group) return undefined; + this.#current = new Map(); + this.#framesRead = 0; + } + + const frame = await this.#group.readFrame(); + if (frame === undefined) { + // The group is exhausted; advance to the next one. + this.#group = undefined; + continue; + } + + this.#apply(frame); + return new Set(this.#current.values()); + } + } + + async *[Symbol.asyncIterator](): AsyncIterator> { + for (;;) { + const value = await this.next(); + if (value === undefined) return; + yield value; + } + } + + // Frame 0 of a group is a snapshot, the rest are insert/remove deltas. + #apply(frame: Uint8Array): void { + if (this.#framesRead === 0) { + this.#current = new Map(); + for (const item of decodeSnapshot(frame)) { + this.#current.set(keyOf(item), this.#codec.decode(item)); + } + } else { + const [op, item] = decodeDelta(frame); + const key = keyOf(item); + if (op === INSERT) { + this.#current.set(key, this.#codec.decode(item)); + } else if (op === REMOVE) { + this.#current.delete(key); + } else { + throw new Error(`unknown op byte: ${op}`); + } + } + this.#framesRead += 1; + } +} diff --git a/js/data/src/set/index.ts b/js/data/src/set/index.ts new file mode 100644 index 000000000..a200f821a --- /dev/null +++ b/js/data/src/set/index.ts @@ -0,0 +1,3 @@ +export { bytesCodec, type Codec, stringCodec } from "./codec.ts"; +export { Consumer } from "./consumer.ts"; +export { type Config, Producer } from "./producer.ts"; diff --git a/js/data/src/set/producer.ts b/js/data/src/set/producer.ts new file mode 100644 index 000000000..0429336e8 --- /dev/null +++ b/js/data/src/set/producer.ts @@ -0,0 +1,121 @@ +import type * as Moq from "@moq/net"; + +import type { Codec } from "./codec.ts"; +import { encodeDelta, encodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; + +// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept well +// below the per-group frame cap so a late joiner can always read the snapshot at frame 0. +const MAX_DELTA_FRAMES = 256; + +export interface Config { + // Encodes items to and from their wire bytes. Use `stringCodec` for a set of strings. + codec: Codec; + + // A delta is appended to the current group while the deltas accumulated since the last snapshot + // stay within `deltaRatio` times the size of a fresh snapshot; otherwise a new snapshot group is + // started. Defaults to 2. The whole point of a set track is incremental add/remove, so deltas are + // on by default (unlike @moq/json). + deltaRatio?: number; +} + +/** Publishes a set over a track, choosing snapshots and deltas automatically. */ +export class Producer { + #track: Moq.Track; + #codec: Codec; + #deltaRatio: number; + + // Keyed by encoded bytes so items dedupe by value, not reference. + #items = new Map(); + + #group?: Moq.Group; + #groupFrames = 0; + #groupDeltaBytes = 0; + + constructor(track: Moq.Track, config: Config) { + this.#track = track; + this.#codec = config.codec; + this.#deltaRatio = config.deltaRatio ?? 2; + } + + /** Insert an item, publishing a delta or snapshot. Returns true if it was newly inserted. */ + insert(value: T): boolean { + const bytes = this.#codec.encode(value); + const key = keyOf(bytes); + if (this.#items.has(key)) return false; + + this.#items.set(key, value); + this.#publish(INSERT, bytes); + return true; + } + + /** Remove an item, publishing a delta or snapshot. Returns true if it was present. */ + remove(value: T): boolean { + const bytes = this.#codec.encode(value); + const key = keyOf(bytes); + if (!this.#items.has(key)) return false; + + this.#items.delete(key); + this.#publish(REMOVE, bytes); + return true; + } + + /** Whether the item is currently in the set. */ + has(value: T): boolean { + return this.#items.has(keyOf(this.#codec.encode(value))); + } + + /** The number of items currently in the set. */ + get size(): number { + return this.#items.size; + } + + /** Iterate over the items currently in the set. */ + values(): IterableIterator { + return this.#items.values(); + } + + /** Finish the track, closing any open group. */ + finish(): void { + this.#group?.close(); + this.#group = undefined; + this.#track.close(); + } + + // Publish a single change. The change is already reflected in `#items`, so a snapshot captures it. + #publish(op: number, item: Uint8Array): void { + const snapshot = this.#snapshot(); + const deltaLen = 1 + item.length; + + if (this.#shouldSnapshot(deltaLen, snapshot.length)) { + this.#writeSnapshot(snapshot); + } else { + // biome-ignore lint/style/noNonNullAssertion: shouldSnapshot returning false guarantees an open group. + this.#group!.writeFrame(encodeDelta(op, item)); + this.#groupFrames += 1; + this.#groupDeltaBytes += deltaLen; + } + } + + #snapshot(): Uint8Array { + const items: Uint8Array[] = []; + for (const value of this.#items.values()) items.push(this.#codec.encode(value)); + return encodeSnapshot(items); + } + + #shouldSnapshot(deltaLen: number, snapshotLen: number): boolean { + if (!this.#group || this.#groupFrames >= MAX_DELTA_FRAMES) return true; + // Roll a snapshot once the replayed deltas would outgrow the budget relative to a snapshot. + return this.#groupDeltaBytes + deltaLen > this.#deltaRatio * snapshotLen; + } + + #writeSnapshot(snapshot: Uint8Array): void { + // The previous group is complete; no more frames will be appended to it. + this.#group?.close(); + + const group = this.#track.appendGroup(); + group.writeFrame(snapshot); + this.#group = group; + this.#groupFrames = 1; + this.#groupDeltaBytes = 0; + } +} diff --git a/js/data/src/set/set.test.ts b/js/data/src/set/set.test.ts new file mode 100644 index 000000000..02afb67c2 --- /dev/null +++ b/js/data/src/set/set.test.ts @@ -0,0 +1,104 @@ +import { expect, test } from "bun:test"; +import { Track } from "@moq/net"; + +import { stringCodec } from "./codec.ts"; +import { Consumer } from "./consumer.ts"; +import { Producer } from "./producer.ts"; + +// Reconstruct every set a consumer yields, in order. Consumes the track's groups. +async function drain(track: Track): Promise[]> { + const out: Set[] = []; + for await (const value of new Consumer(track, { codec: stringCodec })) out.push(value); + return out; +} + +// Inspect the published layout via the public API: the frame count of each group, in order. Like +// `drain`, this consumes the track's groups, so don't call both on one track. Finish the track +// first so group/frame reads terminate. +async function structure(track: Track): Promise { + const counts: number[] = []; + for (;;) { + const group = await track.nextGroupOrdered(); + if (!group) break; + + let frames = 0; + while ((await group.readFrame()) !== undefined) frames++; + counts.push(frames); + } + return counts; +} + +function set(...items: string[]): Set { + return new Set(items); +} + +test("deltas off: a snapshot group per change", async () => { + const track = new Track("test"); + // A tight ratio leaves no room for any delta past the snapshot, so every change rolls a group. + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 0 }); + producer.insert("video"); + producer.insert("audio"); + producer.finish(); + + expect((await drain(track)).at(-1)).toEqual(set("video", "audio")); +}); + +test("deltas share one group", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + producer.insert("video"); // snapshot + producer.insert("audio"); // delta + producer.remove("video"); // delta + producer.finish(); + + // All changes fit in a single group as snapshot + two deltas. + expect(await structure(track)).toEqual([3]); +}); + +test("redundant insert and remove write nothing", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + expect(producer.insert("video")).toBe(true); + expect(producer.insert("video")).toBe(false); // already present + expect(producer.remove("audio")).toBe(false); // never present + producer.finish(); + + expect(await structure(track)).toEqual([1]); +}); + +test("live consumer sees each change", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + const consumer = new Consumer(track, { codec: stringCodec }); + + producer.insert("video"); + expect(await consumer.next()).toEqual(set("video")); + + producer.insert("audio"); + expect(await consumer.next()).toEqual(set("video", "audio")); + + producer.remove("video"); + expect(await consumer.next()).toEqual(set("audio")); +}); + +test("late joiner reconstructs from deltas", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + producer.insert("a"); + producer.insert("b"); + producer.insert("c"); + producer.remove("a"); + producer.finish(); + + expect((await drain(track)).at(-1)).toEqual(set("b", "c")); +}); + +test("frame cap rolls snapshot", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 1_000_000 }); + // Snapshot (frame 0) plus deltas fill the group until the frame cap forces a roll. + for (let i = 0; i <= 256; i++) producer.insert(`item-${i}`); + producer.finish(); + + expect(await structure(track)).toEqual([256, 1]); +}); diff --git a/js/data/src/set/wire.ts b/js/data/src/set/wire.ts new file mode 100644 index 000000000..6cdbc03b5 --- /dev/null +++ b/js/data/src/set/wire.ts @@ -0,0 +1,67 @@ +// Wire format for a set track. Each group is self-contained: frame 0 is a snapshot of every item, +// and each following frame is a single insert/remove delta. +// +// - snapshot: u32(count) followed by `count` repetitions of u32(len) then `len` item bytes. +// - delta: a one-byte op ('+' insert, '-' remove) followed by the item bytes to the end of frame. +// +// Lengths are big-endian u32 (not QUIC varints) so the format stays self-contained and trivially +// matches the Rust implementation (`moq-data`). + +export const INSERT = 0x2b; // '+' +export const REMOVE = 0x2d; // '-' + +/** A stable map key for an item's encoded bytes, giving the set value (not reference) semantics. */ +export function keyOf(bytes: Uint8Array): string { + let key = ""; + for (let i = 0; i < bytes.length; i++) key += String.fromCharCode(bytes[i]); + return key; +} + +export function encodeSnapshot(items: Uint8Array[]): Uint8Array { + let total = 4; + for (const item of items) total += 4 + item.length; + + const out = new Uint8Array(total); + const view = new DataView(out.buffer); + view.setUint32(0, items.length); + + let offset = 4; + for (const item of items) { + view.setUint32(offset, item.length); + offset += 4; + out.set(item, offset); + offset += item.length; + } + return out; +} + +export function decodeSnapshot(frame: Uint8Array): Uint8Array[] { + if (frame.length < 4) throw new Error("snapshot is missing its count"); + const view = new DataView(frame.buffer, frame.byteOffset, frame.byteLength); + const count = view.getUint32(0); + + const items: Uint8Array[] = []; + let offset = 4; + for (let i = 0; i < count; i++) { + if (offset + 4 > frame.length) throw new Error("snapshot is missing an item length"); + const len = view.getUint32(offset); + offset += 4; + + if (offset + len > frame.length) throw new Error("snapshot item runs past end of frame"); + items.push(frame.subarray(offset, offset + len)); + offset += len; + } + return items; +} + +export function encodeDelta(op: number, item: Uint8Array): Uint8Array { + const out = new Uint8Array(1 + item.length); + out[0] = op; + out.set(item, 1); + return out; +} + +export function decodeDelta(frame: Uint8Array): [number, Uint8Array] { + if (frame.length === 0) throw new Error("empty delta frame"); + return [frame[0], frame.subarray(1)]; +} diff --git a/js/data/tsconfig.json b/js/data/tsconfig.json new file mode 100644 index 000000000..bb55d7c43 --- /dev/null +++ b/js/data/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "./src", + "types": ["bun"] + }, + "include": ["src"] +} diff --git a/package.json b/package.json index 2d4c5a54b..91f8bc0a4 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "js/clock", "js/token", "js/json", + "js/data", "js/hang", "js/loc", "js/msf", diff --git a/rs/moq-data/Cargo.toml b/rs/moq-data/Cargo.toml new file mode 100644 index 000000000..c916d088a --- /dev/null +++ b/rs/moq-data/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "moq-data" +description = "Helpers for sending metadata (sets, JSON) over MoQ tracks." +authors = ["Luke Curley "] +repository = "https://github.com/moq-dev/moq" +license = "MIT OR Apache-2.0" + +version = "0.0.1" +edition = "2024" +rust-version.workspace = true + +keywords = ["quic", "http3", "webtransport", "metadata", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + +[lib] +doctest = false + +[features] +default = ["json", "set"] +# Re-export `moq-json` as `moq_data::json`. JSON will migrate into this crate eventually. +json = ["dep:moq-json"] +# A `HashSet`-like collection synced over a track via add/remove deltas. +set = ["dep:moq-net", "dep:bytes", "dep:kio", "dep:thiserror"] + +[dependencies] +bytes = { version = "1", optional = true } +kio = { workspace = true, optional = true } +moq-json = { workspace = true, optional = true } +moq-net = { workspace = true, optional = true } +thiserror = { version = "2", optional = true } diff --git a/rs/moq-data/README.md b/rs/moq-data/README.md new file mode 100644 index 000000000..78133abc4 --- /dev/null +++ b/rs/moq-data/README.md @@ -0,0 +1,46 @@ +

+ Media over QUIC +

+ +# moq-data + +[![crates.io](https://img.shields.io/crates/v/moq-data)](https://crates.io/crates/moq-data) +[![docs.rs](https://img.shields.io/docsrs/moq-data)](https://docs.rs/moq-data) + +Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks. + +Each helper maps an application data structure onto a [`moq-net`](../moq-net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone. + +- **`set`** syncs a `HashSet`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas. +- **`json`** re-exports [`moq-json`](../moq-json) for snapshot/delta JSON publishing. It lives in its own crate today and will migrate here over time. + +## Set + +```rust +use moq_data::set; + +// Publish the set of track names in a broadcast. +let mut tracks = set::Producer::::new(track, set::Config::default()); +tracks.insert("video".to_string())?; +tracks.insert("audio".to_string())?; +tracks.remove("audio")?; + +// Consume: yields the full set after each change. +let mut consumer = set::Consumer::::new(track.subscribe(None)); +while let Some(names) = consumer.next().await? { + println!("{names:?}"); +} +``` + +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. + +Items are arbitrary binary data: implement the `set::Item` trait (encode to bytes, decode back) for any type. `String`, `Vec`, and `bytes::Bytes` are supported out of the box. + +Deltas are on by default (`Config { delta_ratio: Some(2.0) }`); a delta is appended while the group stays within `delta_ratio` times the size of a fresh snapshot, otherwise a new snapshot group is started. Set `delta_ratio: None` to publish a full snapshot per change. + +## Features + +| Feature | Default | Description | +|---|---|---| +| `set` | yes | The `HashSet`-like collection. | +| `json` | yes | Re-export of `moq-json`. | diff --git a/rs/moq-data/src/lib.rs b/rs/moq-data/src/lib.rs new file mode 100644 index 000000000..ff3bf3990 --- /dev/null +++ b/rs/moq-data/src/lib.rs @@ -0,0 +1,16 @@ +//! Helpers for sending metadata over [`moq-net`](https://docs.rs/moq-net) tracks. +//! +//! Each helper maps an application data structure onto a track, handling snapshots and deltas so a +//! late joiner can reconstruct the current state from the newest group alone. +//! +//! - [`set`] syncs a [`HashSet`](std::collections::HashSet)-like collection of arbitrary binary +//! items, encoding changes as `+`/`-` deltas. +//! - [`json`] re-exports [`moq-json`](https://docs.rs/moq-json) for snapshot/delta JSON publishing. +//! It lives in its own crate today and will migrate here over time. + +/// Snapshot/delta JSON publishing, re-exported from [`moq-json`](https://docs.rs/moq-json). +#[cfg(feature = "json")] +pub use moq_json as json; + +#[cfg(feature = "set")] +pub mod set; diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs new file mode 100644 index 000000000..803e1ecff --- /dev/null +++ b/rs/moq-data/src/set.rs @@ -0,0 +1,549 @@ +//! A [`HashSet`]-like collection synced over a [`moq-net`](moq_net) track. +//! +//! The set is published as a series of self-contained groups. A group's first frame is a full +//! snapshot of every item; each following frame is a single `+` (insert) or `-` (remove) delta +//! applied in order. A consumer jumps to the newest group, decodes the snapshot, and replays the +//! deltas, so a late joiner never needs older groups. +//! +//! Items are arbitrary binary data: any type implementing [`Item`] (encode to bytes, decode back) +//! can live in the set. [`String`], [`Vec`], and [`bytes::Bytes`] are supported out of the box; +//! a custom type can implement [`Item`] however it likes (e.g. via `serde_json`). +//! +//! # Wire format +//! +//! Every frame within a group is one of: +//! +//! - **snapshot** (frame 0): `varint(count)` followed by `count` repetitions of +//! `varint(len)` then `len` item bytes. +//! - **delta** (frame 1+): a one-byte op (`+` = `0x2B` insert, `-` = `0x2D` remove) followed by the +//! item bytes, which run to the end of the frame. + +use std::borrow::Borrow; +use std::collections::HashSet; +use std::hash::Hash; +use std::task::Poll; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +/// One-byte op prefixing an insert delta frame. +const INSERT: u8 = b'+'; +/// One-byte op prefixing a remove delta frame. +const REMOVE: u8 = b'-'; + +/// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. +/// +/// Kept well below moq-net's per-group frame cap so a late joiner can always read the snapshot at +/// frame 0 before the group is evicted. +const MAX_DELTA_FRAMES: usize = 256; + +/// Errors produced while publishing or consuming a set. +#[derive(thiserror::Error, Debug, Clone)] +#[non_exhaustive] +pub enum Error { + /// An error from the underlying track. + #[error(transparent)] + Net(#[from] moq_net::Error), + + /// A frame could not be parsed as a snapshot or delta. + #[error("malformed frame: {0}")] + Malformed(String), + + /// An item failed to encode or decode. + #[error("item: {0}")] + Item(String), +} + +/// A [`Result`](std::result::Result) using this module's [`Error`]. +pub type Result = std::result::Result; + +/// An item that can be stored in a [`Set`](Producer). +/// +/// Encoding must be deterministic and round-trip: `Item::decode(item.encode())` must equal `item`. +/// Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so distinct items must +/// encode to distinct bytes. +pub trait Item: Clone + Eq + Hash { + /// Encode the item to its wire bytes. + fn encode(&self) -> Bytes; + + /// Decode an item from wire bytes. + fn decode(bytes: Bytes) -> Result + where + Self: Sized; +} + +impl Item for String { + fn encode(&self) -> Bytes { + Bytes::copy_from_slice(self.as_bytes()) + } + + fn decode(bytes: Bytes) -> Result { + String::from_utf8(bytes.into()).map_err(|err| Error::Item(err.to_string())) + } +} + +impl Item for Vec { + fn encode(&self) -> Bytes { + Bytes::copy_from_slice(self) + } + + fn decode(bytes: Bytes) -> Result { + Ok(bytes.into()) + } +} + +impl Item for Bytes { + fn encode(&self) -> Bytes { + self.clone() + } + + fn decode(bytes: Bytes) -> Result { + Ok(bytes) + } +} + +/// Configuration for a [`Producer`]. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct Config { + /// Controls whether changes are published as deltas instead of full snapshots. + /// + /// `None` disables deltas: every change starts a new group with a full snapshot. + /// + /// `Some(ratio)` enables deltas. A `+`/`-` delta is appended to the current group as long as the + /// deltas accumulated since the last snapshot stay within `ratio` times the size of a fresh + /// snapshot; otherwise a new snapshot group is started, bounding how much a late joiner replays. + pub delta_ratio: Option, +} + +impl Default for Config { + fn default() -> Self { + // Deltas on by default: the whole point of a set track is incremental add/remove. + Self { delta_ratio: Some(2.0) } + } +} + +/// Publishes a set over a track, choosing snapshots and deltas automatically. +pub struct Producer { + track: moq_net::TrackProducer, + config: Config, + + current: HashSet, + group: Option, + /// Total frames in the open group, including the snapshot, for the frame cap. + group_frames: usize, + /// Bytes of delta frames appended since the last snapshot, for the ratio budget. + group_delta_bytes: u64, +} + +impl Producer { + /// Create a producer that publishes to the given track. + pub fn new(track: moq_net::TrackProducer, config: Config) -> Self { + Self { + track, + config, + current: HashSet::new(), + group: None, + group_frames: 0, + group_delta_bytes: 0, + } + } + + /// Insert an item, publishing a delta or snapshot. Returns `true` if it was newly inserted. + pub fn insert(&mut self, item: T) -> Result { + if self.current.contains(&item) { + return Ok(false); + } + + // Encode while we still own the item, then move it into the set so the snapshot reflects it. + let bytes = item.encode(); + self.current.insert(item); + self.publish(INSERT, bytes)?; + Ok(true) + } + + /// Remove an item, publishing a delta or snapshot. Returns `true` if it was present. + pub fn remove(&mut self, item: &Q) -> Result + where + T: Borrow, + Q: Hash + Eq + ?Sized, + { + // `take` removes it from the set and hands back the owned value so we can encode the delta. + let Some(removed) = self.current.take(item) else { + return Ok(false); + }; + let bytes = removed.encode(); + self.publish(REMOVE, bytes)?; + Ok(true) + } + + /// Whether the item is currently in the set. + pub fn contains(&self, item: &Q) -> bool + where + T: Borrow, + Q: Hash + Eq + ?Sized, + { + self.current.contains(item) + } + + /// The number of items currently in the set. + pub fn len(&self) -> usize { + self.current.len() + } + + /// Whether the set is currently empty. + pub fn is_empty(&self) -> bool { + self.current.is_empty() + } + + /// Iterate over the items currently in the set. + pub fn iter(&self) -> impl Iterator { + self.current.iter() + } + + /// Create a consumer for the underlying track. + pub fn consume(&self) -> moq_net::TrackConsumer { + self.track.consume() + } + + /// Finish the track, closing any open group. + pub fn finish(&mut self) -> Result<()> { + if let Some(mut group) = self.group.take() { + group.finish()?; + } + self.track.finish()?; + Ok(()) + } + + /// Publish a single change, either as a delta on the open group or a fresh snapshot group. The + /// change is already reflected in `self.current`, so a snapshot here captures it. + fn publish(&mut self, op: u8, item: Bytes) -> Result<()> { + let snapshot = encode_snapshot(&self.current)?; + let delta_len = 1 + item.len() as u64; + + if self.should_snapshot(delta_len, snapshot.len() as u64) { + self.write_snapshot(snapshot) + } else { + let group = self.group.as_mut().expect("delta requires an open group"); + group.write_frame(encode_delta(op, &item))?; + self.group_frames += 1; + self.group_delta_bytes += delta_len; + Ok(()) + } + } + + fn should_snapshot(&self, delta_len: u64, snapshot_len: u64) -> bool { + let Some(ratio) = self.config.delta_ratio else { + return true; + }; + if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES { + return true; + } + // Roll a snapshot once the replayed deltas would outgrow the budget relative to a snapshot. + (self.group_delta_bytes + delta_len) as f64 > ratio * snapshot_len as f64 + } + + fn write_snapshot(&mut self, snapshot: Bytes) -> Result<()> { + // The previous group is complete; no more frames will be appended to it. + if let Some(mut group) = self.group.take() { + group.finish()?; + } + + let mut group = self.track.append_group()?; + group.write_frame(snapshot)?; + self.group_frames = 1; + self.group_delta_bytes = 0; + + if self.config.delta_ratio.is_some() { + // Keep the group open so future deltas can be appended. + self.group = Some(group); + } else { + // Deltas disabled: one snapshot per group. + group.finish()?; + } + + Ok(()) + } +} + +/// Consumes a set from a track, reconstructing it from snapshots and deltas. +pub struct Consumer { + track: moq_net::TrackConsumer, + group: Option, + current: HashSet, + frames_read: usize, +} + +impl Consumer { + /// Create a consumer reading from the given track consumer. + pub fn new(track: moq_net::TrackConsumer) -> Self { + Self { + track, + group: None, + current: HashSet::new(), + frames_read: 0, + } + } + + /// Get the set after the next change, or `None` once the track ends. + pub async fn next(&mut self) -> Result>> + where + T: Unpin, + { + kio::wait(|waiter| self.poll_next(waiter)).await + } + + /// Poll for the set after the next change, without blocking. + /// + /// Jumps to the newest group, decodes its snapshot, and applies deltas in order, yielding the + /// reconstructed set after each frame. Switching to a newer group discards the older one. + pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll>>> { + // Drain to the newest group, resetting reconstruction state whenever we switch. + let track_finished = loop { + match self.track.poll_next_group(waiter)? { + Poll::Ready(Some(group)) => { + self.group = Some(group); + self.current.clear(); + self.frames_read = 0; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, + } + }; + + if let Some(group) = &mut self.group { + match group.poll_read_frame(waiter)? { + Poll::Ready(Some(frame)) => { + self.apply(frame)?; + return Poll::Ready(Ok(Some(self.current.clone()))); + } + // The current group is exhausted; wait for a newer one. + Poll::Ready(None) => self.group = None, + Poll::Pending => return Poll::Pending, + } + } + + if track_finished { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + } + + /// Apply one frame: frame 0 of a group is a snapshot, the rest are `+`/`-` deltas. + fn apply(&mut self, frame: Bytes) -> Result<()> { + if self.frames_read == 0 { + self.current = decode_snapshot(frame)?; + } else { + let (op, item) = decode_delta(frame)?; + let item = T::decode(item)?; + match op { + INSERT => { + self.current.insert(item); + } + REMOVE => { + self.current.remove(&item); + } + other => return Err(Error::Malformed(format!("unknown op byte: {other:#04x}"))), + } + } + self.frames_read += 1; + Ok(()) + } +} + +/// Encode the full set as a snapshot frame: a `u32` count then each item `u32`-length-prefixed. +/// +/// Lengths are big-endian `u32` rather than QUIC varints so the format stays self-contained and +/// trivially matches the JS implementation (`@moq/data`). +fn encode_snapshot(set: &HashSet) -> Result { + let count = u32::try_from(set.len()).map_err(|_| Error::Malformed("set has too many items".into()))?; + + let mut buf = BytesMut::new(); + buf.put_u32(count); + for item in set { + let bytes = item.encode(); + let len = u32::try_from(bytes.len()).map_err(|_| Error::Malformed("item is too large".into()))?; + buf.put_u32(len); + buf.put_slice(&bytes); + } + Ok(buf.freeze()) +} + +fn decode_snapshot(mut frame: Bytes) -> Result> { + if frame.remaining() < 4 { + return Err(Error::Malformed("snapshot is missing its count".into())); + } + let count = frame.get_u32(); + + let mut set = HashSet::with_capacity(count as usize); + for _ in 0..count { + if frame.remaining() < 4 { + return Err(Error::Malformed("snapshot is missing an item length".into())); + } + let len = frame.get_u32() as usize; + if frame.remaining() < len { + return Err(Error::Malformed("snapshot item runs past end of frame".into())); + } + set.insert(T::decode(frame.split_to(len))?); + } + Ok(set) +} + +/// Encode a delta frame: one op byte followed by the item bytes. +fn encode_delta(op: u8, item: &Bytes) -> Bytes { + let mut buf = BytesMut::with_capacity(1 + item.len()); + buf.put_u8(op); + buf.put_slice(item); + buf.freeze() +} + +fn decode_delta(mut frame: Bytes) -> Result<(u8, Bytes)> { + if !frame.has_remaining() { + return Err(Error::Malformed("empty delta frame".into())); + } + let op = frame.get_u8(); + Ok((op, frame)) +} + +#[cfg(test)] +mod test { + use super::*; + + fn producer(config: Config) -> (Producer, moq_net::TrackConsumer) { + let track = moq_net::Track::new("test").produce(); + let consumer = track.consume(); + (Producer::new(track, config), consumer) + } + + fn set(items: &[&str]) -> HashSet { + items.iter().map(|s| s.to_string()).collect() + } + + /// Reconstruct every set a consumer yields, in order. + fn drain(track: moq_net::TrackConsumer) -> Vec> { + let mut consumer = Consumer::::new(track); + let waiter = kio::Waiter::noop(); + let mut out = Vec::new(); + while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { + out.push(value); + } + out + } + + #[test] + fn snapshot_roundtrip() { + let original = set(&["video", "audio", "captions"]); + let frame = encode_snapshot(&original).unwrap(); + assert_eq!(decode_snapshot::(frame).unwrap(), original); + } + + #[test] + fn deltas_off_snapshot_per_change() { + let (mut producer, track) = producer(Config { delta_ratio: None }); + producer.insert("video".into()).unwrap(); + producer.insert("audio".into()).unwrap(); + producer.finish().unwrap(); + + // Two changes => two snapshot groups; a late joiner only sees the latest full set. + assert_eq!(track.latest(), Some(1)); + assert_eq!(drain(track).last().unwrap(), &set(&["video", "audio"])); + } + + #[test] + fn deltas_share_one_group() { + let (mut producer, track) = producer(Config::default()); + producer.insert("video".into()).unwrap(); // snapshot, group 0 + producer.insert("audio".into()).unwrap(); // delta + producer.remove("video").unwrap(); // delta + producer.finish().unwrap(); + + // All changes fit in a single group as snapshot + deltas. + assert_eq!(track.latest(), Some(0)); + assert_eq!(drain(track).last().unwrap(), &set(&["audio"])); + } + + #[test] + fn redundant_insert_and_remove_write_nothing() { + let (mut producer, track) = producer(Config::default()); + assert!(producer.insert("video".into()).unwrap()); + assert!(!producer.insert("video".into()).unwrap()); // already present + assert!(!producer.remove("audio").unwrap()); // never present + producer.finish().unwrap(); + + // Only the first insert wrote a frame, so there's exactly one group. + assert_eq!(track.latest(), Some(0)); + assert_eq!(drain(track).last().unwrap(), &set(&["video"])); + } + + #[test] + fn live_consumer_sees_each_change() { + let (mut producer, track) = producer(Config::default()); + let mut consumer = Consumer::::new(track); + let waiter = kio::Waiter::noop(); + + let next = |consumer: &mut Consumer| match consumer.poll_next(&waiter) { + Poll::Ready(Ok(Some(value))) => value, + other => panic!("expected a set, got {other:?}"), + }; + + producer.insert("video".into()).unwrap(); + assert_eq!(next(&mut consumer), set(&["video"])); + + producer.insert("audio".into()).unwrap(); + assert_eq!(next(&mut consumer), set(&["video", "audio"])); + + producer.remove("video").unwrap(); + assert_eq!(next(&mut consumer), set(&["audio"])); + } + + #[test] + fn late_joiner_reconstructs_from_deltas() { + let (mut producer, track) = producer(Config::default()); + producer.insert("a".into()).unwrap(); + producer.insert("b".into()).unwrap(); + producer.insert("c".into()).unwrap(); + producer.remove("a").unwrap(); + producer.finish().unwrap(); + + // A consumer created only now still rebuilds the final set from snapshot + deltas. + assert_eq!(drain(track).last().unwrap(), &set(&["b", "c"])); + } + + #[test] + fn frame_cap_rolls_snapshot() { + // A huge ratio would otherwise keep everything in one group; the frame cap forces a roll. + let (mut producer, track) = producer(Config { + delta_ratio: Some(1_000_000.0), + }); + + // Snapshot (frame 0) plus MAX_DELTA_FRAMES - 1 deltas fill the first group, then one more rolls. + for i in 0..=MAX_DELTA_FRAMES { + producer.insert(format!("item-{i}")).unwrap(); + } + producer.finish().unwrap(); + + assert_eq!(track.latest(), Some(1)); + assert_eq!(drain(track).last().unwrap().len(), MAX_DELTA_FRAMES + 1); + } + + #[test] + fn binary_items_roundtrip() { + let track = moq_net::Track::new("test").produce(); + let sub = track.consume(); + let mut producer = Producer::>::new(track, Config::default()); + + producer.insert(vec![0x00, 0xff, 0x42]).unwrap(); + producer.insert(vec![0x01]).unwrap(); + producer.finish().unwrap(); + + let mut consumer = Consumer::>::new(sub); + let waiter = kio::Waiter::noop(); + let mut last = None; + while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { + last = Some(value); + } + + let expected: HashSet> = [vec![0x00, 0xff, 0x42], vec![0x01]].into_iter().collect(); + assert_eq!(last.unwrap(), expected); + } +} From a9ae3d0c3cf815ebfd1644f330b0330fac4aa6cd Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:00:25 -0700 Subject: [PATCH 2/9] fix(moq-data): drop unused workspace dep, harden snapshot decode, review nits - Remove the `moq-data` entry from `[workspace.dependencies]`: nothing consumes the crate yet, so cargo-shear flagged it as unused and failed CI. It stays a workspace member, so it still builds. - decode_snapshot (Rust + JS): reject trailing bytes after the declared items, and bound the item count by the remaining frame size before allocating so a malformed frame can't request a huge HashSet capacity. - Fix the set.rs module doc to describe the u32 length prefixes (was varint). - Consumer: import `Codec` directly instead of `Config["codec"]`. - Drop forward-looking "will migrate" wording from the json re-export comments (kept in the READMEs); strengthen the set tests. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.toml | 1 - js/data/src/json.ts | 3 +-- js/data/src/set/consumer.ts | 3 ++- js/data/src/set/set.test.ts | 5 ++++- js/data/src/set/wire.ts | 2 ++ rs/moq-data/src/lib.rs | 1 - rs/moq-data/src/set.rs | 32 ++++++++++++++++++++++++++++---- 7 files changed, 37 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ae06bbd69..72eb106f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,6 @@ rust-version = "1.85" hang = { version = "0.19", path = "rs/hang" } kio = { version = "0.3", path = "rs/kio" } moq-audio = { version = "0.0.3", path = "rs/moq-audio" } -moq-data = { version = "0.0.1", path = "rs/moq-data" } moq-json = { version = "0.0.2", path = "rs/moq-json" } moq-loc = { version = "0.1", path = "rs/moq-loc" } moq-msf = { version = "0.2", path = "rs/moq-msf" } diff --git a/js/data/src/json.ts b/js/data/src/json.ts index 882e72b9a..82042c535 100644 --- a/js/data/src/json.ts +++ b/js/data/src/json.ts @@ -1,3 +1,2 @@ -// Snapshot/delta JSON publishing, re-exported from @moq/json. JSON lives in its own package today -// and will migrate here over time. +// Snapshot/delta JSON publishing, re-exported from @moq/json. export * from "@moq/json"; diff --git a/js/data/src/set/consumer.ts b/js/data/src/set/consumer.ts index 99cc2310b..8cb8c064c 100644 --- a/js/data/src/set/consumer.ts +++ b/js/data/src/set/consumer.ts @@ -1,5 +1,6 @@ import type * as Moq from "@moq/net"; +import type { Codec } from "./codec.ts"; import type { Config } from "./producer.ts"; import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; @@ -11,7 +12,7 @@ import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; */ export class Consumer { #track: Moq.Track; - #codec: Config["codec"]; + #codec: Codec; #group?: Moq.Group; // Keyed by encoded bytes so items dedupe by value, not reference. diff --git a/js/data/src/set/set.test.ts b/js/data/src/set/set.test.ts index 02afb67c2..54dd6bfbb 100644 --- a/js/data/src/set/set.test.ts +++ b/js/data/src/set/set.test.ts @@ -40,7 +40,8 @@ test("deltas off: a snapshot group per change", async () => { producer.insert("audio"); producer.finish(); - expect((await drain(track)).at(-1)).toEqual(set("video", "audio")); + // Each change is its own single-frame snapshot group. (Reconstruction is covered elsewhere.) + expect(await structure(track)).toEqual([1, 1]); }); test("deltas share one group", async () => { @@ -79,6 +80,8 @@ test("live consumer sees each change", async () => { producer.remove("video"); expect(await consumer.next()).toEqual(set("audio")); + + producer.finish(); }); test("late joiner reconstructs from deltas", async () => { diff --git a/js/data/src/set/wire.ts b/js/data/src/set/wire.ts index 6cdbc03b5..75dbef314 100644 --- a/js/data/src/set/wire.ts +++ b/js/data/src/set/wire.ts @@ -51,6 +51,8 @@ export function decodeSnapshot(frame: Uint8Array): Uint8Array[] { items.push(frame.subarray(offset, offset + len)); offset += len; } + + if (offset !== frame.length) throw new Error("snapshot has trailing bytes"); return items; } diff --git a/rs/moq-data/src/lib.rs b/rs/moq-data/src/lib.rs index ff3bf3990..fd238085a 100644 --- a/rs/moq-data/src/lib.rs +++ b/rs/moq-data/src/lib.rs @@ -6,7 +6,6 @@ //! - [`set`] syncs a [`HashSet`](std::collections::HashSet)-like collection of arbitrary binary //! items, encoding changes as `+`/`-` deltas. //! - [`json`] re-exports [`moq-json`](https://docs.rs/moq-json) for snapshot/delta JSON publishing. -//! It lives in its own crate today and will migrate here over time. /// Snapshot/delta JSON publishing, re-exported from [`moq-json`](https://docs.rs/moq-json). #[cfg(feature = "json")] diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 803e1ecff..e5df49769 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -13,8 +13,8 @@ //! //! Every frame within a group is one of: //! -//! - **snapshot** (frame 0): `varint(count)` followed by `count` repetitions of -//! `varint(len)` then `len` item bytes. +//! - **snapshot** (frame 0): big-endian `u32(count)` followed by `count` repetitions of +//! big-endian `u32(len)` then `len` item bytes. //! - **delta** (frame 1+): a one-byte op (`+` = `0x2B` insert, `-` = `0x2D` remove) followed by the //! item bytes, which run to the end of the frame. @@ -373,9 +373,16 @@ fn decode_snapshot(mut frame: Bytes) -> Result> { if frame.remaining() < 4 { return Err(Error::Malformed("snapshot is missing its count".into())); } - let count = frame.get_u32(); + let count = frame.get_u32() as usize; - let mut set = HashSet::with_capacity(count as usize); + // Every item costs at least its 4-byte length prefix, so a count larger than the remaining + // bytes allow can't be real. Reject it before allocating so a malformed frame can't ask for a + // huge capacity. + if count > frame.remaining() / 4 { + return Err(Error::Malformed("snapshot count exceeds frame bounds".into())); + } + + let mut set = HashSet::with_capacity(count); for _ in 0..count { if frame.remaining() < 4 { return Err(Error::Malformed("snapshot is missing an item length".into())); @@ -386,6 +393,11 @@ fn decode_snapshot(mut frame: Bytes) -> Result> { } set.insert(T::decode(frame.split_to(len))?); } + + if frame.has_remaining() { + return Err(Error::Malformed("snapshot has trailing bytes".into())); + } + Ok(set) } @@ -437,6 +449,18 @@ mod test { assert_eq!(decode_snapshot::(frame).unwrap(), original); } + #[test] + fn malformed_snapshot_is_rejected() { + // Trailing bytes past the declared items. + let mut frame = encode_snapshot(&set(&["video"])).unwrap().to_vec(); + frame.push(0xff); + assert!(decode_snapshot::(Bytes::from(frame)).is_err()); + + // A count far larger than the frame can hold must not allocate; it's rejected up front. + let huge = Bytes::from(vec![0xff, 0xff, 0xff, 0xff]); + assert!(decode_snapshot::(huge).is_err()); + } + #[test] fn deltas_off_snapshot_per_change() { let (mut producer, track) = producer(Config { delta_ratio: None }); From ca5fc62b9063d1987dfffe8c0cdc88aac9fe410d Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:14:53 -0700 Subject: [PATCH 3/9] refactor(moq-data): encode set items through bytes::BufMut Change `set::Item::encode` to write directly into the frame's `bytes::BufMut` instead of returning a fresh `Bytes`, with a new `size()` for the length prefix. A string or byte-vector item was copied twice (into an intermediate `Bytes`, then into the frame); now it's a single copy straight into the frame buffer. `decode` keeps taking `bytes::Bytes` so an item can hold a zero-copy slice of the frame. The wire format is unchanged (big-endian u32 length prefix + raw item bytes), so the JS implementation and cross-language interop are unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/README.md | 2 +- rs/moq-data/src/set.rs | 120 ++++++++++++++++++++++++++++++++--------- 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/rs/moq-data/README.md b/rs/moq-data/README.md index 78133abc4..f221266ec 100644 --- a/rs/moq-data/README.md +++ b/rs/moq-data/README.md @@ -34,7 +34,7 @@ while let Some(names) = consumer.next().await? { Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. -Items are arbitrary binary data: implement the `set::Item` trait (encode to bytes, decode back) for any type. `String`, `Vec`, and `bytes::Bytes` are supported out of the box. +Items are arbitrary binary data: implement the `set::Item` trait for any type. It encodes straight into the frame's `bytes::BufMut` (one copy, no intermediate buffer) and decodes from a `bytes::Bytes` slice. `String`, `Vec`, and `bytes::Bytes` are supported out of the box. Deltas are on by default (`Config { delta_ratio: Some(2.0) }`); a delta is appended while the group stays within `delta_ratio` times the size of a fresh snapshot, otherwise a new snapshot group is started. Set `delta_ratio: None` to publish a full snapshot per change. diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index e5df49769..458c925a4 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -62,18 +62,33 @@ pub type Result = std::result::Result; /// Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so distinct items must /// encode to distinct bytes. pub trait Item: Clone + Eq + Hash { - /// Encode the item to its wire bytes. - fn encode(&self) -> Bytes; + /// The number of bytes [`encode`](Item::encode) writes. + /// + /// Read up front to length-prefix the item in a snapshot, so it must equal the number of bytes + /// `encode` goes on to write. + fn size(&self) -> usize; + + /// Encode the item's bytes directly into `buf`, writing exactly [`size`](Item::size) bytes. + /// + /// Writing into the frame buffer (rather than returning a fresh `Bytes`) keeps a string or byte + /// vector to a single copy. + fn encode(&self, buf: &mut B); - /// Decode an item from wire bytes. + /// Decode an item from its wire bytes. + /// + /// Takes [`Bytes`] so an implementation can keep a zero-copy slice of the frame if it wants one. fn decode(bytes: Bytes) -> Result where Self: Sized; } impl Item for String { - fn encode(&self) -> Bytes { - Bytes::copy_from_slice(self.as_bytes()) + fn size(&self) -> usize { + self.len() + } + + fn encode(&self, buf: &mut B) { + buf.put_slice(self.as_bytes()); } fn decode(bytes: Bytes) -> Result { @@ -82,8 +97,12 @@ impl Item for String { } impl Item for Vec { - fn encode(&self) -> Bytes { - Bytes::copy_from_slice(self) + fn size(&self) -> usize { + self.len() + } + + fn encode(&self, buf: &mut B) { + buf.put_slice(self); } fn decode(bytes: Bytes) -> Result { @@ -92,8 +111,12 @@ impl Item for Vec { } impl Item for Bytes { - fn encode(&self) -> Bytes { - self.clone() + fn size(&self) -> usize { + self.len() + } + + fn encode(&self, buf: &mut B) { + buf.put_slice(self); } fn decode(bytes: Bytes) -> Result { @@ -154,10 +177,10 @@ impl Producer { return Ok(false); } - // Encode while we still own the item, then move it into the set so the snapshot reflects it. - let bytes = item.encode(); + // Build the delta from a reference, then move the item into the set so a snapshot reflects it. + let delta = encode_delta(INSERT, &item); self.current.insert(item); - self.publish(INSERT, bytes)?; + self.publish(delta)?; Ok(true) } @@ -171,8 +194,8 @@ impl Producer { let Some(removed) = self.current.take(item) else { return Ok(false); }; - let bytes = removed.encode(); - self.publish(REMOVE, bytes)?; + let delta = encode_delta(REMOVE, &removed); + self.publish(delta)?; Ok(true) } @@ -214,17 +237,18 @@ impl Producer { Ok(()) } - /// Publish a single change, either as a delta on the open group or a fresh snapshot group. The - /// change is already reflected in `self.current`, so a snapshot here captures it. - fn publish(&mut self, op: u8, item: Bytes) -> Result<()> { + /// Publish a single change, either as the prebuilt `delta` frame on the open group or a fresh + /// snapshot group. The change is already reflected in `self.current`, so a snapshot captures it + /// and the `delta` is discarded. + fn publish(&mut self, delta: Bytes) -> Result<()> { let snapshot = encode_snapshot(&self.current)?; - let delta_len = 1 + item.len() as u64; + let delta_len = delta.len() as u64; if self.should_snapshot(delta_len, snapshot.len() as u64) { self.write_snapshot(snapshot) } else { let group = self.group.as_mut().expect("delta requires an open group"); - group.write_frame(encode_delta(op, &item))?; + group.write_frame(delta)?; self.group_frames += 1; self.group_delta_bytes += delta_len; Ok(()) @@ -361,10 +385,9 @@ fn encode_snapshot(set: &HashSet) -> Result { let mut buf = BytesMut::new(); buf.put_u32(count); for item in set { - let bytes = item.encode(); - let len = u32::try_from(bytes.len()).map_err(|_| Error::Malformed("item is too large".into()))?; + let len = u32::try_from(item.size()).map_err(|_| Error::Malformed("item is too large".into()))?; buf.put_u32(len); - buf.put_slice(&bytes); + item.encode(&mut buf); } Ok(buf.freeze()) } @@ -402,10 +425,10 @@ fn decode_snapshot(mut frame: Bytes) -> Result> { } /// Encode a delta frame: one op byte followed by the item bytes. -fn encode_delta(op: u8, item: &Bytes) -> Bytes { - let mut buf = BytesMut::with_capacity(1 + item.len()); +fn encode_delta(op: u8, item: &T) -> Bytes { + let mut buf = BytesMut::with_capacity(1 + item.size()); buf.put_u8(op); - buf.put_slice(item); + item.encode(&mut buf); buf.freeze() } @@ -570,4 +593,51 @@ mod test { let expected: HashSet> = [vec![0x00, 0xff, 0x42], vec![0x01]].into_iter().collect(); assert_eq!(last.unwrap(), expected); } + + #[test] + fn custom_item_roundtrips() { + // A user type that encodes itself directly into the frame buffer, no intermediate `Bytes`. + #[derive(Clone, PartialEq, Eq, Hash, Debug)] + struct Point { + x: u16, + y: u16, + } + + impl Item for Point { + fn size(&self) -> usize { + 4 + } + + fn encode(&self, buf: &mut B) { + buf.put_u16(self.x); + buf.put_u16(self.y); + } + + fn decode(mut bytes: Bytes) -> Result { + if bytes.remaining() != 4 { + return Err(Error::Item("point must be 4 bytes".into())); + } + Ok(Point { + x: bytes.get_u16(), + y: bytes.get_u16(), + }) + } + } + + let track = moq_net::Track::new("test").produce(); + let sub = track.consume(); + let mut producer = Producer::::new(track, Config::default()); + producer.insert(Point { x: 1, y: 2 }).unwrap(); + producer.insert(Point { x: 3, y: 4 }).unwrap(); + producer.remove(&Point { x: 1, y: 2 }).unwrap(); + producer.finish().unwrap(); + + let mut consumer = Consumer::::new(sub); + let waiter = kio::Waiter::noop(); + let mut last = None; + while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { + last = Some(value); + } + assert_eq!(last.unwrap(), [Point { x: 3, y: 4 }].into_iter().collect()); + } } From 604dcc7c61511386306112c125bc0fab568c816f Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:19:43 -0700 Subject: [PATCH 4/9] refactor(moq-data): decode set items through bytes::Buf Make `set::Item::decode` take `&mut impl Buf` instead of an owned `Bytes`, mirroring the `BufMut`-based `encode`. A custom item can now read its fields straight off the buffer (`get_u16`, ...), and `buf.copy_to_bytes(remaining)` stays zero-copy on the `Bytes`-backed frame for items that want the raw slice. The caller hands `decode` a buffer holding exactly the item's bytes (the snapshot loop splits each item off; a delta is the rest of the frame), so the wire format is unchanged and JS interop is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/README.md | 2 +- rs/moq-data/src/set.rs | 32 +++++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/rs/moq-data/README.md b/rs/moq-data/README.md index f221266ec..965bb8f9a 100644 --- a/rs/moq-data/README.md +++ b/rs/moq-data/README.md @@ -34,7 +34,7 @@ while let Some(names) = consumer.next().await? { Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. -Items are arbitrary binary data: implement the `set::Item` trait for any type. It encodes straight into the frame's `bytes::BufMut` (one copy, no intermediate buffer) and decodes from a `bytes::Bytes` slice. `String`, `Vec`, and `bytes::Bytes` are supported out of the box. +Items are arbitrary binary data: implement the `set::Item` trait for any type. It encodes straight into the frame's `bytes::BufMut` and decodes from a `bytes::Buf`, both zero-copy (`copy_to_bytes` hands back a slice of the frame). `String`, `Vec`, and `bytes::Bytes` are supported out of the box. Deltas are on by default (`Config { delta_ratio: Some(2.0) }`); a delta is appended while the group stays within `delta_ratio` times the size of a fresh snapshot, otherwise a new snapshot group is started. Set `delta_ratio: None` to publish a full snapshot per change. diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 458c925a4..9fa958c0d 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -74,10 +74,11 @@ pub trait Item: Clone + Eq + Hash { /// vector to a single copy. fn encode(&self, buf: &mut B); - /// Decode an item from its wire bytes. + /// Decode an item from `buf`, which holds exactly this item's bytes. /// - /// Takes [`Bytes`] so an implementation can keep a zero-copy slice of the frame if it wants one. - fn decode(bytes: Bytes) -> Result + /// Read straight from the [`Buf`]: scalar getters (`get_u16`, ...) read in place, and + /// `buf.copy_to_bytes(buf.remaining())` hands back a zero-copy [`Bytes`] slice of the frame. + fn decode(buf: &mut B) -> Result where Self: Sized; } @@ -91,7 +92,8 @@ impl Item for String { buf.put_slice(self.as_bytes()); } - fn decode(bytes: Bytes) -> Result { + fn decode(buf: &mut B) -> Result { + let bytes = buf.copy_to_bytes(buf.remaining()); String::from_utf8(bytes.into()).map_err(|err| Error::Item(err.to_string())) } } @@ -105,8 +107,8 @@ impl Item for Vec { buf.put_slice(self); } - fn decode(bytes: Bytes) -> Result { - Ok(bytes.into()) + fn decode(buf: &mut B) -> Result { + Ok(buf.copy_to_bytes(buf.remaining()).into()) } } @@ -119,8 +121,8 @@ impl Item for Bytes { buf.put_slice(self); } - fn decode(bytes: Bytes) -> Result { - Ok(bytes) + fn decode(buf: &mut B) -> Result { + Ok(buf.copy_to_bytes(buf.remaining())) } } @@ -358,8 +360,8 @@ impl Consumer { if self.frames_read == 0 { self.current = decode_snapshot(frame)?; } else { - let (op, item) = decode_delta(frame)?; - let item = T::decode(item)?; + let (op, mut item) = decode_delta(frame)?; + let item = T::decode(&mut item)?; match op { INSERT => { self.current.insert(item); @@ -414,7 +416,7 @@ fn decode_snapshot(mut frame: Bytes) -> Result> { if frame.remaining() < len { return Err(Error::Malformed("snapshot item runs past end of frame".into())); } - set.insert(T::decode(frame.split_to(len))?); + set.insert(T::decode(&mut frame.split_to(len))?); } if frame.has_remaining() { @@ -613,13 +615,13 @@ mod test { buf.put_u16(self.y); } - fn decode(mut bytes: Bytes) -> Result { - if bytes.remaining() != 4 { + fn decode(buf: &mut B) -> Result { + if buf.remaining() != 4 { return Err(Error::Item("point must be 4 bytes".into())); } Ok(Point { - x: bytes.get_u16(), - y: bytes.get_u16(), + x: buf.get_u16(), + y: buf.get_u16(), }) } } From 85f9d36888bc2cec61d66c962ccfb7463516061e Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:30:24 -0700 Subject: [PATCH 5/9] perf(moq-data): write set frames straight into the FrameProducer Encode snapshots and deltas directly into the frame via `GroupProducer:: create_frame` + the `FrameProducer` `BufMut`, instead of building an intermediate `BytesMut` and handing it to `write_frame`. A frame is a single pre-sized buffer and `write_frame` memcpys into it, so the old path copied each item twice (item -> BytesMut -> FrameBuf); now it's one copy (item -> FrameBuf). This is what `Item::size()` enables: `create_frame` needs the total frame size up front, and summing `size()` gives it with no scratch buffer. `should_snapshot` now sizes the snapshot arithmetically rather than building it just to measure. `insert`/`remove` pick snapshot-vs-delta from sizes while they still hold the item reference, so a delta is written straight from `&item` with no clone. Wire format is unchanged, so JS interop is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/src/set.rs | 137 ++++++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 56 deletions(-) diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 9fa958c0d..097a08e28 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -23,7 +23,7 @@ use std::collections::HashSet; use std::hash::Hash; use std::task::Poll; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes}; /// One-byte op prefixing an insert delta frame. const INSERT: u8 = b'+'; @@ -58,9 +58,9 @@ pub type Result = std::result::Result; /// An item that can be stored in a [`Set`](Producer). /// -/// Encoding must be deterministic and round-trip: `Item::decode(item.encode())` must equal `item`. -/// Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so distinct items must -/// encode to distinct bytes. +/// Encoding must be deterministic and round-trip: decoding what [`encode`](Item::encode) wrote must +/// reproduce `item`. Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so +/// distinct items must encode to distinct bytes. pub trait Item: Clone + Eq + Hash { /// The number of bytes [`encode`](Item::encode) writes. /// @@ -179,10 +179,18 @@ impl Producer { return Ok(false); } - // Build the delta from a reference, then move the item into the set so a snapshot reflects it. - let delta = encode_delta(INSERT, &item); - self.current.insert(item); - self.publish(delta)?; + let delta_size = 1 + item.size() as u64; + // The snapshot size once `item` is included, computed without inserting so we keep `&item`. + let snapshot_size = self.snapshot_size() + 4 + item.size() as u64; + + if self.should_snapshot(delta_size, snapshot_size) { + self.current.insert(item); + self.write_snapshot()?; + } else { + // Write the delta straight from the reference, then move the item into the set. + self.write_delta(INSERT, &item)?; + self.current.insert(item); + } Ok(true) } @@ -196,8 +204,16 @@ impl Producer { let Some(removed) = self.current.take(item) else { return Ok(false); }; - let delta = encode_delta(REMOVE, &removed); - self.publish(delta)?; + + let delta_size = 1 + removed.size() as u64; + // `current` already reflects the removal. + let snapshot_size = self.snapshot_size(); + + if self.should_snapshot(delta_size, snapshot_size) { + self.write_snapshot()?; + } else { + self.write_delta(REMOVE, &removed)?; + } Ok(true) } @@ -239,25 +255,13 @@ impl Producer { Ok(()) } - /// Publish a single change, either as the prebuilt `delta` frame on the open group or a fresh - /// snapshot group. The change is already reflected in `self.current`, so a snapshot captures it - /// and the `delta` is discarded. - fn publish(&mut self, delta: Bytes) -> Result<()> { - let snapshot = encode_snapshot(&self.current)?; - let delta_len = delta.len() as u64; - - if self.should_snapshot(delta_len, snapshot.len() as u64) { - self.write_snapshot(snapshot) - } else { - let group = self.group.as_mut().expect("delta requires an open group"); - group.write_frame(delta)?; - self.group_frames += 1; - self.group_delta_bytes += delta_len; - Ok(()) - } + /// The byte size of a full snapshot of the current set: a `u32` count plus each item + /// length-prefixed. Computed from [`Item::size`] so we can size a frame without a scratch buffer. + fn snapshot_size(&self) -> u64 { + 4 + self.current.iter().map(|item| 4 + item.size() as u64).sum::() } - fn should_snapshot(&self, delta_len: u64, snapshot_len: u64) -> bool { + fn should_snapshot(&self, delta_size: u64, snapshot_size: u64) -> bool { let Some(ratio) = self.config.delta_ratio else { return true; }; @@ -265,17 +269,44 @@ impl Producer { return true; } // Roll a snapshot once the replayed deltas would outgrow the budget relative to a snapshot. - (self.group_delta_bytes + delta_len) as f64 > ratio * snapshot_len as f64 + (self.group_delta_bytes + delta_size) as f64 > ratio * snapshot_size as f64 } - fn write_snapshot(&mut self, snapshot: Bytes) -> Result<()> { + /// Append a `+`/`-` delta frame for one item to the open group, encoding straight into the frame. + fn write_delta(&mut self, op: u8, item: &T) -> Result<()> { + let size = 1 + item.size() as u64; + let group = self.group.as_mut().expect("delta requires an open group"); + + let mut frame = group.create_frame(size.into())?; + frame.put_u8(op); + item.encode(&mut frame); + frame.finish()?; + + self.group_frames += 1; + self.group_delta_bytes += size; + Ok(()) + } + + /// Start a new group whose first frame is a full snapshot of the current set, encoding straight + /// into the frame so each item is copied just once. + fn write_snapshot(&mut self) -> Result<()> { // The previous group is complete; no more frames will be appended to it. if let Some(mut group) = self.group.take() { group.finish()?; } + let count = u32::try_from(self.current.len()).map_err(|_| Error::Malformed("set has too many items".into()))?; let mut group = self.track.append_group()?; - group.write_frame(snapshot)?; + + let mut frame = group.create_frame(self.snapshot_size().into())?; + frame.put_u32(count); + for item in &self.current { + let len = u32::try_from(item.size()).map_err(|_| Error::Malformed("item is too large".into()))?; + frame.put_u32(len); + item.encode(&mut frame); + } + frame.finish()?; + self.group_frames = 1; self.group_delta_bytes = 0; @@ -377,23 +408,10 @@ impl Consumer { } } -/// Encode the full set as a snapshot frame: a `u32` count then each item `u32`-length-prefixed. +/// Decode a snapshot frame: a `u32` count then each item `u32`-length-prefixed. /// /// Lengths are big-endian `u32` rather than QUIC varints so the format stays self-contained and /// trivially matches the JS implementation (`@moq/data`). -fn encode_snapshot(set: &HashSet) -> Result { - let count = u32::try_from(set.len()).map_err(|_| Error::Malformed("set has too many items".into()))?; - - let mut buf = BytesMut::new(); - buf.put_u32(count); - for item in set { - let len = u32::try_from(item.size()).map_err(|_| Error::Malformed("item is too large".into()))?; - buf.put_u32(len); - item.encode(&mut buf); - } - Ok(buf.freeze()) -} - fn decode_snapshot(mut frame: Bytes) -> Result> { if frame.remaining() < 4 { return Err(Error::Malformed("snapshot is missing its count".into())); @@ -426,14 +444,7 @@ fn decode_snapshot(mut frame: Bytes) -> Result> { Ok(set) } -/// Encode a delta frame: one op byte followed by the item bytes. -fn encode_delta(op: u8, item: &T) -> Bytes { - let mut buf = BytesMut::with_capacity(1 + item.size()); - buf.put_u8(op); - item.encode(&mut buf); - buf.freeze() -} - +/// Decode a delta frame: one op byte followed by the item bytes. fn decode_delta(mut frame: Bytes) -> Result<(u8, Bytes)> { if !frame.has_remaining() { return Err(Error::Malformed("empty delta frame".into())); @@ -467,17 +478,31 @@ mod test { out } + /// Build a snapshot frame for a set of strings, independent of the producer's encoder, as a + /// decode oracle. + fn snapshot_bytes(items: &[&str]) -> Vec { + let mut buf = Vec::new(); + buf.extend_from_slice(&(items.len() as u32).to_be_bytes()); + for item in items { + buf.extend_from_slice(&(item.len() as u32).to_be_bytes()); + buf.extend_from_slice(item.as_bytes()); + } + buf + } + #[test] fn snapshot_roundtrip() { - let original = set(&["video", "audio", "captions"]); - let frame = encode_snapshot(&original).unwrap(); - assert_eq!(decode_snapshot::(frame).unwrap(), original); + let frame = Bytes::from(snapshot_bytes(&["video", "audio", "captions"])); + assert_eq!( + decode_snapshot::(frame).unwrap(), + set(&["video", "audio", "captions"]) + ); } #[test] fn malformed_snapshot_is_rejected() { // Trailing bytes past the declared items. - let mut frame = encode_snapshot(&set(&["video"])).unwrap().to_vec(); + let mut frame = snapshot_bytes(&["video"]); frame.push(0xff); assert!(decode_snapshot::(Bytes::from(frame)).is_err()); From 38c71404bae0805c42676fc0dd1f2b902ba8e54a Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:47:26 -0700 Subject: [PATCH 6/9] refactor(moq-data): replace Item::size with an optional encode_size Drop the required `size()` method from `set::Item`. Computing a frame's size up front no longer needs a dedicated trait method: the new `encode_size` defaults to running `encode` against a counting `BufMut` (moq-net's `Sizer`), so a custom item only has to implement `encode`/`decode`. Items whose length is known directly (`String`, `Vec`, `Bytes`) override `encode_size` to return `self.len()`. Export `Sizer` from moq-net (it was already `pub` inside the private `coding` module). This is a Rust-only encoding helper, no wire or JS change, so no cross-package sync is needed. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/src/set.rs | 59 +++++++++++++++++++++++------------------- rs/moq-net/src/lib.rs | 2 +- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 097a08e28..35951e7b9 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -62,13 +62,7 @@ pub type Result = std::result::Result; /// reproduce `item`. Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so /// distinct items must encode to distinct bytes. pub trait Item: Clone + Eq + Hash { - /// The number of bytes [`encode`](Item::encode) writes. - /// - /// Read up front to length-prefix the item in a snapshot, so it must equal the number of bytes - /// `encode` goes on to write. - fn size(&self) -> usize; - - /// Encode the item's bytes directly into `buf`, writing exactly [`size`](Item::size) bytes. + /// Encode the item's bytes directly into `buf`. /// /// Writing into the frame buffer (rather than returning a fresh `Bytes`) keeps a string or byte /// vector to a single copy. @@ -81,13 +75,19 @@ pub trait Item: Clone + Eq + Hash { fn decode(buf: &mut B) -> Result where Self: Sized; -} -impl Item for String { - fn size(&self) -> usize { - self.len() + /// The number of bytes [`encode`](Item::encode) writes, used to size a frame up front. + /// + /// The default runs `encode` against a counting [`Sizer`](moq_net::Sizer) (no allocation, no + /// copy). Override it when the length is known directly, e.g. `self.len()`. + fn encode_size(&self) -> usize { + let mut sizer = moq_net::Sizer::default(); + self.encode(&mut sizer); + sizer.size } +} +impl Item for String { fn encode(&self, buf: &mut B) { buf.put_slice(self.as_bytes()); } @@ -96,13 +96,13 @@ impl Item for String { let bytes = buf.copy_to_bytes(buf.remaining()); String::from_utf8(bytes.into()).map_err(|err| Error::Item(err.to_string())) } -} -impl Item for Vec { - fn size(&self) -> usize { + fn encode_size(&self) -> usize { self.len() } +} +impl Item for Vec { fn encode(&self, buf: &mut B) { buf.put_slice(self); } @@ -110,13 +110,13 @@ impl Item for Vec { fn decode(buf: &mut B) -> Result { Ok(buf.copy_to_bytes(buf.remaining()).into()) } -} -impl Item for Bytes { - fn size(&self) -> usize { + fn encode_size(&self) -> usize { self.len() } +} +impl Item for Bytes { fn encode(&self, buf: &mut B) { buf.put_slice(self); } @@ -124,6 +124,10 @@ impl Item for Bytes { fn decode(buf: &mut B) -> Result { Ok(buf.copy_to_bytes(buf.remaining())) } + + fn encode_size(&self) -> usize { + self.len() + } } /// Configuration for a [`Producer`]. @@ -179,9 +183,9 @@ impl Producer { return Ok(false); } - let delta_size = 1 + item.size() as u64; + let delta_size = 1 + item.encode_size() as u64; // The snapshot size once `item` is included, computed without inserting so we keep `&item`. - let snapshot_size = self.snapshot_size() + 4 + item.size() as u64; + let snapshot_size = self.snapshot_size() + 4 + item.encode_size() as u64; if self.should_snapshot(delta_size, snapshot_size) { self.current.insert(item); @@ -205,7 +209,7 @@ impl Producer { return Ok(false); }; - let delta_size = 1 + removed.size() as u64; + let delta_size = 1 + removed.encode_size() as u64; // `current` already reflects the removal. let snapshot_size = self.snapshot_size(); @@ -258,7 +262,11 @@ impl Producer { /// The byte size of a full snapshot of the current set: a `u32` count plus each item /// length-prefixed. Computed from [`Item::size`] so we can size a frame without a scratch buffer. fn snapshot_size(&self) -> u64 { - 4 + self.current.iter().map(|item| 4 + item.size() as u64).sum::() + 4 + self + .current + .iter() + .map(|item| 4 + item.encode_size() as u64) + .sum::() } fn should_snapshot(&self, delta_size: u64, snapshot_size: u64) -> bool { @@ -274,7 +282,7 @@ impl Producer { /// Append a `+`/`-` delta frame for one item to the open group, encoding straight into the frame. fn write_delta(&mut self, op: u8, item: &T) -> Result<()> { - let size = 1 + item.size() as u64; + let size = 1 + item.encode_size() as u64; let group = self.group.as_mut().expect("delta requires an open group"); let mut frame = group.create_frame(size.into())?; @@ -301,7 +309,7 @@ impl Producer { let mut frame = group.create_frame(self.snapshot_size().into())?; frame.put_u32(count); for item in &self.current { - let len = u32::try_from(item.size()).map_err(|_| Error::Malformed("item is too large".into()))?; + let len = u32::try_from(item.encode_size()).map_err(|_| Error::Malformed("item is too large".into()))?; frame.put_u32(len); item.encode(&mut frame); } @@ -630,11 +638,8 @@ mod test { y: u16, } + // No `encode_size` override: the default counts the bytes via a `Sizer`. impl Item for Point { - fn size(&self) -> usize { - 4 - } - fn encode(&self, buf: &mut B) { buf.put_u16(self.x); buf.put_u16(self.y); diff --git a/rs/moq-net/src/lib.rs b/rs/moq-net/src/lib.rs index 5eadaabda..9cc0ee3d1 100644 --- a/rs/moq-net/src/lib.rs +++ b/rs/moq-net/src/lib.rs @@ -62,7 +62,7 @@ mod stats; mod version; pub use client::*; -pub use coding::{BoundsExceeded, DecodeError, EncodeError}; +pub use coding::{BoundsExceeded, DecodeError, EncodeError, Sizer}; pub use error::*; pub use model::*; pub use path::*; From 899e058dde002578ac73bf58fe12dd097ec8d06e Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:51:26 -0700 Subject: [PATCH 7/9] refactor(moq-data): vendor the Sizer instead of exporting it from moq-net Revert the `moq_net::Sizer` re-export and copy the counting `BufMut` into moq-data as a private `sizer` module instead, so the encoding helper doesn't widen moq-net's public surface. `Item::encode_size`'s default uses the local copy. No behavior or wire change. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/src/lib.rs | 3 + rs/moq-data/src/set.rs | 6 +- rs/moq-data/src/sizer.rs | 208 +++++++++++++++++++++++++++++++++++++++ rs/moq-net/src/lib.rs | 2 +- 4 files changed, 215 insertions(+), 4 deletions(-) create mode 100644 rs/moq-data/src/sizer.rs diff --git a/rs/moq-data/src/lib.rs b/rs/moq-data/src/lib.rs index fd238085a..e3163511b 100644 --- a/rs/moq-data/src/lib.rs +++ b/rs/moq-data/src/lib.rs @@ -13,3 +13,6 @@ pub use moq_json as json; #[cfg(feature = "set")] pub mod set; + +#[cfg(feature = "set")] +mod sizer; diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 35951e7b9..6306afd63 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -78,10 +78,10 @@ pub trait Item: Clone + Eq + Hash { /// The number of bytes [`encode`](Item::encode) writes, used to size a frame up front. /// - /// The default runs `encode` against a counting [`Sizer`](moq_net::Sizer) (no allocation, no - /// copy). Override it when the length is known directly, e.g. `self.len()`. + /// The default runs `encode` against a counting `BufMut` (no allocation, no copy). Override it + /// when the length is known directly, e.g. `self.len()`. fn encode_size(&self) -> usize { - let mut sizer = moq_net::Sizer::default(); + let mut sizer = crate::sizer::Sizer::default(); self.encode(&mut sizer); sizer.size } diff --git a/rs/moq-data/src/sizer.rs b/rs/moq-data/src/sizer.rs new file mode 100644 index 000000000..78d74226d --- /dev/null +++ b/rs/moq-data/src/sizer.rs @@ -0,0 +1,208 @@ +use std::mem::MaybeUninit; + +use bytes::{Buf, BufMut, buf::UninitSlice}; + +/// A [`BufMut`] that counts bytes instead of writing them. +/// +/// Lets us measure how many bytes an `Item` will encode (to size a frame up front) by running its +/// encoder against a throwaway target, with no allocation and no copy. +#[derive(Default)] +pub(crate) struct Sizer { + pub size: usize, +} + +unsafe impl BufMut for Sizer { + unsafe fn advance_mut(&mut self, cnt: usize) { + self.size += cnt; + } + + fn chunk_mut(&mut self) -> &mut UninitSlice { + // We need to return a valid slice, but it won't actually be written to + // Use a thread-local static buffer to avoid safety issues + thread_local! { + static BUFFER: std::cell::UnsafeCell<[MaybeUninit; 8192]> = + const { std::cell::UnsafeCell::new([MaybeUninit::uninit(); 8192]) }; + } + + BUFFER.with(|buf| { + let ptr = buf.get(); + unsafe { + let slice = (*ptr).as_mut_ptr(); + bytes::buf::UninitSlice::from_raw_parts_mut(slice as *mut u8, 8192) + } + }) + } + + fn remaining_mut(&self) -> usize { + usize::MAX + } + + fn has_remaining_mut(&self) -> bool { + true + } + + fn put(&mut self, mut src: T) { + self.size += src.remaining(); + src.advance(src.remaining()); + } + + fn put_bytes(&mut self, _val: u8, cnt: usize) { + self.size += cnt; + } + + fn put_f32(&mut self, _val: f32) { + self.size += 4; + } + + fn put_f32_le(&mut self, _: f32) { + self.size += 4 + } + + fn put_f32_ne(&mut self, _: f32) { + self.size += 4 + } + + fn put_f64(&mut self, _: f64) { + self.size += 8 + } + + fn put_f64_le(&mut self, _: f64) { + self.size += 8 + } + + fn put_f64_ne(&mut self, _: f64) { + self.size += 8 + } + + fn put_i128(&mut self, _: i128) { + self.size += 16 + } + + fn put_i128_le(&mut self, _: i128) { + self.size += 16 + } + + fn put_i128_ne(&mut self, _: i128) { + self.size += 16 + } + + fn put_i16(&mut self, _: i16) { + self.size += 2 + } + + fn put_i16_le(&mut self, _: i16) { + self.size += 2 + } + + fn put_i16_ne(&mut self, _: i16) { + self.size += 2 + } + + fn put_i32(&mut self, _: i32) { + self.size += 4 + } + + fn put_i32_le(&mut self, _: i32) { + self.size += 4 + } + + fn put_i32_ne(&mut self, _: i32) { + self.size += 4 + } + + fn put_i64(&mut self, _: i64) { + self.size += 8 + } + + fn put_i64_le(&mut self, _: i64) { + self.size += 8 + } + + fn put_i64_ne(&mut self, _: i64) { + self.size += 8 + } + + fn put_i8(&mut self, _: i8) { + self.size += 1 + } + + fn put_int(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_int_le(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_int_ne(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_slice(&mut self, src: &[u8]) { + self.size += src.len(); + } + + fn put_u128(&mut self, _: u128) { + self.size += 16 + } + + fn put_u128_le(&mut self, _: u128) { + self.size += 16 + } + + fn put_u128_ne(&mut self, _: u128) { + self.size += 16 + } + + fn put_u16(&mut self, _: u16) { + self.size += 2 + } + + fn put_u16_le(&mut self, _: u16) { + self.size += 2 + } + + fn put_u16_ne(&mut self, _: u16) { + self.size += 2 + } + + fn put_u32(&mut self, _: u32) { + self.size += 4 + } + + fn put_u32_le(&mut self, _: u32) { + self.size += 4 + } + + fn put_u32_ne(&mut self, _: u32) { + self.size += 4 + } + + fn put_u64(&mut self, _: u64) { + self.size += 8 + } + + fn put_u64_le(&mut self, _: u64) { + self.size += 8 + } + + fn put_u64_ne(&mut self, _: u64) { + self.size += 8 + } + + fn put_u8(&mut self, _: u8) { + self.size += 1 + } + + fn put_uint(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } + + fn put_uint_le(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } + + fn put_uint_ne(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } +} diff --git a/rs/moq-net/src/lib.rs b/rs/moq-net/src/lib.rs index 9cc0ee3d1..5eadaabda 100644 --- a/rs/moq-net/src/lib.rs +++ b/rs/moq-net/src/lib.rs @@ -62,7 +62,7 @@ mod stats; mod version; pub use client::*; -pub use coding::{BoundsExceeded, DecodeError, EncodeError, Sizer}; +pub use coding::{BoundsExceeded, DecodeError, EncodeError}; pub use error::*; pub use model::*; pub use path::*; From 09f5ea24f9777817a30c976162afb4311102787b Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:57:38 -0700 Subject: [PATCH 8/9] fix(moq-data): keep the producer's view consistent on publish failure `insert`/`remove` mutated `self.current` before the frame write succeeded, so a failed publish (e.g. the track was closed) left the local set disagreeing with what the track actually saw. Roll the change back on error: the snapshot path of `insert` removes the just-inserted item, and `remove` re-inserts the taken one. The delta paths already write before mutating. Adds a regression test. Also fix a stale doc reference (`Item::size` -> `Item::encode_size`). Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-data/src/set.rs | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 6306afd63..2752aa68a 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -188,8 +188,13 @@ impl Producer { let snapshot_size = self.snapshot_size() + 4 + item.encode_size() as u64; if self.should_snapshot(delta_size, snapshot_size) { - self.current.insert(item); - self.write_snapshot()?; + // A snapshot encodes the full set, so insert first; undo if the write fails so our view + // stays consistent with what the track actually saw. + self.current.insert(item.clone()); + if let Err(err) = self.write_snapshot() { + self.current.remove(&item); + return Err(err); + } } else { // Write the delta straight from the reference, then move the item into the set. self.write_delta(INSERT, &item)?; @@ -213,10 +218,15 @@ impl Producer { // `current` already reflects the removal. let snapshot_size = self.snapshot_size(); - if self.should_snapshot(delta_size, snapshot_size) { - self.write_snapshot()?; + let published = if self.should_snapshot(delta_size, snapshot_size) { + self.write_snapshot() } else { - self.write_delta(REMOVE, &removed)?; + self.write_delta(REMOVE, &removed) + }; + if let Err(err) = published { + // Restore the item so our view stays consistent with what the track actually saw. + self.current.insert(removed); + return Err(err); } Ok(true) } @@ -260,7 +270,8 @@ impl Producer { } /// The byte size of a full snapshot of the current set: a `u32` count plus each item - /// length-prefixed. Computed from [`Item::size`] so we can size a frame without a scratch buffer. + /// length-prefixed. Computed from [`Item::encode_size`] so we can size a frame without a scratch + /// buffer. fn snapshot_size(&self) -> u64 { 4 + self .current @@ -608,6 +619,22 @@ mod test { assert_eq!(drain(track).last().unwrap().len(), MAX_DELTA_FRAMES + 1); } + #[test] + fn failed_publish_preserves_view() { + let track = moq_net::Track::new("test").produce(); + let mut producer = Producer::::new(track, Config::default()); + producer.insert("video".into()).unwrap(); + producer.finish().unwrap(); + + // The track is finished, so publishing fails. The local view must not record the change, + // otherwise it would disagree with what the track actually saw. + assert!(producer.insert("audio".into()).is_err()); + assert!(!producer.contains("audio")); + + assert!(producer.remove("video").is_err()); + assert!(producer.contains("video")); + } + #[test] fn binary_items_roundtrip() { let track = moq_net::Track::new("test").produce(); From 079104ee1807ab0e5cbf86acd66bf4deb9c94f3f Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 14:52:53 -0700 Subject: [PATCH 9/9] feat(moq-data): report set changes as added/removed, not just the full set The set consumer now yields an `Update { added, removed }` per change instead of the whole reconstructed set, so a watcher (e.g. tracks.set) can react to individual items appearing and leaving. The full set is still available via the new `Consumer::current()`. A delta maps to a single add or remove. A snapshot is diffed against the current set (the consumer no longer clears state when switching groups), so a snapshot-only stream still produces per-item changes and a group roll never re-reports the whole set. No-op frames are skipped, so an `Update` is never empty. Same change applied to `@moq/data`. Wire format unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- js/data/README.md | 12 +- js/data/src/set/consumer.ts | 94 ++++++++++----- js/data/src/set/index.ts | 2 +- js/data/src/set/set.test.ts | 40 ++++++- rs/moq-data/README.md | 16 ++- rs/moq-data/src/set.rs | 223 ++++++++++++++++++++++++------------ 6 files changed, 271 insertions(+), 116 deletions(-) diff --git a/js/data/README.md b/js/data/README.md index a05a669e9..81f74ec6f 100644 --- a/js/data/README.md +++ b/js/data/README.md @@ -25,14 +25,16 @@ producer.insert("video"); producer.insert("audio"); producer.remove("audio"); -// Consume: yields the full set after each change. -const consumer = new Consumer(track.subscribe(), { codec: stringCodec }); -for await (const names of consumer) { - console.log(names); // Set +// Consume: each change is the items added and removed; `current()` is the full set. +const consumer = new Consumer(track, { codec: stringCodec }); +for await (const update of consumer) { + for (const name of update.added) console.log("track appeared:", name); + for (const name of update.removed) console.log("track left:", name); + console.log("now:", consumer.current()); // Set } ``` -Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. Each change is reported as the items it added and removed (a snapshot is diffed against the current set), so a watcher reacts to individual tracks appearing and leaving without comparing whole sets. Items are arbitrary binary data via a `Codec` (`encode`/`decode` to `Uint8Array`). `stringCodec` and `bytesCodec` are provided; supply your own for richer types. Items dedupe by their encoded bytes, so two values with the same encoding are the same member. diff --git a/js/data/src/set/consumer.ts b/js/data/src/set/consumer.ts index 8cb8c064c..a0ff287df 100644 --- a/js/data/src/set/consumer.ts +++ b/js/data/src/set/consumer.ts @@ -4,11 +4,23 @@ import type { Codec } from "./codec.ts"; import type { Config } from "./producer.ts"; import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; +/** + * The items added and removed by a single change, returned by {@link Consumer.next}. + * + * A delta carries one item in exactly one field. A snapshot (the first frame of a group, or a late + * joiner's first read) carries its difference from the previous state, so several items may be + * added and removed at once. + */ +export interface Update { + added: T[]; + removed: T[]; +} + /** * Consumes a set from a track, reconstructing it from snapshots and deltas. * - * Reads each group's snapshot (frame 0) and applies the following frames as insert/remove deltas, - * yielding the reconstructed set after each one. + * Each change is reduced to the items it added and removed; the full set is available via + * {@link current}. */ export class Consumer { #track: Moq.Track; @@ -24,14 +36,24 @@ export class Consumer { this.#codec = config.codec; } - /** Get the set after the next change, or undefined once the track ends. */ - async next(): Promise | undefined> { + /** The full set as currently reconstructed. Updated by each {@link next}. */ + current(): Set { + return new Set(this.#current.values()); + } + + /** + * Get the next change as added/removed items, or undefined once the track ends. + * + * Frames that change nothing are skipped, so a returned {@link Update} is never empty. Switching + * to a newer group diffs its snapshot against the current set, so no change is missed or doubled. + */ + async next(): Promise | undefined> { for (;;) { if (!this.#group) { - // Advance to the next group with a higher sequence number (skipping late arrivals). + // Advance to the next group with a higher sequence number (skipping late arrivals). We + // keep #current across the switch so the next snapshot diffs against it. this.#group = await this.#track.nextGroupOrdered(); if (!this.#group) return undefined; - this.#current = new Map(); this.#framesRead = 0; } @@ -42,37 +64,53 @@ export class Consumer { continue; } - this.#apply(frame); - return new Set(this.#current.values()); + const update = this.#apply(frame); + if (update.added.length > 0 || update.removed.length > 0) return update; + // A no-op frame (redundant snapshot or delta); read the next one. } } - async *[Symbol.asyncIterator](): AsyncIterator> { + async *[Symbol.asyncIterator](): AsyncIterator> { for (;;) { - const value = await this.next(); - if (value === undefined) return; - yield value; + const update = await this.next(); + if (update === undefined) return; + yield update; } } - // Frame 0 of a group is a snapshot, the rest are insert/remove deltas. - #apply(frame: Uint8Array): void { - if (this.#framesRead === 0) { - this.#current = new Map(); + // Apply one frame, returning what it changed: frame 0 of a group is a snapshot (diffed against + // the current set), the rest are insert/remove deltas. + #apply(frame: Uint8Array): Update { + this.#framesRead += 1; + + if (this.#framesRead === 1) { + const next = new Map(); for (const item of decodeSnapshot(frame)) { - this.#current.set(keyOf(item), this.#codec.decode(item)); - } - } else { - const [op, item] = decodeDelta(frame); - const key = keyOf(item); - if (op === INSERT) { - this.#current.set(key, this.#codec.decode(item)); - } else if (op === REMOVE) { - this.#current.delete(key); - } else { - throw new Error(`unknown op byte: ${op}`); + next.set(keyOf(item), this.#codec.decode(item)); } + + const added: T[] = []; + const removed: T[] = []; + for (const [key, value] of next) if (!this.#current.has(key)) added.push(value); + for (const [key, value] of this.#current) if (!next.has(key)) removed.push(value); + this.#current = next; + return { added, removed }; } - this.#framesRead += 1; + + const [op, item] = decodeDelta(frame); + const key = keyOf(item); + if (op === INSERT) { + if (this.#current.has(key)) return { added: [], removed: [] }; + const value = this.#codec.decode(item); + this.#current.set(key, value); + return { added: [value], removed: [] }; + } + if (op === REMOVE) { + const value = this.#current.get(key); + if (value === undefined) return { added: [], removed: [] }; + this.#current.delete(key); + return { added: [], removed: [value] }; + } + throw new Error(`unknown op byte: ${op}`); } } diff --git a/js/data/src/set/index.ts b/js/data/src/set/index.ts index a200f821a..b9d03551d 100644 --- a/js/data/src/set/index.ts +++ b/js/data/src/set/index.ts @@ -1,3 +1,3 @@ export { bytesCodec, type Codec, stringCodec } from "./codec.ts"; -export { Consumer } from "./consumer.ts"; +export { Consumer, type Update } from "./consumer.ts"; export { type Config, Producer } from "./producer.ts"; diff --git a/js/data/src/set/set.test.ts b/js/data/src/set/set.test.ts index 54dd6bfbb..121625fb5 100644 --- a/js/data/src/set/set.test.ts +++ b/js/data/src/set/set.test.ts @@ -5,10 +5,15 @@ import { stringCodec } from "./codec.ts"; import { Consumer } from "./consumer.ts"; import { Producer } from "./producer.ts"; -// Reconstruct every set a consumer yields, in order. Consumes the track's groups. +// Collect the full set after each change a consumer yields, in order. Consumes the track's groups. async function drain(track: Track): Promise[]> { const out: Set[] = []; - for await (const value of new Consumer(track, { codec: stringCodec })) out.push(value); + const consumer = new Consumer(track, { codec: stringCodec }); + for (;;) { + const update = await consumer.next(); + if (update === undefined) break; + out.push(consumer.current()); + } return out; } @@ -67,19 +72,42 @@ test("redundant insert and remove write nothing", async () => { expect(await structure(track)).toEqual([1]); }); -test("live consumer sees each change", async () => { +test("live consumer sees each change as added/removed", async () => { const track = new Track("test"); const producer = new Producer(track, { codec: stringCodec }); const consumer = new Consumer(track, { codec: stringCodec }); producer.insert("video"); - expect(await consumer.next()).toEqual(set("video")); + expect(await consumer.next()).toEqual({ added: ["video"], removed: [] }); producer.insert("audio"); - expect(await consumer.next()).toEqual(set("video", "audio")); + expect(await consumer.next()).toEqual({ added: ["audio"], removed: [] }); producer.remove("video"); - expect(await consumer.next()).toEqual(set("audio")); + expect(await consumer.next()).toEqual({ added: [], removed: ["video"] }); + + // The reconstructed set tracks the net result alongside the per-change deltas. + expect(consumer.current()).toEqual(set("audio")); + + producer.finish(); +}); + +test("snapshot-only stream still reports incremental changes", async () => { + const track = new Track("test"); + // A zero ratio rolls a fresh snapshot group on every change, so the consumer only sees + // snapshots, yet must still report each change as a single add or remove (diffed vs current). + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 0 }); + const consumer = new Consumer(track, { codec: stringCodec }); + + producer.insert("a"); + expect(await consumer.next()).toEqual({ added: ["a"], removed: [] }); + + producer.insert("b"); + expect(await consumer.next()).toEqual({ added: ["b"], removed: [] }); + + producer.remove("a"); + expect(await consumer.next()).toEqual({ added: [], removed: ["a"] }); + expect(consumer.current()).toEqual(set("b")); producer.finish(); }); diff --git a/rs/moq-data/README.md b/rs/moq-data/README.md index 965bb8f9a..8deb7c65e 100644 --- a/rs/moq-data/README.md +++ b/rs/moq-data/README.md @@ -25,14 +25,20 @@ tracks.insert("video".to_string())?; tracks.insert("audio".to_string())?; tracks.remove("audio")?; -// Consume: yields the full set after each change. -let mut consumer = set::Consumer::::new(track.subscribe(None)); -while let Some(names) = consumer.next().await? { - println!("{names:?}"); +// Consume: each change is the items added and removed; `current()` is the full set. +let mut consumer = set::Consumer::::new(tracks.consume()); +while let Some(update) = consumer.next().await? { + for name in &update.added { + println!("track appeared: {name}"); + } + for name in &update.removed { + println!("track left: {name}"); + } + println!("now: {:?}", consumer.current()); } ``` -Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. Each change is reported as the items it added and removed (a snapshot is diffed against the current set), so a watcher reacts to individual tracks appearing and leaving without comparing whole sets. Items are arbitrary binary data: implement the `set::Item` trait for any type. It encodes straight into the frame's `bytes::BufMut` and decodes from a `bytes::Buf`, both zero-copy (`copy_to_bytes` hands back a slice of the frame). `String`, `Vec`, and `bytes::Bytes` are supported out of the box. diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs index 2752aa68a..da4d82766 100644 --- a/rs/moq-data/src/set.rs +++ b/rs/moq-data/src/set.rs @@ -341,6 +341,36 @@ impl Producer { } } +/// The items added and removed by a single change, returned by [`Consumer::next`]. +/// +/// A delta carries one item in exactly one of the fields. A snapshot (the first frame of a group, +/// or a late joiner's first read) carries its difference from the previous state, so several items +/// may be added and removed at once. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Update { + /// Items that joined the set. + pub added: Vec, + /// Items that left the set. + pub removed: Vec, +} + +// Manual impl so `T` needn't be `Default` (the derive would wrongly require it). +impl Default for Update { + fn default() -> Self { + Self { + added: Vec::new(), + removed: Vec::new(), + } + } +} + +impl Update { + /// Whether nothing changed. + pub fn is_empty(&self) -> bool { + self.added.is_empty() && self.removed.is_empty() + } +} + /// Consumes a set from a track, reconstructing it from snapshots and deltas. pub struct Consumer { track: moq_net::TrackConsumer, @@ -360,70 +390,96 @@ impl Consumer { } } - /// Get the set after the next change, or `None` once the track ends. - pub async fn next(&mut self) -> Result>> + /// The full set as currently reconstructed. Updated by each [`next`](Self::next). + pub fn current(&self) -> &HashSet { + &self.current + } + + /// Get the next change as added/removed items, or `None` once the track ends. + /// + /// Use [`current`](Self::current) afterward for the full set. + pub async fn next(&mut self) -> Result>> where T: Unpin, { kio::wait(|waiter| self.poll_next(waiter)).await } - /// Poll for the set after the next change, without blocking. + /// Poll for the next change, without blocking. /// - /// Jumps to the newest group, decodes its snapshot, and applies deltas in order, yielding the - /// reconstructed set after each frame. Switching to a newer group discards the older one. - pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll>>> { - // Drain to the newest group, resetting reconstruction state whenever we switch. - let track_finished = loop { - match self.track.poll_next_group(waiter)? { - Poll::Ready(Some(group)) => { - self.group = Some(group); - self.current.clear(); - self.frames_read = 0; + /// Jumps to the newest group, decodes its snapshot, and applies deltas in order. Each frame is + /// reduced to the items it added and removed; frames that change nothing are skipped, so a + /// returned [`Update`] is never empty. Switching to a newer group diffs its snapshot against the + /// current set, so no change is missed or duplicated. + pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll>>> { + loop { + // Drain to the newest group. We keep `current` across the switch so the next group's + // snapshot diffs against it, rather than re-reporting the whole set. + let track_finished = loop { + match self.track.poll_next_group(waiter)? { + Poll::Ready(Some(group)) => { + self.group = Some(group); + self.frames_read = 0; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, } - Poll::Ready(None) => break true, - Poll::Pending => break false, - } - }; - - if let Some(group) = &mut self.group { - match group.poll_read_frame(waiter)? { - Poll::Ready(Some(frame)) => { - self.apply(frame)?; - return Poll::Ready(Ok(Some(self.current.clone()))); + }; + + if let Some(group) = &mut self.group { + match group.poll_read_frame(waiter)? { + Poll::Ready(Some(frame)) => { + let update = self.apply(frame)?; + if !update.is_empty() { + return Poll::Ready(Ok(Some(update))); + } + // A no-op frame (redundant snapshot or delta); read the next one. + continue; + } + // The current group is exhausted; look for a newer one. + Poll::Ready(None) => { + self.group = None; + continue; + } + Poll::Pending => return Poll::Pending, } - // The current group is exhausted; wait for a newer one. - Poll::Ready(None) => self.group = None, - Poll::Pending => return Poll::Pending, } - } - if track_finished { - Poll::Ready(Ok(None)) - } else { - Poll::Pending + return if track_finished { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + }; } } - /// Apply one frame: frame 0 of a group is a snapshot, the rest are `+`/`-` deltas. - fn apply(&mut self, frame: Bytes) -> Result<()> { - if self.frames_read == 0 { - self.current = decode_snapshot(frame)?; - } else { - let (op, mut item) = decode_delta(frame)?; - let item = T::decode(&mut item)?; - match op { - INSERT => { - self.current.insert(item); - } - REMOVE => { - self.current.remove(&item); - } - other => return Err(Error::Malformed(format!("unknown op byte: {other:#04x}"))), - } - } + /// Apply one frame, returning what it changed: frame 0 of a group is a snapshot (diffed against + /// the current set), the rest are `+`/`-` deltas. + fn apply(&mut self, frame: Bytes) -> Result> { self.frames_read += 1; - Ok(()) + + if self.frames_read == 1 { + let snapshot = decode_snapshot(frame)?; + let removed = self.current.difference(&snapshot).cloned().collect(); + let added = snapshot.difference(&self.current).cloned().collect(); + self.current = snapshot; + return Ok(Update { added, removed }); + } + + let (op, mut item) = decode_delta(frame)?; + let item = T::decode(&mut item)?; + Ok(match op { + INSERT if self.current.insert(item.clone()) => Update { + added: vec![item], + removed: Vec::new(), + }, + REMOVE if self.current.remove(&item) => Update { + added: Vec::new(), + removed: vec![item], + }, + INSERT | REMOVE => Update::default(), + other => return Err(Error::Malformed(format!("unknown op byte: {other:#04x}"))), + }) } } @@ -486,17 +542,25 @@ mod test { items.iter().map(|s| s.to_string()).collect() } - /// Reconstruct every set a consumer yields, in order. + /// Collect the full set after each change a consumer yields, in order. fn drain(track: moq_net::TrackConsumer) -> Vec> { let mut consumer = Consumer::::new(track); let waiter = kio::Waiter::noop(); let mut out = Vec::new(); - while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { - out.push(value); + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) { + out.push(consumer.current().clone()); } out } + /// The next non-empty update, panicking if one isn't ready. + fn next_update(consumer: &mut Consumer) -> Update { + match consumer.poll_next(&kio::Waiter::noop()) { + Poll::Ready(Ok(Some(update))) => update, + other => panic!("expected an update, got {other:?}"), + } + } + /// Build a snapshot frame for a set of strings, independent of the producer's encoder, as a /// decode oracle. fn snapshot_bytes(items: &[&str]) -> Vec { @@ -572,21 +636,42 @@ mod test { fn live_consumer_sees_each_change() { let (mut producer, track) = producer(Config::default()); let mut consumer = Consumer::::new(track); - let waiter = kio::Waiter::noop(); - - let next = |consumer: &mut Consumer| match consumer.poll_next(&waiter) { - Poll::Ready(Ok(Some(value))) => value, - other => panic!("expected a set, got {other:?}"), - }; producer.insert("video".into()).unwrap(); - assert_eq!(next(&mut consumer), set(&["video"])); + let update = next_update(&mut consumer); + assert_eq!(update.added, vec!["video".to_string()]); + assert!(update.removed.is_empty()); producer.insert("audio".into()).unwrap(); - assert_eq!(next(&mut consumer), set(&["video", "audio"])); + assert_eq!(next_update(&mut consumer).added, vec!["audio".to_string()]); producer.remove("video").unwrap(); - assert_eq!(next(&mut consumer), set(&["audio"])); + assert_eq!(next_update(&mut consumer).removed, vec!["video".to_string()]); + + // The reconstructed set tracks the net result alongside the per-change deltas. + assert_eq!(consumer.current(), &set(&["audio"])); + } + + #[test] + fn snapshot_diff_reports_incremental_changes() { + // A zero ratio rolls a fresh snapshot group on every change, so the consumer only ever sees + // snapshots, yet must still report each change as a single add or remove (diffed vs current). + let (mut producer, track) = producer(Config { delta_ratio: Some(0.0) }); + let mut consumer = Consumer::::new(track); + + producer.insert("a".into()).unwrap(); + assert_eq!(next_update(&mut consumer).added, vec!["a".to_string()]); + + producer.insert("b".into()).unwrap(); + let update = next_update(&mut consumer); + assert_eq!(update.added, vec!["b".to_string()]); + assert!(update.removed.is_empty()); + + producer.remove("a").unwrap(); + let update = next_update(&mut consumer); + assert_eq!(update.removed, vec!["a".to_string()]); + assert!(update.added.is_empty()); + assert_eq!(consumer.current(), &set(&["b"])); } #[test] @@ -647,13 +732,10 @@ mod test { let mut consumer = Consumer::>::new(sub); let waiter = kio::Waiter::noop(); - let mut last = None; - while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { - last = Some(value); - } + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) {} let expected: HashSet> = [vec![0x00, 0xff, 0x42], vec![0x01]].into_iter().collect(); - assert_eq!(last.unwrap(), expected); + assert_eq!(consumer.current(), &expected); } #[test] @@ -693,10 +775,9 @@ mod test { let mut consumer = Consumer::::new(sub); let waiter = kio::Waiter::noop(); - let mut last = None; - while let Poll::Ready(Ok(Some(value))) = consumer.poll_next(&waiter) { - last = Some(value); - } - assert_eq!(last.unwrap(), [Point { x: 3, y: 4 }].into_iter().collect()); + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) {} + + let expected: HashSet = [Point { x: 3, y: 4 }].into_iter().collect(); + assert_eq!(consumer.current(), &expected); } }