Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions doc/concept/layer/moq-lite.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ Here's a list of currently supported ALPNs:
See the Compatibility section below for more details about `moq-transport` support.

Once the QUIC or WebTransport connection is established, there is a minimal MoQ handshake.
The `SETUP` message is primarily used to negotiate extensions, then you're off to the races!
Each endpoint sends a single `SETUP` message advertising its capabilities (for example whether it can probe the available bitrate), then you're off to the races.
The two `SETUP` messages are independent, so neither side waits for the other before getting started.
Transports that don't carry a request URI (native QUIC, or qmux over TCP/TLS) also use `SETUP` to carry the path the client wants to reach.

### Announcements

Expand All @@ -67,6 +69,9 @@ The [moq-relay clustering](/bin/relay/cluster) feature actually uses this to dis
The peer first replies with the set of broadcasts that are currently live, then streams updates as they change.
This initial set is a discrete batch: the latest draft reports how many entries to expect up front, so a freshly connected session can wait until that snapshot has fully arrived before listing what's available, rather than racing the gossip.

Each broadcast also carries an **epoch** identifying its instance.
When the same broadcast is announced over multiple routes (or republished after going away), the epoch lets everyone converge on the newest instance instead of picking arbitrarily.

### Subscriptions

All data transfers are initiated by subscriptions.
Expand Down Expand Up @@ -102,8 +107,8 @@ Each Subscription consists of a few properties:
- **Group Order**: The order in which groups are delivered. Defaults to descending; higher IDs are delivered first.
- **Group Timeout**: The maximum duration to keep old groups in cache/transit. Defaults to 30 seconds.

The publisher also caps how long it retains old groups via a per-track **cache** age, announced in `SUBSCRIBE_OK` so relays re-serve with the same window.
A subscriber's Group Timeout can only be smaller than this cache age, since a group can't be waited for longer than it's kept around.
The publisher also keeps old groups around for a best-effort **cache** window so relays and late subscribers can still fetch them.
This is a local hint rather than a guarantee carried on the wire, and a subscriber's Group Timeout is bounded by it: a group can't be waited for longer than it's actually kept around.

By utilizing these properties, you can choose how your application behaves during congestion.
For example, consider a conference room with Alice and Bob:
Expand Down
64 changes: 64 additions & 0 deletions js/net/src/lite/announce.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { expect, test } from "bun:test";
import * as Path from "../path.ts";
import { Reader, Writer } from "../stream.ts";
import { AnnounceBroadcast } from "./announce.ts";
import { OriginSchema } from "./origin.ts";
import { Version } from "./version.ts";

function concat(chunks: Uint8Array[]): Uint8Array {
const total = chunks.reduce((sum, c) => sum + c.byteLength, 0);
const out = new Uint8Array(total);
let offset = 0;
for (const c of chunks) {
out.set(c, offset);
offset += c.byteLength;
}
return out;
}

async function bytes(f: (w: Writer) => Promise<void>): Promise<Uint8Array> {
const written: Uint8Array[] = [];
const writer = new Writer(
new WritableStream<Uint8Array>({ write: (chunk) => void written.push(new Uint8Array(chunk)) }),
);
await f(writer);
writer.close();
await writer.closed;
return concat(written);
}

async function roundTrip(msg: AnnounceBroadcast, version: Version): Promise<AnnounceBroadcast> {
const reader = new Reader(undefined, await bytes((w) => msg.encode(w, version)));
return AnnounceBroadcast.decode(reader, version);
}

