Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Cross-package release notes for relayburn. Package changelogs contain package-le

### Changed

- `relayburn-sdk`: ingest verbs are now synchronous; async callers should
run them via `tokio::task::spawn_blocking`.
- `relayburn-sdk`: lower per-record allocations in reader hashing, tool-result
sizing, relationship dedup, and project resolution. Cuts overhead during
large session imports and concurrent `resolve_project` calls.
Expand All @@ -15,6 +17,10 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
JSON lines lacking a trailing newline still surface in the single-shot
output.

### Removed

- `relayburn-sdk`: removed `run_ingest_tick`.

## [2.8.3] - 2026-05-11

### Changed
Expand Down
5 changes: 1 addition & 4 deletions crates/relayburn-cli/src/commands/hotspots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,9 @@ fn run_inner(globals: &GlobalArgs, args: HotspotsArgs) -> anyhow::Result<i32> {
progress.set_task("opening ledger");
let mut handle = Ledger::open(opts)?;

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
progress.set_task("refreshing ledger");
let raw_opts = progress.ingest_options(ledger_home.clone());
rt.block_on(ingest_all(handle.raw_mut(), &raw_opts))?;
ingest_all(handle.raw_mut(), &raw_opts)?;
drop(handle);

let session_filter = match args.session.as_deref() {
Expand Down
36 changes: 4 additions & 32 deletions crates/relayburn-cli/src/commands/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ pub fn run(globals: &GlobalArgs, args: IngestArgs) -> i32 {
}

/// One-shot scan: open the ledger, run a single `ingest_all`, log the
/// summary, exit. Drives a current-thread tokio runtime so the otherwise
/// sync presenter can drive the async SDK verb.
/// summary, exit.
///
/// Summary line is emitted on **stdout** (matching TS `runIngestOnce`
/// at `packages/cli/src/commands/ingest.ts:121-126`) so callers can
Expand All @@ -92,20 +91,9 @@ fn run_once(globals: &GlobalArgs, quiet: bool) -> i32 {
return report_error(&err, globals);
}
};
progress.set_task("starting runtime");
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(err) => {
progress.finish_and_clear();
return report_error(&err, globals);
}
};
progress.set_task("scanning sessions");
let opts = progress.ingest_options(globals.ledger_path.clone());
let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
let result = ingest_all(handle.raw_mut(), &opts);
progress.finish_and_clear();
match result {
Ok(report) => {
Expand Down Expand Up @@ -203,7 +191,7 @@ fn run_watch(globals: &GlobalArgs, args: &IngestArgs) -> i32 {
} else {
progress.ingest_options(ledger_home)
};
let result = ingest_all(guard.raw_mut(), &opts).await;
let result = ingest_all(guard.raw_mut(), &opts);
progress.set_task(watch_message);
result
})
Expand Down Expand Up @@ -322,30 +310,14 @@ fn run_hook(globals: &GlobalArgs, hook: &str, quiet: bool) -> i32 {
return 0;
}
};
if let Some(progress) = &progress {
progress.set_task("starting runtime");
}
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(err) => {
if let Some(progress) = &progress {
progress.finish_and_clear();
}
eprintln!("[burn] ingest: {err}");
return 0;
}
};
if let Some(progress) = &progress {
progress.set_task("scanning sessions");
}
let opts = match &progress {
Some(progress) => progress.ingest_options(globals.ledger_path.clone()),
None => TaskProgress::quiet_ingest_options(globals.ledger_path.clone()),
};
let result = rt.block_on(ingest_all(handle.raw_mut(), &opts));
let result = ingest_all(handle.raw_mut(), &opts);
if let Some(progress) = &progress {
progress.finish_and_clear();
}
Expand Down
10 changes: 2 additions & 8 deletions crates/relayburn-cli/src/commands/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,10 +488,7 @@ fn run_reset(globals: &GlobalArgs, args: crate::cli::StateResetArgs) -> i32 {
print_reset_report(globals, &summary, /*executed=*/ true, ingest_report.as_ref())
}

/// Drive a single `ingest_all` sweep on the open handle. Mirrors the
/// `run_ingest` helper in `commands/summary.rs`: the SDK verb is async,
/// so we spin a current-thread tokio runtime to drive it from this
/// otherwise-sync presenter.
/// Drive a single `ingest_all` sweep on the open handle.
///
/// `ledger_home` propagates the global `--ledger-path` override into
/// `RawIngestOptions::ledger_home` so sidecar ingest state (config and
Expand All @@ -504,12 +501,9 @@ fn run_reset_reingest(
ledger_home: Option<std::path::PathBuf>,
progress: &TaskProgress,
) -> anyhow::Result<relayburn_sdk::IngestReport> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
progress.set_task("re-ingesting sessions");
let opts = progress.ingest_options(ledger_home);
rt.block_on(ingest_all(handle.raw_mut(), &opts))
ingest_all(handle.raw_mut(), &opts)
}

