Skip to content

Commit 9b3c924

Browse files
authored
feat: add Reactor context propagation for tracing and dedicated tests (#194)
1 parent f865a46 commit 9b3c924

6 files changed

Lines changed: 565 additions & 373 deletions

File tree

agentscope-core/src/main/java/io/agentscope/core/model/AnthropicChatModel.java

Lines changed: 88 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.agentscope.core.formatter.anthropic.AnthropicChatFormatter;
2626
import io.agentscope.core.formatter.anthropic.AnthropicResponseParser;
2727
import io.agentscope.core.message.Msg;
28-
import io.agentscope.core.tracing.TracerRegistry;
2928
import java.time.Instant;
3029
import java.util.List;
3130
import org.slf4j.Logger;
@@ -37,15 +36,18 @@
3736
/**
3837
* Anthropic Chat Model implementation using the official Anthropic Java SDK.
3938
*
40-
* <p>This implementation provides complete integration with Anthropic's Messages API, including
39+
* <p>
40+
* This implementation provides complete integration with Anthropic's Messages
41+
* API, including
4142
* tool calling, streaming support, and extended thinking features.
4243
*
43-
* <p>Important notes:
44+
* <p>
45+
* Important notes:
4446
*
4547
* <ul>
46-
* <li>System messages are handled via the system parameter, not as messages
47-
* <li>Tool results must be in separate user messages
48-
* <li>Supports Claude models (claude-3-*, claude-sonnet-*, etc.)
48+
* <li>System messages are handled via the system parameter, not as messages
49+
* <li>Tool results must be in separate user messages
50+
* <li>Supports Claude models (claude-3-*, claude-sonnet-*, etc.)
4951
* </ul>
5052
*/
5153
public class AnthropicChatModel extends ChatModelBase {
@@ -63,12 +65,15 @@ public class AnthropicChatModel extends ChatModelBase {
6365
/**
6466
* Creates a new Anthropic chat model instance.
6567
*
66-
* @param baseUrl the base URL for Anthropic API (null for default)
67-
* @param apiKey the API key for authentication (null to load from ANTHROPIC_API_KEY env var)
68-
* @param modelName the model name to use (e.g., "claude-sonnet-4-5-20250929")
69-
* @param streamEnabled whether streaming should be enabled
68+
* @param baseUrl the base URL for Anthropic API (null for default)
69+
* @param apiKey the API key for authentication (null to load from
70+
* ANTHROPIC_API_KEY env var)
71+
* @param modelName the model name to use (e.g.,
72+
* "claude-sonnet-4-5-20250929")
73+
* @param streamEnabled whether streaming should be enabled
7074
* @param defaultOptions default generation options
71-
* @param formatter the message formatter to use (null for default Anthropic formatter)
75+
* @param formatter the message formatter to use (null for default
76+
* Anthropic formatter)
7277
*/
7378
public AnthropicChatModel(
7479
String baseUrl,
@@ -102,14 +107,18 @@ public AnthropicChatModel(
102107
/**
103108
* Stream chat completion responses from Anthropic's API.
104109
*
105-
* <p>This method internally handles message formatting using the configured formatter. It
106-
* supports both streaming and non-streaming modes based on the streamEnabled setting.
110+
* <p>
111+
* This method internally handles message formatting using the configured
112+
* formatter. It
113+
* supports both streaming and non-streaming modes based on the streamEnabled
114+
* setting.
107115
*
108-
* <p>Supports timeout and retry configuration through GenerateOptions.
116+
* <p>
117+
* Supports timeout and retry configuration through GenerateOptions.
109118
*
110119
* @param messages AgentScope messages to send to the model
111-
* @param tools Optional list of tool schemas (null or empty if no tools)
112-
* @param options Optional generation options (null to use defaults)
120+
* @param tools Optional list of tool schemas (null or empty if no tools)
121+
* @param options Optional generation options (null to use defaults)
113122
* @return Flux stream of chat responses
114123
*/
115124
@Override
@@ -123,109 +132,77 @@ protected Flux<ChatResponse> doStream(
123132
tools != null && !tools.isEmpty());
124133

125134
Flux<ChatResponse> responseFlux =
126-
Flux.deferContextual(
127-
(reactorCtx) ->
128-
TracerRegistry.get()
129-
.runWithContext(
130-
reactorCtx,
131-
() -> {
132-
try {
133-
// Build message create params
134-
MessageCreateParams.Builder paramsBuilder =
135-
MessageCreateParams.builder()
136-
.model(modelName)
137-
.maxTokens(4096);
135+
Flux.defer(
136+
() -> {
137+
try {
138+
// Build message create params
139+
MessageCreateParams.Builder paramsBuilder =
140+
MessageCreateParams.builder()
141+
.model(modelName)
142+
.maxTokens(4096);
138143

139-
// Extract and apply system message
140-
// (Anthropic-specific requirement)
141-
formatter.applySystemMessage(
142-
paramsBuilder, messages);
144+
// Extract and apply system message
145+
// (Anthropic-specific requirement)
146+
formatter.applySystemMessage(paramsBuilder, messages);
143147

144-
// Use formatter to convert Msg to Anthropic
145-
// MessageParam
146-
List<MessageParam> formattedMessages =
147-
formatter.format(messages);
148-
for (MessageParam param :
149-
formattedMessages) {
150-
paramsBuilder.addMessage(param);
151-
}
148+
// Use formatter to convert Msg to Anthropic
149+
// MessageParam
150+
List<MessageParam> formattedMessages = formatter.format(messages);
151+
for (MessageParam param : formattedMessages) {
152+
paramsBuilder.addMessage(param);
153+
}
152154

153-
// Apply generation options via formatter
154-
formatter.applyOptions(
155-
paramsBuilder,
156-
options,
157-
defaultOptions);
155+
// Apply generation options via formatter
156+
formatter.applyOptions(paramsBuilder, options, defaultOptions);
158157

159-
// Add tools if provided
160-
if (tools != null && !tools.isEmpty()) {
161-
formatter.applyTools(
162-
paramsBuilder, tools);
163-
}
158+
// Add tools if provided
159+
if (tools != null && !tools.isEmpty()) {
160+
formatter.applyTools(paramsBuilder, tools);
161+
}
164162

165-
// Create the request
166-
MessageCreateParams params =
167-
paramsBuilder.build();
163+
// Create the request
164+
MessageCreateParams params = paramsBuilder.build();
168165

169-
if (streamEnabled) {
170-
// Make streaming API call
171-
StreamResponse<RawMessageStreamEvent>
172-
streamResponse =
173-
client.messages()
174-
.createStreaming(
175-
params);
166+
if (streamEnabled) {
167+
// Make streaming API call
168+
StreamResponse<RawMessageStreamEvent> streamResponse =
169+
client.messages().createStreaming(params);
176170

177-
// Convert the SDK's Stream to Flux
178-
return AnthropicResponseParser
179-
.parseStreamEvents(
180-
Flux.fromStream(
181-
streamResponse
182-
.stream())
183-
.publishOn(
184-
Schedulers
185-
.boundedElastic()),
186-
startTime)
187-
.doFinally(
188-
signalType -> {
189-
try {
190-
streamResponse
191-
.close();
192-
} catch (
193-
Exception
194-
e) {
195-
log.debug(
196-
"Error"
197-
+ " closing"
198-
+ " stream"
199-
+ " response",
200-
e);
201-
}
202-
});
203-
} else {
204-
// For non-streaming, make a single call
205-
// via CompletableFuture
206-
return Mono.fromFuture(
207-
client.async()
208-
.messages()
209-
.create(params))
210-
.map(
211-
message ->
212-
formatter
213-
.parseResponse(
214-
message,
215-
startTime))
216-
.flux();
171+
// Convert the SDK's Stream to Flux
172+
return AnthropicResponseParser.parseStreamEvents(
173+
Flux.fromStream(streamResponse.stream())
174+
.publishOn(Schedulers.boundedElastic()),
175+
startTime)
176+
.doFinally(
177+
signalType -> {
178+
try {
179+
streamResponse.close();
180+
} catch (Exception e) {
181+
log.debug(
182+
"Error closing stream"
183+
+ " response",
184+
e);
217185
}
218-
} catch (Exception e) {
219-
return Flux.error(
220-
new ModelException(
221-
"Failed to stream Anthropic"
222-
+ " API: "
223-
+ e.getMessage(),
224-
e,
225-
modelName,
226-
"anthropic"));
227-
}
228-
}));
186+
});
187+
} else {
188+
// For non-streaming, make a single call
189+
// via CompletableFuture
190+
return Mono.fromFuture(client.async().messages().create(params))
191+
.map(
192+
message ->
193+
formatter.parseResponse(
194+
message, startTime))
195+
.flux();
196+
}
197+
} catch (Exception e) {
198+
return Flux.error(
199+
new ModelException(
200+
"Failed to stream Anthropic API: " + e.getMessage(),
201+
e,
202+
modelName,
203+
"anthropic"));
204+
}
205+
});
229206

230207
// Apply timeout and retry if configured
231208
return ModelUtils.applyTimeoutAndRetry(

0 commit comments

Comments
 (0)