Skip to content

Commit 05cd17d

Browse files
authored
Merge branch 'main' into iot_metrics
2 parents d285288 + 17f67d9 commit 05cd17d

11 files changed

Lines changed: 212 additions & 84 deletions

File tree

crt/aws-c-auth

crt/aws-lc

src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@
55

66
package software.amazon.awssdk.crt.http;
77

8-
import software.amazon.awssdk.crt.CRT;
98
import software.amazon.awssdk.crt.CrtResource;
10-
import software.amazon.awssdk.crt.CrtRuntimeException;
11-
12-
import java.util.concurrent.CompletableFuture;
139

1410
/**
1511
* An base class represents a single Http Request/Response for both HTTP/1.1 and
@@ -100,6 +96,22 @@ public int getResponseStatusCode() {
10096
throw new IllegalStateException("Can't get Status Code on Closed Stream");
10197
}
10298

99+
/**
100+
* Cancels the stream with the default error code (AWS_ERROR_HTTP_STREAM_CANCELLED).
101+
* <p>
102+
* For HTTP/1.1 streams, this is equivalent to closing the connection.
103+
* For HTTP/2 streams, this sends a RST_STREAM frame with AWS_HTTP2_ERR_CANCEL.
104+
* <p>
105+
* The stream will complete with AWS_ERROR_HTTP_STREAM_CANCELLED, unless the stream is
106+
* already completing for other reasons, or the stream is not activated,
107+
* in which case this call will have no effect.
108+
*/
109+
public void cancel() {
110+
if (!isNull()) {
111+
httpStreamBaseCancelDefaultError(getNativeHandle());
112+
}
113+
}
114+
103115
/*******************************************************************************
104116
* Native methods
105117
******************************************************************************/
@@ -111,4 +123,6 @@ public int getResponseStatusCode() {
111123
private static native void httpStreamBaseActivate(long http_stream, HttpStreamBase streamObj);
112124

113125
private static native int httpStreamBaseGetResponseStatusCode(long http_stream);
126+
127+
private static native void httpStreamBaseCancelDefaultError(long http_stream);
114128
}

src/native/http_request_response.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,26 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS
706706
aws_http_stream_update_window(stream, window_update);
707707
}
708708

709+
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseCancelDefaultError(
710+
JNIEnv *env,
711+
jclass jni_class,
712+
jlong jni_binding) {
713+
714+
(void)jni_class;
715+
aws_cache_jni_ids(env);
716+
717+
struct http_stream_binding *binding = (struct http_stream_binding *)jni_binding;
718+
struct aws_http_stream *stream = binding->native_stream;
719+
720+
if (stream == NULL) {
721+
aws_jni_throw_runtime_exception(env, "HttpStream is null.");
722+
return;
723+
}
724+
725+
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "Cancelling Stream with default error. stream: %p", (void *)stream);
726+
aws_http_stream_cancel_default_error(stream);
727+
}
728+
709729
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2Stream_http2StreamResetStream(
710730
JNIEnv *env,
711731
jclass jni_class,

src/test/java/software/amazon/awssdk/crt/test/Http2ClientLocalHostTest.java

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
import software.amazon.awssdk.crt.Log;
4545

4646
public class Http2ClientLocalHostTest extends HttpClientTestFixture {
47-
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be run locally for testing.
47+
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be
48+
// run locally for testing.
4849
private static final int LOCAL_HTTPS_PORT = 3443;
4950
private static final int LOCAL_HTTP_PORT = 3280;
5051

@@ -208,7 +209,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int
208209
}
209210

210211
@Override
211-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
212+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
212213
String bodyString = new String(bodyBytesIn);
213214
// Parse {"bytes": 123456} manually
214215
int start = bodyString.indexOf("\"bytes\":") + 8;
@@ -255,33 +256,34 @@ public void testRequestsUploadStress() throws Exception {
255256

256257
final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
257258
final long expectedLength = bodyLength;
258-
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
259-
@Override
260-
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
261-
HttpHeader[] nextHeaders) {
259+
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
260+
new HttpStreamBaseResponseHandler() {
261+
@Override
262+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
263+
HttpHeader[] nextHeaders) {
262264

263-
Assert.assertTrue(responseStatusCode == 200);
264-
}
265+
Assert.assertTrue(responseStatusCode == 200);
266+
}
265267

266-
@Override
267-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
268-
String bodyString = new String(bodyBytesIn);
269-
// Parse {"bytes": 123456} manually
270-
int start = bodyString.indexOf("\"bytes\":") + 8;
271-
int end = bodyString.indexOf("}", start);
272-
String bytesStr = bodyString.substring(start, end).trim();
273-
long receivedLength = Long.parseLong(bytesStr);
274-
Assert.assertTrue(receivedLength == expectedLength);
275-
return bodyString.length();
276-
}
268+
@Override
269+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
270+
String bodyString = new String(bodyBytesIn);
271+
// Parse {"bytes": 123456} manually
272+
int start = bodyString.indexOf("\"bytes\":") + 8;
273+
int end = bodyString.indexOf("}", start);
274+
String bytesStr = bodyString.substring(start, end).trim();
275+
long receivedLength = Long.parseLong(bytesStr);
276+
Assert.assertTrue(receivedLength == expectedLength);
277+
return bodyString.length();
278+
}
277279

278-
@Override
279-
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
280-
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
281-
stream.close();
282-
requestCompleteFuture.complete(null);
283-
}
284-
});
280+
@Override
281+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
282+
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
283+
stream.close();
284+
requestCompleteFuture.complete(null);
285+
}
286+
});
285287

