forked from lance-format/lance-graph
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(callcenter): wire LanceVersionWatcher + DrainTask scaffold (DM-4a/b/c, DM-6a/b) #255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| # Supabase-shape Subscriber Flow Wire-up — v1 | ||
|
|
||
| > **Status:** In progress (2026-04-24) | ||
| > **Owner:** @callcenter-specialist, @bus-compiler | ||
| > **Scope:** `lance-graph-callcenter` crate only (plus `external_membrane.rs` if trait surface bends) | ||
| > **Depends on:** none (substrate-independent) | ||
|
|
||
| ## Goal | ||
|
|
||
| Flip `LanceMembrane::subscribe()` from GHOST to PARTIAL. Ship DM-4 `LanceVersionWatcher` + DM-6 `DrainTask`. Close the Outside-BBB loop so Lance dataset version bumps fire notifications to subscribers with filtered `CognitiveEventRow` payloads. | ||
|
|
||
| ## Deliverables | ||
|
|
||
| - **DM-4a** — Swap `Subscription` associated type from `mpsc::Receiver<u64>` to `tokio::sync::watch::Receiver<CognitiveEventRow>` in `lance_membrane.rs`. | ||
| - **DM-4b** — Create `crates/lance-graph-callcenter/src/version_watcher.rs`. Holds the `watch::Sender<CognitiveEventRow>`; `bump(row)` on each `project()` commit. | ||
| - **DM-4c** — Uncomment `pub mod version_watcher` in `lib.rs:71-72`; export `LanceVersionWatcher`. | ||
| - **DM-5a** — `subscribe(filter)` returns the `watch::Receiver` wrapped with a `CommitFilter` predicate combinator. | ||
| - **DM-6a** — Create `crates/lance-graph-callcenter/src/drain.rs`. Scaffold only — `DrainTask` struct + `drain()` method that currently returns `Poll::Pending`. Wiring to `OrchestrationBridge::route()` is the follow-up PR. | ||
| - **DM-6b** — Uncomment `pub mod drain` in `lib.rs:78-79`; export `DrainTask`. | ||
| - **DM-7** — Flip test `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project` — assert `rx.borrow().version > 0` after a `project()` call. | ||
|
|
||
| ## Non-goals (explicit) | ||
|
|
||
| - `dialect` Phase-B source wiring — separate TECH_DEBT row (@callcenter-specialist). | ||
| - `scent` Phase-C CAM-PQ cascade — blocked on substrate migration (PR B, pending). | ||
| - `PhoenixServer` DM-5 — Queued separately. | ||
| - `DrainTask` runtime drain of `steering_intent` — this PR ships only the scaffold. | ||
|
|
||
| ## Acceptance criteria | ||
|
|
||
| - `cargo test -p lance-graph-callcenter --lib` — 11 existing tests pass + new `subscribe_receives_on_project` test passes. Zero regressions. | ||
| - `bbb_scalar_only_compile_check` still compiles. | ||
| - `cargo check --workspace` compiles. | ||
| - Verdict flip in `.claude/plans/unified-integration-v1.md §6`: Supabase row `GHOST` → `PARTIAL` (one-line Edit on the table row). | ||
| - `.claude/board/INTEGRATION_PLANS.md` — prepend entry pointing to this plan file. | ||
| - `.claude/board/STATUS_BOARD.md` — DM-4 / DM-6 rows status updated. | ||
| - `.claude/board/EPIPHANIES.md` — prepend short FINDING entry noting subscribe wire-up. | ||
|
|
||
| ## Architecture notes | ||
|
|
||
| Per CLAUDE.md BBB invariant, `Subscription` must carry Arrow-scalar content only — `CognitiveEventRow` is the canonical outbound DTO and is already scalar-only (compile-time enforced by `bbb_scalar_only_compile_check`). `tokio::sync::watch` is the right primitive for the supabase-realtime-shaped fan-out: single-producer (the membrane), many-consumer (subscribers), always-latest semantics (skip stale revisions). | ||
|
|
||
| Implementation sketch: | ||
|
|
||
| ```rust | ||
| // version_watcher.rs | ||
| pub struct LanceVersionWatcher { | ||
| tx: tokio::sync::watch::Sender<CognitiveEventRow>, | ||
| } | ||
| impl LanceVersionWatcher { | ||
| pub fn new(initial: CognitiveEventRow) -> Self { ... } | ||
| pub fn bump(&self, row: CognitiveEventRow) { let _ = self.tx.send(row); } | ||
| pub fn subscribe(&self) -> tokio::sync::watch::Receiver<CognitiveEventRow> { self.tx.subscribe() } | ||
| } | ||
| ``` | ||
|
|
||
| The `CommitFilter` wrapper is NOT a new trait — it's a method `LanceMembrane::subscribe_filtered(filter: CommitFilter) -> impl Stream<Item = CognitiveEventRow>` that calls `self.watcher.subscribe()` and uses `tokio_stream::wrappers::WatchStream::new(rx).filter(move |row| filter.matches(row))`. | ||
|
|
||
| ## File-level edits (full list) | ||
|
|
||
| 1. `crates/lance-graph-callcenter/Cargo.toml` — add `tokio = { workspace = true, features = ["sync"] }` if not present; add `tokio-stream = { workspace = true }` (minimal features). | ||
| 2. `crates/lance-graph-callcenter/src/version_watcher.rs` — NEW. | ||
| 3. `crates/lance-graph-callcenter/src/drain.rs` — NEW (scaffold). | ||
| 4. `crates/lance-graph-callcenter/src/lib.rs:71-72` — uncomment `pub mod version_watcher`. | ||
| 5. `crates/lance-graph-callcenter/src/lib.rs:78-79` — uncomment `pub mod drain`. | ||
| 6. `crates/lance-graph-callcenter/src/lance_membrane.rs:56` — field `watcher: LanceVersionWatcher`. | ||
| 7. `crates/lance-graph-callcenter/src/lance_membrane.rs:117` — `type Subscription = watch::Receiver<CognitiveEventRow>`. | ||
| 8. `crates/lance-graph-callcenter/src/lance_membrane.rs:186-189` — `subscribe()` body = `self.watcher.subscribe()`. | ||
| 9. `crates/lance-graph-callcenter/src/lance_membrane.rs` — `project()` path (wherever it completes a commit) — call `self.watcher.bump(row)` after the Lance write. | ||
| 10. `crates/lance-graph-callcenter/src/lance_membrane.rs` tests — flip `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project`. | ||
| 11. `crates/lance-graph-contract/src/external_membrane.rs` — IF the `Subscription` associated type is declared here with bounds, widen to `Clone + Send` as needed. Do NOT add Vsa/RoleKey/NarsTruth — BBB deny-list stays intact. | ||
| 12. Board files (INTEGRATION_PLANS, STATUS_BOARD, EPIPHANIES, unified-integration-v1 §6) — per Acceptance. | ||
|
|
||
| ## Test | ||
|
|
||
| - New test `subscribe_receives_on_project` — construct `LanceMembrane`, call `subscribe()` → `rx`, call `project(some_event)` → assert `rx.borrow().<relevant field>` matches `some_event`. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| //! `DrainTask` — DM-6 scaffold of the callcenter membrane plan. | ||
| //! | ||
| //! Future home for the `steering_intent` Lance-read → `UnifiedStep` → | ||
| //! `OrchestrationBridge::route()` pipeline. This PR ships only the | ||
| //! type shell and a `Poll::Pending` `drain()` method so that | ||
| //! `lib.rs` can re-export the name and consumers can start wiring | ||
| //! against the surface. The live drain loop lands in a follow-up. | ||
| //! | ||
| //! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-6. | ||
|
|
||
| use core::future::Future; | ||
| use core::pin::Pin; | ||
| use core::task::{Context, Poll}; | ||
|
|
||
| /// Background task that drains `steering_intent` rows from Lance and | ||
| /// forwards them to the `OrchestrationBridge`. | ||
| /// | ||
| /// **Scaffold only.** Fields and the drain loop will be populated in | ||
| /// the follow-up PR. Ships now so that `LanceMembrane` consumers can | ||
| /// import the symbol and so the `pub mod drain` re-export in | ||
| /// `lib.rs` is honest about the type existing. | ||
| #[derive(Debug, Default)] | ||
| pub struct DrainTask { | ||
| /// Monotonic count of rows drained (zero until DM-6b lands). | ||
| drained: u64, | ||
| } | ||
|
|
||
| impl DrainTask { | ||
| /// Build an empty drain task. The follow-up PR will add | ||
| /// `new(dataset: &LanceDataset, bridge: Arc<dyn OrchestrationBridge>)`. | ||
| pub fn new() -> Self { | ||
| Self::default() | ||
| } | ||
|
|
||
| /// How many `steering_intent` rows this task has forwarded so far. | ||
| pub fn drained(&self) -> u64 { | ||
| self.drained | ||
| } | ||
|
|
||
| /// Poll the drain loop. | ||
| /// | ||
| /// Returns `Poll::Pending` unconditionally in the scaffold; the | ||
| /// follow-up PR replaces this with the Lance read + route pipeline. | ||
| pub fn drain(&mut self, _cx: &mut Context<'_>) -> Poll<()> { | ||
| Poll::Pending | ||
| } | ||
| } | ||
|
|
||
| /// `Future` adapter so the scaffold composes with `tokio::spawn` | ||
| /// as soon as the drain body lands. | ||
| impl Future for DrainTask { | ||
| type Output = (); | ||
|
|
||
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | ||
| self.as_mut().drain(cx) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use core::task::{RawWaker, RawWakerVTable, Waker}; | ||
|
|
||
| fn noop_waker() -> Waker { | ||
| const VTABLE: RawWakerVTable = RawWakerVTable::new( | ||
| |_| RawWaker::new(core::ptr::null(), &VTABLE), | ||
| |_| {}, | ||
| |_| {}, | ||
| |_| {}, | ||
| ); | ||
| // SAFETY: The vtable functions are all no-ops and never | ||
| // dereference the pointer; null is safe here. | ||
| unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) } | ||
| } | ||
|
|
||
| #[test] | ||
| fn scaffold_starts_at_zero() { | ||
| let task = DrainTask::new(); | ||
| assert_eq!(task.drained(), 0); | ||
| } | ||
|
|
||
| #[test] | ||
| fn drain_is_pending_in_scaffold() { | ||
| let mut task = DrainTask::new(); | ||
| let waker = noop_waker(); | ||
| let mut cx = Context::from_waker(&waker); | ||
| assert!(matches!(task.drain(&mut cx), Poll::Pending)); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The realtime
subscribeimplementation ignores the providedCommitFilterand always returns an unfiltered watcher receiver. Because this commit turns subscriptions into live event delivery, callers that request actor/free-energy/commit filtering now receive every row, which breaks the contract implied bysubscribe(filter)and can leak unrelated events to subscribers that expected scoped data.Useful? React with 👍 / 👎.