From 260f59b9512060e9f639cd0899047f95a7bdb860 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Thu, 30 Apr 2026 18:33:26 +0800 Subject: [PATCH 1/4] fix(transport): resolve SSE stream timeout in JdkHttpTransport --- .../model/transport/HttpTransportConfig.java | 56 +++++++- .../model/transport/JdkHttpTransport.java | 111 +++++++++------ .../model/transport/JdkHttpTransportTest.java | 134 +++++++++++++++++- 3 files changed, 252 insertions(+), 49 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index a824ed34d..f73f7a171 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -28,13 +28,21 @@ public class HttpTransportConfig { /** Default connect timeout: 30 seconds. */ public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30); - /** Default read timeout: 5 minutes (for long-running model calls). */ + /** Default response timeout (TTFT): 5 minutes (Time To First Token for streaming). */ + public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); + + /** Default stream idle timeout: 30 seconds (Maximum wait time between consecutive data chunks). */ + public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofSeconds(30); + + /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); /** Default write timeout: 30 seconds. */ public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(30); private final Duration connectTimeout; + private final Duration responseTimeout; + private final Duration streamIdleTimeout; private final Duration readTimeout; private final Duration writeTimeout; private final int maxIdleConnections; @@ -45,6 +53,8 @@ public class HttpTransportConfig { private HttpTransportConfig(Builder builder) { this.connectTimeout = builder.connectTimeout; + this.responseTimeout = builder.responseTimeout; + this.streamIdleTimeout = builder.streamIdleTimeout; this.readTimeout = builder.readTimeout; this.writeTimeout = builder.writeTimeout; this.maxIdleConnections = builder.maxIdleConnections; @@ -64,7 +74,25 @@ public Duration getConnectTimeout() { } /** - * Get the read timeout. + * Get the response timeout (Time To First Token for streaming). + * + * @return the response timeout duration + */ + public Duration getResponseTimeout() { + return responseTimeout; + } + + /** + * Get the stream idle timeout (maximum time between two consecutive data chunks). + * + * @return the stream idle timeout duration + */ + public Duration getStreamIdleTimeout() { + return streamIdleTimeout; + } + + /** + * Get the read timeout(for non-streaming). * * @return the read timeout duration */ @@ -153,6 +181,8 @@ public static HttpTransportConfig defaults() { */ public static class Builder { private Duration connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private Duration responseTimeout = DEFAULT_RESPONSE_TIMEOUT; + private Duration streamIdleTimeout = DEFAULT_STREAM_IDLE_TIMEOUT; private Duration readTimeout = DEFAULT_READ_TIMEOUT; private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; private int maxIdleConnections = 5; @@ -172,6 +202,28 @@ public Builder connectTimeout(Duration connectTimeout) { return this; } + /** + * Set the response timeout (Time To First Byte). + * + * @param responseTimeout the response timeout duration + * @return this builder + */ + public Builder responseTimeout(Duration responseTimeout) { + this.responseTimeout = responseTimeout; + return this; + } + + /** + * Set the stream idle timeout. + * + * @param streamIdleTimeout the stream idle timeout duration + * @return this builder + */ + public Builder streamIdleTimeout(Duration streamIdleTimeout) { + this.streamIdleTimeout = streamIdleTimeout; + return this; + } + /** * Set the read timeout. * diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 5bb749e64..0ad4da3e1 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -38,8 +38,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -174,7 +174,7 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException { throw new HttpTransportException("Transport has been closed"); } - var jdkRequest = buildJdkRequest(request); + var jdkRequest = buildJdkRequest(request, false); try { var response = client.send(jdkRequest, BodyHandlers.ofString()); @@ -193,50 +193,64 @@ public Flux stream(HttpRequest request) { return Flux.error(new HttpTransportException("Transport has been closed")); } - var jdkRequest = buildJdkRequest(request); - - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), + var jdkRequest = buildJdkRequest(request, true); + + // Use Mono.fromFuture() to ensure lazy execution and proper cancellation propagation. + // This prevents "ghost connections" from leaking if the downstream cancels or times out + // before headers arrive. + return Mono.fromFuture(() -> client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())) + .flatMapMany( + response -> { + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + String errorBody = readInputStream(response.body()); + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + request.getUrl(), + statusCode, + errorBody); + return Flux.error( + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBody, statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); - } - return response; - }); - - return Mono.fromCompletionStage(future) - .flatMapMany(response -> processStreamResponse(response, request)) - .publishOn(Schedulers.boundedElastic()) + errorBody)); + } + return processStreamResponse(response, request); + }) + .timeout( + // Timeout strategy 1: Time To First Token (TTFT). + // The maximum time to wait for the first piece of data. + Mono.delay( + config.getResponseTimeout() != null + ? config.getResponseTimeout() + : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT), + + // Timeout strategy 2: Inter-token gap (Stream Idle Timeout). + // The maximum time to wait between receiving two consecutive data chunks. + data -> + Mono.delay( + config.getStreamIdleTimeout() != null + ? config.getStreamIdleTimeout() + : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT)) .onErrorMap( - e -> !(e instanceof HttpTransportException), e -> { + if (e instanceof TimeoutException) { + return new HttpTransportException( + "Stream timeout: " + e.getMessage(), e); + } + if (e instanceof HttpTransportException) { + return e; + } Throwable cause = e instanceof CompletionException ? e.getCause() : e; if (cause instanceof HttpTransportException) { - return (HttpTransportException) cause; + return cause; } return new HttpTransportException( "SSE/NDJSON stream failed: " + e.getMessage(), e); - }) - .subscribeOn(Schedulers.boundedElastic()); + }); } private Flux processStreamResponse( @@ -253,11 +267,13 @@ private Flux processStreamResponse( // Use Flux.using to manage resource lifecycle return Flux.using( - () -> - new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)), - reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), - this::closeQuietly); + () -> + new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)), + reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), + this::closeQuietly) + // reader.lines() uses blocking I/O internally + .subscribeOn(Schedulers.boundedElastic()); } private Flux readSseLines(BufferedReader reader) { @@ -310,7 +326,7 @@ public boolean isClosed() { return closed.get(); } - private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { + private java.net.http.HttpRequest buildJdkRequest(HttpRequest request, boolean isStreaming) { URI uri; try { uri = URI.create(request.getUrl()); @@ -318,8 +334,11 @@ private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { throw new HttpTransportException("Invalid URL: " + request.getUrl(), e); } - var builder = - java.net.http.HttpRequest.newBuilder().uri(uri).timeout(config.getReadTimeout()); + var builder = java.net.http.HttpRequest.newBuilder().uri(uri); + + if (!isStreaming && config.getReadTimeout() != null) { + builder.timeout(config.getReadTimeout()); + } for (Map.Entry header : request.getHeaders().entrySet()) { builder.header(header.getKey(), header.getValue()); diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 0f0695142..ed019f8f5 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -57,7 +57,9 @@ void setUp() throws Exception { HttpTransportConfig config = HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(5)) - .readTimeout(Duration.ofSeconds(10)) + .readTimeout(Duration.ofSeconds(2)) // Global timeout for sync calls + .responseTimeout(Duration.ofSeconds(2)) // TTFT for streaming + .streamIdleTimeout(Duration.ofSeconds(1)) // Inter-token gap for streaming .build(); transport = new JdkHttpTransport(config); } @@ -304,6 +306,8 @@ void testJdkHttpTransportBuilder() { HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(10)) .readTimeout(Duration.ofSeconds(30)) + .responseTimeout(Duration.ofSeconds(45)) + .streamIdleTimeout(Duration.ofSeconds(15)) .build(); JdkHttpTransport builtTransport = JdkHttpTransport.builder().config(config).build(); @@ -311,6 +315,8 @@ void testJdkHttpTransportBuilder() { assertNotNull(builtTransport); assertNotNull(builtTransport.getClient()); assertEquals(config, builtTransport.getConfig()); + assertEquals(Duration.ofSeconds(45), builtTransport.getConfig().getResponseTimeout()); + assertEquals(Duration.ofSeconds(15), builtTransport.getConfig().getStreamIdleTimeout()); assertFalse(builtTransport.isClosed()); builtTransport.close(); assertTrue(builtTransport.isClosed()); @@ -1063,4 +1069,130 @@ void testHttpVersionConfig() { assertNotNull(jdkHttpTransport); assertNotNull(jdkHttpTransport2); } + + @Test + void testStreamColdStartSurvivesGlobalTimeout() throws Exception { + // Reproduces the bug reported in the issue 1302 + + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .readTimeout(Duration.ofSeconds(1)) // Very tight global timeout + .responseTimeout(Duration.ofSeconds(4)) // Ample Time-To-First-Token timeout + .streamIdleTimeout(Duration.ofSeconds(2)) + .build(); + + JdkHttpTransport customTransport = new JdkHttpTransport(customConfig); + + try { + // Simulate the cold start overhead + LLM thinking time by delaying headers for 2 + // seconds. + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeadersDelay(2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/cold-start-bug-reproduction").toString()) + .method("POST") + .body("{}") + .build(); + + // The test succeeds ONLY if the stream survives the 2-second initial delay + // without being killed by the 1-second global readTimeout. + StepVerifier.create(customTransport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + customTransport.close(); + } + } + + @Test + void testStreamResponseTimeout() { + // Test Timeout Strategy 1 (TTFT): + // Delay headers by 3 seconds, which exceeds the configured responseTimeout (2 seconds). + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream") + .setHeadersDelay(3, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/ttft-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamIdleTimeout() { + // Test Timeout Strategy 2 (Inter-token gap): + // Throttle body to emit 1 byte every 2 seconds. + // This exceeds the configured streamIdleTimeout (1 second). + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\n") + .throttleBody(1, 2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/idle-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamSurvivesGlobalReadTimeout() { + // Verify that streaming requests are NOT killed by the global readTimeout. + // readTimeout is 2s, but we will make the stream take roughly 3s overall. + // We throttle 10 bytes every 500ms. Inter-token gap is < 1s, so streamIdleTimeout is + // respected. + String sseBody = + "data: 1\n\n" + + "data: 2\n\n" + + "data: 3\n\n" + + "data: 4\n\n" + + "data: 5\n\n" + + "data: [DONE]\n\n"; + + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody(sseBody) + .throttleBody(10, 500, TimeUnit.MILLISECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/survive-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectNextCount(5) // Should successfully receive all 5 data chunks + .verifyComplete(); + } } From 027ae27d60927cc79fb843a2d1db52d7021c3fd6 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Sun, 24 May 2026 22:06:00 +0800 Subject: [PATCH 2/4] fix: review --- .../model/transport/HttpTransportConfig.java | 15 +- .../model/transport/JdkHttpTransport.java | 141 +++++++--- .../model/transport/JdkHttpTransportTest.java | 243 +++++++++++++++++- 3 files changed, 349 insertions(+), 50 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index f73f7a171..35b7fbacf 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -28,14 +28,14 @@ public class HttpTransportConfig { /** Default connect timeout: 30 seconds. */ public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30); + /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ + public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); + /** Default response timeout (TTFT): 5 minutes (Time To First Token for streaming). */ public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); - /** Default stream idle timeout: 30 seconds (Maximum wait time between consecutive data chunks). */ - public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofSeconds(30); - - /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ - public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); + /** Default stream idle timeout: 5 minutes (maximum wait between consecutive data chunks). */ + public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = DEFAULT_READ_TIMEOUT; /** Default write timeout: 30 seconds. */ public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(30); @@ -92,7 +92,10 @@ public Duration getStreamIdleTimeout() { } /** - * Get the read timeout(for non-streaming). + * Get the read timeout. + * + *

For {@link JdkHttpTransport} streaming requests, response and idle timeouts are controlled + * by {@link #getResponseTimeout()} and {@link #getStreamIdleTimeout()} instead. * * @return the read timeout duration */ diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 0ad4da3e1..d1f80b880 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -34,6 +34,7 @@ import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -195,15 +197,64 @@ public Flux stream(HttpRequest request) { var jdkRequest = buildJdkRequest(request, true); - // Use Mono.fromFuture() to ensure lazy execution and proper cancellation propagation. - // This prevents "ghost connections" from leaking if the downstream cancels or times out - // before headers arrive. - return Mono.fromFuture(() -> client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())) - .flatMapMany( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - String errorBody = readInputStream(response.body()); + return Flux.defer( + () -> { + AtomicReference responseBody = new AtomicReference<>(); + return sendInputStreamAsync(jdkRequest, responseBody) + .timeout(Mono.delay(streamResponseTimeout())) + .flatMapMany( + response -> + handleStreamResponse(response, request, responseBody)) + .doFinally(signal -> closeQuietly(responseBody.getAndSet(null))) + .onErrorMap(this::mapStreamError); + }); + } + + private Mono> sendInputStreamAsync( + java.net.http.HttpRequest request, AtomicReference responseBody) { + return Mono.create( + sink -> { + AtomicBoolean cancelled = new AtomicBoolean(false); + var future = client.sendAsync(request, BodyHandlers.ofInputStream()); + sink.onCancel( + () -> { + cancelled.set(true); + future.cancel(true); + closeQuietly(responseBody.getAndSet(null)); + }); + future.whenComplete( + (response, error) -> { + if (error != null) { + if (!cancelled.get()) { + sink.error(error); + } + return; + } + + responseBody.set(response.body()); + if (cancelled.get()) { + closeQuietly(responseBody.getAndSet(null)); + return; + } + sink.success(response); + }); + }); + } + + private Flux handleStreamResponse( + java.net.http.HttpResponse response, + HttpRequest request, + AtomicReference responseBody) { + InputStream inputStream = response.body(); + responseBody.set(inputStream); + + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + return readInputStreamAsync(inputStream) + .timeout(streamResponseTimeout()) + .onErrorReturn("") + .flatMapMany( + errorBody -> { log.warn( "HTTP request failed. URL: {} | Status: {} | Error: {}", request.getUrl(), @@ -217,45 +268,21 @@ public Flux stream(HttpRequest request) { + errorBody, statusCode, errorBody)); - } - return processStreamResponse(response, request); - }) + }); + } + + return processStreamResponse(inputStream, request) .timeout( // Timeout strategy 1: Time To First Token (TTFT). - // The maximum time to wait for the first piece of data. - Mono.delay( - config.getResponseTimeout() != null - ? config.getResponseTimeout() - : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT), + // The maximum time to wait for the first piece of data after headers. + Mono.delay(streamResponseTimeout()), // Timeout strategy 2: Inter-token gap (Stream Idle Timeout). // The maximum time to wait between receiving two consecutive data chunks. - data -> - Mono.delay( - config.getStreamIdleTimeout() != null - ? config.getStreamIdleTimeout() - : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT)) - .onErrorMap( - e -> { - if (e instanceof TimeoutException) { - return new HttpTransportException( - "Stream timeout: " + e.getMessage(), e); - } - if (e instanceof HttpTransportException) { - return e; - } - Throwable cause = e instanceof CompletionException ? e.getCause() : e; - if (cause instanceof HttpTransportException) { - return cause; - } - return new HttpTransportException( - "SSE/NDJSON stream failed: " + e.getMessage(), e); - }); + data -> Mono.delay(streamIdleTimeout())); } - private Flux processStreamResponse( - java.net.http.HttpResponse response, HttpRequest request) { - InputStream inputStream = response.body(); + private Flux processStreamResponse(InputStream inputStream, HttpRequest request) { if (inputStream == null) { return Flux.empty(); } @@ -401,6 +428,38 @@ private String readInputStream(InputStream inputStream) { } } + private Mono readInputStreamAsync(InputStream inputStream) { + return Mono.fromCallable(() -> readInputStream(inputStream)) + .defaultIfEmpty("") + .subscribeOn(Schedulers.boundedElastic()); + } + + private Duration streamResponseTimeout() { + return config.getResponseTimeout() != null + ? config.getResponseTimeout() + : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT; + } + + private Duration streamIdleTimeout() { + return config.getStreamIdleTimeout() != null + ? config.getStreamIdleTimeout() + : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT; + } + + private Throwable mapStreamError(Throwable e) { + if (e instanceof TimeoutException) { + return new HttpTransportException("Stream timeout: " + e.getMessage(), e); + } + if (e instanceof HttpTransportException) { + return e; + } + Throwable cause = e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException) { + return cause; + } + return new HttpTransportException("SSE/NDJSON stream failed: " + e.getMessage(), e); + } + private void closeQuietly(AutoCloseable closeable) { if (closeable != null) { try { diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index ed019f8f5..d4c1461c9 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -22,14 +22,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.io.InputStream; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpClient.Redirect; +import java.net.http.HttpClient.Version; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; @@ -1139,14 +1156,14 @@ void testStreamResponseTimeout() { @Test void testStreamIdleTimeout() { // Test Timeout Strategy 2 (Inter-token gap): - // Throttle body to emit 1 byte every 2 seconds. - // This exceeds the configured streamIdleTimeout (1 second). + // Emit the first complete event immediately, then delay the second event long enough to + // exceed streamIdleTimeout. mockServer.enqueue( new MockResponse() .setResponseCode(200) .setHeader("Content-Type", "text/event-stream") .setBody("data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\n") - .throttleBody(1, 2, TimeUnit.SECONDS)); + .throttleBody(19, 2, TimeUnit.SECONDS)); HttpRequest request = HttpRequest.builder() @@ -1156,6 +1173,7 @@ void testStreamIdleTimeout() { .build(); StepVerifier.create(transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) .expectErrorMatches( e -> e instanceof HttpTransportException @@ -1163,6 +1181,45 @@ void testStreamIdleTimeout() { .verify(Duration.ofSeconds(5)); } + @Test + void testStreamTimeoutClosesBodyWhenFutureCompletesAfterCancellation() { + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .responseTimeout(Duration.ofMillis(100)) + .streamIdleTimeout(Duration.ofSeconds(1)) + .build(); + BlockingInputStream body = new BlockingInputStream(); + AtomicReference>> futureRef = + new AtomicReference<>(); + JdkHttpTransport customTransport = + new JdkHttpTransport(new DeferredBodyHttpClient(futureRef, body), customConfig); + + try { + HttpRequest request = + HttpRequest.builder() + .url("http://localhost/deferred-body") + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(customTransport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(2)); + + CompletableFuture> future = futureRef.get(); + assertNotNull(future); + assertTrue(future.isCancelled(), "Timeout should cancel the pending async request"); + + future.complete(new TestHttpResponse(200, body)); + assertTrue(body.awaitClosed(), "Response body must be closed after late completion"); + } finally { + customTransport.close(); + } + } + @Test void testStreamSurvivesGlobalReadTimeout() { // Verify that streaming requests are NOT killed by the global readTimeout. @@ -1195,4 +1252,184 @@ void testStreamSurvivesGlobalReadTimeout() { .expectNextCount(5) // Should successfully receive all 5 data chunks .verifyComplete(); } + + private static class DeferredBodyHttpClient extends HttpClient { + private final AtomicReference>> + futureRef; + private final InputStream body; + + DeferredBodyHttpClient( + AtomicReference>> + futureRef, + InputStream body) { + this.futureRef = futureRef; + this.body = body; + } + + @Override + public Optional cookieHandler() { + return Optional.empty(); + } + + @Override + public Optional connectTimeout() { + return Optional.empty(); + } + + @Override + public Redirect followRedirects() { + return Redirect.NEVER; + } + + @Override + public Optional proxy() { + return Optional.empty(); + } + + @Override + public SSLContext sslContext() { + return null; + } + + @Override + public SSLParameters sslParameters() { + return new SSLParameters(); + } + + @Override + public Optional authenticator() { + return Optional.empty(); + } + + @Override + public Version version() { + return Version.HTTP_2; + } + + @Override + public Optional executor() { + return Optional.empty(); + } + + @Override + public java.net.http.HttpResponse send( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler) { + throw new UnsupportedOperationException("send is not used in this test"); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture> sendAsync( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler) { + CompletableFuture> future = new LateCompletableFuture<>(); + futureRef.set( + (CompletableFuture>) + (CompletableFuture) future); + return future; + } + + @Override + public CompletableFuture> sendAsync( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler, + java.net.http.HttpResponse.PushPromiseHandler pushPromiseHandler) { + return sendAsync(request, responseBodyHandler); + } + } + + private static class LateCompletableFuture extends CompletableFuture { + private final AtomicBoolean cancelled = new AtomicBoolean(false); + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancelled.set(true); + return true; + } + + @Override + public boolean isCancelled() { + return cancelled.get(); + } + } + + private static class TestHttpResponse implements java.net.http.HttpResponse { + private final int statusCode; + private final InputStream body; + + TestHttpResponse(int statusCode, InputStream body) { + this.statusCode = statusCode; + this.body = body; + } + + @Override + public int statusCode() { + return statusCode; + } + + @Override + public java.net.http.HttpRequest request() { + return null; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + @Override + public java.net.http.HttpHeaders headers() { + return java.net.http.HttpHeaders.of(Map.of(), (name, value) -> true); + } + + @Override + public InputStream body() { + return body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + return URI.create("http://localhost/deferred-body"); + } + + @Override + public Version version() { + return Version.HTTP_2; + } + } + + private static class BlockingInputStream extends InputStream { + private final CountDownLatch closed = new CountDownLatch(1); + + @Override + public int read() { + return -1; + } + + @Override + public byte[] readAllBytes() { + return "closed".getBytes(StandardCharsets.UTF_8); + } + + @Override + public void close() throws IOException { + closed.countDown(); + super.close(); + } + + boolean awaitClosed() { + try { + return closed.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } } From 5c0c10edcf079de6b0a8b70ebc57c48e680509b5 Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Mon, 25 May 2026 14:25:30 +0800 Subject: [PATCH 3/4] fix: the issue of responseTimeout being doubled --- .../model/transport/HttpTransportConfig.java | 12 ++++-- .../model/transport/JdkHttpTransport.java | 22 +++++++--- .../model/transport/JdkHttpTransportTest.java | 42 +++++++++++++++++-- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index 35b7fbacf..15d311fe9 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -31,7 +31,7 @@ public class HttpTransportConfig { /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); - /** Default response timeout (TTFT): 5 minutes (Time To First Token for streaming). */ + /** Default response timeout: 5 minutes (request start to first emitted streaming chunk). */ public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); /** Default stream idle timeout: 5 minutes (maximum wait between consecutive data chunks). */ @@ -74,7 +74,10 @@ public Duration getConnectTimeout() { } /** - * Get the response timeout (Time To First Token for streaming). + * Get the response timeout for streaming requests. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request start + * until the first emitted SSE/NDJSON chunk. * * @return the response timeout duration */ @@ -206,7 +209,10 @@ public Builder connectTimeout(Duration connectTimeout) { } /** - * Set the response timeout (Time To First Byte). + * Set the response timeout for streaming requests. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request + * start until the first emitted SSE/NDJSON chunk. * * @param responseTimeout the response timeout duration * @return this builder diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index d1f80b880..0d4684462 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -200,11 +200,16 @@ public Flux stream(HttpRequest request) { return Flux.defer( () -> { AtomicReference responseBody = new AtomicReference<>(); + long requestStartNanos = System.nanoTime(); return sendInputStreamAsync(jdkRequest, responseBody) .timeout(Mono.delay(streamResponseTimeout())) .flatMapMany( response -> - handleStreamResponse(response, request, responseBody)) + handleStreamResponse( + response, + request, + responseBody, + requestStartNanos)) .doFinally(signal -> closeQuietly(responseBody.getAndSet(null))) .onErrorMap(this::mapStreamError); }); @@ -244,7 +249,8 @@ private Mono> sendInputStreamAsync( private Flux handleStreamResponse( java.net.http.HttpResponse response, HttpRequest request, - AtomicReference responseBody) { + AtomicReference responseBody, + long requestStartNanos) { InputStream inputStream = response.body(); responseBody.set(inputStream); @@ -273,9 +279,9 @@ private Flux handleStreamResponse( return processStreamResponse(inputStream, request) .timeout( - // Timeout strategy 1: Time To First Token (TTFT). - // The maximum time to wait for the first piece of data after headers. - Mono.delay(streamResponseTimeout()), + // Timeout strategy 1: Time To First Chunk. + // This uses the remaining response timeout budget from request start. + Mono.delay(remainingResponseTimeout(requestStartNanos)), // Timeout strategy 2: Inter-token gap (Stream Idle Timeout). // The maximum time to wait between receiving two consecutive data chunks. @@ -440,6 +446,12 @@ private Duration streamResponseTimeout() { : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT; } + private Duration remainingResponseTimeout(long requestStartNanos) { + Duration elapsed = Duration.ofNanos(System.nanoTime() - requestStartNanos); + Duration remaining = streamResponseTimeout().minus(elapsed); + return remaining.isNegative() || remaining.isZero() ? Duration.ZERO : remaining; + } + private Duration streamIdleTimeout() { return config.getStreamIdleTimeout() != null ? config.getStreamIdleTimeout() diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index d4c1461c9..5c7bede2d 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -75,7 +75,7 @@ void setUp() throws Exception { HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(5)) .readTimeout(Duration.ofSeconds(2)) // Global timeout for sync calls - .responseTimeout(Duration.ofSeconds(2)) // TTFT for streaming + .responseTimeout(Duration.ofSeconds(2)) // First emitted chunk for streaming .streamIdleTimeout(Duration.ofSeconds(1)) // Inter-token gap for streaming .build(); transport = new JdkHttpTransport(config); @@ -1094,7 +1094,7 @@ void testStreamColdStartSurvivesGlobalTimeout() throws Exception { HttpTransportConfig customConfig = HttpTransportConfig.builder() .readTimeout(Duration.ofSeconds(1)) // Very tight global timeout - .responseTimeout(Duration.ofSeconds(4)) // Ample Time-To-First-Token timeout + .responseTimeout(Duration.ofSeconds(4)) // Ample first-chunk timeout .streamIdleTimeout(Duration.ofSeconds(2)) .build(); @@ -1129,7 +1129,7 @@ void testStreamColdStartSurvivesGlobalTimeout() throws Exception { @Test void testStreamResponseTimeout() { - // Test Timeout Strategy 1 (TTFT): + // Test Timeout Strategy 1: // Delay headers by 3 seconds, which exceeds the configured responseTimeout (2 seconds). mockServer.enqueue( new MockResponse() @@ -1153,6 +1153,42 @@ void testStreamResponseTimeout() { .verify(Duration.ofSeconds(5)); } + @Test + void testStreamResponseTimeoutSpansRequestStartToFirstChunk() { + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .responseTimeout(Duration.ofSeconds(1)) + .streamIdleTimeout(Duration.ofSeconds(2)) + .build(); + JdkHttpTransport customTransport = new JdkHttpTransport(customConfig); + + try { + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"late\"}\n\ndata: [DONE]\n\n") + .setHeadersDelay(600, TimeUnit.MILLISECONDS) + .setBodyDelay(700, TimeUnit.MILLISECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/first-chunk-budget").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(customTransport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(3)); + } finally { + customTransport.close(); + } + } + @Test void testStreamIdleTimeout() { // Test Timeout Strategy 2 (Inter-token gap): From 6d73fa25b42c5beb1e95424d52b29f400469ba6c Mon Sep 17 00:00:00 2001 From: jujn <2087687391@qq.com> Date: Mon, 25 May 2026 15:57:09 +0800 Subject: [PATCH 4/4] fix: improve code comments --- .../model/transport/HttpTransportConfig.java | 36 ++++++++++++++----- .../model/transport/JdkHttpTransport.java | 30 ++++++++++++++++ .../core/model/transport/OkHttpTransport.java | 3 ++ .../model/transport/JdkHttpTransportTest.java | 36 ------------------- 4 files changed, 61 insertions(+), 44 deletions(-) diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index 15d311fe9..90327989a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -28,13 +28,16 @@ public class HttpTransportConfig { /** Default connect timeout: 30 seconds. */ public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30); - /** Default read timeout: 5 minutes (Overall timeout for non-streaming calls). */ + /** Default read timeout: 5 minutes. */ public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); - /** Default response timeout: 5 minutes (request start to first emitted streaming chunk). */ + /** + * Default streaming response timeout: 5 minutes (request start to first emitted streaming + * chunk). + */ public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); - /** Default stream idle timeout: 5 minutes (maximum wait between consecutive data chunks). */ + /** Default streaming idle timeout: 5 minutes (maximum wait between data chunks). */ public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = DEFAULT_READ_TIMEOUT; /** Default write timeout: 30 seconds. */ @@ -77,7 +80,8 @@ public Duration getConnectTimeout() { * Get the response timeout for streaming requests. * *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request start - * until the first emitted SSE/NDJSON chunk. + * until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume this + * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. * * @return the response timeout duration */ @@ -86,7 +90,11 @@ public Duration getResponseTimeout() { } /** - * Get the stream idle timeout (maximum time between two consecutive data chunks). + * Get the stream idle timeout. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between two + * consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this option; + * it relies on OkHttp's {@link #getReadTimeout()} for stream reads. * * @return the stream idle timeout duration */ @@ -97,8 +105,10 @@ public Duration getStreamIdleTimeout() { /** * Get the read timeout. * - *

For {@link JdkHttpTransport} streaming requests, response and idle timeouts are controlled - * by {@link #getResponseTimeout()} and {@link #getStreamIdleTimeout()} instead. + *

{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and + * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only; JDK + * streaming requests use {@link #getResponseTimeout()} and {@link #getStreamIdleTimeout()} + * instead. * * @return the read timeout duration */ @@ -212,7 +222,8 @@ public Builder connectTimeout(Duration connectTimeout) { * Set the response timeout for streaming requests. * *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request - * start until the first emitted SSE/NDJSON chunk. + * start until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume + * this option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. * * @param responseTimeout the response timeout duration * @return this builder @@ -225,6 +236,10 @@ public Builder responseTimeout(Duration responseTimeout) { /** * Set the stream idle timeout. * + *

For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between + * two consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this + * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. + * * @param streamIdleTimeout the stream idle timeout duration * @return this builder */ @@ -236,6 +251,11 @@ public Builder streamIdleTimeout(Duration streamIdleTimeout) { /** * Set the read timeout. * + *

{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and + * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only; + * JDK streaming requests use {@link #getResponseTimeout()} and + * {@link #getStreamIdleTimeout()} instead. + * * @param readTimeout the read timeout duration * @return this builder */ diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 0d4684462..ec1f85f6e 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -215,6 +215,9 @@ public Flux stream(HttpRequest request) { }); } + /** + * Send a streaming request and propagate Reactor cancellation to the JDK future/socket. + */ private Mono> sendInputStreamAsync( java.net.http.HttpRequest request, AtomicReference responseBody) { return Mono.create( @@ -246,6 +249,9 @@ private Mono> sendInputStreamAsync( }); } + /** + * Validate the streaming response and apply first-chunk and inter-chunk timeouts. + */ private Flux handleStreamResponse( java.net.http.HttpResponse response, HttpRequest request, @@ -288,6 +294,9 @@ private Flux handleStreamResponse( data -> Mono.delay(streamIdleTimeout())); } + /** + * Parse SSE or NDJSON lines on a bounded elastic worker because BufferedReader blocks. + */ private Flux processStreamResponse(InputStream inputStream, HttpRequest request) { if (inputStream == null) { return Flux.empty(); @@ -309,6 +318,9 @@ private Flux processStreamResponse(InputStream inputStream, HttpRequest .subscribeOn(Schedulers.boundedElastic()); } + /** + * Extract non-empty SSE data payloads and stop before the terminal marker. + */ private Flux readSseLines(BufferedReader reader) { return Flux.fromStream(reader.lines()) .filter(line -> line.startsWith(SSE_DATA_PREFIX)) @@ -318,6 +330,9 @@ private Flux readSseLines(BufferedReader reader) { .filter(data -> !data.isEmpty()); } + /** + * Extract non-empty NDJSON records as already-delimited stream chunks. + */ private Flux readNdJsonLines(BufferedReader reader) { return Flux.fromStream(reader.lines()) .doOnNext(line -> log.debug("Received NDJSON line")) @@ -434,30 +449,45 @@ private String readInputStream(InputStream inputStream) { } } + /** + * Read an error response body away from the JDK HTTP worker threads. + */ private Mono readInputStreamAsync(InputStream inputStream) { return Mono.fromCallable(() -> readInputStream(inputStream)) .defaultIfEmpty("") .subscribeOn(Schedulers.boundedElastic()); } + /** + * Resolve the configured request-start-to-first-chunk timeout. + */ private Duration streamResponseTimeout() { return config.getResponseTimeout() != null ? config.getResponseTimeout() : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT; } + /** + * Keep the first-chunk timeout as one budget across header wait and body parsing. + */ private Duration remainingResponseTimeout(long requestStartNanos) { Duration elapsed = Duration.ofNanos(System.nanoTime() - requestStartNanos); Duration remaining = streamResponseTimeout().minus(elapsed); return remaining.isNegative() || remaining.isZero() ? Duration.ZERO : remaining; } + /** + * Resolve the configured maximum idle gap between emitted stream chunks. + */ private Duration streamIdleTimeout() { return config.getStreamIdleTimeout() != null ? config.getStreamIdleTimeout() : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT; } + /** + * Normalize low-level streaming failures into the transport exception type. + */ private Throwable mapStreamError(Throwable e) { if (e instanceof TimeoutException) { return new HttpTransportException("Stream timeout: " + e.getMessage(), e); diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java index b8a1e0d82..46117f13f 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java @@ -106,6 +106,9 @@ private OkHttpClient buildClient(HttpTransportConfig config) { new OkHttpClient.Builder() .connectTimeout( config.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS) + // OkHttp readTimeout is a socket-read idle timeout. It remains suitable + // for streaming reads and is distinct from JDK streaming response/idle + // timeouts in HttpTransportConfig. .readTimeout(config.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS) .writeTimeout(config.getWriteTimeout().toMillis(), TimeUnit.MILLISECONDS) .connectionPool( diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 5c7bede2d..0f4aa1a4b 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -1153,42 +1153,6 @@ void testStreamResponseTimeout() { .verify(Duration.ofSeconds(5)); } - @Test - void testStreamResponseTimeoutSpansRequestStartToFirstChunk() { - HttpTransportConfig customConfig = - HttpTransportConfig.builder() - .responseTimeout(Duration.ofSeconds(1)) - .streamIdleTimeout(Duration.ofSeconds(2)) - .build(); - JdkHttpTransport customTransport = new JdkHttpTransport(customConfig); - - try { - mockServer.enqueue( - new MockResponse() - .setResponseCode(200) - .setHeader("Content-Type", "text/event-stream") - .setBody("data: {\"id\":\"late\"}\n\ndata: [DONE]\n\n") - .setHeadersDelay(600, TimeUnit.MILLISECONDS) - .setBodyDelay(700, TimeUnit.MILLISECONDS)); - - HttpRequest request = - HttpRequest.builder() - .url(mockServer.url("/first-chunk-budget").toString()) - .method("POST") - .body("{}") - .build(); - - StepVerifier.create(customTransport.stream(request)) - .expectErrorMatches( - e -> - e instanceof HttpTransportException - && e.getMessage().contains("Stream timeout")) - .verify(Duration.ofSeconds(3)); - } finally { - customTransport.close(); - } - } - @Test void testStreamIdleTimeout() { // Test Timeout Strategy 2 (Inter-token gap):