Skip to content

Commit 6486596

Browse files
authored
fix(stream): use subscribeOn instead of publishOn for blocking stream sources (#410)
1 parent 75f3b68 commit 6486596

5 files changed

Lines changed: 6 additions & 5 deletions

File tree

agentscope-core/src/main/java/io/agentscope/core/model/AnthropicChatModel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ protected Flux<ChatResponse> doStream(
171171
// Convert the SDK's Stream to Flux
172172
return AnthropicResponseParser.parseStreamEvents(
173173
Flux.fromStream(streamResponse.stream())
174-
.publishOn(Schedulers.boundedElastic()),
174+
.subscribeOn(
175+
Schedulers.boundedElastic()),
175176
startTime)
176177
.doFinally(
177178
signalType -> {

agentscope-core/src/main/java/io/agentscope/core/model/GeminiChatModel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ protected Flux<ChatResponse> doStream(
207207

208208
// Convert ResponseStream to Flux
209209
return Flux.fromIterable(responseStream)
210-
.publishOn(Schedulers.boundedElastic())
210+
.subscribeOn(Schedulers.boundedElastic())
211211
.map(
212212
response ->
213213
formatter.parseResponse(

agentscope-core/src/main/java/io/agentscope/core/model/OpenAIChatModel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ protected Flux<ChatResponse> doStream(
171171

172172
// Convert the SDK's Stream to Flux
173173
return Flux.fromStream(streamResponse.stream())
174-
.publishOn(Schedulers.boundedElastic())
174+
.subscribeOn(Schedulers.boundedElastic())
175175
.map(chunk -> formatter.parseResponse(chunk, startTime))
176176
.filter(Objects::nonNull)
177177
.doFinally(

agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public Flux<String> stream(HttpRequest request) {
188188
closeQuietly(response);
189189
}
190190
})
191-
.publishOn(Schedulers.boundedElastic());
191+
.subscribeOn(Schedulers.boundedElastic());
192192
}
193193

194194
@Override

agentscope-examples/model-request-compression/src/main/java/io/agentscope/examples/compression/extra/CompressingOkHttpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public Flux<String> stream(HttpRequest request) {
244244
closeQuietly(response);
245245
}
246246
})
247-
.publishOn(Schedulers.boundedElastic());
247+
.subscribeOn(Schedulers.boundedElastic());
248248
}
249249

250250
@Override

0 commit comments

Comments
 (0)