Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Cross-package release notes for relayburn. Package changelogs contain package-le

### Changed

- `relayburn-sdk`: lower per-record allocations in reader hashing, tool-result
sizing, relationship dedup, and project resolution. Cuts overhead during
large session imports and concurrent `resolve_project` calls.
- `relayburn-sdk`: `parse_claude_session` now delegates to the incremental
parser with `start_offset = 0`, dropping the duplicate `ParseState`
codepath. Behavior is unchanged — trailing in-progress turns and final
Expand Down
244 changes: 213 additions & 31 deletions crates/relayburn-sdk/src/reader/claude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,8 +1050,9 @@ fn collect_tool_result_events(
Some(s) if !s.is_empty() => s.to_string(),
_ => continue,
};
let call_index = *counters.get(&tu).unwrap_or(&0);
counters.insert(tu.clone(), call_index + 1);
let entry = counters.entry(tu.clone()).or_insert(0);
let call_index = *entry;
*entry += 1;
let is_error = bo.get("is_error").and_then(Value::as_bool) == Some(true);
let mut record = ToolResultEventRecord {
v: 1,
Expand Down Expand Up @@ -1153,8 +1154,9 @@ fn build_claude_system_tool_result_event(
if agent_id.is_none() && subagent_session_id.is_none() {
return None;
}
let call_index = *counters.get(&tool_use_id).unwrap_or(&0);
counters.insert(tool_use_id.clone(), call_index + 1);
let entry = counters.entry(tool_use_id.clone()).or_insert(0);
let call_index = *entry;
*entry += 1;
let status = claude_system_event_status(line);
let mut record = ToolResultEventRecord {
v: 1,
Expand Down Expand Up @@ -1534,38 +1536,40 @@ fn append_unique(values: Option<Vec<String>>, value: String) -> Vec<String> {
v
}

fn relationship_key(row: &SessionRelationshipRecord) -> String {
let source = match row.source {
RelationshipSourceKind::ClaudeCode => "claude-code",
RelationshipSourceKind::Codex => "codex",
RelationshipSourceKind::Opencode => "opencode",
RelationshipSourceKind::AnthropicApi => "anthropic-api",
RelationshipSourceKind::OpenaiApi => "openai-api",
RelationshipSourceKind::GeminiApi => "gemini-api",
RelationshipSourceKind::SpawnEnv => "spawn-env",
RelationshipSourceKind::NativeClaude => "native-claude",
RelationshipSourceKind::NativeOpencode => "native-opencode",
};
let rt = match row.relationship_type {
RelationshipType::Root => "root",
RelationshipType::Continuation => "continuation",
RelationshipType::Fork => "fork",
RelationshipType::Subagent => "subagent",
};
format!(
"{}|{}|{}|{}|{}|{}",
source,
row.session_id,
rt,
/// Owned, hashable identity for a relationship row. Used as a `HashSet` key
/// for cross-line dedup; cheap because the original `relationship_key` did one
/// `format!`-driven allocation per call but had to be re-run for every
/// candidate during `has_relationship`.
type RelationshipKey = (
&'static str,
String,
&'static str,
String,
String,
String,
);

fn relationship_key_borrowed<'a>(
row: &'a SessionRelationshipRecord,
) -> (&'static str, &'a str, &'static str, &'a str, &'a str, &'a str) {
(
row.source.wire_str(),
row.session_id.as_str(),
row.relationship_type.wire_str(),
row.related_session_id.as_deref().unwrap_or(""),
row.agent_id.as_deref().unwrap_or(""),
row.parent_tool_use_id.as_deref().unwrap_or(""),
)
}

fn relationship_key(row: &SessionRelationshipRecord) -> RelationshipKey {
let b = relationship_key_borrowed(row);
(b.0, b.1.to_string(), b.2, b.3.to_string(), b.4.to_string(), b.5.to_string())
}

fn has_relationship(rows: &[SessionRelationshipRecord], row: &SessionRelationshipRecord) -> bool {
let key = relationship_key(row);
rows.iter().any(|r| relationship_key(r) == key)
let key = relationship_key_borrowed(row);
rows.iter().any(|r| relationship_key_borrowed(r) == key)
}

fn collect_subagent_relationships(turns: &[TurnRecord], out: &mut Vec<SessionRelationshipRecord>) {
Expand Down Expand Up @@ -2059,7 +2063,7 @@ fn collect_explicit_claude_relationships_incremental(
line: &serde_json::Map<String, Value>,
evidence: &mut ClaudeRelationshipEvidence,
out: &mut Vec<(u64, SessionRelationshipRecord)>,
seen: &mut HashSet<String>,
seen: &mut HashSet<RelationshipKey>,
session_id: &str,
fallback_ts: Option<&str>,
line_offset: u64,
Expand Down Expand Up @@ -2147,7 +2151,7 @@ fn run_incremental<C: TokenCounter + ?Sized>(
let mut pending_relationships: Vec<(u64, SessionRelationshipRecord)> = Vec::new();
let mut pending_user_turns: Vec<(u64, UserTurnRecord)> = Vec::new();
let mut seen_root_session_ids: HashSet<String> = HashSet::new();
let mut seen_explicit_relationship_ids: HashSet<String> = HashSet::new();
let mut seen_explicit_relationship_ids: HashSet<RelationshipKey> = HashSet::new();
let mut pending_user_turn_inc_idx: Option<usize> = None;

let mut file = File::open(path)?;
Expand Down Expand Up @@ -2599,6 +2603,184 @@ mod tests {
.join(name)
}

#[test]
fn parse_result_from_incremental_result_copies_all_fields() {
let turn = TurnRecord {
v: 1,
source: SourceKind::ClaudeCode,
session_id: "session-1".to_string(),
session_path: Some("/tmp/session.jsonl".to_string()),
message_id: "msg-1".to_string(),
turn_index: 7,
ts: "2026-05-11T00:00:00.000Z".to_string(),
model: "claude-sonnet-4-6".to_string(),
project: Some("/tmp/project".to_string()),
project_key: Some("project-key".to_string()),
usage: Usage {
input: 1,
output: 2,
reasoning: 3,
cache_read: 4,
cache_create_5m: 5,
cache_create_1h: 6,
},
tool_calls: vec![],
files_touched: Some(vec!["/tmp/project/src/lib.rs".to_string()]),
subagent: Some(Subagent {
is_sidechain: false,
parent_tool_use_id: Some("tool-1".to_string()),
agent_id: Some("agent-1".to_string()),
parent_agent_id: Some("parent-agent".to_string()),
subagent_type: Some("general-purpose".to_string()),
description: Some("delegate".to_string()),
}),
stop_reason: Some("end_turn".to_string()),
activity: Some(crate::reader::types::ActivityCategory::Coding),
retries: Some(1),
has_edits: Some(true),
fidelity: Some(Fidelity {
granularity: UsageGranularity::PerTurn,
coverage: Coverage {
has_input_tokens: true,
has_output_tokens: true,
has_reasoning_tokens: true,
has_cache_read_tokens: true,
has_cache_create_tokens: true,
has_tool_calls: true,
has_tool_result_events: true,
has_session_relationships: true,
has_raw_content: true,
},
class: crate::reader::types::FidelityClass::Full,
}),
};
let content = ContentRecord {
v: 1,
source: SourceKind::ClaudeCode,
session_id: "session-1".to_string(),
message_id: "msg-1".to_string(),
ts: "2026-05-11T00:00:00.000Z".to_string(),
role: ContentRole::Assistant,
kind: ContentKind::Text,
text: Some("hello".to_string()),
tool_use: None,
tool_result: None,
};
let event = CompactionEvent {
v: 1,
source: SourceKind::ClaudeCode,
session_id: "session-1".to_string(),
ts: "2026-05-11T00:01:00.000Z".to_string(),
preceding_message_id: Some("msg-0".to_string()),
tokens_before_compact: Some(42),
};
let relationship = SessionRelationshipRecord {
v: 1,
source: RelationshipSourceKind::ClaudeCode,
session_id: "session-1".to_string(),
related_session_id: Some("session-0".to_string()),
relationship_type: RelationshipType::Continuation,
ts: Some("2026-05-11T00:02:00.000Z".to_string()),
source_session_id: Some("source-session".to_string()),
source_version: Some("1.2.3".to_string()),
parent_tool_use_id: Some("tool-1".to_string()),
agent_id: Some("agent-1".to_string()),
subagent_type: Some("general-purpose".to_string()),
description: Some("continued".to_string()),
};
let tool_result_event = ToolResultEventRecord {
v: 1,
source: SourceKind::ClaudeCode,
session_id: "session-1".to_string(),
message_id: Some("msg-1".to_string()),
tool_use_id: "tool-1".to_string(),
call_index: Some(0),
event_index: 9,
ts: Some("2026-05-11T00:03:00.000Z".to_string()),
status: ToolResultStatus::Completed,
event_source: ToolResultEventSource::ToolResult,
content_length: Some(5),
content_hash: Some("abc123".to_string()),
is_error: Some(false),
usage: Some(Usage::default()),
usage_attribution: Some(crate::reader::types::UsageAttribution::SingleToolTurn),
subagent_session_id: Some("sub-session".to_string()),
agent_id: Some("agent-1".to_string()),
replaced_tools: Some(vec!["old-tool".to_string()]),
collapsed_calls: Some(2),
};
let user_turn = UserTurnRecord {
v: 1,
source: SourceKind::ClaudeCode,
session_id: "session-1".to_string(),
user_uuid: "user-1".to_string(),
ts: "2026-05-11T00:04:00.000Z".to_string(),
preceding_message_id: Some("msg-0".to_string()),
following_message_id: Some("msg-1".to_string()),
blocks: vec![UserTurnBlock {
kind: crate::reader::types::UserTurnBlockKind::Text,
tool_use_id: None,
byte_len: 5,
approx_tokens: 1,
is_error: None,
}],
};
let evidence = ClaudeRelationshipEvidence {
file_session_id: Some("session-1".to_string()),
first_ts: Some("2026-05-11T00:00:00.000Z".to_string()),
in_log_session_ids: vec!["session-1".to_string()],
source_version: Some("1.2.3".to_string()),
first_parent_uuid: Some("parent-1".to_string()),
seen_uuids: vec!["uuid-1".to_string()],
has_resume_marker: true,
resume_target_session_id: Some("session-0".to_string()),
explicit_continuation_target_session_ids: Some(vec!["session-0".to_string()]),
explicit_fork_target_session_ids: Some(vec!["session-2".to_string()]),
user_seen: true,
};

let incremental = ParseIncrementalResult {
turns: vec![turn.clone()],
content: vec![content.clone()],
events: vec![event.clone()],
relationships: vec![relationship.clone()],
tool_result_events: vec![tool_result_event.clone()],
user_turns: vec![user_turn.clone()],
end_offset: 123,
last_user_text: "latest user turn".to_string(),
evidence: evidence.clone(),
};

let full = ParseResult::from(incremental);

assert_eq!(full.turns, vec![turn]);
assert_eq!(full.content, vec![content]);
assert_eq!(full.events, vec![event]);
assert_eq!(full.relationships, vec![relationship]);
assert_eq!(full.tool_result_events, vec![tool_result_event]);
assert_eq!(full.user_turns, vec![user_turn]);
assert_eq!(full.evidence.file_session_id, evidence.file_session_id);
assert_eq!(full.evidence.first_ts, evidence.first_ts);
assert_eq!(full.evidence.in_log_session_ids, evidence.in_log_session_ids);
assert_eq!(full.evidence.source_version, evidence.source_version);
assert_eq!(full.evidence.first_parent_uuid, evidence.first_parent_uuid);
assert_eq!(full.evidence.seen_uuids, evidence.seen_uuids);
assert_eq!(full.evidence.has_resume_marker, evidence.has_resume_marker);
assert_eq!(
full.evidence.resume_target_session_id,
evidence.resume_target_session_id
);
assert_eq!(
full.evidence.explicit_continuation_target_session_ids,
evidence.explicit_continuation_target_session_ids
);
assert_eq!(
full.evidence.explicit_fork_target_session_ids,
evidence.explicit_fork_target_session_ids
);
assert_eq!(full.evidence.user_seen, evidence.user_seen);
}

#[test]
fn simple_turn_parses() {
let path = fixture("simple-turn.jsonl");
Expand Down
10 changes: 6 additions & 4 deletions crates/relayburn-sdk/src/reader/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,9 @@ fn parse_codex_buffer<R: BufRead>(
Some(c) if !c.is_empty() => c.to_string(),
_ => continue,
};
let call_index = *tool_result_counters.get(&call_id).unwrap_or(&0);
tool_result_counters.insert(call_id.clone(), call_index + 1);
let entry = tool_result_counters.entry(call_id.clone()).or_insert(0);
let call_index = *entry;
*entry += 1;
let status = subagent_notification_status(payload);
let mut ev = ToolResultEventRecord {
v: 1,
Expand Down Expand Up @@ -879,8 +880,9 @@ fn parse_codex_buffer<R: BufRead>(
if user_turn_slot.ts.is_empty() && !item_ts.is_empty() {
user_turn_slot.ts = item_ts.to_string();
}
let call_index = *tool_result_counters.get(&call_id).unwrap_or(&0);
tool_result_counters.insert(call_id.clone(), call_index + 1);
let entry = tool_result_counters.entry(call_id.clone()).or_insert(0);
let call_index = *entry;
*entry += 1;
let initial_status = if open_turn
.as_ref()
.map(|o| o.errored_call_ids.contains(&call_id))
Expand Down
11 changes: 5 additions & 6 deletions crates/relayburn-sdk/src/reader/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,15 @@ impl ProjectResolver {
}

/// Resolve a project for `cwd`, consulting (and populating) the cache.
/// Holds the cache lock across `resolve_uncached` so concurrent callers
/// with the same `cwd` only do the filesystem walk once.
pub fn resolve(&self, cwd: &str) -> ResolvedProject {
if let Some(hit) = self.cache.lock().unwrap().get(cwd) {
let mut cache = self.cache.lock().unwrap();
if let Some(hit) = cache.get(cwd) {
return hit.clone();
}
let result = resolve_uncached(cwd);
self.cache
.lock()
.unwrap()
.entry(cwd.to_string())
.or_insert_with(|| result.clone());
cache.insert(cwd.to_string(), result.clone());
result
}

Expand Down
Loading
Loading