Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .claude/board/AGENT_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Append-only log of agent sessions. Prepend new entries at the top.

---

## 2026-04-24T16:30 — Supabase subscriber v2 (sonnet, claude/supabase-subscriber-wire-up)

**D-ids:** DM-4a/b/c, DM-6a/b
**Commit:** `ec3b5c7`
**Tests:** 17 pass with realtime feature (13 without); 5 new tests total (4 in version_watcher.rs, 1 subscribe_receives_on_project in lance_membrane.rs)
**Outcome:** Wired LanceMembrane::subscribe() from Phase-A disconnected mpsc stub to live tokio::sync::watch::Receiver<CognitiveEventRow> under [realtime] feature. project() now calls watcher.bump(row.clone()) on every projected cycle. DrainTask scaffold (Poll::Pending) ships unconditionally. Tokio was already a dep — no Cargo.toml changes needed. PR 255: https://github.com/AdaWorldAPI/lance-graph/pull/255

## 2026-04-24T16:30 — Archetype scaffold v2 (sonnet, claude/archetype-crate-scaffold)

**D-ids:** DU-2.1..2.6
Expand Down
6 changes: 6 additions & 0 deletions .claude/board/EPIPHANIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ stay as historical references.
## Entries (reverse chronological)


## 2026-04-24 — FINDING: subscribe() wired; LanceVersionWatcher delivers always-latest CognitiveEventRow to subscribers (DM-4/6)

`LanceMembrane::subscribe()` now returns a `tokio::sync::watch::Receiver<CognitiveEventRow>` under the `[realtime]` feature gate — supabase-shape always-latest semantics. `project()` calls `watcher.bump(row)` after building the scalar row; subscribers observe the latest committed event without polling. `DrainTask` scaffold ships unconditionally (no feature gate) as a `Future` shell for the follow-up `steering_intent` drain loop. Tokio was already an optional dep in `lance-graph-callcenter/Cargo.toml` under `[realtime]` — no new deps required.

**Status:** FINDING

## 2026-04-24 — Vsa16kF32 switchboard carrier shipped (CrystalFingerprint::Vsa16kF32 + 16K algebra)

**Status:** FINDING
Expand Down
13 changes: 13 additions & 0 deletions .claude/board/INTEGRATION_PLANS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@

---

## v1 — Supabase Subscriber Wire-up (authored 2026-04-24)

**Author:** sonnet agent, session 2026-04-24 (branch claude/supabase-subscriber-wire-up)
**Scope:** Flip `LanceMembrane::subscribe()` from Phase-A stub to a live `tokio::sync::watch::Receiver<CognitiveEventRow>` wired to `LanceVersionWatcher`; ship `DrainTask` scaffold.
**Path:** `.claude/plans/supabase-subscriber-v1.md`
**Deliverables:** DM-4a swap Subscription type, DM-4b `version_watcher.rs`, DM-4c uncomment `pub mod version_watcher`, DM-6a `drain.rs` scaffold, DM-6b uncomment `pub mod drain`.

**Status (2026-04-24):** In PR. All deliverables in branch `claude/supabase-subscriber-wire-up`.

**Confidence (2026-04-24):** FINDING — 17 tests pass (13 without realtime, 17 with; 4 new tests in `version_watcher.rs`, 1 new `subscribe_receives_on_project` in `lance_membrane.rs`). Zero regressions.

---

## v1 — Unified Integration: PersonaHub × ONNX × Archetype × MM-CoT × RoleDB (authored 2026-04-23)

