Skip to content

Commit d7c9f61

Browse files
willwashburnclaude
andauthored
refactor(sdk): drop async on ingest verbs that never yield (#419)
* refactor(sdk): drop async on ingest verbs that never yield (#334) Every ingest verb was `async fn` but the bodies were sync filesystem walks and rusqlite writes — no `.await` actually reached the runtime. Drop the `async` annotation so the type matches the semantics; callers that need to run these from an async context can wrap them in `tokio::task::spawn_blocking` (the sdk-node binding now does). Also delete `run_ingest_tick` (a one-line wrapper over `ingest().await`) and the dead `in_flight.lock().await` belt-and-braces in `WatchController::stop` — the task handle await already covers any in-flight tick. CLI presenters that built a tokio runtime just to `block_on` ingest now call the sync verb directly; only `burn ingest --watch` keeps its runtime (the watch loop still spawns async tasks for the FS-event driver). https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB * fix(sdk): restore in_flight barrier in WatchController::stop Codex review on #419 flagged that dropping the trailing `in_flight.lock().await` regresses the "await any in-flight tick" contract: the public `tick()` path doesn't check the `stopped` flag, so a concurrent caller can grab `in_flight` and run an ingest after the loop task has already exited. Awaiting the spawned task handle alone covers the runner-driven path but not that one — callers that tear down state right after `stop().await` could race an active write. Put the trailing `in_flight.lock().await` back and document why it isn't redundant. https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB * docs(changelog): trim Unreleased ingest entry to impact-first bullet CodeRabbit review feedback on #419: per the project's CLAUDE.md guidelines, changelog entries should be concise and impact-first. https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 25c71ae commit d7c9f61

19 files changed

Lines changed: 126 additions & 213 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
66

77
### Changed
88

9+
- `relayburn-sdk`: ingest verbs are now synchronous; async callers should
10+
run them via `tokio::task::spawn_blocking`.
911
- `relayburn-sdk`: lower per-record allocations in reader hashing, tool-result
1012
sizing, relationship dedup, and project resolution. Cuts overhead during
1113
large session imports and concurrent `resolve_project` calls.
@@ -15,6 +17,10 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
1517
JSON lines lacking a trailing newline still surface in the single-shot
1618
output.
1719

20+
### Removed
21+
22+
- `relayburn-sdk`: removed `run_ingest_tick`.
23+
1824
## [2.8.3] - 2026-05-11
1925

2026
### Changed

crates/relayburn-cli/src/commands/hotspots.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,9 @@ fn run_inner(globals: &GlobalArgs, args: HotspotsArgs) -> anyhow::Result<i32> {
228228
progress.set_task("opening ledger");
229229
let mut handle = Ledger::open(opts)?;
230230

231-
let rt = tokio::runtime::Builder::new_current_thread()
232-
.enable_all()
233-
.build()?;
234231
progress.set_task("refreshing ledger");
235232
let raw_opts = progress.ingest_options(ledger_home.clone());
236-
rt.block_on(ingest_all(handle.raw_mut(), &raw_opts))?;
233+
ingest_all(handle.raw_mut(), &raw_opts)?;
237234
drop(handle);
238235

239236
let session_filter = match args.session.as_deref() {

crates/relayburn-cli/src/commands/ingest.rs

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 {
7474
}
7575

7676
/// One-shot scan: open the ledger, run a single `ingest_all`, log the
77-
/// summary, exit. Drives a current-thread tokio runtime so the otherwise
78-
/// sync presenter can drive the async SDK verb.
77+
/// summary, exit.
7978
///
8079
/// Summary line is emitted on **stdout** (matching TS `runIngestOnce`
8180
/// at `packages/cli/src/commands/ingest.ts:121-126`) so callers can
@@ -92,20 +91,9 @@ fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 {
9291
return report_error(&err, globals);
9392
}
9493
};
95-
progress.set_task("starting runtime");
96-
let rt = match tokio::runtime::Builder::new_current_thread()
97-
.enable_all()
98-
.build()
99-
{
100-
Ok(rt) => rt,
101-
Err(err) => {
102-
progress.finish_and_clear();
103-
return report_error(&err, globals);
104-
}
105-
};
10694
progress.set_task("scanning sessions");
10795
let opts = progress.ingest_options(globals.ledger_path.clone());
108-
let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
96+
let result = ingest_all(handle.raw_mut(), &opts);
10997
progress.finish_and_clear();
11098
match result {
11199
Ok(report) => {
@@ -203,7 +191,7 @@ fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 {
203191
} else {
204192
progress.ingest_options(ledger_home)
205193
};
206-
let result = ingest_all(guard.raw_mut(), &opts).await;
194+
let result = ingest_all(guard.raw_mut(), &opts);
207195
progress.set_task(watch_message);
208196
result
209197
})
@@ -322,30 +310,14 @@ fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 {
322310
return 0;
323311
}
324312
};
325-
if let Some(progress) = &progress {
326-
progress.set_task("starting runtime");
327-
}
328-
let rt = match tokio::runtime::Builder::new_current_thread()
329-
.enable_all()
330-
.build()
331-
{
332-
Ok(rt) => rt,
333-
Err(err) => {
334-
if let Some(progress) = &progress {
335-
progress.finish_and_clear();
336-
}
337-
eprintln!("[burn] ingest: {err}");
338-
return 0;
339-
}
340-
};
341313
if let Some(progress) = &progress {
342314
progress.set_task("scanning sessions");
343315
}
344316
let opts = match &progress {
345317
Some(progress) => progress.ingest_options(globals.ledger_path.clone()),
346318
None => TaskProgress::quiet_ingest_options(globals.ledger_path.clone()),
347319
};
348-
let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
320+
let result = ingest_all(handle.raw_mut(), &opts);
349321
if let Some(progress) = &progress {
350322
progress.finish_and_clear();
351323
}

crates/relayburn-cli/src/commands/state.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -488,10 +488,7 @@ fn run_reset(globals: &GlobalArgs, args: crate::cli::StateResetArgs) -> i32 {
488488
print_reset_report(globals, &summary, /*executed=*/ true, ingest_report.as_ref())
489489
}
490490

491-
/// Drive a single `ingest_all` sweep on the open handle. Mirrors the
492-
/// `run_ingest` helper in `commands/summary.rs`: the SDK verb is async,
493-
/// so we spin a current-thread tokio runtime to drive it from this
494-
/// otherwise-sync presenter.
491+
/// Drive a single `ingest_all` sweep on the open handle.
495492
///
496493
/// `ledger_home` propagates the global `--ledger-path` override into
497494
/// `RawIngestOptions::ledger_home` so sidecar ingest state (config and
@@ -504,12 +501,9 @@ fn run_reset_reingest(
504501
ledger_home: Option<std::path::PathBuf>,
505502
progress: &TaskProgress,
506503
) -> anyhow::Result<relayburn_sdk::IngestReport> {
507-
let rt = tokio::runtime::Builder::new_current_thread()
508-
.enable_all()
509-
.build()?;
510504
progress.set_task("re-ingesting sessions");
511505
let opts = progress.ingest_options(ledger_home);
512-
rt.block_on(ingest_all(handle.raw_mut(), &opts))
506+
ingest_all(handle.raw_mut(), &opts)
513507
}
514508

515509
fn print_reset_report(

crates/relayburn-cli/src/commands/summary.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -325,27 +325,18 @@ fn parse_tag_filters(tags: &[String]) -> anyhow::Result<BTreeMap<String, String>
325325
Ok(out)
326326
}
327327

328-
/// Run an ingest sweep on the open handle. Builds a current-thread tokio
329-
/// runtime so the otherwise-sync presenter can drive the async verb.
328+
/// Run an ingest sweep on the open handle.
330329
fn run_ingest(
331330
handle: &mut LedgerHandle,
332331
progress: &TaskProgress,
333332
ledger_home: Option<std::path::PathBuf>,
334333
) -> anyhow::Result<relayburn_sdk::IngestReport> {
335-
let rt = tokio::runtime::Builder::new_current_thread()
336-
.enable_all()
337-
.build()
338-
.map_err(|err| {
339-
progress.finish_and_clear();
340-
err
341-
})?;
342334
progress.set_task("refreshing ledger");
343335
let opts = progress.ingest_options(ledger_home);
344-
rt.block_on(ingest_all(handle.raw_mut(), &opts))
345-
.map_err(|err| {
346-
progress.finish_and_clear();
347-
err
348-
})
336+
ingest_all(handle.raw_mut(), &opts).map_err(|err| {
337+
progress.finish_and_clear();
338+
err
339+
})
349340
}
350341

351342
const COVERAGE_FIELDS: [CoverageField; 5] = [

crates/relayburn-cli/src/harnesses/claude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl HarnessAdapter for ClaudeAdapter {
146146
let mut handle = Ledger::open(LedgerOpenOptions::default())?;
147147
let cwd_str = ctx.cwd.to_string_lossy().into_owned();
148148
let opts = RawIngestOptions::default();
149-
ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts).await
149+
ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts)
150150
}
151151
}
152152

