You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
**Tests:** 17 pass with realtime feature (13 without); 5 new tests total (4 in version_watcher.rs, 1 subscribe_receives_on_project in lance_membrane.rs)
12
+
**Outcome:** Wired LanceMembrane::subscribe() from Phase-A disconnected mpsc stub to live tokio::sync::watch::Receiver<CognitiveEventRow> under [realtime] feature. project() now calls watcher.bump(row.clone()) on every projected cycle. DrainTask scaffold (Poll::Pending) ships unconditionally. Tokio was already a dep — no Cargo.toml changes needed. PR 255: https://github.com/AdaWorldAPI/lance-graph/pull/255
`LanceMembrane::subscribe()` now returns a `tokio::sync::watch::Receiver<CognitiveEventRow>` under the `[realtime]` feature gate — supabase-shape always-latest semantics. `project()` calls `watcher.bump(row)` after building the scalar row; subscribers observe the latest committed event without polling. `DrainTask` scaffold ships unconditionally (no feature gate) as a `Future` shell for the follow-up `steering_intent` drain loop. Tokio was already an optional dep in `lance-graph-callcenter/Cargo.toml` under `[realtime]` — no new deps required.
**Scope:** Flip `LanceMembrane::subscribe()` from Phase-A stub to a live `tokio::sync::watch::Receiver<CognitiveEventRow>` wired to `LanceVersionWatcher`; ship `DrainTask` scaffold.
**Deliverables:** DM-4a swap Subscription type, DM-4b `version_watcher.rs`, DM-4c uncomment `pub mod version_watcher`, DM-6a `drain.rs` scaffold, DM-6b uncomment `pub mod drain`.
45
+
46
+
**Status (2026-04-24):** In PR. All deliverables in branch `claude/supabase-subscriber-wire-up`.
47
+
48
+
**Confidence (2026-04-24):** FINDING — 17 tests pass (13 without realtime, 17 with; 4 new tests in `version_watcher.rs`, 1 new `subscribe_receives_on_project` in `lance_membrane.rs`). Zero regressions.
> **Scope:**`lance-graph-callcenter` crate only (plus `external_membrane.rs` if trait surface bends)
6
+
> **Depends on:** none (substrate-independent)
7
+
8
+
## Goal
9
+
10
+
Flip `LanceMembrane::subscribe()` from GHOST to PARTIAL. Ship DM-4 `LanceVersionWatcher` + DM-6 `DrainTask`. Close the Outside-BBB loop so Lance dataset version bumps fire notifications to subscribers with filtered `CognitiveEventRow` payloads.
11
+
12
+
## Deliverables
13
+
14
+
-**DM-4a** — Swap `Subscription` associated type from `mpsc::Receiver<u64>` to `tokio::sync::watch::Receiver<CognitiveEventRow>` in `lance_membrane.rs`.
15
+
-**DM-4b** — Create `crates/lance-graph-callcenter/src/version_watcher.rs`. Holds the `watch::Sender<CognitiveEventRow>`; `bump(row)` on each `project()` commit.
16
+
-**DM-4c** — Uncomment `pub mod version_watcher` in `lib.rs:71-72`; export `LanceVersionWatcher`.
17
+
-**DM-5a** — `subscribe(filter)` returns the `watch::Receiver` wrapped with a `CommitFilter` predicate combinator.
18
+
-**DM-6a** — Create `crates/lance-graph-callcenter/src/drain.rs`. Scaffold only — `DrainTask` struct + `drain()` method that currently returns `Poll::Pending`. Wiring to `OrchestrationBridge::route()` is the follow-up PR.
19
+
-**DM-6b** — Uncomment `pub mod drain` in `lib.rs:78-79`; export `DrainTask`.
20
+
-**DM-7** — Flip test `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project` — assert `rx.borrow().version > 0` after a `project()` call.
21
+
22
+
## Non-goals (explicit)
23
+
24
+
-`dialect` Phase-B source wiring — separate TECH_DEBT row (@callcenter-specialist).
-`DrainTask` runtime drain of `steering_intent` — this PR ships only the scaffold.
28
+
29
+
## Acceptance criteria
30
+
31
+
-`cargo test -p lance-graph-callcenter --lib` — 11 existing tests pass + new `subscribe_receives_on_project` test passes. Zero regressions.
32
+
-`bbb_scalar_only_compile_check` still compiles.
33
+
-`cargo check --workspace` compiles.
34
+
- Verdict flip in `.claude/plans/unified-integration-v1.md §6`: Supabase row `GHOST` → `PARTIAL` (one-line Edit on the table row).
35
+
-`.claude/board/INTEGRATION_PLANS.md` — prepend entry pointing to this plan file.
36
+
-`.claude/board/STATUS_BOARD.md` — DM-4 / DM-6 rows status updated.
37
+
-`.claude/board/EPIPHANIES.md` — prepend short FINDING entry noting subscribe wire-up.
38
+
39
+
## Architecture notes
40
+
41
+
Per CLAUDE.md BBB invariant, `Subscription` must carry Arrow-scalar content only — `CognitiveEventRow` is the canonical outbound DTO and is already scalar-only (compile-time enforced by `bbb_scalar_only_compile_check`). `tokio::sync::watch` is the right primitive for the supabase-realtime-shaped fan-out: single-producer (the membrane), many-consumer (subscribers), always-latest semantics (skip stale revisions).
The `CommitFilter` wrapper is NOT a new trait — it's a method `LanceMembrane::subscribe_filtered(filter: CommitFilter) -> impl Stream<Item = CognitiveEventRow>` that calls `self.watcher.subscribe()` and uses `tokio_stream::wrappers::WatchStream::new(rx).filter(move |row| filter.matches(row))`.
58
+
59
+
## File-level edits (full list)
60
+
61
+
1.`crates/lance-graph-callcenter/Cargo.toml` — add `tokio = { workspace = true, features = ["sync"] }` if not present; add `tokio-stream = { workspace = true }` (minimal features).
8.`crates/lance-graph-callcenter/src/lance_membrane.rs:186-189` — `subscribe()` body = `self.watcher.subscribe()`.
69
+
9.`crates/lance-graph-callcenter/src/lance_membrane.rs` — `project()` path (wherever it completes a commit) — call `self.watcher.bump(row)` after the Lance write.
70
+
10.`crates/lance-graph-callcenter/src/lance_membrane.rs` tests — flip `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project`.
71
+
11.`crates/lance-graph-contract/src/external_membrane.rs` — IF the `Subscription` associated type is declared here with bounds, widen to `Clone + Send` as needed. Do NOT add Vsa/RoleKey/NarsTruth — BBB deny-list stays intact.
0 commit comments