Skip to content

Commit 51b3bbe

Browse files
Patel230claude authored and committed
feat: native tools, LLM compaction, thinking streamer, model fallback, retry jitter
- Native tool calling for Anthropic (tool_use), OpenAI (function_calling), Gemini (function declarations) — all with multi-round loops - NativeToolExecutor with parallel/sequential execution - JSON schema generator (ToolSchema, NativeToolDefinition, parseToolDescription) - ThinkingStreamer interface + Anthropic implementation (CompleteStreamWithThinking) - LLMCompactionStrategy — uses provider to summarize old turns, falls back to tiered - Azure provider wired into NewProvider switch - Tool.Execute / ToolCall.Args upgraded: map[string]string → map[string]interface{} ArgStr() helper added; 25+ call sites updated - CompletionOptions.Model for per-call model override across all providers - WithFallbackModel + isFallbackError for automatic model fallback on not-found errors - Retry jitter: ±25% randomness on backoff to prevent thundering herd - Event channel buffer 256 → 1024 for parallel tool burst headroom - Tool output cap: 200-line head+tail truncation via capToolOutput() - Message-level prompt caching (cache_control on penultimate message) - Stuck-loop detection: abort if same tool call repeats 3× - Additional tests: native tools, multi-turn, context window, stuck-loop Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent a5b56c5 commit 51b3bbe

30 files changed

Lines changed: 2403 additions & 247 deletions

agent.go

Lines changed: 269 additions & 54 deletions
Large diffs are not rendered by default.

