diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cf82af1..5524d48b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ### Changed +- `relayburn-sdk`: ingest verbs are now synchronous; async callers should + run them via `tokio::task::spawn_blocking`. - `relayburn-sdk`: lower per-record allocations in reader hashing, tool-result sizing, relationship dedup, and project resolution. Cuts overhead during large session imports and concurrent `resolve_project` calls. @@ -15,6 +17,10 @@ Cross-package release notes for relayburn. Package changelogs contain package-le JSON lines lacking a trailing newline still surface in the single-shot output. +### Removed + +- `relayburn-sdk`: removed `run_ingest_tick`. + ## [2.8.3] - 2026-05-11 ### Changed diff --git a/crates/relayburn-cli/src/commands/hotspots.rs b/crates/relayburn-cli/src/commands/hotspots.rs index c560a041..fbb05f67 100644 --- a/crates/relayburn-cli/src/commands/hotspots.rs +++ b/crates/relayburn-cli/src/commands/hotspots.rs @@ -228,12 +228,9 @@ fn run_inner(globals: &GlobalArgs, args: HotspotsArgs) -> anyhow::Result { progress.set_task("opening ledger"); let mut handle = Ledger::open(opts)?; - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; progress.set_task("refreshing ledger"); let raw_opts = progress.ingest_options(ledger_home.clone()); - rt.block_on(ingest_all(handle.raw_mut(), &raw_opts))?; + ingest_all(handle.raw_mut(), &raw_opts)?; drop(handle); let session_filter = match args.session.as_deref() { diff --git a/crates/relayburn-cli/src/commands/ingest.rs b/crates/relayburn-cli/src/commands/ingest.rs index f4074e55..0d71752b 100644 --- a/crates/relayburn-cli/src/commands/ingest.rs +++ b/crates/relayburn-cli/src/commands/ingest.rs @@ -74,8 +74,7 @@ pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 { } /// One-shot scan: open the ledger, run a single `ingest_all`, log the -/// summary, exit. Drives a current-thread tokio runtime so the otherwise -/// sync presenter can drive the async SDK verb. +/// summary, exit. /// /// Summary line is emitted on **stdout** (matching TS `runIngestOnce` /// at `packages/cli/src/commands/ingest.ts:121-126`) so callers can @@ -92,20 +91,9 @@ fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 { return report_error(&err, globals); } }; - progress.set_task("starting runtime"); - let rt = match tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - { - Ok(rt) => rt, - Err(err) => { - progress.finish_and_clear(); - return report_error(&err, globals); - } - }; progress.set_task("scanning sessions"); let opts = progress.ingest_options(globals.ledger_path.clone()); - let result = rt.block_on(ingest_all(handle.raw_mut(), &opts)); + let result = ingest_all(handle.raw_mut(), &opts); progress.finish_and_clear(); match result { Ok(report) => { @@ -203,7 +191,7 @@ fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 { } else { progress.ingest_options(ledger_home) }; - let result = ingest_all(guard.raw_mut(), &opts).await; + let result = ingest_all(guard.raw_mut(), &opts); progress.set_task(watch_message); result }) @@ -322,22 +310,6 @@ fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 { return 0; } }; - if let Some(progress) = &progress { - progress.set_task("starting runtime"); - } - let rt = match tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - { - Ok(rt) => rt, - Err(err) => { - if let Some(progress) = &progress { - progress.finish_and_clear(); - } - eprintln!("[burn] ingest: {err}"); - return 0; - } - }; if let Some(progress) = &progress { progress.set_task("scanning sessions"); } @@ -345,7 +317,7 @@ fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 { Some(progress) => progress.ingest_options(globals.ledger_path.clone()), None => TaskProgress::quiet_ingest_options(globals.ledger_path.clone()), }; - let result = rt.block_on(ingest_all(handle.raw_mut(), &opts)); + let result = ingest_all(handle.raw_mut(), &opts); if let Some(progress) = &progress { progress.finish_and_clear(); } diff --git a/crates/relayburn-cli/src/commands/state.rs b/crates/relayburn-cli/src/commands/state.rs index be6f1998..5bffd496 100644 --- a/crates/relayburn-cli/src/commands/state.rs +++ b/crates/relayburn-cli/src/commands/state.rs @@ -488,10 +488,7 @@ fn run_reset(globals: &GlobalArgs, args: crate::cli::StateResetArgs) -> i32 { print_reset_report(globals, &summary, /*executed=*/ true, ingest_report.as_ref()) } -/// Drive a single `ingest_all` sweep on the open handle. Mirrors the -/// `run_ingest` helper in `commands/summary.rs`: the SDK verb is async, -/// so we spin a current-thread tokio runtime to drive it from this -/// otherwise-sync presenter. +/// Drive a single `ingest_all` sweep on the open handle. /// /// `ledger_home` propagates the global `--ledger-path` override into /// `RawIngestOptions::ledger_home` so sidecar ingest state (config and @@ -504,12 +501,9 @@ fn run_reset_reingest( ledger_home: Option, progress: &TaskProgress, ) -> anyhow::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; progress.set_task("re-ingesting sessions"); let opts = progress.ingest_options(ledger_home); - rt.block_on(ingest_all(handle.raw_mut(), &opts)) + ingest_all(handle.raw_mut(), &opts) } fn print_reset_report( diff --git a/crates/relayburn-cli/src/commands/summary.rs b/crates/relayburn-cli/src/commands/summary.rs index fcecdde7..b5e1830d 100644 --- a/crates/relayburn-cli/src/commands/summary.rs +++ b/crates/relayburn-cli/src/commands/summary.rs @@ -325,27 +325,18 @@ fn parse_tag_filters(tags: &[String]) -> anyhow::Result Ok(out) } -/// Run an ingest sweep on the open handle. Builds a current-thread tokio -/// runtime so the otherwise-sync presenter can drive the async verb. +/// Run an ingest sweep on the open handle. fn run_ingest( handle: &mut LedgerHandle, progress: &TaskProgress, ledger_home: Option, ) -> anyhow::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .map_err(|err| { - progress.finish_and_clear(); - err - })?; progress.set_task("refreshing ledger"); let opts = progress.ingest_options(ledger_home); - rt.block_on(ingest_all(handle.raw_mut(), &opts)) - .map_err(|err| { - progress.finish_and_clear(); - err - }) + ingest_all(handle.raw_mut(), &opts).map_err(|err| { + progress.finish_and_clear(); + err + }) } const COVERAGE_FIELDS: [CoverageField; 5] = [ diff --git a/crates/relayburn-cli/src/harnesses/claude.rs b/crates/relayburn-cli/src/harnesses/claude.rs index edc52c7e..64b4a7fc 100644 --- a/crates/relayburn-cli/src/harnesses/claude.rs +++ b/crates/relayburn-cli/src/harnesses/claude.rs @@ -146,7 +146,7 @@ impl HarnessAdapter for ClaudeAdapter { let mut handle = Ledger::open(LedgerOpenOptions::default())?; let cwd_str = ctx.cwd.to_string_lossy().into_owned(); let opts = RawIngestOptions::default(); - ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts).await + ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts) } } diff --git a/crates/relayburn-cli/src/harnesses/codex.rs b/crates/relayburn-cli/src/harnesses/codex.rs index 65f3669b..6dea3d21 100644 --- a/crates/relayburn-cli/src/harnesses/codex.rs +++ b/crates/relayburn-cli/src/harnesses/codex.rs @@ -10,13 +10,12 @@ //! * `ingest_sessions` — defers to [`relayburn_sdk::ingest_codex_sessions`], //! the codex-only ingest pass. The factory opens a fresh ledger handle //! per call (mirrors the TS lock-then-write-then-close shape; SQLite WAL -//! keeps the per-tick open cheap). +//! keeps the per-tick open cheap). The SDK verb is sync, so we pass it +//! directly as a fn pointer to [`pending_stamp::session_store_adapter`]. -use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; -use relayburn_sdk::{ingest_codex_sessions, IngestReport, RawIngestOptions, RawLedger}; +use relayburn_sdk::ingest_codex_sessions; use super::pending_stamp; use super::HarnessAdapter; @@ -29,20 +28,11 @@ fn codex_sessions_dir() -> PathBuf { home_dir().join(".codex").join("sessions") } -/// Box-pin the SDK's `async fn ingest_codex_sessions` into a fn pointer -/// the [`pending_stamp::SessionIngestor`] type alias accepts. -fn codex_ingest<'a>( - ledger: &'a mut RawLedger, - opts: &'a RawIngestOptions, -) -> Pin> + Send + 'a>> { - Box::pin(ingest_codex_sessions(ledger, opts)) -} - /// Hand out a `&'static dyn HarnessAdapter` for codex. The registry calls /// this once at lazy-init time. See /// [`pending_stamp::session_store_adapter`] for the leak semantics. pub fn adapter() -> &'static dyn HarnessAdapter { - pending_stamp::session_store_adapter("codex", codex_sessions_dir, codex_ingest) + pending_stamp::session_store_adapter("codex", codex_sessions_dir, ingest_codex_sessions) } #[cfg(test)] diff --git a/crates/relayburn-cli/src/harnesses/opencode.rs b/crates/relayburn-cli/src/harnesses/opencode.rs index 00d08a27..8488c5d0 100644 --- a/crates/relayburn-cli/src/harnesses/opencode.rs +++ b/crates/relayburn-cli/src/harnesses/opencode.rs @@ -13,13 +13,12 @@ //! [`relayburn_sdk::ingest_opencode_sessions`], the opencode-only ingest //! pass. The factory opens a fresh ledger handle per call (mirrors the //! TS lock-then-write-then-close shape; SQLite WAL keeps the per-tick -//! open cheap). +//! open cheap). The SDK verb is sync, so we pass it directly as a fn +//! pointer to [`pending_stamp::session_store_adapter`]. -use std::future::Future; use std::path::PathBuf; -use std::pin::Pin; -use relayburn_sdk::{ingest_opencode_sessions, IngestReport, RawIngestOptions, RawLedger}; +use relayburn_sdk::ingest_opencode_sessions; use super::pending_stamp; use super::HarnessAdapter; @@ -36,20 +35,11 @@ fn opencode_sessions_dir() -> PathBuf { .join("session") } -/// Box-pin the SDK's `async fn ingest_opencode_sessions` into a fn -/// pointer the [`pending_stamp::SessionIngestor`] type alias accepts. -fn opencode_ingest<'a>( - ledger: &'a mut RawLedger, - opts: &'a RawIngestOptions, -) -> Pin> + Send + 'a>> { - Box::pin(ingest_opencode_sessions(ledger, opts)) -} - /// Hand out a `&'static dyn HarnessAdapter` for opencode. The registry /// calls this once at lazy-init time. See /// [`pending_stamp::session_store_adapter`] for the leak semantics. pub fn adapter() -> &'static dyn HarnessAdapter { - pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, opencode_ingest) + pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, ingest_opencode_sessions) } #[cfg(test)] diff --git a/crates/relayburn-cli/src/harnesses/pending_stamp.rs b/crates/relayburn-cli/src/harnesses/pending_stamp.rs index 9a617f7e..8d6a1c4a 100644 --- a/crates/relayburn-cli/src/harnesses/pending_stamp.rs +++ b/crates/relayburn-cli/src/harnesses/pending_stamp.rs @@ -131,15 +131,13 @@ pub fn adapter_static(config: PendingStampAdapter) -> &'static dyn HarnessAdapte Box::leak(Box::new(PendingStampAdapterImpl::new(config))) } -/// Async fn pointer for an SDK session ingestor (`ingest_codex_sessions`, -/// `ingest_opencode_sessions`). The shape matches both per-harness ingest -/// passes verbatim — they live in `relayburn_sdk` as `async fn`, and the -/// per-call `Box::pin` adaptation happens at the call site so the helper +/// Fn pointer for an SDK session ingestor (`ingest_codex_sessions`, +/// `ingest_opencode_sessions`). Both verbs are sync in the SDK; the +/// per-tick `Box::pin` adaptation that drops them into [`IngestSessionsFn`] +/// happens at the call site in [`session_store_adapter`] so the helper /// stays a fn pointer (no per-tick closure allocation). -pub type SessionIngestor = for<'a> fn( - &'a mut RawLedger, - &'a RawIngestOptions, -) -> Pin> + Send + 'a>>; +pub type SessionIngestor = + fn(&mut RawLedger, &RawIngestOptions) -> anyhow::Result; /// One-call factory for pending-stamp adapters whose only differences are /// the harness name, the session-root resolver, and which SDK ingest pass @@ -174,7 +172,7 @@ pub fn session_store_adapter( ledger_home, ..RawIngestOptions::default() }; - ingestor(handle.raw_mut(), &opts).await + ingestor(handle.raw_mut(), &opts) }) }); adapter_static(PendingStampAdapter::new(name, session_root, ingest_sessions)) diff --git a/crates/relayburn-sdk-node/src/lib.rs b/crates/relayburn-sdk-node/src/lib.rs index 9c605869..8c72013c 100644 --- a/crates/relayburn-sdk-node/src/lib.rs +++ b/crates/relayburn-sdk-node/src/lib.rs @@ -1261,8 +1261,12 @@ pub async fn ingest(opts: Option) -> Result anyhow::Result<()> { +//! # fn run() -> anyhow::Result<()> { //! let mut ledger = RawLedger::open_default()?; -//! let report = ingest_all(&mut ledger, &RawIngestOptions::default()).await?; +//! let report = ingest_all(&mut ledger, &RawIngestOptions::default())?; //! println!("ingested {} turns", report.appended_turns); //! # Ok(()) } //! ``` @@ -89,6 +89,6 @@ pub use pending_stamps::{ pub use reingest::{derive_codex_session_id, reingest_missing_content, ReingestContentReport}; pub use walk::{walk_jsonl, walk_opencode_sessions}; pub use watch_loop::{ - run_ingest_tick, start_watch_loop, ErrorSink, IngestFn, ReportSink, StartWatchLoopOptions, - WatchController, DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK, + start_watch_loop, ErrorSink, IngestFn, ReportSink, StartWatchLoopOptions, WatchController, + DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK, }; diff --git a/crates/relayburn-sdk/src/ingest/gap_warning_tests.rs b/crates/relayburn-sdk/src/ingest/gap_warning_tests.rs index 5a9011b6..dbcd46e5 100644 --- a/crates/relayburn-sdk/src/ingest/gap_warning_tests.rs +++ b/crates/relayburn-sdk/src/ingest/gap_warning_tests.rs @@ -194,7 +194,7 @@ async fn gap_warning_fires_once_then_suppressed_for_claude() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts_with_warn).await.unwrap(); + ingest_all(&mut ledger, &opts_with_warn).unwrap(); let first_warnings = warn_log.lock().unwrap().clone(); assert_eq!( @@ -224,7 +224,7 @@ async fn gap_warning_fires_once_then_suppressed_for_claude() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts_with_warn2).await.unwrap(); + ingest_all(&mut ledger, &opts_with_warn2).unwrap(); let second_warnings = warn_log2.lock().unwrap().clone(); assert!( @@ -315,7 +315,7 @@ async fn no_gap_warning_for_chat_only_claude_session() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts).await.unwrap(); + ingest_all(&mut ledger, &opts).unwrap(); }) .await; @@ -385,7 +385,7 @@ async fn gap_warning_fires_once_then_suppressed_for_codex() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts1).await.unwrap(); + ingest_all(&mut ledger, &opts1).unwrap(); let first = warn1.lock().unwrap().clone(); assert_eq!( first.len(), @@ -413,7 +413,7 @@ async fn gap_warning_fires_once_then_suppressed_for_codex() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts2).await.unwrap(); + ingest_all(&mut ledger, &opts2).unwrap(); assert!( warn2.lock().unwrap().is_empty(), "second ingest must stay silent for unchanged codex set" @@ -530,7 +530,7 @@ async fn gap_warning_fires_once_then_suppressed_for_opencode() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts1).await.unwrap(); + ingest_all(&mut ledger, &opts1).unwrap(); let first = warn1.lock().unwrap().clone(); assert_eq!( first.len(), @@ -558,7 +558,7 @@ async fn gap_warning_fires_once_then_suppressed_for_opencode() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts2).await.unwrap(); + ingest_all(&mut ledger, &opts2).unwrap(); assert!( warn2.lock().unwrap().is_empty(), "second ingest must stay silent for unchanged opencode set" @@ -607,9 +607,7 @@ async fn per_harness_then_ingest_all_keeps_each_adapter_isolated() { })), ..Default::default() }; - ingest_codex_sessions(&mut ledger, &opts_codex) - .await - .unwrap(); + ingest_codex_sessions(&mut ledger, &opts_codex).unwrap(); let after_codex = warn1.lock().unwrap().clone(); assert_eq!( after_codex.len(), @@ -644,7 +642,7 @@ async fn per_harness_then_ingest_all_keeps_each_adapter_isolated() { })), ..Default::default() }; - ingest_all(&mut ledger, &opts_all).await.unwrap(); + ingest_all(&mut ledger, &opts_all).unwrap(); let after_all = warn2.lock().unwrap().clone(); let claude_count = after_all .iter() @@ -708,9 +706,7 @@ async fn per_harness_verbs_emit_gap_warnings() { })), ..Default::default() }; - ingest_claude_projects(&mut ledger, &opts_claude) - .await - .unwrap(); + ingest_claude_projects(&mut ledger, &opts_claude).unwrap(); let claude = warn_claude.lock().unwrap().clone(); assert_eq!(claude.len(), 1, "claude per-harness verb fires once"); assert!(claude[0].starts_with("claude:")); @@ -724,9 +720,7 @@ async fn per_harness_verbs_emit_gap_warnings() { })), ..Default::default() }; - ingest_opencode_sessions(&mut ledger, &opts_oc) - .await - .unwrap(); + ingest_opencode_sessions(&mut ledger, &opts_oc).unwrap(); let oc = warn_oc.lock().unwrap().clone(); assert_eq!(oc.len(), 1, "opencode per-harness verb fires once"); assert!(oc[0].starts_with("opencode:")); diff --git a/crates/relayburn-sdk/src/ingest/ingest.rs b/crates/relayburn-sdk/src/ingest/ingest.rs index 27a0dca2..3dd48fb0 100644 --- a/crates/relayburn-sdk/src/ingest/ingest.rs +++ b/crates/relayburn-sdk/src/ingest/ingest.rs @@ -184,7 +184,7 @@ fn resolve_content_mode(ledger_home: Option<&Path>) -> ContentStoreMode { /// Ingest every known session store once. Cleans stale pending stamps, /// loads cursors, walks Claude/Codex/OpenCode in turn, then persists any /// cursor mutations. Returns the merged report. -pub async fn ingest_all(ledger: &mut Ledger, opts: &IngestOptions) -> anyhow::Result { +pub fn ingest_all(ledger: &mut Ledger, opts: &IngestOptions) -> anyhow::Result { progress(opts, "cleaning pending spawn stamps"); cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?; progress(opts, "loading ingest cursors"); @@ -237,7 +237,7 @@ pub async fn ingest_all(ledger: &mut Ledger, opts: &IngestOptions) -> anyhow::Re Ok(report) } -pub async fn ingest_claude_projects( +pub fn ingest_claude_projects( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { @@ -262,7 +262,7 @@ pub async fn ingest_claude_projects( Ok(report) } -pub async fn ingest_codex_sessions( +pub fn ingest_codex_sessions( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { @@ -287,7 +287,7 @@ pub async fn ingest_codex_sessions( Ok(report) } -pub async fn ingest_opencode_sessions( +pub fn ingest_opencode_sessions( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { @@ -315,7 +315,7 @@ pub async fn ingest_opencode_sessions( /// Per-session fast-path used when a Claude launcher already knows the /// sessionId from the spawn plan. We go straight to the one JSONL file and /// persist a cursor at EOF — a later `ingest_all` sweep then skips it. -pub async fn ingest_claude_session( +pub fn ingest_claude_session( ledger: &mut Ledger, cwd: &str, session_id: &str, diff --git a/crates/relayburn-sdk/src/ingest/orchestration_tests.rs b/crates/relayburn-sdk/src/ingest/orchestration_tests.rs index 4b551a6b..aa694606 100644 --- a/crates/relayburn-sdk/src/ingest/orchestration_tests.rs +++ b/crates/relayburn-sdk/src/ingest/orchestration_tests.rs @@ -20,12 +20,9 @@ //! per-harness roots in [`IngestRoots`] so a stray `~/.claude/projects/` //! on the developer's machine can't be picked up. //! -//! `#[tokio::test]` defaults to `current_thread`, so the `std::sync::Mutex` -//! guard held across an `.await` is sound (no other task can run on the -//! same thread). Clippy's `await_holding_lock` warns by default; we silence -//! it at module level rather than per-test. - -#![allow(clippy::await_holding_lock)] +//! Synchronous `#[test]` runners, so a `std::sync::Mutex` guard held for +//! the duration of a test body is sound — no other test runs concurrently +//! against the same process env. use std::collections::HashSet; use std::fs; @@ -128,8 +125,8 @@ fn pinned_roots(tmp: &TempDir) -> IngestRoots { } } -#[tokio::test] -async fn ingest_claude_projects_round_trips_a_fixture_session() { +#[test] +fn ingest_claude_projects_round_trips_a_fixture_session() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -149,7 +146,7 @@ async fn ingest_claude_projects_round_trips_a_fixture_session() { roots, ..Default::default() }; - let report = ingest_claude_projects(&mut ledger, &opts).await.unwrap(); + let report = ingest_claude_projects(&mut ledger, &opts).unwrap(); assert!(report.appended_turns >= 1, "expected ≥1 turn ingested"); assert!(report.ingested_sessions >= 1); @@ -167,8 +164,8 @@ async fn ingest_claude_projects_round_trips_a_fixture_session() { } } -#[tokio::test] -async fn ingest_claude_projects_resolves_pending_stamp_tags() { +#[test] +fn ingest_claude_projects_resolves_pending_stamp_tags() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -201,7 +198,7 @@ async fn ingest_claude_projects_resolves_pending_stamp_tags() { roots, ..Default::default() }; - let report = ingest_claude_projects(&mut ledger, &opts).await.unwrap(); + let report = ingest_claude_projects(&mut ledger, &opts).unwrap(); assert!(report.appended_turns >= 1, "expected >=1 turn ingested"); assert_eq!(report.applied_pending_stamps, 1); @@ -215,8 +212,8 @@ async fn ingest_claude_projects_resolves_pending_stamp_tags() { assert_eq!(turns[0].turn.session_id, sid); } -#[tokio::test] -async fn ingest_codex_sessions_round_trips_a_fixture_session() { +#[test] +fn ingest_codex_sessions_round_trips_a_fixture_session() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -232,7 +229,7 @@ async fn ingest_codex_sessions_round_trips_a_fixture_session() { roots, ..Default::default() }; - let report = ingest_codex_sessions(&mut ledger, &opts).await.unwrap(); + let report = ingest_codex_sessions(&mut ledger, &opts).unwrap(); assert!(report.appended_turns >= 1, "expected ≥1 codex turn"); let turns = ledger .query_turns(&Query::for_session("sess_simple_1")) @@ -247,8 +244,8 @@ async fn ingest_codex_sessions_round_trips_a_fixture_session() { } } -#[tokio::test] -async fn ingest_opencode_sessions_round_trips_a_fixture_session() { +#[test] +fn ingest_opencode_sessions_round_trips_a_fixture_session() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -265,7 +262,7 @@ async fn ingest_opencode_sessions_round_trips_a_fixture_session() { roots, ..Default::default() }; - let report = ingest_opencode_sessions(&mut ledger, &opts).await.unwrap(); + let report = ingest_opencode_sessions(&mut ledger, &opts).unwrap(); assert!( report.appended_turns >= 1, "expected ≥1 opencode turn (got {})", @@ -284,8 +281,8 @@ async fn ingest_opencode_sessions_round_trips_a_fixture_session() { } } -#[tokio::test] -async fn ingest_all_walks_each_harness_root_once() { +#[test] +fn ingest_all_walks_each_harness_root_once() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -326,7 +323,7 @@ async fn ingest_all_walks_each_harness_root_once() { roots, ..Default::default() }; - let report = ingest_all(&mut ledger, &opts).await.unwrap(); + let report = ingest_all(&mut ledger, &opts).unwrap(); assert!( report.appended_turns >= 3, "expected ≥3 turns total across the three harnesses (got {})", @@ -353,8 +350,8 @@ async fn ingest_all_walks_each_harness_root_once() { ); } -#[tokio::test] -async fn ingest_claude_session_writes_eof_cursor_so_followup_skips_file() { +#[test] +fn ingest_claude_session_writes_eof_cursor_so_followup_skips_file() { let tmp = TempDir::new().unwrap(); let _env = isolated_relayburn_home(&tmp); let roots = pinned_roots(&tmp); @@ -377,9 +374,7 @@ async fn ingest_claude_session_writes_eof_cursor_so_followup_skips_file() { ..Default::default() }; - let r = ingest_claude_session(&mut ledger, cwd, sid, &opts) - .await - .unwrap(); + let r = ingest_claude_session(&mut ledger, cwd, sid, &opts).unwrap(); assert!(r.appended_turns >= 1, "expected ≥1 turn appended"); assert_eq!(r.ingested_sessions, 1); @@ -396,7 +391,7 @@ async fn ingest_claude_session_writes_eof_cursor_so_followup_skips_file() { // A subsequent ingest_all sweep with the same file content must skip // it — appendedTurns should not go up. let before_count = ledger.query_turns(&Query::for_session(sid)).unwrap().len(); - let r2 = ingest_all(&mut ledger, &opts).await.unwrap(); + let r2 = ingest_all(&mut ledger, &opts).unwrap(); let after_count = ledger.query_turns(&Query::for_session(sid)).unwrap().len(); assert_eq!( before_count, after_count, diff --git a/crates/relayburn-sdk/src/ingest/reingest.rs b/crates/relayburn-sdk/src/ingest/reingest.rs index 5f0fd014..a4f82cde 100644 --- a/crates/relayburn-sdk/src/ingest/reingest.rs +++ b/crates/relayburn-sdk/src/ingest/reingest.rs @@ -65,7 +65,7 @@ pub struct ReingestContentReport { /// cursors, ledger turns, or compaction events. /// /// Mirrors TS `reingestMissingContent`. -pub async fn reingest_missing_content( +pub fn reingest_missing_content( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { @@ -502,8 +502,8 @@ mod tests { file } - #[tokio::test] - async fn skips_session_with_both_content_and_user_turn_already_present() { + #[test] + fn skips_session_with_both_content_and_user_turn_already_present() { let tmp = TempDir::new().unwrap(); let mut ledger = open_ledger(&tmp); @@ -528,7 +528,7 @@ mod tests { opencode_storage_dir: Some(home.path().join(".local/share/opencode/storage")), }); - let report = reingest_missing_content(&mut ledger, &opts).await.unwrap(); + let report = reingest_missing_content(&mut ledger, &opts).unwrap(); assert_eq!(report.scanned_files, 1); assert_eq!(report.skipped_existing, 1); assert_eq!(report.reingested_sessions, 0); @@ -536,8 +536,8 @@ mod tests { assert_eq!(report.appended_user_turns, 0); } - #[tokio::test] - async fn reparses_session_with_no_existing_records() { + #[test] + fn reparses_session_with_no_existing_records() { let tmp = TempDir::new().unwrap(); let mut ledger = open_ledger(&tmp); @@ -553,7 +553,7 @@ mod tests { opencode_storage_dir: Some(home.path().join(".local/share/opencode/storage")), }); - let report = reingest_missing_content(&mut ledger, &opts).await.unwrap(); + let report = reingest_missing_content(&mut ledger, &opts).unwrap(); assert_eq!(report.scanned_files, 1); assert_eq!(report.skipped_existing, 0); assert!(report.appended_user_turns >= 1); @@ -561,15 +561,15 @@ mod tests { // After the run, the AND-skip filter sees both sides covered, so // a re-run skips the same file. - let report2 = reingest_missing_content(&mut ledger, &opts).await.unwrap(); + let report2 = reingest_missing_content(&mut ledger, &opts).unwrap(); assert_eq!(report2.scanned_files, 1); assert_eq!(report2.skipped_existing, 1); assert_eq!(report2.appended_content, 0); assert_eq!(report2.appended_user_turns, 0); } - #[tokio::test] - async fn backfills_user_turn_when_only_content_exists() { + #[test] + fn backfills_user_turn_when_only_content_exists() { // Mirrors the TS "rebuild content backfills user-turn rows even // when content already exists" case: pre-seed the content side // only and verify the verb appends a user-turn row even though @@ -594,7 +594,7 @@ mod tests { opencode_storage_dir: Some(home.path().join(".local/share/opencode/storage")), }); - let report = reingest_missing_content(&mut ledger, &opts).await.unwrap(); + let report = reingest_missing_content(&mut ledger, &opts).unwrap(); assert_eq!(report.scanned_files, 1); assert_eq!(report.skipped_existing, 0); // Content appended should be 0 (existing_content has the @@ -604,8 +604,8 @@ mod tests { assert!(report.appended_user_turns >= 1); } - #[tokio::test] - async fn missing_session_dirs_are_silently_skipped() { + #[test] + fn missing_session_dirs_are_silently_skipped() { let tmp = TempDir::new().unwrap(); let mut ledger = open_ledger(&tmp); @@ -616,7 +616,7 @@ mod tests { opencode_storage_dir: Some(home.path().join(".local/share/opencode/storage")), }); - let report = reingest_missing_content(&mut ledger, &opts).await.unwrap(); + let report = reingest_missing_content(&mut ledger, &opts).unwrap(); assert_eq!(report, ReingestContentReport::default()); } diff --git a/crates/relayburn-sdk/src/ingest/watch_loop.rs b/crates/relayburn-sdk/src/ingest/watch_loop.rs index 805ab370..98336330 100644 --- a/crates/relayburn-sdk/src/ingest/watch_loop.rs +++ b/crates/relayburn-sdk/src/ingest/watch_loop.rs @@ -256,29 +256,22 @@ impl WatchController { /// We do NOT `abort()` the spawned task — that would cut a tick off /// mid-write. The stop is two-phased: set the atomic flag (covers a /// notify lost between loop iterations), then notify the parked waiter. + /// The trailing `in_flight.lock().await` covers a concurrent `tick()` + /// call from outside the loop: `tick()` doesn't check `stopped`, so + /// it can still acquire `in_flight` and run an ingest after the loop + /// task has exited. Waiting on the guard here guarantees no tick is + /// mid-write when `stop` returns, so callers can tear down state + /// safely. pub async fn stop(&self) { self.inner.stopped.store(true, Ordering::SeqCst); self.inner.stop_signal.notify_waiters(); if let Some(handle) = self.handle.lock().await.take() { let _ = handle.await; } - // Belt-and-braces: even if the handle was already taken (idempotent - // calls), make sure no tick is mid-flight before returning. let _ = self.inner.in_flight.lock().await; } } -/// Run a single ingest pass directly. Mirrors TS `runIngestTick(opts)` — -/// callers that want a one-shot sweep instead of a long-running loop use -/// this without going through `start_watch_loop`. -pub async fn run_ingest_tick(ingest: F) -> anyhow::Result -where - F: FnOnce() -> Fut, - Fut: Future>, -{ - ingest().await -} - /// Spawn a background ticker that calls `opts.ingest` whenever the /// active driver fires, skipping ticks while one is in flight. Returns /// a [`WatchController`] the caller uses to invoke an extra tick on @@ -420,22 +413,6 @@ mod tests { }) } - #[tokio::test] - async fn run_ingest_tick_invokes_callable_once() { - let counter = Arc::new(AtomicUsize::new(0)); - let report = run_ingest_tick(|| { - let counter = counter.clone(); - async move { - counter.fetch_add(1, Ordering::SeqCst); - Ok(IngestReport::default()) - } - }) - .await - .unwrap(); - assert_eq!(counter.load(Ordering::SeqCst), 1); - assert_eq!(report.scanned_sessions, 0); - } - #[tokio::test(flavor = "current_thread", start_paused = true)] async fn watch_loop_runs_immediate_then_periodic() { let counter = Arc::new(AtomicUsize::new(0)); diff --git a/crates/relayburn-sdk/src/ingest_verb.rs b/crates/relayburn-sdk/src/ingest_verb.rs index 49a7eba8..19e5f60e 100644 --- a/crates/relayburn-sdk/src/ingest_verb.rs +++ b/crates/relayburn-sdk/src/ingest_verb.rs @@ -1,9 +1,14 @@ -//! Ingest verb — async wrapper over [`crate::ingest::ingest_all`]. +//! Ingest verb — wrapper over [`crate::ingest::ingest_all`]. //! //! Mirrors the TS `ingest` verb in `packages/sdk/index.js`. The Rust port //! threads the ledger location through [`crate::Ledger::open`] explicitly //! instead of swapping `RELAYBURN_HOME`, so embeddings can run against //! multiple ledgers in the same process. +//! +//! Sync by design: the body is filesystem walks plus rusqlite writes, none +//! of which yield to the tokio runtime. Callers running this from an async +//! context (the napi binding, MCP server) should wrap the call in +//! `tokio::task::spawn_blocking`. use std::path::PathBuf; @@ -56,23 +61,23 @@ impl IngestOptions { impl LedgerHandle { /// Run [`ingest_all`] against this ledger handle. Returns the merged /// per-harness report. - pub async fn ingest(&mut self, mut opts: IngestOptions) -> anyhow::Result { + pub fn ingest(&mut self, mut opts: IngestOptions) -> anyhow::Result { if opts.ledger_home.is_none() { opts.ledger_home = self.inner.burn_path().parent().map(|p| p.to_path_buf()); } let raw = opts.into_raw(); - ingest_all(&mut self.inner, &raw).await + ingest_all(&mut self.inner, &raw) } } /// Free-function form of the ingest verb. Opens a fresh ledger using /// `opts.ledger_home`, runs [`ingest_all`], and returns the report. -pub async fn ingest(opts: IngestOptions) -> anyhow::Result { +pub fn ingest(opts: IngestOptions) -> anyhow::Result { let mut handle = Ledger::open(LedgerOpenOptions { home: opts.ledger_home.clone(), ..Default::default() })?; - handle.ingest(opts).await + handle.ingest(opts) } #[cfg(test)] @@ -80,8 +85,8 @@ mod tests { use super::*; use tempfile::TempDir; - #[tokio::test] - async fn ingest_with_empty_roots_returns_zero_report() { + #[test] + fn ingest_with_empty_roots_returns_zero_report() { let home = TempDir::new().expect("home tmp"); let claude = TempDir::new().expect("claude tmp"); let codex = TempDir::new().expect("codex tmp"); @@ -98,7 +103,7 @@ mod tests { on_warn: None, }; - let report = ingest(opts).await.expect("ingest"); + let report = ingest(opts).expect("ingest"); assert_eq!(report.scanned_sessions, 0); assert_eq!(report.ingested_sessions, 0); assert_eq!(report.appended_turns, 0); diff --git a/crates/relayburn-sdk/src/lib.rs b/crates/relayburn-sdk/src/lib.rs index 4498d774..45f866f4 100644 --- a/crates/relayburn-sdk/src/lib.rs +++ b/crates/relayburn-sdk/src/lib.rs @@ -13,9 +13,11 @@ //! Verbs are callable two ways: as a free function or as a method on //! [`LedgerHandle`]. //! -//! `ingest` is async (tokio); the query/compute verbs are sync -//! (CPU-bound). Callers running them from an async context — the typical -//! pattern in the MCP server — should wrap them in `tokio::task::spawn_blocking`. +//! Every verb is synchronous: ingest is filesystem walks plus rusqlite +//! writes, and the query/compute verbs are CPU-bound. Callers running +//! these from an async context — the typical pattern in the MCP server, +//! the napi binding, or the watch loop — should wrap them in +//! `tokio::task::spawn_blocking` so they don't stall the runtime. //! //! # Opening a ledger //! @@ -110,11 +112,11 @@ pub use crate::analyze::{ pub use crate::ingest::{ cleanup_stale_pending_stamps, default_session_roots, ingest_all, ingest_claude_session, - ingest_codex_sessions, ingest_opencode_sessions, run_ingest_tick, start_watch_loop, - write_pending_stamp, ErrorSink, IngestFn, IngestOptions as RawIngestOptions, IngestReport, - IngestRoots, PendingStamp, PendingStampHarness, PendingStampWriteResult, ReportSink, - StartWatchLoopOptions, WatchController, WriteOptions as PendingStampWriteOptions, - DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK, + ingest_codex_sessions, ingest_opencode_sessions, start_watch_loop, write_pending_stamp, + ErrorSink, IngestFn, IngestOptions as RawIngestOptions, IngestReport, IngestRoots, + PendingStamp, PendingStampHarness, PendingStampWriteResult, ReportSink, StartWatchLoopOptions, + WatchController, WriteOptions as PendingStampWriteOptions, DEFAULT_FS_DEBOUNCE, + DEFAULT_SLOW_FALLBACK, }; // --- LedgerOpenOptions ----------------------------------------------------- diff --git a/crates/relayburn-sdk/tests/integration.rs b/crates/relayburn-sdk/tests/integration.rs index f6c519a9..40b1719c 100644 --- a/crates/relayburn-sdk/tests/integration.rs +++ b/crates/relayburn-sdk/tests/integration.rs @@ -318,8 +318,8 @@ fn sdk_verbs_round_trip_against_a_fixture_ledger() { .collect(); } -#[tokio::test] -async fn ingest_with_empty_roots_returns_zero_report_via_handle_and_free_fn() { +#[test] +fn ingest_with_empty_roots_returns_zero_report_via_handle_and_free_fn() { // 10. ingest — handle + free. Both forms must accept empty roots and // return an all-zero report without scanning the developer's HOME. let home = TempDir::new().expect("home tmp"); @@ -338,7 +338,6 @@ async fn ingest_with_empty_roots_returns_zero_report_via_handle_and_free_fn() { }, ..Default::default() }) - .await .expect("handle ingest"); assert_eq!(report.scanned_sessions, 0); assert_eq!(report.appended_turns, 0); @@ -352,7 +351,6 @@ async fn ingest_with_empty_roots_returns_zero_report_via_handle_and_free_fn() { }, ..Default::default() }) - .await .expect("free ingest"); assert_eq!(report2.scanned_sessions, 0); assert_eq!(report2.appended_turns, 0);