crates/relayburn-cli/src/harnesses/codex.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,12 @@
1010
//! * `ingest_sessions` — defers to [`relayburn_sdk::ingest_codex_sessions`],
1111
//! the codex-only ingest pass. The factory opens a fresh ledger handle
1212
//! per call (mirrors the TS lock-then-write-then-close shape; SQLite WAL
13-
//! keeps the per-tick open cheap).
13+
//! keeps the per-tick open cheap). The SDK verb is sync, so we pass it
14+
//! directly as a fn pointer to [`pending_stamp::session_store_adapter`].
1415
15-
use std::future::Future;
1616
use std::path::PathBuf;
17-
use std::pin::Pin;
1817

19-
use relayburn_sdk::{ingest_codex_sessions, IngestReport, RawIngestOptions, RawLedger};
18+
use relayburn_sdk::ingest_codex_sessions;
2019

2120
use super::pending_stamp;
2221
use super::HarnessAdapter;
@@ -29,20 +28,11 @@ fn codex_sessions_dir() -> PathBuf {
2928
home_dir().join(".codex").join("sessions")
3029
}
3130

32-
/// Box-pin the SDK's `async fn ingest_codex_sessions` into a fn pointer
33-
/// the [`pending_stamp::SessionIngestor`] type alias accepts.
34-
fn codex_ingest<'a>(
35-
ledger: &'a mut RawLedger,
36-
opts: &'a RawIngestOptions,
37-
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>> {
38-
Box::pin(ingest_codex_sessions(ledger, opts))
39-
}
40-
4131
/// Hand out a `&'static dyn HarnessAdapter` for codex. The registry calls
4232
/// this once at lazy-init time. See
4333
/// [`pending_stamp::session_store_adapter`] for the leak semantics.
4434
pub fn adapter() -> &'static dyn HarnessAdapter {
45-
pending_stamp::session_store_adapter("codex", codex_sessions_dir, codex_ingest)
35+
pending_stamp::session_store_adapter("codex", codex_sessions_dir, ingest_codex_sessions)
4636
}
4737

