Skip to content

Commit b85ef79

Browse files
committed
feat: implement streaming support for Claude model
Switch Claude.generateContent() to use the Anthropic SDK's createStreaming() API when stream=true, replacing the previous non-streaming fallback (which had a "TODO: Switch to streaming API" comment).

- Text deltas are emitted immediately as partial LlmResponses
- Tool use JSON is accumulated across inputJson deltas and emitted as a function call LlmResponse on contentBlockStop
- messageStop emits a turnComplete=true sentinel
- StreamResponse is always closed via Flowable.using()
- Extracted buildMessageCreateParams() to deduplicate param logic

Add 6 unit tests covering text streaming, tool call accumulation, mixed text+tool responses, empty streams, and invalid JSON fallback.
1 parent ec93f50 commit b85ef79

2 files changed

Lines changed: 464 additions & 8 deletions

File tree

core/src/main/java/com/google/adk/models/Claude.java

Lines changed: 128 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,23 @@
1717
package com.google.adk.models;
1818

1919
import com.anthropic.client.AnthropicClient;
20+
import com.anthropic.core.http.StreamResponse;
2021
import com.anthropic.models.messages.ContentBlock;
2122
import com.anthropic.models.messages.ContentBlockParam;
2223
import com.anthropic.models.messages.Message;
2324
import com.anthropic.models.messages.MessageCreateParams;
2425
import com.anthropic.models.messages.MessageParam;
2526
import com.anthropic.models.messages.MessageParam.Role;
27+
import com.anthropic.models.messages.RawContentBlockDeltaEvent;
28+
import com.anthropic.models.messages.RawContentBlockStartEvent;
29+
import com.anthropic.models.messages.RawMessageStreamEvent;
2630
import com.anthropic.models.messages.TextBlockParam;
2731
import com.anthropic.models.messages.Tool;
2832
import com.anthropic.models.messages.ToolChoice;
2933
import com.anthropic.models.messages.ToolChoiceAuto;
3034
import com.anthropic.models.messages.ToolResultBlockParam;
3135
import com.anthropic.models.messages.ToolUnion;
36+
import com.anthropic.models.messages.ToolUseBlock;
3237
import com.anthropic.models.messages.ToolUseBlockParam;
3338
import com.fasterxml.jackson.core.type.TypeReference;
3439
import com.google.adk.JsonBaseModel;
@@ -53,8 +58,8 @@
5358
/**
5459
* Represents the Claude Generative AI model by Anthropic.
5560
*
56-
* <p>This class provides methods for interacting with Claude models. Streaming and live connections
57-
* are not currently supported for Claude.
61+
* <p>This class provides methods for interacting with Claude models, including streaming responses.
62+
* Live connections are not currently supported for Claude.
5863
*/
5964
public class Claude extends BaseLlm {
6065

@@ -81,7 +86,23 @@ public Claude(String modelName, AnthropicClient anthropicClient, int maxTokens)
8186

8287
@Override
8388
public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stream) {
84-
// TODO: Switch to streaming API.
89+
MessageCreateParams params = buildMessageCreateParams(llmRequest);
90+
91+
if (stream) {
92+
logger.debug("Sending streaming request to Claude model {}", params.model());
93+
return Flowable.using(
94+
() -> this.anthropicClient.messages().createStreaming(params),
95+
streamResponse -> processStreamingResponse(streamResponse.stream()),
96+
StreamResponse::close);
97+
} else {
98+
logger.debug("Sending request to Claude model {}", params.model());
99+
var message = this.anthropicClient.messages().create(params);
100+
logger.debug("Claude response: {}", message);
101+
return Flowable.just(convertAnthropicResponseToLlmResponse(message));
102+
}
103+
}
104+
105+
private MessageCreateParams buildMessageCreateParams(LlmRequest llmRequest) {
85106
List<MessageParam> messages =
86107
llmRequest.contents().stream()
87108
.map(this::contentToAnthropicMessageParam)
@@ -132,11 +153,112 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
132153
paramsBuilder.toolChoice(toolChoice);
133154
}
134155

