diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java
index a824ed34d..90327989a 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java
@@ -28,13 +28,24 @@ public class HttpTransportConfig {
/** Default connect timeout: 30 seconds. */
public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30);
- /** Default read timeout: 5 minutes (for long-running model calls). */
+ /** Default read timeout: 5 minutes. */
public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5);
+ /**
+ * Default streaming response timeout: 5 minutes (request start to first emitted streaming
+ * chunk).
+ */
+ public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5);
+
+ /** Default streaming idle timeout: 5 minutes (maximum wait between data chunks). */
+ public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = DEFAULT_READ_TIMEOUT;
+
/** Default write timeout: 30 seconds. */
public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(30);
private final Duration connectTimeout;
+ private final Duration responseTimeout;
+ private final Duration streamIdleTimeout;
private final Duration readTimeout;
private final Duration writeTimeout;
private final int maxIdleConnections;
@@ -45,6 +56,8 @@ public class HttpTransportConfig {
private HttpTransportConfig(Builder builder) {
this.connectTimeout = builder.connectTimeout;
+ this.responseTimeout = builder.responseTimeout;
+ this.streamIdleTimeout = builder.streamIdleTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.maxIdleConnections = builder.maxIdleConnections;
@@ -63,9 +76,40 @@ public Duration getConnectTimeout() {
return connectTimeout;
}
+ /**
+ * Get the response timeout for streaming requests.
+ *
+ *
For {@link JdkHttpTransport} streaming requests, this bounds the time from request start
+ * until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume this
+ * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads.
+ *
+ * @return the response timeout duration
+ */
+ public Duration getResponseTimeout() {
+ return responseTimeout;
+ }
+
+ /**
+ * Get the stream idle timeout.
+ *
+ *
For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between two
+ * consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this option;
+ * it relies on OkHttp's {@link #getReadTimeout()} for stream reads.
+ *
+ * @return the stream idle timeout duration
+ */
+ public Duration getStreamIdleTimeout() {
+ return streamIdleTimeout;
+ }
+
/**
* Get the read timeout.
*
+ *
{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and
+ * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only; JDK
+ * streaming requests use {@link #getResponseTimeout()} and {@link #getStreamIdleTimeout()}
+ * instead.
+ *
* @return the read timeout duration
*/
public Duration getReadTimeout() {
@@ -153,6 +197,8 @@ public static HttpTransportConfig defaults() {
*/
public static class Builder {
private Duration connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+ private Duration responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
+ private Duration streamIdleTimeout = DEFAULT_STREAM_IDLE_TIMEOUT;
private Duration readTimeout = DEFAULT_READ_TIMEOUT;
private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT;
private int maxIdleConnections = 5;
@@ -172,9 +218,44 @@ public Builder connectTimeout(Duration connectTimeout) {
return this;
}
+ /**
+ * Set the response timeout for streaming requests.
+ *
+ *
For {@link JdkHttpTransport} streaming requests, this bounds the time from request
+ * start until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume
+ * this option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads.
+ *
+ * @param responseTimeout the response timeout duration
+ * @return this builder
+ */
+ public Builder responseTimeout(Duration responseTimeout) {
+ this.responseTimeout = responseTimeout;
+ return this;
+ }
+
+ /**
+ * Set the stream idle timeout.
+ *
+ *
For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between
+ * two consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this
+ * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads.
+ *
+ * @param streamIdleTimeout the stream idle timeout duration
+ * @return this builder
+ */
+ public Builder streamIdleTimeout(Duration streamIdleTimeout) {
+ this.streamIdleTimeout = streamIdleTimeout;
+ return this;
+ }
+
/**
* Set the read timeout.
*
+ *
{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and
+ * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only;
+ * JDK streaming requests use {@link #getResponseTimeout()} and
+ * {@link #getStreamIdleTimeout()} instead.
+ *
* @param readTimeout the read timeout duration
* @return this builder
*/
diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java
index 5bb749e64..ec1f85f6e 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java
@@ -34,13 +34,15 @@
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
@@ -174,7 +176,7 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException {
throw new HttpTransportException("Transport has been closed");
}
- var jdkRequest = buildJdkRequest(request);
+ var jdkRequest = buildJdkRequest(request, false);
try {
var response = client.send(jdkRequest, BodyHandlers.ofString());
@@ -193,55 +195,109 @@ public Flux stream(HttpRequest request) {
return Flux.error(new HttpTransportException("Transport has been closed"));
}
- var jdkRequest = buildJdkRequest(request);
-
- // Check status code and read error body immediately when CompletableFuture completes
- // to avoid stream being closed before we can read it
- CompletableFuture> future =
- client.sendAsync(jdkRequest, BodyHandlers.ofInputStream())
- .thenApply(
- response -> {
- int statusCode = response.statusCode();
- if (statusCode < 200 || statusCode >= 300) {
- // Read error body immediately while stream is still open
- String errorBody = readInputStream(response.body());
- log.warn(
- "HTTP request failed. URL: {} | Status: {} | Error:"
- + " {}",
- request.getUrl(),
- statusCode,
- errorBody);
- throw new CompletionException(
- new HttpTransportException(
- "HTTP request failed with status "
- + statusCode
- + " | "
- + errorBody,
- statusCode,
- errorBody));
+ var jdkRequest = buildJdkRequest(request, true);
+
+ return Flux.defer(
+ () -> {
+ AtomicReference responseBody = new AtomicReference<>();
+ long requestStartNanos = System.nanoTime();
+ return sendInputStreamAsync(jdkRequest, responseBody)
+ .timeout(Mono.delay(streamResponseTimeout()))
+ .flatMapMany(
+ response ->
+ handleStreamResponse(
+ response,
+ request,
+ responseBody,
+ requestStartNanos))
+ .doFinally(signal -> closeQuietly(responseBody.getAndSet(null)))
+ .onErrorMap(this::mapStreamError);
+ });
+ }
+
+ /**
+ * Send a streaming request and propagate Reactor cancellation to the JDK future/socket.
+ */
+ private Mono> sendInputStreamAsync(
+ java.net.http.HttpRequest request, AtomicReference responseBody) {
+ return Mono.create(
+ sink -> {
+ AtomicBoolean cancelled = new AtomicBoolean(false);
+ var future = client.sendAsync(request, BodyHandlers.ofInputStream());
+ sink.onCancel(
+ () -> {
+ cancelled.set(true);
+ future.cancel(true);
+ closeQuietly(responseBody.getAndSet(null));
+ });
+ future.whenComplete(
+ (response, error) -> {
+ if (error != null) {
+ if (!cancelled.get()) {
+ sink.error(error);
}
- return response;
- });
-
- return Mono.fromCompletionStage(future)
- .flatMapMany(response -> processStreamResponse(response, request))
- .publishOn(Schedulers.boundedElastic())
- .onErrorMap(
- e -> !(e instanceof HttpTransportException),
- e -> {
- Throwable cause = e instanceof CompletionException ? e.getCause() : e;
- if (cause instanceof HttpTransportException) {
- return (HttpTransportException) cause;
- }
- return new HttpTransportException(
- "SSE/NDJSON stream failed: " + e.getMessage(), e);
- })
- .subscribeOn(Schedulers.boundedElastic());
+ return;
+ }
+
+ responseBody.set(response.body());
+ if (cancelled.get()) {
+ closeQuietly(responseBody.getAndSet(null));
+ return;
+ }
+ sink.success(response);
+ });
+ });
}
- private Flux processStreamResponse(
- java.net.http.HttpResponse response, HttpRequest request) {
+ /**
+ * Validate the streaming response and apply first-chunk and inter-chunk timeouts.
+ */
+ private Flux handleStreamResponse(
+ java.net.http.HttpResponse response,
+ HttpRequest request,
+ AtomicReference responseBody,
+ long requestStartNanos) {
InputStream inputStream = response.body();
+ responseBody.set(inputStream);
+
+ int statusCode = response.statusCode();
+ if (statusCode < 200 || statusCode >= 300) {
+ return readInputStreamAsync(inputStream)
+ .timeout(streamResponseTimeout())
+ .onErrorReturn("")
+ .flatMapMany(
+ errorBody -> {
+ log.warn(
+ "HTTP request failed. URL: {} | Status: {} | Error: {}",
+ request.getUrl(),
+ statusCode,
+ errorBody);
+ return Flux.error(
+ new HttpTransportException(
+ "HTTP request failed with status "
+ + statusCode
+ + " | "
+ + errorBody,
+ statusCode,
+ errorBody));
+ });
+ }
+
+ return processStreamResponse(inputStream, request)
+ .timeout(
+ // Timeout strategy 1: Time To First Chunk.
+ // This uses the remaining response timeout budget from request start.
+ Mono.delay(remainingResponseTimeout(requestStartNanos)),
+
+ // Timeout strategy 2: Inter-token gap (Stream Idle Timeout).
+ // The maximum time to wait between receiving two consecutive data chunks.
+ data -> Mono.delay(streamIdleTimeout()));
+ }
+
+ /**
+ * Parse SSE or NDJSON lines on a bounded elastic worker because BufferedReader blocks.
+ */
+ private Flux processStreamResponse(InputStream inputStream, HttpRequest request) {
if (inputStream == null) {
return Flux.empty();
}
@@ -253,13 +309,18 @@ private Flux processStreamResponse(
// Use Flux.using to manage resource lifecycle
return Flux.using(
- () ->
- new BufferedReader(
- new InputStreamReader(inputStream, StandardCharsets.UTF_8)),
- reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader),
- this::closeQuietly);
+ () ->
+ new BufferedReader(
+ new InputStreamReader(inputStream, StandardCharsets.UTF_8)),
+ reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader),
+ this::closeQuietly)
+ // reader.lines() uses blocking I/O internally
+ .subscribeOn(Schedulers.boundedElastic());
}
+ /**
+ * Extract non-empty SSE data payloads and stop before the terminal marker.
+ */
private Flux readSseLines(BufferedReader reader) {
return Flux.fromStream(reader.lines())
.filter(line -> line.startsWith(SSE_DATA_PREFIX))
@@ -269,6 +330,9 @@ private Flux readSseLines(BufferedReader reader) {
.filter(data -> !data.isEmpty());
}
+ /**
+ * Extract non-empty NDJSON records as already-delimited stream chunks.
+ */
private Flux readNdJsonLines(BufferedReader reader) {
return Flux.fromStream(reader.lines())
.doOnNext(line -> log.debug("Received NDJSON line"))
@@ -310,7 +374,7 @@ public boolean isClosed() {
return closed.get();
}
- private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) {
+ private java.net.http.HttpRequest buildJdkRequest(HttpRequest request, boolean isStreaming) {
URI uri;
try {
uri = URI.create(request.getUrl());
@@ -318,8 +382,11 @@ private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) {
throw new HttpTransportException("Invalid URL: " + request.getUrl(), e);
}
- var builder =
- java.net.http.HttpRequest.newBuilder().uri(uri).timeout(config.getReadTimeout());
+ var builder = java.net.http.HttpRequest.newBuilder().uri(uri);
+
+ if (!isStreaming && config.getReadTimeout() != null) {
+ builder.timeout(config.getReadTimeout());
+ }
for (Map.Entry header : request.getHeaders().entrySet()) {
builder.header(header.getKey(), header.getValue());
@@ -382,6 +449,59 @@ private String readInputStream(InputStream inputStream) {
}
}
+ /**
+ * Read an error response body away from the JDK HTTP worker threads.
+ */
+ private Mono readInputStreamAsync(InputStream inputStream) {
+ return Mono.fromCallable(() -> readInputStream(inputStream))
+ .defaultIfEmpty("")
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ /**
+ * Resolve the configured request-start-to-first-chunk timeout.
+ */
+ private Duration streamResponseTimeout() {
+ return config.getResponseTimeout() != null
+ ? config.getResponseTimeout()
+ : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT;
+ }
+
+ /**
+ * Keep the first-chunk timeout as one budget across header wait and body parsing.
+ */
+ private Duration remainingResponseTimeout(long requestStartNanos) {
+ Duration elapsed = Duration.ofNanos(System.nanoTime() - requestStartNanos);
+ Duration remaining = streamResponseTimeout().minus(elapsed);
+ return remaining.isNegative() || remaining.isZero() ? Duration.ZERO : remaining;
+ }
+
+ /**
+ * Resolve the configured maximum idle gap between emitted stream chunks.
+ */
+ private Duration streamIdleTimeout() {
+ return config.getStreamIdleTimeout() != null
+ ? config.getStreamIdleTimeout()
+ : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT;
+ }
+
+ /**
+ * Normalize low-level streaming failures into the transport exception type.
+ */
+ private Throwable mapStreamError(Throwable e) {
+ if (e instanceof TimeoutException) {
+ return new HttpTransportException("Stream timeout: " + e.getMessage(), e);
+ }
+ if (e instanceof HttpTransportException) {
+ return e;
+ }
+ Throwable cause = e instanceof CompletionException ? e.getCause() : e;
+ if (cause instanceof HttpTransportException) {
+ return cause;
+ }
+ return new HttpTransportException("SSE/NDJSON stream failed: " + e.getMessage(), e);
+ }
+
private void closeQuietly(AutoCloseable closeable) {
if (closeable != null) {
try {
diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java
index b8a1e0d82..46117f13f 100644
--- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java
+++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java
@@ -106,6 +106,9 @@ private OkHttpClient buildClient(HttpTransportConfig config) {
new OkHttpClient.Builder()
.connectTimeout(
config.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS)
+ // OkHttp readTimeout is a socket-read idle timeout. It remains suitable
+ // for streaming reads and is distinct from JDK streaming response/idle
+ // timeouts in HttpTransportConfig.
.readTimeout(config.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS)
.writeTimeout(config.getWriteTimeout().toMillis(), TimeUnit.MILLISECONDS)
.connectionPool(
diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java
index 0f0695142..0f4aa1a4b 100644
--- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java
+++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java
@@ -22,14 +22,31 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Authenticator;
+import java.net.CookieHandler;
+import java.net.ProxySelector;
+import java.net.URI;
import java.net.http.HttpClient;
+import java.net.http.HttpClient.Redirect;
+import java.net.http.HttpClient.Version;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSession;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
@@ -57,7 +74,9 @@ void setUp() throws Exception {
HttpTransportConfig config =
HttpTransportConfig.builder()
.connectTimeout(Duration.ofSeconds(5))
- .readTimeout(Duration.ofSeconds(10))
+ .readTimeout(Duration.ofSeconds(2)) // Global timeout for sync calls
+ .responseTimeout(Duration.ofSeconds(2)) // First emitted chunk for streaming
+ .streamIdleTimeout(Duration.ofSeconds(1)) // Inter-token gap for streaming
.build();
transport = new JdkHttpTransport(config);
}
@@ -304,6 +323,8 @@ void testJdkHttpTransportBuilder() {
HttpTransportConfig.builder()
.connectTimeout(Duration.ofSeconds(10))
.readTimeout(Duration.ofSeconds(30))
+ .responseTimeout(Duration.ofSeconds(45))
+ .streamIdleTimeout(Duration.ofSeconds(15))
.build();
JdkHttpTransport builtTransport = JdkHttpTransport.builder().config(config).build();
@@ -311,6 +332,8 @@ void testJdkHttpTransportBuilder() {
assertNotNull(builtTransport);
assertNotNull(builtTransport.getClient());
assertEquals(config, builtTransport.getConfig());
+ assertEquals(Duration.ofSeconds(45), builtTransport.getConfig().getResponseTimeout());
+ assertEquals(Duration.ofSeconds(15), builtTransport.getConfig().getStreamIdleTimeout());
assertFalse(builtTransport.isClosed());
builtTransport.close();
assertTrue(builtTransport.isClosed());
@@ -1063,4 +1086,350 @@ void testHttpVersionConfig() {
assertNotNull(jdkHttpTransport);
assertNotNull(jdkHttpTransport2);
}
+
+ @Test
+ void testStreamColdStartSurvivesGlobalTimeout() throws Exception {
+ // Reproduces the bug reported in the issue 1302
+
+ HttpTransportConfig customConfig =
+ HttpTransportConfig.builder()
+ .readTimeout(Duration.ofSeconds(1)) // Very tight global timeout
+ .responseTimeout(Duration.ofSeconds(4)) // Ample first-chunk timeout
+ .streamIdleTimeout(Duration.ofSeconds(2))
+ .build();
+
+ JdkHttpTransport customTransport = new JdkHttpTransport(customConfig);
+
+ try {
+ // Simulate the cold start overhead + LLM thinking time by delaying headers for 2
+ // seconds.
+ mockServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n")
+ .setHeadersDelay(2, TimeUnit.SECONDS));
+
+ HttpRequest request =
+ HttpRequest.builder()
+ .url(mockServer.url("/cold-start-bug-reproduction").toString())
+ .method("POST")
+ .body("{}")
+ .build();
+
+ // The test succeeds ONLY if the stream survives the 2-second initial delay
+ // without being killed by the 1-second global readTimeout.
+ StepVerifier.create(customTransport.stream(request))
+ .expectNextMatches(data -> data.contains("\"id\":\"1\""))
+ .verifyComplete();
+ } finally {
+ customTransport.close();
+ }
+ }
+
+ @Test
+ void testStreamResponseTimeout() {
+ // Test Timeout Strategy 1:
+ // Delay headers by 3 seconds, which exceeds the configured responseTimeout (2 seconds).
+ mockServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n")
+ .setHeader("Content-Type", "text/event-stream")
+ .setHeadersDelay(3, TimeUnit.SECONDS));
+
+ HttpRequest request =
+ HttpRequest.builder()
+ .url(mockServer.url("/ttft-timeout").toString())
+ .method("POST")
+ .body("{}")
+ .build();
+
+ StepVerifier.create(transport.stream(request))
+ .expectErrorMatches(
+ e ->
+ e instanceof HttpTransportException
+ && e.getMessage().contains("Stream timeout"))
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testStreamIdleTimeout() {
+ // Test Timeout Strategy 2 (Inter-token gap):
+ // Emit the first complete event immediately, then delay the second event long enough to
+ // exceed streamIdleTimeout.
+ mockServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody("data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\n")
+ .throttleBody(19, 2, TimeUnit.SECONDS));
+
+ HttpRequest request =
+ HttpRequest.builder()
+ .url(mockServer.url("/idle-timeout").toString())
+ .method("POST")
+ .body("{}")
+ .build();
+
+ StepVerifier.create(transport.stream(request))
+ .expectNextMatches(data -> data.contains("\"id\":\"1\""))
+ .expectErrorMatches(
+ e ->
+ e instanceof HttpTransportException
+ && e.getMessage().contains("Stream timeout"))
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testStreamTimeoutClosesBodyWhenFutureCompletesAfterCancellation() {
+ HttpTransportConfig customConfig =
+ HttpTransportConfig.builder()
+ .responseTimeout(Duration.ofMillis(100))
+ .streamIdleTimeout(Duration.ofSeconds(1))
+ .build();
+ BlockingInputStream body = new BlockingInputStream();
+ AtomicReference>> futureRef =
+ new AtomicReference<>();
+ JdkHttpTransport customTransport =
+ new JdkHttpTransport(new DeferredBodyHttpClient(futureRef, body), customConfig);
+
+ try {
+ HttpRequest request =
+ HttpRequest.builder()
+ .url("http://localhost/deferred-body")
+ .method("POST")
+ .body("{}")
+ .build();
+
+ StepVerifier.create(customTransport.stream(request))
+ .expectErrorMatches(
+ e ->
+ e instanceof HttpTransportException
+ && e.getMessage().contains("Stream timeout"))
+ .verify(Duration.ofSeconds(2));
+
+ CompletableFuture> future = futureRef.get();
+ assertNotNull(future);
+ assertTrue(future.isCancelled(), "Timeout should cancel the pending async request");
+
+ future.complete(new TestHttpResponse(200, body));
+ assertTrue(body.awaitClosed(), "Response body must be closed after late completion");
+ } finally {
+ customTransport.close();
+ }
+ }
+
+ @Test
+ void testStreamSurvivesGlobalReadTimeout() {
+ // Verify that streaming requests are NOT killed by the global readTimeout.
+ // readTimeout is 2s, but we will make the stream take roughly 3s overall.
+ // We throttle 10 bytes every 500ms. Inter-token gap is < 1s, so streamIdleTimeout is
+ // respected.
+ String sseBody =
+ "data: 1\n\n"
+ + "data: 2\n\n"
+ + "data: 3\n\n"
+ + "data: 4\n\n"
+ + "data: 5\n\n"
+ + "data: [DONE]\n\n";
+
+ mockServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody(sseBody)
+ .throttleBody(10, 500, TimeUnit.MILLISECONDS));
+
+ HttpRequest request =
+ HttpRequest.builder()
+ .url(mockServer.url("/survive-timeout").toString())
+ .method("POST")
+ .body("{}")
+ .build();
+
+ StepVerifier.create(transport.stream(request))
+ .expectNextCount(5) // Should successfully receive all 5 data chunks
+ .verifyComplete();
+ }
+
+ private static class DeferredBodyHttpClient extends HttpClient {
+ private final AtomicReference>>
+ futureRef;
+ private final InputStream body;
+
+ DeferredBodyHttpClient(
+ AtomicReference>>
+ futureRef,
+ InputStream body) {
+ this.futureRef = futureRef;
+ this.body = body;
+ }
+
+ @Override
+ public Optional cookieHandler() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional connectTimeout() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Redirect followRedirects() {
+ return Redirect.NEVER;
+ }
+
+ @Override
+ public Optional proxy() {
+ return Optional.empty();
+ }
+
+ @Override
+ public SSLContext sslContext() {
+ return null;
+ }
+
+ @Override
+ public SSLParameters sslParameters() {
+ return new SSLParameters();
+ }
+
+ @Override
+ public Optional authenticator() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Version version() {
+ return Version.HTTP_2;
+ }
+
+ @Override
+ public Optional executor() {
+ return Optional.empty();
+ }
+
+ @Override
+ public java.net.http.HttpResponse send(
+ java.net.http.HttpRequest request,
+ java.net.http.HttpResponse.BodyHandler responseBodyHandler) {
+ throw new UnsupportedOperationException("send is not used in this test");
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture> sendAsync(
+ java.net.http.HttpRequest request,
+ java.net.http.HttpResponse.BodyHandler responseBodyHandler) {
+ CompletableFuture> future = new LateCompletableFuture<>();
+ futureRef.set(
+ (CompletableFuture>)
+ (CompletableFuture>) future);
+ return future;
+ }
+
+ @Override
+ public CompletableFuture> sendAsync(
+ java.net.http.HttpRequest request,
+ java.net.http.HttpResponse.BodyHandler responseBodyHandler,
+ java.net.http.HttpResponse.PushPromiseHandler pushPromiseHandler) {
+ return sendAsync(request, responseBodyHandler);
+ }
+ }
+
+ private static class LateCompletableFuture extends CompletableFuture {
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ cancelled.set(true);
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+ }
+
+ private static class TestHttpResponse implements java.net.http.HttpResponse {
+ private final int statusCode;
+ private final InputStream body;
+
+ TestHttpResponse(int statusCode, InputStream body) {
+ this.statusCode = statusCode;
+ this.body = body;
+ }
+
+ @Override
+ public int statusCode() {
+ return statusCode;
+ }
+
+ @Override
+ public java.net.http.HttpRequest request() {
+ return null;
+ }
+
+ @Override
+ public Optional> previousResponse() {
+ return Optional.empty();
+ }
+
+ @Override
+ public java.net.http.HttpHeaders headers() {
+ return java.net.http.HttpHeaders.of(Map.of(), (name, value) -> true);
+ }
+
+ @Override
+ public InputStream body() {
+ return body;
+ }
+
+ @Override
+ public Optional sslSession() {
+ return Optional.empty();
+ }
+
+ @Override
+ public URI uri() {
+ return URI.create("http://localhost/deferred-body");
+ }
+
+ @Override
+ public Version version() {
+ return Version.HTTP_2;
+ }
+ }
+
+ private static class BlockingInputStream extends InputStream {
+ private final CountDownLatch closed = new CountDownLatch(1);
+
+ @Override
+ public int read() {
+ return -1;
+ }
+
+ @Override
+ public byte[] readAllBytes() {
+ return "closed".getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed.countDown();
+ super.close();
+ }
+
+ boolean awaitClosed() {
+ try {
+ return closed.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
}