Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ Changes in one area usually need matching updates elsewhere, including docs. If
| `rs/moq-net` wire/API | `js/net`, `doc/concept` |
| `rs/hang` catalog/container | `js/hang`, `doc/concept` |
| `rs/moq-token` | `js/token` |
| `rs/moq-data` set wire/API | `js/data` (shared wire format, must stay byte-compatible) |
| `rs/moq-relay` config/behavior | `doc/bin/relay/` |
| `rs/moq-cli` | `doc/bin/cli.md` |
| `rs/moq-gst` | `doc/bin/gstreamer.md` |
Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"rs/moq-bench",
"rs/moq-boy",
"rs/moq-cli",
"rs/moq-data",
"rs/moq-ffi",
"rs/moq-gst",
"rs/moq-json",
Expand All @@ -28,6 +29,7 @@ default-members = [
"rs/moq-audio",
"rs/moq-bench",
"rs/moq-cli",
"rs/moq-data",
# "rs/moq-ffi", # requires Python/maturin
# "rs/moq-gst", # requires GStreamer
"rs/moq-json",
Expand Down
18 changes: 18 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions js/data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<p align="center">
<img height="128px" src="https://github.com/moq-dev/moq/blob/main/.github/logo.svg" alt="Media over QUIC">
</p>

# @moq/data

[![npm version](https://img.shields.io/npm/v/@moq/data)](https://www.npmjs.com/package/@moq/data)
[![TypeScript](https://img.shields.io/badge/TypeScript-ready-blue.svg)](https://www.typescriptlang.org/)

Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks.

Each helper maps an application data structure onto a [`@moq/net`](../net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone.

- **`@moq/data/set`** syncs a `Set`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas.
- **`@moq/data/json`** re-exports [`@moq/json`](../json) for snapshot/delta JSON publishing. It lives in its own package today and will migrate here over time.

## Set

```ts
import { Producer, Consumer, stringCodec } from "@moq/data/set";

// Publish the set of track names in a broadcast.
const producer = new Producer(track, { codec: stringCodec });
producer.insert("video");
producer.insert("audio");
producer.remove("audio");

// Consume: each change is the items added and removed; `current()` is the full set.
const consumer = new Consumer(track, { codec: stringCodec });
for await (const update of consumer) {
for (const name of update.added) console.log("track appeared:", name);
for (const name of update.removed) console.log("track left:", name);
console.log("now:", consumer.current()); // Set<string>
}
```

Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. Each change is reported as the items it added and removed (a snapshot is diffed against the current set), so a watcher reacts to individual tracks appearing and leaving without comparing whole sets.

Items are arbitrary binary data via a `Codec<T>` (`encode`/`decode` to `Uint8Array`). `stringCodec` and `bytesCodec` are provided; supply your own for richer types. Items dedupe by their encoded bytes, so two values with the same encoding are the same member.

Deltas are on by default (`deltaRatio: 2`); a delta is appended while the group stays within `deltaRatio` times the size of a fresh snapshot, otherwise a new snapshot group is started.
32 changes: 32 additions & 0 deletions js/data/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@moq/data",
"type": "module",
"version": "0.0.1",
"description": "Helpers for sending metadata (sets, JSON) over MoQ tracks.",
"license": "(MIT OR Apache-2.0)",
"repository": "github:moq-dev/moq",
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./set": "./src/set/index.ts",
"./json": "./src/json.ts"
},
"scripts": {
"build": "rimraf dist && tsc -b && bun ../common/package.ts",
"check": "tsc --noEmit",
"test": "bun test --only-failures",
"release": "bun ../common/release.ts"
},
"dependencies": {
"@moq/json": "workspace:^",
"@moq/net": "workspace:^"
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"@types/bun": "^1.3.14",
"rimraf": "^6.1.3",
"typescript": "^6.0.3"
}
}
2 changes: 2 additions & 0 deletions js/data/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * as Json from "@moq/json";
export * as Set from "./set/index.ts";
2 changes: 2 additions & 0 deletions js/data/src/json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Snapshot/delta JSON publishing, re-exported from @moq/json.
export * from "@moq/json";
26 changes: 26 additions & 0 deletions js/data/src/set/codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Encodes a set item to and from its wire bytes.
*
* Encoding must be deterministic and round-trip: `codec.decode(codec.encode(value))` must equal
* `value`. Two items are the same set member iff they encode to the same bytes, so distinct items
* must encode distinctly.
*/
export interface Codec<T> {
encode(value: T): Uint8Array;
decode(bytes: Uint8Array): T;
}

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

/** A codec for UTF-8 strings, e.g. a set of track names. */
export const stringCodec: Codec<string> = {
encode: (value) => textEncoder.encode(value),
decode: (bytes) => textDecoder.decode(bytes),
};

/** A codec for raw binary items, passed through untouched. */
export const bytesCodec: Codec<Uint8Array> = {
encode: (value) => value,
decode: (bytes) => bytes,
};
116 changes: 116 additions & 0 deletions js/data/src/set/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import type * as Moq from "@moq/net";

import type { Codec } from "./codec.ts";
import type { Config } from "./producer.ts";
import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts";

/**
* The items added and removed by a single change, returned by {@link Consumer.next}.
*
* A delta carries one item in exactly one field. A snapshot (the first frame of a group, or a late
* joiner's first read) carries its difference from the previous state, so several items may be
* added and removed at once.
*/
export interface Update<T> {
added: T[];
removed: T[];
}

/**
* Consumes a set from a track, reconstructing it from snapshots and deltas.
*
* Each change is reduced to the items it added and removed; the full set is available via
* {@link current}.
*/
export class Consumer<T> {
#track: Moq.Track;
#codec: Codec<T>;

#group?: Moq.Group;
// Keyed by encoded bytes so items dedupe by value, not reference.
#current = new Map<string, T>();
#framesRead = 0;

constructor(track: Moq.Track, config: Config<T>) {
this.#track = track;
this.#codec = config.codec;
}

/** The full set as currently reconstructed. Updated by each {@link next}. */
current(): Set<T> {
return new Set(this.#current.values());
}

/**
* Get the next change as added/removed items, or undefined once the track ends.
*
* Frames that change nothing are skipped, so a returned {@link Update} is never empty. Switching
* to a newer group diffs its snapshot against the current set, so no change is missed or doubled.
*/
async next(): Promise<Update<T> | undefined> {
for (;;) {
if (!this.#group) {
// Advance to the next group with a higher sequence number (skipping late arrivals). We
// keep #current across the switch so the next snapshot diffs against it.
this.#group = await this.#track.nextGroupOrdered();
if (!this.#group) return undefined;
this.#framesRead = 0;
}

const frame = await this.#group.readFrame();
if (frame === undefined) {
// The group is exhausted; advance to the next one.
this.#group = undefined;
continue;
}

const update = this.#apply(frame);
if (update.added.length > 0 || update.removed.length > 0) return update;
// A no-op frame (redundant snapshot or delta); read the next one.
}
}

async *[Symbol.asyncIterator](): AsyncIterator<Update<T>> {
for (;;) {
const update = await this.next();
if (update === undefined) return;
yield update;
}
}

// Apply one frame, returning what it changed: frame 0 of a group is a snapshot (diffed against
// the current set), the rest are insert/remove deltas.
#apply(frame: Uint8Array): Update<T> {
this.#framesRead += 1;

if (this.#framesRead === 1) {
const next = new Map<string, T>();
for (const item of decodeSnapshot(frame)) {
next.set(keyOf(item), this.#codec.decode(item));
}

const added: T[] = [];
const removed: T[] = [];
for (const [key, value] of next) if (!this.#current.has(key)) added.push(value);
for (const [key, value] of this.#current) if (!next.has(key)) removed.push(value);
this.#current = next;
return { added, removed };
}

const [op, item] = decodeDelta(frame);
const key = keyOf(item);
if (op === INSERT) {
if (this.#current.has(key)) return { added: [], removed: [] };
const value = this.#codec.decode(item);
this.#current.set(key, value);
return { added: [value], removed: [] };
}
if (op === REMOVE) {
const value = this.#current.get(key);
if (value === undefined) return { added: [], removed: [] };
this.#current.delete(key);
return { added: [], removed: [value] };
}
throw new Error(`unknown op byte: ${op}`);
}
}
3 changes: 3 additions & 0 deletions js/data/src/set/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { bytesCodec, type Codec, stringCodec } from "./codec.ts";
export { Consumer, type Update } from "./consumer.ts";
export { type Config, Producer } from "./producer.ts";
Loading