1717package com .google .adk .models ;
1818
1919import com .anthropic .client .AnthropicClient ;
20+ import com .anthropic .core .http .StreamResponse ;
2021import com .anthropic .models .messages .ContentBlock ;
2122import com .anthropic .models .messages .ContentBlockParam ;
2223import com .anthropic .models .messages .Message ;
2324import com .anthropic .models .messages .MessageCreateParams ;
2425import com .anthropic .models .messages .MessageParam ;
2526import com .anthropic .models .messages .MessageParam .Role ;
27+ import com .anthropic .models .messages .RawContentBlockDeltaEvent ;
28+ import com .anthropic .models .messages .RawContentBlockStartEvent ;
29+ import com .anthropic .models .messages .RawMessageStreamEvent ;
2630import com .anthropic .models .messages .TextBlockParam ;
2731import com .anthropic .models .messages .Tool ;
2832import com .anthropic .models .messages .ToolChoice ;
2933import com .anthropic .models .messages .ToolChoiceAuto ;
3034import com .anthropic .models .messages .ToolResultBlockParam ;
3135import com .anthropic .models .messages .ToolUnion ;
36+ import com .anthropic .models .messages .ToolUseBlock ;
3237import com .anthropic .models .messages .ToolUseBlockParam ;
3338import com .fasterxml .jackson .core .type .TypeReference ;
3439import com .google .adk .JsonBaseModel ;
5358/**
5459 * Represents the Claude Generative AI model by Anthropic.
5560 *
56- * <p>This class provides methods for interacting with Claude models. Streaming and live connections
57- * are not currently supported for Claude.
61+ * <p>This class provides methods for interacting with Claude models, including streaming responses.
62+ * Live connections are not currently supported for Claude.
5863 */
5964public class Claude extends BaseLlm {
6065
@@ -81,7 +86,23 @@ public Claude(String modelName, AnthropicClient anthropicClient, int maxTokens)
8186
8287 @ Override
8388 public Flowable <LlmResponse > generateContent (LlmRequest llmRequest , boolean stream ) {
84- // TODO: Switch to streaming API.
89+ MessageCreateParams params = buildMessageCreateParams (llmRequest );
90+
91+ if (stream ) {
92+ logger .debug ("Sending streaming request to Claude model {}" , params .model ());
93+ return Flowable .using (
94+ () -> this .anthropicClient .messages ().createStreaming (params ),
95+ streamResponse -> processStreamingResponse (streamResponse .stream ()),
96+ StreamResponse ::close );
97+ } else {
98+ logger .debug ("Sending request to Claude model {}" , params .model ());
99+ var message = this .anthropicClient .messages ().create (params );
100+ logger .debug ("Claude response: {}" , message );
101+ return Flowable .just (convertAnthropicResponseToLlmResponse (message ));
102+ }
103+ }
104+
105+ private MessageCreateParams buildMessageCreateParams (LlmRequest llmRequest ) {
85106 List <MessageParam > messages =
86107 llmRequest .contents ().stream ()
87108 .map (this ::contentToAnthropicMessageParam )
@@ -132,11 +153,112 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
132153 paramsBuilder .toolChoice (toolChoice );
133154 }
134155
135- var message = this .anthropicClient .messages ().create (paramsBuilder .build ());
156+ return paramsBuilder .build ();
157+ }
158+
159+ /**
160+ * Converts a stream of raw Anthropic streaming events into a Flowable of {@link LlmResponse}.
161+ *
162+ * <p>Text deltas are emitted immediately as partial responses. Tool use blocks are accumulated
163+ * and emitted as function calls when the block is complete.
164+ */
165+ private Flowable <LlmResponse > processStreamingResponse (
166+ java .util .stream .Stream <RawMessageStreamEvent > events ) {
167+ // Mutable state for accumulating tool call data across events.
168+ // Keys are content block indices from the stream.
169+ Map <Long , String > toolUseIds = new HashMap <>();
170+ Map <Long , String > toolUseNames = new HashMap <>();
171+ Map <Long , StringBuilder > toolUseInputJsons = new HashMap <>();
172+
173+ return Flowable .fromStream (events )
174+ .concatMap (
175+ event -> {
176+ if (event .isContentBlockStart ()) {
177+ RawContentBlockStartEvent startEvent = event .asContentBlockStart ();
178+ long index = startEvent .index ();
179+ Optional <ToolUseBlock > toolUseOpt = startEvent .contentBlock ().toolUse ();
180+ if (toolUseOpt .isPresent ()) {
181+ ToolUseBlock toolUse = toolUseOpt .get ();
182+ toolUseIds .put (index , toolUse .id ());
183+ toolUseNames .put (index , toolUse .name ());
184+ toolUseInputJsons .put (index , new StringBuilder ());
185+ }
186+ return Flowable .<LlmResponse >empty ();
187+
188+ } else if (event .isContentBlockDelta ()) {
189+ RawContentBlockDeltaEvent deltaEvent = event .asContentBlockDelta ();
190+ long index = deltaEvent .index ();
191+ var delta = deltaEvent .delta ();
192+
193+ if (delta .isText ()) {
194+ String textChunk = delta .asText ().text ();
195+ logger .trace ("Claude streaming text chunk: {}" , textChunk );
196+ return Flowable .just (
197+ LlmResponse .builder ()
198+ .content (
199+ Content .builder ()
200+ .role ("model" )
201+ .parts (ImmutableList .of (Part .builder ().text (textChunk ).build ()))
202+ .build ())
203+ .partial (true )
204+ .build ());
205+
206+ } else if (delta .isInputJson ()) {
207+ String jsonChunk = delta .asInputJson ().partialJson ();
208+ StringBuilder accumulator = toolUseInputJsons .get (index );
209+ if (accumulator != null ) {
210+ accumulator .append (jsonChunk );
211+ }
212+ return Flowable .<LlmResponse >empty ();
213+ }
214+ return Flowable .<LlmResponse >empty ();
215+
216+ } else if (event .isContentBlockStop ()) {
217+ long index = event .asContentBlockStop ().index ();
218+ String id = toolUseIds .remove (index );
219+ String name = toolUseNames .remove (index );
220+ StringBuilder inputJsonBuilder = toolUseInputJsons .remove (index );
221+
222+ if (id != null && name != null && inputJsonBuilder != null ) {
223+ Map <String , Object > args ;
224+ try {
225+ args =
226+ JsonBaseModel .getMapper ()
227+ .readValue (
228+ inputJsonBuilder .toString (),
229+ new TypeReference <Map <String , Object >>() {});
230+ } catch (Exception e ) {
231+ logger .warn (
232+ "Failed to parse tool input JSON for tool '{}': {}" , name , e .getMessage ());
233+ args = ImmutableMap .of ();
234+ }
235+ logger .debug ("Claude streaming tool call: id={}, name={}" , id , name );
236+ return Flowable .just (
237+ LlmResponse .builder ()
238+ .content (
239+ Content .builder ()
240+ .role ("model" )
241+ .parts (
242+ ImmutableList .of (
243+ Part .builder ()
244+ .functionCall (
245+ FunctionCall .builder ()
246+ .id (id )
247+ .name (name )
248+ .args (args )
249+ .build ())
250+ .build ()))
251+ .build ())
252+ .build ());
253+ }
254+ return Flowable .<LlmResponse >empty ();
136255
137- logger .debug ("Claude response: {}" , message );
256+ } else if (event .isMessageStop ()) {
257+ return Flowable .just (LlmResponse .builder ().turnComplete (true ).build ());
258+ }
138259
139- return Flowable .just (convertAnthropicResponseToLlmResponse (message ));
260+ return Flowable .<LlmResponse >empty ();
261+ });
140262 }
141263
142264 private Role toClaudeRole (String role ) {
0 commit comments