Skip to content

Commit 407fe98

Browse files
committed
Fix streaming parallel tool calls concatenating JSON arguments
When Anthropic returns multiple tool calls in a single streaming response, the JSON argument fragments from different tool calls were concatenated into one invalid string because input_json_delta events were all routed to @latest_tool_call_id. Anthropic's streaming API includes an `index` field on every content_block_delta that identifies which content block a fragment belongs to. This fix: 1. Passes the block index through extract_tool_calls as a symbol key 2. Registers block index → tool call ID mappings from content_block_start 3. Routes input_json_delta fragments to the correct tool call by index 4. Falls back to @latest_tool_call_id for providers that don't send index Fixes #710
1 parent 4ed9dfb commit 407fe98

4 files changed

Lines changed: 119 additions & 25 deletions

File tree

.tool-versions

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ruby 3.4.7

lib/ruby_llm/providers/anthropic/tools.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,33 @@ def function_for(tool)
6969

7070
def extract_tool_calls(data)
7171
if json_delta?(data)
72-
{ nil => ToolCall.new(id: nil, name: nil, arguments: data.dig('delta', 'partial_json')) }
72+
# Use the content block index as the hash key so the accumulator
73+
# can route fragments to the correct tool call during parallel
74+
# streaming. Without this, all fragments go to @latest_tool_call_id
75+
# and parallel tool call arguments get concatenated together.
76+
block_index = data['index']
77+
key = block_index ? "block_idx_#{block_index}" : nil
78+
{ key => ToolCall.new(id: nil, name: nil, arguments: data.dig('delta', 'partial_json')) }
79+
elsif data['type'] == 'content_block_start' && data.dig('content_block', 'type') == 'tool_use'
80+
block = data['content_block']
81+
build_tool_use_start(block, data['index'])
7382
else
7483
parse_tool_calls(data['content_block'])
7584
end
7685
end
7786

87+
def build_tool_use_start(block, block_index)
88+
input = block['input']
89+
args = input.is_a?(Hash) && input.empty? ? +'' : (input || +'')
90+
tool_calls = { block['id'] => ToolCall.new(id: block['id'], name: block['name'], arguments: args) }
91+
if block_index
92+
tool_calls["register_idx_#{block_index}"] = ToolCall.new(
93+
id: block['id'], name: '_register_block_index', arguments: nil
94+
)
95+
end
96+
tool_calls
97+
end
98+
7899
def parse_tool_calls(content_blocks)
79100
return nil if content_blocks.nil?
80101

lib/ruby_llm/stream_accumulator.rb

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -72,36 +72,51 @@ def tool_calls_from_stream
7272
end
7373
end
7474

75-
def accumulate_tool_calls(new_tool_calls) # rubocop:disable Metrics/PerceivedComplexity
75+
def accumulate_tool_calls(new_tool_calls)
76+
@block_index_to_tool_call_id ||= {}
7677
RubyLLM.logger.debug { "Accumulating tool calls: #{new_tool_calls}" } if RubyLLM.config.log_stream_debug
77-
new_tool_calls.each_value do |tool_call|
78-
if tool_call.id
79-
tool_call_id = tool_call.id.empty? ? SecureRandom.uuid : tool_call.id
80-
tool_call_arguments = tool_call.arguments
81-
if tool_call_arguments.nil? || (tool_call_arguments.respond_to?(:empty?) && tool_call_arguments.empty?)
82-
tool_call_arguments = +''
83-
end
84-
@tool_calls[tool_call.id] = ToolCall.new(
85-
id: tool_call_id,
86-
name: tool_call.name,
87-
arguments: tool_call_arguments,
88-
thought_signature: tool_call.thought_signature
89-
)
90-
@latest_tool_call_id = tool_call.id
78+
new_tool_calls.each do |key, tool_call|
79+
if register_block_index?(key, tool_call)
80+
block_key = key.to_s.sub('register_idx_', 'block_idx_')
81+
@block_index_to_tool_call_id[block_key] = tool_call.id
82+
elsif tool_call.id
83+
register_tool_call(tool_call)
9184
else
92-
existing = @tool_calls[@latest_tool_call_id]
93-
if existing
94-
fragment = tool_call.arguments
95-
fragment = '' if fragment.nil?
96-
existing.arguments << fragment
97-
if tool_call.thought_signature && existing.thought_signature.nil?
98-
existing.thought_signature = tool_call.thought_signature
99-
end
100-
end
85+
append_tool_call_fragment(key, tool_call)
10186
end
10287
end
10388
end
10489