agent_test.go

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ func TestAgentSimpleResponse(t *testing.T) {
3535
func TestAgentToolCall(t *testing.T) {
3636
toolCall := iteragent.ToolCall{
3737
Tool: "echo",
38-
Args: map[string]string{"msg": "ping"},
38+
Args: map[string]interface{}{"msg": "ping"},
3939
}
4040

4141
called := false
4242
echoTool := iteragent.Tool{
4343
Name: "echo",
4444
Description: "Echo the msg argument",
45-
Execute: func(ctx context.Context, args map[string]string) (string, error) {
45+
Execute: func(ctx context.Context, args map[string]interface{}) (string, error) {
4646
called = true
4747
return "pong", nil
4848
},
@@ -130,6 +130,57 @@ func TestParseToolCallsNone(t *testing.T) {
130130
}
131131
}
132132

133+
// TestParseToolCallsMalformedJSON verifies that truncated/malformed tool blocks
134+
// are recovered where possible and silently dropped when unrecoverable.
135+
func TestParseToolCallsMalformedJSON(t *testing.T) {
136+
cases := []struct {
137+
name string
138+
input string
139+
wantN int
140+
wantTool string
141+
}{
142+
{
143+
name: "truncated after tool value",
144+
input: "```tool\n{\"tool\":\"bash\",\"args\":{\"command\":\"ls\"\n```",
145+
wantN: 1,
146+
wantTool: "bash",
147+
},
148+
{
149+
name: "missing outer closing brace",
150+
input: "```tool\n{\"tool\":\"read_file\",\"args\":{\"path\":\"/tmp/x\"}\n```",
151+
wantN: 1,
152+
wantTool: "read_file",
153+
},
154+
{
155+
name: "completely invalid JSON",
156+
input: "```tool\nnot json at all\n```",
157+
wantN: 0,
158+
},
159+
{
160+
name: "empty block",
161+
input: "```tool\n\n```",
162+
wantN: 0,
163+
},
164+
{
165+
name: "trailing garbage after closing brace",
166+
input: "```tool\n{\"tool\":\"bash\",\"args\":{\"command\":\"ls\"}} extra stuff\n```",
167+
wantN: 1,
168+
wantTool: "bash",
169+
},
170+
}
171+
for _, tc := range cases {
172+
t.Run(tc.name, func(t *testing.T) {
173+
calls := iteragent.ParseToolCalls(tc.input)
174+
if len(calls) != tc.wantN {
175+
t.Fatalf("want %d calls, got %d", tc.wantN, len(calls))
176+
}
177+
if tc.wantN > 0 && calls[0].Tool != tc.wantTool {
178+
t.Errorf("want tool=%q, got %q", tc.wantTool, calls[0].Tool)
179+
}
180+
})
181+
}
182+
}
183+
133184
// TestToolDescriptions verifies that tool descriptions are formatted.
134185
func TestToolDescriptions(t *testing.T) {
135186
tools := []iteragent.Tool{
@@ -226,13 +277,13 @@ func TestAgentHooks_BeforeAfterTurn(t *testing.T) {
226277
func TestAgentHooks_OnToolStartEnd(t *testing.T) {
227278
toolCall := iteragent.ToolCall{
228279
Tool: "greet",
229-
Args: map[string]string{"name": "world"},
280+
Args: map[string]interface{}{"name": "world"},
230281
}
231282
greetTool := iteragent.Tool{
232283
Name: "greet",
233284
Description: "Greet someone",
234-
Execute: func(ctx context.Context, args map[string]string) (string, error) {
235-
return "hello " + args["name"], nil
285+
Execute: func(ctx context.Context, args map[string]interface{}) (string, error) {
286+
return "hello " + iteragent.ArgStr(args, "name"), nil
236287
},
237288
}
238289

@@ -243,10 +294,10 @@ func TestAgentHooks_OnToolStartEnd(t *testing.T) {
243294
var endResults []string
244295
var endErrors []error
245296
a.WithHooks(iteragent.AgentHooks{
246-
OnToolStart: func(toolName string, args map[string]string) {
297+
OnToolStart: func(toolName string, args map[string]interface{}) {
247298
startTools = append(startTools, toolName)
248-
if args["name"] != "world" {
249-
t.Errorf("OnToolStart: args[name]=%q, want %q", args["name"], "world")
299+
if iteragent.ArgStr(args, "name") != "world" {
300+
t.Errorf("OnToolStart: args[name]=%q, want %q", iteragent.ArgStr(args, "name"), "world")
250301
}
251302
},
252303
OnToolEnd: func(toolName string, result string, err error) {
@@ -357,10 +408,10 @@ func TestAgentTokenStreamer_RunReturnsFullResponse(t *testing.T) {
357408
// call precedes the final response. The mock streams every turn, so we verify
358409
// that EventTokenUpdate events were emitted and the final answer is correct.
359410
func TestAgentTokenStreamer_WithTools(t *testing.T) {
360-
toolCall := iteragent.ToolCall{Tool: "noop", Args: map[string]string{}}
411+
toolCall := iteragent.ToolCall{Tool: "noop", Args: map[string]interface{}{}}
361412
noopTool := iteragent.Tool{
362413
Name: "noop",
363-
Execute: func(ctx context.Context, args map[string]string) (string, error) {
414+
Execute: func(ctx context.Context, args map[string]interface{}) (string, error) {
364415
return "done", nil
365416
},
366417
}
@@ -421,13 +472,13 @@ func TestAgentClose(t *testing.T) {
421472
// when the parallel execution strategy is used.
422473
func TestAgentHooks_Parallel(t *testing.T) {
423474
calls := []iteragent.ToolCall{
424-
{Tool: "t1", Args: map[string]string{}},
425-
{Tool: "t2", Args: map[string]string{}},
475+
{Tool: "t1", Args: map[string]interface{}{}},
476+
{Tool: "t2", Args: map[string]interface{}{}},
426477
}
427478
makeTool := func(name string) iteragent.Tool {
428479
return iteragent.Tool{
429480
Name: name,
430-
Execute: func(ctx context.Context, args map[string]string) (string, error) {
481+
Execute: func(ctx context.Context, args map[string]interface{}) (string, error) {
431482
return name + "-result", nil
432483
},
433484
}
@@ -441,7 +492,7 @@ func TestAgentHooks_Parallel(t *testing.T) {
441492
var started, ended []string
442493

443494
a.WithHooks(iteragent.AgentHooks{
444-
OnToolStart: func(toolName string, args map[string]string) {
495+
OnToolStart: func(toolName string, args map[string]interface{}) {
445496
mu.Lock()
446497
started = append(started, toolName)
447498
mu.Unlock()
@@ -507,10 +558,10 @@ func TestPromptMessages_Hooks_BeforeAfterTurn(t *testing.T) {
507558
// TestPromptMessages_Hooks_OnToolStartEnd verifies OnToolStart and OnToolEnd fire
508559
// around each tool execution during PromptMessages().
509560
func TestPromptMessages_Hooks_OnToolStartEnd(t *testing.T) {
510-
toolCall := iteragent.ToolCall{Tool: "ping", Args: map[string]string{}}
561+
toolCall := iteragent.ToolCall{Tool: "ping", Args: map[string]interface{}{}}
511562
pingTool := iteragent.Tool{
512563
Name: "ping",
513-
Execute: func(ctx context.Context, args map[string]string) (string, error) {
564+
Execute: func(ctx context.Context, args map[string]interface{}) (string, error) {
514565
return "pong", nil
515566
},
516567
}
@@ -519,7 +570,7 @@ func TestPromptMessages_Hooks_OnToolStartEnd(t *testing.T) {
519570

520571
var startTools, endTools []string
521572
a.WithHooks(iteragent.AgentHooks{
522-
OnToolStart: func(toolName string, args map[string]string) {
573+
OnToolStart: func(toolName string, args map[string]interface{}) {
523574
startTools = append(startTools, toolName)
524575
},
525576
OnToolEnd: func(toolName string, result string, err error) {
@@ -778,7 +829,7 @@ func TestAgentWithMcpServerHttp_ToolCallable(t *testing.T) {
778829
ctx := context.Background()
779830
// Agent will call mcp_echo on turn 1, then return "done" on turn 2.
780831
p := iteragent.NewMockWithTools("done", []iteragent.ToolCall{
781-
{Tool: "mcp_echo", Args: map[string]string{}},
832+
{Tool: "mcp_echo", Args: map[string]interface{}{}},
782833
})
783834
a, err := iteragent.New(p, nil, testLogger()).
784835
WithMcpServerHttp(ctx, srv.URL)

anthropic.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,22 @@ func (p *anthropicProvider) Name() string {
3535
return fmt.Sprintf("anthropic(%s)", p.cfg.Model)
3636
}
3737

38+
// ContextWindow returns the context window for the configured Anthropic model.
39+
func (p *anthropicProvider) ContextWindow() int {
40+
return anthropicContextWindow(p.cfg.Model)
41+
}
42+
43+
// anthropicContextWindow maps a Claude model name to its context window size
// in tokens. Legacy claude-instant and claude-2 families are capped at 100k;
// all current Claude models support 200k, which is also the default for any
// unrecognized model string.
func anthropicContextWindow(model string) int {
	if strings.HasPrefix(model, "claude-instant") || strings.HasPrefix(model, "claude-2") {
		return 100_000
	}
	return 200_000
}
53+
3854
type anthropicResponse struct {
3955
Content []struct {
4056
Text string `json:"text"`
@@ -75,8 +91,12 @@ func (p *anthropicProvider) buildAnthropicBody(messages []Message, opt Completio
7591
if opt.MaxTokens > 0 {
7692
maxTokens = opt.MaxTokens
7793
}
94+
model := p.cfg.Model
95+
if opt.Model != "" {
96+
model = opt.Model
97+
}
7898
reqMap := map[string]interface{}{
79-
"model": p.cfg.Model,
99+
"model": model,
80100
"max_tokens": maxTokens,
81101
"messages": filtered,
82102
}
@@ -93,6 +113,35 @@ func (p *anthropicProvider) buildAnthropicBody(messages []Message, opt Completio
93113
reqMap["system"] = system
94114
}
95115
}
116+
117+
// Message-level caching: add cache_control to the penultimate message to
118+
// cache the conversation history before the current user turn.
119+
msgCacheEnabled := opt.CacheConfig != nil && opt.CacheConfig.Enabled && opt.CacheConfig.CacheMessages
120+
if msgCacheEnabled && len(filtered) >= 2 {
121+
type contentBlock struct {
122+
Type string `json:"type"`
123+
Text string `json:"text"`
124+
CacheControl map[string]string `json:"cache_control,omitempty"`
125+
}
126+
type cachedMsg struct {
127+
Role string `json:"role"`
128+
Content []contentBlock `json:"content"`
129+
}
130+
msgs := make([]interface{}, len(filtered))
131+
for i, m := range filtered {
132+
if i == len(filtered)-2 {
133+
msgs[i] = cachedMsg{
134+
Role: m.Role,
135+
Content: []contentBlock{
136+
{Type: "text", Text: m.Content, CacheControl: map[string]string{"type": "ephemeral"}},
137+
},
138+
}
139+
} else {
140+
msgs[i] = map[string]interface{}{"role": m.Role, "content": m.Content}
141+
}
142+
}
143+
reqMap["messages"] = msgs
144+
}
96145
if opt.ThinkingLevel != ThinkingLevelOff && opt.ThinkingLevel != "" {
97146
budget := thinkingBudget(opt.ThinkingLevel)
98147
reqMap["thinking"] = map[string]interface{}{
@@ -138,6 +187,48 @@ func (p *anthropicProvider) CompleteStream(ctx context.Context, messages []Messa
138187
return result, nil
139188
}
140189

190+
// CompleteStreamWithThinking implements ThinkingStreamer. It delivers both
191+
// text tokens (via onToken) and thinking tokens (via onThinking) as they
192+
// arrive. Returns the full concatenated text response.
193+
func (p *anthropicProvider) CompleteStreamWithThinking(ctx context.Context, messages []Message, opt CompletionOptions, onToken func(string), onThinking func(string)) (string, error) {
194+
body, err := p.buildAnthropicBody(messages, opt, true)
195+
if err != nil {
196+
return "", fmt.Errorf("marshal request: %w", err)
197+
}
198+
199+
headers := map[string]string{
200+
"x-api-key": p.cfg.APIKey,
201+
"anthropic-version": "2023-06-01",
202+
}
203+
if opt.CacheConfig != nil && opt.CacheConfig.Enabled {
204+
headers["anthropic-beta"] = "prompt-caching-2024-07-31"
205+
}
206+
207+
var full strings.Builder
208+
sseClient := NewSSEClient()
209+
err = sseClient.Stream(ctx, "https://api.anthropic.com/v1/messages", headers, body, func(e SSEEvent) {
210+
if token, ok := ParseAnthropicSSE(e.Data); ok && token != "" {
211+
full.WriteString(token)
212+
if onToken != nil {
213+
onToken(token)
214+
}
215+
}
216+
if thinking, ok := ParseAnthropicSSEThinking(e.Data); ok && thinking != "" {
217+
if onThinking != nil {
218+
onThinking(thinking)
219+
}
220+
}
221+
})
222+
if err != nil {
223+
return "", fmt.Errorf("anthropic stream: %w", err)
224+
}
225+
result := full.String()
226+
if result == "" {
227+
return "", fmt.Errorf("empty streaming response from anthropic")
228+
}
229+
return result, nil
230+
}
231+
141232
func (p *anthropicProvider) Complete(ctx context.Context, messages []Message, opts ...CompletionOptions) (string, error) {
142233
var opt CompletionOptions
143234
if len(opts) > 0 {

0 commit comments

Comments
 (0)