Skip to content

Commit 34f2e3b

Browse files
willwashburn and claude authored
refactor(sdk): dedupe ingest filesystem walks + per-harness apply boilerplate (#423)
Consolidates the three copies of `list_dirs` / `list_jsonl_files` / `walk_jsonl` into `ingest::walk`, fixes the `walk_jsonl` filter to match `.JSONL` case-insensitively (matching the TS adapter), and collapses the per-harness append-if-not-empty boilerplate behind a `DerivedRecords` trait + `apply_parsed_extras` helper. The three single-harness ingest verbs now share a `run_single_harness` wrapper for the cleanup/load-cursors/resolve-content/emit-gap/save skeleton. Also switches the `ingest_claude_session` cwd encoding to the idiomatic `replace('/', "-")`.

Closes #343

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 2f07d32 commit 34f2e3b

4 files changed

Lines changed: 214 additions & 198 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,15 @@ Cross-package release notes for relayburn. Package changelogs contain package-le
44

55
## [Unreleased]
66

7+
### Changed
8+
9+
- `relayburn-sdk`: dedupe ingest filesystem walks (`list_dirs`,
10+
`list_jsonl_files`, `walk_jsonl`) into `ingest::walk`, fix the
11+
`walk_jsonl` filter to match `.JSONL` case-insensitively, and collapse
12+
the per-harness append boilerplate (`apply_parsed_extras`) and the
13+
three single-harness verb skeletons (`run_single_harness`). No
14+
behavior change beyond the case-sensitivity fix. (#343)
15+
716
## [2.8.7] - 2026-05-21
817

918
### Changed

crates/relayburn-sdk/src/ingest/ingest.rs

Lines changed: 107 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ use crate::ledger::{load_config, Ledger};
2828
use crate::reader::{
2929
parse_claude_session, parse_claude_session_incremental, parse_codex_session_incremental,
3030
parse_opencode_session_incremental, reconcile_claude_session_relationships,
31-
ClaudeParseIncrementalOptions, ClaudeParseOptions, CodexLastCompletedTurn, CodexResumeState,
32-
CodexTurnContext, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage,
33-
ParseCodexIncrementalOptions, ParseOpencodeIncrementalOptions, PersistedUserTurnSlot,
34-
ReconcileClaudeRelationshipsInput,
31+
ClaudeParseIncrementalOptions, ClaudeParseIncrementalResult, ClaudeParseOptions,
32+
ClaudeParseResult, CodexLastCompletedTurn, CodexResumeState, CodexTurnContext, CompactionEvent,
33+
ContentRecord, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage,
34+
ParseCodexIncrementalOptions, ParseCodexIncrementalResult, ParseOpencodeIncrementalOptions,
35+
ParseOpencodeIncrementalResult, PersistedUserTurnSlot, ReconcileClaudeRelationshipsInput,
36+
SessionRelationshipRecord, ToolResultEventRecord, UserTurnRecord,
3537
};
3638

3739
use crate::ingest::cursors::{
@@ -46,7 +48,7 @@ use crate::ingest::pending_stamps::{
4648
PendingStampSessionCandidate,
4749
};
4850
use crate::ingest::reingest::derive_codex_session_id;
49-
use crate::ingest::walk::{walk_jsonl, walk_opencode_sessions};
51+
use crate::ingest::walk::{list_dirs, list_jsonl_files, walk_jsonl, walk_opencode_sessions};
5052

5153
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
5254
#[serde(rename_all = "camelCase")]
@@ -241,73 +243,56 @@ pub fn ingest_claude_projects(
241243
ledger: &mut Ledger,
242244
opts: &IngestOptions,
243245
) -> anyhow::Result<IngestReport> {
244-
progress(opts, "cleaning pending spawn stamps");
245-
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
246-
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
247-
let mut after = before.clone();
248-
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
249-
let report = ingest_claude_into(
250-
ledger,
251-
&mut after,
252-
&opts.roots,
253-
content_mode,
254-
opts.ledger_home.as_deref(),
255-
)?;
256-
emit_gap_warning(
257-
AdapterName::Claude,
258-
content_mode,
259-
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
260-
);
261-
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
262-
Ok(report)
246+
run_single_harness(ledger, opts, AdapterName::Claude, ingest_claude_into)
263247
}
264248

265249
pub fn ingest_codex_sessions(
266250
ledger: &mut Ledger,
267251
opts: &IngestOptions,
268252
) -> anyhow::Result<IngestReport> {
269-
progress(opts, "cleaning pending spawn stamps");
270-
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
271-
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
272-
let mut after = before.clone();
273-
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
274-
let report = ingest_codex_into(
275-
ledger,
276-
&mut after,
277-
&opts.roots,
278-
content_mode,
279-
opts.ledger_home.as_deref(),
280-
)?;
281-
emit_gap_warning(
282-
AdapterName::Codex,
283-
content_mode,
284-
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
285-
);
286-
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
287-
Ok(report)
253+
run_single_harness(ledger, opts, AdapterName::Codex, ingest_codex_into)
288254
}
289255

290256
pub fn ingest_opencode_sessions(
291257
ledger: &mut Ledger,
292258
opts: &IngestOptions,
293259
) -> anyhow::Result<IngestReport> {
260+
run_single_harness(ledger, opts, AdapterName::Opencode, ingest_opencode_into)
261+
}
262+
263+
/// Shared boilerplate for the per-harness verbs: clean stale stamps, snapshot
264+
/// cursors, resolve content mode, run the harness body, emit any pending gap
265+
/// warning for that adapter, then persist cursor mutations. The per-harness
266+
/// `ingest_*_into` functions plug straight in as `body`.
267+
fn run_single_harness<F>(
268+
ledger: &mut Ledger,
269+
opts: &IngestOptions,
270+
adapter: AdapterName,
271+
body: F,
272+
) -> anyhow::Result<IngestReport>
273+
where
274+
F: FnOnce(
275+
&mut Ledger,
276+
&mut Cursors,
277+
&IngestRoots,
278+
ContentStoreMode,
279+
Option<&Path>,
280+
) -> anyhow::Result<IngestReport>,
281+
{
294282
progress(opts, "cleaning pending spawn stamps");
295283
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
296284
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
297285
let mut after = before.clone();
298286
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
299-
let report = ingest_opencode_into(
287+
let report = body(
300288
ledger,
301289
&mut after,
302290
&opts.roots,
303291
content_mode,
304292
opts.ledger_home.as_deref(),
305293
)?;
306-
emit_gap_warning(
307-
AdapterName::Opencode,
308-
content_mode,
309-
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
310-
);
294+
let on_warn: Option<&dyn Fn(&str)> = opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str));
295+
emit_gap_warning(adapter, content_mode, on_warn);
311296
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
312297
Ok(report)
313298
}
@@ -322,10 +307,7 @@ pub fn ingest_claude_session(
322307
opts: &IngestOptions,
323308
) -> anyhow::Result<IngestReport> {
324309
// Encode cwd → flattened dir name (TS: `cwd.replace(/\//g, '-')`).
325-
let encoded: String = cwd
326-
.chars()
327-
.map(|c| if c == '/' { '-' } else { c })
328-
.collect();
310+
let encoded = cwd.replace('/', "-");
329311
let file = claude_projects_dir(&opts.roots)
330312
.join(&encoded)
331313
.join(format!("{session_id}.jsonl"));
@@ -356,21 +338,7 @@ pub fn ingest_claude_session(
356338

357339
let appended_turns = result.turns.len();
358340
ledger.append_turns(&result.turns)?;
359-
if !result.content.is_empty() {
360-
ledger.append_content(&result.content)?;
361-
}
362-
if !result.events.is_empty() {
363-
ledger.append_compactions(&result.events)?;
364-
}
365-
if !result.relationships.is_empty() {
366-
ledger.append_relationships(&result.relationships)?;
367-
}
368-
if !result.tool_result_events.is_empty() {
369-
ledger.append_tool_result_events(&result.tool_result_events)?;
370-
}
371-
if !result.user_turns.is_empty() {
372-
ledger.append_user_turns(&result.user_turns)?;
373-
}
341+
apply_parsed_extras(ledger, &result)?;
374342

375343
// Re-stat after parsing so the cursor reflects the byte position the
376344
// parser actually read to. `parse_claude_session` uses BufReader::lines()
@@ -512,21 +480,7 @@ fn ingest_claude_into(
512480
count_new_tool_results(&parsed.content),
513481
);
514482
}
515-
if !parsed.content.is_empty() {
516-
ledger.append_content(&parsed.content)?;
517-
}
518-
if !parsed.events.is_empty() {
519-
ledger.append_compactions(&parsed.events)?;
520-
}
521-
if !parsed.relationships.is_empty() {
522-
ledger.append_relationships(&parsed.relationships)?;
523-
}
524-
if !parsed.tool_result_events.is_empty() {
525-
ledger.append_tool_result_events(&parsed.tool_result_events)?;
526-
}
527-
if !parsed.user_turns.is_empty() {
528-
ledger.append_user_turns(&parsed.user_turns)?;
529-
}
483+
apply_parsed_extras(ledger, &parsed)?;
530484

531485
reconcile_inputs.push(ReconcileClaudeRelationshipsInput {
532486
evidence: parsed.evidence,
@@ -616,15 +570,18 @@ fn ingest_codex_into(
616570
resume,
617571
..Default::default()
618572
};
619-
let parsed = match parse_codex_session_incremental(&file, &parse_opts) {
573+
let mut parsed = match parse_codex_session_incremental(&file, &parse_opts) {
620574
Ok(r) => r,
621575
Err(err) => {
622576
eprintln!("[burn] skipping {}: {}", file.display(), err);
623577
continue;
624578
}
625579
};
626580

627-
let next_resume = parsed.resume;
581+
// Take `resume` out so the remaining `parsed` can be borrowed by
582+
// `apply_parsed_extras` below; the resume state drives only cursor
583+
// bookkeeping past that point.
584+
let next_resume = std::mem::take(&mut parsed.resume);
628585
let mut codex_session_id = if !next_resume.session_id.is_empty() {
629586
Some(next_resume.session_id.clone())
630587
} else {
@@ -678,21 +635,7 @@ fn ingest_codex_into(
678635
count_new_tool_results(&parsed.content),
679636
);
680637
}
681-
if !parsed.content.is_empty() {
682-
ledger.append_content(&parsed.content)?;
683-
}
684-
if !parsed.events.is_empty() {
685-
ledger.append_compactions(&parsed.events)?;
686-
}
687-
if !parsed.relationships.is_empty() {
688-
ledger.append_relationships(&parsed.relationships)?;
689-
}
690-
if !parsed.tool_result_events.is_empty() {
691-
ledger.append_tool_result_events(&parsed.tool_result_events)?;
692-
}
693-
if !parsed.user_turns.is_empty() {
694-
ledger.append_user_turns(&parsed.user_turns)?;
695-
}
638+
apply_parsed_extras(ledger, &parsed)?;
696639

697640
let next = resume_state_to_codex_cursor(&next_resume, inode, parsed.end_offset, mtime);
698641
cursors.insert(key, FileCursor::Codex(Box::new(next)));
@@ -796,21 +739,7 @@ fn ingest_opencode_into(
796739
count_new_tool_results(&parsed.content),
797740
);
798741
}
799-
if !parsed.content.is_empty() {
800-
ledger.append_content(&parsed.content)?;
801-
}
802-
if !parsed.events.is_empty() {
803-
ledger.append_compactions(&parsed.events)?;
804-
}
805-
if !parsed.relationships.is_empty() {
806-
ledger.append_relationships(&parsed.relationships)?;
807-
}
808-
if !parsed.tool_result_events.is_empty() {
809-
ledger.append_tool_result_events(&parsed.tool_result_events)?;
810-
}
811-
if !parsed.user_turns.is_empty() {
812-
ledger.append_user_turns(&parsed.user_turns)?;
813-
}
742+
apply_parsed_extras(ledger, &parsed)?;
814743

815744
let seen: Vec<String> = parsed.seen_message_ids.into_iter().collect();
816745
let next = OpencodeCursor {
@@ -992,6 +921,69 @@ fn last_completed_turn_to_value(t: &CodexLastCompletedTurn) -> Value {
992921
Value::Object(m)
993922
}
994923

924+
/// Slice accessors shared by every parser result so [`apply_parsed_extras`]
925+
/// can append the trailing derived-record buckets without caring which
926+
/// harness produced them. Turns are deliberately omitted — the harness
927+
/// orchestrators count them, look up the session id, and resolve pending
928+
/// stamps before appending.
929+
trait DerivedRecords {
930+
fn content(&self) -> &[ContentRecord];
931+
fn events(&self) -> &[CompactionEvent];
932+
fn relationships(&self) -> &[SessionRelationshipRecord];
933+
fn tool_result_events(&self) -> &[ToolResultEventRecord];
934+
fn user_turns(&self) -> &[UserTurnRecord];
935+
}
936+
937+
macro_rules! impl_derived_records {
938+
($ty:ty) => {
939+
impl DerivedRecords for $ty {
940+
fn content(&self) -> &[ContentRecord] {
941+
&self.content
942+
}
943+
fn events(&self) -> &[CompactionEvent] {
944+
&self.events
945+
}
946+
fn relationships(&self) -> &[SessionRelationshipRecord] {
947+
&self.relationships
948+
}
949+
fn tool_result_events(&self) -> &[ToolResultEventRecord] {
950+
&self.tool_result_events
951+
}
952+
fn user_turns(&self) -> &[UserTurnRecord] {
953+
&self.user_turns
954+
}
955+
}
956+
};
957+
}
958+
959+
impl_derived_records!(ClaudeParseResult);
960+
impl_derived_records!(ClaudeParseIncrementalResult);
961+
impl_derived_records!(ParseCodexIncrementalResult);
962+
impl_derived_records!(ParseOpencodeIncrementalResult);
963+
964+
/// Append the trailing derived-record buckets shared by every parser
965+
/// result: content, compactions, relationships, tool-result events, and
966+
/// user-turn rows. Each bucket is gated on non-empty to avoid a no-op
967+
/// transaction.
968+
fn apply_parsed_extras<P: DerivedRecords>(ledger: &mut Ledger, p: &P) -> anyhow::Result<()> {
969+
if !p.content().is_empty() {
970+
ledger.append_content(p.content())?;
971+
}
972+
if !p.events().is_empty() {
973+
ledger.append_compactions(p.events())?;
974+
}
975+
if !p.relationships().is_empty() {
976+
ledger.append_relationships(p.relationships())?;
977+
}
978+
if !p.tool_result_events().is_empty() {
979+
ledger.append_tool_result_events(p.tool_result_events())?;
980+
}
981+
if !p.user_turns().is_empty() {
982+
ledger.append_user_turns(p.user_turns())?;
983+
}
984+
Ok(())
985+
}
986+
995987
fn resolve_pending_stamps_for_report(
996988
ledger: &mut Ledger,
997989
candidate: &PendingStampSessionCandidate,
@@ -1013,43 +1005,6 @@ fn resolve_pending_stamps_for_report(
10131005

10141006
// --- filesystem helpers --------------------------------------------------
10151007

1016-
fn list_dirs(parent: &Path) -> Vec<PathBuf> {
1017-
let mut out = Vec::new();
1018-
let entries = match fs::read_dir(parent) {
1019-
Ok(it) => it,
1020-
Err(_) => return out,
1021-
};
1022-
for entry in entries.flatten() {
1023-
match entry.file_type() {
1024-
Ok(ft) if ft.is_dir() => out.push(parent.join(entry.file_name())),
1025-
_ => {}
1026-
}
1027-
}
1028-
out
1029-
}
1030-
1031-
fn list_jsonl_files(dir: &Path) -> Vec<PathBuf> {
1032-
let mut out = Vec::new();
1033-
let entries = match fs::read_dir(dir) {
1034-
Ok(it) => it,
1035-
Err(_) => return out,
1036-
};
1037-
for entry in entries.flatten() {
1038-
let Ok(ft) = entry.file_type() else { continue };
1039-
if !ft.is_file() {
1040-
continue;
1041-
}
1042-
let name = entry.file_name();
1043-
let Some(name_str) = name.to_str() else {
1044-
continue;
1045-
};
1046-
if name_str.ends_with(".jsonl") {
1047-
out.push(dir.join(name_str));
1048-
}
1049-
}
1050-
out
1051-
}
1052-
10531008
fn dir_mtime(dir: &Path) -> Option<i64> {
10541009
let meta = fs::metadata(dir).ok()?;
10551010
Some(mtime_ms(&meta))

0 commit comments

Comments
 (0)