Skip to content

Commit efb9951

Browse files
authored
Merge WriteThroughput metric support (#6703)
* Implement WriteThroughput metric for sync clients (#6688) * Add WriteThroughput tracking for async code path (#6698) * Address feedback * Update doc and update async code path to report writeThroughput for non-streaming operations as well * Create variable for system.nanotime()
1 parent 3b11ae4 commit efb9951

19 files changed

Lines changed: 1278 additions & 38 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Add `WRITE_THROUGHPUT` metric to measure request body upload speed (bytes/sec). This metric is reported at the API call attempt level for requests with a body."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/InternalCoreExecutionAttribute.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import software.amazon.awssdk.annotations.SdkInternalApi;
1919
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
2020
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
21+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
2122
import software.amazon.awssdk.retries.api.RetryToken;
2223

2324
/**
@@ -36,6 +37,12 @@ public final class InternalCoreExecutionAttribute extends SdkExecutionAttribute
3637
public static final ExecutionAttribute<RetryToken> RETRY_TOKEN =
3738
new ExecutionAttribute<>("SdkInternalRetryToken");
3839

40+
/**
41+
* Metrics for tracking request body bytes written and timing for WRITE_THROUGHPUT calculation.
42+
*/
43+
public static final ExecutionAttribute<RequestBodyMetrics> REQUEST_BODY_METRICS =
44+
new ExecutionAttribute<>("RequestBodyMetrics");
45+
3946
private InternalCoreExecutionAttribute() {
4047
}
4148
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/ApiCallAttemptMetricCollectionStage.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import software.amazon.awssdk.annotations.SdkInternalApi;
2424
import software.amazon.awssdk.core.Response;
2525
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
26+
import software.amazon.awssdk.core.internal.InternalCoreExecutionAttribute;
2627
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2728
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
2829
import software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
2930
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
31+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
3032
import software.amazon.awssdk.core.internal.metrics.SdkErrorType;
3133
import software.amazon.awssdk.core.metrics.CoreMetric;
3234
import software.amazon.awssdk.http.SdkHttpFullRequest;
@@ -51,6 +53,7 @@ public Response<OutputT> execute(SdkHttpFullRequest input, RequestExecutionConte
5153
reportBackoffDelay(context);
5254

5355
resetBytesRead(context);
56+
resetBytesWritten(context);
5457
try {
5558
Response<OutputT> response = wrapped.execute(input, context);
5659
collectHttpMetrics(apiCallAttemptMetrics, response.httpResponse());
@@ -69,6 +72,11 @@ private void resetBytesRead(RequestExecutionContext context) {
6972
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ, new AtomicLong(0));
7073
}
7174

75+
private void resetBytesWritten(RequestExecutionContext context) {
76+
context.executionAttributes().putAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS,
77+
new RequestBodyMetrics());
78+
}
79+
7280
private void reportBackoffDelay(RequestExecutionContext context) {
7381
Duration lastBackoffDelay = context.executionAttributes().getAttribute(RetryableStageHelper.LAST_BACKOFF_DELAY_DURATION);
7482
if (lastBackoffDelay != null) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncApiCallAttemptMetricCollectionStage.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import software.amazon.awssdk.annotations.SdkInternalApi;
2525
import software.amazon.awssdk.core.Response;
2626
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
27+
import software.amazon.awssdk.core.internal.InternalCoreExecutionAttribute;
2728
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2829
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
2930
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
31+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
3032
import software.amazon.awssdk.core.internal.metrics.SdkErrorType;
3133
import software.amazon.awssdk.core.internal.util.MetricUtils;
3234
import software.amazon.awssdk.core.metrics.CoreMetric;
@@ -41,7 +43,6 @@
4143
@SdkInternalApi
4244
public final class AsyncApiCallAttemptMetricCollectionStage<OutputT> implements RequestPipeline<SdkHttpFullRequest,
4345
CompletableFuture<Response<OutputT>>> {
44-
private static final long ONE_SECOND_IN_NS = Duration.ofSeconds(1).toNanos();
4546

4647
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> wrapped;
4748

@@ -59,11 +60,13 @@ public CompletableFuture<Response<OutputT>> execute(SdkHttpFullRequest input,
5960
reportBackoffDelay(context);
6061

6162
resetBytesRead(context);
63+
resetBytesWritten(context);
6264
CompletableFuture<Response<OutputT>> executeFuture = wrapped.execute(input, context);
6365
CompletableFuture<Response<OutputT>> metricsCollectedFuture = executeFuture.whenComplete((r, t) -> {
6466
if (t == null) {
6567
collectHttpMetrics(apiCallAttemptMetrics, r.httpResponse());
6668
reportReadMetrics(context);
69+
reportWriteThroughput(context);
6770
}
6871

6972
if (t != null) {
@@ -90,23 +93,39 @@ private void reportErrorType(RequestExecutionContext context, Throwable t) {
9093

9194
private void reportReadMetrics(RequestExecutionContext context) {
9295
MetricCollector metricCollector = context.attemptMetricCollector();
93-
long now = System.nanoTime();
94-
9596
long apiCallAttemptStartTime = MetricUtils.apiCallAttemptStartNanoTime(context).getAsLong();
96-
97-
long headersReadEndNanoTime = MetricUtils.responseHeadersReadEndNanoTime(context).getAsLong();
98-
long bytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
99-
double bytesReadPerNs = (double) bytesRead / (now - headersReadEndNanoTime);
100-
double bytesReadPerSec = bytesReadPerNs * ONE_SECOND_IN_NS;
101-
97+
long now = System.nanoTime();
10298
long ttlb = now - apiCallAttemptStartTime;
10399

104100
metricCollector.reportMetric(CoreMetric.TIME_TO_LAST_BYTE, Duration.ofNanos(ttlb));
105-
metricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, bytesReadPerSec);
101+
long responseReadStart = MetricUtils.responseHeadersReadEndNanoTime(context).getAsLong();
102+
long responseBytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
103+
double readThroughput = MetricUtils.bytesPerSec(responseBytesRead, responseReadStart, now);
104+
metricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, readThroughput);
106105
}
107106

108107
private void resetBytesRead(RequestExecutionContext context) {
109108
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.RESPONSE_BYTES_READ, new AtomicLong(0));
110109
}
111110

111+
private void resetBytesWritten(RequestExecutionContext context) {
112+
context.executionAttributes().putAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS,
113+
new RequestBodyMetrics());
114+
}
115+
116+
private void reportWriteThroughput(RequestExecutionContext context) {
117+
RequestBodyMetrics metrics = context.executionAttributes()
118+
.getAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS);
119+
if (metrics == null) {
120+
return;
121+
}
122+
long bytesWritten = metrics.bytesWritten().get();
123+
long firstByteTime = metrics.firstByteWrittenNanoTime().get();
124+
if (bytesWritten > 0 && firstByteTime > 0) {
125+
long lastByteTime = metrics.lastByteWrittenNanoTime().get();
126+
double writeThroughput = MetricUtils.bytesPerSec(bytesWritten, firstByteTime, lastByteTime);
127+
context.attemptMetricCollector().reportMetric(CoreMetric.WRITE_THROUGHPUT, writeThroughput);
128+
}
129+
}
130+
112131
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/HandleResponseStage.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import software.amazon.awssdk.core.Response;
2222
import software.amazon.awssdk.core.http.HttpResponseHandler;
2323
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
24+
import software.amazon.awssdk.core.internal.InternalCoreExecutionAttribute;
2425
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2526
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
2627
import software.amazon.awssdk.core.internal.metrics.BytesReadTrackingInputStream;
28+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
2729
import software.amazon.awssdk.core.internal.util.MetricUtils;
2830
import software.amazon.awssdk.core.metrics.CoreMetric;
2931
import software.amazon.awssdk.http.AbortableInputStream;
@@ -64,11 +66,28 @@ private void collectMetrics(RequestExecutionContext context) {
6466
long ttlb = now - attemptStartTime;
6567
attemptMetricCollector.reportMetric(CoreMetric.TIME_TO_LAST_BYTE, Duration.ofNanos(ttlb));
6668

67-
long responseBytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
6869
long responseReadStart = MetricUtils.responseHeadersReadEndNanoTime(context).getAsLong();
69-
double throughput = MetricUtils.bytesPerSec(responseBytesRead, responseReadStart, now);
7070

71-
attemptMetricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, throughput);
71+
long responseBytesRead = MetricUtils.apiCallAttemptResponseBytesRead(context).getAsLong();
72+
double readThroughput = MetricUtils.bytesPerSec(responseBytesRead, responseReadStart, now);
73+
attemptMetricCollector.reportMetric(CoreMetric.READ_THROUGHPUT, readThroughput);
74+
75+
reportWriteThroughput(context, attemptMetricCollector);
76+
}
77+
78+
private void reportWriteThroughput(RequestExecutionContext context, MetricCollector attemptMetricCollector) {
79+
RequestBodyMetrics metrics = context.executionAttributes()
80+
.getAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS);
81+
if (metrics == null) {
82+
return;
83+
}
84+
long bytesWritten = metrics.bytesWritten().get();
85+
long firstByteTime = metrics.firstByteWrittenNanoTime().get();
86+
if (bytesWritten > 0 && firstByteTime > 0) {
87+
long lastByteTime = metrics.lastByteWrittenNanoTime().get();
88+
double writeThroughput = MetricUtils.bytesPerSec(bytesWritten, firstByteTime, lastByteTime);
89+
attemptMetricCollector.reportMetric(CoreMetric.WRITE_THROUGHPUT, writeThroughput);
90+
}
7291
}
7392

7493
private SdkHttpFullResponse trackBytesRead(SdkHttpFullResponse httpFullResponse, RequestExecutionContext context) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import software.amazon.awssdk.core.exception.SdkClientException;
3939
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
4040
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
41+
import software.amazon.awssdk.core.internal.InternalCoreExecutionAttribute;
4142
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
4243
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
4344
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
@@ -47,6 +48,8 @@
4748
import software.amazon.awssdk.core.internal.http.timers.TimeoutTracker;
4849
import software.amazon.awssdk.core.internal.http.timers.TimerUtils;
4950
import software.amazon.awssdk.core.internal.metrics.BytesReadTrackingPublisher;
51+
import software.amazon.awssdk.core.internal.metrics.BytesWrittenTrackingPublisher;
52+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
5053
import software.amazon.awssdk.core.internal.util.MetricUtils;
5154
import software.amazon.awssdk.core.metrics.CoreMetric;
5255
import software.amazon.awssdk.http.SdkHttpFullRequest;
@@ -129,9 +132,10 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
129132

130133
CompletableFuture<Response<OutputT>> responseHandlerFuture = responseHandler.prepare();
131134

132-
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
135+
SdkHttpContentPublisher basePublisher = context.requestProvider() == null
133136
? new SimpleHttpContentPublisher(request)
134137
: new SdkHttpContentPublisherAdapter(context.requestProvider());
138+
SdkHttpContentPublisher requestProvider = wrapWithMetricsTracking(basePublisher, context);
135139
// Set content length if it hasn't been set already.
136140
SdkHttpFullRequest requestWithContentLength = getRequestWithContentLength(request, requestProvider);
137141

@@ -219,6 +223,13 @@ private boolean isFullDuplex(ExecutionAttributes executionAttributes) {
219223
executionAttributes.getAttribute(SdkInternalExecutionAttribute.IS_FULL_DUPLEX);
220224
}
221225

226+
private SdkHttpContentPublisher wrapWithMetricsTracking(SdkHttpContentPublisher publisher,
227+
RequestExecutionContext context) {
228+
RequestBodyMetrics metrics = context.executionAttributes()
229+
.getAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS);
230+
return new TrackingHttpContentPublisher(publisher, metrics);
231+
}
232+
222233
private SdkHttpFullRequest getRequestWithContentLength(SdkHttpFullRequest request, SdkHttpContentPublisher requestProvider) {
223234
if (shouldSetContentLength(request, requestProvider)) {
224235
return request.toBuilder()
@@ -262,7 +273,6 @@ private void completeResponseFuture(CompletableFuture<Response<OutputT>> respons
262273
* {@link SdkHttpContentPublisher} which the HTTP client SPI expects.
263274
*/
264275
private static final class SdkHttpContentPublisherAdapter implements SdkHttpContentPublisher {
265-
266276
private final AsyncRequestBody asyncRequestBody;
267277

268278
private SdkHttpContentPublisherAdapter(AsyncRequestBody asyncRequestBody) {
@@ -280,6 +290,29 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
280290
}
281291
}
282292

293+
/**
294+
* Wraps an {@link SdkHttpContentPublisher} with write throughput tracking.
295+
*/
296+
private static final class TrackingHttpContentPublisher implements SdkHttpContentPublisher {
297+
private final SdkHttpContentPublisher delegate;
298+
private final Publisher<ByteBuffer> trackingPublisher;
299+
300+
private TrackingHttpContentPublisher(SdkHttpContentPublisher delegate, RequestBodyMetrics metrics) {
301+
this.delegate = delegate;
302+
this.trackingPublisher = new BytesWrittenTrackingPublisher(delegate, metrics);
303+
}
304+
305+
@Override
306+
public Optional<Long> contentLength() {
307+
return delegate.contentLength();
308+
}
309+
310+
@Override
311+
public void subscribe(Subscriber<? super ByteBuffer> s) {
312+
trackingPublisher.subscribe(s);
313+
}
314+
}
315+
283316
/**
284317
* Decorator response handler that records response read metrics as well as records other data for computing other read
285318
* metrics at later points.

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeHttpRequestStage.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717

1818
import static software.amazon.awssdk.http.Header.CONTENT_LENGTH;
1919

20+
import java.io.InputStream;
2021
import java.time.Duration;
2122
import java.util.Optional;
2223
import software.amazon.awssdk.annotations.SdkInternalApi;
2324
import software.amazon.awssdk.core.client.config.SdkClientOption;
2425
import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
26+
import software.amazon.awssdk.core.internal.InternalCoreExecutionAttribute;
2527
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
2628
import software.amazon.awssdk.core.internal.http.InterruptMonitor;
2729
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2830
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
31+
import software.amazon.awssdk.core.internal.metrics.BytesWrittenTrackingInputStream;
32+
import software.amazon.awssdk.core.internal.metrics.RequestBodyMetrics;
2933
import software.amazon.awssdk.core.internal.util.MetricUtils;
3034
import software.amazon.awssdk.core.metrics.CoreMetric;
3135
import software.amazon.awssdk.http.ContentStreamProvider;
@@ -72,7 +76,7 @@ private HttpExecuteResponse executeHttpRequest(SdkHttpFullRequest request, Reque
7276

7377
MetricCollector httpMetricCollector = MetricUtils.createHttpMetricsCollector(context);
7478

75-
request = enforceContentLengthIfPresent(request);
79+
request = wrapRequestContentStream(request, context);
7680

7781
ExecutableHttpRequest requestCallable = sdkHttpClient
7882
.prepareRequest(HttpExecuteRequest.builder()
@@ -97,35 +101,40 @@ private HttpExecuteResponse executeHttpRequest(SdkHttpFullRequest request, Reque
97101
return measuredExecute.left();
98102
}
99103

104+
private SdkHttpFullRequest wrapRequestContentStream(SdkHttpFullRequest request, RequestExecutionContext context) {
105+
Optional<ContentStreamProvider> contentStreamProvider = request.contentStreamProvider();
106+
if (!contentStreamProvider.isPresent()) {
107+
return request;
108+
}
109+
110+
RequestBodyMetrics metrics = context.executionAttributes()
111+
.getAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS);
112+
113+
ContentStreamProvider wrapped = () -> {
114+
InputStream stream = contentStreamProvider.get().newStream();
115+
stream = new BytesWrittenTrackingInputStream(stream, metrics);
116+
117+
Optional<Long> contentLength = contentLength(request);
118+
if (!contentLength.isPresent()) {
119+
LOG.debug(() -> String.format("Request contains a body but does not have a Content-Length header. Not validating "
120+
+ "the amount of data sent to the service: %s", request));
121+
return stream;
122+
}
123+
124+
stream = new LengthAwareInputStream(stream, contentLength.get());
125+
return stream;
126+
};
127+
128+
return request.toBuilder().contentStreamProvider(wrapped).build();
129+
}
130+
100131
private static long updateMetricCollectionAttributes(RequestExecutionContext context) {
101132
long now = System.nanoTime();
102133
context.executionAttributes().putAttribute(SdkInternalExecutionAttribute.API_CALL_ATTEMPT_START_NANO_TIME,
103134
now);
104135
return now;
105136
}
106137

107-
private static SdkHttpFullRequest enforceContentLengthIfPresent(SdkHttpFullRequest request) {
108-
Optional<ContentStreamProvider> requestContentStreamProviderOptional = request.contentStreamProvider();
109-
110-
if (!requestContentStreamProviderOptional.isPresent()) {
111-
return request;
112-
}
113-
114-
Optional<Long> contentLength = contentLength(request);
115-
if (!contentLength.isPresent()) {
116-
LOG.debug(() -> String.format("Request contains a body but does not have a Content-Length header. Not validating "
117-
+ "the amount of data sent to the service: %s", request));
118-
return request;
119-
}
120-
121-
ContentStreamProvider requestContentProvider = requestContentStreamProviderOptional.get();
122-
ContentStreamProvider lengthVerifyingProvider = () -> new LengthAwareInputStream(requestContentProvider.newStream(),
123-
contentLength.get());
124-
return request.toBuilder()
125-
.contentStreamProvider(lengthVerifyingProvider)
126-
.build();
127-
}
128-
129138
private static Optional<Long> contentLength(SdkHttpFullRequest request) {
130139
Optional<String> contentLengthHeader = request.firstMatchingHeader(CONTENT_LENGTH);
131140

0 commit comments

Comments
 (0)