diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java index a824ed34d..90327989a 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportConfig.java @@ -28,13 +28,24 @@ public class HttpTransportConfig { /** Default connect timeout: 30 seconds. */ public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(30); - /** Default read timeout: 5 minutes (for long-running model calls). */ + /** Default read timeout: 5 minutes. */ public static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(5); + /** + * Default streaming response timeout: 5 minutes (request start to first emitted streaming + * chunk). + */ + public static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofMinutes(5); + + /** Default streaming idle timeout: 5 minutes (maximum wait between data chunks). */ + public static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = DEFAULT_READ_TIMEOUT; + /** Default write timeout: 30 seconds. */ public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(30); private final Duration connectTimeout; + private final Duration responseTimeout; + private final Duration streamIdleTimeout; private final Duration readTimeout; private final Duration writeTimeout; private final int maxIdleConnections; @@ -45,6 +56,8 @@ public class HttpTransportConfig { private HttpTransportConfig(Builder builder) { this.connectTimeout = builder.connectTimeout; + this.responseTimeout = builder.responseTimeout; + this.streamIdleTimeout = builder.streamIdleTimeout; this.readTimeout = builder.readTimeout; this.writeTimeout = builder.writeTimeout; this.maxIdleConnections = builder.maxIdleConnections; @@ -63,9 +76,40 @@ public Duration getConnectTimeout() { return connectTimeout; } + /** + * Get the response timeout for streaming requests. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request start + * until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume this + * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. + * + * @return the response timeout duration + */ + public Duration getResponseTimeout() { + return responseTimeout; + } + + /** + * Get the stream idle timeout. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between two + * consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this option; + * it relies on OkHttp's {@link #getReadTimeout()} for stream reads. + * + * @return the stream idle timeout duration + */ + public Duration getStreamIdleTimeout() { + return streamIdleTimeout; + } + /** * Get the read timeout. * + *

{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and + * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only; JDK + * streaming requests use {@link #getResponseTimeout()} and {@link #getStreamIdleTimeout()} + * instead. + * * @return the read timeout duration */ public Duration getReadTimeout() { @@ -153,6 +197,8 @@ public static HttpTransportConfig defaults() { */ public static class Builder { private Duration connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private Duration responseTimeout = DEFAULT_RESPONSE_TIMEOUT; + private Duration streamIdleTimeout = DEFAULT_STREAM_IDLE_TIMEOUT; private Duration readTimeout = DEFAULT_READ_TIMEOUT; private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; private int maxIdleConnections = 5; @@ -172,9 +218,44 @@ public Builder connectTimeout(Duration connectTimeout) { return this; } + /** + * Set the response timeout for streaming requests. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the time from request + * start until the first emitted SSE/NDJSON chunk. {@link OkHttpTransport} does not consume + * this option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. + * + * @param responseTimeout the response timeout duration + * @return this builder + */ + public Builder responseTimeout(Duration responseTimeout) { + this.responseTimeout = responseTimeout; + return this; + } + + /** + * Set the stream idle timeout. + * + *

For {@link JdkHttpTransport} streaming requests, this bounds the maximum wait between + * two consecutive emitted SSE/NDJSON chunks. {@link OkHttpTransport} does not consume this + * option; it relies on OkHttp's {@link #getReadTimeout()} for stream reads. + * + * @param streamIdleTimeout the stream idle timeout duration + * @return this builder + */ + public Builder streamIdleTimeout(Duration streamIdleTimeout) { + this.streamIdleTimeout = streamIdleTimeout; + return this; + } + /** * Set the read timeout. * + *

{@link OkHttpTransport} applies this as OkHttp's read timeout for both standard and + * streaming requests. {@link JdkHttpTransport} applies this to non-streaming requests only; + * JDK streaming requests use {@link #getResponseTimeout()} and + * {@link #getStreamIdleTimeout()} instead. + * * @param readTimeout the read timeout duration * @return this builder */ diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java index 5bb749e64..ec1f85f6e 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/JdkHttpTransport.java @@ -34,13 +34,15 @@ import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.security.cert.X509Certificate; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -174,7 +176,7 @@ public HttpResponse execute(HttpRequest request) throws HttpTransportException { throw new HttpTransportException("Transport has been closed"); } - var jdkRequest = buildJdkRequest(request); + var jdkRequest = buildJdkRequest(request, false); try { var response = client.send(jdkRequest, BodyHandlers.ofString()); @@ -193,55 +195,109 @@ public Flux stream(HttpRequest request) { return Flux.error(new HttpTransportException("Transport has been closed")); } - var jdkRequest = buildJdkRequest(request); - - // Check status code and read error body immediately when CompletableFuture completes - // to avoid stream being closed before we can read it - CompletableFuture> future = - client.sendAsync(jdkRequest, BodyHandlers.ofInputStream()) - .thenApply( - response -> { - int statusCode = response.statusCode(); - if (statusCode < 200 || statusCode >= 300) { - // Read error body immediately while stream is still open - String errorBody = readInputStream(response.body()); - log.warn( - "HTTP request failed. URL: {} | Status: {} | Error:" - + " {}", - request.getUrl(), - statusCode, - errorBody); - throw new CompletionException( - new HttpTransportException( - "HTTP request failed with status " - + statusCode - + " | " - + errorBody, - statusCode, - errorBody)); + var jdkRequest = buildJdkRequest(request, true); + + return Flux.defer( + () -> { + AtomicReference responseBody = new AtomicReference<>(); + long requestStartNanos = System.nanoTime(); + return sendInputStreamAsync(jdkRequest, responseBody) + .timeout(Mono.delay(streamResponseTimeout())) + .flatMapMany( + response -> + handleStreamResponse( + response, + request, + responseBody, + requestStartNanos)) + .doFinally(signal -> closeQuietly(responseBody.getAndSet(null))) + .onErrorMap(this::mapStreamError); + }); + } + + /** + * Send a streaming request and propagate Reactor cancellation to the JDK future/socket. + */ + private Mono> sendInputStreamAsync( + java.net.http.HttpRequest request, AtomicReference responseBody) { + return Mono.create( + sink -> { + AtomicBoolean cancelled = new AtomicBoolean(false); + var future = client.sendAsync(request, BodyHandlers.ofInputStream()); + sink.onCancel( + () -> { + cancelled.set(true); + future.cancel(true); + closeQuietly(responseBody.getAndSet(null)); + }); + future.whenComplete( + (response, error) -> { + if (error != null) { + if (!cancelled.get()) { + sink.error(error); } - return response; - }); - - return Mono.fromCompletionStage(future) - .flatMapMany(response -> processStreamResponse(response, request)) - .publishOn(Schedulers.boundedElastic()) - .onErrorMap( - e -> !(e instanceof HttpTransportException), - e -> { - Throwable cause = e instanceof CompletionException ? e.getCause() : e; - if (cause instanceof HttpTransportException) { - return (HttpTransportException) cause; - } - return new HttpTransportException( - "SSE/NDJSON stream failed: " + e.getMessage(), e); - }) - .subscribeOn(Schedulers.boundedElastic()); + return; + } + + responseBody.set(response.body()); + if (cancelled.get()) { + closeQuietly(responseBody.getAndSet(null)); + return; + } + sink.success(response); + }); + }); } - private Flux processStreamResponse( - java.net.http.HttpResponse response, HttpRequest request) { + /** + * Validate the streaming response and apply first-chunk and inter-chunk timeouts. + */ + private Flux handleStreamResponse( + java.net.http.HttpResponse response, + HttpRequest request, + AtomicReference responseBody, + long requestStartNanos) { InputStream inputStream = response.body(); + responseBody.set(inputStream); + + int statusCode = response.statusCode(); + if (statusCode < 200 || statusCode >= 300) { + return readInputStreamAsync(inputStream) + .timeout(streamResponseTimeout()) + .onErrorReturn("") + .flatMapMany( + errorBody -> { + log.warn( + "HTTP request failed. URL: {} | Status: {} | Error: {}", + request.getUrl(), + statusCode, + errorBody); + return Flux.error( + new HttpTransportException( + "HTTP request failed with status " + + statusCode + + " | " + + errorBody, + statusCode, + errorBody)); + }); + } + + return processStreamResponse(inputStream, request) + .timeout( + // Timeout strategy 1: Time To First Chunk. + // This uses the remaining response timeout budget from request start. + Mono.delay(remainingResponseTimeout(requestStartNanos)), + + // Timeout strategy 2: Inter-token gap (Stream Idle Timeout). + // The maximum time to wait between receiving two consecutive data chunks. + data -> Mono.delay(streamIdleTimeout())); + } + + /** + * Parse SSE or NDJSON lines on a bounded elastic worker because BufferedReader blocks. + */ + private Flux processStreamResponse(InputStream inputStream, HttpRequest request) { if (inputStream == null) { return Flux.empty(); } @@ -253,13 +309,18 @@ private Flux processStreamResponse( // Use Flux.using to manage resource lifecycle return Flux.using( - () -> - new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)), - reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), - this::closeQuietly); + () -> + new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)), + reader -> isNdjson ? readNdJsonLines(reader) : readSseLines(reader), + this::closeQuietly) + // reader.lines() uses blocking I/O internally + .subscribeOn(Schedulers.boundedElastic()); } + /** + * Extract non-empty SSE data payloads and stop before the terminal marker. + */ private Flux readSseLines(BufferedReader reader) { return Flux.fromStream(reader.lines()) .filter(line -> line.startsWith(SSE_DATA_PREFIX)) @@ -269,6 +330,9 @@ private Flux readSseLines(BufferedReader reader) { .filter(data -> !data.isEmpty()); } + /** + * Extract non-empty NDJSON records as already-delimited stream chunks. + */ private Flux readNdJsonLines(BufferedReader reader) { return Flux.fromStream(reader.lines()) .doOnNext(line -> log.debug("Received NDJSON line")) @@ -310,7 +374,7 @@ public boolean isClosed() { return closed.get(); } - private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { + private java.net.http.HttpRequest buildJdkRequest(HttpRequest request, boolean isStreaming) { URI uri; try { uri = URI.create(request.getUrl()); @@ -318,8 +382,11 @@ private java.net.http.HttpRequest buildJdkRequest(HttpRequest request) { throw new HttpTransportException("Invalid URL: " + request.getUrl(), e); } - var builder = - java.net.http.HttpRequest.newBuilder().uri(uri).timeout(config.getReadTimeout()); + var builder = java.net.http.HttpRequest.newBuilder().uri(uri); + + if (!isStreaming && config.getReadTimeout() != null) { + builder.timeout(config.getReadTimeout()); + } for (Map.Entry header : request.getHeaders().entrySet()) { builder.header(header.getKey(), header.getValue()); @@ -382,6 +449,59 @@ private String readInputStream(InputStream inputStream) { } } + /** + * Read an error response body away from the JDK HTTP worker threads. + */ + private Mono readInputStreamAsync(InputStream inputStream) { + return Mono.fromCallable(() -> readInputStream(inputStream)) + .defaultIfEmpty("") + .subscribeOn(Schedulers.boundedElastic()); + } + + /** + * Resolve the configured request-start-to-first-chunk timeout. + */ + private Duration streamResponseTimeout() { + return config.getResponseTimeout() != null + ? config.getResponseTimeout() + : HttpTransportConfig.DEFAULT_RESPONSE_TIMEOUT; + } + + /** + * Keep the first-chunk timeout as one budget across header wait and body parsing. + */ + private Duration remainingResponseTimeout(long requestStartNanos) { + Duration elapsed = Duration.ofNanos(System.nanoTime() - requestStartNanos); + Duration remaining = streamResponseTimeout().minus(elapsed); + return remaining.isNegative() || remaining.isZero() ? Duration.ZERO : remaining; + } + + /** + * Resolve the configured maximum idle gap between emitted stream chunks. + */ + private Duration streamIdleTimeout() { + return config.getStreamIdleTimeout() != null + ? config.getStreamIdleTimeout() + : HttpTransportConfig.DEFAULT_STREAM_IDLE_TIMEOUT; + } + + /** + * Normalize low-level streaming failures into the transport exception type. + */ + private Throwable mapStreamError(Throwable e) { + if (e instanceof TimeoutException) { + return new HttpTransportException("Stream timeout: " + e.getMessage(), e); + } + if (e instanceof HttpTransportException) { + return e; + } + Throwable cause = e instanceof CompletionException ? e.getCause() : e; + if (cause instanceof HttpTransportException) { + return cause; + } + return new HttpTransportException("SSE/NDJSON stream failed: " + e.getMessage(), e); + } + private void closeQuietly(AutoCloseable closeable) { if (closeable != null) { try { diff --git a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java index b8a1e0d82..46117f13f 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java +++ b/agentscope-core/src/main/java/io/agentscope/core/model/transport/OkHttpTransport.java @@ -106,6 +106,9 @@ private OkHttpClient buildClient(HttpTransportConfig config) { new OkHttpClient.Builder() .connectTimeout( config.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS) + // OkHttp readTimeout is a socket-read idle timeout. It remains suitable + // for streaming reads and is distinct from JDK streaming response/idle + // timeouts in HttpTransportConfig. .readTimeout(config.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS) .writeTimeout(config.getWriteTimeout().toMillis(), TimeUnit.MILLISECONDS) .connectionPool( diff --git a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java index 0f0695142..0f4aa1a4b 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/model/transport/JdkHttpTransportTest.java @@ -22,14 +22,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.io.InputStream; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpClient.Redirect; +import java.net.http.HttpClient.Version; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; @@ -57,7 +74,9 @@ void setUp() throws Exception { HttpTransportConfig config = HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(5)) - .readTimeout(Duration.ofSeconds(10)) + .readTimeout(Duration.ofSeconds(2)) // Global timeout for sync calls + .responseTimeout(Duration.ofSeconds(2)) // First emitted chunk for streaming + .streamIdleTimeout(Duration.ofSeconds(1)) // Inter-token gap for streaming .build(); transport = new JdkHttpTransport(config); } @@ -304,6 +323,8 @@ void testJdkHttpTransportBuilder() { HttpTransportConfig.builder() .connectTimeout(Duration.ofSeconds(10)) .readTimeout(Duration.ofSeconds(30)) + .responseTimeout(Duration.ofSeconds(45)) + .streamIdleTimeout(Duration.ofSeconds(15)) .build(); JdkHttpTransport builtTransport = JdkHttpTransport.builder().config(config).build(); @@ -311,6 +332,8 @@ void testJdkHttpTransportBuilder() { assertNotNull(builtTransport); assertNotNull(builtTransport.getClient()); assertEquals(config, builtTransport.getConfig()); + assertEquals(Duration.ofSeconds(45), builtTransport.getConfig().getResponseTimeout()); + assertEquals(Duration.ofSeconds(15), builtTransport.getConfig().getStreamIdleTimeout()); assertFalse(builtTransport.isClosed()); builtTransport.close(); assertTrue(builtTransport.isClosed()); @@ -1063,4 +1086,350 @@ void testHttpVersionConfig() { assertNotNull(jdkHttpTransport); assertNotNull(jdkHttpTransport2); } + + @Test + void testStreamColdStartSurvivesGlobalTimeout() throws Exception { + // Reproduces the bug reported in the issue 1302 + + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .readTimeout(Duration.ofSeconds(1)) // Very tight global timeout + .responseTimeout(Duration.ofSeconds(4)) // Ample first-chunk timeout + .streamIdleTimeout(Duration.ofSeconds(2)) + .build(); + + JdkHttpTransport customTransport = new JdkHttpTransport(customConfig); + + try { + // Simulate the cold start overhead + LLM thinking time by delaying headers for 2 + // seconds. + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeadersDelay(2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/cold-start-bug-reproduction").toString()) + .method("POST") + .body("{}") + .build(); + + // The test succeeds ONLY if the stream survives the 2-second initial delay + // without being killed by the 1-second global readTimeout. + StepVerifier.create(customTransport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .verifyComplete(); + } finally { + customTransport.close(); + } + } + + @Test + void testStreamResponseTimeout() { + // Test Timeout Strategy 1: + // Delay headers by 3 seconds, which exceeds the configured responseTimeout (2 seconds). + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setBody("data: {\"id\":\"1\"}\n\ndata: [DONE]\n\n") + .setHeader("Content-Type", "text/event-stream") + .setHeadersDelay(3, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/ttft-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamIdleTimeout() { + // Test Timeout Strategy 2 (Inter-token gap): + // Emit the first complete event immediately, then delay the second event long enough to + // exceed streamIdleTimeout. + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody("data: {\"id\":\"1\"}\n\ndata: {\"id\":\"2\"}\n\n") + .throttleBody(19, 2, TimeUnit.SECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/idle-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectNextMatches(data -> data.contains("\"id\":\"1\"")) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testStreamTimeoutClosesBodyWhenFutureCompletesAfterCancellation() { + HttpTransportConfig customConfig = + HttpTransportConfig.builder() + .responseTimeout(Duration.ofMillis(100)) + .streamIdleTimeout(Duration.ofSeconds(1)) + .build(); + BlockingInputStream body = new BlockingInputStream(); + AtomicReference>> futureRef = + new AtomicReference<>(); + JdkHttpTransport customTransport = + new JdkHttpTransport(new DeferredBodyHttpClient(futureRef, body), customConfig); + + try { + HttpRequest request = + HttpRequest.builder() + .url("http://localhost/deferred-body") + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(customTransport.stream(request)) + .expectErrorMatches( + e -> + e instanceof HttpTransportException + && e.getMessage().contains("Stream timeout")) + .verify(Duration.ofSeconds(2)); + + CompletableFuture> future = futureRef.get(); + assertNotNull(future); + assertTrue(future.isCancelled(), "Timeout should cancel the pending async request"); + + future.complete(new TestHttpResponse(200, body)); + assertTrue(body.awaitClosed(), "Response body must be closed after late completion"); + } finally { + customTransport.close(); + } + } + + @Test + void testStreamSurvivesGlobalReadTimeout() { + // Verify that streaming requests are NOT killed by the global readTimeout. + // readTimeout is 2s, but we will make the stream take roughly 3s overall. + // We throttle 10 bytes every 500ms. Inter-token gap is < 1s, so streamIdleTimeout is + // respected. + String sseBody = + "data: 1\n\n" + + "data: 2\n\n" + + "data: 3\n\n" + + "data: 4\n\n" + + "data: 5\n\n" + + "data: [DONE]\n\n"; + + mockServer.enqueue( + new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "text/event-stream") + .setBody(sseBody) + .throttleBody(10, 500, TimeUnit.MILLISECONDS)); + + HttpRequest request = + HttpRequest.builder() + .url(mockServer.url("/survive-timeout").toString()) + .method("POST") + .body("{}") + .build(); + + StepVerifier.create(transport.stream(request)) + .expectNextCount(5) // Should successfully receive all 5 data chunks + .verifyComplete(); + } + + private static class DeferredBodyHttpClient extends HttpClient { + private final AtomicReference>> + futureRef; + private final InputStream body; + + DeferredBodyHttpClient( + AtomicReference>> + futureRef, + InputStream body) { + this.futureRef = futureRef; + this.body = body; + } + + @Override + public Optional cookieHandler() { + return Optional.empty(); + } + + @Override + public Optional connectTimeout() { + return Optional.empty(); + } + + @Override + public Redirect followRedirects() { + return Redirect.NEVER; + } + + @Override + public Optional proxy() { + return Optional.empty(); + } + + @Override + public SSLContext sslContext() { + return null; + } + + @Override + public SSLParameters sslParameters() { + return new SSLParameters(); + } + + @Override + public Optional authenticator() { + return Optional.empty(); + } + + @Override + public Version version() { + return Version.HTTP_2; + } + + @Override + public Optional executor() { + return Optional.empty(); + } + + @Override + public java.net.http.HttpResponse send( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler) { + throw new UnsupportedOperationException("send is not used in this test"); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture> sendAsync( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler) { + CompletableFuture> future = new LateCompletableFuture<>(); + futureRef.set( + (CompletableFuture>) + (CompletableFuture) future); + return future; + } + + @Override + public CompletableFuture> sendAsync( + java.net.http.HttpRequest request, + java.net.http.HttpResponse.BodyHandler responseBodyHandler, + java.net.http.HttpResponse.PushPromiseHandler pushPromiseHandler) { + return sendAsync(request, responseBodyHandler); + } + } + + private static class LateCompletableFuture extends CompletableFuture { + private final AtomicBoolean cancelled = new AtomicBoolean(false); + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancelled.set(true); + return true; + } + + @Override + public boolean isCancelled() { + return cancelled.get(); + } + } + + private static class TestHttpResponse implements java.net.http.HttpResponse { + private final int statusCode; + private final InputStream body; + + TestHttpResponse(int statusCode, InputStream body) { + this.statusCode = statusCode; + this.body = body; + } + + @Override + public int statusCode() { + return statusCode; + } + + @Override + public java.net.http.HttpRequest request() { + return null; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + @Override + public java.net.http.HttpHeaders headers() { + return java.net.http.HttpHeaders.of(Map.of(), (name, value) -> true); + } + + @Override + public InputStream body() { + return body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + return URI.create("http://localhost/deferred-body"); + } + + @Override + public Version version() { + return Version.HTTP_2; + } + } + + private static class BlockingInputStream extends InputStream { + private final CountDownLatch closed = new CountDownLatch(1); + + @Override + public int read() { + return -1; + } + + @Override + public byte[] readAllBytes() { + return "closed".getBytes(StandardCharsets.UTF_8); + } + + @Override + public void close() throws IOException { + closed.countDown(); + super.close(); + } + + boolean awaitClosed() { + try { + return closed.await(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } }