diff --git a/CHANGELOG.md b/CHANGELOG.md index b3c6925..2039c16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ Cross-package release notes for relayburn. Package changelogs contain package-le ## [Unreleased] +### Changed + +- `relayburn-sdk`: dedupe ingest filesystem walks (`list_dirs`, + `list_jsonl_files`, `walk_jsonl`) into `ingest::walk`, fix the + `walk_jsonl` filter to match `.JSONL` case-insensitively, and collapse + the per-harness append boilerplate (`apply_parsed_extras`) and the + three single-harness verb skeletons (`run_single_harness`). No + behavior change beyond the case-sensitivity fix. (#343) + ## [2.8.7] - 2026-05-21 ### Changed diff --git a/crates/relayburn-sdk/src/ingest/ingest.rs b/crates/relayburn-sdk/src/ingest/ingest.rs index 3dd48fb..cebcca4 100644 --- a/crates/relayburn-sdk/src/ingest/ingest.rs +++ b/crates/relayburn-sdk/src/ingest/ingest.rs @@ -28,10 +28,12 @@ use crate::ledger::{load_config, Ledger}; use crate::reader::{ parse_claude_session, parse_claude_session_incremental, parse_codex_session_incremental, parse_opencode_session_incremental, reconcile_claude_session_relationships, - ClaudeParseIncrementalOptions, ClaudeParseOptions, CodexLastCompletedTurn, CodexResumeState, - CodexTurnContext, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage, - ParseCodexIncrementalOptions, ParseOpencodeIncrementalOptions, PersistedUserTurnSlot, - ReconcileClaudeRelationshipsInput, + ClaudeParseIncrementalOptions, ClaudeParseIncrementalResult, ClaudeParseOptions, + ClaudeParseResult, CodexLastCompletedTurn, CodexResumeState, CodexTurnContext, CompactionEvent, + ContentRecord, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage, + ParseCodexIncrementalOptions, ParseCodexIncrementalResult, ParseOpencodeIncrementalOptions, + ParseOpencodeIncrementalResult, PersistedUserTurnSlot, ReconcileClaudeRelationshipsInput, + SessionRelationshipRecord, ToolResultEventRecord, UserTurnRecord, }; use crate::ingest::cursors::{ @@ -46,7 +48,7 @@ use crate::ingest::pending_stamps::{ PendingStampSessionCandidate, }; use crate::ingest::reingest::derive_codex_session_id; -use crate::ingest::walk::{walk_jsonl, walk_opencode_sessions}; +use crate::ingest::walk::{list_dirs, list_jsonl_files, walk_jsonl, walk_opencode_sessions}; #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -241,73 +243,56 @@ pub fn ingest_claude_projects( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { - progress(opts, "cleaning pending spawn stamps"); - cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?; - let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?; - let mut after = before.clone(); - let content_mode = resolve_content_mode(opts.ledger_home.as_deref()); - let report = ingest_claude_into( - ledger, - &mut after, - &opts.roots, - content_mode, - opts.ledger_home.as_deref(), - )?; - emit_gap_warning( - AdapterName::Claude, - content_mode, - opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)), - ); - save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?; - Ok(report) + run_single_harness(ledger, opts, AdapterName::Claude, ingest_claude_into) } pub fn ingest_codex_sessions( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { - progress(opts, "cleaning pending spawn stamps"); - cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?; - let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?; - let mut after = before.clone(); - let content_mode = resolve_content_mode(opts.ledger_home.as_deref()); - let report = ingest_codex_into( - ledger, - &mut after, - &opts.roots, - content_mode, - opts.ledger_home.as_deref(), - )?; - emit_gap_warning( - AdapterName::Codex, - content_mode, - opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)), - ); - save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?; - Ok(report) + run_single_harness(ledger, opts, AdapterName::Codex, ingest_codex_into) } pub fn ingest_opencode_sessions( ledger: &mut Ledger, opts: &IngestOptions, ) -> anyhow::Result { + run_single_harness(ledger, opts, AdapterName::Opencode, ingest_opencode_into) +} + +/// Shared boilerplate for the per-harness verbs: clean stale stamps, snapshot +/// cursors, resolve content mode, run the harness body, emit any pending gap +/// warning for that adapter, then persist cursor mutations. The per-harness +/// `ingest_*_into` functions plug straight in as `body`. +fn run_single_harness( + ledger: &mut Ledger, + opts: &IngestOptions, + adapter: AdapterName, + body: F, +) -> anyhow::Result +where + F: FnOnce( + &mut Ledger, + &mut Cursors, + &IngestRoots, + ContentStoreMode, + Option<&Path>, + ) -> anyhow::Result, +{ progress(opts, "cleaning pending spawn stamps"); cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?; let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?; let mut after = before.clone(); let content_mode = resolve_content_mode(opts.ledger_home.as_deref()); - let report = ingest_opencode_into( + let report = body( ledger, &mut after, &opts.roots, content_mode, opts.ledger_home.as_deref(), )?; - emit_gap_warning( - AdapterName::Opencode, - content_mode, - opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)), - ); + let on_warn: Option<&dyn Fn(&str)> = opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)); + emit_gap_warning(adapter, content_mode, on_warn); save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?; Ok(report) } @@ -322,10 +307,7 @@ pub fn ingest_claude_session( opts: &IngestOptions, ) -> anyhow::Result { // Encode cwd → flattened dir name (TS: `cwd.replace(/\//g, '-')`). - let encoded: String = cwd - .chars() - .map(|c| if c == '/' { '-' } else { c }) - .collect(); + let encoded = cwd.replace('/', "-"); let file = claude_projects_dir(&opts.roots) .join(&encoded) .join(format!("{session_id}.jsonl")); @@ -356,21 +338,7 @@ pub fn ingest_claude_session( let appended_turns = result.turns.len(); ledger.append_turns(&result.turns)?; - if !result.content.is_empty() { - ledger.append_content(&result.content)?; - } - if !result.events.is_empty() { - ledger.append_compactions(&result.events)?; - } - if !result.relationships.is_empty() { - ledger.append_relationships(&result.relationships)?; - } - if !result.tool_result_events.is_empty() { - ledger.append_tool_result_events(&result.tool_result_events)?; - } - if !result.user_turns.is_empty() { - ledger.append_user_turns(&result.user_turns)?; - } + apply_parsed_extras(ledger, &result)?; // Re-stat after parsing so the cursor reflects the byte position the // parser actually read to. `parse_claude_session` uses BufReader::lines() @@ -512,21 +480,7 @@ fn ingest_claude_into( count_new_tool_results(&parsed.content), ); } - if !parsed.content.is_empty() { - ledger.append_content(&parsed.content)?; - } - if !parsed.events.is_empty() { - ledger.append_compactions(&parsed.events)?; - } - if !parsed.relationships.is_empty() { - ledger.append_relationships(&parsed.relationships)?; - } - if !parsed.tool_result_events.is_empty() { - ledger.append_tool_result_events(&parsed.tool_result_events)?; - } - if !parsed.user_turns.is_empty() { - ledger.append_user_turns(&parsed.user_turns)?; - } + apply_parsed_extras(ledger, &parsed)?; reconcile_inputs.push(ReconcileClaudeRelationshipsInput { evidence: parsed.evidence, @@ -616,7 +570,7 @@ fn ingest_codex_into( resume, ..Default::default() }; - let parsed = match parse_codex_session_incremental(&file, &parse_opts) { + let mut parsed = match parse_codex_session_incremental(&file, &parse_opts) { Ok(r) => r, Err(err) => { eprintln!("[burn] skipping {}: {}", file.display(), err); @@ -624,7 +578,10 @@ fn ingest_codex_into( } }; - let next_resume = parsed.resume; + // Take `resume` out so the remaining `parsed` can be borrowed by + // `apply_parsed_extras` below; the resume state drives only cursor + // bookkeeping past that point. + let next_resume = std::mem::take(&mut parsed.resume); let mut codex_session_id = if !next_resume.session_id.is_empty() { Some(next_resume.session_id.clone()) } else { @@ -678,21 +635,7 @@ fn ingest_codex_into( count_new_tool_results(&parsed.content), ); } - if !parsed.content.is_empty() { - ledger.append_content(&parsed.content)?; - } - if !parsed.events.is_empty() { - ledger.append_compactions(&parsed.events)?; - } - if !parsed.relationships.is_empty() { - ledger.append_relationships(&parsed.relationships)?; - } - if !parsed.tool_result_events.is_empty() { - ledger.append_tool_result_events(&parsed.tool_result_events)?; - } - if !parsed.user_turns.is_empty() { - ledger.append_user_turns(&parsed.user_turns)?; - } + apply_parsed_extras(ledger, &parsed)?; let next = resume_state_to_codex_cursor(&next_resume, inode, parsed.end_offset, mtime); cursors.insert(key, FileCursor::Codex(Box::new(next))); @@ -796,21 +739,7 @@ fn ingest_opencode_into( count_new_tool_results(&parsed.content), ); } - if !parsed.content.is_empty() { - ledger.append_content(&parsed.content)?; - } - if !parsed.events.is_empty() { - ledger.append_compactions(&parsed.events)?; - } - if !parsed.relationships.is_empty() { - ledger.append_relationships(&parsed.relationships)?; - } - if !parsed.tool_result_events.is_empty() { - ledger.append_tool_result_events(&parsed.tool_result_events)?; - } - if !parsed.user_turns.is_empty() { - ledger.append_user_turns(&parsed.user_turns)?; - } + apply_parsed_extras(ledger, &parsed)?; let seen: Vec = parsed.seen_message_ids.into_iter().collect(); let next = OpencodeCursor { @@ -992,6 +921,69 @@ fn last_completed_turn_to_value(t: &CodexLastCompletedTurn) -> Value { Value::Object(m) } +/// Slice accessors shared by every parser result so [`apply_parsed_extras`] +/// can append the trailing derived-record buckets without caring which +/// harness produced them. Turns are deliberately omitted — the harness +/// orchestrators count them, look up the session id, and resolve pending +/// stamps before appending. +trait DerivedRecords { + fn content(&self) -> &[ContentRecord]; + fn events(&self) -> &[CompactionEvent]; + fn relationships(&self) -> &[SessionRelationshipRecord]; + fn tool_result_events(&self) -> &[ToolResultEventRecord]; + fn user_turns(&self) -> &[UserTurnRecord]; +} + +macro_rules! impl_derived_records { + ($ty:ty) => { + impl DerivedRecords for $ty { + fn content(&self) -> &[ContentRecord] { + &self.content + } + fn events(&self) -> &[CompactionEvent] { + &self.events + } + fn relationships(&self) -> &[SessionRelationshipRecord] { + &self.relationships + } + fn tool_result_events(&self) -> &[ToolResultEventRecord] { + &self.tool_result_events + } + fn user_turns(&self) -> &[UserTurnRecord] { + &self.user_turns + } + } + }; +} + +impl_derived_records!(ClaudeParseResult); +impl_derived_records!(ClaudeParseIncrementalResult); +impl_derived_records!(ParseCodexIncrementalResult); +impl_derived_records!(ParseOpencodeIncrementalResult); + +/// Append the trailing derived-record buckets shared by every parser +/// result: content, compactions, relationships, tool-result events, and +/// user-turn rows. Each bucket is gated on non-empty to avoid a no-op +/// transaction. +fn apply_parsed_extras(ledger: &mut Ledger, p: &P) -> anyhow::Result<()> { + if !p.content().is_empty() { + ledger.append_content(p.content())?; + } + if !p.events().is_empty() { + ledger.append_compactions(p.events())?; + } + if !p.relationships().is_empty() { + ledger.append_relationships(p.relationships())?; + } + if !p.tool_result_events().is_empty() { + ledger.append_tool_result_events(p.tool_result_events())?; + } + if !p.user_turns().is_empty() { + ledger.append_user_turns(p.user_turns())?; + } + Ok(()) +} + fn resolve_pending_stamps_for_report( ledger: &mut Ledger, candidate: &PendingStampSessionCandidate, @@ -1013,43 +1005,6 @@ fn resolve_pending_stamps_for_report( // --- filesystem helpers -------------------------------------------------- -fn list_dirs(parent: &Path) -> Vec { - let mut out = Vec::new(); - let entries = match fs::read_dir(parent) { - Ok(it) => it, - Err(_) => return out, - }; - for entry in entries.flatten() { - match entry.file_type() { - Ok(ft) if ft.is_dir() => out.push(parent.join(entry.file_name())), - _ => {} - } - } - out -} - -fn list_jsonl_files(dir: &Path) -> Vec { - let mut out = Vec::new(); - let entries = match fs::read_dir(dir) { - Ok(it) => it, - Err(_) => return out, - }; - for entry in entries.flatten() { - let Ok(ft) = entry.file_type() else { continue }; - if !ft.is_file() { - continue; - } - let name = entry.file_name(); - let Some(name_str) = name.to_str() else { - continue; - }; - if name_str.ends_with(".jsonl") { - out.push(dir.join(name_str)); - } - } - out -} - fn dir_mtime(dir: &Path) -> Option { let meta = fs::metadata(dir).ok()?; Some(mtime_ms(&meta)) diff --git a/crates/relayburn-sdk/src/ingest/reingest.rs b/crates/relayburn-sdk/src/ingest/reingest.rs index a4f82cd..c1c7da9 100644 --- a/crates/relayburn-sdk/src/ingest/reingest.rs +++ b/crates/relayburn-sdk/src/ingest/reingest.rs @@ -29,13 +29,13 @@ use std::path::{Path, PathBuf}; +use crate::ledger::Ledger; use crate::reader::{ parse_claude_session_incremental, parse_codex_session_incremental, parse_opencode_session_incremental, read_codex_session_id_hint, ClaudeParseIncrementalOptions, ContentRecord, ContentStoreMode, ParseCodexIncrementalOptions, ParseOpencodeIncrementalOptions, UserTurnRecord, }; -use crate::ledger::Ledger; use serde::{Deserialize, Serialize}; use std::collections::{BTreeSet, HashSet}; @@ -43,7 +43,7 @@ use crate::ingest::ingest::{ claude_projects_dir, codex_sessions_dir, opencode_message_root, opencode_session_root, IngestOptions, IngestRoots, }; -use crate::ingest::walk::{walk_jsonl, walk_opencode_sessions}; +use crate::ingest::walk::{list_dirs, list_jsonl_files, walk_jsonl, walk_opencode_sessions}; /// Outcome of [`reingest_missing_content`]. Mirrors the TS /// `ReingestContentReport` shape one-to-one. @@ -333,47 +333,6 @@ fn derive_claude_session_id(file: &Path) -> Option { .map(|s| s.to_string()) } -fn list_dirs(parent: &Path) -> Vec { - let mut out = Vec::new(); - let entries = match std::fs::read_dir(parent) { - Ok(it) => it, - Err(_) => return out, - }; - for entry in entries.flatten() { - let Ok(ft) = entry.file_type() else { - continue; - }; - if ft.is_dir() { - out.push(parent.join(entry.file_name())); - } - } - out -} - -fn list_jsonl_files(dir: &Path) -> Vec { - let mut out = Vec::new(); - let entries = match std::fs::read_dir(dir) { - Ok(it) => it, - Err(_) => return out, - }; - for entry in entries.flatten() { - let Ok(ft) = entry.file_type() else { - continue; - }; - if !ft.is_file() { - continue; - } - let name = entry.file_name(); - let Some(name_str) = name.to_str() else { - continue; - }; - if name_str.ends_with(".jsonl") { - out.push(dir.join(&name)); - } - } - out -} - #[cfg(test)] mod tests { use super::*; @@ -622,7 +581,9 @@ mod tests { #[test] fn derive_codex_session_id_recognizes_canonical_filenames() { - let p = PathBuf::from("/x/rollout-2026-04-24T00-00-00-000Z-11111111-2222-3333-4444-555555555555.jsonl"); + let p = PathBuf::from( + "/x/rollout-2026-04-24T00-00-00-000Z-11111111-2222-3333-4444-555555555555.jsonl", + ); assert_eq!( derive_codex_session_id(&p), Some("11111111-2222-3333-4444-555555555555".to_string()) diff --git a/crates/relayburn-sdk/src/ingest/walk.rs b/crates/relayburn-sdk/src/ingest/walk.rs index 8efa9c8..51da959 100644 --- a/crates/relayburn-sdk/src/ingest/walk.rs +++ b/crates/relayburn-sdk/src/ingest/walk.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; /// subdirectory. Mirrors `walkJsonl` in the TS adapter — used by Codex /// rollouts (`~/.codex/sessions/**/*.jsonl`). pub fn walk_jsonl>(root: P) -> Vec { - walk_files(root.as_ref(), |name| name.ends_with(".jsonl")) + walk_files(root.as_ref(), |name| has_extension_ci(name, "jsonl")) } /// Collect every `ses_*.json` file under `root`. Mirrors @@ -24,6 +24,54 @@ pub fn walk_opencode_sessions>(root: P) -> Vec { }) } +/// Immediate-children directory list. Unreadable parents yield empty +/// (silent skip), matching the recursive walkers above. +pub(crate) fn list_dirs(parent: &Path) -> Vec { + let mut out = Vec::new(); + let entries = match fs::read_dir(parent) { + Ok(it) => it, + Err(_) => return out, + }; + for entry in entries.flatten() { + match entry.file_type() { + Ok(ft) if ft.is_dir() => out.push(parent.join(entry.file_name())), + _ => {} + } + } + out +} + +/// Immediate-children `*.jsonl` file list (case-insensitive on the +/// extension). Unreadable directories yield empty. Used by the Claude +/// per-project sweep, which never recurses past the project dir. +pub(crate) fn list_jsonl_files(dir: &Path) -> Vec { + let mut out = Vec::new(); + let entries = match fs::read_dir(dir) { + Ok(it) => it, + Err(_) => return out, + }; + for entry in entries.flatten() { + let Ok(ft) = entry.file_type() else { continue }; + if !ft.is_file() { + continue; + } + let name = entry.file_name(); + let Some(name_str) = name.to_str() else { + continue; + }; + if has_extension_ci(name_str, "jsonl") { + out.push(dir.join(&name)); + } + } + out +} + +fn has_extension_ci(name: &str, ext: &str) -> bool { + Path::new(name) + .extension() + .is_some_and(|e| e.eq_ignore_ascii_case(ext)) +} + fn walk_files(root: &Path, accept: impl Fn(&str) -> bool) -> Vec { let mut out = Vec::new(); let mut stack = vec![root.to_path_buf()]; @@ -79,7 +127,21 @@ mod tests { let mut got = walk_jsonl(dir.path()); got.sort(); assert_eq!(got.len(), 3); - assert!(got.iter().all(|p| p.extension().unwrap() == "jsonl")); + assert!(got + .iter() + .all(|p| p.extension().unwrap().eq_ignore_ascii_case("jsonl"))); + } + + #[test] + fn walk_jsonl_matches_uppercase_extension() { + let dir = TempDir::new().unwrap(); + touch(&dir.path().join("a.jsonl")); + touch(&dir.path().join("b.JSONL")); + touch(&dir.path().join("nested/c.JsonL")); + + let mut got = walk_jsonl(dir.path()); + got.sort(); + assert_eq!(got.len(), 3); } #[test] @@ -99,11 +161,40 @@ mod tests { } } + #[test] + fn list_dirs_returns_immediate_children_only() { + let dir = TempDir::new().unwrap(); + create_dir_all(dir.path().join("a")).unwrap(); + create_dir_all(dir.path().join("b/nested")).unwrap(); + touch(&dir.path().join("c.txt")); + + let mut got = list_dirs(dir.path()); + got.sort(); + assert_eq!(got.len(), 2); + assert!(got[0].ends_with("a")); + assert!(got[1].ends_with("b")); + } + + #[test] + fn list_jsonl_files_is_non_recursive_and_case_insensitive() { + let dir = TempDir::new().unwrap(); + touch(&dir.path().join("a.jsonl")); + touch(&dir.path().join("b.JSONL")); + touch(&dir.path().join("nested/skip.jsonl")); + touch(&dir.path().join("not-this.json")); + + let mut got = list_jsonl_files(dir.path()); + got.sort(); + assert_eq!(got.len(), 2); + } + #[test] fn missing_root_returns_empty() { let dir = TempDir::new().unwrap(); let absent = dir.path().join("does-not-exist"); assert!(walk_jsonl(&absent).is_empty()); assert!(walk_opencode_sessions(&absent).is_empty()); + assert!(list_dirs(&absent).is_empty()); + assert!(list_jsonl_files(&absent).is_empty()); } }