90+
def register_block_index?(key, tool_call)
91+
tool_call.name == '_register_block_index' && key.to_s.start_with?('register_idx_')
92+
end
93+
94+
def register_tool_call(tool_call)
95+
tool_call_id = tool_call.id.empty? ? SecureRandom.uuid : tool_call.id
96+
tool_call_arguments = tool_call.arguments
97+
if tool_call_arguments.nil? || (tool_call_arguments.respond_to?(:empty?) && tool_call_arguments.empty?)
98+
tool_call_arguments = +''
99+
end
100+
@tool_calls[tool_call.id] = ToolCall.new(
101+
id: tool_call_id,
102+
name: tool_call.name,
103+
arguments: tool_call_arguments,
104+
thought_signature: tool_call.thought_signature
105+
)
106+
@latest_tool_call_id = tool_call.id
107+
end
108+
109+
def append_tool_call_fragment(key, tool_call)
110+
target_id = @block_index_to_tool_call_id[key] || @latest_tool_call_id
111+
existing = @tool_calls[target_id]
112+
return unless existing
113+
114+
existing.arguments << (tool_call.arguments || '')
115+
return unless tool_call.thought_signature && existing.thought_signature.nil?
116+
117+
existing.thought_signature = tool_call.thought_signature
118+
end
119+
105120
def find_tool_call(tool_call_id)
106121
if tool_call_id.nil?
107122
@tool_calls[@latest_tool_call]

spec/ruby_llm/stream_accumulator_spec.rb

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,62 @@
1414
message = accumulator.to_message(nil)
1515
expect(message.tool_calls['call_1'].arguments).to eq({})
1616
end
17+
18+
context 'with parallel streaming tool calls' do
19+
it 'keeps arguments separate when block index is provided' do
20+
accumulator = described_class.new
21+
22+
register_tool_call(accumulator, id: 'toolu_A', name: 'get_market_data', block_index: 1)
23+
register_tool_call(accumulator, id: 'toolu_B', name: 'web_search', block_index: 2)
24+
25+
add_delta(accumulator, 'block_idx_1', '{"symbol":')
26+
add_delta(accumulator, 'block_idx_2', '{"query":')
27+
add_delta(accumulator, 'block_idx_1', '"MNQM26"}')
28+
add_delta(accumulator, 'block_idx_2', '"market news"}')
29+
30+
message = accumulator.to_message(nil)
31+
32+
expect(message.tool_calls['toolu_A'].arguments).to eq({ 'symbol' => 'MNQM26' })
33+
expect(message.tool_calls['toolu_B'].arguments).to eq({ 'query' => 'market news' })
34+
end
35+
end
36+
37+
it 'falls back to latest_tool_call_id when no block index is provided' do
38+
accumulator = described_class.new
39+
40+
tc = RubyLLM::ToolCall.new(id: 'call_1', name: 'weather', arguments: +'')
41+
accumulator.add(
42+
RubyLLM::Chunk.new(role: :assistant, content: nil, tool_calls: { 'call_1' => tc })
43+
)
44+
45+
delta = RubyLLM::ToolCall.new(id: nil, name: nil, arguments: '{"city":"NYC"}')
46+
accumulator.add(
47+
RubyLLM::Chunk.new(role: :assistant, content: nil, tool_calls: { nil => delta })
48+
)
49+
50+
message = accumulator.to_message(nil)
51+
expect(message.tool_calls['call_1'].arguments).to eq({ 'city' => 'NYC' })
52+
end
53+
end
54+
55+
private
56+
57+
def register_tool_call(accumulator, id:, name:, block_index:)
58+
tc = RubyLLM::ToolCall.new(id: id, name: name, arguments: +'')
59+
register = RubyLLM::ToolCall.new(id: id, name: '_register_block_index', arguments: nil)
60+
accumulator.add(
61+
RubyLLM::Chunk.new(
62+
role: :assistant,
63+
content: nil,
64+
tool_calls: { id => tc, "register_idx_#{block_index}" => register }
65+
)
66+
)
67+
end
68+
69+
def add_delta(accumulator, block_key, json_fragment)
70+
delta = RubyLLM::ToolCall.new(id: nil, name: nil, arguments: json_fragment)
71+
accumulator.add(
72+
RubyLLM::Chunk.new(role: :assistant, content: nil, tool_calls: { block_key => delta })
73+
)
1774
end
1875
end

0 commit comments

Comments
 (0)