Skip to content

Commit 9807b97

Browse files
committed
add stream cancel support
1 parent 2176d23 commit 9807b97

6 files changed

Lines changed: 206 additions & 79 deletions

File tree

src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@
55

66
package software.amazon.awssdk.crt.http;
77

8-
import software.amazon.awssdk.crt.CRT;
98
import software.amazon.awssdk.crt.CrtResource;
10-
import software.amazon.awssdk.crt.CrtRuntimeException;
11-
12-
import java.util.concurrent.CompletableFuture;
139

1410
/**
1511
* An base class represents a single Http Request/Response for both HTTP/1.1 and
@@ -100,6 +96,25 @@ public int getResponseStatusCode() {
10096
throw new IllegalStateException("Can't get Status Code on Closed Stream");
10197
}
10298

99+
/**
100+
* Cancels the stream.
101+
* <p>
102+
* For HTTP/1.1 streams, this is equivalent to closing the connection.
103+
* For HTTP/2 streams, this sends a RST_STREAM frame with AWS_HTTP2_ERR_CANCEL.
104+
* <p>
105+
* The stream will complete with the provided error code, unless the stream is
106+
* already completing for other reasons, or the stream is not activated,
107+
* in which case this call will have no effect.
108+
*
109+
* @param errorCode The CRT error code to use when completing the stream.
110+
* Use CRT.awsErrorCode() to convert AWS error codes.
111+
*/
112+
public void cancel(int errorCode) {
113+
if (!isNull()) {
114+
httpStreamBaseCancel(getNativeHandle(), errorCode);
115+
}
116+
}
117+
103118
/*******************************************************************************
104119
* Native methods
105120
******************************************************************************/
@@ -111,4 +126,6 @@ public int getResponseStatusCode() {
111126
private static native void httpStreamBaseActivate(long http_stream, HttpStreamBase streamObj);
112127

113128
private static native int httpStreamBaseGetResponseStatusCode(long http_stream);
129+
130+
private static native void httpStreamBaseCancel(long http_stream, int error_code);
114131
}

src/native/http_request_response.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,27 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS
706706
aws_http_stream_update_window(stream, window_update);
707707
}
708708

709+
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseCancel(
710+
JNIEnv *env,
711+
jclass jni_class,
712+
jlong jni_binding,
713+
jint error_code) {
714+
715+
(void)jni_class;
716+
aws_cache_jni_ids(env);
717+
718+
struct http_stream_binding *binding = (struct http_stream_binding *)jni_binding;
719+
struct aws_http_stream *stream = binding->native_stream;
720+
721+
if (stream == NULL) {
722+
aws_jni_throw_runtime_exception(env, "HttpStream is null.");
723+
return;
724+
}
725+
726+
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "Cancelling Stream. stream: %p, error_code: %d", (void *)stream, error_code);
727+
aws_http_stream_cancel(stream, error_code);
728+
}
729+
709730
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2Stream_http2StreamResetStream(
710731
JNIEnv *env,
711732
jclass jni_class,

src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java

Lines changed: 88 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
import software.amazon.awssdk.crt.Log;
4545

4646
public class Http2ClientLocalHostTest extends HttpClientTestFixture {
47-
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be run locally for testing.
47+
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be
48+
// run locally for testing.
4849
private static final int LOCAL_HTTPS_PORT = 3443;
4950
private static final int LOCAL_HTTP_PORT = 3280;
5051

@@ -208,7 +209,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int
208209
}
209210

210211
@Override
211-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
212+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
212213
String bodyString = new String(bodyBytesIn);
213214
// Parse {"bytes": 123456} manually
214215
int start = bodyString.indexOf("\"bytes\":") + 8;
@@ -255,33 +256,34 @@ public void testRequestsUploadStress() throws Exception {
255256

256257
final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
257258
final long expectedLength = bodyLength;
258-
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
259-
@Override
260-
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
261-
HttpHeader[] nextHeaders) {
259+
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
260+
new HttpStreamBaseResponseHandler() {
261+
@Override
262+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
263+
HttpHeader[] nextHeaders) {
262264

263-
Assert.assertTrue(responseStatusCode == 200);
264-
}
265+
Assert.assertTrue(responseStatusCode == 200);
266+
}
265267

266-
@Override
267-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
268-
String bodyString = new String(bodyBytesIn);
269-
// Parse {"bytes": 123456} manually
270-
int start = bodyString.indexOf("\"bytes\":") + 8;
271-
int end = bodyString.indexOf("}", start);
272-
String bytesStr = bodyString.substring(start, end).trim();
273-
long receivedLength = Long.parseLong(bytesStr);
274-
Assert.assertTrue(receivedLength == expectedLength);
275-
return bodyString.length();
276-
}
268+
@Override
269+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
270+
String bodyString = new String(bodyBytesIn);
271+
// Parse {"bytes": 123456} manually
272+
int start = bodyString.indexOf("\"bytes\":") + 8;
273+
int end = bodyString.indexOf("}", start);
274+
String bytesStr = bodyString.substring(start, end).trim();
275+
long receivedLength = Long.parseLong(bytesStr);
276+
Assert.assertTrue(receivedLength == expectedLength);
277+
return bodyString.length();
278+
}
277279

278-
@Override
279-
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
280-
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
281-
stream.close();
282-
requestCompleteFuture.complete(null);
283-
}
284-
});
280+
@Override
281+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
282+
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
283+
stream.close();
284+
requestCompleteFuture.complete(null);
285+
}
286+
});
285287