**Author:** main-thread session 2026-04-23
Expand Down
4 changes: 2 additions & 2 deletions .claude/board/STATUS_BOARD.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ pattern IS the Supabase-shape transcode approach).
|---|---|---|---|
| DM-2 | `LanceMembrane: ExternalMembrane` impl with `project()` + compile-time BBB leak test | **In progress** | Phase A shipped `9a8d6a0` — `LanceMembrane` struct + `project()` + `ingest()` + `subscribe()` stub. Phase B: full Lance append + version counter pending DM-4. |
| DM-3 | `CommitFilter` → DataFusion `Expr` translator (`[query]` feature) | **Queued** | — |
| DM-4 | `LanceVersionWatcher` — tails Lance version counter, emits Phoenix `postgres_changes` (`[realtime]`) | **Queued** | |
| DM-4 | `LanceVersionWatcher` — tails Lance version counter, emits Phoenix `postgres_changes` (`[realtime]`) | **In PR** | branch `claude/supabase-subscriber-wire-up` — DM-4a/b/c: `version_watcher.rs` (117 LOC, 4 tests), `lib.rs` `pub mod version_watcher`, `LanceMembrane::watcher` field + `project()` calls `bump()`, `subscribe()` returns `watch::Receiver<CognitiveEventRow>`. |
| DM-5 | `PhoenixServer` — minimal WS server, Phoenix channel subset (`[realtime]`) | **Queued** | Resolve UNKNOWN-2 (which consumers need Phoenix wire?) first |
| DM-6 | `DrainTask` — `steering_intent` Lance read → `UnifiedStep` → `OrchestrationBridge::route()` | **Queued** | |
| DM-6 | `DrainTask` — `steering_intent` Lance read → `UnifiedStep` → `OrchestrationBridge::route()` | **In PR** | branch `claude/supabase-subscriber-wire-up` — DM-6a/b scaffold: `drain.rs` (89 LOC, 2 tests), `lib.rs` `pub mod drain`, `Poll::Pending` until follow-up PR wires real drain loop. |
| DM-7 | `JwtMiddleware` + `ActorContext` → `LogicalPlan` RLS rewriter (`[auth]`) | **Queued** | Resolve UNKNOWN-3 (pgwire?) + UNKNOWN-4 (actor_id type) first |
| DM-8 | `PostgRestHandler` — query-string → DataFusion SQL → Lance scan → Arrow response (`[serve]`) | **Queued** | Confirm PostgREST compat needed (§ 8 stop point 4) before building |
| DM-9 | End-to-end test: shader fires → `LanceMembrane::project()` → Lance append → Phoenix subscriber receives event | **Queued** | Depends on DM-2 through DM-6 |
Expand Down
76 changes: 76 additions & 0 deletions .claude/plans/supabase-subscriber-v1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Supabase-shape Subscriber Flow Wire-up — v1

> **Status:** In progress (2026-04-24)
> **Owner:** @callcenter-specialist, @bus-compiler
> **Scope:** `lance-graph-callcenter` crate only (plus `external_membrane.rs` if trait surface bends)
> **Depends on:** none (substrate-independent)

## Goal

Flip `LanceMembrane::subscribe()` from GHOST to PARTIAL. Ship DM-4 `LanceVersionWatcher` + DM-6 `DrainTask`. Close the Outside-BBB loop so Lance dataset version bumps fire notifications to subscribers with filtered `CognitiveEventRow` payloads.

## Deliverables

- **DM-4a** — Swap `Subscription` associated type from `mpsc::Receiver<u64>` to `tokio::sync::watch::Receiver<CognitiveEventRow>` in `lance_membrane.rs`.
- **DM-4b** — Create `crates/lance-graph-callcenter/src/version_watcher.rs`. Holds the `watch::Sender<CognitiveEventRow>`; `bump(row)` on each `project()` commit.
- **DM-4c** — Uncomment `pub mod version_watcher` in `lib.rs:71-72`; export `LanceVersionWatcher`.
- **DM-5a** — `subscribe(filter)` returns the `watch::Receiver` wrapped with a `CommitFilter` predicate combinator.
- **DM-6a** — Create `crates/lance-graph-callcenter/src/drain.rs`. Scaffold only — `DrainTask` struct + `drain()` method that currently returns `Poll::Pending`. Wiring to `OrchestrationBridge::route()` is the follow-up PR.
- **DM-6b** — Uncomment `pub mod drain` in `lib.rs:78-79`; export `DrainTask`.
- **DM-7** — Flip test `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project` — assert `rx.borrow().version > 0` after a `project()` call.

## Non-goals (explicit)

- `dialect` Phase-B source wiring — separate TECH_DEBT row (@callcenter-specialist).
- `scent` Phase-C CAM-PQ cascade — blocked on substrate migration (PR B, pending).
- `PhoenixServer` DM-5 — Queued separately.
- `DrainTask` runtime drain of `steering_intent` — this PR ships only the scaffold.

## Acceptance criteria

