feat(callcenter): wire LanceVersionWatcher + DrainTask scaffold (DM-4a/b/c, DM-6a/b)#255
Conversation
…a/b/c, DM-6a/b) Flips LanceMembrane::subscribe() from Phase-A disconnected stub to a live tokio::sync::watch::Receiver<CognitiveEventRow> behind the [realtime] feature gate (supabase-shape always-latest fan-out). project() now calls watcher.bump(row) so every subscriber observes the latest committed CognitiveEventRow without polling. Changes: - crates/lance-graph-callcenter/src/version_watcher.rs — NEW (DM-4b): LanceVersionWatcher wrapping watch::Sender<CognitiveEventRow>; bump/subscribe/receiver_count; 4 tests. - crates/lance-graph-callcenter/src/drain.rs — NEW (DM-6a): DrainTask scaffold, Future impl returning Poll::Pending; 2 tests. - lib.rs — uncomment pub mod version_watcher (#[cfg(feature="realtime")]) and pub mod drain (unconditional) (DM-4c, DM-6b). - lance_membrane.rs — add watcher: LanceVersionWatcher field; Subscription type is watch::Receiver<CognitiveEventRow> under [realtime] / mpsc::Receiver<u64> without; project() calls watcher.bump(row.clone()); subscribe() returns watcher.subscribe(); new test subscribe_receives_on_project (flips subscribe_returns_disconnected_receiver under [realtime]). - Board: STATUS_BOARD DM-4/DM-6 → In PR; INTEGRATION_PLANS prepend supabase-subscriber-v1; EPIPHANIES prepend finding entry. - .claude/plans/supabase-subscriber-v1.md committed (plan file). Tests: 13 pass (no realtime), 17 pass (with realtime). Zero regressions. https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ec3b5c7742
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| let (tx, _rx) = watch::channel(initial); | ||
| Self { tx } |
There was a problem hiding this comment.
Preserve watcher state without active subscribers
Dropping the initial receiver immediately means the channel can have zero receivers for long periods, and bump() then fails instead of updating the stored value. In practice, any project() that happens before the first subscriber (or between subscriber disconnect/reconnect windows) is lost, so new subscribers do not get the latest committed row even though this module/documentation describes always-latest behavior.
Useful? React with 👍 / 👎.
| fn subscribe(&self, _filter: CommitFilter) -> watch::Receiver<CognitiveEventRow> { | ||
| self.watcher.subscribe() |
There was a problem hiding this comment.
Honor CommitFilter in realtime subscribe path
The realtime subscribe implementation ignores the provided CommitFilter and always returns an unfiltered watcher receiver. Because this commit turns subscriptions into live event delivery, callers that request actor/free-energy/commit filtering now receive every row, which breaks the contract implied by subscribe(filter) and can leak unrelated events to subscribers that expected scoped data.
Useful? React with 👍 / 👎.
…riber-wire-up # Conflicts: # .claude/board/AGENT_LOG.md
Summary
LanceVersionWatcherinversion_watcher.rs(117 LOC, 4 tests);#[cfg(feature="realtime")]pub mod inlib.rs;LanceMembranegainswatcherfield,project()callsbump(row),subscribe()returnswatch::Receiver<CognitiveEventRow>— flips Subscription type from Phase-Ampsc::Receiver<u64>stub to live supabase-shape always-latest fan-out.DrainTaskscaffold indrain.rs(89 LOC, 2 tests),Futureimpl returningPoll::Pending; unconditionalpub mod draininlib.rs.subscribe_receives_on_project(under[realtime]): callsproject(), assertsrx.borrow().thinkingmatches the projected row.STATUS_BOARD.mdDM-4/DM-6 → In PR;INTEGRATION_PLANS.mdprepend entry;EPIPHANIES.mdprepend finding.Test plan
cargo test -p lance-graph-callcenter --lib— 13 pass (without realtime)cargo test -p lance-graph-callcenter --lib --features realtime— 17 pass (4 new inversion_watcher, 1 newsubscribe_receives_on_project)cargo check -p lance-graph-callcenter— clean (no callcenter warnings)cargo check -p lance-graph-callcenter --features realtime— cleanKey constraint respected
Tokio was already an optional dep under
[realtime]inCargo.toml— no new dependencies added.https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh
Generated by Claude Code