135-
var message = this.anthropicClient.messages().create(paramsBuilder.build());
156+
return paramsBuilder.build();
157+
}
158+
159+
/**
160+
* Converts a stream of raw Anthropic streaming events into a Flowable of {@link LlmResponse}.
161+
*
162+
* <p>Text deltas are emitted immediately as partial responses. Tool use blocks are accumulated
163+
* and emitted as function calls when the block is complete.
164+
*/
165+
private Flowable<LlmResponse> processStreamingResponse(
166+
java.util.stream.Stream<RawMessageStreamEvent> events) {
167+
// Mutable state for accumulating tool call data across events.
168+
// Keys are content block indices from the stream.
169+
Map<Long, String> toolUseIds = new HashMap<>();
170+
Map<Long, String> toolUseNames = new HashMap<>();
171+
Map<Long, StringBuilder> toolUseInputJsons = new HashMap<>();
172+
173+
return Flowable.fromStream(events)
174+
.concatMap(
175+
event -> {
176+
if (event.isContentBlockStart()) {
177+
RawContentBlockStartEvent startEvent = event.asContentBlockStart();
178+
long index = startEvent.index();
179+
Optional<ToolUseBlock> toolUseOpt = startEvent.contentBlock().toolUse();
180+
if (toolUseOpt.isPresent()) {
181+
ToolUseBlock toolUse = toolUseOpt.get();
182+
toolUseIds.put(index, toolUse.id());
183+
toolUseNames.put(index, toolUse.name());
184+
toolUseInputJsons.put(index, new StringBuilder());
185+
}
186+
return Flowable.<LlmResponse>empty();
187+
188+
} else if (event.isContentBlockDelta()) {
189+
RawContentBlockDeltaEvent deltaEvent = event.asContentBlockDelta();
190+
long index = deltaEvent.index();
191+
var delta = deltaEvent.delta();
192+
193+
if (delta.isText()) {
194+
String textChunk = delta.asText().text();
195+
logger.trace("Claude streaming text chunk: {}", textChunk);
196+
return Flowable.just(
197+
LlmResponse.builder()
198+
.content(
199+
Content.builder()
200+
.role("model")
201+
.parts(ImmutableList.of(Part.builder().text(textChunk).build()))
202+
.build())
203+
.partial(true)
204+
.build());
205+
206+
} else if (delta.isInputJson()) {
207+
String jsonChunk = delta.asInputJson().partialJson();
208+
StringBuilder accumulator = toolUseInputJsons.get(index);
209+
if (accumulator != null) {
210+
accumulator.append(jsonChunk);
211+
}
212+
return Flowable.<LlmResponse>empty();
213+
}
214+
return Flowable.<LlmResponse>empty();
215+
216+
} else if (event.isContentBlockStop()) {
217+
long index = event.asContentBlockStop().index();
218+
String id = toolUseIds.remove(index);
219+
String name = toolUseNames.remove(index);
220+
StringBuilder inputJsonBuilder = toolUseInputJsons.remove(index);
221+
222+
if (id != null && name != null && inputJsonBuilder != null) {
223+
Map<String, Object> args;
224+
try {
225+
args =
226+
JsonBaseModel.getMapper()
227+
.readValue(
228+
inputJsonBuilder.toString(),
229+
new TypeReference<Map<String, Object>>() {});
230+
} catch (Exception e) {
231+
logger.warn(
232+
"Failed to parse tool input JSON for tool '{}': {}", name, e.getMessage());
233+
args = ImmutableMap.of();
234+
}
235+
logger.debug("Claude streaming tool call: id={}, name={}", id, name);
236+
return Flowable.just(
237+
LlmResponse.builder()
238+
.content(
239+
Content.builder()
240+
.role("model")
241+
.parts(
242+
ImmutableList.of(
243+
Part.builder()
244+
.functionCall(
245+
FunctionCall.builder()
246+
.id(id)
247+
.name(name)
248+
.args(args)
249+
.build())
250+
.build()))
251+
.build())
252+
.build());
253+
}
254+
return Flowable.<LlmResponse>empty();
136255

137-
logger.debug("Claude response: {}", message);
256+
} else if (event.isMessageStop()) {
257+
return Flowable.just(LlmResponse.builder().turnComplete(true).build());
258+
}
138259

139-
return Flowable.just(convertAnthropicResponseToLlmResponse(message));
260+
return Flowable.<LlmResponse>empty();
261+
});
140262
}
141263

142264
private Role toClaudeRole(String role) {

0 commit comments

Comments
 (0)