Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crt/aws-c-auth
2 changes: 1 addition & 1 deletion crt/aws-lc
22 changes: 18 additions & 4 deletions src/main/java/software/amazon/awssdk/crt/http/HttpStreamBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@

package software.amazon.awssdk.crt.http;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;

import java.util.concurrent.CompletableFuture;

/**
* An base class represents a single Http Request/Response for both HTTP/1.1 and
Expand Down Expand Up @@ -100,6 +96,22 @@ public int getResponseStatusCode() {
throw new IllegalStateException("Can't get Status Code on Closed Stream");
}

/**
* Cancels the stream with the default error code (AWS_ERROR_HTTP_STREAM_CANCELLED).
* <p>
* For HTTP/1.1 streams, this is equivalent to closing the connection.
* For HTTP/2 streams, this sends a RST_STREAM frame with AWS_HTTP2_ERR_CANCEL.
* <p>
* The stream will complete with AWS_ERROR_HTTP_STREAM_CANCELLED, unless the stream is
* already completing for other reasons, or the stream is not activated,
* in which case this call will have no effect.
*/
public void cancel() {
if (!isNull()) {
httpStreamBaseCancelDefaultError(getNativeHandle());
}
}

/*******************************************************************************
* Native methods
******************************************************************************/
Expand All @@ -111,4 +123,6 @@ public int getResponseStatusCode() {
private static native void httpStreamBaseActivate(long http_stream, HttpStreamBase streamObj);

private static native int httpStreamBaseGetResponseStatusCode(long http_stream);

private static native void httpStreamBaseCancelDefaultError(long http_stream);
}
20 changes: 20 additions & 0 deletions src/native/http_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,26 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpS
aws_http_stream_update_window(stream, window_update);
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_HttpStreamBase_httpStreamBaseCancelDefaultError(
JNIEnv *env,
jclass jni_class,
jlong jni_binding) {

(void)jni_class;
aws_cache_jni_ids(env);

struct http_stream_binding *binding = (struct http_stream_binding *)jni_binding;
struct aws_http_stream *stream = binding->native_stream;

if (stream == NULL) {
aws_jni_throw_runtime_exception(env, "HttpStream is null.");
return;
}

AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "Cancelling Stream with default error. stream: %p", (void *)stream);
aws_http_stream_cancel_default_error(stream);
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_http_Http2Stream_http2StreamResetStream(
JNIEnv *env,
jclass jni_class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
import software.amazon.awssdk.crt.Log;

public class Http2ClientLocalHostTest extends HttpClientTestFixture {
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be run locally for testing.
// crt/aws-c-http/tests/mock_server includes a readme on how the server can be
// run locally for testing.
private static final int LOCAL_HTTPS_PORT = 3443;
private static final int LOCAL_HTTP_PORT = 3280;

Expand Down Expand Up @@ -208,7 +209,7 @@ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int
}

@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
String bodyString = new String(bodyBytesIn);
// Parse {"bytes": 123456} manually
int start = bodyString.indexOf("\"bytes\":") + 8;
Expand Down Expand Up @@ -255,33 +256,34 @@ public void testRequestsUploadStress() throws Exception {

final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
final long expectedLength = bodyLength;
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {

Assert.assertTrue(responseStatusCode == 200);
}
Assert.assertTrue(responseStatusCode == 200);
}

@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
String bodyString = new String(bodyBytesIn);
// Parse {"bytes": 123456} manually
int start = bodyString.indexOf("\"bytes\":") + 8;
int end = bodyString.indexOf("}", start);
String bytesStr = bodyString.substring(start, end).trim();
long receivedLength = Long.parseLong(bytesStr);
Assert.assertTrue(receivedLength == expectedLength);
return bodyString.length();
}
@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
String bodyString = new String(bodyBytesIn);
// Parse {"bytes": 123456} manually
int start = bodyString.indexOf("\"bytes\":") + 8;
int end = bodyString.indexOf("}", start);
String bytesStr = bodyString.substring(start, end).trim();
long receivedLength = Long.parseLong(bytesStr);
Assert.assertTrue(receivedLength == expectedLength);
return bodyString.length();
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
stream.close();
requestCompleteFuture.complete(null);
}
});
@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
stream.close();
requestCompleteFuture.complete(null);
}
});

