Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ Cross-package release notes for relayburn. Package changelogs contain package-le

## [Unreleased]

### Changed

- `relayburn-sdk`: dedupe ingest filesystem walks (`list_dirs`,
`list_jsonl_files`, `walk_jsonl`) into `ingest::walk`, fix the
`walk_jsonl` filter to match `.JSONL` case-insensitively, collapse the
per-harness append boilerplate into a shared `apply_parsed_extras`
helper, and collapse the three single-harness verb skeletons into
`run_single_harness`. No behavior change beyond the case-sensitivity
fix. (#343)
Comment on lines +9 to +14
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Split this into concise impact-first bullets and drop the issue reference.

This entry is doing too much in one bullet and is implementation-heavy; it’s harder to scan in [Unreleased]. Please break it into short user-visible bullets (e.g., .JSONL matching fix as one bullet, ingest refactor effects as separate bullets) and remove (#343).

As per coding guidelines: “Changelog entries should be concise and impact-first… Prefer one short bullet per user-visible change… Drop issue/PR links…”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CHANGELOG.md` around lines 9 - 14, Split the long implementation-heavy bullet
into multiple concise, impact-first bullets and remove the issue reference:
create one short user-visible bullet stating the .JSONL filename matching is now
case-insensitive (fixes `.JSONL` vs `.jsonl` filtering), another bullet saying
filesystem walk helpers in relayburn-sdk were consolidated (mentioning functions
list_dirs, list_jsonl_files, walk_jsonl collapsed into ingest::walk) and a third
noting the internal boilerplate collapse (apply_parsed_extras and the
single-harness helpers like run_single_harness were simplified) — keep each
bullet one terse sentence and drop the "(`#343`)" reference.


## [2.8.7] - 2026-05-21

### Changed
Expand Down
259 changes: 107 additions & 152 deletions crates/relayburn-sdk/src/ingest/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ use crate::ledger::{load_config, Ledger};
use crate::reader::{
parse_claude_session, parse_claude_session_incremental, parse_codex_session_incremental,
parse_opencode_session_incremental, reconcile_claude_session_relationships,
ClaudeParseIncrementalOptions, ClaudeParseOptions, CodexLastCompletedTurn, CodexResumeState,
CodexTurnContext, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage,
ParseCodexIncrementalOptions, ParseOpencodeIncrementalOptions, PersistedUserTurnSlot,
ReconcileClaudeRelationshipsInput,
ClaudeParseIncrementalOptions, ClaudeParseIncrementalResult, ClaudeParseOptions,
ClaudeParseResult, CodexLastCompletedTurn, CodexResumeState, CodexTurnContext, CompactionEvent,
ContentRecord, ContentStoreMode, CumulativeUsage as ReaderCumulativeUsage,
ParseCodexIncrementalOptions, ParseCodexIncrementalResult, ParseOpencodeIncrementalOptions,
ParseOpencodeIncrementalResult, PersistedUserTurnSlot, ReconcileClaudeRelationshipsInput,
SessionRelationshipRecord, ToolResultEventRecord, UserTurnRecord,
};

use crate::ingest::cursors::{
Expand All @@ -46,7 +48,7 @@ use crate::ingest::pending_stamps::{
PendingStampSessionCandidate,
};
use crate::ingest::reingest::derive_codex_session_id;
use crate::ingest::walk::{walk_jsonl, walk_opencode_sessions};
use crate::ingest::walk::{list_dirs, list_jsonl_files, walk_jsonl, walk_opencode_sessions};

#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -241,73 +243,56 @@ pub fn ingest_claude_projects(
ledger: &mut Ledger,
opts: &IngestOptions,
) -> anyhow::Result<IngestReport> {
progress(opts, "cleaning pending spawn stamps");
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
let mut after = before.clone();
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
let report = ingest_claude_into(
ledger,
&mut after,
&opts.roots,
content_mode,
opts.ledger_home.as_deref(),
)?;
emit_gap_warning(
AdapterName::Claude,
content_mode,
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
);
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
Ok(report)
run_single_harness(ledger, opts, AdapterName::Claude, ingest_claude_into)
}

pub fn ingest_codex_sessions(
ledger: &mut Ledger,
opts: &IngestOptions,
) -> anyhow::Result<IngestReport> {
progress(opts, "cleaning pending spawn stamps");
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
let mut after = before.clone();
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
let report = ingest_codex_into(
ledger,
&mut after,
&opts.roots,
content_mode,
opts.ledger_home.as_deref(),
)?;
emit_gap_warning(
AdapterName::Codex,
content_mode,
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
);
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
Ok(report)
run_single_harness(ledger, opts, AdapterName::Codex, ingest_codex_into)
}

/// Ingest OpenCode sessions into `ledger`.
///
/// Delegates to `run_single_harness`, tagging the run with
/// `AdapterName::Opencode` and supplying `ingest_opencode_into` as the
/// harness body. Returns whatever report that call produces.
pub fn ingest_opencode_sessions(
    ledger: &mut Ledger,
    opts: &IngestOptions,
) -> anyhow::Result<IngestReport> {
    let adapter = AdapterName::Opencode;
    run_single_harness(ledger, opts, adapter, ingest_opencode_into)
}

/// Shared boilerplate for the per-harness verbs: clean stale stamps, snapshot
/// cursors, resolve content mode, run the harness body, emit any pending gap
/// warning for that adapter, then persist cursor mutations. The per-harness
/// `ingest_*_into` functions plug straight in as `body`.
fn run_single_harness<F>(
ledger: &mut Ledger,
opts: &IngestOptions,
adapter: AdapterName,
body: F,
) -> anyhow::Result<IngestReport>
where
F: FnOnce(
&mut Ledger,
&mut Cursors,
&IngestRoots,
ContentStoreMode,
Option<&Path>,
) -> anyhow::Result<IngestReport>,
{
progress(opts, "cleaning pending spawn stamps");
cleanup_stale_pending_stamps_in(opts.ledger_home.as_deref())?;
let before = load_cursors(ledger).map_err(|e| anyhow::anyhow!(e))?;
let mut after = before.clone();
let content_mode = resolve_content_mode(opts.ledger_home.as_deref());
let report = ingest_opencode_into(
let report = body(
ledger,
&mut after,
&opts.roots,
content_mode,
opts.ledger_home.as_deref(),
)?;
emit_gap_warning(
AdapterName::Opencode,
content_mode,
opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str)),
);
let on_warn: Option<&dyn Fn(&str)> = opts.on_warn.as_ref().map(|f| f.as_ref() as &dyn Fn(&str));
emit_gap_warning(adapter, content_mode, on_warn);
save_cursors_if_changed(ledger, &before, &after).map_err(|e| anyhow::anyhow!(e))?;
Ok(report)
}
Expand All @@ -322,10 +307,7 @@ pub fn ingest_claude_session(
opts: &IngestOptions,
) -> anyhow::Result<IngestReport> {
// Encode cwd → flattened dir name (TS: `cwd.replace(/\//g, '-')`).
let encoded: String = cwd
.chars()
.map(|c| if c == '/' { '-' } else { c })
.collect();
let encoded = cwd.replace('/', "-");
let file = claude_projects_dir(&opts.roots)
.join(&encoded)
.join(format!("{session_id}.jsonl"));
Expand Down Expand Up @@ -356,21 +338,7 @@ pub fn ingest_claude_session(

let appended_turns = result.turns.len();
ledger.append_turns(&result.turns)?;
if !result.content.is_empty() {
ledger.append_content(&result.content)?;
}
if !result.events.is_empty() {
ledger.append_compactions(&result.events)?;
}
if !result.relationships.is_empty() {
ledger.append_relationships(&result.relationships)?;
}
if !result.tool_result_events.is_empty() {
ledger.append_tool_result_events(&result.tool_result_events)?;
}
if !result.user_turns.is_empty() {
ledger.append_user_turns(&result.user_turns)?;
}
apply_parsed_extras(ledger, &result)?;

// Re-stat after parsing so the cursor reflects the byte position the
// parser actually read to. `parse_claude_session` uses BufReader::lines()
Expand Down Expand Up @@ -512,21 +480,7 @@ fn ingest_claude_into(
count_new_tool_results(&parsed.content),
);
}
if !parsed.content.is_empty() {
ledger.append_content(&parsed.content)?;
}
if !parsed.events.is_empty() {
ledger.append_compactions(&parsed.events)?;
}
if !parsed.relationships.is_empty() {
ledger.append_relationships(&parsed.relationships)?;
}
if !parsed.tool_result_events.is_empty() {
ledger.append_tool_result_events(&parsed.tool_result_events)?;
}
if !parsed.user_turns.is_empty() {
ledger.append_user_turns(&parsed.user_turns)?;
}
apply_parsed_extras(ledger, &parsed)?;

reconcile_inputs.push(ReconcileClaudeRelationshipsInput {
evidence: parsed.evidence,
Expand Down Expand Up @@ -616,15 +570,18 @@ fn ingest_codex_into(
resume,
..Default::default()
};
let parsed = match parse_codex_session_incremental(&file, &parse_opts) {
let mut parsed = match parse_codex_session_incremental(&file, &parse_opts) {
Ok(r) => r,
Err(err) => {
eprintln!("[burn] skipping {}: {}", file.display(), err);
continue;
}
};

let next_resume = parsed.resume;
// Take `resume` out so the remaining `parsed` can be borrowed by
// `apply_parsed_extras` below; the resume state drives only cursor
// bookkeeping past that point.
let next_resume = std::mem::take(&mut parsed.resume);
let mut codex_session_id = if !next_resume.session_id.is_empty() {
Some(next_resume.session_id.clone())
} else {
Expand Down Expand Up @@ -678,21 +635,7 @@ fn ingest_codex_into(
count_new_tool_results(&parsed.content),
);
}
if !parsed.content.is_empty() {
ledger.append_content(&parsed.content)?;
}
if !parsed.events.is_empty() {
ledger.append_compactions(&parsed.events)?;
}
if !parsed.relationships.is_empty() {
ledger.append_relationships(&parsed.relationships)?;
}
if !parsed.tool_result_events.is_empty() {
ledger.append_tool_result_events(&parsed.tool_result_events)?;
}
if !parsed.user_turns.is_empty() {
ledger.append_user_turns(&parsed.user_turns)?;
}
apply_parsed_extras(ledger, &parsed)?;

let next = resume_state_to_codex_cursor(&next_resume, inode, parsed.end_offset, mtime);
cursors.insert(key, FileCursor::Codex(Box::new(next)));
Expand Down Expand Up @@ -796,21 +739,7 @@ fn ingest_opencode_into(
count_new_tool_results(&parsed.content),
);
}
if !parsed.content.is_empty() {
ledger.append_content(&parsed.content)?;
}
if !parsed.events.is_empty() {
ledger.append_compactions(&parsed.events)?;
}
if !parsed.relationships.is_empty() {
ledger.append_relationships(&parsed.relationships)?;
}
if !parsed.tool_result_events.is_empty() {
ledger.append_tool_result_events(&parsed.tool_result_events)?;
}
if !parsed.user_turns.is_empty() {
ledger.append_user_turns(&parsed.user_turns)?;
}
apply_parsed_extras(ledger, &parsed)?;

let seen: Vec<String> = parsed.seen_message_ids.into_iter().collect();
let next = OpencodeCursor {
Expand Down Expand Up @@ -992,6 +921,69 @@ fn last_completed_turn_to_value(t: &CodexLastCompletedTurn) -> Value {
Value::Object(m)
}

/// Slice accessors shared by every parser result so [`apply_parsed_extras`]
/// can append the trailing derived-record buckets without caring which
/// harness produced them. Turns are deliberately omitted — the harness
/// orchestrators count them, look up the session id, and resolve pending
/// stamps before appending.
trait DerivedRecords {
/// Content records; appended via `Ledger::append_content`.
fn content(&self) -> &[ContentRecord];
/// Compaction events; appended via `Ledger::append_compactions`.
fn events(&self) -> &[CompactionEvent];
/// Session relationship records; appended via `Ledger::append_relationships`.
fn relationships(&self) -> &[SessionRelationshipRecord];
/// Tool-result event records; appended via `Ledger::append_tool_result_events`.
fn tool_result_events(&self) -> &[ToolResultEventRecord];
/// User-turn records; appended via `Ledger::append_user_turns`.
fn user_turns(&self) -> &[UserTurnRecord];
}

/// Implements `DerivedRecords` for a parser result type by borrowing its
/// same-named fields (`content`, `events`, `relationships`,
/// `tool_result_events`, `user_turns`) as slices.
///
/// Call it with the result type only: `impl_derived_records!(SomeResult);`.
macro_rules! impl_derived_records {
    // Internal rule: expand one accessor per `field: RecordType` pair. Listed
    // first so the literal `@accessors` marker is tried before the `ty` rule.
    (@accessors $result:ty { $($field:ident: $record:ty),+ $(,)? }) => {
        impl DerivedRecords for $result {
            $(
                fn $field(&self) -> &[$record] {
                    &self.$field
                }
            )+
        }
    };
    // Public entry point: the field/record table lives here, in one place.
    ($result:ty) => {
        impl_derived_records!(@accessors $result {
            content: ContentRecord,
            events: CompactionEvent,
            relationships: SessionRelationshipRecord,
            tool_result_events: ToolResultEventRecord,
            user_turns: UserTurnRecord,
        });
    };
}

// Provide the `DerivedRecords` accessors for each parser result type so any
// of them can be handed to `apply_parsed_extras`.
impl_derived_records!(ClaudeParseResult);
impl_derived_records!(ClaudeParseIncrementalResult);
impl_derived_records!(ParseCodexIncrementalResult);
impl_derived_records!(ParseOpencodeIncrementalResult);

/// Writes the derived-record buckets of a parser result to the ledger.
///
/// Buckets are appended in a fixed order — content, compaction events,
/// relationships, tool-result events, user turns — and an empty bucket is
/// skipped so no append call is issued for it. The first failing append is
/// propagated to the caller and the remaining buckets are not attempted.
fn apply_parsed_extras<P: DerivedRecords>(ledger: &mut Ledger, p: &P) -> anyhow::Result<()> {
    let content = p.content();
    if !content.is_empty() {
        ledger.append_content(content)?;
    }

    let compactions = p.events();
    if !compactions.is_empty() {
        ledger.append_compactions(compactions)?;
    }

    let relationships = p.relationships();
    if !relationships.is_empty() {
        ledger.append_relationships(relationships)?;
    }

    let tool_results = p.tool_result_events();
    if !tool_results.is_empty() {
        ledger.append_tool_result_events(tool_results)?;
    }

    let user_turns = p.user_turns();
    if !user_turns.is_empty() {
        ledger.append_user_turns(user_turns)?;
    }

    Ok(())
}

fn resolve_pending_stamps_for_report(
ledger: &mut Ledger,
candidate: &PendingStampSessionCandidate,
Expand All @@ -1013,43 +1005,6 @@ fn resolve_pending_stamps_for_report(

// --- filesystem helpers --------------------------------------------------

/// Returns the immediate subdirectories of `parent` as `parent`-joined paths.
///
/// Best-effort: an unreadable or missing `parent` yields an empty list, and
/// entries that fail to read or whose file type cannot be determined are
/// silently skipped. Order follows the underlying directory iteration.
fn list_dirs(parent: &Path) -> Vec<PathBuf> {
    let Ok(entries) = fs::read_dir(parent) else {
        return Vec::new();
    };
    entries
        .flatten()
        .filter(|entry| matches!(entry.file_type(), Ok(kind) if kind.is_dir()))
        .map(|entry| parent.join(entry.file_name()))
        .collect()
}

/// Returns the regular files directly inside `dir` whose name ends with
/// `.jsonl`, as `dir`-joined paths.
///
/// Best-effort: an unreadable or missing `dir` yields an empty list. Entries
/// that fail to read, are not regular files, or have a non-UTF-8 name are
/// skipped. The suffix comparison is case-sensitive, so `x.JSONL` is not
/// returned.
fn list_jsonl_files(dir: &Path) -> Vec<PathBuf> {
    let Ok(entries) = fs::read_dir(dir) else {
        return Vec::new();
    };
    entries
        .flatten()
        .filter(|entry| matches!(entry.file_type(), Ok(kind) if kind.is_file()))
        .filter_map(|entry| {
            let raw_name = entry.file_name();
            let text = raw_name.to_str()?;
            text.ends_with(".jsonl").then(|| dir.join(text))
        })
        .collect()
}

fn dir_mtime(dir: &Path) -> Option<i64> {
let meta = fs::metadata(dir).ok()?;
Some(mtime_ms(&meta))
Expand Down
Loading
Loading