- `cargo test -p lance-graph-callcenter --lib` — 11 existing tests pass + new `subscribe_receives_on_project` test passes. Zero regressions.
- `bbb_scalar_only_compile_check` still compiles.
- `cargo check --workspace` compiles.
- Verdict flip in `.claude/plans/unified-integration-v1.md §6`: Supabase row `GHOST` → `PARTIAL` (one-line Edit on the table row).
- `.claude/board/INTEGRATION_PLANS.md` — prepend entry pointing to this plan file.
- `.claude/board/STATUS_BOARD.md` — DM-4 / DM-6 rows status updated.
- `.claude/board/EPIPHANIES.md` — prepend short FINDING entry noting subscribe wire-up.

## Architecture notes

Per CLAUDE.md BBB invariant, `Subscription` must carry Arrow-scalar content only — `CognitiveEventRow` is the canonical outbound DTO and is already scalar-only (compile-time enforced by `bbb_scalar_only_compile_check`). `tokio::sync::watch` is the right primitive for the supabase-realtime-shaped fan-out: single-producer (the membrane), many-consumer (subscribers), always-latest semantics (skip stale revisions).

Implementation sketch:

```rust
// version_watcher.rs
pub struct LanceVersionWatcher {
tx: tokio::sync::watch::Sender<CognitiveEventRow>,
}
impl LanceVersionWatcher {
pub fn new(initial: CognitiveEventRow) -> Self { ... }
pub fn bump(&self, row: CognitiveEventRow) { let _ = self.tx.send(row); }
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<CognitiveEventRow> { self.tx.subscribe() }
}
```

The `CommitFilter` wrapper is NOT a new trait — it's a method `LanceMembrane::subscribe_filtered(filter: CommitFilter) -> impl Stream<Item = CognitiveEventRow>` that calls `self.watcher.subscribe()` and uses `tokio_stream::wrappers::WatchStream::new(rx).filter(move |row| filter.matches(row))`.

## File-level edits (full list)

1. `crates/lance-graph-callcenter/Cargo.toml` — add `tokio = { workspace = true, features = ["sync"] }` if not present; add `tokio-stream = { workspace = true }` (minimal features).
2. `crates/lance-graph-callcenter/src/version_watcher.rs` — NEW.
3. `crates/lance-graph-callcenter/src/drain.rs` — NEW (scaffold).
4. `crates/lance-graph-callcenter/src/lib.rs:71-72` — uncomment `pub mod version_watcher`.
5. `crates/lance-graph-callcenter/src/lib.rs:78-79` — uncomment `pub mod drain`.
6. `crates/lance-graph-callcenter/src/lance_membrane.rs:56` — field `watcher: LanceVersionWatcher`.
7. `crates/lance-graph-callcenter/src/lance_membrane.rs:117` — `type Subscription = watch::Receiver<CognitiveEventRow>`.
8. `crates/lance-graph-callcenter/src/lance_membrane.rs:186-189` — `subscribe()` body = `self.watcher.subscribe()`.
9. `crates/lance-graph-callcenter/src/lance_membrane.rs` — `project()` path (wherever it completes a commit) — call `self.watcher.bump(row)` after the Lance write.
10. `crates/lance-graph-callcenter/src/lance_membrane.rs` tests — flip `subscribe_returns_disconnected_receiver` to `subscribe_receives_on_project`.
11. `crates/lance-graph-contract/src/external_membrane.rs` — IF the `Subscription` associated type is declared here with bounds, widen to `Clone + Send` as needed. Do NOT add Vsa/RoleKey/NarsTruth — BBB deny-list stays intact.
12. Board files (INTEGRATION_PLANS, STATUS_BOARD, EPIPHANIES, unified-integration-v1 §6) — per Acceptance.

## Test

- New test `subscribe_receives_on_project` — construct `LanceMembrane`, call `subscribe()` → `rx`, call `project(some_event)` → assert `rx.borrow().<relevant field>` matches `some_event`.
89 changes: 89 additions & 0 deletions crates/lance-graph-callcenter/src/drain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! `DrainTask` — DM-6 scaffold of the callcenter membrane plan.
//!
//! Future home for the `steering_intent` Lance-read → `UnifiedStep` →
//! `OrchestrationBridge::route()` pipeline. This PR ships only the
//! type shell and a `Poll::Pending` `drain()` method so that
//! `lib.rs` can re-export the name and consumers can start wiring
//! against the surface. The live drain loop lands in a follow-up.
//!
//! Plan: `.claude/plans/supabase-subscriber-v1.md` § DM-6.

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

