Skip to content

Commit c669ae3

Browse files
authored
Merge pull request #99 from 7df-lab/dev/bugfix0613
Fix Anthropic usage and context accounting
2 parents 564cade + f383091 commit c669ae3

13 files changed

Lines changed: 349 additions & 55 deletions

File tree

crates/core/src/context/mod.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use async_trait::async_trait;
55
use serde::{Deserialize, Serialize};
66

77
use crate::{ItemId, ResponseItem, SessionId, SummaryModelSelection, TurnId};
8-
use devo_protocol::{ContentBlock, Message, Role};
8+
use devo_protocol::{ContentBlock, Message, Model, Role};
99

1010
// ---------------------------------------------------------------------------
1111
// Contextual user fragment traits and registration
@@ -120,6 +120,10 @@ pub struct TokenBudget {
120120
pub max_output_tokens: usize,
121121
/// The threshold at which automatic compaction should trigger.
122122
pub compact_threshold: f64,
123+
/// Absolute token limit for automatic compaction, when model metadata
124+
/// provides a more precise context boundary than the default ratio.
125+
#[serde(default, skip_serializing_if = "Option::is_none")]
126+
pub auto_compact_token_limit: Option<usize>,
123127
}
124128

125129
impl TokenBudget {
@@ -129,6 +133,22 @@ impl TokenBudget {
129133
context_window,
130134
max_output_tokens,
131135
compact_threshold: 0.9,
136+
auto_compact_token_limit: None,
137+
}
138+
}
139+
140+
/// Creates a token budget aligned with the active model's effective context window.
141+
pub fn for_model(model: &Model) -> Self {
142+
let default_budget = Self::default();
143+
let context_window = model.effective_context_window() as usize;
144+
let max_output_tokens = model
145+
.max_tokens
146+
.map_or(default_budget.max_output_tokens, |value| value as usize);
147+
Self {
148+
context_window,
149+
max_output_tokens,
150+
compact_threshold: 1.0,
151+
auto_compact_token_limit: Some(context_window),
132152
}
133153
}
134154

@@ -139,6 +159,9 @@ impl TokenBudget {
139159

140160
/// Returns whether compaction should run for the supplied prompt token count.
141161
pub fn should_compact(&self, current_tokens: usize) -> bool {
162+
if let Some(limit) = self.auto_compact_token_limit {
163+
return current_tokens > limit;
164+
}
142165
current_tokens as f64 > self.input_budget() as f64 * self.compact_threshold
143166
}
144167
}
@@ -383,6 +406,8 @@ mod tests {
383406
ByteTokenEstimator, ContextualUserFragment, PromptAssemblyInput, SnapshotPersistFailure,
384407
TokenBudget, TokenEstimator,
385408
};
409+
use devo_protocol::Model;
410+
use pretty_assertions::assert_eq;
386411
use std::hint::black_box;
387412
use std::time::Instant;
388413

