diff --git a/CLAUDE.md b/CLAUDE.md index a73d86332..0631127be 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -190,6 +190,7 @@ Changes in one area usually need matching updates elsewhere, including docs. If | `rs/moq-net` wire/API | `js/net`, `doc/concept` | | `rs/hang` catalog/container | `js/hang`, `doc/concept` | | `rs/moq-token` | `js/token` | +| `rs/moq-data` set wire/API | `js/data` (shared wire format, must stay byte-compatible) | | `rs/moq-relay` config/behavior | `doc/bin/relay/` | | `rs/moq-cli` | `doc/bin/cli.md` | | `rs/moq-gst` | `doc/bin/gstreamer.md` | diff --git a/Cargo.lock b/Cargo.lock index f47a4f869..7a1fda3ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3835,6 +3835,17 @@ dependencies = [ "url", ] +[[package]] +name = "moq-data" +version = "0.0.1" +dependencies = [ + "bytes", + "kio", + "moq-json", + "moq-net", + "thiserror 2.0.18", +] + [[package]] name = "moq-ffi" version = "0.2.21" diff --git a/Cargo.toml b/Cargo.toml index 65d1268ab..87020006e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "rs/moq-bench", "rs/moq-boy", "rs/moq-cli", + "rs/moq-data", "rs/moq-ffi", "rs/moq-gst", "rs/moq-json", @@ -28,6 +29,7 @@ default-members = [ "rs/moq-audio", "rs/moq-bench", "rs/moq-cli", + "rs/moq-data", # "rs/moq-ffi", # requires Python/maturin # "rs/moq-gst", # requires GStreamer "rs/moq-json", diff --git a/bun.lock b/bun.lock index 878677e34..ab1cb0af4 100644 --- a/bun.lock +++ b/bun.lock @@ -77,6 +77,22 @@ "typescript": "^6.0.3", }, }, + "js/data": { + "name": "@moq/data", + "version": "0.0.1", + "dependencies": { + "@moq/json": "workspace:^", + "@moq/net": "workspace:^", + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "rimraf": "^6.1.3", + "typescript": "^6.0.3", + }, + "peerDependencies": { + "zod": "^4.0.0", + }, + }, "js/hang": { "name": "@moq/hang", "version": "0.2.11", @@ -519,6 +535,8 @@ "@moq/clock": ["@moq/clock@workspace:js/clock"], + "@moq/data": ["@moq/data@workspace:js/data"], + "@moq/demo": ["@moq/demo@workspace:demo/web"], "@moq/demo-boy": ["@moq/demo-boy@workspace:demo/boy"], diff --git a/js/data/README.md b/js/data/README.md new file mode 100644 index 000000000..81f74ec6f --- /dev/null +++ b/js/data/README.md @@ -0,0 +1,41 @@ +

+ Media over QUIC +

+ +# @moq/data + +[![npm version](https://img.shields.io/npm/v/@moq/data)](https://www.npmjs.com/package/@moq/data) +[![TypeScript](https://img.shields.io/badge/TypeScript-ready-blue.svg)](https://www.typescriptlang.org/) + +Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks. + +Each helper maps an application data structure onto a [`@moq/net`](../net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone. + +- **`@moq/data/set`** syncs a `Set`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas. +- **`@moq/data/json`** re-exports [`@moq/json`](../json) for snapshot/delta JSON publishing. It lives in its own package today and will migrate here over time. + +## Set + +```ts +import { Producer, Consumer, stringCodec } from "@moq/data/set"; + +// Publish the set of track names in a broadcast. +const producer = new Producer(track, { codec: stringCodec }); +producer.insert("video"); +producer.insert("audio"); +producer.remove("audio"); + +// Consume: each change is the items added and removed; `current()` is the full set. +const consumer = new Consumer(track, { codec: stringCodec }); +for await (const update of consumer) { + for (const name of update.added) console.log("track appeared:", name); + for (const name of update.removed) console.log("track left:", name); + console.log("now:", consumer.current()); // Set +} +``` + +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. Each change is reported as the items it added and removed (a snapshot is diffed against the current set), so a watcher reacts to individual tracks appearing and leaving without comparing whole sets. + +Items are arbitrary binary data via a `Codec` (`encode`/`decode` to `Uint8Array`). `stringCodec` and `bytesCodec` are provided; supply your own for richer types. Items dedupe by their encoded bytes, so two values with the same encoding are the same member. + +Deltas are on by default (`deltaRatio: 2`); a delta is appended while the group stays within `deltaRatio` times the size of a fresh snapshot, otherwise a new snapshot group is started. diff --git a/js/data/package.json b/js/data/package.json new file mode 100644 index 000000000..faf6a0196 --- /dev/null +++ b/js/data/package.json @@ -0,0 +1,32 @@ +{ + "name": "@moq/data", + "type": "module", + "version": "0.0.1", + "description": "Helpers for sending metadata (sets, JSON) over MoQ tracks.", + "license": "(MIT OR Apache-2.0)", + "repository": "github:moq-dev/moq", + "sideEffects": false, + "exports": { + ".": "./src/index.ts", + "./set": "./src/set/index.ts", + "./json": "./src/json.ts" + }, + "scripts": { + "build": "rimraf dist && tsc -b && bun ../common/package.ts", + "check": "tsc --noEmit", + "test": "bun test --only-failures", + "release": "bun ../common/release.ts" + }, + "dependencies": { + "@moq/json": "workspace:^", + "@moq/net": "workspace:^" + }, + "peerDependencies": { + "zod": "^4.0.0" + }, + "devDependencies": { + "@types/bun": "^1.3.14", + "rimraf": "^6.1.3", + "typescript": "^6.0.3" + } +} diff --git a/js/data/src/index.ts b/js/data/src/index.ts new file mode 100644 index 000000000..60637782c --- /dev/null +++ b/js/data/src/index.ts @@ -0,0 +1,2 @@ +export * as Json from "@moq/json"; +export * as Set from "./set/index.ts"; diff --git a/js/data/src/json.ts b/js/data/src/json.ts new file mode 100644 index 000000000..82042c535 --- /dev/null +++ b/js/data/src/json.ts @@ -0,0 +1,2 @@ +// Snapshot/delta JSON publishing, re-exported from @moq/json. +export * from "@moq/json"; diff --git a/js/data/src/set/codec.ts b/js/data/src/set/codec.ts new file mode 100644 index 000000000..243c12392 --- /dev/null +++ b/js/data/src/set/codec.ts @@ -0,0 +1,26 @@ +/** + * Encodes a set item to and from its wire bytes. + * + * Encoding must be deterministic and round-trip: `codec.decode(codec.encode(value))` must equal + * `value`. Two items are the same set member iff they encode to the same bytes, so distinct items + * must encode distinctly. + */ +export interface Codec { + encode(value: T): Uint8Array; + decode(bytes: Uint8Array): T; +} + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +/** A codec for UTF-8 strings, e.g. a set of track names. */ +export const stringCodec: Codec = { + encode: (value) => textEncoder.encode(value), + decode: (bytes) => textDecoder.decode(bytes), +}; + +/** A codec for raw binary items, passed through untouched. */ +export const bytesCodec: Codec = { + encode: (value) => value, + decode: (bytes) => bytes, +}; diff --git a/js/data/src/set/consumer.ts b/js/data/src/set/consumer.ts new file mode 100644 index 000000000..a0ff287df --- /dev/null +++ b/js/data/src/set/consumer.ts @@ -0,0 +1,116 @@ +import type * as Moq from "@moq/net"; + +import type { Codec } from "./codec.ts"; +import type { Config } from "./producer.ts"; +import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; + +/** + * The items added and removed by a single change, returned by {@link Consumer.next}. + * + * A delta carries one item in exactly one field. A snapshot (the first frame of a group, or a late + * joiner's first read) carries its difference from the previous state, so several items may be + * added and removed at once. + */ +export interface Update { + added: T[]; + removed: T[]; +} + +/** + * Consumes a set from a track, reconstructing it from snapshots and deltas. + * + * Each change is reduced to the items it added and removed; the full set is available via + * {@link current}. + */ +export class Consumer { + #track: Moq.Track; + #codec: Codec; + + #group?: Moq.Group; + // Keyed by encoded bytes so items dedupe by value, not reference. + #current = new Map(); + #framesRead = 0; + + constructor(track: Moq.Track, config: Config) { + this.#track = track; + this.#codec = config.codec; + } + + /** The full set as currently reconstructed. Updated by each {@link next}. */ + current(): Set { + return new Set(this.#current.values()); + } + + /** + * Get the next change as added/removed items, or undefined once the track ends. + * + * Frames that change nothing are skipped, so a returned {@link Update} is never empty. Switching + * to a newer group diffs its snapshot against the current set, so no change is missed or doubled. + */ + async next(): Promise | undefined> { + for (;;) { + if (!this.#group) { + // Advance to the next group with a higher sequence number (skipping late arrivals). We + // keep #current across the switch so the next snapshot diffs against it. + this.#group = await this.#track.nextGroupOrdered(); + if (!this.#group) return undefined; + this.#framesRead = 0; + } + + const frame = await this.#group.readFrame(); + if (frame === undefined) { + // The group is exhausted; advance to the next one. + this.#group = undefined; + continue; + } + + const update = this.#apply(frame); + if (update.added.length > 0 || update.removed.length > 0) return update; + // A no-op frame (redundant snapshot or delta); read the next one. + } + } + + async *[Symbol.asyncIterator](): AsyncIterator> { + for (;;) { + const update = await this.next(); + if (update === undefined) return; + yield update; + } + } + + // Apply one frame, returning what it changed: frame 0 of a group is a snapshot (diffed against + // the current set), the rest are insert/remove deltas. + #apply(frame: Uint8Array): Update { + this.#framesRead += 1; + + if (this.#framesRead === 1) { + const next = new Map(); + for (const item of decodeSnapshot(frame)) { + next.set(keyOf(item), this.#codec.decode(item)); + } + + const added: T[] = []; + const removed: T[] = []; + for (const [key, value] of next) if (!this.#current.has(key)) added.push(value); + for (const [key, value] of this.#current) if (!next.has(key)) removed.push(value); + this.#current = next; + return { added, removed }; + } + + const [op, item] = decodeDelta(frame); + const key = keyOf(item); + if (op === INSERT) { + if (this.#current.has(key)) return { added: [], removed: [] }; + const value = this.#codec.decode(item); + this.#current.set(key, value); + return { added: [value], removed: [] }; + } + if (op === REMOVE) { + const value = this.#current.get(key); + if (value === undefined) return { added: [], removed: [] }; + this.#current.delete(key); + return { added: [], removed: [value] }; + } + throw new Error(`unknown op byte: ${op}`); + } +} diff --git a/js/data/src/set/index.ts b/js/data/src/set/index.ts new file mode 100644 index 000000000..b9d03551d --- /dev/null +++ b/js/data/src/set/index.ts @@ -0,0 +1,3 @@ +export { bytesCodec, type Codec, stringCodec } from "./codec.ts"; +export { Consumer, type Update } from "./consumer.ts"; +export { type Config, Producer } from "./producer.ts"; diff --git a/js/data/src/set/producer.ts b/js/data/src/set/producer.ts new file mode 100644 index 000000000..0429336e8 --- /dev/null +++ b/js/data/src/set/producer.ts @@ -0,0 +1,121 @@ +import type * as Moq from "@moq/net"; + +import type { Codec } from "./codec.ts"; +import { encodeDelta, encodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts"; + +// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. Kept well +// below the per-group frame cap so a late joiner can always read the snapshot at frame 0. +const MAX_DELTA_FRAMES = 256; + +export interface Config { + // Encodes items to and from their wire bytes. Use `stringCodec` for a set of strings. + codec: Codec; + + // A delta is appended to the current group while the deltas accumulated since the last snapshot + // stay within `deltaRatio` times the size of a fresh snapshot; otherwise a new snapshot group is + // started. Defaults to 2. The whole point of a set track is incremental add/remove, so deltas are + // on by default (unlike @moq/json). + deltaRatio?: number; +} + +/** Publishes a set over a track, choosing snapshots and deltas automatically. */ +export class Producer { + #track: Moq.Track; + #codec: Codec; + #deltaRatio: number; + + // Keyed by encoded bytes so items dedupe by value, not reference. + #items = new Map(); + + #group?: Moq.Group; + #groupFrames = 0; + #groupDeltaBytes = 0; + + constructor(track: Moq.Track, config: Config) { + this.#track = track; + this.#codec = config.codec; + this.#deltaRatio = config.deltaRatio ?? 2; + } + + /** Insert an item, publishing a delta or snapshot. Returns true if it was newly inserted. */ + insert(value: T): boolean { + const bytes = this.#codec.encode(value); + const key = keyOf(bytes); + if (this.#items.has(key)) return false; + + this.#items.set(key, value); + this.#publish(INSERT, bytes); + return true; + } + + /** Remove an item, publishing a delta or snapshot. Returns true if it was present. */ + remove(value: T): boolean { + const bytes = this.#codec.encode(value); + const key = keyOf(bytes); + if (!this.#items.has(key)) return false; + + this.#items.delete(key); + this.#publish(REMOVE, bytes); + return true; + } + + /** Whether the item is currently in the set. */ + has(value: T): boolean { + return this.#items.has(keyOf(this.#codec.encode(value))); + } + + /** The number of items currently in the set. */ + get size(): number { + return this.#items.size; + } + + /** Iterate over the items currently in the set. */ + values(): IterableIterator { + return this.#items.values(); + } + + /** Finish the track, closing any open group. */ + finish(): void { + this.#group?.close(); + this.#group = undefined; + this.#track.close(); + } + + // Publish a single change. The change is already reflected in `#items`, so a snapshot captures it. + #publish(op: number, item: Uint8Array): void { + const snapshot = this.#snapshot(); + const deltaLen = 1 + item.length; + + if (this.#shouldSnapshot(deltaLen, snapshot.length)) { + this.#writeSnapshot(snapshot); + } else { + // biome-ignore lint/style/noNonNullAssertion: shouldSnapshot returning false guarantees an open group. + this.#group!.writeFrame(encodeDelta(op, item)); + this.#groupFrames += 1; + this.#groupDeltaBytes += deltaLen; + } + } + + #snapshot(): Uint8Array { + const items: Uint8Array[] = []; + for (const value of this.#items.values()) items.push(this.#codec.encode(value)); + return encodeSnapshot(items); + } + + #shouldSnapshot(deltaLen: number, snapshotLen: number): boolean { + if (!this.#group || this.#groupFrames >= MAX_DELTA_FRAMES) return true; + // Roll a snapshot once the replayed deltas would outgrow the budget relative to a snapshot. + return this.#groupDeltaBytes + deltaLen > this.#deltaRatio * snapshotLen; + } + + #writeSnapshot(snapshot: Uint8Array): void { + // The previous group is complete; no more frames will be appended to it. + this.#group?.close(); + + const group = this.#track.appendGroup(); + group.writeFrame(snapshot); + this.#group = group; + this.#groupFrames = 1; + this.#groupDeltaBytes = 0; + } +} diff --git a/js/data/src/set/set.test.ts b/js/data/src/set/set.test.ts new file mode 100644 index 000000000..121625fb5 --- /dev/null +++ b/js/data/src/set/set.test.ts @@ -0,0 +1,135 @@ +import { expect, test } from "bun:test"; +import { Track } from "@moq/net"; + +import { stringCodec } from "./codec.ts"; +import { Consumer } from "./consumer.ts"; +import { Producer } from "./producer.ts"; + +// Collect the full set after each change a consumer yields, in order. Consumes the track's groups. +async function drain(track: Track): Promise[]> { + const out: Set[] = []; + const consumer = new Consumer(track, { codec: stringCodec }); + for (;;) { + const update = await consumer.next(); + if (update === undefined) break; + out.push(consumer.current()); + } + return out; +} + +// Inspect the published layout via the public API: the frame count of each group, in order. Like +// `drain`, this consumes the track's groups, so don't call both on one track. Finish the track +// first so group/frame reads terminate. +async function structure(track: Track): Promise { + const counts: number[] = []; + for (;;) { + const group = await track.nextGroupOrdered(); + if (!group) break; + + let frames = 0; + while ((await group.readFrame()) !== undefined) frames++; + counts.push(frames); + } + return counts; +} + +function set(...items: string[]): Set { + return new Set(items); +} + +test("deltas off: a snapshot group per change", async () => { + const track = new Track("test"); + // A tight ratio leaves no room for any delta past the snapshot, so every change rolls a group. + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 0 }); + producer.insert("video"); + producer.insert("audio"); + producer.finish(); + + // Each change is its own single-frame snapshot group. (Reconstruction is covered elsewhere.) + expect(await structure(track)).toEqual([1, 1]); +}); + +test("deltas share one group", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + producer.insert("video"); // snapshot + producer.insert("audio"); // delta + producer.remove("video"); // delta + producer.finish(); + + // All changes fit in a single group as snapshot + two deltas. + expect(await structure(track)).toEqual([3]); +}); + +test("redundant insert and remove write nothing", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + expect(producer.insert("video")).toBe(true); + expect(producer.insert("video")).toBe(false); // already present + expect(producer.remove("audio")).toBe(false); // never present + producer.finish(); + + expect(await structure(track)).toEqual([1]); +}); + +test("live consumer sees each change as added/removed", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + const consumer = new Consumer(track, { codec: stringCodec }); + + producer.insert("video"); + expect(await consumer.next()).toEqual({ added: ["video"], removed: [] }); + + producer.insert("audio"); + expect(await consumer.next()).toEqual({ added: ["audio"], removed: [] }); + + producer.remove("video"); + expect(await consumer.next()).toEqual({ added: [], removed: ["video"] }); + + // The reconstructed set tracks the net result alongside the per-change deltas. + expect(consumer.current()).toEqual(set("audio")); + + producer.finish(); +}); + +test("snapshot-only stream still reports incremental changes", async () => { + const track = new Track("test"); + // A zero ratio rolls a fresh snapshot group on every change, so the consumer only sees + // snapshots, yet must still report each change as a single add or remove (diffed vs current). + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 0 }); + const consumer = new Consumer(track, { codec: stringCodec }); + + producer.insert("a"); + expect(await consumer.next()).toEqual({ added: ["a"], removed: [] }); + + producer.insert("b"); + expect(await consumer.next()).toEqual({ added: ["b"], removed: [] }); + + producer.remove("a"); + expect(await consumer.next()).toEqual({ added: [], removed: ["a"] }); + expect(consumer.current()).toEqual(set("b")); + + producer.finish(); +}); + +test("late joiner reconstructs from deltas", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec }); + producer.insert("a"); + producer.insert("b"); + producer.insert("c"); + producer.remove("a"); + producer.finish(); + + expect((await drain(track)).at(-1)).toEqual(set("b", "c")); +}); + +test("frame cap rolls snapshot", async () => { + const track = new Track("test"); + const producer = new Producer(track, { codec: stringCodec, deltaRatio: 1_000_000 }); + // Snapshot (frame 0) plus deltas fill the group until the frame cap forces a roll. + for (let i = 0; i <= 256; i++) producer.insert(`item-${i}`); + producer.finish(); + + expect(await structure(track)).toEqual([256, 1]); +}); diff --git a/js/data/src/set/wire.ts b/js/data/src/set/wire.ts new file mode 100644 index 000000000..75dbef314 --- /dev/null +++ b/js/data/src/set/wire.ts @@ -0,0 +1,69 @@ +// Wire format for a set track. Each group is self-contained: frame 0 is a snapshot of every item, +// and each following frame is a single insert/remove delta. +// +// - snapshot: u32(count) followed by `count` repetitions of u32(len) then `len` item bytes. +// - delta: a one-byte op ('+' insert, '-' remove) followed by the item bytes to the end of frame. +// +// Lengths are big-endian u32 (not QUIC varints) so the format stays self-contained and trivially +// matches the Rust implementation (`moq-data`). + +export const INSERT = 0x2b; // '+' +export const REMOVE = 0x2d; // '-' + +/** A stable map key for an item's encoded bytes, giving the set value (not reference) semantics. */ +export function keyOf(bytes: Uint8Array): string { + let key = ""; + for (let i = 0; i < bytes.length; i++) key += String.fromCharCode(bytes[i]); + return key; +} + +export function encodeSnapshot(items: Uint8Array[]): Uint8Array { + let total = 4; + for (const item of items) total += 4 + item.length; + + const out = new Uint8Array(total); + const view = new DataView(out.buffer); + view.setUint32(0, items.length); + + let offset = 4; + for (const item of items) { + view.setUint32(offset, item.length); + offset += 4; + out.set(item, offset); + offset += item.length; + } + return out; +} + +export function decodeSnapshot(frame: Uint8Array): Uint8Array[] { + if (frame.length < 4) throw new Error("snapshot is missing its count"); + const view = new DataView(frame.buffer, frame.byteOffset, frame.byteLength); + const count = view.getUint32(0); + + const items: Uint8Array[] = []; + let offset = 4; + for (let i = 0; i < count; i++) { + if (offset + 4 > frame.length) throw new Error("snapshot is missing an item length"); + const len = view.getUint32(offset); + offset += 4; + + if (offset + len > frame.length) throw new Error("snapshot item runs past end of frame"); + items.push(frame.subarray(offset, offset + len)); + offset += len; + } + + if (offset !== frame.length) throw new Error("snapshot has trailing bytes"); + return items; +} + +export function encodeDelta(op: number, item: Uint8Array): Uint8Array { + const out = new Uint8Array(1 + item.length); + out[0] = op; + out.set(item, 1); + return out; +} + +export function decodeDelta(frame: Uint8Array): [number, Uint8Array] { + if (frame.length === 0) throw new Error("empty delta frame"); + return [frame[0], frame.subarray(1)]; +} diff --git a/js/data/tsconfig.json b/js/data/tsconfig.json new file mode 100644 index 000000000..bb55d7c43 --- /dev/null +++ b/js/data/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "./src", + "types": ["bun"] + }, + "include": ["src"] +} diff --git a/package.json b/package.json index 2d4c5a54b..91f8bc0a4 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "js/clock", "js/token", "js/json", + "js/data", "js/hang", "js/loc", "js/msf", diff --git a/rs/moq-data/Cargo.toml b/rs/moq-data/Cargo.toml new file mode 100644 index 000000000..c916d088a --- /dev/null +++ b/rs/moq-data/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "moq-data" +description = "Helpers for sending metadata (sets, JSON) over MoQ tracks." +authors = ["Luke Curley "] +repository = "https://github.com/moq-dev/moq" +license = "MIT OR Apache-2.0" + +version = "0.0.1" +edition = "2024" +rust-version.workspace = true + +keywords = ["quic", "http3", "webtransport", "metadata", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + +[lib] +doctest = false + +[features] +default = ["json", "set"] +# Re-export `moq-json` as `moq_data::json`. JSON will migrate into this crate eventually. +json = ["dep:moq-json"] +# A `HashSet`-like collection synced over a track via add/remove deltas. +set = ["dep:moq-net", "dep:bytes", "dep:kio", "dep:thiserror"] + +[dependencies] +bytes = { version = "1", optional = true } +kio = { workspace = true, optional = true } +moq-json = { workspace = true, optional = true } +moq-net = { workspace = true, optional = true } +thiserror = { version = "2", optional = true } diff --git a/rs/moq-data/README.md b/rs/moq-data/README.md new file mode 100644 index 000000000..8deb7c65e --- /dev/null +++ b/rs/moq-data/README.md @@ -0,0 +1,52 @@ +

+ Media over QUIC +

+ +# moq-data + +[![crates.io](https://img.shields.io/crates/v/moq-data)](https://crates.io/crates/moq-data) +[![docs.rs](https://img.shields.io/docsrs/moq-data)](https://docs.rs/moq-data) + +Helpers for sending metadata over [Media over QUIC](https://moq.dev/) tracks. + +Each helper maps an application data structure onto a [`moq-net`](../moq-net) track, handling snapshots and deltas so a late joiner can reconstruct the current state from the newest group alone. + +- **`set`** syncs a `HashSet`-like collection of arbitrary binary items, encoding changes as `+`/`-` deltas. +- **`json`** re-exports [`moq-json`](../moq-json) for snapshot/delta JSON publishing. It lives in its own crate today and will migrate here over time. + +## Set + +```rust +use moq_data::set; + +// Publish the set of track names in a broadcast. +let mut tracks = set::Producer::::new(track, set::Config::default()); +tracks.insert("video".to_string())?; +tracks.insert("audio".to_string())?; +tracks.remove("audio")?; + +// Consume: each change is the items added and removed; `current()` is the full set. +let mut consumer = set::Consumer::::new(tracks.consume()); +while let Some(update) = consumer.next().await? { + for name in &update.added { + println!("track appeared: {name}"); + } + for name in &update.removed { + println!("track left: {name}"); + } + println!("now: {:?}", consumer.current()); +} +``` + +Each group is self-contained: its first frame is a full snapshot of every item and any following frames are single `+` (insert) or `-` (remove) deltas applied in order. A consumer jumps to the newest group, reads the snapshot, and replays the deltas. Each change is reported as the items it added and removed (a snapshot is diffed against the current set), so a watcher reacts to individual tracks appearing and leaving without comparing whole sets. + +Items are arbitrary binary data: implement the `set::Item` trait for any type. It encodes straight into the frame's `bytes::BufMut` and decodes from a `bytes::Buf`, both zero-copy (`copy_to_bytes` hands back a slice of the frame). `String`, `Vec`, and `bytes::Bytes` are supported out of the box. + +Deltas are on by default (`Config { delta_ratio: Some(2.0) }`); a delta is appended while the group stays within `delta_ratio` times the size of a fresh snapshot, otherwise a new snapshot group is started. Set `delta_ratio: None` to publish a full snapshot per change. + +## Features + +| Feature | Default | Description | +|---|---|---| +| `set` | yes | The `HashSet`-like collection. | +| `json` | yes | Re-export of `moq-json`. | diff --git a/rs/moq-data/src/lib.rs b/rs/moq-data/src/lib.rs new file mode 100644 index 000000000..e3163511b --- /dev/null +++ b/rs/moq-data/src/lib.rs @@ -0,0 +1,18 @@ +//! Helpers for sending metadata over [`moq-net`](https://docs.rs/moq-net) tracks. +//! +//! Each helper maps an application data structure onto a track, handling snapshots and deltas so a +//! late joiner can reconstruct the current state from the newest group alone. +//! +//! - [`set`] syncs a [`HashSet`](std::collections::HashSet)-like collection of arbitrary binary +//! items, encoding changes as `+`/`-` deltas. +//! - [`json`] re-exports [`moq-json`](https://docs.rs/moq-json) for snapshot/delta JSON publishing. + +/// Snapshot/delta JSON publishing, re-exported from [`moq-json`](https://docs.rs/moq-json). +#[cfg(feature = "json")] +pub use moq_json as json; + +#[cfg(feature = "set")] +pub mod set; + +#[cfg(feature = "set")] +mod sizer; diff --git a/rs/moq-data/src/set.rs b/rs/moq-data/src/set.rs new file mode 100644 index 000000000..da4d82766 --- /dev/null +++ b/rs/moq-data/src/set.rs @@ -0,0 +1,783 @@ +//! A [`HashSet`]-like collection synced over a [`moq-net`](moq_net) track. +//! +//! The set is published as a series of self-contained groups. A group's first frame is a full +//! snapshot of every item; each following frame is a single `+` (insert) or `-` (remove) delta +//! applied in order. A consumer jumps to the newest group, decodes the snapshot, and replays the +//! deltas, so a late joiner never needs older groups. +//! +//! Items are arbitrary binary data: any type implementing [`Item`] (encode to bytes, decode back) +//! can live in the set. [`String`], [`Vec`], and [`bytes::Bytes`] are supported out of the box; +//! a custom type can implement [`Item`] however it likes (e.g. via `serde_json`). +//! +//! # Wire format +//! +//! Every frame within a group is one of: +//! +//! - **snapshot** (frame 0): big-endian `u32(count)` followed by `count` repetitions of +//! big-endian `u32(len)` then `len` item bytes. +//! - **delta** (frame 1+): a one-byte op (`+` = `0x2B` insert, `-` = `0x2D` remove) followed by the +//! item bytes, which run to the end of the frame. + +use std::borrow::Borrow; +use std::collections::HashSet; +use std::hash::Hash; +use std::task::Poll; + +use bytes::{Buf, BufMut, Bytes}; + +/// One-byte op prefixing an insert delta frame. +const INSERT: u8 = b'+'; +/// One-byte op prefixing a remove delta frame. +const REMOVE: u8 = b'-'; + +/// Maximum frames (snapshot + deltas) in a single group before a new snapshot is forced. +/// +/// Kept well below moq-net's per-group frame cap so a late joiner can always read the snapshot at +/// frame 0 before the group is evicted. +const MAX_DELTA_FRAMES: usize = 256; + +/// Errors produced while publishing or consuming a set. +#[derive(thiserror::Error, Debug, Clone)] +#[non_exhaustive] +pub enum Error { + /// An error from the underlying track. + #[error(transparent)] + Net(#[from] moq_net::Error), + + /// A frame could not be parsed as a snapshot or delta. + #[error("malformed frame: {0}")] + Malformed(String), + + /// An item failed to encode or decode. + #[error("item: {0}")] + Item(String), +} + +/// A [`Result`](std::result::Result) using this module's [`Error`]. +pub type Result = std::result::Result; + +/// An item that can be stored in a [`Set`](Producer). +/// +/// Encoding must be deterministic and round-trip: decoding what [`encode`](Item::encode) wrote must +/// reproduce `item`. Two items are the same set member iff they're equal under [`Eq`]/[`Hash`], so +/// distinct items must encode to distinct bytes. +pub trait Item: Clone + Eq + Hash { + /// Encode the item's bytes directly into `buf`. + /// + /// Writing into the frame buffer (rather than returning a fresh `Bytes`) keeps a string or byte + /// vector to a single copy. + fn encode(&self, buf: &mut B); + + /// Decode an item from `buf`, which holds exactly this item's bytes. + /// + /// Read straight from the [`Buf`]: scalar getters (`get_u16`, ...) read in place, and + /// `buf.copy_to_bytes(buf.remaining())` hands back a zero-copy [`Bytes`] slice of the frame. + fn decode(buf: &mut B) -> Result + where + Self: Sized; + + /// The number of bytes [`encode`](Item::encode) writes, used to size a frame up front. + /// + /// The default runs `encode` against a counting `BufMut` (no allocation, no copy). Override it + /// when the length is known directly, e.g. `self.len()`. + fn encode_size(&self) -> usize { + let mut sizer = crate::sizer::Sizer::default(); + self.encode(&mut sizer); + sizer.size + } +} + +impl Item for String { + fn encode(&self, buf: &mut B) { + buf.put_slice(self.as_bytes()); + } + + fn decode(buf: &mut B) -> Result { + let bytes = buf.copy_to_bytes(buf.remaining()); + String::from_utf8(bytes.into()).map_err(|err| Error::Item(err.to_string())) + } + + fn encode_size(&self) -> usize { + self.len() + } +} + +impl Item for Vec { + fn encode(&self, buf: &mut B) { + buf.put_slice(self); + } + + fn decode(buf: &mut B) -> Result { + Ok(buf.copy_to_bytes(buf.remaining()).into()) + } + + fn encode_size(&self) -> usize { + self.len() + } +} + +impl Item for Bytes { + fn encode(&self, buf: &mut B) { + buf.put_slice(self); + } + + fn decode(buf: &mut B) -> Result { + Ok(buf.copy_to_bytes(buf.remaining())) + } + + fn encode_size(&self) -> usize { + self.len() + } +} + +/// Configuration for a [`Producer`]. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct Config { + /// Controls whether changes are published as deltas instead of full snapshots. + /// + /// `None` disables deltas: every change starts a new group with a full snapshot. + /// + /// `Some(ratio)` enables deltas. A `+`/`-` delta is appended to the current group as long as the + /// deltas accumulated since the last snapshot stay within `ratio` times the size of a fresh + /// snapshot; otherwise a new snapshot group is started, bounding how much a late joiner replays. + pub delta_ratio: Option, +} + +impl Default for Config { + fn default() -> Self { + // Deltas on by default: the whole point of a set track is incremental add/remove. + Self { delta_ratio: Some(2.0) } + } +} + +/// Publishes a set over a track, choosing snapshots and deltas automatically. +pub struct Producer { + track: moq_net::TrackProducer, + config: Config, + + current: HashSet, + group: Option, + /// Total frames in the open group, including the snapshot, for the frame cap. + group_frames: usize, + /// Bytes of delta frames appended since the last snapshot, for the ratio budget. + group_delta_bytes: u64, +} + +impl Producer { + /// Create a producer that publishes to the given track. + pub fn new(track: moq_net::TrackProducer, config: Config) -> Self { + Self { + track, + config, + current: HashSet::new(), + group: None, + group_frames: 0, + group_delta_bytes: 0, + } + } + + /// Insert an item, publishing a delta or snapshot. Returns `true` if it was newly inserted. + pub fn insert(&mut self, item: T) -> Result { + if self.current.contains(&item) { + return Ok(false); + } + + let delta_size = 1 + item.encode_size() as u64; + // The snapshot size once `item` is included, computed without inserting so we keep `&item`. + let snapshot_size = self.snapshot_size() + 4 + item.encode_size() as u64; + + if self.should_snapshot(delta_size, snapshot_size) { + // A snapshot encodes the full set, so insert first; undo if the write fails so our view + // stays consistent with what the track actually saw. + self.current.insert(item.clone()); + if let Err(err) = self.write_snapshot() { + self.current.remove(&item); + return Err(err); + } + } else { + // Write the delta straight from the reference, then move the item into the set. + self.write_delta(INSERT, &item)?; + self.current.insert(item); + } + Ok(true) + } + + /// Remove an item, publishing a delta or snapshot. Returns `true` if it was present. + pub fn remove(&mut self, item: &Q) -> Result + where + T: Borrow, + Q: Hash + Eq + ?Sized, + { + // `take` removes it from the set and hands back the owned value so we can encode the delta. + let Some(removed) = self.current.take(item) else { + return Ok(false); + }; + + let delta_size = 1 + removed.encode_size() as u64; + // `current` already reflects the removal. + let snapshot_size = self.snapshot_size(); + + let published = if self.should_snapshot(delta_size, snapshot_size) { + self.write_snapshot() + } else { + self.write_delta(REMOVE, &removed) + }; + if let Err(err) = published { + // Restore the item so our view stays consistent with what the track actually saw. + self.current.insert(removed); + return Err(err); + } + Ok(true) + } + + /// Whether the item is currently in the set. + pub fn contains(&self, item: &Q) -> bool + where + T: Borrow, + Q: Hash + Eq + ?Sized, + { + self.current.contains(item) + } + + /// The number of items currently in the set. + pub fn len(&self) -> usize { + self.current.len() + } + + /// Whether the set is currently empty. + pub fn is_empty(&self) -> bool { + self.current.is_empty() + } + + /// Iterate over the items currently in the set. + pub fn iter(&self) -> impl Iterator { + self.current.iter() + } + + /// Create a consumer for the underlying track. + pub fn consume(&self) -> moq_net::TrackConsumer { + self.track.consume() + } + + /// Finish the track, closing any open group. + pub fn finish(&mut self) -> Result<()> { + if let Some(mut group) = self.group.take() { + group.finish()?; + } + self.track.finish()?; + Ok(()) + } + + /// The byte size of a full snapshot of the current set: a `u32` count plus each item + /// length-prefixed. Computed from [`Item::encode_size`] so we can size a frame without a scratch + /// buffer. + fn snapshot_size(&self) -> u64 { + 4 + self + .current + .iter() + .map(|item| 4 + item.encode_size() as u64) + .sum::() + } + + fn should_snapshot(&self, delta_size: u64, snapshot_size: u64) -> bool { + let Some(ratio) = self.config.delta_ratio else { + return true; + }; + if self.group.is_none() || self.group_frames >= MAX_DELTA_FRAMES { + return true; + } + // Roll a snapshot once the replayed deltas would outgrow the budget relative to a snapshot. + (self.group_delta_bytes + delta_size) as f64 > ratio * snapshot_size as f64 + } + + /// Append a `+`/`-` delta frame for one item to the open group, encoding straight into the frame. + fn write_delta(&mut self, op: u8, item: &T) -> Result<()> { + let size = 1 + item.encode_size() as u64; + let group = self.group.as_mut().expect("delta requires an open group"); + + let mut frame = group.create_frame(size.into())?; + frame.put_u8(op); + item.encode(&mut frame); + frame.finish()?; + + self.group_frames += 1; + self.group_delta_bytes += size; + Ok(()) + } + + /// Start a new group whose first frame is a full snapshot of the current set, encoding straight + /// into the frame so each item is copied just once. + fn write_snapshot(&mut self) -> Result<()> { + // The previous group is complete; no more frames will be appended to it. + if let Some(mut group) = self.group.take() { + group.finish()?; + } + + let count = u32::try_from(self.current.len()).map_err(|_| Error::Malformed("set has too many items".into()))?; + let mut group = self.track.append_group()?; + + let mut frame = group.create_frame(self.snapshot_size().into())?; + frame.put_u32(count); + for item in &self.current { + let len = u32::try_from(item.encode_size()).map_err(|_| Error::Malformed("item is too large".into()))?; + frame.put_u32(len); + item.encode(&mut frame); + } + frame.finish()?; + + self.group_frames = 1; + self.group_delta_bytes = 0; + + if self.config.delta_ratio.is_some() { + // Keep the group open so future deltas can be appended. + self.group = Some(group); + } else { + // Deltas disabled: one snapshot per group. + group.finish()?; + } + + Ok(()) + } +} + +/// The items added and removed by a single change, returned by [`Consumer::next`]. +/// +/// A delta carries one item in exactly one of the fields. A snapshot (the first frame of a group, +/// or a late joiner's first read) carries its difference from the previous state, so several items +/// may be added and removed at once. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Update { + /// Items that joined the set. + pub added: Vec, + /// Items that left the set. + pub removed: Vec, +} + +// Manual impl so `T` needn't be `Default` (the derive would wrongly require it). +impl Default for Update { + fn default() -> Self { + Self { + added: Vec::new(), + removed: Vec::new(), + } + } +} + +impl Update { + /// Whether nothing changed. + pub fn is_empty(&self) -> bool { + self.added.is_empty() && self.removed.is_empty() + } +} + +/// Consumes a set from a track, reconstructing it from snapshots and deltas. +pub struct Consumer { + track: moq_net::TrackConsumer, + group: Option, + current: HashSet, + frames_read: usize, +} + +impl Consumer { + /// Create a consumer reading from the given track consumer. + pub fn new(track: moq_net::TrackConsumer) -> Self { + Self { + track, + group: None, + current: HashSet::new(), + frames_read: 0, + } + } + + /// The full set as currently reconstructed. Updated by each [`next`](Self::next). + pub fn current(&self) -> &HashSet { + &self.current + } + + /// Get the next change as added/removed items, or `None` once the track ends. + /// + /// Use [`current`](Self::current) afterward for the full set. + pub async fn next(&mut self) -> Result>> + where + T: Unpin, + { + kio::wait(|waiter| self.poll_next(waiter)).await + } + + /// Poll for the next change, without blocking. + /// + /// Jumps to the newest group, decodes its snapshot, and applies deltas in order. Each frame is + /// reduced to the items it added and removed; frames that change nothing are skipped, so a + /// returned [`Update`] is never empty. Switching to a newer group diffs its snapshot against the + /// current set, so no change is missed or duplicated. + pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll>>> { + loop { + // Drain to the newest group. We keep `current` across the switch so the next group's + // snapshot diffs against it, rather than re-reporting the whole set. + let track_finished = loop { + match self.track.poll_next_group(waiter)? { + Poll::Ready(Some(group)) => { + self.group = Some(group); + self.frames_read = 0; + } + Poll::Ready(None) => break true, + Poll::Pending => break false, + } + }; + + if let Some(group) = &mut self.group { + match group.poll_read_frame(waiter)? { + Poll::Ready(Some(frame)) => { + let update = self.apply(frame)?; + if !update.is_empty() { + return Poll::Ready(Ok(Some(update))); + } + // A no-op frame (redundant snapshot or delta); read the next one. + continue; + } + // The current group is exhausted; look for a newer one. + Poll::Ready(None) => { + self.group = None; + continue; + } + Poll::Pending => return Poll::Pending, + } + } + + return if track_finished { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + }; + } + } + + /// Apply one frame, returning what it changed: frame 0 of a group is a snapshot (diffed against + /// the current set), the rest are `+`/`-` deltas. + fn apply(&mut self, frame: Bytes) -> Result> { + self.frames_read += 1; + + if self.frames_read == 1 { + let snapshot = decode_snapshot(frame)?; + let removed = self.current.difference(&snapshot).cloned().collect(); + let added = snapshot.difference(&self.current).cloned().collect(); + self.current = snapshot; + return Ok(Update { added, removed }); + } + + let (op, mut item) = decode_delta(frame)?; + let item = T::decode(&mut item)?; + Ok(match op { + INSERT if self.current.insert(item.clone()) => Update { + added: vec![item], + removed: Vec::new(), + }, + REMOVE if self.current.remove(&item) => Update { + added: Vec::new(), + removed: vec![item], + }, + INSERT | REMOVE => Update::default(), + other => return Err(Error::Malformed(format!("unknown op byte: {other:#04x}"))), + }) + } +} + +/// Decode a snapshot frame: a `u32` count then each item `u32`-length-prefixed. +/// +/// Lengths are big-endian `u32` rather than QUIC varints so the format stays self-contained and +/// trivially matches the JS implementation (`@moq/data`). +fn decode_snapshot(mut frame: Bytes) -> Result> { + if frame.remaining() < 4 { + return Err(Error::Malformed("snapshot is missing its count".into())); + } + let count = frame.get_u32() as usize; + + // Every item costs at least its 4-byte length prefix, so a count larger than the remaining + // bytes allow can't be real. Reject it before allocating so a malformed frame can't ask for a + // huge capacity. + if count > frame.remaining() / 4 { + return Err(Error::Malformed("snapshot count exceeds frame bounds".into())); + } + + let mut set = HashSet::with_capacity(count); + for _ in 0..count { + if frame.remaining() < 4 { + return Err(Error::Malformed("snapshot is missing an item length".into())); + } + let len = frame.get_u32() as usize; + if frame.remaining() < len { + return Err(Error::Malformed("snapshot item runs past end of frame".into())); + } + set.insert(T::decode(&mut frame.split_to(len))?); + } + + if frame.has_remaining() { + return Err(Error::Malformed("snapshot has trailing bytes".into())); + } + + Ok(set) +} + +/// Decode a delta frame: one op byte followed by the item bytes. +fn decode_delta(mut frame: Bytes) -> Result<(u8, Bytes)> { + if !frame.has_remaining() { + return Err(Error::Malformed("empty delta frame".into())); + } + let op = frame.get_u8(); + Ok((op, frame)) +} + +#[cfg(test)] +mod test { + use super::*; + + fn producer(config: Config) -> (Producer, moq_net::TrackConsumer) { + let track = moq_net::Track::new("test").produce(); + let consumer = track.consume(); + (Producer::new(track, config), consumer) + } + + fn set(items: &[&str]) -> HashSet { + items.iter().map(|s| s.to_string()).collect() + } + + /// Collect the full set after each change a consumer yields, in order. + fn drain(track: moq_net::TrackConsumer) -> Vec> { + let mut consumer = Consumer::::new(track); + let waiter = kio::Waiter::noop(); + let mut out = Vec::new(); + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) { + out.push(consumer.current().clone()); + } + out + } + + /// The next non-empty update, panicking if one isn't ready. + fn next_update(consumer: &mut Consumer) -> Update { + match consumer.poll_next(&kio::Waiter::noop()) { + Poll::Ready(Ok(Some(update))) => update, + other => panic!("expected an update, got {other:?}"), + } + } + + /// Build a snapshot frame for a set of strings, independent of the producer's encoder, as a + /// decode oracle. + fn snapshot_bytes(items: &[&str]) -> Vec { + let mut buf = Vec::new(); + buf.extend_from_slice(&(items.len() as u32).to_be_bytes()); + for item in items { + buf.extend_from_slice(&(item.len() as u32).to_be_bytes()); + buf.extend_from_slice(item.as_bytes()); + } + buf + } + + #[test] + fn snapshot_roundtrip() { + let frame = Bytes::from(snapshot_bytes(&["video", "audio", "captions"])); + assert_eq!( + decode_snapshot::(frame).unwrap(), + set(&["video", "audio", "captions"]) + ); + } + + #[test] + fn malformed_snapshot_is_rejected() { + // Trailing bytes past the declared items. + let mut frame = snapshot_bytes(&["video"]); + frame.push(0xff); + assert!(decode_snapshot::(Bytes::from(frame)).is_err()); + + // A count far larger than the frame can hold must not allocate; it's rejected up front. + let huge = Bytes::from(vec![0xff, 0xff, 0xff, 0xff]); + assert!(decode_snapshot::(huge).is_err()); + } + + #[test] + fn deltas_off_snapshot_per_change() { + let (mut producer, track) = producer(Config { delta_ratio: None }); + producer.insert("video".into()).unwrap(); + producer.insert("audio".into()).unwrap(); + producer.finish().unwrap(); + + // Two changes => two snapshot groups; a late joiner only sees the latest full set. + assert_eq!(track.latest(), Some(1)); + assert_eq!(drain(track).last().unwrap(), &set(&["video", "audio"])); + } + + #[test] + fn deltas_share_one_group() { + let (mut producer, track) = producer(Config::default()); + producer.insert("video".into()).unwrap(); // snapshot, group 0 + producer.insert("audio".into()).unwrap(); // delta + producer.remove("video").unwrap(); // delta + producer.finish().unwrap(); + + // All changes fit in a single group as snapshot + deltas. + assert_eq!(track.latest(), Some(0)); + assert_eq!(drain(track).last().unwrap(), &set(&["audio"])); + } + + #[test] + fn redundant_insert_and_remove_write_nothing() { + let (mut producer, track) = producer(Config::default()); + assert!(producer.insert("video".into()).unwrap()); + assert!(!producer.insert("video".into()).unwrap()); // already present + assert!(!producer.remove("audio").unwrap()); // never present + producer.finish().unwrap(); + + // Only the first insert wrote a frame, so there's exactly one group. + assert_eq!(track.latest(), Some(0)); + assert_eq!(drain(track).last().unwrap(), &set(&["video"])); + } + + #[test] + fn live_consumer_sees_each_change() { + let (mut producer, track) = producer(Config::default()); + let mut consumer = Consumer::::new(track); + + producer.insert("video".into()).unwrap(); + let update = next_update(&mut consumer); + assert_eq!(update.added, vec!["video".to_string()]); + assert!(update.removed.is_empty()); + + producer.insert("audio".into()).unwrap(); + assert_eq!(next_update(&mut consumer).added, vec!["audio".to_string()]); + + producer.remove("video").unwrap(); + assert_eq!(next_update(&mut consumer).removed, vec!["video".to_string()]); + + // The reconstructed set tracks the net result alongside the per-change deltas. + assert_eq!(consumer.current(), &set(&["audio"])); + } + + #[test] + fn snapshot_diff_reports_incremental_changes() { + // A zero ratio rolls a fresh snapshot group on every change, so the consumer only ever sees + // snapshots, yet must still report each change as a single add or remove (diffed vs current). + let (mut producer, track) = producer(Config { delta_ratio: Some(0.0) }); + let mut consumer = Consumer::::new(track); + + producer.insert("a".into()).unwrap(); + assert_eq!(next_update(&mut consumer).added, vec!["a".to_string()]); + + producer.insert("b".into()).unwrap(); + let update = next_update(&mut consumer); + assert_eq!(update.added, vec!["b".to_string()]); + assert!(update.removed.is_empty()); + + producer.remove("a").unwrap(); + let update = next_update(&mut consumer); + assert_eq!(update.removed, vec!["a".to_string()]); + assert!(update.added.is_empty()); + assert_eq!(consumer.current(), &set(&["b"])); + } + + #[test] + fn late_joiner_reconstructs_from_deltas() { + let (mut producer, track) = producer(Config::default()); + producer.insert("a".into()).unwrap(); + producer.insert("b".into()).unwrap(); + producer.insert("c".into()).unwrap(); + producer.remove("a").unwrap(); + producer.finish().unwrap(); + + // A consumer created only now still rebuilds the final set from snapshot + deltas. + assert_eq!(drain(track).last().unwrap(), &set(&["b", "c"])); + } + + #[test] + fn frame_cap_rolls_snapshot() { + // A huge ratio would otherwise keep everything in one group; the frame cap forces a roll. + let (mut producer, track) = producer(Config { + delta_ratio: Some(1_000_000.0), + }); + + // Snapshot (frame 0) plus MAX_DELTA_FRAMES - 1 deltas fill the first group, then one more rolls. + for i in 0..=MAX_DELTA_FRAMES { + producer.insert(format!("item-{i}")).unwrap(); + } + producer.finish().unwrap(); + + assert_eq!(track.latest(), Some(1)); + assert_eq!(drain(track).last().unwrap().len(), MAX_DELTA_FRAMES + 1); + } + + #[test] + fn failed_publish_preserves_view() { + let track = moq_net::Track::new("test").produce(); + let mut producer = Producer::::new(track, Config::default()); + producer.insert("video".into()).unwrap(); + producer.finish().unwrap(); + + // The track is finished, so publishing fails. The local view must not record the change, + // otherwise it would disagree with what the track actually saw. + assert!(producer.insert("audio".into()).is_err()); + assert!(!producer.contains("audio")); + + assert!(producer.remove("video").is_err()); + assert!(producer.contains("video")); + } + + #[test] + fn binary_items_roundtrip() { + let track = moq_net::Track::new("test").produce(); + let sub = track.consume(); + let mut producer = Producer::>::new(track, Config::default()); + + producer.insert(vec![0x00, 0xff, 0x42]).unwrap(); + producer.insert(vec![0x01]).unwrap(); + producer.finish().unwrap(); + + let mut consumer = Consumer::>::new(sub); + let waiter = kio::Waiter::noop(); + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) {} + + let expected: HashSet> = [vec![0x00, 0xff, 0x42], vec![0x01]].into_iter().collect(); + assert_eq!(consumer.current(), &expected); + } + + #[test] + fn custom_item_roundtrips() { + // A user type that encodes itself directly into the frame buffer, no intermediate `Bytes`. + #[derive(Clone, PartialEq, Eq, Hash, Debug)] + struct Point { + x: u16, + y: u16, + } + + // No `encode_size` override: the default counts the bytes via a `Sizer`. + impl Item for Point { + fn encode(&self, buf: &mut B) { + buf.put_u16(self.x); + buf.put_u16(self.y); + } + + fn decode(buf: &mut B) -> Result { + if buf.remaining() != 4 { + return Err(Error::Item("point must be 4 bytes".into())); + } + Ok(Point { + x: buf.get_u16(), + y: buf.get_u16(), + }) + } + } + + let track = moq_net::Track::new("test").produce(); + let sub = track.consume(); + let mut producer = Producer::::new(track, Config::default()); + producer.insert(Point { x: 1, y: 2 }).unwrap(); + producer.insert(Point { x: 3, y: 4 }).unwrap(); + producer.remove(&Point { x: 1, y: 2 }).unwrap(); + producer.finish().unwrap(); + + let mut consumer = Consumer::::new(sub); + let waiter = kio::Waiter::noop(); + while let Poll::Ready(Ok(Some(_))) = consumer.poll_next(&waiter) {} + + let expected: HashSet = [Point { x: 3, y: 4 }].into_iter().collect(); + assert_eq!(consumer.current(), &expected); + } +} diff --git a/rs/moq-data/src/sizer.rs b/rs/moq-data/src/sizer.rs new file mode 100644 index 000000000..78d74226d --- /dev/null +++ b/rs/moq-data/src/sizer.rs @@ -0,0 +1,208 @@ +use std::mem::MaybeUninit; + +use bytes::{Buf, BufMut, buf::UninitSlice}; + +/// A [`BufMut`] that counts bytes instead of writing them. +/// +/// Lets us measure how many bytes an `Item` will encode (to size a frame up front) by running its +/// encoder against a throwaway target, with no allocation and no copy. +#[derive(Default)] +pub(crate) struct Sizer { + pub size: usize, +} + +unsafe impl BufMut for Sizer { + unsafe fn advance_mut(&mut self, cnt: usize) { + self.size += cnt; + } + + fn chunk_mut(&mut self) -> &mut UninitSlice { + // We need to return a valid slice, but it won't actually be written to + // Use a thread-local static buffer to avoid safety issues + thread_local! { + static BUFFER: std::cell::UnsafeCell<[MaybeUninit; 8192]> = + const { std::cell::UnsafeCell::new([MaybeUninit::uninit(); 8192]) }; + } + + BUFFER.with(|buf| { + let ptr = buf.get(); + unsafe { + let slice = (*ptr).as_mut_ptr(); + bytes::buf::UninitSlice::from_raw_parts_mut(slice as *mut u8, 8192) + } + }) + } + + fn remaining_mut(&self) -> usize { + usize::MAX + } + + fn has_remaining_mut(&self) -> bool { + true + } + + fn put(&mut self, mut src: T) { + self.size += src.remaining(); + src.advance(src.remaining()); + } + + fn put_bytes(&mut self, _val: u8, cnt: usize) { + self.size += cnt; + } + + fn put_f32(&mut self, _val: f32) { + self.size += 4; + } + + fn put_f32_le(&mut self, _: f32) { + self.size += 4 + } + + fn put_f32_ne(&mut self, _: f32) { + self.size += 4 + } + + fn put_f64(&mut self, _: f64) { + self.size += 8 + } + + fn put_f64_le(&mut self, _: f64) { + self.size += 8 + } + + fn put_f64_ne(&mut self, _: f64) { + self.size += 8 + } + + fn put_i128(&mut self, _: i128) { + self.size += 16 + } + + fn put_i128_le(&mut self, _: i128) { + self.size += 16 + } + + fn put_i128_ne(&mut self, _: i128) { + self.size += 16 + } + + fn put_i16(&mut self, _: i16) { + self.size += 2 + } + + fn put_i16_le(&mut self, _: i16) { + self.size += 2 + } + + fn put_i16_ne(&mut self, _: i16) { + self.size += 2 + } + + fn put_i32(&mut self, _: i32) { + self.size += 4 + } + + fn put_i32_le(&mut self, _: i32) { + self.size += 4 + } + + fn put_i32_ne(&mut self, _: i32) { + self.size += 4 + } + + fn put_i64(&mut self, _: i64) { + self.size += 8 + } + + fn put_i64_le(&mut self, _: i64) { + self.size += 8 + } + + fn put_i64_ne(&mut self, _: i64) { + self.size += 8 + } + + fn put_i8(&mut self, _: i8) { + self.size += 1 + } + + fn put_int(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_int_le(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_int_ne(&mut self, _: i64, nbytes: usize) { + self.size += nbytes + } + + fn put_slice(&mut self, src: &[u8]) { + self.size += src.len(); + } + + fn put_u128(&mut self, _: u128) { + self.size += 16 + } + + fn put_u128_le(&mut self, _: u128) { + self.size += 16 + } + + fn put_u128_ne(&mut self, _: u128) { + self.size += 16 + } + + fn put_u16(&mut self, _: u16) { + self.size += 2 + } + + fn put_u16_le(&mut self, _: u16) { + self.size += 2 + } + + fn put_u16_ne(&mut self, _: u16) { + self.size += 2 + } + + fn put_u32(&mut self, _: u32) { + self.size += 4 + } + + fn put_u32_le(&mut self, _: u32) { + self.size += 4 + } + + fn put_u32_ne(&mut self, _: u32) { + self.size += 4 + } + + fn put_u64(&mut self, _: u64) { + self.size += 8 + } + + fn put_u64_le(&mut self, _: u64) { + self.size += 8 + } + + fn put_u64_ne(&mut self, _: u64) { + self.size += 8 + } + + fn put_u8(&mut self, _: u8) { + self.size += 1 + } + + fn put_uint(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } + + fn put_uint_le(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } + + fn put_uint_ne(&mut self, _: u64, nbytes: usize) { + self.size += nbytes + } +}