Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/jcode-provider-metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,10 @@ mod tests {
assert_eq!(OLLAMA_PROFILE.default_model, None);
assert!(!OLLAMA_PROFILE.requires_api_key);

assert_eq!(OLLAMA_LOGIN_PROVIDER.auth_kind, LoginProviderAuthKind::Local);
assert_eq!(
OLLAMA_LOGIN_PROVIDER.auth_kind,
LoginProviderAuthKind::Local
);
assert_eq!(OLLAMA_LOGIN_PROVIDER.auth_status_method, "local endpoint");
assert!(matches!(
OLLAMA_LOGIN_PROVIDER.target,
Expand Down
61 changes: 48 additions & 13 deletions src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl CompactionManager {
self.semantic_embed_cache.clear();
self.semantic_embed_cache_counter = 0;
self.total_turns = total_messages;
if state.compacted_count > total_messages {
crate::logging::warn(&format!(
"[compaction] restore: stale compacted_count={} exceeds messages.len()={}; clamping",
state.compacted_count, total_messages
));
}
self.compacted_count = state.compacted_count.min(total_messages);
self.active_message_chars = 0;
self.active_message_chars_dirty = total_messages > self.compacted_count;
Expand Down Expand Up @@ -287,6 +293,29 @@ impl CompactionManager {
self.active_message_chars_dirty = false;
}

/// Detect and repair a `compacted_count` that points past the end of the
/// caller-provided message list (e.g. after the transcript was cleared or
/// replaced with fewer messages than were previously compacted).
///
/// Returns `true` when a clamp was applied, `false` when state was already
/// consistent. `caller` is only used to tag the warning log line.
fn clamp_stale_compacted_count(&mut self, all_messages: &[Message], caller: &str) -> bool {
    // An empty list is treated as "no transcript yet", not as stale state;
    // leave the counters alone so a later restore can still reconcile them.
    if all_messages.is_empty() {
        return false;
    }

    // Consistent: the compacted prefix fits inside the current message list.
    if self.compacted_count <= all_messages.len() {
        return false;
    }

    crate::logging::warn(&format!(
        "[compaction] {}: stale compacted_count={} exceeds messages.len()={}; clamping",
        caller,
        self.compacted_count,
        all_messages.len()
    ));
    // Clamp the prefix to the full list: everything visible is now considered
    // compacted, so the active tail is empty and its cached char count is 0
    // (marked clean, since 0 is exact for an empty tail).
    self.compacted_count = all_messages.len();
    self.total_turns = all_messages.len();
    self.active_message_chars = 0;
    self.active_message_chars_dirty = false;
    // Any previously observed input-token count described the stale
    // transcript; drop it so estimates are recomputed from the clamped state.
    self.observed_input_tokens = None;
    true
}

pub fn restore_persisted_stored_state_with(
&mut self,
state: &crate::session::StoredCompactionState,
Expand Down Expand Up @@ -603,22 +632,17 @@ impl CompactionManager {
/// Get the active (uncompacted) messages from a full message list.
/// Skips the first `compacted_count` messages.
fn active_messages<'a>(&self, all_messages: &'a [Message]) -> &'a [Message] {
if self.compacted_count <= all_messages.len() {
&all_messages[self.compacted_count..]
} else {
// Edge case: messages were cleared/replaced with fewer items
all_messages
}
let start = self.compacted_count.min(all_messages.len());
&all_messages[start..]
}

fn active_message_chars_with(&self, all_messages: &[Message]) -> usize {
let active = self.active_messages(all_messages);
if self.active_message_chars_dirty
|| self.active_messages_count() != self.active_messages(all_messages).len()
|| self.active_messages_count() != active.len()
|| self.compacted_count > all_messages.len()
{
self.active_messages(all_messages)
.iter()
.map(message_char_count)
.sum()
active.iter().map(message_char_count).sum()
} else {
self.active_message_chars
}
Expand Down Expand Up @@ -901,6 +925,8 @@ impl CompactionManager {
/// Check if background compaction is done and apply it, updating rolling
/// token-estimate state from the provided full message list.
pub fn check_and_apply_compaction_with(&mut self, all_messages: &[Message]) {
self.clamp_stale_compacted_count(all_messages, "check_and_apply_compaction_with");

let task = match self.pending_task.take() {
Some(task) => task,
None => return,
Expand Down Expand Up @@ -931,7 +957,10 @@ impl CompactionManager {
};

// Advance the compacted count — these messages are now summarized
self.compacted_count += self.pending_cutoff;
self.compacted_count = self.compacted_count.saturating_add(self.pending_cutoff);
if !all_messages.is_empty() {
self.compacted_count = self.compacted_count.min(all_messages.len());
}
self.active_message_chars = self
.active_message_chars_with(all_messages)
.saturating_sub(compacted_chars);
Expand Down Expand Up @@ -1010,7 +1039,9 @@ impl CompactionManager {
/// Get messages for API call (with summary if compacted).
/// Takes the full message list from the caller.
pub fn messages_for_api_with(&mut self, all_messages: &[Message]) -> Vec<Message> {
self.clamp_stale_compacted_count(all_messages, "messages_for_api_with");
self.check_and_apply_compaction_with(all_messages);
self.clamp_stale_compacted_count(all_messages, "messages_for_api_with");
self.discard_oversized_openai_native_compaction();

let active = self.active_messages(all_messages);
Expand Down Expand Up @@ -1196,6 +1227,7 @@ impl CompactionManager {
/// exceed the token budget, progressively keeps fewer turns down to
/// `MIN_TURNS_TO_KEEP`.
pub fn hard_compact_with(&mut self, all_messages: &[Message]) -> Result<usize, String> {
self.clamp_stale_compacted_count(all_messages, "hard_compact_with");
let active = self.active_messages(all_messages);

if active.len() <= MIN_TURNS_TO_KEEP {
Expand Down Expand Up @@ -1257,7 +1289,10 @@ impl CompactionManager {
original_turn_count: cutoff,
};

self.compacted_count += cutoff;
self.compacted_count = self
.compacted_count
.saturating_add(cutoff)
.min(all_messages.len());
self.active_message_chars = remaining_suffix_chars[cutoff];
self.active_message_chars_dirty = false;
self.active_summary = Some(summary);
Expand Down
124 changes: 124 additions & 0 deletions src/compaction_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,130 @@ fn test_hard_compact_preserves_recent_turns() {
);
}

#[test]
fn test_bug_175_messages_for_api_self_heals_stale_compacted_count() {
    let mut mgr = CompactionManager::new().with_budget(1_000);

    // 30 real messages, each registered with the manager as it is added.
    let transcript: Vec<Message> = (0..30)
        .map(|i| make_text_message(Role::User, &format!("turn {} {}", i, "x".repeat(120))))
        .collect();
    for _ in &transcript {
        mgr.notify_message_added();
    }

    // Poison the manager with persisted state claiming far more history
    // than the caller's message list actually contains.
    mgr.compacted_count = 100;
    mgr.total_turns = 100;
    mgr.active_message_chars = transcript.iter().map(message_char_count).sum();
    mgr.active_message_chars_dirty = false;
    mgr.active_summary = Some(jcode_compaction_core::Summary {
        text: "# Existing summary".to_string(),
        openai_encrypted_content: None,
        covers_up_to_turn: 100,
        original_turn_count: 100,
    });

    let api_messages = mgr.messages_for_api_with(&transcript);

    assert_eq!(
        mgr.compacted_count(),
        transcript.len(),
        "stale compacted_count should be clamped to the caller's full message list"
    );
    assert_eq!(
        api_messages.len(),
        1,
        "stale compacted_count must not replay the full transcript after the summary"
    );
    assert_eq!(mgr.stats_with(&transcript).active_messages, 0);
}

#[test]
fn test_bug_175_active_messages_clamps_stale_compacted_count() {
    // Three messages, but a compacted_count pointing well past the end.
    let transcript: Vec<Message> = [
        (Role::User, "first"),
        (Role::Assistant, "second"),
        (Role::User, "third"),
    ]
    .into_iter()
    .map(|(role, text)| make_text_message(role, text))
    .collect();

    let mut mgr = CompactionManager::new();
    mgr.compacted_count = 10;
    mgr.total_turns = 10;

    let tail = mgr.active_messages(&transcript);
    assert!(
        tail.is_empty(),
        "a stale compacted_count must produce an empty active tail, not replay all messages"
    );
}

#[test]
fn test_bug_175_token_estimate_ignores_stale_cached_active_chars() {
    let mut mgr = CompactionManager::new().with_budget(1_000);

    // Three large messages, each registered with the manager.
    let transcript: Vec<Message> = (0..3)
        .map(|i| make_text_message(Role::User, &format!("turn {} {}", i, "x".repeat(10_000))))
        .collect();
    for _ in &transcript {
        mgr.notify_message_added();
    }

    // Stale state: compacted prefix past the end plus a bogus cached
    // character total that would dwarf the real transcript.
    mgr.compacted_count = 10;
    mgr.total_turns = 10;
    mgr.active_message_chars = 1_000_000;
    mgr.active_message_chars_dirty = false;

    assert_eq!(
        mgr.token_estimate_with(&transcript),
        0,
        "stale cached active_message_chars should not keep emergency compaction above threshold"
    );
}

#[test]
fn test_bug_175_hard_compact_does_not_inflate_stale_compacted_count() {
    let mut mgr = CompactionManager::new().with_budget(1_000);

    // 30 real messages, each registered with the manager.
    let transcript: Vec<Message> = (0..30)
        .map(|i| make_text_message(Role::User, &format!("turn {} {}", i, "z".repeat(200))))
        .collect();
    for _ in &transcript {
        mgr.notify_message_added();
    }

    // Stale persisted state: prefix past the end, dirty cache, and an
    // existing summary that must not be stacked on again.
    mgr.compacted_count = 100;
    mgr.total_turns = 100;
    mgr.active_message_chars_dirty = true;
    mgr.active_summary = Some(jcode_compaction_core::Summary {
        text: "# Existing summary".to_string(),
        openai_encrypted_content: None,
        covers_up_to_turn: 100,
        original_turn_count: 100,
    });

    let outcome = mgr.hard_compact_with(&transcript);

    assert!(
        outcome.is_err(),
        "there are no active messages left after clamping stale compacted_count"
    );
    assert_eq!(
        mgr.compacted_count(),
        transcript.len(),
        "hard compact should never push compacted_count past messages.len()"
    );
    let marker_count = mgr
        .active_summary
        .as_ref()
        .map_or(0, |summary| summary.text.matches("[Emergency compaction]").count());
    assert_eq!(
        marker_count, 0,
        "stale compacted state should not append another emergency summary block"
    );
}

// ── safe_compaction_cutoff: tool call/result pair integrity ─────────

#[test]
Expand Down
12 changes: 6 additions & 6 deletions src/provider/openrouter_sse_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,12 @@ impl OpenRouterStream {
.and_then(|c| c.as_str())
&& !reasoning_content.is_empty()
{
let reasoning_delta = if reasoning_content.starts_with(&self.reasoning_buffer)
{
&reasoning_content[self.reasoning_buffer.len()..]
} else {
reasoning_content
};
let reasoning_delta =
if reasoning_content.starts_with(&self.reasoning_buffer) {
&reasoning_content[self.reasoning_buffer.len()..]
} else {
reasoning_content
};
self.reasoning_buffer = reasoning_content.to_string();
if !reasoning_delta.is_empty() {
self.pending
Expand Down
3 changes: 2 additions & 1 deletion src/provider/openrouter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,8 @@ fn test_parse_next_event_emits_only_incremental_reasoning_content() {
}

stream.buffer =
"data:{\"choices\":[{\"delta\":{\"reasoning_content\":\"Thinking more\"}}]}\n\n".to_string();
"data:{\"choices\":[{\"delta\":{\"reasoning_content\":\"Thinking more\"}}]}\n\n"
.to_string();
match stream.parse_next_event() {
Some(StreamEvent::ThinkingDelta(text)) => assert_eq!(text, " more"),
other => panic!("expected incremental ThinkingDelta, got {:?}", other),
Expand Down