286288
acquireCompleteFuture.get(30, TimeUnit.SECONDS);
287289
requestCompleteFuture.get(5, TimeUnit.MINUTES);
@@ -305,29 +307,30 @@ public void testRequestsDownloadStress() throws Exception {
305307

306308
final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
307309
final AtomicLong receivedLength = new AtomicLong(0);
308-
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
309-
@Override
310-
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
311-
HttpHeader[] nextHeaders) {
310+
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
311+
new HttpStreamBaseResponseHandler() {
312+
@Override
313+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
314+
HttpHeader[] nextHeaders) {
312315

313-
Assert.assertTrue(responseStatusCode == 200);
314-
}
316+
Assert.assertTrue(responseStatusCode == 200);
317+
}
315318

316-
@Override
317-
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
318-
receivedLength.addAndGet(bodyBytesIn.length);
319+
@Override
320+
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
321+
receivedLength.addAndGet(bodyBytesIn.length);
319322

320-
return bodyBytesIn.length;
321-
}
323+
return bodyBytesIn.length;
324+
}
322325

323-
@Override
324-
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
326+
@Override
327+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
325328

326-
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
327-
stream.close();
328-
requestCompleteFuture.complete(null);
329-
}
330-
});
329+
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
330+
stream.close();
331+
requestCompleteFuture.complete(null);
332+
}
333+
});
331334

332335
acquireCompleteFuture.get(30, TimeUnit.SECONDS);
333336
requestCompleteFuture.get(5, TimeUnit.MINUTES);
@@ -337,4 +340,5 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
337340
CrtResource.logNativeResources();
338341
CrtResource.waitForNoResources();
339342
}
343+
340344
}

src/test/java/software/amazon/awssdk/crt/test/Http2StreamManagerTest.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ public void testMaxConcurrentStreamsEnforcement() throws Exception {
210210

211211
Http2StreamManagerOptions options = new Http2StreamManagerOptions();
212212
options.withMaxConcurrentStreams(maxConcurrentStreams)
213-
.withMaxConcurrentStreamsPerConnection(100)
214-
.withIdealConcurrentStreamsPerConnection(100);
213+
.withMaxConcurrentStreamsPerConnection(100)
214+
.withIdealConcurrentStreamsPerConnection(100);
215215

216216
HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions();
217217
connectionManagerOptions.withClientBootstrap(bootstrap)
@@ -274,13 +274,15 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
274274
// (allowing small margin for timing)
275275
int observedMax = maxConcurrentActive.get();
276276
Log.log(Log.LogLevel.Info, Log.LogSubject.HttpConnectionManager,
277-
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax, maxConcurrentStreams));
277+
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax,
278+
maxConcurrentStreams));
278279

279-
// The observed max should be close to our limit (allowing some flexibility for race conditions)
280+
// The observed max should be close to our limit (allowing some flexibility for
281+
// race conditions)
280282
Assert.assertTrue(
281-
String.format("Expected max concurrent streams around %d, but observed %d",
282-
maxConcurrentStreams, observedMax),
283-
observedMax <= maxConcurrentStreams);
283+
String.format("Expected max concurrent streams around %d, but observed %d",
284+
maxConcurrentStreams, observedMax),
285+
observedMax <= maxConcurrentStreams);
284286

285287
} finally {
286288
streamManager.close();
@@ -291,4 +293,49 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
291293
CrtResource.logNativeResources();
292294
CrtResource.waitForNoResources();
293295
}
296+
297+
@Test
298+
public void testHttp2StreamCancel() throws Exception {
299+
skipIfAndroid();
300+
skipIfLocalhostUnavailable();
301+
302+
URI uri = new URI(endpoint);
303+
// Use a large object to make the request outlive the cancel stage and let cancel happen as expected.
304+
String large_file_path = "/crt-canary-obj.txt";
305+
int maxConcurrentStreams = 20;
306+
try (Http2StreamManager streamManager = createStreamManager(uri, 100, maxConcurrentStreams)) {
307+
308+
Http2Request request = createHttp2Request("GET", endpoint, large_file_path, EMPTY_BODY);
309+
310+
final CompletableFuture<Integer> requestCompleteFuture = new CompletableFuture<>();
311+
312+
streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
313+
@Override
314+
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
315+
HttpHeader[] nextHeaders) {
316+
// Cancel the HTTP/2 stream using the default error code (AWS_ERROR_HTTP_STREAM_CANCELLED)
317+
stream.cancel();
318+
}
319+
320+
@Override
321+
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
322+
requestCompleteFuture.complete(errorCode);
323+
stream.close();
324+
}
325+
}).whenComplete((stream, throwable) -> {
326+
if (throwable != null) {
327+
requestCompleteFuture.completeExceptionally(throwable);
328+
}
329+
});
330+
331+
332+
int actualErrorCode = requestCompleteFuture.get(60, TimeUnit.SECONDS);
333+
334+
// The HTTP/2 stream should complete with AWS_ERROR_HTTP_STREAM_CANCELLED
335+
Assert.assertEquals("Error code should be AWS_ERROR_HTTP_STREAM_CANCELLED",
336+
"AWS_ERROR_HTTP_STREAM_CANCELLED", CRT.awsErrorName(actualErrorCode));
337+
}
338+
CrtResource.logNativeResources();
339+
CrtResource.waitForNoResources();
340+
}
294341
}

src/test/java/software/amazon/awssdk/crt/test/HttpClientConnectionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ public void testRetryableErrorCheck() {
153153
assertTrue(HttpClientConnection.isErrorRetryable(exception));
154154
}
155155

156-
157156
/**
158157
* This test exercises the noProxyHosts configuration. It is included here
159158
* rather than in ProxyTests because a successful test connects to the configured

0 commit comments

Comments
 (0)