Skip to content

Commit ab56ca6

Browse files
committed
reader (Rust): stream JSONL hot loops; use memchr crate
The Claude / Codex incremental and prescan paths previously built a multi-GB up-front buffer (`vec![0u8; (size - start_offset) as usize]`, `Vec::with_capacity((size - start_offset) as usize)` + `read_to_end`) to scan a session log. Switch to BufReader + `read_until(b'\n', ...)` into a reused line buffer so only the longest single line stays resident, regardless of file size. The main `parse_claude_session` loop also moves off `BufReader::lines()` (a fresh `String` per line) onto `read_line` into a reused `String`, keeping per-line allocation bounded by the longest line for sessions with tens of thousands of turns. `memchr_newline` in the codex parser was named for `memchr` but did `buf.iter().position(|&b| b == b'\n')`. Wire the actual `memchr` crate (already a transitive dep through `regex`) for SIMD-accelerated line splits; pinned at the workspace root and depended on directly from `relayburn-sdk` so we own the version. Closes #323.
1 parent 9973768 commit ab56ca6

6 files changed

Lines changed: 94 additions & 44 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
44

55
## [Unreleased]
66

7+
- `relayburn-sdk` (Rust): reader hot loops in `claude.rs` and `codex.rs` now stream JSONL line-by-line via `BufReader::read_until` instead of pre-allocating a `(size - start_offset)`-byte buffer up front; only the longest single line stays resident. `memchr_newline` in the codex parser now actually uses the `memchr` crate for SIMD-accelerated newline scanning. The main `parse_claude_session` loop also drops `BufReader::lines()` in favor of `read_line` into a reused `String`. (#323)
8+
79
## [2.0.0] - 2026-05-07
810

911
- `relayburn-sdk` (Rust): default ledger home moves from `~/.relayburn` to `~/.agentworkforce/burn` so the Rust 2.0 port and the TS 1.x package can coexist on disk during the #249 cutover. `RELAYBURN_HOME` (and the per-DB path overrides) continue to override the path; TS 1.x users on `~/.relayburn` are unaffected. Rust-port testers with data under the old path can `mv ~/.relayburn ~/.agentworkforce/burn` to carry it over (formats are not compatible — Rust treats any non-2.0 layout as empty and requires a `burn ingest` re-population).

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ anyhow = "1"
1717
thiserror = "2"
1818
clap = { version = "4", features = ["derive"] }
1919
rusqlite = { version = "0.32", features = ["bundled"] }
20+
memchr = "2"

crates/relayburn-sdk/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ sha2 = "0.10"
3333
hex = "0.4"
3434
regex = "1"
3535
phf = { version = "0.11", features = ["macros"] }
36+
# reader: SIMD-accelerated newline split for the JSONL hot loops in
37+
# `reader/{claude,codex}.rs`. `regex` already pulls memchr in
38+
# transitively; we depend on it directly so we own the version.
39+
memchr = { workspace = true }
3640

3741
# ledger: SQLite events + content store
3842
rusqlite = { workspace = true }

crates/relayburn-sdk/src/reader/claude.rs

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,21 @@ pub fn parse_claude_session_with_counter<P: AsRef<Path>, C: TokenCounter + ?Size
9696
let capture_content = matches!(content_mode, ContentStoreMode::Full);
9797

9898
let file = File::open(path)?;
99-
let reader = BufReader::new(file);
99+
let mut reader = BufReader::new(file);
100100

101101
let mut state = ParseState::new(options, path);
102102

103-
for line in reader.lines() {
104-
let line = line?;
105-
state.ingest_line(&line, counter, capture_content);
103+
// `BufReader::lines()` allocates a fresh `String` per line; for sessions
104+
// with tens of thousands of turns that's pure churn. `read_line` into a
105+
// single reused buffer keeps allocation bounded by the longest line.
106+
let mut line = String::new();
107+
loop {
108+
line.clear();
109+
match reader.read_line(&mut line) {
110+
Ok(0) => break,
111+
Ok(_) => state.ingest_line(&line, counter, capture_content),
112+
Err(err) => return Err(err),
113+
}
106114
}
107115

108116
Ok(state.finish(options, capture_content))
@@ -2290,7 +2298,7 @@ fn prescan_nodes(
22902298
next_event_index: 0,
22912299
});
22922300
}
2293-
let mut file = File::open(path)?;
2301+
let file = File::open(path)?;
22942302
let size = file.metadata()?.len();
22952303
let length = end_offset.min(size);
22962304
if length == 0 {
@@ -2299,18 +2307,29 @@ fn prescan_nodes(
22992307
next_event_index: 0,
23002308
});
23012309
}
2302-
let mut buf = vec![0u8; length as usize];
2303-
file.read_exact(&mut buf)?;
2304-
let mut p: usize = 0;
2310+
// Stream the prefix line-by-line rather than reading `[0, length)`
2311+
// into memory all at once. For multi-GB sessions the up-front
2312+
// `vec![0u8; length as usize]` was a multi-GB allocation we never
2313+
// need — only the longest single line has to fit in memory.
2314+
let mut reader = BufReader::new(file).take(length);
2315+
let mut line_buf: Vec<u8> = Vec::new();
23052316
let mut last_assistant_message_id: Option<String> = None;
23062317
let mut next_event_index: u64 = 0;
2307-
while p < buf.len() {
2308-
let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') {
2309-
Some(i) => p + i,
2310-
None => break,
2311-
};
2312-
let raw = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim();
2313-
p = nl_idx + 1;
2318+
loop {
2319+
line_buf.clear();
2320+
let n = reader.read_until(b'\n', &mut line_buf)?;
2321+
if n == 0 {
2322+
break;
2323+
}
2324+
// A trailing partial line (no `\n`) inside the prescan window
2325+
// should never happen — incremental ingest only commits cursors
2326+
// at newline boundaries — but guard anyway.
2327+
if line_buf.last() != Some(&b'\n') {
2328+
break;
2329+
}
2330+
let raw = std::str::from_utf8(&line_buf[..n - 1])
2331+
.unwrap_or("")
2332+
.trim();
23142333
if raw.is_empty() {
23152334
continue;
23162335
}
@@ -2499,20 +2518,30 @@ fn run_incremental<C: TokenCounter + ?Sized>(
24992518

25002519
let mut file = File::open(path)?;
25012520
file.seek(SeekFrom::Start(start_offset))?;
2502-
let mut buf: Vec<u8> = Vec::with_capacity((size - start_offset) as usize);
2503-
file.read_to_end(&mut buf)?;
2504-
2505-
let mut p: usize = 0;
2521+
// Stream from `start_offset` line-by-line. The previous implementation
2522+
// allocated `Vec::with_capacity((size - start_offset) as usize)` and
2523+
// `read_to_end` into it — for a multi-GB session this was a multi-GB
2524+
// up-front allocation. With BufReader + `read_until` only the longest
2525+
// single line stays resident.
2526+
let mut reader = BufReader::new(file);
2527+
let mut line_buf: Vec<u8> = Vec::new();
25062528
let mut cursor_offset: u64 = start_offset; // position past last complete \n
2507-
while p < buf.len() {
2508-
let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') {
2509-
Some(i) => p + i,
2510-
None => break,
2511-
};
2512-
let line_start_offset = start_offset + p as u64;
2513-
let line_end_offset = start_offset + nl_idx as u64 + 1;
2514-
let trimmed = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim();
2515-
p = nl_idx + 1;
2529+
loop {
2530+
line_buf.clear();
2531+
let n = reader.read_until(b'\n', &mut line_buf)?;
2532+
if n == 0 {
2533+
break;
2534+
}
2535+
// Drop trailing partial lines — the next incremental call resumes
2536+
// from `cursor_offset`, which we only advance past complete `\n`.
2537+
if line_buf.last() != Some(&b'\n') {
2538+
break;
2539+
}
2540+
let line_start_offset = cursor_offset;
2541+
let line_end_offset = cursor_offset + n as u64;
2542+
let trimmed = std::str::from_utf8(&line_buf[..n - 1])
2543+
.unwrap_or("")
2544+
.trim();
25162545
cursor_offset = line_end_offset;
25172546
if trimmed.is_empty() {
25182547
continue;

crates/relayburn-sdk/src/reader/codex.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
1313
use std::collections::{BTreeMap, BTreeSet, HashMap};
1414
use std::fs::File;
15-
use std::io::{Read, Seek, SeekFrom};
15+
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
1616
use std::path::Path;
1717

1818
use serde_json::Value;
@@ -187,13 +187,17 @@ pub fn parse_codex_session_incremental(
187187
resume: clone_resume(options.resume.as_ref()),
188188
});
189189
}
190-
let mut buf = vec![0u8; (size - start_offset) as usize];
191190
file.seek(SeekFrom::Start(start_offset))?;
192-
file.read_exact(&mut buf)?;
191+
// Stream from `start_offset` line-by-line. The previous implementation
192+
// pre-allocated `vec![0u8; (size - start_offset) as usize]` and
193+
// `read_exact` into it — for a multi-GB session that was a multi-GB
194+
// up-front allocation. With BufReader + `read_until` only the longest
195+
// single line stays resident.
196+
let reader = BufReader::new(file);
193197

194198
let project_resolver = ProjectResolver::new();
195199
Ok(parse_codex_buffer(
196-
&buf,
200+
reader,
197201
start_offset,
198202
options,
199203
&project_resolver,
@@ -330,8 +334,8 @@ struct Pending<T> {
330334
record: T,
331335
}
332336

333-
fn parse_codex_buffer(
334-
buf: &[u8],
337+
fn parse_codex_buffer<R: BufRead>(
338+
mut reader: R,
335339
start_offset: u64,
336340
options: &ParseCodexIncrementalOptions,
337341
project_resolver: &ProjectResolver,
@@ -387,16 +391,25 @@ fn parse_codex_buffer(
387391
let mut committed_tool_result_counters = tool_result_counters.clone();
388392
let mut committed_last_completed_turn = last_completed_turn.clone();
389393

390-
let mut p: usize = 0;
391-
while p < buf.len() {
392-
let nl_idx = match memchr_newline(&buf[p..]) {
393-
Some(idx) => p + idx,
394-
None => break,
394+
let mut line_buf: Vec<u8> = Vec::new();
395+
let mut current_offset: u64 = start_offset;
396+
loop {
397+
line_buf.clear();
398+
let n = match reader.read_until(b'\n', &mut line_buf) {
399+
Ok(0) => break,
400+
Ok(n) => n,
401+
Err(_) => break,
395402
};
396-
let line_end_offset = start_offset + (nl_idx as u64) + 1;
397-
let raw = &buf[p..nl_idx];
398-
p = nl_idx + 1;
399-
let text = std::str::from_utf8(raw).unwrap_or("").trim();
403+
// Drop trailing partial lines — the next incremental call resumes
404+
// from the committed end offset, which only advances past `\n`.
405+
if line_buf.last() != Some(&b'\n') {
406+
break;
407+
}
408+
let line_end_offset = current_offset + n as u64;
409+
current_offset = line_end_offset;
410+
let text = std::str::from_utf8(&line_buf[..n - 1])
411+
.unwrap_or("")
412+
.trim();
400413
if text.is_empty() {
401414
continue;
402415
}
@@ -1229,7 +1242,7 @@ fn resolve_token_counter(
12291242
}
12301243

12311244
fn memchr_newline(buf: &[u8]) -> Option<usize> {
1232-
buf.iter().position(|&b| b == b'\n')
1245+
memchr::memchr(b'\n', buf)
12331246
}
12341247

12351248
fn session_meta_payload_id(payload: &Value) -> Option<String> {

0 commit comments

Comments
 (0)