From f941cd0547bcc2199f1145ccbf6f75835acc4029 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 10:24:05 -0700 Subject: [PATCH 1/7] feat(wasm): full @moq/wasm API as a drop-in for @moq/net; flip watch/publish/boy Expand the rs/moq-wasm bindings into a full consume + publish object model and add a hand-written TypeScript shim so @moq/wasm presents the same surface as @moq/net, then flip the apps to use it. rs/moq-wasm: - Full producer + consumer model: Session (connect/consume/publish), dual-use Broadcast (requested), TrackRequest, TrackProducer, TrackConsumer, TrackSubscriber, dual-use Group, plus a real OriginConsumer with announce discovery + consume (no stub). - transport.rs advertises the moq ALPNs via web-transport-wasm 0.5.8's new ClientBuilder::with_protocols so the relay negotiates lite-04/05 like @moq/net (without it the browser sent no subprotocol and fell back to a dead lite-02). - setup() caps tracing at WARN (default TRACE floods the console under churn). js/wasm (TS shim): - Connection (connect + Established + ported Reload), model wrappers with the string/json/bool conveniences, options-object signatures, reactive state.closed, synchronous lazy consume/subscribe, number sequences. - Path/Time/Signals/Varint/TrackInfo re-exported from @moq/net (pure helpers). - For local http:// dev, connect fetches /certificate.sha256, pins it, and upgrades to https:// (mirrors @moq/net). - src imports the wasm-bindgen output via the "#bindgen" package-imports subpath (never names dist/); a new vite plugin (js/common/vite-plugin-wasm) builds the wasm on demand and hot-rebuilds on Rust changes, so no manual `just wasm`. Decouple the serialization layers from the networking model so the apps can flip backends without the TS private-field nominal-typing wall: - @moq/hang is serialization-only now: the container Consumer moved to js/watch, the Legacy Producer to js/publish; the pure Format/encodeFrame stay. - @moq/json and @moq/msf take minimal structural track interfaces instead of concrete @moq/net classes, so any backend (or a test double) satisfies them. @moq/net is now a devDependency there. Flip js/watch, js/publish, and js/moq-boy app code from @moq/net to @moq/wasm. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 7 +- bun.lock | 28 +- demo/web/vite.config.ts | 3 +- js/common/vite-plugin-wasm.ts | 60 ++ js/hang/src/container/format.test.ts | 137 +++++ js/hang/src/container/index.ts | 1 - js/hang/src/container/legacy.ts | 32 +- js/hang/src/index.ts | 3 - js/json/package.json | 2 +- js/json/src/consumer.ts | 16 +- js/json/src/index.ts | 4 +- js/json/src/producer.ts | 18 +- js/moq-boy/package.json | 1 + js/moq-boy/src/element.tsx | 2 +- js/moq-boy/src/index.ts | 2 +- js/moq-boy/src/ui/components/GameCard.tsx | 2 +- js/moq-boy/src/ui/components/StatsPanel.tsx | 2 +- js/moq-boy/src/ui/context.tsx | 2 +- js/msf/package.json | 2 +- js/msf/src/catalog.ts | 8 +- js/publish/package.json | 1 + js/publish/src/audio/capture-worklet.ts | 2 +- js/publish/src/audio/capture.ts | 2 +- js/publish/src/audio/encoder.ts | 4 +- js/publish/src/broadcast.ts | 2 +- js/publish/src/catalog.ts | 3 +- js/publish/src/container.ts | 33 ++ js/publish/src/element.ts | 2 +- js/publish/src/index.ts | 6 +- js/publish/src/preview.ts | 2 +- js/publish/src/video/encoder.ts | 8 +- js/publish/src/video/index.ts | 2 +- js/publish/src/video/polyfill.ts | 2 +- js/wasm/README.md | 54 +- js/wasm/package.json | 29 +- js/wasm/src/connection.ts | 249 +++++++++ js/wasm/src/index.ts | 399 ++++++++++++++ js/wasm/tsconfig.json | 10 + js/watch/package.json | 1 + js/watch/src/audio/buffer.ts | 2 +- js/watch/src/audio/decoder.ts | 9 +- js/watch/src/audio/mse.ts | 5 +- js/watch/src/audio/render.ts | 2 +- js/watch/src/audio/ring-buffer.ts | 2 +- js/watch/src/audio/shared-ring-buffer.ts | 2 +- js/watch/src/audio/source.ts | 4 +- js/watch/src/backend.ts | 2 +- js/watch/src/broadcast.ts | 4 +- .../src/container.test.ts} | 165 +----- .../consumer.ts => watch/src/container.ts} | 35 +- js/watch/src/element.ts | 4 +- js/watch/src/index.ts | 6 +- js/watch/src/mse.ts | 2 +- js/watch/src/sync.ts | 4 +- js/watch/src/ui/components/buffer-control.ts | 28 +- js/watch/src/ui/components/latency.ts | 4 +- js/watch/src/video/backend.ts | 2 +- js/watch/src/video/decoder.ts | 9 +- js/watch/src/video/mse.ts | 5 +- js/watch/src/video/renderer.ts | 2 +- js/watch/src/video/source.ts | 4 +- rs/moq-wasm/Cargo.toml | 3 +- rs/moq-wasm/README.md | 28 +- rs/moq-wasm/src/lib.rs | 514 ++++++++++++++++-- rs/moq-wasm/src/transport.rs | 26 +- 65 files changed, 1642 insertions(+), 374 deletions(-) create mode 100644 js/common/vite-plugin-wasm.ts create mode 100644 js/hang/src/container/format.test.ts create mode 100644 js/publish/src/container.ts create mode 100644 js/wasm/src/connection.ts create mode 100644 js/wasm/src/index.ts create mode 100644 js/wasm/tsconfig.json rename js/{hang/src/container/consumer.test.ts => watch/src/container.test.ts} (75%) rename js/{hang/src/container/consumer.ts => watch/src/container.ts} (91%) diff --git a/Cargo.lock b/Cargo.lock index 9c9b5ddc6..440bb8a2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1710,7 +1710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccc2776f0c61eca1ca32528f85548abd1a4be8fb53d1b21c013e4f18da1e7090" dependencies = [ "data-encoding", - "syn 2.0.117", + "syn 1.0.109", ] [[package]] @@ -4662,6 +4662,7 @@ dependencies = [ "js-sys", "moq-net", "thiserror 2.0.18", + "tracing", "tracing-wasm", "url", "wasm-bindgen", @@ -9250,9 +9251,9 @@ dependencies = [ [[package]] name = "web-transport-wasm" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee22c228309b45651038d975a9f3c041525e9f2cc0f6c3bd8753a110804df11" +checksum = "304af61705cf9212d1686fc6b2eb10110bea706a1ccaa475065947bc8562f553" dependencies = [ "bytes", "js-sys", diff --git a/bun.lock b/bun.lock index bbc1d8582..55c3fce0d 100644 --- a/bun.lock +++ b/bun.lock @@ -79,7 +79,7 @@ }, "js/hang": { "name": "@moq/hang", - "version": "0.2.7", + "version": "0.2.8", "dependencies": { "@kixelated/libavjs-webcodecs-polyfill": "^0.5.5", "@libav.js/variant-opus-af": "^6.8.8", @@ -100,12 +100,12 @@ }, "js/json": { "name": "@moq/json", - "version": "0.0.1", + "version": "0.0.2", "dependencies": { - "@moq/net": "workspace:^", "@moq/signals": "workspace:^", }, "devDependencies": { + "@moq/net": "workspace:^", "@types/bun": "^1.3.14", "rimraf": "^6.1.3", "typescript": "^6.0.3", @@ -132,6 +132,7 @@ "dependencies": { "@moq/net": "workspace:^", "@moq/signals": "workspace:^", + "@moq/wasm": "workspace:^", "@moq/watch": "workspace:^", "zod": "^4.4.3", }, @@ -149,17 +150,17 @@ "name": "@moq/msf", "version": "0.1.1", "dependencies": { - "@moq/net": "workspace:^", "zod": "^4.4.3", }, "devDependencies": { + "@moq/net": "workspace:^", "rimraf": "^6.1.3", "typescript": "^6.0.3", }, }, "js/net": { "name": "@moq/net", - "version": "0.1.2", + "version": "0.1.3", "dependencies": { "@moq/qmux": "^0.1.1", "@moq/signals": "workspace:*", @@ -179,12 +180,13 @@ }, "js/publish": { "name": "@moq/publish", - "version": "0.2.10", + "version": "0.2.12", "dependencies": { "@moq/hang": "workspace:^", "@moq/json": "workspace:^", "@moq/net": "workspace:^", "@moq/signals": "workspace:^", + "@moq/wasm": "workspace:^", }, "devDependencies": { "@types/audioworklet": "^0.0.100", @@ -198,7 +200,7 @@ }, "js/signals": { "name": "@moq/signals", - "version": "0.1.7", + "version": "0.1.8", "devDependencies": { "@types/bun": "^1.3.11", "@types/react": "^19.2.15", @@ -239,16 +241,26 @@ "js/wasm": { "name": "@moq/wasm", "version": "0.0.0", + "dependencies": { + "@moq/net": "workspace:^", + "@moq/signals": "workspace:^", + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "@typescript/lib-dom": "npm:@types/web@^0.0.350", + "typescript": "^6.0.3", + }, }, "js/watch": { "name": "@moq/watch", - "version": "0.2.14", + "version": "0.2.15", "dependencies": { "@moq/hang": "workspace:^", "@moq/json": "workspace:^", "@moq/msf": "workspace:^", "@moq/net": "workspace:^", "@moq/signals": "workspace:^", + "@moq/wasm": "workspace:^", }, "devDependencies": { "@types/audioworklet": "^0.0.100", diff --git a/demo/web/vite.config.ts b/demo/web/vite.config.ts index 5cd3ce357..967ccdb5e 100644 --- a/demo/web/vite.config.ts +++ b/demo/web/vite.config.ts @@ -2,12 +2,13 @@ import tailwindcss from "@tailwindcss/vite"; import { resolve } from "path"; import { defineConfig } from "vite"; import solidPlugin from "vite-plugin-solid"; +import { moqWasm } from "../../js/common/vite-plugin-wasm"; import { workletInline } from "../../js/common/vite-plugin-worklet"; export default defineConfig({ root: "src", envDir: resolve(__dirname), - plugins: [tailwindcss(), solidPlugin(), workletInline()], + plugins: [moqWasm(), tailwindcss(), solidPlugin(), workletInline()], build: { target: "esnext", sourcemap: process.env.NODE_ENV === "production" ? false : "inline", diff --git a/js/common/vite-plugin-wasm.ts b/js/common/vite-plugin-wasm.ts new file mode 100644 index 000000000..a81a1f412 --- /dev/null +++ b/js/common/vite-plugin-wasm.ts @@ -0,0 +1,60 @@ +import { execFileSync } from "node:child_process"; +import { existsSync, readdirSync, statSync } from "node:fs"; +import { dirname, join, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; +import type { Plugin } from "vite"; + +// js/common -> repo root +const repoRoot = resolve(dirname(fileURLToPath(import.meta.url)), "../.."); +const dist = join(repoRoot, "js/wasm/dist/moq.js"); + +// Rebuild when the Rust sources behind the bindings change. +const watchDirs = ["rs/moq-wasm/src", "rs/moq-net/src"].map((d) => join(repoRoot, d)).filter(existsSync); + +function newestMtime(dir: string): number { + let max = 0; + for (const entry of readdirSync(dir, { withFileTypes: true })) { + const p = join(dir, entry.name); + max = Math.max(max, entry.isDirectory() ? newestMtime(p) : statSync(p).mtimeMs); + } + return max; +} + +function build(): void { + // `just wasm` = cargo build (wasm32) + wasm-bindgen into js/wasm/dist. Needs the + // nix dev shell on PATH, which `just dev` already provides. + execFileSync("just", ["wasm"], { cwd: repoRoot, stdio: "inherit" }); +} + +function buildIfStale(): void { + const distTime = existsSync(dist) ? statSync(dist).mtimeMs : 0; + const srcTime = watchDirs.length ? Math.max(...watchDirs.map(newestMtime)) : 0; + if (srcTime > distTime) build(); +} + +/** + * Builds `@moq/wasm` (the wasm-bindgen output in `js/wasm/dist`) on demand, so a + * consumer never has to run `just wasm` first. Rebuilds and full-reloads when the + * `rs/moq-wasm` / `rs/moq-net` sources change. + */ +export function moqWasm(): Plugin { + return { + name: "moq-wasm", + enforce: "pre", + buildStart() { + buildIfStale(); + }, + configureServer(server) { + for (const d of watchDirs) server.watcher.add(d); + server.watcher.on("change", (file) => { + if (!watchDirs.some((d) => file.startsWith(d))) return; + try { + build(); + server.ws.send({ type: "full-reload" }); + } catch (err) { + server.config.logger.error(`moq-wasm rebuild failed: ${String(err)}`); + } + }); + }, + }; +} diff --git a/js/hang/src/container/format.test.ts b/js/hang/src/container/format.test.ts new file mode 100644 index 000000000..8df3b619f --- /dev/null +++ b/js/hang/src/container/format.test.ts @@ -0,0 +1,137 @@ +import { expect, test } from "bun:test"; +import { type Time, Varint } from "@moq/net"; +import type { InitSegment } from "./cmaf/decode.ts"; +import { encodeDataSegment } from "./cmaf/encode.ts"; +import { Format as CmafFormat } from "./cmaf/format.ts"; +import { Format as LegacyFormat } from "./legacy.ts"; + +const TIMESCALE = 90_000; +const TEST_INIT: InitSegment = { + timescale: TIMESCALE, + trackId: 1, + defaultSampleDuration: 0, + defaultSampleSize: 0, + defaultSampleFlags: 0, +}; + +function encodeLegacyFrame(timestamp: Time.Micro, payload: Uint8Array): Uint8Array { + const tsBytes = Varint.encode(timestamp); + const data = new Uint8Array(tsBytes.byteLength + payload.byteLength); + data.set(tsBytes, 0); + data.set(payload, tsBytes.byteLength); + return data; +} + +// --- LegacyFormat --- + +test("LegacyFormat decodes a valid frame", () => { + const format = new LegacyFormat(); + const payload = new Uint8Array([0xde, 0xad]); + const timestamp = 1000 as Time.Micro; + const frame = encodeLegacyFrame(timestamp, payload); + + const result = format.decode(frame); + + expect(result).toHaveLength(1); + expect(result[0].timestamp).toBe(timestamp); + expect(result[0].data).toEqual(payload); + expect(result[0].keyframe).toBe(false); +}); + +test("LegacyFormat always returns keyframe: false", () => { + const format = new LegacyFormat(); + const frame = encodeLegacyFrame(0 as Time.Micro, new Uint8Array([0x01])); + + const [decoded] = format.decode(frame); + expect(decoded.keyframe).toBe(false); +}); + +test("LegacyFormat always returns exactly one frame", () => { + const format = new LegacyFormat(); + const frame = encodeLegacyFrame(5000 as Time.Micro, new Uint8Array([0x01, 0x02, 0x03])); + + const result = format.decode(frame); + expect(result).toHaveLength(1); +}); + +test("LegacyFormat throws on empty input", () => { + const format = new LegacyFormat(); + expect(() => format.decode(new Uint8Array(0))).toThrow(); +}); + +test("LegacyFormat throws on truncated input", () => { + const format = new LegacyFormat(); + // A varint that indicates more bytes follow but is truncated + expect(() => format.decode(new Uint8Array([0x80]))).toThrow(); +}); + +// --- CmafFormat --- + +test("CmafFormat decodes a valid keyframe segment", () => { + const format = new CmafFormat(TEST_INIT); + const segment = encodeDataSegment({ + data: new Uint8Array([0xca, 0xfe]), + timestamp: 0, + duration: 3000, + keyframe: true, + sequence: 0, + }); + + const result = format.decode(segment); + + expect(result).toHaveLength(1); + expect(result[0].data).toEqual(new Uint8Array([0xca, 0xfe])); + expect(result[0].timestamp).toBe(0 as Time.Micro); + expect(result[0].keyframe).toBe(true); +}); + +test("CmafFormat decodes a delta frame segment", () => { + const format = new CmafFormat(TEST_INIT); + const segment = encodeDataSegment({ + data: new Uint8Array([0xbe, 0xef]), + timestamp: 3000, + duration: 3000, + keyframe: false, + sequence: 1, + }); + + const result = format.decode(segment); + + expect(result).toHaveLength(1); + expect(result[0].keyframe).toBe(false); +}); + +test("CmafFormat converts timescale units to microseconds", () => { + const format = new CmafFormat(TEST_INIT); + // 90000 timescale units = 1 second = 1_000_000 microseconds + const segment = encodeDataSegment({ + data: new Uint8Array([0x01]), + timestamp: TIMESCALE, + duration: 3000, + keyframe: true, + sequence: 0, + }); + + const result = format.decode(segment); + expect(result[0].timestamp).toBe(1_000_000 as Time.Micro); +}); + +test("CmafFormat throws on corrupt segment", () => { + const format = new CmafFormat(TEST_INIT); + expect(() => format.decode(new Uint8Array([0x00, 0x01, 0x02]))).toThrow(); +}); + +test("CmafFormat decodes the per-sample duration", () => { + const format = new CmafFormat(TEST_INIT); + const segment = encodeDataSegment({ + data: new Uint8Array([0xca, 0xfe]), + timestamp: 0, + duration: 3000, + keyframe: true, + sequence: 0, + }); + + const [frame] = format.decode(segment); + // 3000 ticks / 90000 timescale * 1_000_000 = 33333µs + expect(frame.duration).toBe(33_333 as Time.Micro); +}); diff --git a/js/hang/src/container/index.ts b/js/hang/src/container/index.ts index 11d81b49f..b72beabfa 100644 --- a/js/hang/src/container/index.ts +++ b/js/hang/src/container/index.ts @@ -1,6 +1,5 @@ export * as Loc from "@moq/loc"; export * as Cmaf from "./cmaf"; -export { Consumer, type ConsumerProps } from "./consumer"; export type { Format } from "./format"; export * as Legacy from "./legacy"; export * from "./types"; diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index 1112555b1..599cf419f 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -1,5 +1,5 @@ import type { Time } from "@moq/net"; -import * as Moq from "@moq/net"; +import { Varint } from "@moq/net"; export type { BufferedRange, BufferedRanges, Frame } from "./types"; @@ -8,7 +8,7 @@ import type { Frame } from "./types"; export class Format implements ContainerFormat { decode(frame: Uint8Array): Frame[] { - const [timestamp, data] = Moq.Varint.decode(frame); + const [timestamp, data] = Varint.decode(frame); return [{ data, timestamp: timestamp as Time.Micro, keyframe: false }]; } } @@ -20,7 +20,7 @@ export interface Source { // Encode a frame as a timestamp varint followed by the payload bytes. export function encodeFrame(source: Uint8Array | Source, timestamp: Time.Micro): Uint8Array { - const timestampBytes = Moq.Varint.encode(timestamp); + const timestampBytes = Varint.encode(timestamp); const data = new Uint8Array(timestampBytes.byteLength + source.byteLength); data.set(timestampBytes, 0); @@ -32,29 +32,3 @@ export function encodeFrame(source: Uint8Array | Source, timestamp: Time.Micro): return data; } - -// A Helper class to encode frames into a track. -export class Producer { - #track: Moq.TrackProducer; - #group?: Moq.Group; - - constructor(track: Moq.TrackProducer) { - this.#track = track; - } - - encode(data: Uint8Array | Source, timestamp: Time.Micro, keyframe: boolean) { - if (keyframe) { - this.#group?.close(); - this.#group = this.#track.appendGroup(); - } else if (!this.#group) { - throw new Error("must start with a keyframe"); - } - - this.#group?.writeFrame(encodeFrame(data, timestamp)); - } - - close(err?: Error) { - this.#track.close(err); - this.#group?.close(); - } -} diff --git a/js/hang/src/index.ts b/js/hang/src/index.ts index 8692777ac..60275586f 100644 --- a/js/hang/src/index.ts +++ b/js/hang/src/index.ts @@ -1,6 +1,3 @@ -export * as Net from "@moq/net"; -/** @deprecated Use `Net` instead. */ -export * as Moq from "@moq/net"; export * as Signals from "@moq/signals"; export * as Catalog from "./catalog"; export * as Container from "./container"; diff --git a/js/json/package.json b/js/json/package.json index 2b3fbe64f..627518b02 100644 --- a/js/json/package.json +++ b/js/json/package.json @@ -16,13 +16,13 @@ "release": "bun ../common/release.ts" }, "dependencies": { - "@moq/net": "workspace:^", "@moq/signals": "workspace:^" }, "peerDependencies": { "zod": "^4.0.0" }, "devDependencies": { + "@moq/net": "workspace:^", "@types/bun": "^1.3.14", "rimraf": "^6.1.3", "typescript": "^6.0.3" diff --git a/js/json/src/consumer.ts b/js/json/src/consumer.ts index 3c127f78a..8681bebf5 100644 --- a/js/json/src/consumer.ts +++ b/js/json/src/consumer.ts @@ -1,8 +1,16 @@ -import type * as Moq from "@moq/net"; import type * as z from "zod/mini"; import { merge } from "./diff.ts"; import type { Config } from "./producer.ts"; +// Minimal structural track surface @moq/json reads from, so the package stays +// networking-model-agnostic (works with @moq/net, @moq/wasm, or a test double). +interface Group { + readFrame(): Promise; +} +export interface TrackSubscriber { + nextGroup(): Promise; +} + /** * Consumes a JSON value from a track, reconstructing it from snapshots and deltas. * @@ -10,14 +18,14 @@ import type { Config } from "./producer.ts"; * yielding the reconstructed value after each one. */ export class Consumer { - #track: Moq.TrackSubscriber; + #track: TrackSubscriber; #schema?: z.ZodMiniType; - #group?: Moq.Group; + #group?: Group; #current?: unknown; #framesRead = 0; - constructor(track: Moq.TrackSubscriber, config: Config = {}) { + constructor(track: TrackSubscriber, config: Config = {}) { this.#track = track; this.#schema = config.schema; } diff --git a/js/json/src/index.ts b/js/json/src/index.ts index ea6ff76d3..8422c5574 100644 --- a/js/json/src/index.ts +++ b/js/json/src/index.ts @@ -1,3 +1,3 @@ -export { Consumer } from "./consumer.ts"; +export { Consumer, type TrackSubscriber } from "./consumer.ts"; export { type Diff, deepEqual, diff, merge } from "./diff.ts"; -export { type Config, Producer } from "./producer.ts"; +export { type Config, type Group, Producer, type TrackProducer } from "./producer.ts"; diff --git a/js/json/src/producer.ts b/js/json/src/producer.ts index 79f1d19d8..049cceba6 100644 --- a/js/json/src/producer.ts +++ b/js/json/src/producer.ts @@ -1,8 +1,18 @@ -import type * as Moq from "@moq/net"; import type * as z from "zod/mini"; import { deepEqual, diff } from "./diff.ts"; +// Minimal structural track surface @moq/json writes to, so the package stays +// networking-model-agnostic (works with @moq/net, @moq/wasm, or a test double). +export interface Group { + writeFrame(frame: Uint8Array): void; + close(): void; +} +export interface TrackProducer { + appendGroup(): Group; + close(): void; +} + // Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept // well below the per-group frame cap so a late joiner can always read the snapshot at frame 0. const MAX_DELTA_FRAMES = 256; @@ -27,15 +37,15 @@ export interface Config { /** Publishes a JSON value over a track, choosing snapshots and deltas automatically. */ export class Producer { - #track: Moq.TrackProducer; + #track: TrackProducer; #config: Config; - #group?: Moq.Group; + #group?: Group; #last?: unknown; #groupBytes = 0; #groupFrames = 0; - constructor(track: Moq.TrackProducer, config: Config = {}) { + constructor(track: TrackProducer, config: Config = {}) { this.#track = track; this.#config = config; } diff --git a/js/moq-boy/package.json b/js/moq-boy/package.json index 50a42606d..0720c4fa8 100644 --- a/js/moq-boy/package.json +++ b/js/moq-boy/package.json @@ -23,6 +23,7 @@ }, "dependencies": { "@moq/net": "workspace:^", + "@moq/wasm": "workspace:^", "@moq/watch": "workspace:^", "@moq/signals": "workspace:^", "zod": "^4.4.3" diff --git a/js/moq-boy/src/element.tsx b/js/moq-boy/src/element.tsx index 6b1f11b5d..43c96d6e8 100644 --- a/js/moq-boy/src/element.tsx +++ b/js/moq-boy/src/element.tsx @@ -1,4 +1,4 @@ -import * as Moq from "@moq/net"; +import * as Moq from "@moq/wasm"; import { render } from "solid-js/web"; import type { GameConfig } from "./index.ts"; import { Game } from "./index.ts"; diff --git a/js/moq-boy/src/index.ts b/js/moq-boy/src/index.ts index 5f90de608..a2d7af08c 100644 --- a/js/moq-boy/src/index.ts +++ b/js/moq-boy/src/index.ts @@ -1,4 +1,4 @@ -import * as Moq from "@moq/net"; +import * as Moq from "@moq/wasm"; import * as Watch from "@moq/watch"; export type { default as MoqBoy } from "./element.tsx"; diff --git a/js/moq-boy/src/ui/components/GameCard.tsx b/js/moq-boy/src/ui/components/GameCard.tsx index 3c6cf930d..f33a63eb2 100644 --- a/js/moq-boy/src/ui/components/GameCard.tsx +++ b/js/moq-boy/src/ui/components/GameCard.tsx @@ -1,4 +1,4 @@ -import { Signals } from "@moq/net"; +import { Signals } from "@moq/wasm"; import { createEffect, onCleanup, onMount, Show } from "solid-js"; import type { Game } from "../../index.ts"; import { KEY_MAP } from "../../index.ts"; diff --git a/js/moq-boy/src/ui/components/StatsPanel.tsx b/js/moq-boy/src/ui/components/StatsPanel.tsx index 22891910f..df47acdbf 100644 --- a/js/moq-boy/src/ui/components/StatsPanel.tsx +++ b/js/moq-boy/src/ui/components/StatsPanel.tsx @@ -11,7 +11,7 @@ export default function StatsPanel() { const onJitterInput = (e: Event) => { const el = e.currentTarget as HTMLInputElement; - game.latency.set(Number.parseInt(el.value, 10) as import("@moq/net").Time.Milli); + game.latency.set(Number.parseInt(el.value, 10) as import("@moq/wasm").Time.Milli); }; const location = () => ctx.status()?.location; diff --git a/js/moq-boy/src/ui/context.tsx b/js/moq-boy/src/ui/context.tsx index 37a97382a..cdcd597e4 100644 --- a/js/moq-boy/src/ui/context.tsx +++ b/js/moq-boy/src/ui/context.tsx @@ -1,5 +1,5 @@ -import { Signals } from "@moq/net"; import { createAccessor } from "@moq/signals/solid"; +import { Signals } from "@moq/wasm"; import type { JSX } from "solid-js"; import { createContext, onCleanup } from "solid-js"; import type MoqBoy from "../element"; diff --git a/js/msf/package.json b/js/msf/package.json index 42e220b61..88cd9db8c 100644 --- a/js/msf/package.json +++ b/js/msf/package.json @@ -15,10 +15,10 @@ "release": "bun ../common/release.ts" }, "dependencies": { - "@moq/net": "workspace:^", "zod": "^4.4.3" }, "devDependencies": { + "@moq/net": "workspace:^", "rimraf": "^6.1.3", "typescript": "^6.0.3" } diff --git a/js/msf/src/catalog.ts b/js/msf/src/catalog.ts index 53a6c95b2..a67f75fe1 100644 --- a/js/msf/src/catalog.ts +++ b/js/msf/src/catalog.ts @@ -1,6 +1,10 @@ -import type * as Moq from "@moq/net"; import * as z from "zod/mini"; +// Minimal structural track surface, so @moq/msf stays networking-model-agnostic. +interface TrackSubscriber { + readFrame(): Promise; +} + export const PackagingSchema = z.union([ z.enum(["loc", "cmaf", "legacy", "mediatimeline", "eventtimeline"]), z.string(), @@ -63,7 +67,7 @@ export function decode(raw: Uint8Array): Catalog { } } -export async function fetch(track: Moq.TrackSubscriber): Promise { +export async function fetch(track: TrackSubscriber): Promise { const frame = await track.readFrame(); if (!frame) return undefined; return decode(frame); diff --git a/js/publish/package.json b/js/publish/package.json index 4da89d32b..2ad648e21 100644 --- a/js/publish/package.json +++ b/js/publish/package.json @@ -27,6 +27,7 @@ "@moq/hang": "workspace:^", "@moq/json": "workspace:^", "@moq/net": "workspace:^", + "@moq/wasm": "workspace:^", "@moq/signals": "workspace:^" }, "devDependencies": { diff --git a/js/publish/src/audio/capture-worklet.ts b/js/publish/src/audio/capture-worklet.ts index 3456bc5e4..ad94d5d35 100644 --- a/js/publish/src/audio/capture-worklet.ts +++ b/js/publish/src/audio/capture-worklet.ts @@ -1,4 +1,4 @@ -import { Time } from "@moq/net"; +import { Time } from "@moq/wasm"; import type { AudioFrame } from "./capture"; class Capture extends AudioWorkletProcessor { diff --git a/js/publish/src/audio/capture.ts b/js/publish/src/audio/capture.ts index 99ece11ba..79f9ae451 100644 --- a/js/publish/src/audio/capture.ts +++ b/js/publish/src/audio/capture.ts @@ -1,4 +1,4 @@ -import type { Time } from "@moq/net"; +import type { Time } from "@moq/wasm"; export interface AudioFrame { timestamp: Time.Micro; diff --git a/js/publish/src/audio/encoder.ts b/js/publish/src/audio/encoder.ts index 4621770cb..7bfcb50c4 100644 --- a/js/publish/src/audio/encoder.ts +++ b/js/publish/src/audio/encoder.ts @@ -1,9 +1,9 @@ import * as Catalog from "@moq/hang/catalog"; import * as Container from "@moq/hang/container"; import * as Util from "@moq/hang/util"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; import type * as Capture from "./capture"; import { type Kind, normalizeSource, type Source } from "./types"; diff --git a/js/publish/src/broadcast.ts b/js/publish/src/broadcast.ts index 7f47799da..b4d51ad40 100644 --- a/js/publish/src/broadcast.ts +++ b/js/publish/src/broadcast.ts @@ -1,6 +1,6 @@ import * as Catalog from "@moq/hang/catalog"; -import * as Moq from "@moq/net"; import { Effect, Signal } from "@moq/signals"; +import * as Moq from "@moq/wasm"; import * as Audio from "./audio"; import { CatalogProducer } from "./catalog"; import * as Video from "./video"; diff --git a/js/publish/src/catalog.ts b/js/publish/src/catalog.ts index ed371cba2..4a28cb2cb 100644 --- a/js/publish/src/catalog.ts +++ b/js/publish/src/catalog.ts @@ -1,6 +1,5 @@ import type * as Catalog from "@moq/hang/catalog"; import * as Json from "@moq/json"; -import type * as Moq from "@moq/net"; import type { Effect } from "@moq/signals"; /** @@ -25,7 +24,7 @@ export class CatalogProducer { } /** Serve a subscription request: seed it with the current catalog, then forward updates. */ - serve(track: Moq.TrackProducer, effect: Effect): void { + serve(track: Json.TrackProducer, effect: Effect): void { const output = new Json.Producer(track); output.update(this.#value); diff --git a/js/publish/src/container.ts b/js/publish/src/container.ts new file mode 100644 index 000000000..319cbf429 --- /dev/null +++ b/js/publish/src/container.ts @@ -0,0 +1,33 @@ +import { Container } from "@moq/hang"; +import type * as Moq from "@moq/wasm"; +import type { Time } from "@moq/wasm"; + +type Source = Container.Legacy.Source; + +// Helper to encode frames into a track, one group per keyframe. Moved out of +// @moq/hang so that hang stays serialization-only (no networking model); the +// frame encoding itself still lives in hang as `Container.Legacy.encodeFrame`. +export class Producer { + #track: Moq.TrackProducer; + #group?: Moq.Group; + + constructor(track: Moq.TrackProducer) { + this.#track = track; + } + + encode(data: Uint8Array | Source, timestamp: Time.Micro, keyframe: boolean) { + if (keyframe) { + this.#group?.close(); + this.#group = this.#track.appendGroup(); + } else if (!this.#group) { + throw new Error("must start with a keyframe"); + } + + this.#group?.writeFrame(Container.Legacy.encodeFrame(data, timestamp)); + } + + close(err?: Error) { + this.#track.close(err); + this.#group?.close(); + } +} diff --git a/js/publish/src/element.ts b/js/publish/src/element.ts index 5a59dcf5b..8e2d01abb 100644 --- a/js/publish/src/element.ts +++ b/js/publish/src/element.ts @@ -1,5 +1,5 @@ -import * as Moq from "@moq/net"; import { Effect, Signal } from "@moq/signals"; +import * as Moq from "@moq/wasm"; import { Broadcast } from "./broadcast"; import * as Preview from "./preview"; import * as Source from "./source"; diff --git a/js/publish/src/index.ts b/js/publish/src/index.ts index 082e2f005..552e65c6d 100644 --- a/js/publish/src/index.ts +++ b/js/publish/src/index.ts @@ -1,8 +1,8 @@ export * as Hang from "@moq/hang"; -export * as Net from "@moq/net"; -/** @deprecated Use `Net` instead. */ -export * as Lite from "@moq/net"; export * as Signals from "@moq/signals"; +export * as Net from "@moq/wasm"; +/** @deprecated Use `Net` instead. */ +export * as Lite from "@moq/wasm"; export * as Audio from "./audio"; export * from "./broadcast"; export * from "./catalog"; diff --git a/js/publish/src/preview.ts b/js/publish/src/preview.ts index 216748c92..5956d077d 100644 --- a/js/publish/src/preview.ts +++ b/js/publish/src/preview.ts @@ -1,5 +1,5 @@ -import { Time } from "@moq/net"; import { Effect, type Getter, Signal } from "@moq/signals"; +import { Time } from "@moq/wasm"; import type * as Video from "./video"; // What the canvas preview renders. diff --git a/js/publish/src/video/encoder.ts b/js/publish/src/video/encoder.ts index d9d004abb..a5dab8bf3 100644 --- a/js/publish/src/video/encoder.ts +++ b/js/publish/src/video/encoder.ts @@ -1,9 +1,9 @@ import * as Catalog from "@moq/hang/catalog"; -import * as Container from "@moq/hang/container"; import * as Util from "@moq/hang/util"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; +import { Producer } from "../container.ts"; import type { Source } from "./types"; export interface EncoderProps { @@ -85,7 +85,7 @@ export class Encoder { serve(track: Moq.TrackProducer, effect: Effect): void { if (!effect.get(this.enabled)) return; - const producer = new Container.Legacy.Producer(track); + const producer = new Producer(track); effect.cleanup(() => producer.close()); let lastKeyframe: Time.Micro | undefined; diff --git a/js/publish/src/video/index.ts b/js/publish/src/video/index.ts index 547f1a8cf..a0155b54c 100644 --- a/js/publish/src/video/index.ts +++ b/js/publish/src/video/index.ts @@ -1,6 +1,6 @@ import * as Catalog from "@moq/hang/catalog"; -import type * as Moq from "@moq/net"; import { Effect, type Getter, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; import { Encoder, type EncoderProps } from "./encoder"; import { TrackProcessor } from "./polyfill"; import type { Source } from "./types"; diff --git a/js/publish/src/video/polyfill.ts b/js/publish/src/video/polyfill.ts index 70fa97e74..b2fbe2384 100644 --- a/js/publish/src/video/polyfill.ts +++ b/js/publish/src/video/polyfill.ts @@ -1,4 +1,4 @@ -import { Time } from "@moq/net"; +import { Time } from "@moq/wasm"; import type { StreamTrack } from "./types"; // Firefox doesn't support MediaStreamTrackProcessor so we need to use a polyfill. diff --git a/js/wasm/README.md b/js/wasm/README.md index 529f87a58..373f600e9 100644 --- a/js/wasm/README.md +++ b/js/wasm/README.md @@ -1,28 +1,44 @@ # @moq/wasm (experiment) Browser bindings for [`moq-net`](../../rs/moq-net), compiled to WebAssembly with -`wasm-bindgen`. This is the JS-facing half of the `rs/moq-wasm` crate: it -packages the generated bindings so a JS app can `import` the real Rust moq-lite -implementation instead of the hand-written TypeScript one in `@moq/net`. +`wasm-bindgen`. This is the JS-facing half of the `rs/moq-wasm` crate: it lets a +JS app `import` the real Rust moq-lite implementation instead of the +hand-written TypeScript one in `@moq/net`, so the wire protocol lives in exactly +one place (Rust). + +The package is a thin hand-written TypeScript shim (`src/`) over the wasm-bindgen +output (`dist/`). The shim presents the **same surface as `@moq/net`** so it can +be a drop-in replacement: the `Connection` / `Path` / `Time` namespaces, the +`Broadcast` / `Track*` / `Group` model classes, the string/json/bool +conveniences, options-object signatures, a reactive `state.closed` signal, and +`number` (not `bigint`) sequences. `Path` and `Time` are re-exported from +`@moq/net` (they carry no wire code, so a bundler tree-shakes the rest and the +branded types stay identical). ```ts -import init, * as Moq from "@moq/wasm"; +import * as Moq from "@moq/wasm"; -await init(); // load the wasm module (wasm-bindgen's default loader) -Moq.setup(); // install panic/tracing hooks for readable errors +const conn = await Moq.Connection.connect(new URL("https://relay.example.com/anon")); -const session = await Moq.Session.connect("https://relay.example.com/anon"); -const broadcast = await session.consume("room/alice"); -const track = await broadcast?.subscribe("video"); -for (let group = await track?.recvGroup(); group; group = await track?.recvGroup()) { +// Consume +const broadcast = conn.consume("room/alice"); // synchronous; subscribing waits for the announce +const track = await broadcast.subscribe("video"); +for (let group = await track.recvGroup(); group; group = await track.recvGroup()) { for (let frame = await group.readFrame(); frame; frame = await group.readFrame()) { // frame: Uint8Array } } -``` -The classes (`Moq.Session`, `Moq.Broadcast`, `Moq.Track`, `Moq.Group`) drop the -`Moq` prefix since they're already namespaced under the import. +// Publish +const out = new Moq.Broadcast(); +conn.publish("room/me", out); +for (let req = await out.requested(); req; req = await out.requested()) { + const producer = req.accept(); + const g = producer.appendGroup(); + g.writeFrame(new Uint8Array([1, 2, 3])); + g.close(); +} +``` ## Building @@ -35,12 +51,14 @@ just wasm That compiles `rs/moq-wasm` for `wasm32-unknown-unknown`, runs `wasm-bindgen` (web target) into `dist/`, and shrinks the binary with `wasm-opt`. The required toolchain (wasm target, `wasm-bindgen-cli`, `binaryen`) is provided by the Nix -dev shell. +dev shell. The shim loads the wasm lazily on the first `Connection.connect`. ## Status -Compiles and produces a typed, importable package. The consume path is -runtime-portable: `moq-net`'s timers and `Instant` go through `web_async::time` -(wasmtimer on wasm), so they no longer panic. Not yet exercised end-to-end in a -browser against a relay, and media muxing (`moq-mux`) is still out. See +The consume **and** publish paths are bound (including real announce discovery +via the `OriginConsumer`), and `@moq/watch` / `@moq/publish` now import +`@moq/wasm` directly. `moq-net`'s timers and `Instant` go through +`web_async::time` (wasmtimer on wasm), so they don't panic. Still pending: +end-to-end exercise in a browser against a relay, bandwidth/RTT telemetry +(declared but undefined), and media muxing (`moq-mux` is not yet wasm-ready). See [`rs/moq-wasm/README.md`](../../rs/moq-wasm/README.md) for details. diff --git a/js/wasm/package.json b/js/wasm/package.json index d213700a1..ad63f6f9d 100644 --- a/js/wasm/package.json +++ b/js/wasm/package.json @@ -7,20 +7,27 @@ "repository": "github:moq-dev/moq", "private": true, "files": [ + "src", "dist", "README.md" ], - "main": "./dist/moq.js", - "module": "./dist/moq.js", - "types": "./dist/moq.d.ts", + "sideEffects": false, "exports": { - ".": { - "types": "./dist/moq.d.ts", - "default": "./dist/moq.js" - } + ".": "./src/index.ts" }, - "sideEffects": [ - "./dist/moq.js", - "./dist/moq_bg.js" - ] + "imports": { + "#bindgen": "./dist/moq.js" + }, + "scripts": { + "check": "if [ -f dist/moq.js ]; then tsc --noEmit; else echo 'skipping @moq/wasm tsc: dist/ not built (run `just wasm` first)'; fi" + }, + "dependencies": { + "@moq/net": "workspace:^", + "@moq/signals": "workspace:^" + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "@typescript/lib-dom": "npm:@types/web@^0.0.350", + "typescript": "^6.0.3" + } } diff --git a/js/wasm/src/connection.ts b/js/wasm/src/connection.ts new file mode 100644 index 000000000..ea68121b4 --- /dev/null +++ b/js/wasm/src/connection.ts @@ -0,0 +1,249 @@ +// The `Connection` namespace, mirroring `@moq/net`'s `Connection.connect` / +// `Established` / `Reload`. The wire lives in wasm (`Session` + its +// `OriginConsumer`); this establishes it and presents the `@moq/net` shape. +// +// `Reload` is backend-agnostic reconnection + announce-aggregation logic, ported +// from `@moq/net` with the `connect` import pointed at the wasm one. + +import { Path, type Time } from "@moq/net"; +import { Effect, type Getter, Signal } from "@moq/signals"; +import * as Wasm from "#bindgen"; +import type { Announced, Broadcast } from "./index.ts"; +import { init, Session } from "./index.ts"; + +/** A connected MoQ session. Structurally matches `@moq/net`'s `Connection.Established`. */ +export interface Established { + readonly url: URL; + readonly version: string; + + // Telemetry, surfaced for `@moq/net` parity. Not yet bridged from wasm (undefined). + readonly sendBandwidth?: Signal; + readonly recvBandwidth?: Signal; + readonly rtt?: Signal; + + consume(broadcast: string): Broadcast; + publish(path: string, broadcast: Broadcast): void; + announced(prefix?: Path.Valid): Announced; + close(): void; + closed: Promise; +} + +// WebTransport-only for now: the wasm transport has no WebSocket fallback, so +// these options are accepted for `@moq/net` parity but ignored. +export interface WebSocketOptions { + enabled?: boolean; + url?: URL; + delay?: DOMHighResTimeStamp; +} + +export interface ConnectProps { + webtransport?: WebTransportOptions; + websocket?: WebSocketOptions; +} + +/** + * Establish a connection to a MoQ relay over WebTransport. + * + * Over `http://` (local dev) this fetches the relay's self-signed certificate + * fingerprint from `/certificate.sha256`, pins it, and upgrades the scheme to + * `https://` (WebTransport requires https). Mirrors `@moq/net`. Pass explicit + * `webtransport.serverCertificateHashes` for serverless dev; otherwise the + * system roots are used. + */ +export async function connect(url: URL, props?: ConnectProps): Promise { + await init(); + + const hashes: Uint8Array[] = (props?.webtransport?.serverCertificateHashes ?? []).map(hashToBytes); + + let target = url; + if (url.protocol === "http:") { + const fingerprintUrl = new URL(url); + fingerprintUrl.pathname = "/certificate.sha256"; + fingerprintUrl.search = ""; + console.warn(fingerprintUrl.toString(), "performing an insecure fingerprint fetch; use https:// in production"); + + const res = await fetch(fingerprintUrl); + hashes.push(hexToBytes((await res.text()).trim())); + + target = new URL(url); + target.protocol = "https:"; + } + + const inner = hashes.length + ? await Wasm.Session.connectWithHashes(target.toString(), hashes) + : await Wasm.Session.connect(target.toString()); + + // Keep the original URL (e.g. http://localhost) for display, like @moq/net. + return new Session(url, inner); +} + +function hashToBytes(hash: WebTransportHash): Uint8Array { + const value = hash.value; + if (value instanceof ArrayBuffer) return new Uint8Array(value); + // ArrayBufferView (e.g. Uint8Array): copy its viewed bytes. + return new Uint8Array(value.buffer, value.byteOffset, value.byteLength); +} + +function hexToBytes(hex: string): Uint8Array { + const bytes = new Uint8Array(hex.length >> 1); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = Number.parseInt(hex.slice(i * 2, i * 2 + 2), 16); + } + return bytes; +} + +export type ReloadDelay = { + initial: DOMHighResTimeStamp; + multiplier: number; + max: DOMHighResTimeStamp; + timeout?: DOMHighResTimeStamp; +}; + +export type ReloadProps = ConnectProps & { + enabled?: boolean | Signal; + url?: URL | Signal; + delay?: ReloadDelay; +}; + +export type ReloadStatus = "connecting" | "connected" | "disconnected"; + +/** Reconnects on drop and aggregates announce events into a reactive set. */ +export class Reload { + url: Signal; + enabled: Signal; + + status = new Signal("disconnected"); + established = new Signal(undefined); + + #announced = new Signal>(new Set()); + readonly announced: Getter> = this.#announced; + + webtransport?: WebTransportOptions; + websocket: WebSocketOptions | undefined; + + delay: ReloadDelay; + + signals = new Effect(); + + closed: Promise; + #closedResolve!: () => void; + #closedReject!: (err: Error) => void; + + #delay: DOMHighResTimeStamp; + #retryStart: DOMHighResTimeStamp | undefined; + #tick = new Signal(0); + + constructor(props?: ReloadProps) { + this.url = Signal.from(props?.url); + this.enabled = Signal.from(props?.enabled ?? false); + this.delay = props?.delay ?? { initial: 1000, multiplier: 2, max: 30000 }; + this.webtransport = props?.webtransport; + this.websocket = props?.websocket; + + this.#delay = this.delay.initial; + + this.closed = new Promise((resolve, reject) => { + this.#closedResolve = resolve; + this.#closedReject = reject; + }); + + this.signals.run(this.#connect.bind(this)); + this.signals.run(this.#runAnnounced.bind(this)); + } + + #connect(effect: Effect): void { + effect.get(this.#tick); + + const enabled = effect.get(this.enabled); + if (!enabled) return; + + const url = effect.get(this.url); + if (!url) return; + + effect.set(this.status, "connecting", "disconnected"); + + effect.spawn(async () => { + try { + const pending = connect(url, { websocket: this.websocket, webtransport: this.webtransport }); + + const connection = await Promise.race([effect.cancel, pending]); + if (!connection) { + pending.then((conn) => conn.close()).catch(() => {}); + return; + } + + effect.set(this.established, connection); + effect.cleanup(() => connection.close()); + + effect.set(this.status, "connected", "disconnected"); + + this.#delay = this.delay.initial; + this.#retryStart = undefined; + + await Promise.race([effect.cancel, connection.closed]); + } catch (err) { + console.warn("connection error:", err); + + this.#retryStart ??= performance.now(); + + const timeout = this.delay.timeout ?? 300000; + if (timeout > 0) { + const elapsed = performance.now() - this.#retryStart; + if (elapsed >= timeout) { + console.warn("reconnect timed out"); + this.#closedReject(err instanceof Error ? err : new Error(String(err))); + return; + } + } + + const tick = this.#tick.peek() + 1; + effect.timer(() => this.#tick.update((prev) => Math.max(prev, tick)), this.#delay); + + this.#delay = Math.min(this.#delay * this.delay.multiplier, this.delay.max); + } + }); + } + + #runAnnounced(effect: Effect): void { + this.#announced.set(new Set()); + + const conn = effect.get(this.established); + if (!conn) return; + + effect.cleanup(() => this.#announced.set(new Set())); + + // Cloudflare's relay does not yet support SUBSCRIBE_NAMESPACE, so skip + // announce subscriptions entirely for those hosts. + if (conn.url.hostname.endsWith("mediaoverquic.com")) { + return; + } + + const announced = conn.announced(Path.empty()); + effect.cleanup(() => announced.close()); + + effect.spawn(async () => { + try { + for (;;) { + const entry = await Promise.race([effect.cancel, announced.next()]); + if (!entry) break; + + this.#announced.mutate((active) => { + if (entry.active) { + active.add(entry.path); + } else { + active.delete(entry.path); + } + }); + } + } catch (err) { + this.#announced.set(new Set()); + throw err; + } + }); + } + + close() { + this.signals.close(); + this.#closedResolve(); + } +} diff --git a/js/wasm/src/index.ts b/js/wasm/src/index.ts new file mode 100644 index 000000000..395aa4ca3 --- /dev/null +++ b/js/wasm/src/index.ts @@ -0,0 +1,399 @@ +// Hand-written TypeScript shim over the wasm-bindgen output (`../dist/moq.js`). +// +// The generated classes are primitive (frames are `Uint8Array`, options are +// positional, `sequence` is a `bigint`). This layer wraps them to present the +// exact `@moq/net` surface so it can be a drop-in replacement: the +// string/json/bool conveniences, options-object signatures, the `Connection` +// namespace, a reactive `state.closed` signal, lazy `consume`, and `number` +// sequences. +// +// The pure helpers (`Path`, `Time`) and the `TrackInfo` type are re-exported +// from `@moq/net` rather than copied: they carry no wire code, so with +// `@moq/net`'s `sideEffects: false` a bundler drops everything else, and the +// branded types stay identical to the ones publish/watch already import. (A +// future `@moq/model` package would hold these and let `@moq/wasm` stand alone.) + +import { Path, Time, type TrackInfo } from "@moq/net"; +import { Signal } from "@moq/signals"; +import initWasm, * as Wasm from "#bindgen"; + +export type { AnnouncedEntry } from "@moq/net"; +// Pure, wire-free helpers re-exported from @moq/net so the barrel matches it +// (tree-shaken via @moq/net's `sideEffects: false`; identical branded types). +export { Signals, Varint } from "@moq/net"; +export * as Connection from "./connection.ts"; +export type { TrackInfo }; +export { Path, Time }; + +// Load the wasm module once. `--target web` fetches `moq_bg.wasm` relative to +// the JS via `import.meta.url`, which bundlers (vite/esbuild) resolve as an asset. +let loaded: Promise | undefined; +export function init(): Promise { + if (!loaded) { + loaded = initWasm().then(() => { + Wasm.setup(); + }); + } + return loaded; +} + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +/** + * A request for a track the peer wants, yielded by {@link Broadcast.requested}. + * Mirrors `@moq/net`'s `TrackRequest`. + */ +export class TrackRequest { + #inner: Wasm.TrackRequest; + readonly name: string; + readonly priority: number; + + constructor(inner: Wasm.TrackRequest) { + this.#inner = inner; + this.name = inner.name; + this.priority = inner.priority; + } + + /** Accept the request, committing the immutable {@link TrackInfo} and returning a producer. */ + accept(info: Partial = {}): TrackProducer { + return new TrackProducer(this.#inner.accept(info)); + } + + /** Reject the request, closing the track. */ + reject(_err?: Error): void { + this.#inner.reject(); + } +} + +/** A lazy handle to a track on a consumed broadcast. Mirrors `@moq/net`'s `TrackConsumer`. */ +export class TrackConsumer { + // Resolves the underlying broadcast (which may still be waiting for its announce). + #broadcast: () => Promise; + readonly name: string; + + constructor(broadcast: () => Promise, name: string) { + this.#broadcast = broadcast; + this.name = name; + } + + async #track(): Promise { + const broadcast = await this.#broadcast(); + if (!broadcast) throw new Error(`broadcast not found for track ${this.name}`); + return broadcast.track(this.name); + } + + /** + * Open a live subscription, streaming the track's groups. `priority` defaults + * to `0`. Returns synchronously (like `@moq/net`); the wire subscribe resolves + * in the background and the reader methods await it. + */ + subscribe(options?: { priority?: number }): TrackSubscriber { + const priority = options?.priority ?? 0; + const inner = this.#track().then((track) => track.subscribe(priority)); + return new TrackSubscriber(this.name, inner); + } + + /** Fetch the track's immutable publisher properties without subscribing (lite-05+). */ + async info(): Promise { + const track = await this.#track(); + return (await track.info()) as TrackInfo; + } +} + +/** Reactive `state.closed`, the one piece of `@moq/net`'s model the apps read directly. */ +class ClosedState { + readonly closed = new Signal(false); +} + +/** The write side of a track. Mirrors `@moq/net`'s `TrackProducer`. */ +export class TrackProducer { + #inner: Wasm.TrackProducer; + readonly name: string; + readonly state = new ClosedState(); + readonly closed: Promise; + + constructor(inner: Wasm.TrackProducer) { + this.#inner = inner; + this.name = inner.name; + + this.closed = inner.closed().then((msg) => { + const err = msg ? new Error(msg) : undefined; + this.state.closed.set(err ?? true); + return err; + }); + } + + /** Append a new group with the next sequence number. */ + appendGroup(): Group { + return Group.fromWasm(this.#inner.appendGroup()); + } + + /** Append a frame as its own single-frame group. */ + writeFrame(frame: Uint8Array): void { + this.#inner.writeFrame(frame); + } + + writeString(str: string): void { + this.writeFrame(textEncoder.encode(str)); + } + + writeJson(json: unknown): void { + this.writeString(JSON.stringify(json)); + } + + writeBool(bool: boolean): void { + this.writeFrame(new Uint8Array([bool ? 1 : 0])); + } + + /** Close the track (cleanly when no error, aborting otherwise). */ + close(abort?: Error): void { + if (abort) this.#inner.abort(abort.message); + else this.#inner.close(); + this.state.closed.set(abort ?? true); + } +} + +/** The read side of a live track subscription. Mirrors `@moq/net`'s `TrackSubscriber`. */ +export class TrackSubscriber { + // The wire subscribe resolves in the background; readers await it. This keeps + // `subscribe()` synchronous, matching `@moq/net`. + #inner: Promise; + #nextSequence = 0; + readonly name: string; + + constructor(name: string, inner: Promise) { + this.name = name; + this.#inner = inner; + } + + async info(): Promise { + return (await this.#inner).info() as TrackInfo; + } + + /** Receive the next group in arrival order, or `undefined` when the track ends. */ + async recvGroup(): Promise { + const group = await (await this.#inner).recvGroup(); + return group ? Group.fromWasm(group) : undefined; + } + + /** Next group with a strictly-greater sequence than the last returned (skips late arrivals). */ + async nextGroup(): Promise { + const inner = await this.#inner; + for (;;) { + const group = await inner.nextGroup(); + if (!group) return undefined; + const wrapped = Group.fromWasm(group); + if (wrapped.sequence < this.#nextSequence) { + wrapped.close(); + continue; + } + this.#nextSequence = wrapped.sequence + 1; + return wrapped; + } + } + + async readFrame(): Promise { + const group = await this.recvGroup(); + if (!group) return undefined; + const frame = await group.readFrame(); + group.close(); + return frame; + } + + async readString(): Promise { + const frame = await this.readFrame(); + return frame ? textDecoder.decode(frame) : undefined; + } + + async readJson(): Promise { + const str = await this.readString(); + return str ? JSON.parse(str) : undefined; + } + + async readBool(): Promise { + const frame = await this.readFrame(); + return frame ? frame[0] === 1 : undefined; + } + + updatePriority(priority: number): void { + void this.#inner.then((s) => s.updatePriority(priority)); + } + + /** Stop the subscription (unsubscribes once the wire subscribe resolves). */ + close(_abort?: Error): void { + void this.#inner.then((s) => s.close()); + } +} + +/** A group of frames: writable when produced, readable when consumed. Mirrors `@moq/net`'s `Group`. */ +export class Group { + #inner: Wasm.Group; + readonly sequence: number; + + private constructor(inner: Wasm.Group) { + this.#inner = inner; + this.sequence = Number(inner.sequence); + } + + static fromWasm(inner: Wasm.Group): Group { + return new Group(inner); + } + + writeFrame(frame: Uint8Array): void { + this.#inner.writeFrame(frame); + } + + writeString(str: string): void { + this.writeFrame(textEncoder.encode(str)); + } + + writeJson(json: unknown): void { + this.writeString(JSON.stringify(json)); + } + + writeBool(bool: boolean): void { + this.writeFrame(new Uint8Array([bool ? 1 : 0])); + } + + async readFrame(): Promise { + return (await this.#inner.readFrame()) ?? undefined; + } + + async readString(): Promise { + const frame = await this.readFrame(); + return frame ? textDecoder.decode(frame) : undefined; + } + + async readJson(): Promise { + const str = await this.readString(); + return str ? JSON.parse(str) : undefined; + } + + async readBool(): Promise { + const frame = await this.readFrame(); + return frame ? frame[0] === 1 : undefined; + } + + close(_abort?: Error): void { + this.#inner.close(); + } +} + +/** + * A broadcast: construct one to publish (`new Broadcast()`), or receive one from + * {@link Session.consume}. Mirrors `@moq/net`'s dual-use `Broadcast`. + */ +export class Broadcast { + // Producer side: a concrete wasm handle. Consume side: a (memoized) resolver + // that waits for the broadcast's announce on first use, keeping `consume` sync. + readonly #producer?: Wasm.Broadcast; + readonly #resolve?: () => Promise; + #resolved?: Promise; + + constructor(resolve?: () => Promise) { + if (resolve) this.#resolve = resolve; + else this.#producer = new Wasm.Broadcast(); + } + + // The wasm handle for Session.publish; only meaningful on a producer broadcast. + get handle(): Wasm.Broadcast | undefined { + return this.#producer; + } + + #broadcast(): Promise { + if (this.#producer) return Promise.resolve(this.#producer); + if (!this.#resolved) this.#resolved = this.#resolve?.() ?? Promise.resolve(undefined); + return this.#resolved; + } + + /** A track requested over the network (producer side), or `undefined` once closed. */ + async requested(): Promise { + if (!this.#producer) throw new Error("requested() is only valid on a published broadcast"); + const request = await this.#producer.requested(); + return request ? new TrackRequest(request) : undefined; + } + + /** Get a lazy {@link TrackConsumer} handle for a track (consumer side). */ + track(name: string): TrackConsumer { + return new TrackConsumer(() => this.#broadcast(), name); + } + + /** Open a live subscription to a track (consumer side). */ + subscribe(name: string, priority = 0): TrackSubscriber { + return this.track(name).subscribe({ priority }); + } + + close(_abort?: Error): void { + this.#producer?.close(); + void this.#resolved?.then((b) => b?.close()); + } +} + +/** A live announce / unannounce stream. Mirrors `@moq/net`'s `Announced`. */ +export class Announced { + #inner: Wasm.Announced; + + constructor(inner: Wasm.Announced) { + this.#inner = inner; + } + + /** The next `{ path, active }` event, or `undefined` once the stream ends. */ + async next(): Promise<{ path: Path.Valid; active: boolean } | undefined> { + const entry = (await this.#inner.next()) as { path: string; active: boolean } | undefined; + if (!entry) return undefined; + return { path: entry.path as Path.Valid, active: entry.active }; + } + + close(): void { + this.#inner.close(); + } +} + +/** A connected session, presented as a {@link Connection.Established}. */ +export class Session { + readonly url: URL; + readonly version: string; + readonly closed: Promise; + + // Bandwidth / RTT telemetry isn't surfaced from wasm yet (always undefined); + // declared for `@moq/net` `Established` parity. + readonly sendBandwidth?: Signal; + readonly recvBandwidth?: Signal; + readonly rtt?: Signal; + + #inner: Wasm.Session; + // The OriginConsumer carries announce discovery + consume (mirrors moq-net). + #consumer: Wasm.OriginConsumer; + + constructor(url: URL, inner: Wasm.Session) { + this.url = url; + this.#inner = inner; + this.version = inner.version(); + this.closed = inner.closed(); + this.#consumer = inner.consumer(); + } + + /** A lazy {@link Broadcast} handle; subscribing waits for its announce. */ + consume(broadcast: string): Broadcast { + return new Broadcast(() => this.#consumer.consume(broadcast)); + } + + /** + * Stream announce / unannounce events. The OriginConsumer is already scoped, + * so `prefix` is accepted for `@moq/net` parity but not used as a filter. + */ + announced(_prefix?: Path.Valid): Announced { + return new Announced(this.#consumer.announced()); + } + + publish(path: string, broadcast: Broadcast): void { + const handle = broadcast.handle; + if (!handle) throw new Error("can only publish a broadcast created with new Broadcast()"); + this.#inner.publish(path, handle); + } + + close(): void { + this.#inner.close(); + } +} diff --git a/js/wasm/tsconfig.json b/js/wasm/tsconfig.json new file mode 100644 index 000000000..005a52cca --- /dev/null +++ b/js/wasm/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "dist", + "emitDeclarationOnly": true, + "types": ["bun"] + }, + "include": ["src"] +} diff --git a/js/watch/package.json b/js/watch/package.json index c12885aa2..6c609548d 100644 --- a/js/watch/package.json +++ b/js/watch/package.json @@ -27,6 +27,7 @@ "@moq/hang": "workspace:^", "@moq/json": "workspace:^", "@moq/net": "workspace:^", + "@moq/wasm": "workspace:^", "@moq/msf": "workspace:^", "@moq/signals": "workspace:^" }, diff --git a/js/watch/src/audio/buffer.ts b/js/watch/src/audio/buffer.ts index 9ca13c5d8..039f0b4d2 100644 --- a/js/watch/src/audio/buffer.ts +++ b/js/watch/src/audio/buffer.ts @@ -1,5 +1,5 @@ -import { Time } from "@moq/net"; import { Effect, type Getter, Signal } from "@moq/signals"; +import { Time } from "@moq/wasm"; import type { Data, InitPost, InitShared, Latency, State } from "./render"; import { allocSharedRingBuffer, SharedRingBuffer } from "./shared-ring-buffer"; diff --git a/js/watch/src/audio/decoder.ts b/js/watch/src/audio/decoder.ts index 0d8e3849c..5f5130d25 100644 --- a/js/watch/src/audio/decoder.ts +++ b/js/watch/src/audio/decoder.ts @@ -1,11 +1,12 @@ import * as Catalog from "@moq/hang/catalog"; import * as Container from "@moq/hang/container"; import * as Util from "@moq/hang/util"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; import type { BufferedRanges } from "../backend"; import { base64ToBytes } from "../base64"; +import { Consumer } from "../container.ts"; import type { Sync } from "../sync"; import { type AudioBuffer, createAudioBuffer } from "./buffer"; // Compiled and inlined as a blob URL via vite-plugin-worklet. @@ -198,7 +199,7 @@ export class Decoder { const format = config.container.kind === "loc" ? new Container.Loc.Format() : new Container.Legacy.Format(); // Create consumer with slightly less latency than the render worklet to avoid underflowing. // TODO include JITTER_UNDERHEAD - const consumer = new Container.Consumer(sub, { + const consumer = new Consumer(sub, { format, latency: this.sync.output.buffer, }); @@ -285,7 +286,7 @@ export class Decoder { ? Util.Hex.toBytes(config.description) : init.description; - const consumer = new Container.Consumer(sub, { + const consumer = new Consumer(sub, { format: new Container.Cmaf.Format(init), latency: this.sync.output.buffer, }); diff --git a/js/watch/src/audio/mse.ts b/js/watch/src/audio/mse.ts index d13b4782f..290736b09 100644 --- a/js/watch/src/audio/mse.ts +++ b/js/watch/src/audio/mse.ts @@ -1,9 +1,10 @@ import * as Catalog from "@moq/hang/catalog"; import * as Container from "@moq/hang/container"; -import * as Moq from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import * as Moq from "@moq/wasm"; import { type BufferedRanges, timeRangesToArray } from "../backend"; import { base64ToBytes } from "../base64"; +import { Consumer } from "../container.ts"; import type { Muxer } from "../mse"; import type { Sync } from "../sync"; import type { Backend, Stats } from "./backend"; @@ -151,7 +152,7 @@ export class Mse implements Backend { ): void { const format = config.container.kind === "loc" ? new Container.Loc.Format() : new Container.Legacy.Format(); // Create consumer that reorders groups/frames up to the provided latency. - const consumer = new Container.Consumer(sub, { + const consumer = new Consumer(sub, { format, latency: this.sync.output.buffer, }); diff --git a/js/watch/src/audio/render.ts b/js/watch/src/audio/render.ts index 09f23efe2..d8a9bd43c 100644 --- a/js/watch/src/audio/render.ts +++ b/js/watch/src/audio/render.ts @@ -1,4 +1,4 @@ -import type { Time } from "@moq/net"; +import type { Time } from "@moq/wasm"; import type { SharedRingBufferInit } from "./shared-ring-buffer"; export type Message = InitShared | InitPost | Data | Latency; diff --git a/js/watch/src/audio/ring-buffer.ts b/js/watch/src/audio/ring-buffer.ts index 3048607a3..15b59031b 100644 --- a/js/watch/src/audio/ring-buffer.ts +++ b/js/watch/src/audio/ring-buffer.ts @@ -1,4 +1,4 @@ -import { Time } from "@moq/net"; +import { Time } from "@moq/wasm"; export class AudioRingBuffer { #buffer: Float32Array[]; diff --git a/js/watch/src/audio/shared-ring-buffer.ts b/js/watch/src/audio/shared-ring-buffer.ts index 2954fd772..2f035e4b6 100644 --- a/js/watch/src/audio/shared-ring-buffer.ts +++ b/js/watch/src/audio/shared-ring-buffer.ts @@ -1,4 +1,4 @@ -import { Time } from "@moq/net"; +import { Time } from "@moq/wasm"; // Control array slot indices const WRITE = 0; diff --git a/js/watch/src/audio/source.ts b/js/watch/src/audio/source.ts index fc8d34a27..dceadbadc 100644 --- a/js/watch/src/audio/source.ts +++ b/js/watch/src/audio/source.ts @@ -1,7 +1,7 @@ import type * as Catalog from "@moq/hang/catalog"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; import type { Broadcast } from "../broadcast"; // AudioWorklet always renders in 128-sample quanta. diff --git a/js/watch/src/backend.ts b/js/watch/src/backend.ts index cf8e5b60e..5765720a5 100644 --- a/js/watch/src/backend.ts +++ b/js/watch/src/backend.ts @@ -1,5 +1,5 @@ -import * as Moq from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import * as Moq from "@moq/wasm"; import * as Audio from "./audio"; import type { Broadcast } from "./broadcast"; import { Muxer } from "./mse"; diff --git a/js/watch/src/broadcast.ts b/js/watch/src/broadcast.ts index dd5d92e6b..456ced0a6 100644 --- a/js/watch/src/broadcast.ts +++ b/js/watch/src/broadcast.ts @@ -1,9 +1,9 @@ import * as Catalog from "@moq/hang/catalog"; import * as Json from "@moq/json"; import * as Msf from "@moq/msf"; -import type * as Moq from "@moq/net"; -import { Path } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Path } from "@moq/wasm"; import { toHang } from "./msf"; diff --git a/js/hang/src/container/consumer.test.ts b/js/watch/src/container.test.ts similarity index 75% rename from js/hang/src/container/consumer.test.ts rename to js/watch/src/container.test.ts index 42bd18e95..f5d31b09a 100644 --- a/js/hang/src/container/consumer.test.ts +++ b/js/watch/src/container.test.ts @@ -1,15 +1,10 @@ import { expect, test } from "bun:test"; +import { Cmaf, type Format as ContainerFormat, type Frame, Legacy } from "@moq/hang/container"; import { Group, type Time, TrackProducer, Varint } from "@moq/net"; -import type { InitSegment } from "./cmaf/decode.ts"; -import { encodeDataSegment } from "./cmaf/encode.ts"; -import { Format as CmafFormat } from "./cmaf/format.ts"; -import { Consumer } from "./consumer.ts"; -import type { Format as ContainerFormat } from "./format.ts"; -import { Format as LegacyFormat } from "./legacy.ts"; -import type { Frame } from "./types.ts"; +import { Consumer } from "./container.ts"; const TIMESCALE = 90_000; -const TEST_INIT: InitSegment = { +const TEST_INIT: Cmaf.InitSegment = { timescale: TIMESCALE, trackId: 1, defaultSampleDuration: 0, @@ -17,115 +12,6 @@ const TEST_INIT: InitSegment = { defaultSampleFlags: 0, }; -function encodeLegacyFrame(timestamp: Time.Micro, payload: Uint8Array): Uint8Array { - const tsBytes = Varint.encode(timestamp); - const data = new Uint8Array(tsBytes.byteLength + payload.byteLength); - data.set(tsBytes, 0); - data.set(payload, tsBytes.byteLength); - return data; -} - -// --- LegacyFormat --- - -test("LegacyFormat decodes a valid frame", () => { - const format = new LegacyFormat(); - const payload = new Uint8Array([0xde, 0xad]); - const timestamp = 1000 as Time.Micro; - const frame = encodeLegacyFrame(timestamp, payload); - - const result = format.decode(frame); - - expect(result).toHaveLength(1); - expect(result[0].timestamp).toBe(timestamp); - expect(result[0].data).toEqual(payload); - expect(result[0].keyframe).toBe(false); -}); - -test("LegacyFormat always returns keyframe: false", () => { - const format = new LegacyFormat(); - const frame = encodeLegacyFrame(0 as Time.Micro, new Uint8Array([0x01])); - - const [decoded] = format.decode(frame); - expect(decoded.keyframe).toBe(false); -}); - -test("LegacyFormat always returns exactly one frame", () => { - const format = new LegacyFormat(); - const frame = encodeLegacyFrame(5000 as Time.Micro, new Uint8Array([0x01, 0x02, 0x03])); - - const result = format.decode(frame); - expect(result).toHaveLength(1); -}); - -test("LegacyFormat throws on empty input", () => { - const format = new LegacyFormat(); - expect(() => format.decode(new Uint8Array(0))).toThrow(); -}); - -test("LegacyFormat throws on truncated input", () => { - const format = new LegacyFormat(); - // A varint that indicates more bytes follow but is truncated - expect(() => format.decode(new Uint8Array([0x80]))).toThrow(); -}); - -// --- CmafFormat --- - -test("CmafFormat decodes a valid keyframe segment", () => { - const format = new CmafFormat(TEST_INIT); - const segment = encodeDataSegment({ - data: new Uint8Array([0xca, 0xfe]), - timestamp: 0, - duration: 3000, - keyframe: true, - sequence: 0, - }); - - const result = format.decode(segment); - - expect(result).toHaveLength(1); - expect(result[0].data).toEqual(new Uint8Array([0xca, 0xfe])); - expect(result[0].timestamp).toBe(0 as Time.Micro); - expect(result[0].keyframe).toBe(true); -}); - -test("CmafFormat decodes a delta frame segment", () => { - const format = new CmafFormat(TEST_INIT); - const segment = encodeDataSegment({ - data: new Uint8Array([0xbe, 0xef]), - timestamp: 3000, - duration: 3000, - keyframe: false, - sequence: 1, - }); - - const result = format.decode(segment); - - expect(result).toHaveLength(1); - expect(result[0].keyframe).toBe(false); -}); - -test("CmafFormat converts timescale units to microseconds", () => { - const format = new CmafFormat(TEST_INIT); - // 90000 timescale units = 1 second = 1_000_000 microseconds - const segment = encodeDataSegment({ - data: new Uint8Array([0x01]), - timestamp: TIMESCALE, - duration: 3000, - keyframe: true, - sequence: 0, - }); - - const result = format.decode(segment); - expect(result[0].timestamp).toBe(1_000_000 as Time.Micro); -}); - -test("CmafFormat throws on corrupt segment", () => { - const format = new CmafFormat(TEST_INIT); - expect(() => format.decode(new Uint8Array([0x00, 0x01, 0x02]))).toThrow(); -}); - -// --- Consumer --- - function encodeLegacy(timestamp: Time.Micro): Uint8Array { const tsBytes = Varint.encode(timestamp); const payload = new Uint8Array([0xde, 0xad]); @@ -164,7 +50,7 @@ async function drainFrames( test("Consumer delivers frames from a single group", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro, 33_000 as Time.Micro]); track.close(); @@ -178,7 +64,7 @@ test("Consumer delivers frames from a single group", async () => { test("Consumer forces keyframe true at index 0", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro, 33_000 as Time.Micro]); track.close(); @@ -257,7 +143,7 @@ test("Consumer keeps frames decoded before an error (truncated GoP)", async () = test("Consumer close returns undefined from next()", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); const promise = consumer.next(); consumer.close(); @@ -268,7 +154,7 @@ test("Consumer close returns undefined from next()", async () => { test("Consumer throws on concurrent next() calls", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); // First call blocks waiting for data consumer.next(); @@ -281,7 +167,7 @@ test("Consumer throws on concurrent next() calls", async () => { test("Consumer skips groups via PTS-span when over latency", async () => { const track = new TrackProducer("test"); // Zero latency = skip everything that's not the latest - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 0 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 0 as Time.Milli }); // Write groups with increasing timestamps. With 0 latency, any PTS span > 0 triggers skip. writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro]); @@ -300,7 +186,7 @@ test("Consumer skips groups via PTS-span when over latency", async () => { test("Consumer delivers groups in sequence order regardless of arrival order", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); writeGroupWithLegacyFrames(track, 2, [60_000 as Time.Micro]); writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro]); @@ -319,7 +205,7 @@ test("Consumer delivers groups in sequence order regardless of arrival order", a test("Consumer rejects stale groups", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); // Group 5 arrives first (sets active = 5) writeGroupWithLegacyFrames(track, 5, [0 as Time.Micro]); @@ -344,7 +230,7 @@ test("Consumer rejects stale groups", async () => { test("Consumer next() returns group-done signals", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro, 33_000 as Time.Micro]); writeGroupWithLegacyFrames(track, 1, [66_000 as Time.Micro]); @@ -375,7 +261,7 @@ test("Consumer next() returns group-done signals", async () => { test("Consumer buffered signal updates as frames arrive", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 500 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 500 as Time.Milli }); expect(consumer.buffered.peek()).toEqual([]); @@ -397,7 +283,7 @@ test("Consumer buffered signal updates as frames arrive", async () => { test("Consumer recovers from gap in group sequence numbers", async () => { const track = new TrackProducer("test"); - const consumer = new Consumer(track.subscribe(), { format: new LegacyFormat(), latency: 100 as Time.Milli }); + const consumer = new Consumer(track.subscribe(), { format: new Legacy.Format(), latency: 100 as Time.Milli }); writeGroupWithLegacyFrames(track, 0, [0 as Time.Micro, 20_000 as Time.Micro]); writeGroupWithLegacyFrames(track, 1, [40_000 as Time.Micro, 60_000 as Time.Micro]); @@ -438,9 +324,7 @@ test("Consumer handles empty decode result without deadlock", async () => { const frames = await drainFrames(consumer, 300); // The empty decode produces no frames, but the second MoQ frame does. - // Since index 0 was never used (empty result), the first actual frame gets index=1 → keyframe false? - // Actually index increments per sample, and empty decode means 0 samples → index stays at 0. - // So the next frame's first sample gets index=0 → keyframe=true. + // Since index 0 was never used (empty result), the first actual frame gets index=0 → keyframe=true. expect(frames).toHaveLength(1); expect(frames[0].keyframe).toBe(true); consumer.close(); @@ -451,13 +335,13 @@ test("Consumer handles empty decode result without deadlock", async () => { test("Consumer with CmafFormat delivers correct timestamps", async () => { const track = new TrackProducer("test"); const consumer = new Consumer(track.subscribe(), { - format: new CmafFormat(TEST_INIT), + format: new Cmaf.Format(TEST_INIT), latency: 500 as Time.Milli, }); const group = new Group(0); group.writeFrame( - encodeDataSegment({ + Cmaf.encodeDataSegment({ data: new Uint8Array([0xca, 0xfe]), timestamp: 0, duration: 3000, @@ -466,7 +350,7 @@ test("Consumer with CmafFormat delivers correct timestamps", async () => { }), ); group.writeFrame( - encodeDataSegment({ + Cmaf.encodeDataSegment({ data: new Uint8Array([0xbe, 0xef]), timestamp: 3000, duration: 3000, @@ -487,21 +371,6 @@ test("Consumer with CmafFormat delivers correct timestamps", async () => { consumer.close(); }); -test("CmafFormat decodes the per-sample duration", () => { - const format = new CmafFormat(TEST_INIT); - const segment = encodeDataSegment({ - data: new Uint8Array([0xca, 0xfe]), - timestamp: 0, - duration: 3000, - keyframe: true, - sequence: 0, - }); - - const [frame] = format.decode(segment); - // 3000 ticks / 90000 timescale * 1_000_000 = 33333µs - expect(frame.duration).toBe(33_333 as Time.Micro); -}); - // --- Duration skipping --- // Format whose frames carry a fixed 33ms duration; the timestamp is byte 0 (ms). diff --git a/js/hang/src/container/consumer.ts b/js/watch/src/container.ts similarity index 91% rename from js/hang/src/container/consumer.ts rename to js/watch/src/container.ts index b773142e9..bee02bd9c 100644 --- a/js/hang/src/container/consumer.ts +++ b/js/watch/src/container.ts @@ -1,9 +1,18 @@ -import type { Time } from "@moq/net"; -import * as Moq from "@moq/net"; +import type { BufferedRanges, Format, Frame } from "@moq/hang/container"; import { Effect, type Getter, type GetterInit, getter, Signal } from "@moq/signals"; - -import type { Format } from "./format"; -import type { BufferedRanges, Frame } from "./types"; +import { Time } from "@moq/wasm"; + +// Minimal structural track surface this consumer pumps, so it works with any +// networking backend (@moq/net, @moq/wasm) or a test double. +interface GroupConsumer { + readonly sequence: number; + readFrame(): Promise; + close(): void; +} +interface TrackSubscriber { + recvGroup(): Promise; + close(): void; +} export interface ConsumerProps { format: Format; @@ -13,7 +22,7 @@ export interface ConsumerProps { } interface Group { - consumer: Moq.Group; + consumer: GroupConsumer; frames: Frame[]; // decode order latest?: Time.Micro; // The timestamp of the latest known frame end?: Time.Micro; // The furthest presentation point so far, i.e. max(timestamp + duration) @@ -21,7 +30,7 @@ interface Group { } export class Consumer { - #track: Moq.TrackSubscriber; + #track: TrackSubscriber; #format: Format; #latency: Getter; #groups: Group[] = []; @@ -35,10 +44,10 @@ export class Consumer { #signals = new Effect(); - constructor(track: Moq.TrackSubscriber, props: ConsumerProps) { + constructor(track: TrackSubscriber, props: ConsumerProps) { this.#track = track; this.#format = props.format; - this.#latency = getter(props.latency ?? Moq.Time.Milli.zero); + this.#latency = getter(props.latency ?? Time.Milli.zero); this.#signals.spawn(this.#run.bind(this)); this.#signals.cleanup(() => { @@ -169,7 +178,7 @@ export class Consumer { // This also handles gaps in group sequence numbers: if #active points to a missing // group, the latency span proves the missing content is too old to wait for. while (this.#groups.length >= 2) { - const threshold = Moq.Time.Micro.fromMilli(this.#latency.peek()); + const threshold = Time.Micro.fromMilli(this.#latency.peek()); // Check the difference between the earliest and latest known frames. let min: number | undefined; @@ -300,13 +309,13 @@ export class Consumer { const first = group.frames.at(0); if (!first || group.latest === undefined) continue; - const start = Moq.Time.Milli.fromMicro(first.timestamp); - const end = Moq.Time.Milli.fromMicro(group.latest); + const start = Time.Milli.fromMicro(first.timestamp); + const end = Time.Milli.fromMicro(group.latest); const last = ranges.at(-1); const contiguous = prev?.done && prev.consumer.sequence + 1 === group.consumer.sequence; if (last && (last.end >= start || contiguous)) { - last.end = Moq.Time.Milli.max(last.end, end); + last.end = Time.Milli.max(last.end, end); } else { ranges.push({ start, end }); } diff --git a/js/watch/src/element.ts b/js/watch/src/element.ts index 76c17acfe..9d25858a7 100644 --- a/js/watch/src/element.ts +++ b/js/watch/src/element.ts @@ -1,7 +1,7 @@ import type * as Catalog from "@moq/hang/catalog"; -import type { Time } from "@moq/net"; -import * as Moq from "@moq/net"; import { Effect, Signal } from "@moq/signals"; +import type { Time } from "@moq/wasm"; +import * as Moq from "@moq/wasm"; import { MultiBackend } from "./backend"; import { Broadcast, type CatalogFormat, parseCatalogFormat } from "./broadcast"; import type { Latency } from "./sync"; diff --git a/js/watch/src/index.ts b/js/watch/src/index.ts index 6103f721f..7adfb2d1c 100644 --- a/js/watch/src/index.ts +++ b/js/watch/src/index.ts @@ -1,8 +1,8 @@ export * as Hang from "@moq/hang"; -export * as Net from "@moq/net"; -/** @deprecated Use `Net` instead. */ -export * as Lite from "@moq/net"; export * as Signals from "@moq/signals"; +export * as Net from "@moq/wasm"; +/** @deprecated Use `Net` instead. */ +export * as Lite from "@moq/wasm"; export * as Audio from "./audio"; export * from "./backend"; export * from "./broadcast"; diff --git a/js/watch/src/mse.ts b/js/watch/src/mse.ts index 2696248e0..42977096f 100644 --- a/js/watch/src/mse.ts +++ b/js/watch/src/mse.ts @@ -1,5 +1,5 @@ -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import { Time } from "@moq/wasm"; import type { Sync } from "./sync"; type MuxerInput = { diff --git a/js/watch/src/sync.ts b/js/watch/src/sync.ts index 5475d7686..5f3a6827c 100644 --- a/js/watch/src/sync.ts +++ b/js/watch/src/sync.ts @@ -1,6 +1,6 @@ -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; /** Latency: `"real-time"` auto-computes jitter from RTT; a `Time.Milli` sets a fixed jitter. */ export type Latency = "real-time" | Time.Milli; diff --git a/js/watch/src/ui/components/buffer-control.ts b/js/watch/src/ui/components/buffer-control.ts index d75ed50cb..bb85a9cf8 100644 --- a/js/watch/src/ui/components/buffer-control.ts +++ b/js/watch/src/ui/components/buffer-control.ts @@ -1,18 +1,18 @@ -import { Moq } from "@moq/hang"; import type { Effect } from "@moq/signals"; +import { Time } from "@moq/wasm"; import type { BufferedRanges } from "../.."; import type MoqWatch from "../../element"; -const MIN_RANGE = Moq.Time.Milli(0); -const RANGE_STEP = Moq.Time.Milli(10); -const DEFAULT_MAX = Moq.Time.Milli(4000); +const MIN_RANGE = Time.Milli(0); +const RANGE_STEP = Time.Milli(10); +const DEFAULT_MAX = Time.Milli(4000); const LABEL_WIDTH = 48; function drawRanges( canvas: HTMLCanvasElement, ranges: BufferedRanges, - timestamp: Moq.Time.Milli | undefined, - max: Moq.Time.Milli, + timestamp: Time.Milli | undefined, + max: Time.Milli, isBuffering: boolean, ) { const ctx = canvas.getContext("2d"); @@ -41,8 +41,8 @@ function drawRanges( for (let i = 0; i < ranges.length; i++) { const range = ranges[i]; - const startMs = Moq.Time.Milli(range.start - timestamp); - const endMs = Moq.Time.Milli(range.end - timestamp); + const startMs = Time.Milli(range.start - timestamp); + const endMs = Time.Milli(range.end - timestamp); const visibleStart = Math.max(0, startMs); const visibleEnd = Math.min(endMs, max); @@ -74,7 +74,7 @@ function drawRanges( } } -export function bufferControl(parent: Effect, watch: MoqWatch, max: Moq.Time.Milli = DEFAULT_MAX): HTMLElement { +export function bufferControl(parent: Effect, watch: MoqWatch, max: Time.Milli = DEFAULT_MAX): HTMLElement { const wrapper = document.createElement("div"); wrapper.className = "buffer"; @@ -139,8 +139,8 @@ export function bufferControl(parent: Effect, watch: MoqWatch, max: Moq.Time.Mil const trackWidth = rect.width - LABEL_WIDTH; const x = Math.max(0, Math.min(clientX - rect.left - LABEL_WIDTH, trackWidth)); const ms = (x / trackWidth) * max; - const snapped = Moq.Time.Milli(Math.round(ms / RANGE_STEP) * RANGE_STEP); - const clamped = Moq.Time.Milli(Math.max(MIN_RANGE, Math.min(max, snapped))); + const snapped = Time.Milli(Math.round(ms / RANGE_STEP) * RANGE_STEP); + const clamped = Time.Milli(Math.max(MIN_RANGE, Math.min(max, snapped))); watch.controls.latency.set(clamped); }; @@ -169,18 +169,18 @@ export function bufferControl(parent: Effect, watch: MoqWatch, max: Moq.Time.Mil }); parent.event(viz, "keydown", (e) => { - let delta = Moq.Time.Milli(0); + let delta = Time.Milli(0); if (e.key === "ArrowRight" || e.key === "ArrowUp") { delta = RANGE_STEP; } else if (e.key === "ArrowLeft" || e.key === "ArrowDown") { - delta = Moq.Time.Milli(-RANGE_STEP); + delta = Time.Milli(-RANGE_STEP); } else { return; } e.preventDefault(); interact(); const current = watch.backend.output.jitter.peek(); - const value = Moq.Time.Milli(Math.max(MIN_RANGE, Math.min(max, current + delta))); + const value = Time.Milli(Math.max(MIN_RANGE, Math.min(max, current + delta))); watch.controls.latency.set(value); }); diff --git a/js/watch/src/ui/components/latency.ts b/js/watch/src/ui/components/latency.ts index 875d696e7..ed168c302 100644 --- a/js/watch/src/ui/components/latency.ts +++ b/js/watch/src/ui/components/latency.ts @@ -1,6 +1,6 @@ -import { Moq } from "@moq/hang"; import type { Effect } from "@moq/signals"; import * as DOM from "@moq/signals/dom"; +import { Time } from "@moq/wasm"; import type MoqWatch from "../../element"; import { formatMillis } from "../format"; import { bufferControl } from "./buffer-control"; @@ -25,7 +25,7 @@ export function latencyTab(parent: Effect, watch: MoqWatch): HTMLElement { const buttons = PRESETS.map((preset) => { const chip = DOM.create("button", { className: "chip", type: "button" }, preset.label); parent.event(chip, "click", () => { - watch.controls.latency.set(preset.value === "real-time" ? "real-time" : Moq.Time.Milli(preset.value)); + watch.controls.latency.set(preset.value === "real-time" ? "real-time" : Time.Milli(preset.value)); }); chips.appendChild(chip); return { preset, chip }; diff --git a/js/watch/src/video/backend.ts b/js/watch/src/video/backend.ts index 098d158d4..5f5d7875b 100644 --- a/js/watch/src/video/backend.ts +++ b/js/watch/src/video/backend.ts @@ -1,5 +1,5 @@ -import type * as Moq from "@moq/net"; import type { Getter } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; import type { BufferedRanges } from "../backend"; import type { Source } from "./source"; diff --git a/js/watch/src/video/decoder.ts b/js/watch/src/video/decoder.ts index 900839177..089266d6b 100644 --- a/js/watch/src/video/decoder.ts +++ b/js/watch/src/video/decoder.ts @@ -1,11 +1,12 @@ import * as Catalog from "@moq/hang/catalog"; import * as Container from "@moq/hang/container"; import * as Util from "@moq/hang/util"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; import type { BufferedRanges } from "../backend"; import { base64ToBytes } from "../base64"; +import { Consumer } from "../container.ts"; import type { Sync } from "../sync"; import type { Backend, Stats } from "./backend"; import type { Source } from "./source"; @@ -316,7 +317,7 @@ class DecoderTrack { const format = this.config.container.kind === "loc" ? new Container.Loc.Format() : new Container.Legacy.Format(); // Create consumer that reorders groups/frames up to the provided latency. - const consumer = new Container.Consumer(sub, { + const consumer = new Consumer(sub, { format, latency: this.sync.output.buffer, }); @@ -396,7 +397,7 @@ class DecoderTrack { const init = Container.Cmaf.decodeInitSegment(initSegment); const description = this.config.description ? Util.Hex.toBytes(this.config.description) : init.description; - const consumer = new Container.Consumer(sub, { + const consumer = new Consumer(sub, { format: new Container.Cmaf.Format(init), latency: this.sync.output.buffer, }); diff --git a/js/watch/src/video/mse.ts b/js/watch/src/video/mse.ts index 52b753b4e..ec76e974e 100644 --- a/js/watch/src/video/mse.ts +++ b/js/watch/src/video/mse.ts @@ -1,9 +1,10 @@ import * as Catalog from "@moq/hang/catalog"; import * as Container from "@moq/hang/container"; -import * as Moq from "@moq/net"; import { Effect, readonlys, Signal } from "@moq/signals"; +import * as Moq from "@moq/wasm"; import { type BufferedRanges, timeRangesToArray } from "../backend"; import { base64ToBytes } from "../base64"; +import { Consumer } from "../container.ts"; import type { Muxer } from "../mse"; import type { Sync } from "../sync"; import type { Backend, Stats } from "./backend"; @@ -155,7 +156,7 @@ export class Mse implements Backend { const format = config.container.kind === "loc" ? new Container.Loc.Format() : new Container.Legacy.Format(); // Create consumer that reorders groups/frames up to the provided latency. - const consumer = new Container.Consumer(data, { + const consumer = new Consumer(data, { format, latency: this.sync.output.buffer, }); diff --git a/js/watch/src/video/renderer.ts b/js/watch/src/video/renderer.ts index 12ca8cc40..1f8c253ee 100644 --- a/js/watch/src/video/renderer.ts +++ b/js/watch/src/video/renderer.ts @@ -1,5 +1,5 @@ -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import { Time } from "@moq/wasm"; import type { Decoder } from "./decoder"; type RendererInput = { diff --git a/js/watch/src/video/source.ts b/js/watch/src/video/source.ts index 18bde27b4..9c4fbd9de 100644 --- a/js/watch/src/video/source.ts +++ b/js/watch/src/video/source.ts @@ -1,7 +1,7 @@ import type * as Catalog from "@moq/hang/catalog"; -import type * as Moq from "@moq/net"; -import { Time } from "@moq/net"; import { Effect, type Getter, getter, type Inputs, type Readonlys, readonlys, Signal } from "@moq/signals"; +import type * as Moq from "@moq/wasm"; +import { Time } from "@moq/wasm"; import type { Broadcast } from "../broadcast"; /** diff --git a/rs/moq-wasm/Cargo.toml b/rs/moq-wasm/Cargo.toml index fbca5c887..6586b1b71 100644 --- a/rs/moq-wasm/Cargo.toml +++ b/rs/moq-wasm/Cargo.toml @@ -31,6 +31,7 @@ getrandom = { version = "0.4", features = ["wasm_js"] } # wasm_js backend for ra js-sys = "0.3" moq-net = { workspace = true, features = ["serde"] } thiserror = "2" +tracing = "0.1" tracing-wasm = "0.2" url = "2" # Pinned to the wasm-bindgen-cli version in the Nix dev shell: the bindgen @@ -39,4 +40,4 @@ wasm-bindgen = "=0.2.121" wasm-bindgen-futures = "0.4" web-async = { workspace = true } web-transport-trait = { workspace = true } -web-transport-wasm = "0.5" +web-transport-wasm = "0.5.8" # 0.5.8+ for ClientBuilder::with_protocols (ALPN) diff --git a/rs/moq-wasm/README.md b/rs/moq-wasm/README.md index 6c60d82f2..6a46b19f0 100644 --- a/rs/moq-wasm/README.md +++ b/rs/moq-wasm/README.md @@ -13,7 +13,7 @@ Go). Browsers need `wasm-bindgen`, so this is a separate sibling crate. (For *React Native* JS, `uniffi-bindgen-react-native` can reuse `moq-ffi` directly; that path is unrelated to this crate.) -## Status: compiles and ships a typed JS package; one runtime blocker left +## Status: full consume + publish surface, drop-in for `@moq/net` at the type level What works today: @@ -24,13 +24,25 @@ What works today: bridge from `web-transport-wasm` (browser WebTransport) to the `web-transport-trait` abstraction `moq-net` consumes. The orphan rule forces the newtypes; the shapes line up almost 1:1. -- **It compiles to `wasm32-unknown-unknown` and produces `@moq/wasm`**: `just - wasm` emits a typed, importable package (`Session` / `Broadcast` / `Track` / - `Group`, used as `Moq.Session` etc. via `import * as Moq`, `Promise`-returning - methods, `.d.ts`). Verified: it bundles under esbuild and a strict-mode TS - consumer type-checks against it. -- Scope is the consume path (connect -> broadcast -> track -> group -> frame), - the `@moq/watch` use case. The publish path follows the same shape. +- **Both the consume and publish paths are bound.** `lib.rs` exposes the full + producer/consumer model: `Session` (connect / consume / publish), a dual-use + `Broadcast` (with `requested`), `TrackRequest`, `TrackProducer`, + `TrackConsumer`, `TrackSubscriber`, and a dual-use `Group`. The bindings stay + primitive (frames are `Uint8Array`, options positional, `sequence` a `bigint`). +- **A hand-written TS shim (`js/wasm/src`) presents the `@moq/net` surface** on + top of those primitives: the `Connection` / `Path` / `Time` namespaces, the + string/json/bool conveniences, options-object signatures, a reactive + `state.closed` signal, lazy synchronous `consume`, and `number` sequences. It + type-checks against the exact call sites in `@moq/watch` / `@moq/publish`, so + it is a drop-in for the surface they use. + +Announce discovery is real: `Session::consumer` exposes a wasm `OriginConsumer` +with `announced()` (a live `{ path, active }` stream) and `consume()`, mirroring +`moq-net`. `@moq/watch` and `@moq/publish` import `@moq/wasm` directly today. + +Still pending: a real browser-against-relay run, bandwidth/RTT telemetry +(`Established.sendBandwidth`/`rtt` are declared but undefined), and `moq-mux` +media muxing (see below). ### Three moq-net changes this required (all landed here) diff --git a/rs/moq-wasm/src/lib.rs b/rs/moq-wasm/src/lib.rs index 8bae924ab..998c11785 100644 --- a/rs/moq-wasm/src/lib.rs +++ b/rs/moq-wasm/src/lib.rs @@ -5,22 +5,24 @@ //! implementation to WebAssembly and drive the browser's WebTransport from //! inside it. See `transport.rs` for the WebTransport adapter. //! -//! Scope: the consume path (connect -> broadcast -> track -> group -> frame), -//! which is the highest-value target (the `@moq/watch` use case). The publish -//! path follows the same shape and is left as the obvious next step. +//! The exported classes are deliberately primitive (frames are `Uint8Array`, +//! options are positional). The hand-written `@moq/wasm` TypeScript shim +//! (`js/wasm/src`) wraps them to present the exact `@moq/net` surface: the +//! string/json/bool conveniences, options-object signatures, the `Connection` +//! / `Path` / `Time` namespaces, and a reactive `state.closed` signal. Keeping +//! those in TS keeps this layer thin and the wasm boundary chatter-free. //! //! moq-net's timers and `Instant` go through `web_async::time` (tokio on native, -//! wasmtimer on wasm), so the consume path runs in the browser. (`model/time.rs` -//! has an unused wall-clock helper that isn't wasm-portable, but nothing calls -//! it, so it never runs. See README.md.) +//! wasmtimer on wasm), so both the consume and publish paths run in the browser. // Browser-only crate. Empty on native so `cargo check --workspace` stays green. #![cfg(target_arch = "wasm32")] use std::cell::RefCell; use std::rc::Rc; +use std::time::Duration; -use js_sys::Uint8Array; +use js_sys::{Object, Reflect, Uint8Array}; use wasm_bindgen::prelude::*; mod transport; @@ -30,16 +32,64 @@ fn js_err(e: impl std::fmt::Display) -> JsValue { JsError::new(&e.to_string()).into() } +/// Read an optional boolean property off a JS object. +fn get_bool(obj: &JsValue, key: &str) -> Option { + Reflect::get(obj, &JsValue::from_str(key)) + .ok() + .and_then(|v| v.as_bool()) +} + +/// Read an optional number property off a JS object. +fn get_f64(obj: &JsValue, key: &str) -> Option { + Reflect::get(obj, &JsValue::from_str(key)).ok().and_then(|v| v.as_f64()) +} + +/// Build a `moq_net::TrackInfo` from a JS `{ compress?, cache?, priority?, ordered? }`. +/// Unset fields keep their defaults. `cache` is milliseconds, matching `@moq/net`. +fn parse_track_info(value: &JsValue) -> moq_net::TrackInfo { + let mut info = moq_net::TrackInfo::default(); + if value.is_object() { + if let Some(c) = get_bool(value, "compress") { + info.compress = c; + } + if let Some(ms) = get_f64(value, "cache") { + info.cache = Duration::from_millis(ms as u64); + } + if let Some(p) = get_f64(value, "priority") { + info.priority = p as u8; + } + if let Some(o) = get_bool(value, "ordered") { + info.ordered = o; + } + } + info +} + +/// Serialize a `moq_net::TrackInfo` to a JS `{ compress, cache, priority, ordered }`. +fn track_info_to_js(info: &moq_net::TrackInfo) -> JsValue { + let obj = Object::new(); + let _ = Reflect::set(&obj, &"compress".into(), &info.compress.into()); + let _ = Reflect::set(&obj, &"cache".into(), &(info.cache.as_millis() as f64).into()); + let _ = Reflect::set(&obj, &"priority".into(), &(info.priority as f64).into()); + let _ = Reflect::set(&obj, &"ordered".into(), &info.ordered.into()); + obj.into() +} + /// Install panic + tracing hooks for readable errors. Call once after the wasm /// module's default `init()` loader resolves. (Named `setup` to avoid colliding /// with wasm-bindgen's default `init` export, which loads the module itself.) #[wasm_bindgen] pub fn setup() { console_error_panic_hook::set_once(); - let _ = tracing_wasm::try_set_as_global_default(); + + // Cap tracing at WARN. The default is TRACE, which logs every wire message; + // under heavy announce churn that floods the console and can freeze the page. + let mut config = tracing_wasm::WASMLayerConfigBuilder::new(); + config.set_max_level(tracing::Level::WARN); + tracing_wasm::set_as_global_default_with_config(config.build()); } -/// A connected MoQ session. +/// A connected MoQ session: the wasm counterpart of `@moq/net`'s `Connection.Established`. #[wasm_bindgen] pub struct Session { inner: moq_net::Session, @@ -64,8 +114,9 @@ impl Session { } async fn handshake(transport: transport::Session) -> Result { - let client = moq_net::Client::new(); - let inner = client.connect(transport).await.map_err(js_err)?; + // The default client shares one duplex origin for publish + consume, + // surfaced after connect as `Session::publisher` / `Session::consumer`. + let inner = moq_net::Client::new().connect(transport).await.map_err(js_err)?; Ok(Session { inner }) } @@ -79,44 +130,357 @@ impl Session { self.inner.closed().await.map_err(js_err) } + /// Close the session. + pub fn close(&mut self) { + self.inner.close(moq_net::Error::Cancel); + } + + /// The read handle over remote broadcasts: announce discovery + consume. + pub fn consumer(&self) -> OriginConsumer { + OriginConsumer { + inner: self.inner.consumer().clone(), + } + } + + /// Publish a local broadcast at the given path, announcing it to the relay. + /// + /// The broadcast must have been created with `new Broadcast()`. The announce + /// stays live until the broadcast is closed (dropped). + pub fn publish(&self, path: String, broadcast: &Broadcast) -> Result<(), JsValue> { + broadcast.publish_to(self.inner.publisher(), &path) + } +} + +/// The read handle over an origin's broadcasts: the wasm counterpart of +/// `moq-net`'s `OriginConsumer`. Carries both announce discovery and consume. +#[wasm_bindgen] +pub struct OriginConsumer { + inner: moq_net::OriginConsumer, +} + +#[wasm_bindgen] +impl OriginConsumer { + /// Stream announce / unannounce events for the broadcasts under this origin. + pub fn announced(&self) -> Announced { + Announced { + inner: Rc::new(RefCell::new(Some(self.inner.announced()))), + } + } + /// Subscribe to a broadcast by path, waiting until it is announced. pub async fn consume(&self, path: String) -> Result, JsValue> { - let broadcast = self.inner.consumer().announced_broadcast(path.as_str()).await; - Ok(broadcast.map(|inner| Broadcast { inner })) + let broadcast = self.inner.announced_broadcast(path.as_str()).await; + Ok(broadcast.map(Broadcast::from_consumer)) + } +} + +/// A stream of announce / unannounce events, yielding `{ path, active }`. +/// Mirrors `moq-net`'s `AnnounceConsumer`. +#[wasm_bindgen] +pub struct Announced { + // `next` is `&mut self`; held in a cell so the async method can move it out + // across the await (one in-flight call at a time), like the other consumers. + inner: Rc>>, +} + +#[wasm_bindgen] +impl Announced { + /// The next announce event as `{ path: string, active: boolean }`, or `null` + /// once the stream ends. `active` is false only for an unannounce. + pub async fn next(&self) -> Result, JsValue> { + let cell = self.inner.clone(); + let mut consumer = cell + .borrow_mut() + .take() + .ok_or_else(|| js_err("announced.next already in progress"))?; + let result = consumer.next().await; + *cell.borrow_mut() = Some(consumer); + + Ok(result.map(|(path, status)| { + let active = !matches!(status, moq_net::Announced::Ended); + let obj = Object::new(); + let _ = Reflect::set(&obj, &"path".into(), &JsValue::from_str(path.as_str())); + let _ = Reflect::set(&obj, &"active".into(), &active.into()); + obj + })) + } + + /// Stop receiving announce events. + pub fn close(&self) { + self.inner.borrow_mut().take(); } } -/// A consumer handle for a single broadcast. +// Producer-side broadcast state: the broadcast itself, its dynamic-track request +// handler, and the announce guard once published. +struct ProducerInner { + producer: moq_net::BroadcastProducer, + // `requested_track` needs `&mut self`; held in a cell so the async `requested` + // method can move it out across the await (one in-flight call at a time). + dynamic: Rc>>, + // Keeps the broadcast announced; dropping it unannounces. + publish: RefCell>, +} + +enum BroadcastInner { + Producer(Rc), + Consumer(moq_net::BroadcastConsumer), +} + +/// A broadcast: either one you publish (`new Broadcast()`) or one you consume +/// (`session.consume(path)`). Mirrors `@moq/net`'s dual-use `Broadcast`. #[wasm_bindgen] pub struct Broadcast { - inner: moq_net::BroadcastConsumer, + inner: BroadcastInner, } #[wasm_bindgen] impl Broadcast { - /// Subscribe to a track by name, resolving once the publisher accepts. - pub async fn subscribe(&self, name: String) -> Result { - let track = self.inner.track(&name).map_err(js_err)?; - let subscriber = track.subscribe(None).map_err(js_err)?.await.map_err(js_err)?; - Ok(Track { - inner: Rc::new(RefCell::new(Some(subscriber))), - }) + /// Create a publishable broadcast. Hand it to `session.publish(path, broadcast)`, + /// then answer `requested()` to serve tracks. + #[wasm_bindgen(constructor)] + pub fn new() -> Broadcast { + let producer = moq_net::BroadcastInfo::new().produce(); + let dynamic = producer.dynamic(); + Broadcast { + inner: BroadcastInner::Producer(Rc::new(ProducerInner { + producer, + dynamic: Rc::new(RefCell::new(Some(dynamic))), + publish: RefCell::new(None), + })), + } + } + + fn from_consumer(inner: moq_net::BroadcastConsumer) -> Broadcast { + Broadcast { + inner: BroadcastInner::Consumer(inner), + } + } + + fn publish_to(&self, origin: &moq_net::OriginProducer, path: &str) -> Result<(), JsValue> { + match &self.inner { + BroadcastInner::Producer(p) => { + let guard = origin.publish_broadcast(path, p.producer.consume()).map_err(js_err)?; + *p.publish.borrow_mut() = Some(guard); + Ok(()) + } + BroadcastInner::Consumer(_) => Err(js_err("cannot publish a consumed broadcast")), + } + } + + /// Wait for the next track the peer requests, or `null` once the broadcast ends. + /// Producer side only. + pub async fn requested(&self) -> Result, JsValue> { + let cell = match &self.inner { + BroadcastInner::Producer(p) => p.dynamic.clone(), + BroadcastInner::Consumer(_) => return Err(js_err("cannot accept requests on a consumed broadcast")), + }; + + let mut dynamic = cell + .borrow_mut() + .take() + .ok_or_else(|| js_err("requested already in progress"))?; + let result = dynamic.requested_track().await; + *cell.borrow_mut() = Some(dynamic); + + // A closed/dropped broadcast yields no more requests (JS `undefined`). + Ok(result.ok().map(|inner| TrackRequest { inner: Some(inner) })) + } + + /// Get a lazy consumer handle for a track by name. Consumer side only. + pub fn track(&self, name: String) -> Result { + match &self.inner { + BroadcastInner::Consumer(c) => { + let track = c.track(&name).map_err(js_err)?; + Ok(TrackConsumer { inner: track }) + } + BroadcastInner::Producer(_) => Err(js_err("cannot consume a track on a published broadcast")), + } + } + + /// Close the broadcast. Drops the announce guard (unpublishing) on the + /// producer side; drops the read handle on the consumer side. + pub fn close(&self) { + if let BroadcastInner::Producer(p) = &self.inner { + p.publish.borrow_mut().take(); + } + } +} + +impl Default for Broadcast { + fn default() -> Self { + Self::new() } } -/// A subscriber to a single track, yielding groups. +/// A track the peer requested, yielded by `Broadcast.requested`. Answer it with +/// `accept(info)` (returning a producer) or `reject(reason)`. #[wasm_bindgen] -pub struct Track { - // Rc>> for interior mutability: wasm-bindgen async methods - // take `&self` and must produce 'static futures, so we move the value out of - // the cell for the duration of the await rather than holding a borrow across - // it (which would make the future self-referential). One in-flight call at a - // time; a re-entrant call while one is pending errors instead of aliasing. +pub struct TrackRequest { + // `accept`/`reject` consume the request; `Option` lets the `&mut self` methods + // take it (wasm-bindgen can't take `self` by value). + inner: Option, +} + +#[wasm_bindgen] +impl TrackRequest { + #[wasm_bindgen(getter)] + pub fn name(&self) -> String { + self.inner.as_ref().map(|r| r.name().to_string()).unwrap_or_default() + } + + /// The subscriber's requested priority (0 if none yet). + #[wasm_bindgen(getter)] + pub fn priority(&self) -> u8 { + self.inner + .as_ref() + .and_then(|r| r.subscription()) + .map(|s| s.priority) + .unwrap_or(0) + } + + /// Accept the request, committing the immutable track properties and + /// returning a producer to write groups into. + pub fn accept(&mut self, info: JsValue) -> Result { + let req = self.inner.take().ok_or_else(|| js_err("request already answered"))?; + let producer = req.accept(parse_track_info(&info)); + Ok(TrackProducer::new(producer)) + } + + /// Reject the request, closing the track. + pub fn reject(&mut self) -> Result<(), JsValue> { + let req = self.inner.take().ok_or_else(|| js_err("request already answered"))?; + req.reject(moq_net::Error::Cancel); + Ok(()) + } +} + +/// A lazy consumer handle for a track. Mirrors `@moq/net`'s `TrackConsumer`. +#[wasm_bindgen] +pub struct TrackConsumer { + inner: moq_net::TrackConsumer, +} + +#[wasm_bindgen] +impl TrackConsumer { + #[wasm_bindgen(getter)] + pub fn name(&self) -> String { + self.inner.name().to_string() + } + + /// Open a live subscription at the given priority (default 0). + pub async fn subscribe(&self, priority: Option) -> Result { + let subscription = moq_net::Subscription::default().with_priority(priority.unwrap_or(0)); + let subscriber = self + .inner + .subscribe(subscription) + .map_err(js_err)? + .await + .map_err(js_err)?; + Ok(TrackSubscriber::new(subscriber)) + } + + /// Fetch the track's immutable publisher properties without subscribing. + /// Lite-05+ only. + pub async fn info(&self) -> Result { + let info = self.inner.info().await.map_err(js_err)?; + Ok(track_info_to_js(&info)) + } +} + +/// The write side of a track. Mirrors `@moq/net`'s `TrackProducer`. +#[wasm_bindgen] +pub struct TrackProducer { + // TrackProducer is cheaply clonable; cloning for the async `closed` waiter + // avoids holding a RefCell borrow across an await. + inner: Rc>, +} + +#[wasm_bindgen] +impl TrackProducer { + fn new(inner: moq_net::TrackProducer) -> Self { + Self { + inner: Rc::new(RefCell::new(inner)), + } + } + + #[wasm_bindgen(getter)] + pub fn name(&self) -> String { + self.inner.borrow().name().to_string() + } + + /// Append a new group with the next sequence number. + #[wasm_bindgen(js_name = appendGroup)] + pub fn append_group(&self) -> Result { + let group = self.inner.borrow_mut().append_group().map_err(js_err)?; + Ok(Group::from_producer(group)) + } + + /// Append a frame as its own single-frame group. + #[wasm_bindgen(js_name = writeFrame)] + pub fn write_frame(&self, frame: Uint8Array) -> Result<(), JsValue> { + self.inner.borrow_mut().write_frame(frame.to_vec()).map_err(js_err) + } + + /// Close the track, finishing cleanly (no error) or aborting. + pub fn close(&self) -> Result<(), JsValue> { + self.inner.borrow_mut().finish().map_err(js_err) + } + + /// Abort the track with an error message. + pub fn abort(&self, reason: String) -> Result<(), JsValue> { + self.inner + .borrow_mut() + .abort(moq_net::Error::Transport(reason)) + .map_err(js_err) + } + + /// Resolve when the track closes. Resolves to an error string, or `null` on + /// a clean close. + pub async fn closed(&self) -> Option { + let producer = self.inner.borrow().clone(); + let err = producer.closed().await; + match err { + moq_net::Error::Cancel | moq_net::Error::Closed => None, + other => Some(other.to_string()), + } + } +} + +/// The read side of a live track subscription. Mirrors `@moq/net`'s `TrackSubscriber`. +#[wasm_bindgen] +pub struct TrackSubscriber { + // `recv_group` is `&mut self` and the future must be `'static`; move the + // subscriber out of the cell for the await rather than borrowing across it. + // A re-entrant call while one is pending errors instead of aliasing. inner: Rc>>, + info: moq_net::TrackInfo, } #[wasm_bindgen] -impl Track { +impl TrackSubscriber { + fn new(inner: moq_net::TrackSubscriber) -> Self { + let info = inner.info().clone(); + Self { + inner: Rc::new(RefCell::new(Some(inner))), + info, + } + } + + #[wasm_bindgen(getter)] + pub fn name(&self) -> String { + self.inner + .borrow() + .as_ref() + .map(|s| s.name().to_string()) + .unwrap_or_default() + } + + /// The track's immutable publisher properties (resolved at subscribe time). + pub fn info(&self) -> JsValue { + track_info_to_js(&self.info) + } + /// Receive the next group in arrival order, or `null` when the track ends. #[wasm_bindgen(js_name = recvGroup)] pub async fn recv_group(&self) -> Result, JsValue> { @@ -129,31 +493,93 @@ impl Track { *cell.borrow_mut() = Some(sub); let group = result.map_err(js_err)?; - Ok(group.map(|g| Group { - sequence: g.sequence, - inner: Rc::new(RefCell::new(Some(g))), - })) + Ok(group.map(Group::from_consumer)) + } + + /// Return the next group with a strictly-greater sequence than the last, + /// skipping late arrivals. `null` when the track ends. + #[wasm_bindgen(js_name = nextGroup)] + pub async fn next_group(&self) -> Result, JsValue> { + let cell = self.inner.clone(); + let mut sub = cell + .borrow_mut() + .take() + .ok_or_else(|| js_err("nextGroup already in progress"))?; + let result = sub.next_group().await; + *cell.borrow_mut() = Some(sub); + + let group = result.map_err(js_err)?; + Ok(group.map(Group::from_consumer)) + } + + /// Change this subscription's priority. + #[wasm_bindgen(js_name = updatePriority)] + pub fn update_priority(&self, priority: u8) -> Result<(), JsValue> { + let mut guard = self.inner.borrow_mut(); + let sub = guard.as_mut().ok_or_else(|| js_err("recvGroup in progress"))?; + let subscription = sub.subscription().with_priority(priority); + sub.update(subscription); + Ok(()) + } + + /// Stop the subscription. Dropping the inner subscriber unsubscribes. + pub fn close(&self) { + self.inner.borrow_mut().take(); } } -/// A consumer for a single group, yielding frames. +enum GroupInner { + Producer(Rc>), + // `read_frame` is `&mut self`; take/restore across the await like the subscriber. + Consumer(Rc>>), +} + +/// A group of frames: writable when produced, readable when consumed. Mirrors +/// `@moq/net`'s dual-use `Group`. #[wasm_bindgen] pub struct Group { sequence: u64, - inner: Rc>>, + inner: GroupInner, } #[wasm_bindgen] impl Group { + fn from_producer(inner: moq_net::GroupProducer) -> Group { + Group { + sequence: inner.sequence, + inner: GroupInner::Producer(Rc::new(RefCell::new(inner))), + } + } + + fn from_consumer(inner: moq_net::GroupConsumer) -> Group { + Group { + sequence: inner.sequence, + inner: GroupInner::Consumer(Rc::new(RefCell::new(Some(inner)))), + } + } + #[wasm_bindgen(getter)] pub fn sequence(&self) -> u64 { self.sequence } - /// Read the next frame in the group, or `null` at the end of the group. + /// Write a frame to the group. Producer side only. + #[wasm_bindgen(js_name = writeFrame)] + pub fn write_frame(&self, frame: Uint8Array) -> Result<(), JsValue> { + match &self.inner { + GroupInner::Producer(p) => p.borrow_mut().write_frame(frame.to_vec()).map_err(js_err), + GroupInner::Consumer(_) => Err(js_err("cannot write to a consumed group")), + } + } + + /// Read the next frame in the group, or `null` at the end. Consumer side only. #[wasm_bindgen(js_name = readFrame)] pub async fn read_frame(&self) -> Result, JsValue> { - let cell = self.inner.clone(); + let cell = match &self.inner { + GroupInner::Consumer(c) => c.clone(), + GroupInner::Producer(_) => return Err(js_err("cannot read from a produced group")), + }; + let mut group = cell .borrow_mut() .take() @@ -164,4 +590,16 @@ impl Group { let frame = result.map_err(js_err)?; Ok(frame.map(|bytes| Uint8Array::from(bytes.as_ref()))) } + + /// Close the group: finish it cleanly on the producer side, drop the read + /// handle on the consumer side. + pub fn close(&self) -> Result<(), JsValue> { + match &self.inner { + GroupInner::Producer(p) => p.borrow_mut().finish().map_err(js_err), + GroupInner::Consumer(c) => { + c.borrow_mut().take(); + Ok(()) + } + } + } } diff --git a/rs/moq-wasm/src/transport.rs b/rs/moq-wasm/src/transport.rs index 92de85b24..2d6cfaba1 100644 --- a/rs/moq-wasm/src/transport.rs +++ b/rs/moq-wasm/src/transport.rs @@ -14,18 +14,28 @@ use web_transport_trait as wtt; #[derive(Clone)] pub struct Session(web_transport_wasm::Session); -/// Open a browser WebTransport connection to `url`. +/// A `ClientBuilder` advertising the moq ALPNs so the relay negotiates the same +/// version `@moq/net` does. Without this the browser sends no subprotocol and +/// the relay falls back to an old version over SETUP (e.g. lite-02), whose data +/// path doesn't work here. Needs web-transport-wasm 0.5.8+ for `with_protocols`. +fn builder() -> web_transport_wasm::ClientBuilder { + web_transport_wasm::ClientBuilder::new().with_protocols(moq_net::ALPNS.iter().copied()) +} + +/// Open a browser WebTransport connection to `url`, using the system roots. pub async fn connect(url: Url) -> Result { - let client = web_transport_wasm::ClientBuilder::new().with_system_roots(); - let session = client.connect(url).await.map_err(Error)?; + let session = builder().with_system_roots().connect(url).await.map_err(Error)?; Ok(Session(session)) } /// Connect, trusting only the given sha-256 certificate hashes (serverless dev, /// matching the browser's `serverCertificateHashes` option). pub async fn connect_with_hashes(url: Url, hashes: Vec>) -> Result { - let client = web_transport_wasm::ClientBuilder::new().with_server_certificate_hashes(hashes); - let session = client.connect(url).await.map_err(Error)?; + let session = builder() + .with_server_certificate_hashes(hashes) + .connect(url) + .await + .map_err(Error)?; Ok(Session(session)) } @@ -90,7 +100,11 @@ impl wtt::Session for Session { } fn protocol(&self) -> Option<&str> { - self.0.protocol() + // The browser can't advertise moq ALPNs (web-transport-wasm exposes no + // `protocols` option), so it reports an empty subprotocol. Map "" to None + // so moq-net negotiates the version over SETUP instead of erroring on a + // blank ALPN. + self.0.protocol().filter(|p| !p.is_empty()) } fn close(&self, code: u32, reason: &str) { From e86a61af04f18e5fe6524b2aee5d9168a4f3346f Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:10:05 -0700 Subject: [PATCH 2/7] fix(wasm): yield each frame in the lite group reader so consume doesn't freeze the browser On wasm the subscriber runs on the browser's single thread. A relay sends its whole cache backlog on subscribe, and over a local WebTransport every stream read resolves synchronously (a microtask), so `run_group` drained frames back to back without ever yielding to a macrotask, starving the event loop (no render, no timers, setTimeout never fires) and freezing the page. Add a `#[cfg(target_arch = "wasm32")]` `web_async::time::sleep(ZERO).await` per frame so the page stays responsive. No-op on native, where this is a background task with nothing to starve (verified: a native moq-cli subscriber reads the same broadcast fine, 246 groups of <=60 frames, so this is browser-thread starvation, not a grouping/loop bug). A WebWorker would be the proper fix (true off-main-thread + no per-frame yield throttle); this unblocks the wasm consume path in the meantime. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-net/src/lite/subscriber.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index ef7f972bb..2b17344fa 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -573,6 +573,15 @@ impl Subscriber { let mut prev_dur: u64 = 0; loop { + // On wasm this runs on the browser's single thread. A relay sends its + // whole cache backlog on subscribe, and over a local WebTransport every + // read resolves synchronously (a microtask), so draining frames back to + // back would starve the event loop (no render, no timers). Yield to a + // macrotask each frame so the page stays responsive. No-op on native, + // where this is a background task with nothing to starve. + #[cfg(target_arch = "wasm32")] + web_async::time::sleep(std::time::Duration::ZERO).await; + let (timestamp, duration) = if let Some(scale) = timescale { // Publisher advertised a timescale, so every frame on this stream is // prefixed with a zigzag-delta timestamp followed by a zigzag-delta From 84ef29b60eff08ebec40e983ff1aaf917da9f865 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:23:04 -0700 Subject: [PATCH 3/7] DEBUG(wasm): temporary boundary logging in the consume path Logs every wasm consume-API call (consume/announced.next/subscribe/recvGroup/ nextGroup/readFrame) with an `await...` before and a `-> result` after, target "wasm", at WARN so it shows with the default setup() tracing level. Use it to see where the consume path stalls: the last `await...` line without a matching `->` is the stuck call. Revert before merge. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-wasm/src/lib.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rs/moq-wasm/src/lib.rs b/rs/moq-wasm/src/lib.rs index 998c11785..4b005a2a3 100644 --- a/rs/moq-wasm/src/lib.rs +++ b/rs/moq-wasm/src/lib.rs @@ -169,7 +169,9 @@ impl OriginConsumer { /// Subscribe to a broadcast by path, waiting until it is announced. pub async fn consume(&self, path: String) -> Result, JsValue> { + tracing::warn!(target: "wasm", "consume({path}) await announced_broadcast..."); let broadcast = self.inner.announced_broadcast(path.as_str()).await; + tracing::warn!(target: "wasm", "consume({path}) -> {}", if broadcast.is_some() { "found" } else { "none" }); Ok(broadcast.map(Broadcast::from_consumer)) } } @@ -193,7 +195,9 @@ impl Announced { .borrow_mut() .take() .ok_or_else(|| js_err("announced.next already in progress"))?; + tracing::warn!(target: "wasm", "announced.next await..."); let result = consumer.next().await; + tracing::warn!(target: "wasm", "announced.next -> {:?}", result.as_ref().map(|(p, _)| p.as_str().to_string())); *cell.borrow_mut() = Some(consumer); Ok(result.map(|(path, status)| { @@ -370,6 +374,8 @@ impl TrackConsumer { /// Open a live subscription at the given priority (default 0). pub async fn subscribe(&self, priority: Option) -> Result { + let name = self.inner.name().to_string(); + tracing::warn!(target: "wasm", "subscribe({name}) await SUBSCRIBE_OK..."); let subscription = moq_net::Subscription::default().with_priority(priority.unwrap_or(0)); let subscriber = self .inner @@ -377,6 +383,7 @@ impl TrackConsumer { .map_err(js_err)? .await .map_err(js_err)?; + tracing::warn!(target: "wasm", "subscribe({name}) -> ok"); Ok(TrackSubscriber::new(subscriber)) } @@ -489,10 +496,13 @@ impl TrackSubscriber { .borrow_mut() .take() .ok_or_else(|| js_err("recvGroup already in progress"))?; + let name = sub.name().to_string(); + tracing::warn!(target: "wasm", "recvGroup({name}) await..."); let result = sub.recv_group().await; *cell.borrow_mut() = Some(sub); let group = result.map_err(js_err)?; + tracing::warn!(target: "wasm", "recvGroup({name}) -> {:?}", group.as_ref().map(|g| g.sequence)); Ok(group.map(Group::from_consumer)) } @@ -505,10 +515,13 @@ impl TrackSubscriber { .borrow_mut() .take() .ok_or_else(|| js_err("nextGroup already in progress"))?; + let name = sub.name().to_string(); + tracing::warn!(target: "wasm", "nextGroup({name}) await..."); let result = sub.next_group().await; *cell.borrow_mut() = Some(sub); let group = result.map_err(js_err)?; + tracing::warn!(target: "wasm", "nextGroup({name}) -> {:?}", group.as_ref().map(|g| g.sequence)); Ok(group.map(Group::from_consumer)) } @@ -584,10 +597,13 @@ impl Group { .borrow_mut() .take() .ok_or_else(|| js_err("readFrame already in progress"))?; + let seq = group.sequence; + tracing::warn!(target: "wasm", "readFrame(group={seq}) await..."); let result = group.read_frame().await; *cell.borrow_mut() = Some(group); let frame = result.map_err(js_err)?; + tracing::warn!(target: "wasm", "readFrame(group={seq}) -> {:?} bytes", frame.as_ref().map(|b| b.len())); Ok(frame.map(|bytes| Uint8Array::from(bytes.as_ref()))) } From 3ecab16774c8f78a4d358b953eb7ac5049c8ca55 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 11:36:19 -0700 Subject: [PATCH 4/7] DEBUG(wasm): RUST_LOG-style tracing to the browser console Swap the WARN-capped tracing-wasm for tracing-web plus a tracing-subscriber Targets filter, so setup() takes a RUST_LOG-style directive and routes moq_net's internal spans/events to the browser console. The boundary logs only show the shim surface; this exposes the consume path inside moq_net (lite::subscriber has the frame loop) to find where it stalls. setup(filter) parses e.g. "warn,moq_net::lite=trace,wasm=trace" (Targets, not EnvFilter, to avoid the regex/env-filter bloat on wasm). @moq/wasm's init() reads localStorage.moq_log, so you can crank a target up from the browser console and reload without a rebuild. Defaults to "warn". Temporary debug aid alongside the boundary logging; revert before merge. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 15 +++++++++------ js/wasm/src/index.ts | 24 ++++++++++++++++++++++-- rs/moq-wasm/Cargo.toml | 5 ++++- rs/moq-wasm/src/lib.rs | 29 +++++++++++++++++++++++------ 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 440bb8a2c..34b8b4e2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1710,7 +1710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccc2776f0c61eca1ca32528f85548abd1a4be8fb53d1b21c013e4f18da1e7090" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.117", ] [[package]] @@ -4663,7 +4663,8 @@ dependencies = [ "moq-net", "thiserror 2.0.18", "tracing", - "tracing-wasm", + "tracing-subscriber", + "tracing-web", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -8562,14 +8563,16 @@ dependencies = [ ] [[package]] -name = "tracing-wasm" -version = "0.2.1" +name = "tracing-web" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07" +checksum = "b9e6a141feebd51f8d91ebfd785af50fca223c570b86852166caa3b141defe7c" dependencies = [ - "tracing", + "js-sys", + "tracing-core", "tracing-subscriber", "wasm-bindgen", + "web-sys", ] [[package]] diff --git a/js/wasm/src/index.ts b/js/wasm/src/index.ts index 395aa4ca3..a87469cbf 100644 --- a/js/wasm/src/index.ts +++ b/js/wasm/src/index.ts @@ -28,15 +28,35 @@ export { Path, Time }; // Load the wasm module once. `--target web` fetches `moq_bg.wasm` relative to // the JS via `import.meta.url`, which bundlers (vite/esbuild) resolve as an asset. let loaded: Promise | undefined; -export function init(): Promise { + +/** + * Load the wasm module and install the panic/tracing hooks. + * + * `filter` is a RUST_LOG-style tracing directive (e.g. `"moq_net=debug"`, + * `"warn,moq_net::lite=trace,wasm=trace"`). When omitted it falls back to + * `localStorage.moq_log`, so you can crank up logging from the browser console + * (`localStorage.moq_log = "moq_net::lite=trace"`) and reload without a rebuild. + * Defaults to `"warn"` inside the wasm if neither is set. + */ +export function init(filter?: string): Promise { if (!loaded) { + const directive = filter ?? logDirective(); loaded = initWasm().then(() => { - Wasm.setup(); + Wasm.setup(directive); }); } return loaded; } +function logDirective(): string | undefined { + try { + return globalThis.localStorage?.getItem("moq_log") ?? undefined; + } catch { + // localStorage can throw (private mode, no DOM); fall back to the default. + return undefined; + } +} + const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder(); diff --git a/rs/moq-wasm/Cargo.toml b/rs/moq-wasm/Cargo.toml index 6586b1b71..dc8f52144 100644 --- a/rs/moq-wasm/Cargo.toml +++ b/rs/moq-wasm/Cargo.toml @@ -32,7 +32,10 @@ js-sys = "0.3" moq-net = { workspace = true, features = ["serde"] } thiserror = "2" tracing = "0.1" -tracing-wasm = "0.2" +# tracing-web routes spans/events to the browser console; tracing-subscriber's +# `Targets` filter parses a RUST_LOG-style directive (no env-filter/regex bloat). +tracing-subscriber = { version = "0.3", default-features = false, features = ["registry", "fmt"] } +tracing-web = "0.1" url = "2" # Pinned to the wasm-bindgen-cli version in the Nix dev shell: the bindgen # schema must match exactly between crate and CLI. Bump both together. diff --git a/rs/moq-wasm/src/lib.rs b/rs/moq-wasm/src/lib.rs index 4b005a2a3..3fb53fa7b 100644 --- a/rs/moq-wasm/src/lib.rs +++ b/rs/moq-wasm/src/lib.rs @@ -23,6 +23,8 @@ use std::rc::Rc; use std::time::Duration; use js_sys::{Object, Reflect, Uint8Array}; +use tracing_subscriber::filter::Targets; +use tracing_subscriber::prelude::*; use wasm_bindgen::prelude::*; mod transport; @@ -78,15 +80,30 @@ fn track_info_to_js(info: &moq_net::TrackInfo) -> JsValue { /// Install panic + tracing hooks for readable errors. Call once after the wasm /// module's default `init()` loader resolves. (Named `setup` to avoid colliding /// with wasm-bindgen's default `init` export, which loads the module itself.) +/// +/// `filter` is a RUST_LOG-style directive routed to the browser console, e.g. +/// `"warn"`, `"moq_net=debug"`, or `"warn,moq_net::lite=trace,wasm=trace"`. +/// Defaults to `"warn"`: moq_net logs every wire message at TRACE, which floods +/// the console under announce churn, so crank a specific target up only when +/// debugging. `@moq/wasm`'s `init` reads `localStorage.moq_log` for this. #[wasm_bindgen] -pub fn setup() { +pub fn setup(filter: Option) { console_error_panic_hook::set_once(); - // Cap tracing at WARN. The default is TRACE, which logs every wire message; - // under heavy announce churn that floods the console and can freeze the page. - let mut config = tracing_wasm::WASMLayerConfigBuilder::new(); - config.set_max_level(tracing::Level::WARN); - tracing_wasm::set_as_global_default_with_config(config.build()); + let directive = filter.as_deref().unwrap_or("warn"); + let targets = directive + .parse::() + .unwrap_or_else(|_| Targets::new().with_default(tracing::Level::WARN)); + + let fmt = tracing_subscriber::fmt::layer() + .with_ansi(false) + .without_time() // wasm has no SystemTime; the default fmt timer panics. + .with_writer(tracing_web::MakeWebConsoleWriter::new()) + .with_filter(targets); + + // try_init (not init): setup() is memoized in JS, but tolerate a double call + // under HMR rather than panicking on the second global-default registration. + let _ = tracing_subscriber::registry().with(fmt).try_init(); } /// A connected MoQ session: the wasm counterpart of `@moq/net`'s `Connection.Established`. From c4992f2ed7eb4f7d75e52bdc1f4c980194a53d91 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 13:37:51 -0700 Subject: [PATCH 5/7] fix(moq-net): read frames in place, without minting a per-poll FrameConsumer GroupConsumer::poll_read_frame / poll_read_frame_chunks called poll_get_frame -> frame.consume() on every poll, then dropped that FrameConsumer whenever the frame's data wasn't complete yet (still in flight). A FrameConsumer is a kio consumer handle, so that create+drop flips the frame's consumer count 0->1->0 each poll, and kio wakes the state's waiters on both the first-appears and last-drops transitions -- the same waiters our own read registered on. Every poll re-woke itself: a silent busy spin. On a multi-threaded runtime the producer fills the frame concurrently so the spin ends in microseconds (wasted CPU, no visible hang). On a single-thread executor (wasm) the consumer's self-wake loop starves the producer, so the frame never completes and the spin runs away into a hard freeze (~22M re-polls / ~45M wakes on one frame). Read the frame in place instead of through a consumer handle: - kio: add `Producer::poll_ref`, a read-only counterpart to `Producer::poll` that registers a waiter on a read condition without taking a `Mut` (no modified flag, no consumer-count churn). - model/frame: `FrameProducer::poll_read_all` reads the producer's own buffer once finished, via poll_ref. Stateless (always offset 0), so parallel readers are fine. - model/group: `GroupState::poll_frame_read_all` reads the cached FrameProducer directly; poll_read_frame / poll_read_frame_chunks use it and no longer mint a FrameConsumer. GroupConsumer stays a plain derive(Clone) with no extra state. Also drop the per-frame web_async::time::sleep(ZERO) yield in run_group: it didn't address this freeze (the spin is consumer-side) and spamming wasmtimer's shared global timer driver is itself a hazard. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/kio/src/producer.rs | 28 +++++++++++++++++++++++++ rs/moq-net/src/lite/subscriber.rs | 9 -------- rs/moq-net/src/model/frame.rs | 29 ++++++++++++++++++++++++++ rs/moq-net/src/model/group.rs | 34 ++++++++++++++++++++++++------- 4 files changed, 84 insertions(+), 16 deletions(-) diff --git a/rs/kio/src/producer.rs b/rs/kio/src/producer.rs index 1d41ca7ec..fd0e8fd85 100644 --- a/rs/kio/src/producer.rs +++ b/rs/kio/src/producer.rs @@ -111,6 +111,34 @@ impl Producer { Poll::Pending } + /// Poll-based **read-only** access with waker registration. + /// + /// Like [`Self::poll`] but hands `f` a [`Ref`] instead of a [`Mut`], so it + /// never flags the state modified and never wakes consumers. Use it to wait on + /// a read condition (e.g. a `fin` flag) from the producer side without creating + /// a [`Consumer`] — creating/dropping a consumer churns the consumer count and + /// wakes the value waiters, which would spin a polling reader. + pub fn poll_ref(&self, waiter: &Waiter, mut f: F) -> Poll>> + where + F: FnMut(&Ref<'_, T>) -> Poll, + { + let state = self.state.lock(); + let state = Ref { state }; + + if let Poll::Ready(res) = f(&state) { + return Poll::Ready(Ok(res)); + } + + if state.state.closed { + return Poll::Ready(Err(state)); + } + + let mut state = state.state; + waiter.register(&mut state.waiters); + + Poll::Pending + } + /// Wait for the closure to return [`Poll::Ready`], re-polling on each state change. /// /// Returns `Ok(R)` when the closure returns [`Poll::Ready`], or `Err(Ref)` with diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 2b17344fa..ef7f972bb 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -573,15 +573,6 @@ impl Subscriber { let mut prev_dur: u64 = 0; loop { - // On wasm this runs on the browser's single thread. A relay sends its - // whole cache backlog on subscribe, and over a local WebTransport every - // read resolves synchronously (a microtask), so draining frames back to - // back would starve the event loop (no render, no timers). Yield to a - // macrotask each frame so the page stays responsive. No-op on native, - // where this is a background task with nothing to starve. - #[cfg(target_arch = "wasm32")] - web_async::time::sleep(std::time::Duration::ZERO).await; - let (timestamp, duration) = if let Some(scale) = timescale { // Publisher advertised a timescale, so every frame on this stream is // prefixed with a zigzag-delta timestamp followed by a zigzag-delta diff --git a/rs/moq-net/src/model/frame.rs b/rs/moq-net/src/model/frame.rs index 8f614eb66..b04684078 100644 --- a/rs/moq-net/src/model/frame.rs +++ b/rs/moq-net/src/model/frame.rs @@ -264,6 +264,35 @@ impl FrameProducer { } } + /// Poll for the frame's full payload, resolving once it's finished. + /// + /// Reads from the producer side (via `kio::Producer::poll_ref`) so a polling + /// reader doesn't mint a transient [`FrameConsumer`] per poll: that would churn + /// the consumer count and wake the value waiters, spinning the poll. Always + /// returns the whole payload (offset 0), so it's safe to call from any number + /// of readers in parallel. + pub(crate) fn poll_read_all(&self, waiter: &kio::Waiter) -> Poll> { + let res = ready!(self.state.poll_ref(waiter, |state| { + if state.fin { + Poll::Ready(Ok(())) + } else if let Some(err) = &state.abort { + Poll::Ready(Err(err.clone())) + } else { + Poll::Pending + } + })); + + match res { + Ok(Ok(())) => { + // `fin` implies written == capacity (the producer fills the whole buffer). + let written = self.buf.written(Ordering::Acquire); + Poll::Ready(Ok(Bytes::from_owner(self.buf.clone()).slice(0..written))) + } + Ok(Err(err)) => Poll::Ready(Err(err)), + Err(state) => Poll::Ready(Err(state.abort.clone().unwrap_or(Error::Dropped))), + } + } + /// Block until there are no active consumers. pub async fn unused(&self) -> Result<()> { self.state diff --git a/rs/moq-net/src/model/group.rs b/rs/moq-net/src/model/group.rs index 310de94c8..edb522468 100644 --- a/rs/moq-net/src/model/group.rs +++ b/rs/moq-net/src/model/group.rs @@ -109,6 +109,26 @@ impl GroupState { } } + /// Poll for the full payload of the frame at `index`, reading it in place. + /// + /// Unlike [`Self::poll_get_frame`] this never mints a [`FrameConsumer`] (which + /// would churn the frame's consumer count and wake its waiters every poll); it + /// reads the cached [`FrameProducer`] directly. `waiter` is registered on the + /// frame's state so the reader wakes when it finishes. + fn poll_frame_read_all(&self, index: usize, waiter: &kio::Waiter) -> Poll>> { + if index < self.offset { + return Poll::Ready(Err(Error::CacheFull)); + } + match self.frames.get(index - self.offset) { + Some(frame) => Poll::Ready(Ok(Some(ready!(frame.poll_read_all(waiter))?))), + None if self.fin => Poll::Ready(Ok(None)), + None => match &self.abort { + Some(err) => Poll::Ready(Err(err.clone())), + None => Poll::Pending, + }, + } + } + fn poll_finished(&self) -> Poll> { if self.fin { Poll::Ready(Ok((self.offset + self.frames.len()) as u64)) @@ -373,13 +393,12 @@ impl GroupConsumer { /// Read the next frame's data all at once, without blocking. pub fn poll_read_frame(&mut self, waiter: &kio::Waiter) -> Poll>> { - let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else { + let index = self.index; + let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else { return Poll::Ready(Ok(None)); }; - let data = ready!(frame.poll_read_all(waiter))?; self.index += 1; - Poll::Ready(Ok(Some(data))) } @@ -390,14 +409,15 @@ impl GroupConsumer { /// Read all of the chunks of the next frame, without blocking. pub fn poll_read_frame_chunks(&mut self, waiter: &kio::Waiter) -> Poll>>> { - let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else { + let index = self.index; + let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else { return Poll::Ready(Ok(None)); }; - let data = ready!(frame.poll_read_all_chunks(waiter))?; self.index += 1; - - Poll::Ready(Ok(Some(data))) + // In-place reads return the whole frame as one slice; keep the chunked API + // shape (empty payload -> no chunks). + Poll::Ready(Ok(Some(if data.is_empty() { Vec::new() } else { vec![data] }))) } /// Read all of the chunks of the next frame. From 2231f942c21eb3c4814c1c3d5c2f2e928bcf0b34 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 14:59:10 -0700 Subject: [PATCH 6/7] fix(kio): split waiters by condition so writes don't churn closed/consumer waiters (#1739) Co-authored-by: Claude Opus 4.8 (1M context) --- rs/kio/CHANGELOG.md | 7 +++++++ rs/kio/src/consumer.rs | 14 ++++++++------ rs/kio/src/lib.rs | 29 +++++++++++++++++++++++++++-- rs/kio/src/producer.rs | 39 +++++++++++++++++++++++++++------------ rs/kio/src/weak.rs | 10 +++++----- 5 files changed, 74 insertions(+), 25 deletions(-) diff --git a/rs/kio/CHANGELOG.md b/rs/kio/CHANGELOG.md index 5541c3b96..05625b21b 100644 --- a/rs/kio/CHANGELOG.md +++ b/rs/kio/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Split the internal waiter list into separate value / closed / consumer lists, + so an event only wakes the waiters that care about it. Previously every value + modification (the hot path) also woke parked `closed()` and `used`/`unused` + waiters, which re-registered and ping-ponged. No public API change. + ### Changed - Reworked `Producer::poll` / `Producer::wait`. They previously handed the diff --git a/rs/kio/src/consumer.rs b/rs/kio/src/consumer.rs index c91041b5a..a06bc7e2a 100644 --- a/rs/kio/src/consumer.rs +++ b/rs/kio/src/consumer.rs @@ -14,8 +14,8 @@ use crate::{ /// /// Consumers have read-only access to the shared value and are notified when /// a producer modifies it. Cloning a consumer increments the consumer reference -/// count. When the last consumer is dropped, all waiters (e.g. [`Producer::unused`]) -/// are notified. +/// count. When the last consumer is dropped, the consumer-count waiters +/// (e.g. [`Producer::unused`]) are notified. #[derive(Debug)] pub struct Consumer { pub(crate) state: Lock>, @@ -46,7 +46,7 @@ impl Consumer { // Re-extract state from consumer_state to register let mut state = consumer_state.state; - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_value); Poll::Pending } @@ -58,7 +58,7 @@ impl Consumer { return Poll::Ready(()); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_closed); Poll::Pending } @@ -149,10 +149,12 @@ impl Drop for Consumer { return; } - // We were the last consumer, need to wake waiters + // We were the last consumer, so wake the `unused()` waiters. The value + // and closed waiters don't care about the consumer count, so leave + // them alone. let mut waiters = { let mut state = self.state.lock(); - state.waiters.take() + state.waiters_consumer.take() }; waiters.wake(); diff --git a/rs/kio/src/lib.rs b/rs/kio/src/lib.rs index 5493a91ee..ee76da376 100644 --- a/rs/kio/src/lib.rs +++ b/rs/kio/src/lib.rs @@ -18,16 +18,29 @@ mod future; mod producer; mod weak; +#[cfg(test)] +mod tests; + pub use consumer::Consumer; pub use future::{Future, Pending}; pub use producer::{Mut, Producer, Ref}; pub use waiter::{Waiter, WaiterList, wait}; pub use weak::Weak; +/// Waiters split by what they're waiting on, so an event only wakes the +/// waiters that care about it. The big win is per-modification writes (the hot +/// path) waking only `value`, leaving the long-lived `closed` and `consumer` +/// waiters untouched. #[derive(Debug)] pub(crate) struct State { pub value: T, - pub waiters: waiter::WaiterList, + /// Value changes (`poll`/`wait`). Woken on every modification. + pub waiters_value: waiter::WaiterList, + /// Closure (`closed`). Woken only when the channel closes. + pub waiters_closed: waiter::WaiterList, + /// Consumer-count changes (`used`/`unused`). `used`/`unused` are used + /// sequentially in practice, so they share one list. + pub waiters_consumer: waiter::WaiterList, pub closed: bool, } @@ -42,9 +55,21 @@ impl State { Self { value, closed: false, - waiters: waiter::WaiterList::new(), + waiters_value: waiter::WaiterList::new(), + waiters_closed: waiter::WaiterList::new(), + waiters_consumer: waiter::WaiterList::new(), } } + + /// Drain every waiter list. Used on close, which all waiters react to. + /// Caller wakes the returned lists after releasing the lock. + pub fn take_close_waiters(&mut self) -> [waiter::WaiterList; 3] { + [ + self.waiters_value.take(), + self.waiters_closed.take(), + self.waiters_consumer.take(), + ] + } } impl Deref for State { diff --git a/rs/kio/src/producer.rs b/rs/kio/src/producer.rs index 71f8953be..e9e8f5788 100644 --- a/rs/kio/src/producer.rs +++ b/rs/kio/src/producer.rs @@ -40,9 +40,9 @@ impl Producer { pub fn consume(&self) -> Consumer { let prev = self.counts.consumers.fetch_add(1, Ordering::AcqRel); - // Wake waiters (e.g. `used()`) when the first consumer appears. + // Wake `used()` waiters when the first consumer appears. if prev == 0 { - let mut waiters = self.state.lock().waiters.take(); + let mut waiters = self.state.lock().waiters_consumer.take(); waiters.wake(); } @@ -98,7 +98,7 @@ impl Producer { // Upgrade the Ref to a Mut, keeping the same lock guard. Poll::Ready(()) => Poll::Ready(Ok(Mut::new(guard.state))), Poll::Pending => { - waiter.register(&mut guard.state.waiters); + waiter.register(&mut guard.state.waiters_value); Poll::Pending } } @@ -126,7 +126,7 @@ impl Producer { return Poll::Ready(()); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_closed); Poll::Pending } @@ -152,7 +152,7 @@ impl Producer { return Poll::Ready(Some(())); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_consumer); // Re-check after registration to avoid TOCTOU race where the last // consumer drops between the initial check and waiter registration. @@ -183,7 +183,7 @@ impl Producer { return Poll::Ready(Some(())); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_consumer); // Re-check after registration to avoid TOCTOU race where a consumer // is created between the initial check and waiter registration. @@ -244,7 +244,9 @@ impl Drop for Producer { return; } - // We were the last producer, need to close + // We were the last producer, need to close. Every waiter reacts to + // closure (value/closed resolve, `used`/`unused` resolve to `None`), + // so wake all the lists. let mut waiters = { let mut state = self.state.lock(); if state.closed { @@ -252,10 +254,12 @@ impl Drop for Producer { } state.closed = true; - state.waiters.take() + state.take_close_waiters() }; - waiters.wake(); + for list in &mut waiters { + list.wake(); + } } } @@ -311,11 +315,22 @@ impl Drop for Mut<'_, T> { return; } - // Drain wakers while holding lock, then wake after releasing - let mut waiters = state.waiters.take(); + // Drain wakers while holding lock, then wake after releasing. + // A modification that also closed the channel (e.g. `close()`) must + // wake the closed and consumer-count waiters too, since they resolve + // on closure. A plain modification touches only the value waiters. + let mut waiters_value = state.waiters_value.take(); + let extra = state + .closed + .then(|| [state.waiters_closed.take(), state.waiters_consumer.take()]); drop(state); // Release Mutex BEFORE waking - waiters.wake(); + waiters_value.wake(); + if let Some(mut extra) = extra { + for list in &mut extra { + list.wake(); + } + } } } diff --git a/rs/kio/src/weak.rs b/rs/kio/src/weak.rs index 3986bd956..7cf358256 100644 --- a/rs/kio/src/weak.rs +++ b/rs/kio/src/weak.rs @@ -46,9 +46,9 @@ impl Weak { pub fn consume(&self) -> Consumer { let prev = self.counts.consumers.fetch_add(1, Ordering::AcqRel); - // Wake waiters (e.g. `used()`) when the first consumer appears. + // Wake `used()` waiters when the first consumer appears. if prev == 0 { - let mut waiters = self.state.lock().waiters.take(); + let mut waiters = self.state.lock().waiters_consumer.take(); waiters.wake(); } @@ -94,7 +94,7 @@ impl Weak { return Poll::Ready(()); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_closed); Poll::Pending } @@ -118,7 +118,7 @@ impl Weak { return Poll::Ready(None); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_consumer); // Re-check after registration to avoid TOCTOU race where the last // consumer drops between the initial check and waiter registration. @@ -149,7 +149,7 @@ impl Weak { return Poll::Ready(None); } - waiter.register(&mut state.waiters); + waiter.register(&mut state.waiters_consumer); // Re-check after registration to avoid TOCTOU race. if self.counts.consumers.load(Ordering::Relaxed) > 0 { From c4e2fe9ba32de0a77f63df595aefa31669bde730 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 15:58:17 -0700 Subject: [PATCH 7/7] chore(demo): gitignore the local preview launch.json --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 22106299b..2ff2ec490 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,4 @@ __pycache__/ # Generated JSR manifest (derived from package.json by js/common/package.ts) jsr.json +.claude/launch.json