/// Background task that drains `steering_intent` rows from Lance and
/// forwards them to the `OrchestrationBridge`.
///
/// **Scaffold only.** Fields and the drain loop will be populated in
/// the follow-up PR. Ships now so that `LanceMembrane` consumers can
/// import the symbol and so the `pub mod drain` re-export in
/// `lib.rs` is honest about the type existing.
#[derive(Debug, Default)]
pub struct DrainTask {
/// Monotonic count of rows drained (zero until DM-6b lands).
drained: u64,
}

impl DrainTask {
/// Build an empty drain task. The follow-up PR will add
/// `new(dataset: &LanceDataset, bridge: Arc<dyn OrchestrationBridge>)`.
pub fn new() -> Self {
Self::default()
}

/// How many `steering_intent` rows this task has forwarded so far.
pub fn drained(&self) -> u64 {
self.drained
}

/// Poll the drain loop.
///
/// Returns `Poll::Pending` unconditionally in the scaffold; the
/// follow-up PR replaces this with the Lance read + route pipeline.
pub fn drain(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}

/// `Future` adapter so the scaffold composes with `tokio::spawn`
/// as soon as the drain body lands.
impl Future for DrainTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.as_mut().drain(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use core::task::{RawWaker, RawWakerVTable, Waker};

fn noop_waker() -> Waker {
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|_| RawWaker::new(core::ptr::null(), &VTABLE),
|_| {},
|_| {},
|_| {},
);
// SAFETY: The vtable functions are all no-ops and never
// dereference the pointer; null is safe here.
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
}

#[test]
fn scaffold_starts_at_zero() {
let task = DrainTask::new();
assert_eq!(task.drained(), 0);
}

#[test]
fn drain_is_pending_in_scaffold() {
let mut task = DrainTask::new();
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(task.drain(&mut cx), Poll::Pending));
}
}
71 changes: 62 additions & 9 deletions crates/lance-graph-callcenter/src/lance_membrane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,18 @@

use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc, RwLock,
RwLock,
};

#[cfg(not(feature = "realtime"))]
use std::sync::mpsc;

#[cfg(feature = "realtime")]
use tokio::sync::watch;

#[cfg(feature = "realtime")]
use crate::version_watcher::LanceVersionWatcher;