4838
#[cfg(test)]

crates/relayburn-cli/src/harnesses/opencode.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@
1313
//! [`relayburn_sdk::ingest_opencode_sessions`], the opencode-only ingest
1414
//! pass. The factory opens a fresh ledger handle per call (mirrors the
1515
//! TS lock-then-write-then-close shape; SQLite WAL keeps the per-tick
16-
//! open cheap).
16+
//! open cheap). The SDK verb is sync, so we pass it directly as a fn
17+
//! pointer to [`pending_stamp::session_store_adapter`].
1718
18-
use std::future::Future;
1919
use std::path::PathBuf;
20-
use std::pin::Pin;
2120

22-
use relayburn_sdk::{ingest_opencode_sessions, IngestReport, RawIngestOptions, RawLedger};
21+
use relayburn_sdk::ingest_opencode_sessions;
2322

2423
use super::pending_stamp;
2524
use super::HarnessAdapter;
@@ -36,20 +35,11 @@ fn opencode_sessions_dir() -> PathBuf {
3635
.join("session")
3736
}
3837

39-
/// Box-pin the SDK's `async fn ingest_opencode_sessions` into a fn
40-
/// pointer the [`pending_stamp::SessionIngestor`] type alias accepts.
41-
fn opencode_ingest<'a>(
42-
ledger: &'a mut RawLedger,
43-
opts: &'a RawIngestOptions,
44-
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>> {
45-
Box::pin(ingest_opencode_sessions(ledger, opts))
46-
}
47-
4838
/// Hand out a `&'static dyn HarnessAdapter` for opencode. The registry
4939
/// calls this once at lazy-init time. See
5040
/// [`pending_stamp::session_store_adapter`] for the leak semantics.
5141
pub fn adapter() -> &'static dyn HarnessAdapter {
52-
pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, opencode_ingest)
42+
pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, ingest_opencode_sessions)
5343
}
5444