acquireCompleteFuture.get(30, TimeUnit.SECONDS);
requestCompleteFuture.get(5, TimeUnit.MINUTES);
Expand All @@ -305,29 +307,30 @@ public void testRequestsDownloadStress() throws Exception {

final CompletableFuture<Void> requestCompleteFuture = new CompletableFuture<Void>();
final AtomicLong receivedLength = new AtomicLong(0);
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
CompletableFuture<Http2Stream> acquireCompleteFuture = streamManager.acquireStream(request,
new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {

Assert.assertTrue(responseStatusCode == 200);
}
Assert.assertTrue(responseStatusCode == 200);
}

@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn){
receivedLength.addAndGet(bodyBytesIn.length);
@Override
public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
receivedLength.addAndGet(bodyBytesIn.length);

return bodyBytesIn.length;
}
return bodyBytesIn.length;
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {

Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
stream.close();
requestCompleteFuture.complete(null);
}
});
Assert.assertTrue(errorCode == CRT.AWS_CRT_SUCCESS);
stream.close();
requestCompleteFuture.complete(null);
}
});

acquireCompleteFuture.get(30, TimeUnit.SECONDS);
requestCompleteFuture.get(5, TimeUnit.MINUTES);
Expand All @@ -337,4 +340,5 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
CrtResource.logNativeResources();
CrtResource.waitForNoResources();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ public void testMaxConcurrentStreamsEnforcement() throws Exception {

Http2StreamManagerOptions options = new Http2StreamManagerOptions();
options.withMaxConcurrentStreams(maxConcurrentStreams)
.withMaxConcurrentStreamsPerConnection(100)
.withIdealConcurrentStreamsPerConnection(100);
.withMaxConcurrentStreamsPerConnection(100)
.withIdealConcurrentStreamsPerConnection(100);

HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions();
connectionManagerOptions.withClientBootstrap(bootstrap)
Expand Down Expand Up @@ -274,13 +274,15 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
// (allowing small margin for timing)
int observedMax = maxConcurrentActive.get();
Log.log(Log.LogLevel.Info, Log.LogSubject.HttpConnectionManager,
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax, maxConcurrentStreams));
String.format("Max concurrent streams observed: %d (limit: %d)", observedMax,
maxConcurrentStreams));

// The observed max should be close to our limit (allowing some flexibility for race conditions)
// The observed max should be close to our limit (allowing some flexibility for
// race conditions)
Assert.assertTrue(
String.format("Expected max concurrent streams around %d, but observed %d",
maxConcurrentStreams, observedMax),
observedMax <= maxConcurrentStreams);
String.format("Expected max concurrent streams around %d, but observed %d",
maxConcurrentStreams, observedMax),
observedMax <= maxConcurrentStreams);

} finally {
streamManager.close();
Expand All @@ -291,4 +293,49 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
CrtResource.logNativeResources();
CrtResource.waitForNoResources();
}

@Test
public void testHttp2StreamCancel() throws Exception {
skipIfAndroid();
skipIfLocalhostUnavailable();

URI uri = new URI(endpoint);
// Use a large object to make the request outlive the cancel stage and let cancel happen as expected.
String large_file_path = "/crt-canary-obj.txt";
int maxConcurrentStreams = 20;
try (Http2StreamManager streamManager = createStreamManager(uri, 100, maxConcurrentStreams)) {

Http2Request request = createHttp2Request("GET", endpoint, large_file_path, EMPTY_BODY);

final CompletableFuture<Integer> requestCompleteFuture = new CompletableFuture<>();

streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
@Override
public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
HttpHeader[] nextHeaders) {
// Cancel the HTTP/2 stream using the default error code (AWS_ERROR_HTTP_STREAM_CANCELLED)
stream.cancel();
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
requestCompleteFuture.complete(errorCode);
stream.close();
}
}).whenComplete((stream, throwable) -> {
if (throwable != null) {
requestCompleteFuture.completeExceptionally(throwable);
}
});


int actualErrorCode = requestCompleteFuture.get(60, TimeUnit.SECONDS);

// The HTTP/2 stream should complete with AWS_ERROR_HTTP_STREAM_CANCELLED
Assert.assertEquals("Error code should be AWS_ERROR_HTTP_STREAM_CANCELLED",
"AWS_ERROR_HTTP_STREAM_CANCELLED", CRT.awsErrorName(actualErrorCode));
}
CrtResource.logNativeResources();
CrtResource.waitForNoResources();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public void testRetryableErrorCheck() {
assertTrue(HttpClientConnection.isErrorRetryable(exception));
}


/**
* This test exercises the noProxyHosts configuration. It is included here
* rather than in ProxyTests because a successful test connects to the configured
Expand Down
Loading
Loading