test("AnnounceBroadcast epoch round-trips on draft-05", async () => {
const hops = [OriginSchema.parse(7n)];
// 1_700_000_000_000 is ~ms since 2020 in 2023; the others probe the edges of u53.
for (const epoch of [0, 1, 1_700_000_000_000, 2 ** 52]) {
const active = new AnnounceBroadcast({ suffix: Path.from("room/cam"), active: true, epoch, hops });
const gotActive = await roundTrip(active, Version.DRAFT_05_WIP);
expect(gotActive.active).toBe(true);
expect(gotActive.epoch).toBe(epoch);
expect(gotActive.suffix).toBe(Path.from("room/cam"));
expect(gotActive.hops).toEqual(hops);

const ended = new AnnounceBroadcast({ suffix: Path.from("room/cam"), active: false, epoch });
const gotEnded = await roundTrip(ended, Version.DRAFT_05_WIP);
expect(gotEnded.active).toBe(false);
expect(gotEnded.epoch).toBe(epoch);
}
});

test("AnnounceBroadcast epoch is omitted before draft-05", async () => {
// Pre-lite-05 carries no epoch on the wire, so a nonzero epoch decodes back as 0.
const msg = new AnnounceBroadcast({
suffix: Path.from("room/cam"),
active: true,
epoch: 42,
hops: [OriginSchema.parse(7n)],
});
const got = await roundTrip(msg, Version.DRAFT_04);
expect(got.epoch).toBe(0);
expect(got.suffix).toBe(Path.from("room/cam"));
});
68 changes: 54 additions & 14 deletions js/net/src/lite/announce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,49 @@ import * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import * as Message from "./message.ts";
import { type Origin, OriginSchema } from "./origin.ts";
import { hopsFixedWidth, Version } from "./version.ts";
import { hasBroadcastEpoch, hopsFixedWidth, Version } from "./version.ts";

// Must match the MAX_HOPS in Rust's model/origin.rs. Broadcasts with longer
// hop chains are rejected; this keeps loop-detection bounded and rejects
// pathological announcements across clusters with unbounded forwarding.
export const MAX_HOPS = 32;

