Skip to content

Commit dc35146

Browse files
willwashburnclaude
andauthored
reader (Rust): stream JSONL hot loops; use memchr crate (#372)
* reader (Rust): stream JSONL hot loops; use memchr crate The Claude / Codex incremental and prescan paths previously built a multi-GB up-front buffer (`vec![0u8; (size - start_offset) as usize]`, `Vec::with_capacity((size - start_offset) as usize)` + `read_to_end`) to scan a session log. Switch to BufReader + `read_until(b'\n', ...)` into a reused line buffer so only the longest single line stays resident, regardless of file size. The main `parse_claude_session` loop also moves off `BufReader::lines()` (a fresh `String` per line) onto `read_line` into a reused `String`, keeping per-line allocation bounded by the longest line for sessions with tens of thousands of turns. `memchr_newline` in the codex parser was named for `memchr` but did `buf.iter().position(|&b| b == b'\n')`. Wire the actual `memchr` crate (already a transitive dep through `regex`) for SIMD-accelerated line splits; pinned at the workspace root and depended on directly from `relayburn-sdk` so we own the version. Closes #323. * codex reader: propagate read_until I/O errors Address review feedback on #372: the codex streaming loop was swallowing `read_until` failures via `Err(_) => break`, which would silently truncate the parse at a transient mid-file read error and advance the resume cursor as if the bytes had been processed. The claude.rs equivalents already use `?` to propagate. Bubble the error by changing `parse_codex_buffer` to return `std::io::Result<ParseCodexIncrementalResult>` and using `?` in the loop, matching `parse_codex_session_incremental`'s outer signature. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent e0c8fa0 commit dc35146

6 files changed

Lines changed: 108 additions & 56 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
44

55
## [Unreleased]
66

7+
- `relayburn-sdk` (Rust): reader hot loops in `claude.rs` and `codex.rs` now stream JSONL line-by-line via `BufReader::read_until` instead of pre-allocating a `(size - start_offset)`-byte buffer up front; only the longest single line stays resident. `memchr_newline` in the codex parser now actually uses the `memchr` crate for SIMD-accelerated newline scanning. The main `parse_claude_session` loop also drops `BufReader::lines()` in favor of `read_line` into a reused `String`. (#323)
8+
79
## [2.0.0] - 2026-05-07
810

911
- `relayburn-sdk` (Rust): default ledger home moves from `~/.relayburn` to `~/.agentworkforce/burn` so the Rust 2.0 port and the TS 1.x package can coexist on disk during the #249 cutover. `RELAYBURN_HOME` (and the per-DB path overrides) continue to override the path; TS 1.x users on `~/.relayburn` are unaffected. Rust-port testers with data under the old path can `mv ~/.relayburn ~/.agentworkforce/burn` to carry it over (formats are not compatible — Rust treats any non-2.0 layout as empty and requires a `burn ingest` re-population).

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ anyhow = "1"
1717
thiserror = "2"
1818
clap = { version = "4", features = ["derive"] }
1919
rusqlite = { version = "0.32", features = ["bundled"] }
20+
memchr = "2"

crates/relayburn-sdk/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ sha2 = "0.10"
3333
hex = "0.4"
3434
regex = "1"
3535
phf = { version = "0.11", features = ["macros"] }
36+
# reader: SIMD-accelerated newline split for the JSONL hot loops in
37+
# `reader/{claude,codex}.rs`. `regex` already pulls memchr in
38+
# transitively; we depend on it directly so we own the version.
39+
memchr = { workspace = true }
3640

3741
# ledger: SQLite events + content store
3842
rusqlite = { workspace = true }

crates/relayburn-sdk/src/reader/claude.rs

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,21 @@ pub fn parse_claude_session_with_counter<P: AsRef<Path>, C: TokenCounter + ?Size
9696
let capture_content = matches!(content_mode, ContentStoreMode::Full);
9797

9898
let file = File::open(path)?;
99-
let reader = BufReader::new(file);
99+
let mut reader = BufReader::new(file);
100100

101101
let mut state = ParseState::new(options, path);
102102

103-
for line in reader.lines() {
104-
let line = line?;
105-
state.ingest_line(&line, counter, capture_content);
103+
// `BufReader::lines()` allocates a fresh `String` per line; for sessions
104+
// with tens of thousands of turns that's pure churn. `read_line` into a
105+
// single reused buffer keeps allocation bounded by the longest line.
106+
let mut line = String::new();
107+
loop {
108+
line.clear();
109+
match reader.read_line(&mut line) {
110+
Ok(0) => break,
111+
Ok(_) => state.ingest_line(&line, counter, capture_content),
112+
Err(err) => return Err(err),
113+
}
106114
}
107115

108116
Ok(state.finish(options, capture_content))
@@ -2290,7 +2298,7 @@ fn prescan_nodes(
22902298
next_event_index: 0,
22912299
});
22922300
}
2293-
let mut file = File::open(path)?;
2301+
let file = File::open(path)?;
22942302
let size = file.metadata()?.len();
22952303
let length = end_offset.min(size);
22962304
if length == 0 {
@@ -2299,18 +2307,29 @@ fn prescan_nodes(
22992307
next_event_index: 0,
23002308
});
23012309
}
2302-
let mut buf = vec![0u8; length as usize];
2303-
file.read_exact(&mut buf)?;
2304-
let mut p: usize = 0;
2310+
// Stream the prefix line-by-line rather than reading `[0, length)`
2311+
// into memory all at once. For multi-GB sessions the up-front
2312+
// `vec![0u8; length as usize]` was a multi-GB allocation we never
2313+
// need — only the longest single line has to fit in memory.
2314+
let mut reader = BufReader::new(file).take(length);
2315+
let mut line_buf: Vec<u8> = Vec::new();
23052316
let mut last_assistant_message_id: Option<String> = None;
23062317
let mut next_event_index: u64 = 0;
2307-
while p < buf.len() {
2308-
let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') {
2309-
Some(i) => p + i,
2310-
None => break,
2311-
};
2312-
let raw = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim();
2313-
p = nl_idx + 1;
2318+
loop {
2319+
line_buf.clear();
2320+
let n = reader.read_until(b'\n', &mut line_buf)?;
2321+
if n == 0 {
2322+
break;
2323+
}
2324+
// A trailing partial line (no `\n`) inside the prescan window
2325+
// should never happen — incremental ingest only commits cursors
2326+
// at newline boundaries — but guard anyway.
2327+
if line_buf.last() != Some(&b'\n') {
2328+
break;
2329+
}
2330+
let raw = std::str::from_utf8(&line_buf[..n - 1])
2331+
.unwrap_or("")
2332+
.trim();
23142333
if raw.is_empty() {
23152334
continue;
23162335
}
@@ -2499,20 +2518,30 @@ fn run_incremental<C: TokenCounter + ?Sized>(
24992518

25002519
let mut file = File::open(path)?;
25012520
file.seek(SeekFrom::Start(start_offset))?;
2502-
let mut buf: Vec<u8> = Vec::with_capacity((size - start_offset) as usize);
2503-
file.read_to_end(&mut buf)?;
2504-
2505-
let mut p: usize = 0;
2521+
// Stream from `start_offset` line-by-line. The previous implementation
2522+
// allocated `Vec::with_capacity((size - start_offset) as usize)` and
2523+
// `read_to_end` into it — for a multi-GB session this was a multi-GB
2524+
// up-front allocation. With BufReader + `read_until` only the longest
2525+
// single line stays resident.
2526+
let mut reader = BufReader::new(file);
2527+
let mut line_buf: Vec<u8> = Vec::new();
25062528
let mut cursor_offset: u64 = start_offset; // position past last complete \n
2507-
while p < buf.len() {
2508-
let nl_idx = match buf[p..].iter().position(|&b| b == b'\n') {
2509-
Some(i) => p + i,
2510-
None => break,
2511-
};
2512-
let line_start_offset = start_offset + p as u64;
2513-
let line_end_offset = start_offset + nl_idx as u64 + 1;
2514-
let trimmed = std::str::from_utf8(&buf[p..nl_idx]).unwrap_or("").trim();
2515-
p = nl_idx + 1;
2529+
loop {
2530+
line_buf.clear();
2531+
let n = reader.read_until(b'\n', &mut line_buf)?;
2532+
if n == 0 {
2533+
break;
2534+
}
2535+
// Drop trailing partial lines — the next incremental call resumes
2536+
// from `cursor_offset`, which we only advance past complete `\n`.
2537+
if line_buf.last() != Some(&b'\n') {
2538+
break;
2539+
}
2540+
let line_start_offset = cursor_offset;
2541+
let line_end_offset = cursor_offset + n as u64;
2542+
let trimmed = std::str::from_utf8(&line_buf[..n - 1])
2543+
.unwrap_or("")
2544+
.trim();
25162545
cursor_offset = line_end_offset;
25172546
if trimmed.is_empty() {
25182547
continue;

crates/relayburn-sdk/src/reader/codex.rs

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
1313
use std::collections::{BTreeMap, BTreeSet, HashMap};
1414
use std::fs::File;
15-
use std::io::{Read, Seek, SeekFrom};
15+
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
1616
use std::path::Path;
1717

1818
use serde_json::Value;
@@ -192,17 +192,16 @@ pub fn parse_codex_session_incremental(
192192
resume: clone_resume(options.resume.as_ref()),
193193
});
194194
}
195-
let mut buf = vec![0u8; (size - start_offset) as usize];
196195
file.seek(SeekFrom::Start(start_offset))?;
197-
file.read_exact(&mut buf)?;
196+
// Stream from `start_offset` line-by-line. The previous implementation
197+
// pre-allocated `vec![0u8; (size - start_offset) as usize]` and
198+
// `read_exact` into it — for a multi-GB session that was a multi-GB
199+
// up-front allocation. With BufReader + `read_until` only the longest
200+
// single line stays resident.
201+
let reader = BufReader::new(file);
198202

199203
let project_resolver = ProjectResolver::new();
200-
Ok(parse_codex_buffer(
201-
&buf,
202-
start_offset,
203-
options,
204-
&project_resolver,
205-
))
204+
parse_codex_buffer(reader, start_offset, options, &project_resolver)
206205
}
207206

208207
// ---------------------------------------------------------------------------
@@ -335,12 +334,12 @@ struct Pending<T> {
335334
record: T,
336335
}
337336

338-
fn parse_codex_buffer(
339-
buf: &[u8],
337+
fn parse_codex_buffer<R: BufRead>(
338+
mut reader: R,
340339
start_offset: u64,
341340
options: &ParseCodexIncrementalOptions,
342341
project_resolver: &ProjectResolver,
343-
) -> ParseCodexIncrementalResult {
342+
) -> std::io::Result<ParseCodexIncrementalResult> {
344343
let capture_content = matches!(options.content_mode, Some(ContentStoreMode::Full));
345344
// Validated by `resolve_token_counter` at the public entry point.
346345
let counter = HeuristicCounter;
@@ -392,16 +391,24 @@ fn parse_codex_buffer(
392391
let mut committed_tool_result_counters = tool_result_counters.clone();
393392
let mut committed_last_completed_turn = last_completed_turn.clone();
394393

395-
let mut p: usize = 0;
396-
while p < buf.len() {
397-
let nl_idx = match find_newline(&buf[p..]) {
398-
Some(idx) => p + idx,
399-
None => break,
400-
};
401-
let line_end_offset = start_offset + (nl_idx as u64) + 1;
402-
let raw = &buf[p..nl_idx];
403-
p = nl_idx + 1;
404-
let text = std::str::from_utf8(raw).unwrap_or("").trim();
394+
let mut line_buf: Vec<u8> = Vec::new();
395+
let mut current_offset: u64 = start_offset;
396+
loop {
397+
line_buf.clear();
398+
let n = reader.read_until(b'\n', &mut line_buf)?;
399+
if n == 0 {
400+
break;
401+
}
402+
// Drop trailing partial lines — the next incremental call resumes
403+
// from the committed end offset, which only advances past `\n`.
404+
if line_buf.last() != Some(&b'\n') {
405+
break;
406+
}
407+
let line_end_offset = current_offset + n as u64;
408+
current_offset = line_end_offset;
409+
let text = std::str::from_utf8(&line_buf[..n - 1])
410+
.unwrap_or("")
411+
.trim();
405412
if text.is_empty() {
406413
continue;
407414
}
@@ -1185,7 +1192,19 @@ fn parse_codex_buffer(
11851192
}
11861193
}
11871194

1188-
ParseCodexIncrementalResult {
1195+
// Silence unused-mutable warnings for snapshot mirrors that are written
1196+
// but only read indirectly.
1197+
let _ = (
1198+
cumulative,
1199+
session_id,
1200+
session_cwd,
1201+
turn_contexts,
1202+
seen_session_meta_keys,
1203+
root_session_emitted,
1204+
last_completed_turn,
1205+
);
1206+
1207+
Ok(ParseCodexIncrementalResult {
11891208
turns,
11901209
content: content_out,
11911210
events: events_out,
@@ -1194,7 +1213,7 @@ fn parse_codex_buffer(
11941213
tool_result_events: tool_events_out,
11951214
end_offset: committed_end_offset,
11961215
resume,
1197-
}
1216+
})
11981217
}
11991218

12001219
// ---------------------------------------------------------------------------
@@ -1217,10 +1236,6 @@ fn resolve_token_counter(
12171236
}
12181237
}
12191238

1220-
fn find_newline(buf: &[u8]) -> Option<usize> {
1221-
buf.iter().position(|&b| b == b'\n')
1222-
}
1223-
12241239
fn session_meta_payload_id(payload: &Value) -> Option<String> {
12251240
let id = payload.get("id")?.as_str()?;
12261241
if id.is_empty() {

0 commit comments

Comments
 (0)