5545
#[cfg(test)]

crates/relayburn-cli/src/harnesses/pending_stamp.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,13 @@ pub fn adapter_static(config: PendingStampAdapter) -> &'static dyn HarnessAdapte
131131
Box::leak(Box::new(PendingStampAdapterImpl::new(config)))
132132
}
133133

134-
/// Async fn pointer for an SDK session ingestor (`ingest_codex_sessions`,
135-
/// `ingest_opencode_sessions`). The shape matches both per-harness ingest
136-
/// passes verbatim — they live in `relayburn_sdk` as `async fn`, and the
137-
/// per-call `Box::pin` adaptation happens at the call site so the helper
134+
/// Fn pointer for an SDK session ingestor (`ingest_codex_sessions`,
135+
/// `ingest_opencode_sessions`). Both verbs are sync in the SDK; the
136+
/// per-tick `Box::pin` adaptation that drops them into [`IngestSessionsFn`]
137+
/// happens at the call site in [`session_store_adapter`] so the helper
138138
/// stays a fn pointer (no per-tick closure allocation).
139-
pub type SessionIngestor = for<'a> fn(
140-
&'a mut RawLedger,
141-
&'a RawIngestOptions,
142-
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>>;
139+
pub type SessionIngestor =
140+
fn(&mut RawLedger, &RawIngestOptions) -> anyhow::Result<IngestReport>;
143141

144142
/// One-call factory for pending-stamp adapters whose only differences are
145143
/// the harness name, the session-root resolver, and which SDK ingest pass
@@ -174,7 +172,7 @@ pub fn session_store_adapter(
174172
ledger_home,
175173
..RawIngestOptions::default()
176174
};
177-
ingestor(handle.raw_mut(), &opts).await
175+
ingestor(handle.raw_mut(), &opts)
178176
})
179177
});
180178
adapter_static(PendingStampAdapter::new(name, session_root, ingest_sessions))

crates/relayburn-sdk-node/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,8 +1261,12 @@ pub async fn ingest(opts: Option<IngestOptions>) -> Result<IngestReport, NapiErr
12611261
on_progress: None,
12621262
on_warn: None,
12631263
};
1264-
let report = sdk::ingest(raw)
1264+
// SDK ingest is sync (filesystem walks + rusqlite writes). Run it on
1265+
// tokio's blocking pool so the napi runtime stays responsive while the
1266+
// sweep is in flight.
1267+
let report = tokio::task::spawn_blocking(move || sdk::ingest(raw))
12651268
.await
1269+
.map_err(|e| NapiError::from_reason(format!("ingest task panicked: {e}")))?
12661270
.map_err(|e| NapiError::from_reason(format!("{e:#}")))?;
12671271
Ok(report.into())
12681272
}

0 commit comments

Comments
 (0)