diff --git a/Cargo.lock b/Cargo.lock index 5d97a772c..5b01e8f23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4227,6 +4227,7 @@ dependencies = [ "futures", "kio", "num_enum", + "object_store", "rand 0.10.1", "serde", "serde_json", @@ -5347,6 +5348,29 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "http", + "humantime", + "itertools 0.14.0", + "parking_lot", + "percent-encoding", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "wasm-bindgen-futures", + "web-time", +] + [[package]] name = "octets" version = "0.3.5" @@ -6080,9 +6104,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.14" +version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +checksum = "4fcb935c5bec503c2f0e306bdd3e58bb9029dcb14fa8d9ac76e3a5256ac0763e" dependencies = [ "aws-lc-rs", "bytes", diff --git a/Cargo.toml b/Cargo.toml index ad0b4da94..2d2c308c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ moq-vaapi = "0.0.2" # opts in: binaries (libmoq, moq-boy, moq-cli) enable them, but a self-compiler can # leave them off to drop the CUDA / libva deps. moq-video itself still defaults them on. moq-video = { version = "0.0.4", path = "rs/moq-video", default-features = false } +object_store = { version = "0.12", default-features = false } qmux = { version = "0.2", default-features = false } serde = { version = "1", features = ["derive"] } tokio = "1.48" diff --git a/rs/moq-net/CACHE.md b/rs/moq-net/CACHE.md new file mode 100644 index 000000000..d7b4b158f --- /dev/null +++ b/rs/moq-net/CACHE.md @@ -0,0 +1,117 @@ +# moq-net track cache + +A per-track durable cache. 
It lets a relay or edge keep recent groups past the live window and +serve them back on a FETCH, spilling to local disk and optionally remote object storage. It lives +in `moq-net` so any consumer of a track (relay, edge, archiver) gets durable caching for free. + +## Shape + +The cache is **not** a separate handle you wire onto both endpoints. It lives on the shared track +state (`TrackState`), so the RAM tier is the track's own live `groups` buffer and the disk/remote +tiers hang off the same state. One store therefore backs the track's `TrackProducer` and every +`TrackConsumer` automatically; a fetch is served from whichever tier holds the group. + +```rust +// module moq_net::cache (native-only types are target-gated to non-wasm) + +let disk = cache::Disk::new(store, prefix, bounds) // object_store + key prefix + bounds + .with_remote(remote); // optional rollup target + +let producer = TrackProducer::new(name, info).with_cache(disk); +let consumer = producer.consume(); // shares the same store +``` + +## Principles + +- **Local, not on the wire.** The cache is local policy set by whoever holds a track endpoint (the + relay or edge), never by the original publisher and never carried on the wire. +- **RAM is the live window.** There is no second in-memory copy of recent groups: the cache reuses + `TrackState.groups`, the buffer the track already keeps for live subscribers. A group is + serialized (to `cache::Group`) and handed to the disk tier only when it ages out of that window. +- **No traits, no callbacks.** The cache is concrete values you configure and attach. moq-net owns + all behavior; the disk and remote backends are a configured `object_store`, not a + consumer-implemented extension point. +- **Per-track, no shared LRU.** Each track keeps its own recent window; there is no cross-track + accounting, so no shared lock. Footprint is the sum of per-track windows across live tracks. 
+ +## Retention: two gates + +A group is evicted from the live window (`TrackState::evict_expired`) when it trips **either** of +two gates, both sized by `TrackInfo::cache` (the publisher's retention duration). The newest group +(`max_sequence`) is never evicted. + +- **Wall-clock** — the group was *received* more than the window ago. The receive time is an + `Instant` stamped when the group lands in `groups`; it is never sent over the wire or set by the + publisher. This is the hard memory backstop: a publisher can't pin RAM by lying about media + timestamps. +- **Media-time** — the group's last frame timestamp is more than the window behind the live media + edge (the newest frame timestamp buffered). This bounds a startup stampede, where a burst of + buffered media arrives at once (all "received now", so the wall-clock gate alone would keep it + all) and a fresh subscriber would otherwise be flooded. + +In steady state, where media time advances with wall-clock time, the two gates coincide. They +diverge only under a stampede (media-time trims it) or timestamp abuse (wall-clock trims it). 
+ +## Spill and serve + +```text +evict_expired: (synchronous, under the state lock) + for each group outside the window (not max_sequence): + tombstone it in `groups` + if a cache is attached: hand its live GroupConsumer to the flush task + +flush task: (one background task per cached track) + per eviction pass: drain the groups into cache::Group, write ONE disk segment, + then compact (roll the oldest disk segments up into one remote object, or evict + them when there is no remote tier) + +fetch_group(seq): + live hit in `groups` -> serve immediately + live miss, cache attached -> spawn an async disk/remote lookup; a hit + resolves the fetch, a miss chains upstream + (queues for a TrackDynamic), else NotFound + live miss, no cache -> queue for a TrackDynamic, or NotFound +``` + +`get_group(seq)` stays synchronous and only consults the live window; a spilled group is reachable +only through the async `fetch_group`. + +On a tier miss the lookup task chains upstream: it queues the request for a `TrackDynamic` (a wire +FETCH for a relay) when one exists, so the fetch then resolves once upstream serves the group into +the live window. Queuing only *after* the store misses keeps the store the fast path and avoids a +redundant upstream fetch when the group is already cached. With no handler, a miss is `NotFound`. + +Batching the disk write per eviction pass keeps a stampede-trim (many groups evicted at once) to a +single object. A steady-state single eviction still writes one small disk segment per group; the +remote tier is where rollup (`segment::rollup`) concatenates those into large objects, so a +per-frame (audio) track does not litter object storage with tiny remote objects. + +## Tiers and the byte format + +RAM is always present and dependency-free. Disk and remote are `object_store`, target-gated to +non-wasm targets (`cfg(not(target_arch = "wasm32"))`) so native builds get the tiers with no flag +and wasm builds drop the server-side cloud stack automatically. 
+ +The on-disk format lives in `segment.rs`: a band of groups serialized as one self-describing +object (a footer offset table read from a fixed trailer), lossless per-frame timestamps (raw +value + scale, so any timescale round-trips), `rollup` to concatenate small segments into one +larger object, and `group_from_blob` for the ranged-read decode path. `index.rs` is the +storage-agnostic multi-tier index (`sequence -> (tier, segment, byte range)`), per-tier byte and +duration accounting, and the promotion that picks the oldest disk segments over the disk high +watermark. `store.rs` is the `object_store` glue tying them together. The disk `Bounds` (a +low/high watermark) govern when disk segments roll up to remote, independent of the RAM retention +window above. + +## Bridging live <-> cached + +`cache::Group::read` drains a finished live `GroupConsumer` into the serializable `cache::Group` +(done on the flush task, off the state lock). `cache::Group::produce` rebuilds a live +`GroupConsumer` from a stored group at the track's timescale, for serving a fetch. + +## Still in design + +- **Removing `TrackInfo::cache`.** The retention window is still read from the wire-carried + `TrackInfo::cache`. Making retention purely local policy (and dropping the wire field) is a + separate wire change. +- **moq-cli / moq-relay flags.** Surfacing `with_cache` as CLI/TOML configuration (a disk path, a + remote URL, bounds) is follow-up work; the model API is in place. diff --git a/rs/moq-net/Cargo.toml b/rs/moq-net/Cargo.toml index f7ca2afba..721dcd528 100644 --- a/rs/moq-net/Cargo.toml +++ b/rs/moq-net/Cargo.toml @@ -31,6 +31,12 @@ tracing = "0.1" web-async = { workspace = true } web-transport-trait = { workspace = true } +# The disk/remote cache tiers use object_store, a server-side library that doesn't build for +# wasm (browsers don't spill to disk/S3).
Target-gated rather than feature-gated so native builds +# get the tiers automatically with no flag, and wasm builds drop them automatically. +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +object_store = { workspace = true } + [dev-dependencies] # test-util (tokio::time::pause/advance) is test-only and is NOT supported on # wasm, so it must not leak into the normal dependency feature set. diff --git a/rs/moq-net/src/model/cache/index.rs b/rs/moq-net/src/model/cache/index.rs new file mode 100644 index 000000000..9b88fe84f --- /dev/null +++ b/rs/moq-net/src/model/cache/index.rs @@ -0,0 +1,429 @@ +//! Multi-tier index: which segment, in which tier, holds each group, and which segments to promote. +//! +//! This is the storage-agnostic orchestration the disk and remote tiers run on top of the +//! [`segment`](super::segment) format. It records, per group sequence, a [`Location`] (tier + +//! segment + byte range), so a fetch is "look up the location, ranged-read that segment." It also +//! drives **promotion**: when the disk tier grows past its bound, [`Index::promotion`] picks the +//! oldest disk segments to compact, and after the caller rolls them into one remote object +//! ([`segment::rollup`](super::segment::rollup)) [`Index::apply_promotion`] repoints those +//! sequences at the remote tier and drops the disk segments. +//! +//! The index holds only metadata (offsets, sizes, timestamps), never group bytes, so it is the +//! piece that stays in memory while the bytes live on disk or in remote storage. The actual I/O +//! (object_store `put` / `get_range` / `delete`) is a thin layer that calls these methods for its +//! decisions; nothing here blocks or allocates per byte. + +use std::collections::{BTreeMap, HashSet}; +use std::time::Duration; + +use super::segment::Segment; +use super::{Bounds, Limit}; + +/// Identifier for a stored segment, assigned in creation order (so a lower id is older). 
+pub type SegmentId = u64; + +/// Which durable tier a segment lives in. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Tier { + /// Local disk: the staging tier that batches flushed RAM bands. + Disk, + /// Remote object storage: the long-term tier disk segments roll up into. + Remote, +} + +/// Where a group's bytes live: which tier and segment, and the byte range within that segment. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct Location { + /// The tier holding the segment. + pub tier: Tier, + /// The segment within that tier. + pub segment: SegmentId, + /// Byte offset of the group blob within the segment object. + pub offset: u64, + /// Byte length of the group blob. + pub length: u64, +} + +/// Per-segment bookkeeping for tier accounting and promotion. +struct Meta { + /// The tier the segment was recorded on. + tier: Tier, + /// Size of the segment object in bytes (`Segment::byte_len` at the time it was added). + bytes: u64, + /// Timestamp extent, as durations (a common scale), so cross-timescale segments compare. + ts_min: Option, + ts_max: Option, +} + +/// A map from group sequence to its [`Location`], plus per-segment metadata for promotion. +#[derive(Default)] +pub struct Index { + /// Group sequence -> where that group's blob currently lives. + groups: BTreeMap, + /// Segment id -> its metadata; iterates in id order, which is creation order. + segments: BTreeMap, + /// The id the next recorded segment will be given. + next_id: SegmentId, +} + +impl Index { + /// An empty index. + pub fn new() -> Self { + Self::default() + } + + /// The id the next [`add`](Self::add) (or [`apply_promotion`](Self::apply_promotion)) will + /// assign. A caller can put the object under this key *before* recording it, so a failed write + /// leaves the index unchanged. + pub fn next_id(&self) -> SegmentId { + self.next_id + } + + /// Record a freshly written `segment` on `tier`, returning its new id. Each group in the + /// segment becomes locatable; an already-present sequence is repointed to this segment (this + /// is how [`apply_promotion`](Self::apply_promotion) moves sequences to the remote tier).
+ pub fn add(&mut self, tier: Tier, segment: &Segment) -> SegmentId { + let id = self.next_id; + self.next_id += 1; + + let mut ts_min: Option = None; + let mut ts_max: Option = None; + + // Point every sequence in the segment at it (overwriting any older location) and widen the + // segment's timestamp extent; entries without timestamps leave the extent untouched. + for entry in segment.entries() { + self.groups.insert( + entry.sequence, + Location { + tier, + segment: id, + offset: entry.offset, + length: entry.length, + }, + ); + if let Some(t) = entry.ts_first { + let d = Duration::from(t); + ts_min = Some(ts_min.map_or(d, |m| m.min(d))); + } + if let Some(t) = entry.ts_last { + let d = Duration::from(t); + ts_max = Some(ts_max.map_or(d, |m| m.max(d))); + } + } + + self.segments.insert( + id, + Meta { + tier, + bytes: segment.byte_len() as u64, + ts_min, + ts_max, + }, + ); + id + } + + /// Where the group with this sequence lives, or `None` if it is not in any tier. + pub fn locate(&self, sequence: u64) -> Option { + self.groups.get(&sequence).copied() + } + + /// Total bytes stored in `tier`. Test-only accounting (promotion uses `stats`). + #[cfg(test)] + pub fn bytes(&self, tier: Tier) -> u64 { + self.segments.values().filter(|m| m.tier == tier).map(|m| m.bytes).sum() + } + + /// Number of segments in `tier`. Test-only. + #[cfg(test)] + pub fn segment_count(&self, tier: Tier) -> usize { + self.segments.values().filter(|m| m.tier == tier).count() + } + + /// Segment ids in `tier`, oldest first. + fn tier_segments(&self, tier: Tier) -> Vec { + // BTreeMap iterates by id, which is creation order, i.e. oldest first. + self.segments + .iter() + .filter(|(_, m)| m.tier == tier) + .map(|(id, _)| *id) + .collect() + } + + /// Total bytes and timestamp span across a set of segments.
+ fn stats(&self, ids: &[SegmentId]) -> (u64, Duration) { + let mut bytes = 0; + let mut lo: Option = None; + let mut hi: Option = None; + for id in ids { + // An id that is not in the index is skipped rather than treated as an error. + let Some(m) = self.segments.get(id) else { continue }; + bytes += m.bytes; + if let Some(d) = m.ts_min { + lo = Some(lo.map_or(d, |x| x.min(d))); + } + if let Some(d) = m.ts_max { + hi = Some(hi.map_or(d, |x| x.max(d))); + } + } + // Without a timestamp on both ends the span is zero, so only a byte threshold can trip. + let span = match (lo, hi) { + (Some(a), Some(b)) => b.saturating_sub(a), + _ => Duration::ZERO, + }; + (bytes, span) + } + + /// Whether `(bytes, span)` trips a high watermark. An all-unset limit is unbounded. + fn over_max(stats: (u64, Duration), max: Limit) -> bool { + !max.is_unset() && Self::over(stats, max) + } + + /// Whether `(bytes, span)` is still above a low watermark. An all-unset limit is a floor of + /// zero, so any non-empty content is above it. + fn above_min(stats: (u64, Duration), min: Limit) -> bool { + if min.is_unset() { + return stats.0 > 0; + } + Self::over(stats, min) + } + + /// Whether either configured threshold is strictly exceeded; an unset threshold never trips. + fn over((bytes, span): (u64, Duration), limit: Limit) -> bool { + limit.bytes.is_some_and(|b| bytes > b) || limit.duration.is_some_and(|d| span > d) + } + + /// The oldest disk segments to promote so the disk tier returns within `bounds`. Empty unless + /// the disk tier is over its high watermark; otherwise the oldest segments are selected until + /// what remains is within the low watermark, oldest first (the order to roll them up in). + /// + /// The single newest disk segment is never selected, mirroring the RAM tier's "keep the latest + /// group" rule. Without this, an unset low watermark (a floor of zero) would drain the whole + /// disk tier on one over-max trip, which for the no-remote eviction path is data loss of even + /// the most recent flushed groups.
+ pub fn promotion(&self, bounds: Bounds) -> Vec { + let disk = self.tier_segments(Tier::Disk); + if !Self::over_max(self.stats(&disk), bounds.max) { + return Vec::new(); + } + + let mut promote = Vec::new(); + let mut remaining = disk; + // Keep at least the newest segment (`len() > 1`), oldest-first. + // NOTE(review): `remove(0)` shifts the vector and `stats` rescans it on every pass, so this + // loop is quadratic in the number of disk segments; presumably fine while that stays small. + while remaining.len() > 1 && Self::above_min(self.stats(&remaining), bounds.min) { + promote.push(remaining.remove(0)); + } + promote + } + + /// Register `remote` (the rollup of `promoted`) on the remote tier, repoint its sequences, and + /// drop the promoted disk segments. Returns the new remote segment id. `remote` must contain + /// exactly the groups of `promoted`; any sequence missing from it is dropped from the index. + pub fn apply_promotion(&mut self, promoted: &[SegmentId], remote: &Segment) -> SegmentId { + let new_id = self.add(Tier::Remote, remote); + + let promoted: HashSet = promoted.iter().copied().collect(); + // `add` already repointed every sequence in `remote` to `new_id`; anything still pointing + // at a promoted segment was not in the rollup, so drop it. + self.groups.retain(|_, loc| !promoted.contains(&loc.segment)); + self.segments.retain(|id, _| !promoted.contains(id)); + new_id + } + + /// Drop a set of segments and the group locations pointing at them. Used to evict from the + /// disk tier when there is no remote tier to promote into. + pub fn evict(&mut self, segments: &[SegmentId]) { + let drop: HashSet = segments.iter().copied().collect(); + self.groups.retain(|_, loc| !drop.contains(&loc.segment)); + self.segments.retain(|id, _| !drop.contains(id)); + } +} + +#[cfg(test)] +mod tests { + use super::super::segment; + use super::super::{Frame, Group}; + use super::*; + use crate::Timestamp; + use bytes::Bytes; + use std::collections::HashMap; + + /// A one-frame group of `bytes` bytes at `secs` seconds, so segments carry a timestamp span.
+ fn group(sequence: u64, bytes: usize, secs: u64) -> Group { + Group { + sequence, + frames: vec![Frame { + timestamp: Some(Timestamp::from_secs(secs).unwrap()), + payload: Bytes::from(vec![7u8; bytes]), + }], + } + } + + /// Encode `groups` as one segment object, panicking if encoding fails (test helper). + fn encoded(groups: &[Group]) -> Bytes { + segment::encode(groups).unwrap() + } + + /// A tiny in-memory stand-in for an object store: segment id -> bytes. Mirrors the access + /// pattern the index is built for (put on add, ranged read on locate), so the index logic is + /// exercised end to end. + #[derive(Default)] + struct Store { + objects: HashMap, + } + + impl Store { + /// Read a group with a ranged read: slice `[offset, offset+length)` out of the segment and + /// decode the blob with `group_from_blob`. No footer, no full-segment parse. Panics if the + /// segment is not in `objects` or the blob does not decode. + fn read(&self, sequence: u64, loc: Location) -> Group { + let bytes = &self.objects[&loc.segment]; + let blob = bytes.slice(loc.offset as usize..(loc.offset + loc.length) as usize); + segment::group_from_blob(sequence, blob).unwrap() + } + } + + #[test] + fn add_and_locate_disk() { + let mut index = Index::new(); + let seg = Segment::open(encoded(&[group(0, 10, 0), group(1, 10, 1)])).unwrap(); + let id = index.add(Tier::Disk, &seg); + + let loc0 = index.locate(0).unwrap(); + assert_eq!(loc0.tier, Tier::Disk); + assert_eq!(loc0.segment, id); + assert!(index.locate(2).is_none()); + // The footer entry and the index agree on the byte range.
+ assert_eq!( + (loc0.offset, loc0.length), + (seg.entries()[0].offset, seg.entries()[0].length) + ); + } + + #[test] + fn tier_byte_accounting() { + let mut index = Index::new(); + let a = Segment::open(encoded(&[group(0, 100, 0)])).unwrap(); + let b = Segment::open(encoded(&[group(1, 50, 1)])).unwrap(); + index.add(Tier::Disk, &a); + index.add(Tier::Disk, &b); + assert_eq!(index.bytes(Tier::Disk), a.byte_len() as u64 + b.byte_len() as u64); + assert_eq!(index.segment_count(Tier::Disk), 2); + assert_eq!(index.bytes(Tier::Remote), 0); + } + + #[test] + fn promotion_empty_within_bounds() { + let mut index = Index::new(); + index.add(Tier::Disk, &Segment::open(encoded(&[group(0, 10, 0)])).unwrap()); + // A high watermark well above the single small segment: nothing to promote. + let bounds = Bounds::new(Limit::bytes(0), Limit::bytes(1_000_000)); + assert!(index.promotion(bounds).is_empty()); + } + + #[test] + fn promotion_selects_oldest_over_high_watermark() { + let mut index = Index::new(); + let mut ids = Vec::new(); + for seq in 0..5u64 { + let seg = Segment::open(encoded(&[group(seq, 100, seq)])).unwrap(); + ids.push(index.add(Tier::Disk, &seg)); + } + // Each segment is >100 bytes; keep ~150 bytes, flush over ~350. + let bounds = Bounds::new(Limit::bytes(150), Limit::bytes(350)); + let promote = index.promotion(bounds); + + // Oldest-first, leaving the remainder within the low watermark. + assert_eq!(&promote[..], &ids[..promote.len()]); + assert!(!promote.is_empty()); + let remaining: Vec = ids[promote.len()..].to_vec(); + assert!(index.bytes(Tier::Disk) > 0); + // What remains must be within the low watermark (<= 150 bytes worth of segments). + // NOTE(review): promotion always keeps the newest segment, so this holds only while one + // encoded 100-byte group stays <= 150 bytes including segment framing - confirm. + let remaining_bytes: u64 = remaining.iter().map(|id| index.segments[id].bytes).sum(); + assert!(remaining_bytes <= 150, "remaining {remaining_bytes} over low watermark"); + } + + #[test] + fn promotion_duration_watermark() { + let mut index = Index::new(); + // Segments at 0s, 1s, 2s, 3s; keep 1s, flush over 2s of span.
+ for seq in 0..4u64 { + index.add(Tier::Disk, &Segment::open(encoded(&[group(seq, 10, seq)])).unwrap()); + } + let bounds = Bounds::new( + Limit::duration(Duration::from_secs(1)), + Limit::duration(Duration::from_secs(2)), + ); + let promote = index.promotion(bounds); + assert!(!promote.is_empty(), "3s span should exceed the 2s high watermark"); + } + + #[test] + fn apply_promotion_repoints_to_remote() { + let mut index = Index::new(); + let g0 = group(0, 100, 0); + let g1 = group(1, 100, 1); + let g2 = group(2, 100, 2); + let s0 = index.add(Tier::Disk, &Segment::open(encoded(std::slice::from_ref(&g0))).unwrap()); + let s1 = index.add(Tier::Disk, &Segment::open(encoded(std::slice::from_ref(&g1))).unwrap()); + index.add(Tier::Disk, &Segment::open(encoded(std::slice::from_ref(&g2))).unwrap()); + + // Roll up the two oldest disk segments into one remote object. + let promoted = [s0, s1]; + let rolled = segment::rollup(&[encoded(&[g0]), encoded(&[g1])]).unwrap(); + let remote = Segment::open(rolled).unwrap(); + let new_id = index.apply_promotion(&promoted, &remote); + + // Sequences 0 and 1 now live remotely in one segment; the disk segments are gone. + assert_eq!(index.locate(0).unwrap().tier, Tier::Remote); + assert_eq!(index.locate(1).unwrap().tier, Tier::Remote); + assert_eq!(index.locate(0).unwrap().segment, new_id); + assert_eq!(index.locate(1).unwrap().segment, new_id); + // Sequence 2 is untouched on disk. + assert_eq!(index.locate(2).unwrap().tier, Tier::Disk); + // Disk dropped the two promoted segments; remote gained one.
+ assert_eq!(index.segment_count(Tier::Disk), 1); + assert_eq!(index.segment_count(Tier::Remote), 1); + } + + #[test] + fn evict_drops_segments_and_their_locations() { + let mut index = Index::new(); + let a = index.add(Tier::Disk, &Segment::open(encoded(&[group(0, 10, 0)])).unwrap()); + index.add(Tier::Disk, &Segment::open(encoded(&[group(1, 10, 1)])).unwrap()); + + index.evict(&[a]); + assert!(index.locate(0).is_none(), "evicted segment's groups are gone"); + assert!(index.locate(1).is_some(), "other segment untouched"); + assert_eq!(index.segment_count(Tier::Disk), 1); + } + + #[test] + fn end_to_end_locate_then_read_through_promotion() { + // Build disk segments, store their bytes, and verify a located group decodes correctly + // both before and after promotion (the rollup repoints offsets, the read still matches). + let mut index = Index::new(); + let mut store = Store::default(); + + let groups = [group(0, 40, 0), group(1, 40, 1), group(2, 40, 2)]; + for g in &groups { + let bytes = encoded(std::slice::from_ref(g)); + let id = index.add(Tier::Disk, &Segment::open(bytes.clone()).unwrap()); + store.objects.insert(id, bytes); + } + + // Before promotion: each group reads back identically from its disk location. + for g in &groups { + let loc = index.locate(g.sequence).unwrap(); + assert_eq!(&store.read(g.sequence, loc), g); + } + + // Promote sequences 0 and 1 into one remote object. + let promoted = [index.locate(0).unwrap().segment, index.locate(1).unwrap().segment]; + let rolled = segment::rollup(&[ + encoded(std::slice::from_ref(&groups[0])), + encoded(std::slice::from_ref(&groups[1])), + ]) + .unwrap(); + let remote_id = index.apply_promotion(&promoted, &Segment::open(rolled.clone()).unwrap()); + store.objects.insert(remote_id, rolled); + + // After promotion: every group still reads back identically, now via the remote segment.
+ for g in &groups { + let loc = index.locate(g.sequence).unwrap(); + assert_eq!( + &store.read(g.sequence, loc), + g, + "sequence {} mismatched after promotion", + g.sequence + ); + } + } +} diff --git a/rs/moq-net/src/model/cache/mod.rs b/rs/moq-net/src/model/cache/mod.rs new file mode 100644 index 000000000..12c212383 --- /dev/null +++ b/rs/moq-net/src/model/cache/mod.rs @@ -0,0 +1,468 @@ +//! Per-track durable cache: the disk/remote spill tiers behind a track's live RAM window. +//! +//! The RAM tier is the track's own live group buffer ([`crate::TrackProducer`]'s `groups`); this +//! module is everything below it. When a group ages out of that window (see the two retention gates +//! in `track.rs`), it is serialized through `Group` and handed to the disk tier; a fetch that misses +//! the live window then reads it back from disk (or remote) instead of failing. +//! +//! A cache is local policy attached to a single track, independent of any retention the original +//! publisher set (it is never carried on the wire). Attach one with +//! [`crate::TrackProducer::with_cache`]; the disk tier and an optional remote rollup target are +//! described by `Disk`. Because the cache lives on the shared track state, the same store backs +//! the track's producer and every consumer, so a fetch is served from whichever tier holds the +//! group. +//! +//! The `segment` submodule is the on-disk byte format (a band of groups serialized as one +//! self-describing object) plus the rollup that concatenates several small segments into one larger +//! object. `Group::read` / `Group::produce` bridge a cached group to and from the live group model. +//! The `store` submodule is the object_store glue (native-only).
+ +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; + +use super::{Timescale, Timestamp}; + +#[cfg(not(target_arch = "wasm32"))] +use object_store::{ObjectStore, path::Path}; + +// Internal orchestration for the disk/remote tiers; not part of the public surface, and only +// needed (and only buildable) where object_store is available. +#[cfg(not(target_arch = "wasm32"))] +mod index; + +pub mod segment; + +/// Disk and remote tiers backed by object_store. Native-only (object_store doesn't build on wasm). +#[cfg(not(target_arch = "wasm32"))] +pub mod store; + +/// A cache bound, as a duration, a byte count, or both (the first to trip wins). +/// +/// All-`None` means "no threshold": as a high watermark that is unbounded (never flush), as a low +/// watermark that is a floor of zero (drain everything but the latest group). +#[derive(Clone, Copy, Debug, Default)] +pub struct Limit { + /// Bound on the span between the oldest and newest buffered group's media timestamps. + pub duration: Option, + /// Bound on the total bytes of buffered group frames. + pub bytes: Option, +} + +impl Limit { + /// A duration-only limit. + pub fn duration(duration: Duration) -> Self { + Self { + duration: Some(duration), + bytes: None, + } + } + + /// A byte-only limit. + pub fn bytes(bytes: u64) -> Self { + Self { + duration: None, + bytes: Some(bytes), + } + } + + /// Whether both thresholds are unset. What that means is up to the caller: unbounded when + /// the limit is a high watermark, a floor of zero when it is a low watermark. + pub(crate) fn is_unset(&self) -> bool { + self.duration.is_none() && self.bytes.is_none() + } +} + +/// A low/high watermark pair. The gap between them is the flush batch size. +#[derive(Clone, Copy, Debug, Default)] +pub struct Bounds { + /// Low watermark: a flush drains down to this. + pub min: Limit, + /// High watermark: exceeding it triggers a flush. + pub max: Limit, +} + +impl Bounds { + /// Build bounds from a low and high watermark.
+ pub fn new(min: Limit, max: Limit) -> Self { + Self { min, max } + } +} + +/// The disk spill tier: an object store, a key prefix, retention bounds, and an optional remote +/// store the disk tier rolls up into. Native-only (`object_store` does not build on wasm). Build +/// with [`Disk::new`], optionally [`with_remote`](Disk::with_remote), then attach via +/// [`crate::TrackProducer::with_cache`]. +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Debug)] +#[non_exhaustive] +pub struct Disk { + /// The object store for the disk tier (e.g. a `LocalFileSystem`). + pub store: Arc, + /// Key prefix under which segments are written. + pub prefix: Path, + /// Retention bounds on the disk tier; exceeding them rolls up to `remote` (or evicts). + pub bounds: Bounds, + /// Optional remote store the disk tier rolls up into when over its bounds. + pub remote: Option>, +} + +#[cfg(not(target_arch = "wasm32"))] +impl Disk { + /// A disk tier over `store`, writing under `prefix`, capped by `bounds`. No remote rollup. + pub fn new(store: Arc, prefix: Path, bounds: Bounds) -> Self { + Self { + store, + prefix, + bounds, + remote: None, + } + } + + /// Set the remote store the disk tier rolls up into, replacing any remote set earlier. + pub fn with_remote(mut self, remote: Arc) -> Self { + self.remote = Some(remote); + self + } +} + +/// One frame within a cached group: its optional media timestamp and its payload. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Frame { + /// The frame's media timestamp, if the track carries them. + pub timestamp: Option, + /// The frame's payload bytes. + pub payload: Bytes, +} + +/// One cached group: its sequence and frames, enough to re-serve it or serialize it to a tier. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Group { + /// The group's sequence number within its track. + pub sequence: u64, + /// The group's frames, in order. + pub frames: Vec, +} + +impl Group { + /// Total size of the group's frame payloads in bytes.
+ pub fn size(&self) -> u64 { + self.frames.iter().map(|f| f.payload.len() as u64).sum() + } + + /// The first frame's media timestamp, if any. Used as the group's lower time bound. `None` + /// when the group is empty or that frame has no timestamp (later frames are not consulted). + pub fn ts_first(&self) -> Option { + self.frames.first().and_then(|f| f.timestamp) + } + + /// The last frame's media timestamp, if any. Used as the group's upper time bound. `None` + /// when the group is empty or that frame has no timestamp (earlier frames are not consulted). + pub fn ts_last(&self) -> Option { + self.frames.last().and_then(|f| f.timestamp) + } + + /// Drain a live [`GroupConsumer`](crate::GroupConsumer) into a cached group, reading every + /// frame's payload and timestamp. Resolves once the group is finished, so this is how an evicted + /// group is snapshotted before it is written to a tier. + /// + /// An error from `next_frame` or `read_all` is returned as-is; frames read so far are dropped. + pub async fn read(mut group: crate::GroupConsumer) -> Result { + let sequence = group.sequence; + let mut frames = Vec::new(); + while let Some(mut frame) = group.next_frame().await? { + let timestamp = frame.timestamp; + let payload = frame.read_all().await?; + frames.push(Frame { timestamp, payload }); + } + Ok(Self { sequence, frames }) + } + + /// Rebuild a live [`GroupConsumer`](crate::GroupConsumer) from this cached group, for serving a + /// fetch. `timescale` must match the track's: each frame timestamp is validated against it. + pub fn produce(&self, timescale: impl Into>) -> Result { + let mut producer = crate::GroupProducer::new( + crate::Group { + sequence: self.sequence, + }, + timescale.into(), + ); + for frame in &self.frames { + let info = crate::Frame { + size: frame.payload.len() as u64, + timestamp: frame.timestamp, + }; + let mut chunk = producer.create_frame(info)?; + chunk.write(frame.payload.clone())?; + chunk.finish()?; + } + producer.finish()?; + Ok(producer.consume()) + } +} + +/// A band of groups serialized to a tier in one flush, oldest first. +pub type Batch = Vec; + +/// Backlog of eviction passes the flush task may fall behind before evicted groups are dropped +/// rather than queued.
A queued pass pins its groups' frame buffers, so an unbounded queue would +/// let a slow disk migrate the RAM the live tier just freed into the channel. The cache is +/// best-effort, so on overflow we drop (creating a hole) instead of growing memory. +#[cfg(not(target_arch = "wasm32"))] +const FLUSH_BACKLOG: usize = 256; + +/// The disk/remote spill handle held on a track's shared state. +/// +/// Holds a sender to a background task that drains evicted groups to the disk tier, and the store +/// itself for fetch reads. Native-only (`object_store` does not build on wasm). Constructed by +/// [`crate::TrackProducer::with_cache`]. +#[cfg(not(target_arch = "wasm32"))] +pub(crate) struct Tiers { + /// Hands each batch of evicted live groups to the background flush task. Bounded (see + /// [`FLUSH_BACKLOG`]); a full channel drops rather than blocks the eviction path. + flush: tokio::sync::mpsc::Sender>, + /// The disk/remote store, shared with the flush task; used to serve fetch misses. + store: Arc>, +} + +#[cfg(not(target_arch = "wasm32"))] +impl Tiers { + /// Build the store and spawn the background task that serializes evicted groups into it. + /// The task loops until `recv` yields `None`, i.e. until every sender has been dropped. + pub(crate) fn spawn(disk: Disk) -> Self { + let store = store::Store::new(disk.store, disk.remote, disk.prefix, disk.bounds); + let store = Arc::new(tokio::sync::RwLock::new(store)); + let (flush, mut rx) = tokio::sync::mpsc::channel::>(FLUSH_BACKLOG); + let writer = store.clone(); + web_async::spawn(async move { + while let Some(first) = rx.recv().await { + // Coalesce every eviction pass already queued into one segment, so a backlog (or a + // stampede-trim) becomes one disk object rather than one per pass. + let mut passes = vec![first]; + while let Ok(more) = rx.try_recv() { + passes.push(more); + } + + let mut batch = Batch::new(); + for consumer in passes.into_iter().flatten() { + // NOTE(review): `Group::read` resolves only once the group finishes or errors, and + // the reads run one at a time, so an evicted group that never terminates would + // stall every later flush - confirm eviction only hands over groups that end. + match Group::read(consumer).await { + Ok(group) => batch.push(group), + // A group torn down before we drained it (e.g.
abort) is dropped, not cached. + Err(err) => tracing::debug!(%err, "skipped uncacheable evicted group"), + } + } + if batch.is_empty() { + continue; + } + // Keep groups in ascending sequence so the segment's footer is ordered. + batch.sort_by_key(|group| group.sequence); + // `flush` takes the batch by value, so a failed write loses these groups (best-effort). + if let Err(err) = writer.write().await.flush(batch).await { + tracing::warn!(%err, "cache disk flush failed"); + continue; + } + + // Compact in phases so the slow remote upload runs without the store lock that + // fetches need: plan (locked) snapshots the rollup, upload (unlocked) does the remote + // put, apply (locked) repoints the index. Bind each phase to its own statement so the + // lock guard drops at the `;` rather than being held (a held guard would also + // deadlock the re-entrant `write()` in apply). + let planned = writer.write().await.plan_compaction().await; + match planned { + Ok(Some(rollup)) => { + if let Err(err) = rollup.upload().await { + // The index still points at the intact disk segments; safe to leave. + tracing::warn!(%err, "cache remote rollup upload failed"); + } else if let Err(err) = writer.write().await.apply_compaction(rollup).await { + tracing::warn!(%err, "cache rollup apply failed"); + } + } + Ok(None) => {} + Err(err) => tracing::warn!(%err, "cache compaction planning failed"), + } + } + }); + Self { flush, store } + } + + /// Hand a batch of evicted live groups to the flush task. Non-blocking (the caller holds the + /// track state lock): a full backlog or a gone task drops the batch rather than waiting, leaving + /// a hole in the best-effort cache instead of stalling eviction or growing RAM.
+ pub(crate) fn evict(&self, groups: Vec) { + if groups.is_empty() { + return; + } + if let Err(err) = self.flush.try_send(groups) { + // Name the cause: a full backlog is transient back-pressure, while a closed channel means + // the flush task has exited and nothing will be cached again. + let (dropped, reason) = match &err { + tokio::sync::mpsc::error::TrySendError::Full(g) => (g.len(), "flush backlog full"), + tokio::sync::mpsc::error::TrySendError::Closed(g) => (g.len(), "flush task gone"), + }; + tracing::warn!(dropped, reason, "cache flush unavailable; dropping evicted groups"); + } + } + + /// A handle to the shared disk/remote store, for serving a fetch off the track's poll path. + pub(crate) fn store_handle(&self) -> Arc> { + self.store.clone() + } + + /// Fetch a group from the disk/remote tiers, rebuilt at `timescale`. `None` on a miss or any + /// tier read / rebuild error (a fetch falls through to the live path). + #[cfg(test)] + pub(crate) async fn fetch(&self, sequence: u64, timescale: Option) -> Option { + let group = self.store.read().await.get(sequence).await.ok()??; + group.produce(timescale).ok() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// A frame of `bytes` zero bytes at an optional micros timestamp. + fn frame(bytes: usize, ts_micros: Option) -> Frame { + Frame { + timestamp: ts_micros.map(|t| Timestamp::from_micros(t).unwrap()), + payload: Bytes::from(vec![0u8; bytes]), + } + } + + /// A two-frame group spanning `[t0, t1]` micros, total `bytes`. 
+ fn timed(seq: u64, bytes: usize, t0: u64, t1: u64) -> Group { + Group { + sequence: seq, + frames: vec![frame(bytes / 2, Some(t0)), frame(bytes - bytes / 2, Some(t1))], + } + } + + #[test] + fn size_sums_frame_bytes() { + let g = Group { + sequence: 0, + frames: vec![frame(10, None), frame(10, None), frame(10, None)], + }; + assert_eq!(g.size(), 30); + } + + #[test] + fn ts_first_and_last() { + let g = timed(0, 8, 100, 900); + assert_eq!(g.ts_first(), Some(Timestamp::from_micros(100).unwrap())); + assert_eq!(g.ts_last(), Some(Timestamp::from_micros(900).unwrap())); + } + + #[tokio::test] + async fn bridge_round_trips_a_live_group() { + // Build a live timed group, drain it into a cached group, rebuild a live one, drain again, + // and confirm the two cached snapshots match (payloads and per-frame timestamps survive). + let scale = Timescale::new(1_000_000).unwrap(); + let mut live = crate::GroupProducer::new(crate::Group { sequence: 4 }, Some(scale)); + for (i, payload) in [b"hello".as_slice(), b"world".as_slice()].into_iter().enumerate() { + let info = crate::Frame { + size: payload.len() as u64, + timestamp: Some(Timestamp::new(i as u64 * 1000, scale).unwrap()), + }; + let mut frame = live.create_frame(info).unwrap(); + frame.write(Bytes::copy_from_slice(payload)).unwrap(); + frame.finish().unwrap(); + } + live.finish().unwrap(); + + let cached = Group::read(live.consume()).await.unwrap(); + assert_eq!(cached.sequence, 4); + assert_eq!(cached.frames.len(), 2); + assert_eq!(cached.frames[0].payload, Bytes::from_static(b"hello")); + assert_eq!(cached.frames[1].timestamp, Some(Timestamp::new(1000, scale).unwrap())); + + let rebuilt = Group::read(cached.produce(scale).unwrap()).await.unwrap(); + assert_eq!(cached, rebuilt); + } + + #[tokio::test] + async fn bridge_untimed_group() { + // An untimed track (no timescale, no frame timestamps) round-trips too. 
+ let mut live = crate::GroupProducer::new(crate::Group { sequence: 0 }, None); + live.write_frame(Bytes::from_static(b"data")).unwrap(); + live.finish().unwrap(); + + let cached = Group::read(live.consume()).await.unwrap(); + assert_eq!(cached.frames.len(), 1); + assert_eq!(cached.frames[0].timestamp, None); + assert_eq!(Group::read(cached.produce(None).unwrap()).await.unwrap(), cached); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn tiers_evict_then_fetch_back() { + use object_store::memory::InMemory; + use object_store::path::Path; + + // Disk is unbounded so it keeps everything handed to it. + let disk = Disk::new(Arc::new(InMemory::new()), Path::from("cache"), Bounds::default()); + let tiers = Tiers::spawn(disk); + + // Build three finished live groups and hand them to the flush task as one eviction pass. + let mut consumers = Vec::new(); + for seq in 0..3u64 { + let mut live = crate::GroupProducer::new(crate::Group { sequence: seq }, None); + live.write_frame(Bytes::from(vec![seq as u8; 100])).unwrap(); + live.finish().unwrap(); + consumers.push(live.consume()); + } + tiers.evict(consumers); + + // The background task writes them to disk; fetch reads them back. + let mut fetched = None; + for _ in 0..200 { + if let Some(group) = tiers.fetch(0, None).await { + fetched = Some(group); + break; + } + tokio::task::yield_now().await; + } + let mut group = fetched.expect("group 0 fetched from disk"); + assert_eq!(group.sequence, 0); + assert_eq!(group.read_frame().await.unwrap().unwrap(), Bytes::from(vec![0u8; 100])); + assert!(tiers.fetch(2, None).await.is_some()); + assert!(tiers.fetch(99, None).await.is_none()); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn tiers_roll_up_to_remote_and_fetch() { + use object_store::memory::InMemory; + use object_store::path::Path; + + // Disk keeps ~1 segment (promote over budget); the rolled-up bytes go to the remote tier. 
+ // This exercises the phased plan -> upload (off-lock) -> apply path in the flush task. + let bounds = Bounds::new(Limit::bytes(1100), Limit::bytes(2000)); + let disk = + Disk::new(Arc::new(InMemory::new()), Path::from("cache"), bounds).with_remote(Arc::new(InMemory::new())); + let tiers = Tiers::spawn(disk); + + // Evict five ~1 KB groups, one pass at a time, so the disk tier exceeds budget and rolls up. + for seq in 0..5u64 { + let mut live = crate::GroupProducer::new(crate::Group { sequence: seq }, None); + live.write_frame(Bytes::from(vec![seq as u8; 1000])).unwrap(); + live.finish().unwrap(); + tiers.evict(vec![live.consume()]); + // Let the flush task process this pass (flush + compaction) before the next eviction, so + // each becomes its own segment rather than coalescing into one. + for _ in 0..50 { + tokio::task::yield_now().await; + } + } + + // Every group is still fetchable, whether it stayed on disk or rolled up to the remote tier. + for seq in 0..5u64 { + let mut found = None; + for _ in 0..200 { + if let Some(group) = tiers.fetch(seq, None).await { + found = Some(group); + break; + } + tokio::task::yield_now().await; + } + let mut group = found.unwrap_or_else(|| panic!("group {seq} fetchable after rollup")); + assert_eq!(group.sequence, seq); + assert_eq!( + group.read_frame().await.unwrap().unwrap(), + Bytes::from(vec![seq as u8; 1000]) + ); + } + } +} diff --git a/rs/moq-net/src/model/cache/segment.rs b/rs/moq-net/src/model/cache/segment.rs new file mode 100644 index 000000000..d84509a66 --- /dev/null +++ b/rs/moq-net/src/model/cache/segment.rs @@ -0,0 +1,491 @@ +//! On-disk byte format for the cache's disk and remote tiers. +//! +//! A *segment* is one band of groups ([`super::Batch`]) serialized as a single self-describing +//! object: the group blobs back to back, then a footer holding a per-group offset table, then an +//! 8-byte trailer (footer length + magic). Because the trailer is last and fixed-size, a reader +//! 
can fetch it with one tail-ranged GET, parse the footer, then fetch just the byte range of the +//! group it wants. Each group blob is itself self-delimiting (frame count, then length-prefixed +//! frames carrying their optional media timestamp), so frames round-trip losslessly. +//! +//! `rollup` concatenates several small segments into one larger object, rewriting the offset +//! table. It copies group blobs verbatim (no frame re-encoding), so it is cheap and lossless; it +//! is how the disk tier compacts into one remote object. + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +use super::{Frame, Group}; +use crate::{DecodeError, EncodeError, Timescale, Timestamp, VarInt}; + +/// Magic trailer identifying a cache segment ("MOQS"). +const MAGIC: u32 = 0x4D4F_5153; + +/// Fixed trailer size: a little-endian u32 footer length followed by the u32 magic. +const TRAILER: usize = 8; + +/// An error decoding or encoding a [`Segment`]. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + /// The data is shorter than a declared length or the trailer. + #[error("segment truncated")] + Truncated, + /// The trailing magic did not match, so this is not a cache segment. + #[error("bad segment magic")] + BadMagic, + /// A varint or field failed to decode. + #[error(transparent)] + Decode(#[from] DecodeError), + /// A varint failed to encode. + #[error(transparent)] + Encode(#[from] EncodeError), + /// A value (varint or timestamp) was out of the representable range. + #[error("value out of range")] + Value, +} + +/// One row of a segment's footer: where a group lives and its summary, without decoding the blob. +#[derive(Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +pub struct GroupEntry { + /// The group's sequence number within its track. + pub sequence: u64, + /// Byte offset of the group blob within the segment. + pub offset: u64, + /// Byte length of the group blob. + pub length: u64, + /// Number of frames in the group. 
+ pub frames: u64, + /// Media timestamp of the group's first frame, if any. + pub ts_first: Option, + /// Media timestamp of the group's last frame, if any. + pub ts_last: Option, +} + +/// Serialize a band of groups into one segment. +pub fn encode(batch: &[Group]) -> Result { + let mut buf = BytesMut::new(); + let mut entries = Vec::with_capacity(batch.len()); + + for group in batch { + let offset = buf.len() as u64; + put_group(&mut buf, group)?; + let length = buf.len() as u64 - offset; + entries.push(GroupEntry { + sequence: group.sequence, + offset, + length, + frames: group.frames.len() as u64, + ts_first: group.ts_first(), + ts_last: group.ts_last(), + }); + } + + write_footer(&mut buf, &entries)?; + Ok(buf.freeze()) +} + +/// Concatenate several segments into one, rewriting offsets. Group blobs are copied verbatim, so +/// this is lossless and does not re-encode frames. Entries keep their original order across the +/// inputs (segments are expected to cover disjoint, ascending sequence ranges). +pub fn rollup(segments: &[Bytes]) -> Result { + let mut buf = BytesMut::new(); + let mut entries = Vec::new(); + + for bytes in segments { + let segment = Segment::open(bytes.clone())?; + for entry in segment.entries() { + let blob = segment.blob(entry)?; + let offset = buf.len() as u64; + buf.extend_from_slice(&blob); + entries.push(GroupEntry { + offset, + ..entry.clone() + }); + } + } + + write_footer(&mut buf, &entries)?; + Ok(buf.freeze()) +} + +/// A parsed segment: the raw bytes plus its decoded footer. Cheap to clone (the bytes are shared). +#[derive(Clone)] +pub struct Segment { + data: Bytes, + entries: Vec, +} + +impl Segment { + /// Parse a segment from its full bytes. Reads the trailer, validates the magic, and decodes + /// the footer; group blobs are decoded lazily by [`group`](Self::group). 
+ pub fn open(data: Bytes) -> Result { + let n = data.len(); + if n < TRAILER { + return Err(Error::Truncated); + } + + let trailer = &data[n - TRAILER..]; + let footer_len = u32::from_le_bytes(trailer[0..4].try_into().expect("4 bytes")) as usize; + let magic = u32::from_le_bytes(trailer[4..8].try_into().expect("4 bytes")); + if magic != MAGIC { + return Err(Error::BadMagic); + } + + let footer_end = n - TRAILER; + let footer_start = footer_end.checked_sub(footer_len).ok_or(Error::Truncated)?; + let entries = read_footer(data.slice(footer_start..footer_end))?; + + Ok(Self { data, entries }) + } + + /// The footer's offset table. + pub fn entries(&self) -> &[GroupEntry] { + &self.entries + } + + /// Number of groups in the segment. + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Whether the segment holds no groups. + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Total size of the segment object in bytes (blobs, footer, and trailer). + pub fn byte_len(&self) -> usize { + self.data.len() + } + + /// Decode the group with this sequence, or `None` if the segment does not contain it. + pub fn group(&self, sequence: u64) -> Option> { + let entry = self.entries.iter().find(|e| e.sequence == sequence)?; + Some(self.blob(entry).and_then(|b| group_from_blob(entry.sequence, b))) + } + + /// Decode the group at the given footer index. + pub fn group_at(&self, index: usize) -> Option> { + let entry = self.entries.get(index)?; + Some(self.blob(entry).and_then(|b| group_from_blob(entry.sequence, b))) + } + + /// The raw blob bytes for an entry, bounds-checked against the data. + fn blob(&self, entry: &GroupEntry) -> Result { + let start = entry.offset as usize; + let end = start.checked_add(entry.length as usize).ok_or(Error::Truncated)?; + if end > self.data.len() { + return Err(Error::Truncated); + } + Ok(self.data.slice(start..end)) + } +} + +/// Decode one group from just its blob bytes and known sequence. 
+/// +/// This is the ranged-read decode path: the disk/remote tier reads `[offset, offset+length)` for +/// a group (from the index) and decodes those bytes without the surrounding segment or footer. +pub fn group_from_blob(sequence: u64, mut blob: Bytes) -> Result { + let count = get_varint(&mut blob)? as usize; + let mut frames = Vec::with_capacity(count.min(8192)); + for _ in 0..count { + frames.push(get_frame(&mut blob)?); + } + Ok(Group { sequence, frames }) +} + +fn put_group(buf: &mut BytesMut, group: &Group) -> Result<(), Error> { + put_varint(buf, group.frames.len() as u64)?; + for frame in &group.frames { + put_frame(buf, frame)?; + } + Ok(()) +} + +fn put_frame(buf: &mut BytesMut, frame: &Frame) -> Result<(), Error> { + put_varint(buf, frame.payload.len() as u64)?; + let flags = u8::from(frame.timestamp.is_some()); + buf.put_u8(flags); + if let Some(ts) = frame.timestamp { + put_timestamp(buf, ts)?; + } + buf.extend_from_slice(&frame.payload); + Ok(()) +} + +fn get_frame(buf: &mut Bytes) -> Result { + let len = get_varint(buf)? as usize; + let flags = get_u8(buf)?; + let timestamp = if flags & 1 != 0 { + Some(get_timestamp(buf)?) 
+ } else { + None + }; + if buf.remaining() < len { + return Err(Error::Truncated); + } + let payload = buf.copy_to_bytes(len); + Ok(Frame { timestamp, payload }) +} + +/// Append the footer and trailer to `buf`: the entry count, one row per entry, then the +/// little-endian u32 footer length and the magic. Returns [`Error::Value`] if a field is too +/// large for a varint or the footer is too long for the trailer's u32 length. +fn write_footer(buf: &mut BytesMut, entries: &[GroupEntry]) -> Result<(), Error> { + let start = buf.len(); + put_varint(buf, entries.len() as u64)?; + for entry in entries { + put_varint(buf, entry.sequence)?; + put_varint(buf, entry.offset)?; + put_varint(buf, entry.length)?; + put_varint(buf, entry.frames)?; + let flags = u8::from(entry.ts_first.is_some()) | (u8::from(entry.ts_last.is_some()) << 1); + buf.put_u8(flags); + if let Some(ts) = entry.ts_first { + put_timestamp(buf, ts)?; + } + if let Some(ts) = entry.ts_last { + put_timestamp(buf, ts)?; + } + } + // The trailer stores the footer length as a u32; refuse an oversized footer rather than + // truncating the length and writing a segment that cannot be opened. + let footer_len = u32::try_from(buf.len() - start).map_err(|_| Error::Value)?; + buf.put_u32_le(footer_len); + buf.put_u32_le(MAGIC); + Ok(()) +} + +fn read_footer(mut body: Bytes) -> Result, Error> { + let count = get_varint(&mut body)? as usize; + let mut entries = Vec::with_capacity(count.min(65536)); + for _ in 0..count { + let sequence = get_varint(&mut body)?; + let offset = get_varint(&mut body)?; + let length = get_varint(&mut body)?; + let frames = get_varint(&mut body)?; + let flags = get_u8(&mut body)?; + let ts_first = if flags & 1 != 0 { + Some(get_timestamp(&mut body)?) + } else { + None + }; + let ts_last = if flags & 2 != 0 { + Some(get_timestamp(&mut body)?) + } else { + None + }; + entries.push(GroupEntry { + sequence, + offset, + length, + frames, + ts_first, + ts_last, + }); + } + Ok(entries) +} + +fn put_timestamp(buf: &mut BytesMut, ts: Timestamp) -> Result<(), Error> { + // Store the raw (value, scale) so any timescale (e.g. 90kHz video) round-trips exactly. 
+ put_varint(buf, ts.value())?; + put_varint(buf, ts.scale().as_u64())?; + Ok(()) +} + +fn get_timestamp(buf: &mut impl Buf) -> Result { + let value = get_varint(buf)?; + let scale = get_varint(buf)?; + let scale = Timescale::try_from(scale).map_err(|_| Error::Value)?; + Timestamp::new(value, scale).map_err(|_| Error::Value) +} + +fn put_varint(buf: &mut BytesMut, value: u64) -> Result<(), Error> { + VarInt::try_from(value).map_err(|_| Error::Value)?.encode_quic(buf)?; + Ok(()) +} + +fn get_varint(buf: &mut impl Buf) -> Result { + Ok(VarInt::decode_quic(buf)?.into()) +} + +fn get_u8(buf: &mut impl Buf) -> Result { + if buf.remaining() < 1 { + return Err(Error::Truncated); + } + Ok(buf.get_u8()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ts(value: u64, scale: u64) -> Timestamp { + Timestamp::from_scale(value, scale).unwrap() + } + + fn frame(payload: &[u8], timestamp: Option) -> Frame { + Frame { + timestamp, + payload: Bytes::copy_from_slice(payload), + } + } + + fn group(sequence: u64, frames: Vec) -> Group { + Group { sequence, frames } + } + + /// A small group whose frames carry 90kHz timestamps (a non-micro scale). + fn video_group(sequence: u64, base: u64) -> Group { + group( + sequence, + vec![ + frame(b"keyframe", Some(ts(base, 90_000))), + frame(b"delta", Some(ts(base + 3000, 90_000))), + ], + ) + } + + #[test] + fn round_trip_single_group() { + let g = video_group(7, 0); + let bytes = encode(std::slice::from_ref(&g)).unwrap(); + let segment = Segment::open(bytes).unwrap(); + + assert_eq!(segment.len(), 1); + let decoded = segment.group(7).unwrap().unwrap(); + assert_eq!(decoded, g); + } + + #[test] + fn round_trip_batch_and_entries() { + let batch = vec![video_group(0, 0), video_group(1, 6000), video_group(2, 12000)]; + let bytes = encode(&batch).unwrap(); + let segment = Segment::open(bytes).unwrap(); + + assert_eq!(segment.len(), 3); + // Footer summarizes each group. 
+ for (entry, g) in segment.entries().iter().zip(&batch) { + assert_eq!(entry.sequence, g.sequence); + assert_eq!(entry.frames, g.frames.len() as u64); + assert_eq!(entry.ts_first, g.ts_first()); + assert_eq!(entry.ts_last, g.ts_last()); + } + // Every group decodes back to the original, by sequence and by index. + for (i, g) in batch.iter().enumerate() { + assert_eq!(&segment.group(g.sequence).unwrap().unwrap(), g); + assert_eq!(&segment.group_at(i).unwrap().unwrap(), g); + } + } + + #[test] + fn timestamps_lossless_at_any_scale() { + // A 90kHz tick is not an integer number of micros; raw (value, scale) must survive. + let bytes = encode(&[video_group(0, 1)]).unwrap(); + let segment = Segment::open(bytes).unwrap(); + let decoded = segment.group(0).unwrap().unwrap(); + + let t = decoded.frames[0].timestamp.unwrap(); + assert_eq!(t.value(), 1); + assert_eq!(t.scale().as_u64(), 90_000); + } + + #[test] + fn mixed_and_absent_timestamps() { + let g = group( + 3, + vec![ + frame(b"a", None), + frame(b"b", Some(ts(500, 1_000_000))), + frame(b"c", None), + ], + ); + let bytes = encode(std::slice::from_ref(&g)).unwrap(); + let segment = Segment::open(bytes).unwrap(); + assert_eq!(segment.group(3).unwrap().unwrap(), g); + // ts_first is absent (first frame), ts_last is absent (last frame). + assert_eq!(segment.entries()[0].ts_first, None); + assert_eq!(segment.entries()[0].ts_last, None); + } + + #[test] + fn empty_group_and_empty_batch() { + // A group with no frames, and a segment with no groups, both round-trip. 
+ let g = group(9, vec![]); + let segment = Segment::open(encode(std::slice::from_ref(&g)).unwrap()).unwrap(); + assert_eq!(segment.group(9).unwrap().unwrap(), g); + + let empty = Segment::open(encode(&[]).unwrap()).unwrap(); + assert!(empty.is_empty()); + assert!(empty.group(0).is_none()); + } + + #[test] + fn missing_sequence_is_none() { + let segment = Segment::open(encode(&[video_group(5, 0)]).unwrap()).unwrap(); + assert!(segment.group(6).is_none()); + assert!(segment.group_at(1).is_none()); + } + + #[test] + fn bad_magic_is_rejected() { + let mut bytes = encode(&[video_group(0, 0)]).unwrap().to_vec(); + let n = bytes.len(); + bytes[n - 1] ^= 0xFF; // corrupt the magic + assert!(matches!(Segment::open(Bytes::from(bytes)), Err(Error::BadMagic))); + } + + #[test] + fn truncated_is_rejected() { + let bytes = encode(&[video_group(0, 0)]).unwrap(); + // Keep only the first four bytes: shorter than the 8-byte trailer. + assert!(Segment::open(bytes.slice(0..4)).is_err()); + // Chop the trailer plus the footer's last byte off the end, so the final 8 bytes are no longer a valid trailer. + let short = bytes.slice(0..bytes.len() - TRAILER - 1); + assert!(matches!( + Segment::open(short), + Err(Error::Truncated) | Err(Error::BadMagic) + )); + } + + #[test] + fn rollup_concatenates_and_preserves_groups() { + let first = encode(&[video_group(0, 0), video_group(1, 6000)]).unwrap(); + let second = encode(&[video_group(2, 12000), video_group(3, 18000)]).unwrap(); + + let rolled = rollup(&[first, second]).unwrap(); + let segment = Segment::open(rolled).unwrap(); + + // All four groups present, in order, decoding identically to the originals. + assert_eq!(segment.len(), 4); + let expected = [ + video_group(0, 0), + video_group(1, 6000), + video_group(2, 12000), + video_group(3, 18000), + ]; + for (i, g) in expected.iter().enumerate() { + assert_eq!(&segment.group_at(i).unwrap().unwrap(), g); + assert_eq!(&segment.group(g.sequence).unwrap().unwrap(), g); + } + + // Offsets are rewritten to be ascending and non-overlapping in the merged object. 
+ let entries = segment.entries(); + for pair in entries.windows(2) { + assert!(pair[1].offset >= pair[0].offset + pair[0].length); + } + } + + #[test] + fn rollup_of_one_segment_round_trips() { + let batch = vec![video_group(0, 0), video_group(1, 6000)]; + let single = encode(&batch).unwrap(); + let rolled = Segment::open(rollup(std::slice::from_ref(&single)).unwrap()).unwrap(); + for g in &batch { + assert_eq!(&rolled.group(g.sequence).unwrap().unwrap(), g); + } + } + + #[test] + fn rollup_rejects_corrupt_input() { + let good = encode(&[video_group(0, 0)]).unwrap(); + let bad = Bytes::from_static(b"not a segment!!!"); + assert!(rollup(&[good, bad]).is_err()); + } +} diff --git a/rs/moq-net/src/model/cache/store.rs b/rs/moq-net/src/model/cache/store.rs new file mode 100644 index 000000000..a0c7de815 --- /dev/null +++ b/rs/moq-net/src/model/cache/store.rs @@ -0,0 +1,267 @@ +//! Disk and remote cache tiers backed by [`object_store`]. +//! +//! A `Store` persists flush bands from the RAM tier as segments (one object each), serves a group +//! by ranged-reading its blob, and compacts: once the disk tier is over its bounds, the oldest +//! segments roll up into one remote object (or are evicted if there is no remote tier). All the +//! decisions live in the `index` module; this module is the object_store glue. + +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use object_store::{ObjectStore, PutPayload, path::Path}; + +use super::index::{Index, SegmentId, Tier}; +use super::segment::{self, Segment}; +use super::{Batch, Bounds, Group}; + +/// An error from a tiered [`Store`]. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum Error { + /// A segment failed to encode or decode. + #[error(transparent)] + Segment(#[from] segment::Error), + /// The backing object store failed. 
+ #[error(transparent)] + Store(#[from] object_store::Error), +} + +/// A tiered durable store: a disk object store, an optional remote one, and the index mapping +/// group sequences to their location. Bands flushed from the RAM tier land here; old disk segments +/// roll up into the remote tier, or are evicted when there is none. +pub struct Store { + disk: Arc, + remote: Option>, + bounds: Bounds, + prefix: Path, + index: Index, +} + +impl Store { + /// Create a store over `disk` with an optional `remote` tier, keyed under `prefix`. `bounds` + /// caps the disk tier; exceeding the high watermark promotes (or evicts) the oldest segments. + pub fn new(disk: Arc, remote: Option>, prefix: Path, bounds: Bounds) -> Self { + Self { + disk, + remote, + bounds, + prefix, + index: Index::new(), + } + } + + fn store_of(&self, tier: Tier) -> &Arc { + match tier { + // A remote location is only ever recorded when a remote tier is configured (see + // `compact`), so this never falls back. + Tier::Remote => self + .remote + .as_ref() + .expect("a remote location implies a configured remote tier"), + Tier::Disk => &self.disk, + } + } + + fn key(&self, tier: Tier, id: SegmentId) -> Path { + let dir = match tier { + Tier::Disk => "disk", + Tier::Remote => "remote", + }; + self.prefix.child(dir).child(id.to_string()) + } + + /// Persist a flushed band as one disk segment. Does not compact: the caller drives compaction + /// (see [`Self::plan_compaction`]) so the slow remote upload can run without the store lock. + pub async fn flush(&mut self, batch: Batch) -> Result<(), Error> { + if batch.is_empty() { + return Ok(()); + } + let bytes = segment::encode(&batch)?; + let segment = Segment::open(bytes.clone())?; + // Write the object before recording it, so a failed put leaves the index unchanged. 
+ let id = self.index.next_id(); + self.disk + .put(&self.key(Tier::Disk, id), PutPayload::from_bytes(bytes)) + .await?; + let added = self.index.add(Tier::Disk, &segment); + debug_assert_eq!(added, id, "index id drifted from the written key"); + Ok(()) + } + + /// Fetch a group by sequence: locate it, ranged-read its blob, decode it. `None` if not stored. + pub async fn get(&self, sequence: u64) -> Result, Error> { + let Some(loc) = self.index.locate(sequence) else { + return Ok(None); + }; + let end = loc.offset.checked_add(loc.length).ok_or(segment::Error::Truncated)?; + let range: Range = loc.offset..end; + let bytes = self + .store_of(loc.tier) + .get_range(&self.key(loc.tier, loc.segment), range) + .await?; + Ok(Some(segment::group_from_blob(sequence, bytes)?)) + } + + /// Phase 1 of compaction, run **under the store lock**: if the disk tier is over bounds, snapshot + /// a rollup of the oldest segments (read their bytes and build the rolled object), reserving the + /// remote segment id. With no remote tier, drop the oldest disk segments inline instead. Returns + /// the snapshot to [upload](Rollup::upload), or `None` when within bounds or evicted inline. + /// + /// Disk reads happen here (local, fast); only the remote upload is slow, and it runs after this + /// returns so the lock can be released across it. The disk segments stay in place and indexed + /// until [`Self::apply_compaction`], so a concurrent fetch still reads them. + pub async fn plan_compaction(&mut self) -> Result, Error> { + let promoted = self.index.promotion(self.bounds); + if promoted.is_empty() { + return Ok(None); + } + + let Some(remote) = self.remote.clone() else { + // No remote tier: drop the oldest disk segments outright (local deletes, fast). The index + // is only updated once every delete has gone through, so a failed pass is retried on the + // next compaction. An object already removed by an earlier, partially failed pass counts + // as deleted; otherwise a store that reports a missing object as an error would fail the + // retry on the same id forever and the disk tier would never shrink again. + for id in &promoted { + match self.disk.delete(&self.key(Tier::Disk, *id)).await { + Ok(()) | Err(object_store::Error::NotFound { .. }) => {} + Err(err) => return Err(err.into()), + } + } + self.index.evict(&promoted); + return Ok(None); + }; + + // Read the promoted disk segments whole and roll them into one. 
+ let mut segments = Vec::with_capacity(promoted.len()); + for id in &promoted { + let bytes = self.disk.get(&self.key(Tier::Disk, *id)).await?.bytes().await?; + segments.push(bytes); + } + let rolled = segment::rollup(&segments)?; + let segment = Segment::open(rolled.clone())?; + // Reserve the id (and thus the key) the upload writes to. Only the flush task mutates the + // index, so `next_id` is stable until `apply_compaction` consumes it. + let new_id = self.index.next_id(); + let key = self.key(Tier::Remote, new_id); + Ok(Some(Rollup { + promoted, + rolled, + segment, + new_id, + remote, + key, + })) + } + + /// Phase 3 of compaction, run **under the store lock** after [`Rollup::upload`] succeeds: repoint + /// the index at the uploaded remote object and delete the now-orphaned disk segments. + pub async fn apply_compaction(&mut self, rollup: Rollup) -> Result<(), Error> { + let applied = self.index.apply_promotion(&rollup.promoted, &rollup.segment); + debug_assert_eq!(applied, rollup.new_id, "index id drifted from the uploaded key"); + // Best-effort cleanup; an index now pointing at remote makes any leftover disk objects + // orphans, not inconsistency. + for id in &rollup.promoted { + self.disk.delete(&self.key(Tier::Disk, *id)).await?; + } + Ok(()) + } + + /// Bring the disk tier within bounds in one call, holding the lock across the remote upload. + /// Convenience for callers that don't need to release the lock (tests, single-tier setups); the + /// tiered cache path uses [`plan_compaction`](Self::plan_compaction) / + /// [`Rollup::upload`] / [`apply_compaction`](Self::apply_compaction) so a slow remote upload + /// doesn't block fetches. + pub async fn compact(&mut self) -> Result<(), Error> { + if let Some(rollup) = self.plan_compaction().await? 
{ + rollup.upload().await?; + self.apply_compaction(rollup).await?; + } + Ok(()) + } +} + +/// A snapshot of a planned disk -> remote rollup, taken under the store lock by +/// [`Store::plan_compaction`] so the slow remote upload can run without holding it. The rolled +/// bytes are reference-counted, so the snapshot is cheap to carry across the unlocked upload. +pub struct Rollup { + promoted: Vec, + rolled: Bytes, + segment: Segment, + new_id: SegmentId, + remote: Arc, + key: Path, +} + +impl Rollup { + /// Upload the rolled object to the remote tier. Run this **without** holding the store lock; the + /// index still points at the (intact) disk segments until [`Store::apply_compaction`], so a + /// failed upload is a safe no-op to retry and a concurrent fetch is unaffected. + pub async fn upload(&self) -> Result<(), Error> { + self.remote + .put(&self.key, PutPayload::from_bytes(self.rolled.clone())) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::super::{Frame, Limit}; + use super::*; + use bytes::Bytes; + use object_store::memory::InMemory; + + /// A one-frame untimed group of `bytes` bytes at the given sequence. + fn group(sequence: u64, bytes: usize) -> Group { + Group { + sequence, + frames: vec![Frame { + timestamp: None, + payload: Bytes::from(vec![sequence as u8; bytes]), + }], + } + } + + fn memory() -> Arc { + Arc::new(InMemory::new()) + } + + #[tokio::test] + async fn flush_and_get_from_disk() { + let mut store = Store::new(memory(), None, Path::from("cache"), Bounds::default()); + store.flush(vec![group(0, 10), group(1, 20)]).await.unwrap(); + + assert_eq!(store.get(0).await.unwrap().unwrap(), group(0, 10)); + assert_eq!(store.get(1).await.unwrap().unwrap(), group(1, 20)); + assert!(store.get(99).await.unwrap().is_none()); + } + + #[tokio::test] + async fn promotes_to_remote_over_budget() { + // Segments are ~1 KB; keep ~1 in disk (min 1100), promote at 2 (max 2000). 
+ let bounds = Bounds::new(Limit::bytes(1100), Limit::bytes(2000)); + let mut store = Store::new(memory(), Some(memory()), Path::from("cache"), bounds); + + for seq in 0..5 { + store.flush(vec![group(seq, 1000)]).await.unwrap(); + store.compact().await.unwrap(); + } + + // Every group is still readable, whether it stayed on disk or rolled up to remote. + for seq in 0..5 { + assert_eq!(store.get(seq).await.unwrap().unwrap(), group(seq, 1000)); + } + // Some bytes ended up in the remote tier. + assert!(store.index.bytes(Tier::Remote) > 0); + } + + #[tokio::test] + async fn evicts_oldest_without_remote() { + let bounds = Bounds::new(Limit::bytes(1100), Limit::bytes(2000)); + let mut store = Store::new(memory(), None, Path::from("cache"), bounds); + + for seq in 0..5 { + store.flush(vec![group(seq, 1000)]).await.unwrap(); + store.compact().await.unwrap(); + } + + // The newest group is retained; the oldest was evicted (no remote to promote into). + assert_eq!(store.get(4).await.unwrap().unwrap(), group(4, 1000)); + assert!(store.get(0).await.unwrap().is_none()); + } +} diff --git a/rs/moq-net/src/model/group.rs b/rs/moq-net/src/model/group.rs index 470bca67d..b9c63eb69 100644 --- a/rs/moq-net/src/model/group.rs +++ b/rs/moq-net/src/model/group.rs @@ -185,6 +185,19 @@ impl GroupProducer { self.timescale } + /// The media timestamp of the last buffered frame, for the track cache's media-time + /// retention gate. `None` when the group has no frames yet or the track is untimed. + /// Reads the producer's own buffer, so it never blocks. + pub(crate) fn last_timestamp(&self) -> Option { + self.state.read().frames.back().and_then(|f| f.timestamp) + } + + /// Whether the group has been finished (no more frames will be written). The track cache only + /// spills finished groups: draining an open one would park the flush task until it completes. 
+ pub(crate) fn is_finished(&self) -> bool { + self.state.read().fin + } + /// A helper method to write a frame from a single byte buffer. /// /// If you want to write multiple chunks, use [Self::create_frame] to get a frame producer. diff --git a/rs/moq-net/src/model/mod.rs b/rs/moq-net/src/model/mod.rs index 27258d847..2cd55b13c 100644 --- a/rs/moq-net/src/model/mod.rs +++ b/rs/moq-net/src/model/mod.rs @@ -8,6 +8,10 @@ mod subscription; mod time; mod track; +/// Per-track durable cache: the disk/remote spill tiers below a track's live RAM window. +/// Attached via [`TrackProducer::with_cache`]. Namespaced: `cache::Disk`, `cache::Group`. +pub mod cache; + pub use bandwidth::*; pub use broadcast::*; pub use compression::*; diff --git a/rs/moq-net/src/model/track.rs b/rs/moq-net/src/model/track.rs index 173df8668..994fd6280 100644 --- a/rs/moq-net/src/model/track.rs +++ b/rs/moq-net/src/model/track.rs @@ -15,7 +15,7 @@ use crate::{Error, Result, Subscription, Timescale, coding}; -use super::{Fetch, Group, GroupConsumer, GroupProducer}; +use super::{Fetch, Group, GroupConsumer, GroupProducer, cache}; use std::{ collections::{HashSet, VecDeque}, @@ -187,6 +187,13 @@ struct TrackState { // uncached groups, so a cache-miss `fetch` on an accepted track fails fast // instead of blocking forever (mirrors `BroadcastState::dynamic`). dynamic: usize, + + // Optional durable spill below the live `groups` window: groups aged out of `groups` are + // serialized to the disk tier, and a fetch that misses `groups` reads them back. Shared by the + // producer and every consumer through this state. Native-only (object_store doesn't build on + // wasm). Set via `TrackProducer::with_cache`. + #[cfg(not(target_arch = "wasm32"))] + cache: Option, } impl TrackState { @@ -384,13 +391,37 @@ impl TrackState { } } - /// Evict groups older than `max_age`, never evicting the max_sequence group. 
+ /// Evict groups that fall outside the retention window by either gate, never evicting the + /// max_sequence group. + /// + /// Two gates, both sized by `max_age`; a group is evicted when it trips either: + /// - **wall-clock**: it was received more than `max_age` ago. The hard memory backstop, so a + /// publisher can't pin RAM by lying about media timestamps. A finished group aged out this + /// way is archived to the cache (when one is attached) so a later fetch reads it back. + /// - **media-time**: its last frame's media timestamp is more than `max_age` behind the live + /// media edge. Drops a stale arrival (a group whose media is already past the window the + /// instant it lands, e.g. a startup burst or a lagging publisher) rather than spending RAM on + /// media too old to serve. A media-stale group is not archived. /// - /// Groups are in arrival order, so we can stop early when we hit a non-expired, - /// non-max_sequence group (everything after it arrived even later). - /// When max_sequence is at the front, we skip past it and tombstone expired groups - /// behind it. + /// An unfinished evicted group is [aborted](GroupProducer::abort): a producer still filling it + /// (a wire receive loop downloading a group already too stale to keep) stops and releases its + /// buffers instead of finishing a group we will immediately drop. + /// + /// Groups arrive in wall-clock order, but a late out-of-order group can be media-expired + /// anywhere in the deque, so this scans the whole (small) window rather than breaking early. fn evict_expired(&mut self, now: web_async::time::Instant, max_age: Duration) { + // The live media edge: the newest frame timestamp across buffered groups. 
+ let media_now = self + .groups + .iter() + .flatten() + .filter_map(|(group, _)| group.last_timestamp()) + .max(); + + let mut removed = Vec::new(); + #[cfg(not(target_arch = "wasm32"))] + let mut evicted = Vec::new(); + for slot in self.groups.iter_mut() { let Some((group, created_at)) = slot else { continue }; @@ -398,14 +429,43 @@ impl TrackState { continue; } - if now.duration_since(*created_at) <= max_age { - break; + let wall_expired = now.duration_since(*created_at) > max_age; + let media_expired = match (media_now, group.last_timestamp()) { + (Some(latest), Some(ts)) => Duration::from(latest).saturating_sub(Duration::from(ts)) > max_age, + _ => false, + }; + if !wall_expired && !media_expired { + continue; } - self.duplicates.remove(&group.sequence); + removed.push(group.sequence); + if group.is_finished() { + // A finished group that aged out by wall-clock is archived to the cache. A finished + // group that is only media-stale (a deliberately fetched old group, or a brief live + // group) is dropped without archiving and without aborting, so a consumer still + // reading it is unaffected. + #[cfg(not(target_arch = "wasm32"))] + if wall_expired && self.cache.is_some() { + evicted.push(group.consume()); + } + } else { + // An unfinished group is dropped from RAM regardless. Abort it so a producer still + // filling it (e.g. a wire receive loop downloading a group already too stale to + // serve) stops wasting bandwidth and releases its buffers, rather than finishing a + // group we will never keep. + let _ = group.abort(Error::Old); + } *slot = None; } + for sequence in removed { + self.duplicates.remove(&sequence); + } + #[cfg(not(target_arch = "wasm32"))] + if let Some(cache) = &self.cache { + cache.evict(evicted); + } + // Trim leading tombstones to advance the offset. 
while let Some(None) = self.groups.front() { self.groups.pop_front(); @@ -691,6 +751,21 @@ impl TrackProducer { } } + /// Attach a durable disk (and optional remote) cache below this track's live window. + /// + /// Groups aged out of the live `groups` window are serialized to `disk` instead of just + /// dropped, and a [`fetch_group`](TrackConsumer::fetch_group) that misses the live window reads + /// them back from disk (or the rolled-up remote tier). The cache lives on the shared track + /// state, so every [`TrackConsumer`] of this track serves from it automatically. Native-only + /// (`object_store` does not build on wasm). + #[cfg(not(target_arch = "wasm32"))] + pub fn with_cache(self, disk: cache::Disk) -> Self { + if let Ok(mut state) = self.modify() { + state.cache = Some(cache::Tiers::spawn(disk)); + } + self + } + /// Block until the aggregate subscription changes, then return the new value. /// /// Yields the most demanding request across all live subscribers, or `None` @@ -995,9 +1070,9 @@ impl TrackConsumer { })) } - /// Return a cached group by sequence without blocking, or `None` if it isn't in - /// the cache. Use [`Self::fetch_group`] to wait for a group that a [`TrackDynamic`] - /// will serve on demand. + /// Return a live-cached group by sequence without blocking, or `None` if it isn't in the live + /// window. A group spilled to the durable cache is only reachable via the async + /// [`Self::fetch_group`], as is one a [`TrackDynamic`] serves on demand. pub fn get_group(&self, sequence: u64) -> Option { self.state.read().cached_group(sequence) } @@ -1005,12 +1080,12 @@ impl TrackConsumer { /// Fetch a single past group, without holding a live subscription. /// /// Returns a [`kio::Pending`] that resolves to the [`GroupConsumer`]: - /// immediately if the group is cached, otherwise once a [`TrackDynamic`] serves - /// the request (a wire FETCH for a relay). `options` accepts `None`, a [`Fetch`], - /// or `Fetch::default()`. 
+ /// immediately if the group is in the live window, otherwise once it is read back from the + /// durable cache (when one is attached) or a [`TrackDynamic`] serves the request (a wire FETCH + /// for a relay). `options` accepts `None`, a [`Fetch`], or `Fetch::default()`. /// /// Fails synchronously with [`Error::NotFound`] when the group can never be served - /// (past the final sequence, or no [`TrackDynamic`] on the track), or the track's + /// (past the final sequence, or no cache and no [`TrackDynamic`] on the track), or the track's /// abort error if it's already closed. pub fn fetch_group(&self, sequence: u64, options: impl Into>) -> Result> { let options = options.into().unwrap_or_default(); @@ -1020,21 +1095,49 @@ impl TrackConsumer { .write() .map_err(|s| s.abort.clone().unwrap_or(Error::Dropped))?; match state.poll_fetch(sequence) { - // Cached: the pending resolves immediately, no handler needed. + // Cached live: the pending resolves immediately from state, no lookup needed. Poll::Ready(Ok(_)) => {} - // Unservable (NotFound) or already aborted: report it synchronously. - Poll::Ready(Err(err)) => return Err(err), - // A handler exists but the group isn't cached yet: queue it. - Poll::Pending => state.fetches.push_back(GroupRequested { - sequence, - priority: options.priority, - }), + // Live miss. If a durable cache is attached and the group could still exist, spawn an + // async lookup across its disk/remote tiers. The returned `TrackFetch` resolves from it + // on a hit; on a miss the lookup task chains upstream (queues for a `TrackDynamic`) and + // the fetch falls through to that live decision. + other => { + #[cfg(not(target_arch = "wasm32"))] + { + // A group past the final sequence can never exist in any tier, and an aborted + // track is terminal, so skip the cache and report those synchronously below. 
+ let exhausted = state.abort.is_some() || state.final_sequence.is_some_and(|fin| sequence >= fin); + if !exhausted && let Some(tiers) = &state.cache { + let timescale = state.info.as_ref().and_then(|info| info.timescale); + let lookup = + spawn_cache_lookup(tiers, self.state.clone(), sequence, options.priority, timescale); + drop(state); + return Ok(kio::Pending::new(TrackFetch { + state: self.state.clone(), + sequence, + lookup: Some(lookup), + })); + } + } + match other { + // Unservable (NotFound) or already aborted: report it synchronously. + Poll::Ready(Err(err)) => return Err(err), + // A handler exists but the group isn't cached yet: queue it. + Poll::Pending => state.fetches.push_back(GroupRequested { + sequence, + priority: options.priority, + }), + Poll::Ready(Ok(_)) => unreachable!("handled above"), + } + } } drop(state); Ok(kio::Pending::new(TrackFetch { state: self.state.clone(), sequence, + #[cfg(not(target_arch = "wasm32"))] + lookup: None, })) } @@ -1159,20 +1262,85 @@ impl GroupRequest { } } +/// A pending durable-cache lookup spawned for a [`TrackFetch`] on a live miss. The background task +/// writes [`Done`](CacheLookup::Done) once the disk/remote tiers resolve (the group, or `None` on a +/// miss). Native-only (the durable cache doesn't build on wasm). +#[cfg(not(target_arch = "wasm32"))] +enum CacheLookup { + /// The lookup task is still reading the tiers. + Pending, + /// The lookup finished: the rebuilt group, or `None` on a miss. + Done(Option), +} + +/// Spawn a background task that reads `sequence` from the cache's disk/remote tiers, rebuilds it at +/// `timescale`, and publishes the result through the returned slot. +/// +/// On a tier miss the task chains upstream: it queues the request for a [`TrackDynamic`] (a wire +/// FETCH for a relay) when one exists, so the [`TrackFetch`] then resolves once upstream serves it. 
+/// Queuing only after the store misses keeps the store the fast path and avoids a redundant +/// upstream fetch when the group is already cached. +#[cfg(not(target_arch = "wasm32"))] +fn spawn_cache_lookup( + tiers: &cache::Tiers, + state: kio::Consumer, + sequence: u64, + priority: u8, + timescale: Option, +) -> kio::Consumer { + let slot = kio::Producer::new(CacheLookup::Pending); + let consumer = slot.consume(); + let store = tiers.store_handle(); + web_async::spawn(async move { + let group = match store.read().await.get(sequence).await { + Ok(Some(group)) => group.produce(timescale).ok(), + // Miss, tier read error, or rebuild failure: report a miss so the fetch falls through. + Ok(None) | Err(_) => None, + }; + if group.is_none() + && let Ok(mut state) = state.write() + && state.dynamic > 0 + { + // Cache miss: chain upstream so a handler fetches the group into the live window. + state.fetches.push_back(GroupRequested { sequence, priority }); + } + if let Ok(mut slot) = slot.write() { + *slot = CacheLookup::Done(group); + } + }); + consumer +} + /// The pollable state of a [`TrackConsumer::fetch_group`]. /// -/// Awaited via the [`kio::Pending`] wrapper; resolves to the -/// [`GroupConsumer`] once the group lands in the track's cache (already present, -/// or produced after a wire FETCH), or [`Error::NotFound`] if it can never exist. +/// Awaited via the [`kio::Pending`] wrapper; resolves to the [`GroupConsumer`] once the group is +/// read back from the durable cache, lands in the live window (e.g. after a wire FETCH), or +/// [`Error::NotFound`] if it can never exist. pub struct TrackFetch { state: kio::Consumer, sequence: u64, + /// A durable-cache lookup spawned on a live miss. On a hit it resolves the fetch; on a miss the + /// poll falls through to the live state. Native-only. 
+ #[cfg(not(target_arch = "wasm32"))] + lookup: Option>, } impl kio::Future for TrackFetch { type Output = Result; fn poll(&self, waiter: &kio::Waiter) -> Poll { + // A durable-cache lookup, if one was spawned, resolves the fetch on a hit. On a miss (or if + // the task died without publishing) fall through to the live state below. + #[cfg(not(target_arch = "wasm32"))] + if let Some(lookup) = &self.lookup { + let resolved = ready!(lookup.poll(waiter, |slot| match &**slot { + CacheLookup::Pending => Poll::Pending, + CacheLookup::Done(group) => Poll::Ready(group.clone()), + })); + if let Ok(Some(group)) = resolved { + return Poll::Ready(Ok(group)); + } + } // `poll_fetch` already yields a `Result` (group, or NotFound / // abort); the outer error is the channel closing without one. Poll::Ready( @@ -1624,6 +1792,179 @@ mod test { } } + /// A disk-backed cache over an in-memory object store, retaining 1s of live groups. + #[cfg(not(target_arch = "wasm32"))] + fn disk_cached_producer() -> TrackProducer { + use object_store::memory::InMemory; + use object_store::path::Path; + let disk = cache::Disk::new(Arc::new(InMemory::new()), Path::from("test"), cache::Bounds::default()); + TrackProducer::new("test", TrackInfo::default().with_cache(Duration::from_secs(1))).with_cache(disk) + } + + /// Write+finish a single-frame group at the next sequence. + fn write_group(producer: &mut TrackProducer, payload: &'static [u8]) { + let mut group = producer.append_group().unwrap(); + group.write_frame(bytes::Bytes::from_static(payload)).unwrap(); + group.finish().unwrap(); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn get_group_does_not_read_disk() { + tokio::time::pause(); + let mut producer = disk_cached_producer(); + write_group(&mut producer, b"hello"); // seq 0 + + // seq 0 is live: get_group sees it synchronously. + assert!(producer.consume().get_group(0).is_some()); + + // Age seq 0 out of the live window; it spills to disk. 
+ tokio::time::advance(Duration::from_secs(2)).await; + write_group(&mut producer, b"world"); // seq 1, evicts seq 0 + + // get_group is sync and never reads disk, so an evicted group is a miss there. + assert!(producer.consume().get_group(0).is_none()); + assert!(producer.consume().get_group(1).is_some()); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn fetch_group_serves_evicted_group_from_disk() { + tokio::time::pause(); + let mut producer = disk_cached_producer(); + write_group(&mut producer, b"hello"); // seq 0 + + // Age seq 0 out of the live window so it is flushed to the disk tier. + tokio::time::advance(Duration::from_secs(2)).await; + write_group(&mut producer, b"world"); // seq 1, evicts seq 0 + + let consumer = producer.consume(); + + // The background flush is async; retry the fetch until the disk write lands. With no + // TrackDynamic, a disk miss resolves to NotFound, so a failed fetch just means "not yet". + let mut served = None; + for _ in 0..500 { + if let Ok(group) = consumer.fetch_group(0, None).unwrap().await { + served = Some(group); + break; + } + tokio::task::yield_now().await; + } + let mut group = served.expect("evicted group served from disk"); + assert_eq!(group.sequence, 0); + assert_eq!( + group.read_frame().await.unwrap().unwrap(), + bytes::Bytes::from_static(b"hello") + ); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn fetch_chains_upstream_on_cache_miss() { + // A group that is neither live nor in the cache must still reach a TrackDynamic: the cache + // miss chains upstream rather than dead-ending in NotFound. + let producer = disk_cached_producer(); + let dynamic = producer.dynamic(); + let consumer = producer.consume(); + + let fetch = consumer.fetch_group(5, None).unwrap(); + + // The async cache miss queues the request, which the handler then receives and serves. 
+ let request = dynamic.requested_group().await.unwrap(); + assert_eq!(request.sequence(), 5); + let mut group = request.accept(None).unwrap(); + group.write_frame(bytes::Bytes::from_static(b"upstream")).unwrap(); + group.finish().unwrap(); + + let mut served = fetch.await.unwrap(); + assert_eq!(served.sequence, 5); + assert_eq!( + served.read_frame().await.unwrap().unwrap(), + bytes::Bytes::from_static(b"upstream") + ); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn fetch_cache_miss_without_dynamic_is_not_found() { + // A cache miss with no handler to chain to resolves NotFound, not a hang. + let producer = disk_cached_producer(); + let consumer = producer.consume(); + assert!(matches!( + consumer.fetch_group(5, None).unwrap().await, + Err(Error::NotFound) + )); + } + + #[cfg(not(target_arch = "wasm32"))] + #[tokio::test] + async fn unfinished_evicted_group_is_not_spilled() { + tokio::time::pause(); + let mut producer = disk_cached_producer(); + + // An open (never-finished) group, kept open by holding its producer handle. + let mut open = producer.append_group().unwrap(); // seq 0 + open.write_frame(bytes::Bytes::from_static(b"partial")).unwrap(); + + // Age it out of the live window; it is dropped, not handed to the flush task (draining an + // open group would park the task forever). + tokio::time::advance(Duration::from_secs(2)).await; + write_group(&mut producer, b"next"); // seq 1, evicts the open seq 0 + + let consumer = producer.consume(); + for _ in 0..50 { + tokio::task::yield_now().await; + } + // seq 0 is gone from RAM and was never spilled, and there is no dynamic: a clean NotFound, + // not a hang. 
+ assert!(matches!( + consumer.fetch_group(0, None).unwrap().await, + Err(Error::NotFound) + )); + } + + #[tokio::test] + async fn media_stale_unfinished_group_is_aborted() { + tokio::time::pause(); + let scale = crate::Timescale::new(1_000_000).unwrap(); // microseconds + + fn timed_frame(group: &mut GroupProducer, scale: crate::Timescale, micros: u64, payload: &'static [u8]) { + let info = crate::Frame { + size: payload.len() as u64, + timestamp: Some(crate::Timestamp::new(micros, scale).unwrap()), + }; + let mut frame = group.create_frame(info).unwrap(); + frame.write(bytes::Bytes::from_static(payload)).unwrap(); + frame.finish().unwrap(); + } + + // Retain 10s of media (and wall-clock); no time advances, so only the media gate can fire. + let mut producer = TrackProducer::new( + "test", + TrackInfo::default() + .with_timescale(scale) + .with_cache(Duration::from_secs(10)), + ); + + // A stale group still being received: one frame at media t=0, left open (producer held). + let mut stale = producer.create_group(Group { sequence: 0 }).unwrap(); + let mut reader = stale.consume(); + timed_frame(&mut stale, scale, 0, b"old"); + + // The live edge jumps media time to 20s, well past the 10s window. + let mut edge = producer.create_group(Group { sequence: 1 }).unwrap(); + timed_frame(&mut edge, scale, 20_000_000, b"new"); + edge.finish().unwrap(); + + // A further insert runs eviction with media_now = 20s; the open group 0 (t=0) is >10s stale. + producer.append_group().unwrap(); // seq 2 + + // The stale, still-open group was aborted (Error::Old), signaling its receiver to stop, and + // it is gone from the live window. + assert!(matches!(reader.read_frame().await, Err(Error::Old))); + assert!(producer.consume().get_group(0).is_none()); + } + #[tokio::test] async fn no_eviction_when_fresh() { tokio::time::pause();