fn print_reset_report(
Expand Down
19 changes: 5 additions & 14 deletions crates/relayburn-cli/src/commands/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,27 +325,18 @@ fn parse_tag_filters(tags: &[String]) -> anyhow::Result<BTreeMap<String, String>
Ok(out)
}

/// Run an ingest sweep on the open handle. Builds a current-thread tokio
/// runtime so the otherwise-sync presenter can drive the async verb.
/// Run an ingest sweep on the open handle.
fn run_ingest(
handle: &mut LedgerHandle,
progress: &TaskProgress,
ledger_home: Option<std::path::PathBuf>,
) -> anyhow::Result<relayburn_sdk::IngestReport> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| {
progress.finish_and_clear();
err
})?;
progress.set_task("refreshing ledger");
let opts = progress.ingest_options(ledger_home);
rt.block_on(ingest_all(handle.raw_mut(), &opts))
.map_err(|err| {
progress.finish_and_clear();
err
})
ingest_all(handle.raw_mut(), &opts).map_err(|err| {
progress.finish_and_clear();
err
})
}

const COVERAGE_FIELDS: [CoverageField; 5] = [
Expand Down
2 changes: 1 addition & 1 deletion crates/relayburn-cli/src/harnesses/claude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl HarnessAdapter for ClaudeAdapter {
let mut handle = Ledger::open(LedgerOpenOptions::default())?;
let cwd_str = ctx.cwd.to_string_lossy().into_owned();
let opts = RawIngestOptions::default();
ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts).await
ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts)
}
}

Expand Down
18 changes: 4 additions & 14 deletions crates/relayburn-cli/src/harnesses/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
//! * `ingest_sessions` — defers to [`relayburn_sdk::ingest_codex_sessions`],
//! the codex-only ingest pass. The factory opens a fresh ledger handle
//! per call (mirrors the TS lock-then-write-then-close shape; SQLite WAL
//! keeps the per-tick open cheap).
//! keeps the per-tick open cheap). The SDK verb is sync, so we pass it
//! directly as a fn pointer to [`pending_stamp::session_store_adapter`].

use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;

use relayburn_sdk::{ingest_codex_sessions, IngestReport, RawIngestOptions, RawLedger};
use relayburn_sdk::ingest_codex_sessions;

use super::pending_stamp;
use super::HarnessAdapter;
Expand All @@ -29,20 +28,11 @@ fn codex_sessions_dir() -> PathBuf {
home_dir().join(".codex").join("sessions")
}

/// Box-pin the SDK's `async fn ingest_codex_sessions` into a fn pointer
/// the [`pending_stamp::SessionIngestor`] type alias accepts.
fn codex_ingest<'a>(
ledger: &'a mut RawLedger,
opts: &'a RawIngestOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>> {
Box::pin(ingest_codex_sessions(ledger, opts))
}

/// Hand out a `&'static dyn HarnessAdapter` for codex. The registry calls
/// this once at lazy-init time. See
/// [`pending_stamp::session_store_adapter`] for the leak semantics.
pub fn adapter() -> &'static dyn HarnessAdapter {
pending_stamp::session_store_adapter("codex", codex_sessions_dir, codex_ingest)
pending_stamp::session_store_adapter("codex", codex_sessions_dir, ingest_codex_sessions)
}

#[cfg(test)]
Expand Down
18 changes: 4 additions & 14 deletions crates/relayburn-cli/src/harnesses/opencode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
//! [`relayburn_sdk::ingest_opencode_sessions`], the opencode-only ingest
//! pass. The factory opens a fresh ledger handle per call (mirrors the
//! TS lock-then-write-then-close shape; SQLite WAL keeps the per-tick
//! open cheap).
//! open cheap). The SDK verb is sync, so we pass it directly as a fn
//! pointer to [`pending_stamp::session_store_adapter`].

use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;

use relayburn_sdk::{ingest_opencode_sessions, IngestReport, RawIngestOptions, RawLedger};
use relayburn_sdk::ingest_opencode_sessions;

use super::pending_stamp;
use super::HarnessAdapter;
Expand All @@ -36,20 +35,11 @@ fn opencode_sessions_dir() -> PathBuf {
.join("session")
}