@@ -403,11 +428,52 @@ mod tests {
403428
#[test]
404429
fn token_budget_default_values() {
405430
let budget = TokenBudget::default();
406-
assert_eq!(budget.context_window, 200_000);
407-
assert_eq!(budget.max_output_tokens, 8192);
431+
assert_eq!(
432+
budget,
433+
TokenBudget {
434+
context_window: 200_000,
435+
max_output_tokens: 8192,
436+
compact_threshold: 0.9,
437+
auto_compact_token_limit: None,
438+
}
439+
);
408440
assert!((budget.compact_threshold - 0.9).abs() < f64::EPSILON);
409441
}
410442

443+
#[test]
444+
fn token_budget_for_model_uses_effective_context_as_auto_compact_limit() {
445+
let model = Model {
446+
context_window: 1_000_000,
447+
effective_context_window_percent: Some(95),
448+
max_tokens: Some(384_000),
449+
..Model::default()
450+
};
451+
452+
assert_eq!(
453+
TokenBudget::for_model(&model),
454+
TokenBudget {
455+
context_window: 950_000,
456+
max_output_tokens: 384_000,
457+
compact_threshold: 1.0,
458+
auto_compact_token_limit: Some(950_000),
459+
}
460+
);
461+
}
462+
463+
#[test]
464+
fn model_token_budget_does_not_compact_before_effective_context_limit() {
465+
let model = Model {
466+
context_window: 1_000_000,
467+
effective_context_window_percent: Some(95),
468+
max_tokens: Some(384_000),
469+
..Model::default()
470+
};
471+
let budget = TokenBudget::for_model(&model);
472+
473+
assert_eq!(budget.should_compact(950_000), false);
474+
assert_eq!(budget.should_compact(950_001), true);
475+
}
476+
411477
#[test]
412478
fn token_budget_input_budget_saturates() {
413479
let budget = TokenBudget::new(100, 200);

crates/core/src/query.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,11 +691,11 @@ pub async fn query(
691691
}
692692

693693
// 1.3 + 1.7: Check token budget and compact before building the request
694-
if session.last_input_tokens > 0
694+
if session.last_turn_tokens > 0
695695
&& session
696696
.config
697697
.token_budget
698-
.should_compact(session.last_input_tokens)
698+
.should_compact(session.last_turn_tokens)
699699
{
700700
if !budget_steer_injected {
701701
if let Some(turn) = session.turn_state.as_mut() {
@@ -982,6 +982,10 @@ pub async fn query(
982982
session.total_cache_read_tokens +=
983983
response.usage.cache_read_input_tokens.unwrap_or(0);
984984
session.last_input_tokens = response.usage.input_tokens;
985+
session.last_turn_tokens = response
986+
.usage
987+
.input_tokens
988+
.saturating_add(response.usage.output_tokens);
985989

986990
emit(QueryEvent::Usage {
987991
input_tokens: response.usage.input_tokens,

crates/core/src/session.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ impl From<HashMap<String, String>> for ProviderRequestModelMap {
116116
}
117117

118118
impl TurnConfig {
119+
pub fn token_budget(&self) -> TokenBudget {
120+
TokenBudget::for_model(&self.model)
121+
}
122+
119123
pub fn new(model: Model, thinking_selection: Option<String>) -> Self {
120124
let request_model = model.slug.clone();
121125
let thinking_selection = model.normalize_thinking_selection(thinking_selection.as_deref());
@@ -239,8 +243,10 @@ pub struct SessionState {
239243
pub total_cache_read_tokens: usize, // TODO: same with `total_input_cached_tokens`.
240244
pub prompt_token_estimate: usize,
241245
/// Input tokens reported by the model for the most recent turn.
242-
/// Used by `TokenBudget::should_compact()` to decide when to compact.
243246
pub last_input_tokens: usize,
247+
/// Total context tokens reported by the model for the most recent turn.
248+
/// This includes input plus output and drives automatic compaction.
249+
pub last_turn_tokens: usize,
244250
/// Thread-safe queue for pending turn inputs.
245251
/// - Source: user sends `turn/start` while a turn is active.
246252
/// - Lifecycle: preserved across turns; unconsumed items are pushed back
@@ -273,6 +279,7 @@ impl SessionState {
273279
total_cache_read_tokens: 0,
274280
prompt_token_estimate: 0,
275281
last_input_tokens: 0,
282+
last_turn_tokens: 0,
276283
pending_turn_queue: Arc::new(Mutex::new(VecDeque::new())),
277284
btw_input_queue: Arc::new(Mutex::new(VecDeque::new())),
278285
turn_state: None,
@@ -480,6 +487,29 @@ mod tests {
480487
assert_eq!(provider_bound.thinking_selection, Some("high".to_string()));
481488
}
482489

490+
#[test]
491+
fn turn_config_token_budget_uses_model_effective_context() {
492+
let model = Model {
493+
slug: "deepseek-v4-pro".to_string(),
494+
display_name: "deepseek-v4-pro".to_string(),
495+
context_window: 1_000_000,
496+
effective_context_window_percent: Some(95),
497+
max_tokens: Some(384_000),
498+
..Model::default()
499+
};
500+
let turn_config = TurnConfig::new(model, None);
501+
502+
assert_eq!(
503+
turn_config.token_budget(),
504+
TokenBudget {
505+
context_window: 950_000,
506+
max_output_tokens: 384_000,
507+
compact_threshold: 1.0,
508+
auto_compact_token_limit: Some(950_000),
509+
}
510+
);
511+
}
512+
483513
#[test]
484514
fn session_config_default_values() {
485515
let config = SessionConfig::default();
@@ -505,6 +535,7 @@ mod tests {
505535
assert_eq!(state.turn_count, 0);
506536
assert_eq!(state.total_input_tokens, 0);
507537
assert_eq!(state.total_output_tokens, 0);
538+
assert_eq!(state.last_turn_tokens, 0);
508539
}
509540

510541
#[test]

crates/provider/src/anthropic/messages.rs

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use serde_json::json;
3737
use tracing::debug;
3838

3939
use super::AnthropicAIRole;
40+
use super::stream_usage::AnthropicStreamUsage;
4041
use crate::ModelProviderSDK;
4142
use crate::ProviderAdapter;
4243
use crate::ProviderCapabilities;
@@ -363,8 +364,7 @@ impl ModelProviderSDK for AnthropicProvider {
363364
.context("failed to create anthropic event source")?;
364365
let stream = async_stream::try_stream! {
365366
let mut message_id = String::new();
366-
let mut input_tokens = 0usize;
367-
let mut output_tokens = 0usize;
367+
let mut stream_usage = AnthropicStreamUsage::default();
368368
let mut stop_reason: Option<StopReason> = None;
369369
let mut content_blocks: BTreeMap<usize, ResponseContent> = BTreeMap::new();
370370
let mut reasoning_blocks: BTreeMap<usize, String> = BTreeMap::new();
@@ -412,12 +412,9 @@ impl ModelProviderSDK for AnthropicProvider {
412412
{
413413
message_id = id.to_string();
414414
}
415-
if let Some(usage) = data.get("usage")
416-
&& let Some(input) =
417-
usage.get("input_tokens").and_then(Value::as_u64)
418-
{
419-
input_tokens = input as usize;
420-
}
415+
if let Some(usage) = stream_usage.update_from_message_start(&data) {
416+
yield StreamEvent::UsageDelta(usage);
417+
}
421418
}
422419
"content_block_start" => {
423420
let Some(index) = data.get("index").and_then(Value::as_u64) else {
@@ -696,17 +693,8 @@ impl ModelProviderSDK for AnthropicProvider {
696693
{
697694
stop_reason = Some(parse_stop_reason(reason));
698695
}
699-
if let Some(usage) = data.get("usage") {
700-
if let Some(output) = usage.get("output_tokens").and_then(Value::as_u64)
701-
{
702-
output_tokens = output as usize;
703-
}
704-
yield StreamEvent::UsageDelta(Usage {
705-
input_tokens,
706-
output_tokens,
707-
cache_creation_input_tokens: None,
708-
cache_read_input_tokens: None,
709-
});
696+
if let Some(usage) = stream_usage.update_from_message_delta(&data) {
697+
yield StreamEvent::UsageDelta(usage);
710698
}
711699
}
712700
"message_stop" => {
@@ -732,12 +720,7 @@ impl ModelProviderSDK for AnthropicProvider {
732720
content: dsml_healer
733721
.heal_response_content(content_blocks.into_values().collect()),
734722
stop_reason: stop_reason.clone(),
735-
usage: Usage {
736-
input_tokens,
737-
output_tokens,
738-
cache_creation_input_tokens: None,
739-
cache_read_input_tokens: None,
740-
},
723+
usage: stream_usage.snapshot(),
741724
metadata: ResponseMetadata {
742725
extras: reasoning_blocks
743726
.values()
@@ -777,12 +760,7 @@ impl ModelProviderSDK for AnthropicProvider {
777760
id: message_id,
778761
content: dsml_healer.heal_response_content(content_blocks.into_values().collect()),
779762
stop_reason,
780-
usage: Usage {
781-
input_tokens,
782-
output_tokens,
783-
cache_creation_input_tokens: None,
784-
cache_read_input_tokens: None,
785-
},
763+
usage: stream_usage.snapshot(),
786764
metadata: ResponseMetadata {
787765
extras: reasoning_blocks
788766
.into_values()
@@ -1302,8 +1280,13 @@ fn insert_provider_reasoning_blocks(
13021280
}
13031281

13041282
fn map_usage(usage: &AnthropicUsage) -> Usage {
1283+
let cache_creation_input_tokens = usage.cache_creation_input_tokens.unwrap_or(0);
1284+
let cache_read_input_tokens = usage.cache_read_input_tokens.unwrap_or(0);
13051285
Usage {
1306-
input_tokens: usage.input_tokens,
1286+
input_tokens: usage
1287+
.input_tokens
1288+
.saturating_add(cache_creation_input_tokens)
1289+
.saturating_add(cache_read_input_tokens),
13071290
output_tokens: usage.output_tokens,
13081291
cache_creation_input_tokens: usage.cache_creation_input_tokens,
13091292
cache_read_input_tokens: usage.cache_read_input_tokens,
@@ -1772,7 +1755,7 @@ mod tests {
17721755

17731756
assert_eq!(response.id, "msg_123");
17741757
assert_eq!(response.stop_reason, Some(StopReason::ToolUse));
1775-
assert_eq!(response.usage.input_tokens, 11);
1758+
assert_eq!(response.usage.input_tokens, 19);
17761759
assert_eq!(response.usage.output_tokens, 7);
17771760
assert_eq!(response.usage.cache_creation_input_tokens, Some(3));
17781761
assert_eq!(response.usage.cache_read_input_tokens, Some(5));
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod messages;
22
pub mod role;
3+
mod stream_usage;
34

45
pub use messages::AnthropicProvider;
56
pub use role::AnthropicAIRole;
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use devo_protocol::Usage;
2+
use serde_json::Value;
3+
4+
#[derive(Debug, Default)]
5+
pub(super) struct AnthropicStreamUsage {
6+
uncached_input_tokens: usize,
7+
output_tokens: usize,
8+
cache_creation_input_tokens: Option<usize>,
9+
cache_read_input_tokens: Option<usize>,
10+
}
11+
12+
impl AnthropicStreamUsage {
13+
pub(super) fn update_from_message_start(&mut self, data: &Value) -> Option<Usage> {
14+
let usage = data
15+
.get("message")
16+
.and_then(|message| message.get("usage"))
17+
.or_else(|| data.get("usage"))?;
18+
self.update_from_usage(usage)
19+
}
20+
21+
pub(super) fn update_from_message_delta(&mut self, data: &Value) -> Option<Usage> {
22+
self.update_from_usage(data.get("usage")?)
23+
}
24+
25+
pub(super) fn snapshot(&self) -> Usage {
26+
let cache_creation_input_tokens = self.cache_creation_input_tokens.unwrap_or(0);
27+
let cache_read_input_tokens = self.cache_read_input_tokens.unwrap_or(0);
28+
Usage {
29+
input_tokens: self
30+
.uncached_input_tokens
31+
.saturating_add(cache_creation_input_tokens)
32+
.saturating_add(cache_read_input_tokens),
33+
output_tokens: self.output_tokens,
34+
cache_creation_input_tokens: self.cache_creation_input_tokens,
35+
cache_read_input_tokens: self.cache_read_input_tokens,
36+
}
37+
}
38+
39+
fn update_from_usage(&mut self, usage: &Value) -> Option<Usage> {
40+
let mut updated = false;
41+
if let Some(input_tokens) = usage.get("input_tokens").and_then(Value::as_u64) {
42+
self.uncached_input_tokens = input_tokens as usize;
43+
updated = true;
44+
}
45+
if let Some(output_tokens) = usage.get("output_tokens").and_then(Value::as_u64) {
46+
self.output_tokens = output_tokens as usize;
47+
updated = true;
48+
}
49+
if let Some(cache_creation_input_tokens) = usage
50+
.get("cache_creation_input_tokens")
51+
.and_then(Value::as_u64)
52+
{
53+
self.cache_creation_input_tokens = Some(cache_creation_input_tokens as usize);
54+
updated = true;
55+
}
56+
if let Some(cache_read_input_tokens) =
57+
usage.get("cache_read_input_tokens").and_then(Value::as_u64)
58+
{
59+
self.cache_read_input_tokens = Some(cache_read_input_tokens as usize);
60+
updated = true;
61+
}
62+
63+
updated.then(|| self.snapshot())
64+
}
65+
}

0 commit comments

Comments
 (0)