Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ Changes in one area usually need matching updates elsewhere, including docs. If
| `rs/moq-net` wire/API | `js/net`, `doc/concept` |
| `rs/hang` catalog/container | `js/hang`, `doc/concept` |
| `rs/moq-token` | `js/token` |
| `rs/moq-data` set wire/API | `js/data` (shared wire format, must stay byte-compatible) |
| `rs/moq-relay` config/behavior | `doc/bin/relay/` |
| `rs/moq-cli` | `doc/bin/cli.md` |
| `rs/moq-gst` | `doc/bin/gstreamer.md` |
Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"rs/moq-bench",
"rs/moq-boy",
"rs/moq-cli",
"rs/moq-data",
"rs/moq-ffi",
"rs/moq-gst",
"rs/moq-json",
Expand All @@ -28,6 +29,7 @@ default-members = [
"rs/moq-audio",
"rs/moq-bench",
"rs/moq-cli",
"rs/moq-data",
# "rs/moq-ffi", # requires Python/maturin
# "rs/moq-gst", # requires GStreamer
"rs/moq-json",
Expand Down
30 changes: 24 additions & 6 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions js/data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<p align="center">
<img height="128px" src="https://github.com/moq-dev/moq/blob/main/.github/logo.svg" alt="Media over QUIC">
</p>

# @moq/data

[![npm version](https://img.shields.io/npm/v/@moq/data)](https://www.npmjs.com/package/@moq/data)
[![TypeScript](https://img.shields.io/badge/TypeScript-ready-blue.svg)](https://www.typescriptlang.org/)

Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks.

Each helper maps an application data structure onto a [`@moq/net`](../net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone.

- **`@moq/data/set`** syncs a `Set`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas.
- **`@moq/data/json`** re-exports [`@moq/json`](../json) for snapshot/delta JSON publishing. It lives in its own package today and will migrate here over time.

## Set

```ts
import { Producer, Consumer, stringCodec } from "@moq/data/set";

// Publish the set of track names in a broadcast.
const producer = new Producer(track, { codec: stringCodec });
producer.insert("video");
producer.insert("audio");
producer.remove("audio");

// Consume: yields the full set after each change.
const consumer = new Consumer(track.subscribe(), { codec: stringCodec });
for await (const names of consumer) {
console.log(names); // Set<string>
}
```

Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas.

Items are arbitrary binary data via a `Codec<T>` (`encode`/`decode` to `Uint8Array`). `stringCodec` and `bytesCodec` are provided; supply your own for richer types. Items dedupe by their encoded bytes, so two values with the same encoding are the same member.

Deltas are on by default (`deltaRatio: 2`); a delta is appended while the group stays within `deltaRatio` times the size of a fresh snapshot, otherwise a new snapshot group is started.
32 changes: 32 additions & 0 deletions js/data/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "@moq/data",
"type": "module",
"version": "0.0.1",
"description": "Helpers for sending metadata (sets, JSON) over MoQ tracks.",
"license": "(MIT OR Apache-2.0)",
"repository": "github:moq-dev/moq",
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./set": "./src/set/index.ts",
"./json": "./src/json.ts"
},
"scripts": {
"build": "rimraf dist && tsc -b && bun ../common/package.ts",
"check": "tsc --noEmit",
"test": "bun test --only-failures",
"release": "bun ../common/release.ts"
},
"dependencies": {
"@moq/json": "workspace:^",
"@moq/net": "workspace:^"
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"@types/bun": "^1.3.14",
"rimraf": "^6.1.3",
"typescript": "^6.0.3"
}
}
2 changes: 2 additions & 0 deletions js/data/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * as Json from "@moq/json";
export * as Set from "./set/index.ts";
2 changes: 2 additions & 0 deletions js/data/src/json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Snapshot/delta JSON publishing, re-exported from @moq/json.
export * from "@moq/json";
26 changes: 26 additions & 0 deletions js/data/src/set/codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Encodes a set item to and from its wire bytes.
*
* Encoding must be deterministic and round-trip: `codec.decode(codec.encode(value))` must equal
* `value`. Two items are the same set member iff they encode to the same bytes, so distinct items
* must encode distinctly.
*/
export interface Codec<T> {
encode(value: T): Uint8Array;
decode(bytes: Uint8Array): T;
}

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

/** A codec for UTF-8 strings, e.g. a set of track names. */
export const stringCodec: Codec<string> = {
encode: (value) => textEncoder.encode(value),
decode: (bytes) => textDecoder.decode(bytes),
};

/** A codec for raw binary items, passed through untouched. */
export const bytesCodec: Codec<Uint8Array> = {
encode: (value) => value,
decode: (bytes) => bytes,
};
78 changes: 78 additions & 0 deletions js/data/src/set/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import type * as Moq from "@moq/net";

import type { Codec } from "./codec.ts";
import type { Config } from "./producer.ts";
import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts";

/**
* Consumes a set from a track, reconstructing it from snapshots and deltas.
*
* Reads each group's snapshot (frame 0) and applies the following frames as insert/remove deltas,
* yielding the reconstructed set after each one.
*/
export class Consumer<T> {
#track: Moq.Track;
#codec: Codec<T>;

#group?: Moq.Group;
// Keyed by encoded bytes so items dedupe by value, not reference.
#current = new Map<string, T>();
#framesRead = 0;

constructor(track: Moq.Track, config: Config<T>) {
this.#track = track;
this.#codec = config.codec;
}

/** Get the set after the next change, or undefined once the track ends. */
async next(): Promise<Set<T> | undefined> {
for (;;) {
if (!this.#group) {
// Advance to the next group with a higher sequence number (skipping late arrivals).
this.#group = await this.#track.nextGroupOrdered();
if (!this.#group) return undefined;
this.#current = new Map();
this.#framesRead = 0;
}

const frame = await this.#group.readFrame();
if (frame === undefined) {
// The group is exhausted; advance to the next one.
this.#group = undefined;
continue;
}

this.#apply(frame);
return new Set(this.#current.values());
}
}

async *[Symbol.asyncIterator](): AsyncIterator<Set<T>> {
for (;;) {
const value = await this.next();
if (value === undefined) return;
yield value;
}
}

// Frame 0 of a group is a snapshot, the rest are insert/remove deltas.
#apply(frame: Uint8Array): void {
if (this.#framesRead === 0) {
this.#current = new Map();
for (const item of decodeSnapshot(frame)) {
this.#current.set(keyOf(item), this.#codec.decode(item));
}
} else {
const [op, item] = decodeDelta(frame);
const key = keyOf(item);
if (op === INSERT) {
this.#current.set(key, this.#codec.decode(item));
} else if (op === REMOVE) {
this.#current.delete(key);
} else {
throw new Error(`unknown op byte: ${op}`);
}
}
this.#framesRead += 1;
}
}
3 changes: 3 additions & 0 deletions js/data/src/set/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { bytesCodec, type Codec, stringCodec } from "./codec.ts";
export { Consumer } from "./consumer.ts";
export { type Config, Producer } from "./producer.ts";
Loading
Loading