export class Announce {
/**
* Seconds between the Unix epoch and 2020-01-01T00:00:00 UTC.
*
* Broadcast epochs ride the wire as milliseconds since this base (smaller than a
* Unix-epoch value, and good past the year 2500 in a varint). See {@link epochNow}.
*/
export const EPOCH_BASE_SECONDS = 1_577_836_800;

/**
* The current wall clock as a broadcast epoch: whole milliseconds since
* 2020-01-01 UTC (the wire value). Saturates to `0` for a clock before the base.
*/
export function epochNow(): number {
return Math.max(0, Math.floor(Date.now() - EPOCH_BASE_SECONDS * 1000));
}

/**
* ANNOUNCE_BROADCAST: sent by the publisher to advertise (or retract) a broadcast.
*
* Carries the broadcast path suffix, its instance {@link epoch} (lite-05+), and the
* hop chain. Renamed from `Announce` in lite-05.
*/
export class AnnounceBroadcast {
suffix: Path.Valid;
active: boolean;
/**
* Broadcast instance epoch: milliseconds since 2020-01-01 UTC (see {@link epochNow}).
* Only carried on the wire for lite-05+; `0` on older versions.
*/
epoch: number;
hops: Origin[];

constructor(props: { suffix: Path.Valid; active: boolean; hops?: Origin[] }) {
constructor(props: { suffix: Path.Valid; active: boolean; epoch?: number; hops?: Origin[] }) {
this.suffix = props.suffix;
this.active = props.active;
this.epoch = props.epoch ?? 0;
this.hops = props.hops ?? [];
if (this.hops.length > MAX_HOPS) {
throw new Error(`hop count ${this.hops.length} exceeds maximum ${MAX_HOPS}`);
Expand All @@ -27,6 +55,11 @@ export class Announce {
await w.bool(this.active);
await w.string(this.suffix);

// Lite05+: the epoch varint sits after the suffix and before the hop chain.
if (hasBroadcastEpoch(version)) {
await w.u53(this.epoch);
}

switch (version) {
case Version.DRAFT_01:
case Version.DRAFT_02:
Expand All @@ -49,10 +82,13 @@ export class Announce {
}
}

static async #decode(r: Reader, version: Version): Promise<Announce> {
static async #decode(r: Reader, version: Version): Promise<AnnounceBroadcast> {
const active = await r.bool();
const suffix = Path.from(await r.string());

// Lite05+ carries the epoch after the suffix; older versions default it to 0.
const epoch = hasBroadcastEpoch(version) ? await r.u53() : 0;

let hops: Origin[] = [];
switch (version) {
case Version.DRAFT_01:
Expand Down Expand Up @@ -80,23 +116,27 @@ export class Announce {
}
}

return new Announce({ suffix, active, hops });
return new AnnounceBroadcast({ suffix, active, epoch, hops });
}

async encode(w: Writer, version: Version): Promise<void> {
return Message.encode(w, (w) => this.#encode(w, version));
}

static async decode(r: Reader, version: Version): Promise<Announce> {
return Message.decode(r, (r) => Announce.#decode(r, version));
static async decode(r: Reader, version: Version): Promise<AnnounceBroadcast> {
return Message.decode(r, (r) => AnnounceBroadcast.#decode(r, version));
}

static async decodeMaybe(r: Reader, version: Version): Promise<Announce | undefined> {
return Message.decodeMaybe(r, (r) => Announce.#decode(r, version));
static async decodeMaybe(r: Reader, version: Version): Promise<AnnounceBroadcast | undefined> {
return Message.decodeMaybe(r, (r) => AnnounceBroadcast.#decode(r, version));
}
}

export class AnnounceInterest {
/**
* ANNOUNCE_REQUEST: sent by the subscriber to request ANNOUNCE_BROADCAST messages
* for a path prefix. Renamed from `AnnounceInterest` in lite-05.
*/
export class AnnounceRequest {
prefix: Path.Valid;
// Hop ID of the peer asking for announces. Zero means "no exclusion".
// Must be a bigint: peer origins are up to 64 bits and overflow u53.
Expand Down Expand Up @@ -125,7 +165,7 @@ export class AnnounceInterest {
}
}

static async #decode(r: Reader, version: Version): Promise<AnnounceInterest> {
static async #decode(r: Reader, version: Version): Promise<AnnounceRequest> {
const prefix = Path.from(await r.string());
let excludeHop = 0n;
switch (version) {
Expand All @@ -137,15 +177,15 @@ export class AnnounceInterest {
excludeHop = hopsFixedWidth(version) ? await r.u64() : await r.u62();
break;
}
return new AnnounceInterest(prefix, excludeHop);
return new AnnounceRequest(prefix, excludeHop);
}

async encode(w: Writer, version: Version): Promise<void> {
return Message.encode(w, (w) => this.#encode(w, version));
}

static async decode(r: Reader, version: Version): Promise<AnnounceInterest> {
return Message.decode(r, (r) => AnnounceInterest.#decode(r, version));
static async decode(r: Reader, version: Version): Promise<AnnounceRequest> {
return Message.decode(r, (r) => AnnounceRequest.#decode(r, version));
}
}

Expand Down
52 changes: 45 additions & 7 deletions js/net/src/lite/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import { type Bandwidth, createBandwidth } from "../bandwidth.ts";
import type { Broadcast } from "../broadcast.ts";
import type { Established } from "../connection/established.ts";
import * as Path from "../path.ts";
import { type Reader, Readers, Stream } from "../stream.ts";
import { type Reader, Readers, Stream, Writer } from "../stream.ts";
import type * as Time from "../time.ts";
import { AnnounceInterest } from "./announce.ts";
import { AnnounceRequest } from "./announce.ts";
import { Goaway } from "./goaway.ts";
import { Group } from "./group.ts";
import { type Origin, randomOrigin } from "./origin.ts";
import { Publisher } from "./publisher.ts";
import { SessionInfo } from "./session.ts";
import { StreamId } from "./stream.ts";
import { ProbeLevel, Setup } from "./setup.ts";
import { DataType, StreamId } from "./stream.ts";
import { Subscribe } from "./subscribe.ts";
import { Subscriber } from "./subscriber.ts";
import { Track as TrackMessage } from "./track.ts";
import { Version, versionName } from "./version.ts";
import { hasSetupStream, Version, versionName } from "./version.ts";

const SEND_BW_POLL_INTERVAL = 100; // ms

Expand Down Expand Up @@ -60,6 +61,11 @@ export class Connection implements Established {
* chains) and Subscriber (available for optional self-filtering on announces). */
readonly origin: Origin;

// The peer's SETUP, recorded once its Setup stream is read (lite-05+). Streams whose
// encoding depends on a negotiated capability (e.g. PROBE) wait on this. undefined
// until the peer's SETUP arrives; stays undefined forever on older drafts.
#peerSetup = new Signal<Setup | undefined>(undefined);

/**
* Creates a new Connection instance.
* @param url - The URL of the connection
Expand Down Expand Up @@ -92,7 +98,14 @@ export class Connection implements Established {

this.origin = randomOrigin();
this.#publisher = new Publisher(this.#quic, this.#version, this.origin);
this.#subscriber = new Subscriber(this.#quic, this.#version, this.origin, this.recvBandwidth, this.rtt);
this.#subscriber = new Subscriber(
this.#quic,
this.#version,
this.origin,
this.recvBandwidth,
this.rtt,
this.#peerSetup,
);

this.#run();
}
Expand All @@ -115,6 +128,10 @@ export class Connection implements Established {
async #run(): Promise<void> {
const tasks: Promise<void>[] = [this.#runSession(), this.#runBidis(), this.#runUnis()];

if (hasSetupStream(this.#version)) {
tasks.push(this.#sendSetup());
}

if (this.sendBandwidth) {
tasks.push(this.#runSendBandwidth(this.sendBandwidth));
}
Expand Down Expand Up @@ -159,6 +176,22 @@ export class Connection implements Established {
}
}

// Open the unidirectional Setup Stream, send our single SETUP, and FIN (lite-05+).
// The browser uses WebTransport, which carries the request URI, so we advertise no
// path and leave routing to the URL. We advertise probe = Report (we measure and
// report bitrate over the PROBE stream, but don't actively pad the connection).
async #sendSetup(): Promise<void> {
const writer = await Writer.open(this.#quic);
try {
await writer.u8(DataType.Setup);
await new Setup(ProbeLevel.Report).encode(writer, this.#version);
writer.close();
} catch (err: unknown) {
writer.reset(err);
throw err;
}
}

async #runBidis() {
for (;;) {
const stream = await Stream.accept(this.#quic);
Expand All @@ -180,7 +213,7 @@ export class Connection implements Established {
if (typ === StreamId.Session) {
throw new Error("duplicate session stream");
} else if (typ === StreamId.Announce) {
const msg = await AnnounceInterest.decode(stream.reader, this.#version);
const msg = await AnnounceRequest.decode(stream.reader, this.#version);
await this.#publisher.runAnnounce(msg, stream);
} else if (typ === StreamId.Subscribe) {
const msg = await Subscribe.decode(stream.reader, this.#version);
Expand Down Expand Up @@ -217,9 +250,14 @@ export class Connection implements Established {

async #runUni(stream: Reader) {
const typ = await stream.u8();
if (typ === 0) {
if (typ === DataType.Group) {
const msg = await Group.decode(stream);
await this.#subscriber.runGroup(msg, stream);
} else if (typ === DataType.Setup) {
// The peer sends exactly one SETUP, then FINs. Record it so capability-gated
// streams (e.g. PROBE) can react, then drain to the FIN.
const setup = await Setup.decode(stream, this.#version);
this.#peerSetup.set(setup);
} else {
throw new Error(`unknown stream type: ${typ.toString()}`);
}
Expand Down
1 change: 1 addition & 0 deletions js/net/src/lite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from "./goaway.ts";
export * from "./group.ts";
export * from "./probe.ts";
export * from "./session.ts";
export * from "./setup.ts";
export * from "./stream.ts";
export * from "./subscribe.ts";
export * from "./track.ts";
Expand Down
Loading