/// Box-pin the SDK's `async fn ingest_opencode_sessions` into a fn
/// pointer the [`pending_stamp::SessionIngestor`] type alias accepts.
fn opencode_ingest<'a>(
ledger: &'a mut RawLedger,
opts: &'a RawIngestOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>> {
Box::pin(ingest_opencode_sessions(ledger, opts))
}

/// Hand out a `&'static dyn HarnessAdapter` for opencode. The registry
/// calls this once at lazy-init time. See
/// [`pending_stamp::session_store_adapter`] for the leak semantics.
pub fn adapter() -> &'static dyn HarnessAdapter {
pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, opencode_ingest)
pending_stamp::session_store_adapter("opencode", opencode_sessions_dir, ingest_opencode_sessions)
}

#[cfg(test)]
Expand Down
16 changes: 7 additions & 9 deletions crates/relayburn-cli/src/harnesses/pending_stamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,13 @@ pub fn adapter_static(config: PendingStampAdapter) -> &'static dyn HarnessAdapte
Box::leak(Box::new(PendingStampAdapterImpl::new(config)))
}

/// Async fn pointer for an SDK session ingestor (`ingest_codex_sessions`,
/// `ingest_opencode_sessions`). The shape matches both per-harness ingest
/// passes verbatim — they live in `relayburn_sdk` as `async fn`, and the
/// per-call `Box::pin` adaptation happens at the call site so the helper
/// Fn pointer for an SDK session ingestor (`ingest_codex_sessions`,
/// `ingest_opencode_sessions`). Both verbs are sync in the SDK; the
/// per-tick `Box::pin` adaptation that drops them into [`IngestSessionsFn`]
/// happens at the call site in [`session_store_adapter`] so the helper
/// stays a fn pointer (no per-tick closure allocation).
pub type SessionIngestor = for<'a> fn(
&'a mut RawLedger,
&'a RawIngestOptions,
) -> Pin<Box<dyn Future<Output = anyhow::Result<IngestReport>> + Send + 'a>>;
pub type SessionIngestor =
fn(&mut RawLedger, &RawIngestOptions) -> anyhow::Result<IngestReport>;

/// One-call factory for pending-stamp adapters whose only differences are
/// the harness name, the session-root resolver, and which SDK ingest pass
Expand Down Expand Up @@ -174,7 +172,7 @@ pub fn session_store_adapter(
ledger_home,
..RawIngestOptions::default()
};
ingestor(handle.raw_mut(), &opts).await
ingestor(handle.raw_mut(), &opts)
})
});
adapter_static(PendingStampAdapter::new(name, session_root, ingest_sessions))
Expand Down
6 changes: 5 additions & 1 deletion crates/relayburn-sdk-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,8 +1261,12 @@ pub async fn ingest(opts: Option<IngestOptions>) -> Result<IngestReport, NapiErr
on_progress: None,
on_warn: None,
};
let report = sdk::ingest(raw)
// SDK ingest is sync (filesystem walks + rusqlite writes). Run it on
// tokio's blocking pool so the napi runtime stays responsive while the
// sweep is in flight.
let report = tokio::task::spawn_blocking(move || sdk::ingest(raw))
.await
.map_err(|e| NapiError::from_reason(format!("ingest task panicked: {e}")))?
.map_err(|e| NapiError::from_reason(format!("{e:#}")))?;
Ok(report.into())
}
Expand Down
8 changes: 4 additions & 4 deletions crates/relayburn-sdk/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
//!
//! ```no_run
//! use relayburn_sdk::{ingest_all, RawIngestOptions, RawLedger};
//! # async fn run() -> anyhow::Result<()> {
//! # fn run() -> anyhow::Result<()> {
//! let mut ledger = RawLedger::open_default()?;
//! let report = ingest_all(&mut ledger, &RawIngestOptions::default()).await?;
//! let report = ingest_all(&mut ledger, &RawIngestOptions::default())?;
//! println!("ingested {} turns", report.appended_turns);
//! # Ok(()) }
//! ```
Expand Down Expand Up @@ -89,6 +89,6 @@ pub use pending_stamps::{
pub use reingest::{derive_codex_session_id, reingest_missing_content, ReingestContentReport};
pub use walk::{walk_jsonl, walk_opencode_sessions};
pub use watch_loop::{
run_ingest_tick, start_watch_loop, ErrorSink, IngestFn, ReportSink, StartWatchLoopOptions,
WatchController, DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK,
start_watch_loop, ErrorSink, IngestFn, ReportSink, StartWatchLoopOptions, WatchController,
DEFAULT_FS_DEBOUNCE, DEFAULT_SLOW_FALLBACK,
};
Loading
Loading