286288
acquireCompleteFuture.get(30, TimeUnit.SECONDS);
287289
requestCompleteFuture.get(5, TimeUnit.MINUTES);
@@ -305,29 +307,30 @@ public void testRequestsDownloadStress() throws Exception {
305307

306308
final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
307309
final AtomicLong receivedLength = new AtomicLong(0);
308-
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
309-
@Override
310-
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
311-
HttpHeader[] nextHeaders) {
310+
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
311+
new HttpStreamBaseResponseHandler() {
312+
@Override
313+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
314+
HttpHeader[] nextHeaders) {
312315

313-
Assert.assertTrue(responseStatusCode == 200);
314-
}
316+
Assert.assertTrue(responseStatusCode == 200);
317+
}
315318

316-
@Override
317-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
318-
receivedLength.addAndGet(bodyBytesIn.length);
319+
@Override
320+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
321+
receivedLength.addAndGet(bodyBytesIn.length);
319322

320-
return bodyBytesIn.length;
321-
}
323+
return bodyBytesIn.length;
324+
}
322325

323-
@Override
324-
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
326+
@Override
327+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
325328

326-
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
327-
stream.close();
328-
requestCompleteFuture.complete(null);
329-
}
330-
});
329+
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
330+
stream.close();
331+
requestCompleteFuture.complete(null);
332+
}
333+
});
331334

332335
acquireCompleteFuture.get(30, TimeUnit.SECONDS);
333336
requestCompleteFuture.get(5, TimeUnit.MINUTES);
@@ -337,4 +340,45 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
337340
CrtResource.logNativeResources();
338341
CrtResource.waitForNoResources();
339342
}
343+
344+
@Test
345+
public void testHttp2StreamCancel() throws Exception {
346+
skipIfAndroid();
347+
skipIfLocalhostUnavailable();
348+
URI uri = new URI(String.format("https://localhost:%d/echo", LOCAL_HTTPS_PORT));
349+
try (Http2StreamManager streamManager = createStreamManager(uri, 100)) {
350+
long bodyLength = 250000L;
351+
352+
Http2Request request = createHttp2Request("GET", uri, 0);
353+
request.addHeader(new HttpHeader("x-repeat-data", String.valueOf(bodyLength)));
354+
/* Get a slow response to make sure we cancel before finishes */
355+
request.addHeader(new HttpHeader("x-slow-response", "true"));
356+
357+
final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
358+
final int expectedErrorCode = 0x0832; // Some Random error code
359+
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
360+
new HttpStreamBaseResponseHandler() {
361+
@Override
362+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
363+
HttpHeader[] nextHeaders) {
364+
// Cancel the HTTP/2 stream immediately upon receiving headers with specific
365+
// error code
366+
stream.cancel(expectedErrorCode);
367+
}
368+
369+
@Override
370+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
371+
372+
Assert.assertTrue(errorCode == expectedErrorCode);
373+
stream.close();
374+
requestCompleteFuture.complete(null);
375+
}
376+
});
377+
378+
acquireCompleteFuture.get(3, TimeUnit.SECONDS);
379+
requestCompleteFuture.get(10, TimeUnit.SECONDS);
380+
}
381+
CrtResource.logNativeResources();
382+
CrtResource.waitForNoResources();
383+
}
340384
}

src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ public void testMaxConcurrentStreamsEnforcement() throws Exception {
210210

211211
Http2StreamManagerOptions options = new Http2StreamManagerOptions();
212212
options.withMaxConcurrentStreams(maxConcurrentStreams)
213-
.withMaxConcurrentStreamsPerConnection(100)
214-
.withIdealConcurrentStreamsPerConnection(100);
213+
.withMaxConcurrentStreamsPerConnection(100)
214+
.withIdealConcurrentStreamsPerConnection(100);
215215

216216
HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions();
217217
connectionManagerOptions.withClientBootstrap(bootstrap)
@@ -274,13 +274,15 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
274274
// (allowing small margin for timing)
275275
int observedMax = maxConcurrentActive.get();
276276
Log.log(Log.LogLevel.Info, Log.LogSubject.HttpConnectionManager,
277-
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax, maxConcurrentStreams));
277+
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax,
278+
maxConcurrentStreams));
278279

279-
// The observed max should be close to our limit (allowing some flexibility for race conditions)
280+
// The observed max should be close to our limit (allowing some flexibility for
281+
// race conditions)
280282
Assert.assertTrue(
281-
String.format("Expected max concurrent streams around %d, but observed %d",
282-
maxConcurrentStreams, observedMax),
283-
observedMax <= maxConcurrentStreams);
283+
String.format("Expected max concurrent streams around %d, but observed %d",
284+
maxConcurrentStreams, observedMax),
285+
observedMax <= maxConcurrentStreams);
284286

285287
} finally {
286288
streamManager.close();

src/test/java/software/amazon/awssdk/crt/test/HttpClientConnectionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ public void testRetryableErrorCheck() {
153153
assertTrue(HttpClientConnection.isErrorRetryable(exception));
154154
}
155155

156-
157156
/**
158157
* This test exercises the noProxyHosts configuration. It is included here
159158
* rather than in ProxyTests because a successful test connects to the configured

0 commit comments

Comments
 (0)