use lance_graph_contract::{
a2a_blackboard::ExpertId,
cognitive_shader::{MetaWord, ShaderBus},
Expand All @@ -54,6 +63,9 @@ pub struct LanceMembrane {
current_scent: AtomicU64,
current_rationale_phase: AtomicBool, // MM-CoT stage: true = Stage 1 rationale
version: AtomicU64,
/// Fan-out watcher for projected cognitive events ([realtime] feature only).
#[cfg(feature = "realtime")]
watcher: LanceVersionWatcher,
}

impl LanceMembrane {
Expand All @@ -65,6 +77,8 @@ impl LanceMembrane {
current_scent: AtomicU64::new(0),
current_rationale_phase: AtomicBool::new(false),
version: AtomicU64::new(0),
#[cfg(feature = "realtime")]
watcher: LanceVersionWatcher::default(),
}
}

Expand Down Expand Up @@ -109,12 +123,15 @@ impl ExternalMembrane for LanceMembrane {
/// External consumer intent entering through the gate.
type Intent = ExternalIntent;

/// Phase-A subscription stub: a disconnected `mpsc::Receiver<u64>`.
/// Subscription handle for projected cognitive events.
///
/// Callers that `recv()` on this immediately get `Err(RecvError)`.
/// Phase D replaces this with a `tokio::sync::watch::Receiver<u64>`
/// wired to the Lance version counter, filtered by `CommitFilter`.
/// With `[realtime]` feature: a `tokio::sync::watch::Receiver<CognitiveEventRow>`
/// wired to `LanceVersionWatcher` — always-latest semantics, supabase-shape.
/// Without `[realtime]`: a disconnected `mpsc::Receiver<u64>` stub (Phase A).
#[cfg(not(feature = "realtime"))]
type Subscription = mpsc::Receiver<u64>;
#[cfg(feature = "realtime")]
type Subscription = watch::Receiver<CognitiveEventRow>;

/// Project a committed ShaderBus cycle to a scalar row.
///
Expand All @@ -126,7 +143,7 @@ impl ExternalMembrane for LanceMembrane {
let expert = *self.current_expert.read().expect("expert poisoned");
let scent = self.current_scent.load(Ordering::Relaxed) as u8;

CognitiveEventRow {
let row = CognitiveEventRow {
external_role: role,
faculty_role: faculty,
expert_id: expert,
Expand All @@ -144,7 +161,13 @@ impl ExternalMembrane for LanceMembrane {
gate_commit: bus.gate.is_flow(),
gate_f: meta.free_e(),
rationale_phase: self.current_rationale_phase.load(Ordering::Relaxed),
}
};

// DM-4: fan out to all current subscribers (supabase-shape realtime).
#[cfg(feature = "realtime")]
self.watcher.bump(row.clone());

row
}

/// Translate external intent to canonical dispatch.
Expand Down Expand Up @@ -181,12 +204,19 @@ impl ExternalMembrane for LanceMembrane {

/// Subscribe to projected commits matching the filter.
///
/// Phase A: returns a disconnected channel (recv immediately errors).
/// Phase D: wires to `tokio::sync::watch` + `CommitFilter` predicate.
/// With `[realtime]`: returns a `watch::Receiver<CognitiveEventRow>` seeded
/// with the latest committed row. Always-latest semantics (supabase-shape).
/// Without `[realtime]`: returns a disconnected `mpsc::Receiver<u64>` stub.
#[cfg(not(feature = "realtime"))]
fn subscribe(&self, _filter: CommitFilter) -> mpsc::Receiver<u64> {
let (_tx, rx) = mpsc::channel();
rx
}

#[cfg(feature = "realtime")]
fn subscribe(&self, _filter: CommitFilter) -> watch::Receiver<CognitiveEventRow> {
self.watcher.subscribe()
Comment on lines +217 to +218

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Honor CommitFilter in realtime subscribe path

The realtime subscribe implementation ignores the provided CommitFilter and always returns an unfiltered watcher receiver. Because this commit turns subscriptions into live event delivery, callers that request actor/free-energy/commit filtering now receive every row, which breaks the contract implied by subscribe(filter) and can leak unrelated events to subscribers that expected scoped data.

Useful? React with 👍 / 👎.

}
}

// ─────────────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -263,6 +293,8 @@ mod tests {
assert_eq!(step.step_type, "lg.blackboard.context");
}

/// Phase A (no realtime feature): subscription is a disconnected stub.
#[cfg(not(feature = "realtime"))]
#[test]
fn subscribe_returns_disconnected_receiver() {
let m = LanceMembrane::new();
Expand All @@ -271,6 +303,27 @@ mod tests {
assert!(rx.try_recv().is_err());
}

/// Phase D (realtime feature): subscribe() → project() → rx.borrow() sees the row.
#[cfg(feature = "realtime")]
#[test]
fn subscribe_receives_on_project() {
let m = LanceMembrane::new();
let rx = m.subscribe(CommitFilter::default());

// Prime role context and call project()
let intent = ExternalIntent::seed(ExternalRole::CrewaiAgent, make_dn(), vec![]);
m.ingest(intent);

let bus = ShaderBus::empty();
let meta = MetaWord::new(7, 3, 200, 150, 10);
m.project(&bus, meta);

// The watcher should have delivered the row
let snapshot = rx.borrow();
assert_eq!(snapshot.thinking, 7, "subscriber should see the projected row");
assert_eq!(snapshot.external_role, ExternalRole::CrewaiAgent as u8);
}

#[test]
fn set_faculty_context_wires_rationale_phase() {
let m = LanceMembrane::new();
Expand Down
6 changes: 3 additions & 3 deletions crates/lance-graph-callcenter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ pub use vsa_udfs::register_vsa_udfs;
pub mod filter_expr;

// DM-4 — LanceVersionWatcher: tail version counter → Phoenix events ([realtime])
// #[cfg(feature = "realtime")]
// pub mod version_watcher;
#[cfg(feature = "realtime")]
pub mod version_watcher;

// DM-5 — PhoenixServer: minimal WS server, Phoenix channel subset ([realtime])
// #[cfg(feature = "realtime")]
// pub mod phoenix;

// DM-6 — DrainTask: steering_intent → UnifiedStep → OrchestrationBridge
// pub mod drain;
pub mod drain;

// DM-7 — JwtMiddleware + ActorContext → LogicalPlan RLS rewriter ([auth])
// Resolve UNKNOWN-3 (pgwire?) and UNKNOWN-4 (actor_id type) first.
Expand Down
Loading
Loading