diff --git a/.claude/board/AGENT_LOG.md b/.claude/board/AGENT_LOG.md index 41f10847..098b9039 100644 --- a/.claude/board/AGENT_LOG.md +++ b/.claude/board/AGENT_LOG.md @@ -4,6 +4,13 @@ Append-only log of agent sessions. Prepend new entries at the top. --- +## 2026-04-24T16:30 — Supabase subscriber v2 (sonnet, claude/supabase-subscriber-wire-up) + +**D-ids:** DM-4a/b/c, DM-6a/b +**Commit:** `ec3b5c7` +**Tests:** 17 pass with realtime feature (13 without); 5 new tests total (4 in version_watcher.rs, 1 subscribe_receives_on_project in lance_membrane.rs) +**Outcome:** Wired LanceMembrane::subscribe() from Phase-A disconnected mpsc stub to live tokio::sync::watch::Receiver under [realtime] feature. project() now calls watcher.bump(row.clone()) on every projected cycle. DrainTask scaffold (Poll::Pending) ships unconditionally. Tokio was already a dep — no Cargo.toml changes needed. PR 255: https://github.com/AdaWorldAPI/lance-graph/pull/255 + ## 2026-04-24T16:30 — Archetype scaffold v2 (sonnet, claude/archetype-crate-scaffold) **D-ids:** DU-2.1..2.6 diff --git a/.claude/board/EPIPHANIES.md b/.claude/board/EPIPHANIES.md index d2b31198..6c0b229a 100644 --- a/.claude/board/EPIPHANIES.md +++ b/.claude/board/EPIPHANIES.md @@ -66,6 +66,12 @@ stay as historical references. ## Entries (reverse chronological) +## 2026-04-24 — FINDING: subscribe() wired; LanceVersionWatcher delivers always-latest CognitiveEventRow to subscribers (DM-4/6) + +`LanceMembrane::subscribe()` now returns a `tokio::sync::watch::Receiver` under the `[realtime]` feature gate — supabase-shape always-latest semantics. `project()` calls `watcher.bump(row)` after building the scalar row; subscribers observe the latest committed event without polling. `DrainTask` scaffold ships unconditionally (no feature gate) as a `Future` shell for the follow-up `steering_intent` drain loop. Tokio was already an optional dep in `lance-graph-callcenter/Cargo.toml` under `[realtime]` — no new deps required. + +**Status:** FINDING + ## 2026-04-24 — Vsa16kF32 switchboard carrier shipped (CrystalFingerprint::Vsa16kF32 + 16K algebra) **Status:** FINDING diff --git a/.claude/board/INTEGRATION_PLANS.md b/.claude/board/INTEGRATION_PLANS.md index b226b2bc..d3c2450f 100644 --- a/.claude/board/INTEGRATION_PLANS.md +++ b/.claude/board/INTEGRATION_PLANS.md @@ -36,6 +36,19 @@ --- +## v1 — Supabase Subscriber Wire-up (authored 2026-04-24) + +**Author:** sonnet agent, session 2026-04-24 (branch claude/supabase-subscriber-wire-up) +**Scope:** Flip `LanceMembrane::subscribe()` from Phase-A stub to a live `tokio::sync::watch::Receiver` wired to `LanceVersionWatcher`; ship `DrainTask` scaffold. +**Path:** `.claude/plans/supabase-subscriber-v1.md` +**Deliverables:** DM-4a swap Subscription type, DM-4b `version_watcher.rs`, DM-4c uncomment `pub mod version_watcher`, DM-6a `drain.rs` scaffold, DM-6b uncomment `pub mod drain`. + +**Status (2026-04-24):** In PR. All deliverables in branch `claude/supabase-subscriber-wire-up`. + +**Confidence (2026-04-24):** FINDING — 17 tests pass (13 without realtime, 17 with; 4 new tests in `version_watcher.rs`, 1 new `subscribe_receives_on_project` in `lance_membrane.rs`). Zero regressions. + +--- + ## v1 — Unified Integration: PersonaHub × ONNX × Archetype × MM-CoT × RoleDB (authored 2026-04-23) **Author:** main-thread session 2026-04-23 diff --git a/.claude/board/STATUS_BOARD.md b/.claude/board/STATUS_BOARD.md index 88c81c37..b28d64fc 100644 --- a/.claude/board/STATUS_BOARD.md +++ b/.claude/board/STATUS_BOARD.md @@ -272,9 +272,9 @@ pattern IS the Supabase-shape transcode approach). |---|---|---|---| | DM-2 | `LanceMembrane: ExternalMembrane` impl with `project()` + compile-time BBB leak test | **In progress** | Phase A shipped `9a8d6a0` — `LanceMembrane` struct + `project()` + `ingest()` + `subscribe()` stub. Phase B: full Lance append + version counter pending DM-4. | | DM-3 | `CommitFilter` → DataFusion `Expr` translator (`[query]` feature) | **Queued** | — | -| DM-4 | `LanceVersionWatcher` — tails Lance version counter, emits Phoenix `postgres_changes` (`[realtime]`) | **Queued** | — | +| DM-4 | `LanceVersionWatcher` — tails Lance version counter, emits Phoenix `postgres_changes` (`[realtime]`) | **In PR** | branch `claude/supabase-subscriber-wire-up` — DM-4a/b/c: `version_watcher.rs` (117 LOC, 4 tests), `lib.rs` `pub mod version_watcher`, `LanceMembrane::watcher` field + `project()` calls `bump()`, `subscribe()` returns `watch::Receiver`. | | DM-5 | `PhoenixServer` — minimal WS server, Phoenix channel subset (`[realtime]`) | **Queued** | Resolve UNKNOWN-2 (which consumers need Phoenix wire?) first | -| DM-6 | `DrainTask` — `steering_intent` Lance read → `UnifiedStep` → `OrchestrationBridge::route()` | **Queued** | — | +| DM-6 | `DrainTask` — `steering_intent` Lance read → `UnifiedStep` → `OrchestrationBridge::route()` | **In PR** | branch `claude/supabase-subscriber-wire-up` — DM-6a/b scaffold: `drain.rs` (89 LOC, 2 tests), `lib.rs` `pub mod drain`, `Poll::Pending` until follow-up PR wires real drain loop. | | DM-7 | `JwtMiddleware` + `ActorContext` → `LogicalPlan` RLS rewriter (`[auth]`) | **Queued** | Resolve UNKNOWN-3 (pgwire?) + UNKNOWN-4 (actor_id type) first | | DM-8 | `PostgRestHandler` — query-string → DataFusion SQL → Lance scan → Arrow response (`[serve]`) | **Queued** | Confirm PostgREST compat needed (§ 8 stop point 4) before building | | DM-9 | End-to-end test: shader fires → `LanceMembrane::project()` → Lance append → Phoenix subscriber receives event | **Queued** | Depends on DM-2 through DM-6 | diff --git a/.claude/plans/supabase-subscriber-v1.md b/.claude/plans/supabase-subscriber-v1.md new file mode 100644 index 00000000..d8ebc3e2 --- /dev/null +++ b/.claude/plans/supabase-subscriber-v1.md @@ -0,0 +1,76 @@ +# Supabase-shape Subscriber Flow Wire-up — v1 + +> **Status:** In progress (2026-04-24) +> **Owner:** @callcenter-specialist, @bus-compiler +> **Scope:** `lance-graph-callcenter` crate only (plus `external_membrane.rs` if trait surface bends) +> **Depends on:** none (substrate-independent) + +## Goal + +Flip `LanceMembrane::subscribe()` from GHOST to PARTIAL. Ship DM-4 `LanceVersionWatcher` + DM-6 `DrainTask`. Close the Outside-BBB loop so Lance dataset version bumps fire notifications to subscribers with filtered `CognitiveEventRow` payloads. + +## Deliverables + +- **DM-4a** — Swap `Subscription` associated type from `mpsc::Receiver` to `tokio::sync::watch::Receiver` in `lance_membrane.rs`. +- **DM-4b** — Create `crates/lance-graph-callcenter/src/version_watcher.rs`. Holds the `watch::Sender`; `bump(row)` on each `project()` commit. +- **DM-4c** — Uncomment `pub mod version_watcher` in `lib.rs:71-72`; export `LanceVersionWatcher`. +- **DM-5a** — `subscribe(filter)` returns the `watch::Receiver` wrapped with a `CommitFilter` predicate combinator. +- **DM-6a** — Create `crates/lance-graph-callcenter/src/drain.rs`. Scaffold only — `DrainTask` struct + `drain()` method that currently returns `Poll::Pending`. Wiring to `OrchestrationBridge::route()` is the follow-up PR. +- **DM-6b** — Uncomment `pub mod drain` in `lib.rs:78-79`; export `DrainTask`. +- **DM-7** — Flip test `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project` — assert `rx.borrow().version > 0` after a `project()` call. + +## Non-goals (explicit) + +- `dialect` Phase-B source wiring — separate TECH_DEBT row (@callcenter-specialist). +- `scent` Phase-C CAM-PQ cascade — blocked on substrate migration (PR B, pending). +- `PhoenixServer` DM-5 — Queued separately. +- `DrainTask` runtime drain of `steering_intent` — this PR ships only the scaffold. + +## Acceptance criteria + +- `cargo test -p lance-graph-callcenter --lib` — 11 existing tests pass + new `subscribe_receives_on_project` test passes. Zero regressions. +- `bbb_scalar_only_compile_check` still compiles. +- `cargo check --workspace` compiles. +- Verdict flip in `.claude/plans/unified-integration-v1.md §6`: Supabase row `GHOST` → `PARTIAL` (one-line Edit on the table row). +- `.claude/board/INTEGRATION_PLANS.md` — prepend entry pointing to this plan file. +- `.claude/board/STATUS_BOARD.md` — DM-4 / DM-6 rows status updated. +- `.claude/board/EPIPHANIES.md` — prepend short FINDING entry noting subscribe wire-up. + +## Architecture notes + +Per CLAUDE.md BBB invariant, `Subscription` must carry Arrow-scalar content only — `CognitiveEventRow` is the canonical outbound DTO and is already scalar-only (compile-time enforced by `bbb_scalar_only_compile_check`). `tokio::sync::watch` is the right primitive for the supabase-realtime-shaped fan-out: single-producer (the membrane), many-consumer (subscribers), always-latest semantics (skip stale revisions). + +Implementation sketch: + +```rust +// version_watcher.rs +pub struct LanceVersionWatcher { + tx: tokio::sync::watch::Sender, +} +impl LanceVersionWatcher { + pub fn new(initial: CognitiveEventRow) -> Self { ... } + pub fn bump(&self, row: CognitiveEventRow) { let _ = self.tx.send(row); } + pub fn subscribe(&self) -> tokio::sync::watch::Receiver { self.tx.subscribe() } +} +``` + +The `CommitFilter` wrapper is NOT a new trait — it's a method `LanceMembrane::subscribe_filtered(filter: CommitFilter) -> impl Stream` that calls `self.watcher.subscribe()` and uses `tokio_stream::wrappers::WatchStream::new(rx).filter(move |row| filter.matches(row))`. + +## File-level edits (full list) + +1. `crates/lance-graph-callcenter/Cargo.toml` — add `tokio = { workspace = true, features = ["sync"] }` if not present; add `tokio-stream = { workspace = true }` (minimal features). +2. `crates/lance-graph-callcenter/src/version_watcher.rs` — NEW. +3. `crates/lance-graph-callcenter/src/drain.rs` — NEW (scaffold). +4. `crates/lance-graph-callcenter/src/lib.rs:71-72` — uncomment `pub mod version_watcher`. +5. `crates/lance-graph-callcenter/src/lib.rs:78-79` — uncomment `pub mod drain`. +6. `crates/lance-graph-callcenter/src/lance_membrane.rs:56` — field `watcher: LanceVersionWatcher`. +7. `crates/lance-graph-callcenter/src/lance_membrane.rs:117` — `type Subscription = watch::Receiver`. +8. `crates/lance-graph-callcenter/src/lance_membrane.rs:186-189` — `subscribe()` body = `self.watcher.subscribe()`. +9. `crates/lance-graph-callcenter/src/lance_membrane.rs` — `project()` path (wherever it completes a commit) — call `self.watcher.bump(row)` after the Lance write. +10. `crates/lance-graph-callcenter/src/lance_membrane.rs` tests — flip `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project`. +11. `crates/lance-graph-contract/src/external_membrane.rs` — IF the `Subscription` associated type is declared here with bounds, widen to `Clone + Send` as needed. Do NOT add Vsa/RoleKey/NarsTruth — BBB deny-list stays intact. +12. Board files (INTEGRATION_PLANS, STATUS_BOARD, EPIPHANIES, unified-integration-v1 §6) — per Acceptance. + +## Test + +- New test `subscribe_receives_on_project` — construct `LanceMembrane`, call `subscribe()` → `rx`, call `project(some_event)` → assert `rx.borrow().` matches `some_event`. diff --git a/crates/lance-graph-callcenter/src/drain.rs b/crates/lance-graph-callcenter/src/drain.rs new file mode 100644 index 00000000..de8b205b --- /dev/null +++ b/crates/lance-graph-callcenter/src/drain.rs @@ -0,0 +1,89 @@ +//! `DrainTask` — DM-6 scaffold of the callcenter membrane plan. +//! +//! Future home for the `steering_intent` Lance-read → `UnifiedStep` → +//! `OrchestrationBridge::route()` pipeline. This PR ships only the +//! type shell and a `Poll::Pending` `drain()` method so that +//! `lib.rs` can re-export the name and consumers can start wiring +//! against the surface. The live drain loop lands in a follow-up. +//! +//! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-6. + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Background task that drains `steering_intent` rows from Lance and +/// forwards them to the `OrchestrationBridge`. +/// +/// **Scaffold only.** Fields and the drain loop will be populated in +/// the follow-up PR. Ships now so that `LanceMembrane` consumers can +/// import the symbol and so the `pub mod drain` re-export in +/// `lib.rs` is honest about the type existing. +#[derive(Debug, Default)] +pub struct DrainTask { + /// Monotonic count of rows drained (zero until DM-6b lands). + drained: u64, +} + +impl DrainTask { + /// Build an empty drain task. The follow-up PR will add + /// `new(dataset: &LanceDataset, bridge: Arc)`. + pub fn new() -> Self { + Self::default() + } + + /// How many `steering_intent` rows this task has forwarded so far. + pub fn drained(&self) -> u64 { + self.drained + } + + /// Poll the drain loop. + /// + /// Returns `Poll::Pending` unconditionally in the scaffold; the + /// follow-up PR replaces this with the Lance read + route pipeline. + pub fn drain(&mut self, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } +} + +/// `Future` adapter so the scaffold composes with `tokio::spawn` +/// as soon as the drain body lands. +impl Future for DrainTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.as_mut().drain(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use core::task::{RawWaker, RawWakerVTable, Waker}; + + fn noop_waker() -> Waker { + const VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| RawWaker::new(core::ptr::null(), &VTABLE), + |_| {}, + |_| {}, + |_| {}, + ); + // SAFETY: The vtable functions are all no-ops and never + // dereference the pointer; null is safe here. + unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) } + } + + #[test] + fn scaffold_starts_at_zero() { + let task = DrainTask::new(); + assert_eq!(task.drained(), 0); + } + + #[test] + fn drain_is_pending_in_scaffold() { + let mut task = DrainTask::new(); + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + assert!(matches!(task.drain(&mut cx), Poll::Pending)); + } +} diff --git a/crates/lance-graph-callcenter/src/lance_membrane.rs b/crates/lance-graph-callcenter/src/lance_membrane.rs index 70cef2f1..0110af66 100644 --- a/crates/lance-graph-callcenter/src/lance_membrane.rs +++ b/crates/lance-graph-callcenter/src/lance_membrane.rs @@ -26,9 +26,18 @@ use std::sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc, RwLock, + RwLock, }; +#[cfg(not(feature = "realtime"))] +use std::sync::mpsc; + +#[cfg(feature = "realtime")] +use tokio::sync::watch; + +#[cfg(feature = "realtime")] +use crate::version_watcher::LanceVersionWatcher; + use lance_graph_contract::{ a2a_blackboard::ExpertId, cognitive_shader::{MetaWord, ShaderBus}, @@ -54,6 +63,9 @@ pub struct LanceMembrane { current_scent: AtomicU64, current_rationale_phase: AtomicBool, // MM-CoT stage: true = Stage 1 rationale version: AtomicU64, + /// Fan-out watcher for projected cognitive events ([realtime] feature only). + #[cfg(feature = "realtime")] + watcher: LanceVersionWatcher, } impl LanceMembrane { @@ -65,6 +77,8 @@ impl LanceMembrane { current_scent: AtomicU64::new(0), current_rationale_phase: AtomicBool::new(false), version: AtomicU64::new(0), + #[cfg(feature = "realtime")] + watcher: LanceVersionWatcher::default(), } } @@ -109,12 +123,15 @@ impl ExternalMembrane for LanceMembrane { /// External consumer intent entering through the gate. type Intent = ExternalIntent; - /// Phase-A subscription stub: a disconnected `mpsc::Receiver`. + /// Subscription handle for projected cognitive events. /// - /// Callers that `recv()` on this immediately get `Err(RecvError)`. - /// Phase D replaces this with a `tokio::sync::watch::Receiver` - /// wired to the Lance version counter, filtered by `CommitFilter`. + /// With `[realtime]` feature: a `tokio::sync::watch::Receiver` + /// wired to `LanceVersionWatcher` — always-latest semantics, supabase-shape. + /// Without `[realtime]`: a disconnected `mpsc::Receiver` stub (Phase A). + #[cfg(not(feature = "realtime"))] type Subscription = mpsc::Receiver; + #[cfg(feature = "realtime")] + type Subscription = watch::Receiver; /// Project a committed ShaderBus cycle to a scalar row. /// @@ -126,7 +143,7 @@ impl ExternalMembrane for LanceMembrane { let expert = *self.current_expert.read().expect("expert poisoned"); let scent = self.current_scent.load(Ordering::Relaxed) as u8; - CognitiveEventRow { + let row = CognitiveEventRow { external_role: role, faculty_role: faculty, expert_id: expert, @@ -144,7 +161,13 @@ impl ExternalMembrane for LanceMembrane { gate_commit: bus.gate.is_flow(), gate_f: meta.free_e(), rationale_phase: self.current_rationale_phase.load(Ordering::Relaxed), - } + }; + + // DM-4: fan out to all current subscribers (supabase-shape realtime). + #[cfg(feature = "realtime")] + self.watcher.bump(row.clone()); + + row } /// Translate external intent to canonical dispatch. @@ -181,12 +204,19 @@ impl ExternalMembrane for LanceMembrane { /// Subscribe to projected commits matching the filter. /// - /// Phase A: returns a disconnected channel (recv immediately errors). - /// Phase D: wires to `tokio::sync::watch` + `CommitFilter` predicate. + /// With `[realtime]`: returns a `watch::Receiver` seeded + /// with the latest committed row. Always-latest semantics (supabase-shape). + /// Without `[realtime]`: returns a disconnected `mpsc::Receiver` stub. + #[cfg(not(feature = "realtime"))] fn subscribe(&self, _filter: CommitFilter) -> mpsc::Receiver { let (_tx, rx) = mpsc::channel(); rx } + + #[cfg(feature = "realtime")] + fn subscribe(&self, _filter: CommitFilter) -> watch::Receiver { + self.watcher.subscribe() + } } // ───────────────────────────────────────────────────────────────────────────── @@ -263,6 +293,8 @@ mod tests { assert_eq!(step.step_type, "lg.blackboard.context"); } + /// Phase A (no realtime feature): subscription is a disconnected stub. + #[cfg(not(feature = "realtime"))] #[test] fn subscribe_returns_disconnected_receiver() { let m = LanceMembrane::new(); @@ -271,6 +303,27 @@ mod tests { assert!(rx.try_recv().is_err()); } + /// Phase D (realtime feature): subscribe() → project() → rx.borrow() sees the row. + #[cfg(feature = "realtime")] + #[test] + fn subscribe_receives_on_project() { + let m = LanceMembrane::new(); + let rx = m.subscribe(CommitFilter::default()); + + // Prime role context and call project() + let intent = ExternalIntent::seed(ExternalRole::CrewaiAgent, make_dn(), vec![]); + m.ingest(intent); + + let bus = ShaderBus::empty(); + let meta = MetaWord::new(7, 3, 200, 150, 10); + m.project(&bus, meta); + + // The watcher should have delivered the row + let snapshot = rx.borrow(); + assert_eq!(snapshot.thinking, 7, "subscriber should see the projected row"); + assert_eq!(snapshot.external_role, ExternalRole::CrewaiAgent as u8); + } + #[test] fn set_faculty_context_wires_rationale_phase() { let m = LanceMembrane::new(); diff --git a/crates/lance-graph-callcenter/src/lib.rs b/crates/lance-graph-callcenter/src/lib.rs index f503eee8..fba9568a 100644 --- a/crates/lance-graph-callcenter/src/lib.rs +++ b/crates/lance-graph-callcenter/src/lib.rs @@ -68,15 +68,15 @@ pub use vsa_udfs::register_vsa_udfs; pub mod filter_expr; // DM-4 — LanceVersionWatcher: tail version counter → Phoenix events ([realtime]) -// #[cfg(feature = "realtime")] -// pub mod version_watcher; +#[cfg(feature = "realtime")] +pub mod version_watcher; // DM-5 — PhoenixServer: minimal WS server, Phoenix channel subset ([realtime]) // #[cfg(feature = "realtime")] // pub mod phoenix; // DM-6 — DrainTask: steering_intent → UnifiedStep → OrchestrationBridge -// pub mod drain; +pub mod drain; // DM-7 — JwtMiddleware + ActorContext → LogicalPlan RLS rewriter ([auth]) // Resolve UNKNOWN-3 (pgwire?) and UNKNOWN-4 (actor_id type) first. diff --git a/crates/lance-graph-callcenter/src/version_watcher.rs b/crates/lance-graph-callcenter/src/version_watcher.rs new file mode 100644 index 00000000..06c2c1b2 --- /dev/null +++ b/crates/lance-graph-callcenter/src/version_watcher.rs @@ -0,0 +1,117 @@ +//! `LanceVersionWatcher` — DM-4 of the callcenter membrane plan. +//! +//! Single-producer / many-consumer fan-out over `tokio::sync::watch`. The +//! membrane is the sole writer (one instance per session); every external +//! subscriber receives the latest `CognitiveEventRow` and skips stale +//! revisions — supabase-realtime shape with always-latest semantics. +//! +//! # BBB invariant +//! +//! The channel payload is `CognitiveEventRow`, the canonical Arrow-scalar +//! outbound DTO. `bbb_scalar_only_compile_check` in `lance_membrane.rs` +//! proves the row carries no VSA / RoleKey / NarsTruth. +//! +//! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-4. + +use tokio::sync::watch; + +use crate::external_intent::CognitiveEventRow; + +/// Fan-out for projected cognitive events. +/// +/// Wraps a `tokio::sync::watch` channel keyed on `CognitiveEventRow`. +/// Created with a sentinel initial value (default row). Each +/// `LanceMembrane::project()` call feeds the latest committed row via +/// [`bump`](Self::bump); subscribers observe it with [`subscribe`](Self::subscribe). +#[derive(Debug)] +pub struct LanceVersionWatcher { + tx: watch::Sender, +} + +impl LanceVersionWatcher { + /// Build a watcher seeded with `initial`. + /// + /// The first `subscribe()` call sees this value. Typical construction + /// uses `CognitiveEventRow::default()` as the sentinel — subscribers + /// that poll before any `project()` fire see an all-zero row. + pub fn new(initial: CognitiveEventRow) -> Self { + let (tx, _rx) = watch::channel(initial); + Self { tx } + } + + /// Publish a fresh committed row. All current subscribers observe it. + /// + /// Returns `true` when at least one subscriber is listening, `false` + /// when every receiver has been dropped. The membrane ignores the + /// return value — a session with zero subscribers is a valid state. + pub fn bump(&self, row: CognitiveEventRow) -> bool { + self.tx.send(row).is_ok() + } + + /// Attach a new subscriber. + /// + /// The receiver sees the most recently bumped row on first + /// `borrow()` and is woken by subsequent bumps. Per `tokio::sync::watch` + /// semantics, a slow subscriber may skip intermediate revisions — the + /// supabase-shape "always-latest" guarantee. + pub fn subscribe(&self) -> watch::Receiver { + self.tx.subscribe() + } + + /// Observer count — useful for tests and diagnostics. + pub fn receiver_count(&self) -> usize { + self.tx.receiver_count() + } +} + +impl Default for LanceVersionWatcher { + fn default() -> Self { + Self::new(CognitiveEventRow::default()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn subscribe_observes_initial() { + let mut row = CognitiveEventRow::default(); + row.thinking = 7; + let w = LanceVersionWatcher::new(row); + let rx = w.subscribe(); + assert_eq!(rx.borrow().thinking, 7); + } + + #[test] + fn bump_delivers_latest() { + let w = LanceVersionWatcher::default(); + let mut rx = w.subscribe(); + + let mut row = CognitiveEventRow::default(); + row.free_e = 42; + assert!(w.bump(row)); + + // Manual borrow_and_update to observe the latest value. + let snapshot = rx.borrow_and_update().clone(); + assert_eq!(snapshot.free_e, 42); + } + + #[test] + fn bump_without_subscribers_returns_false() { + let w = LanceVersionWatcher::default(); + // No subscribers → send succeeds only if a receiver exists. + // `watch::Sender::send` errors when every receiver has been + // dropped; we model that as `bump() == false`. + assert!(!w.bump(CognitiveEventRow::default())); + } + + #[test] + fn receiver_count_tracks_subscribers() { + let w = LanceVersionWatcher::default(); + assert_eq!(w.receiver_count(), 0); + let _rx1 = w.subscribe(); + let _rx2 = w.subscribe(); + assert_eq!(w.receiver_count(), 2); + } +}