Skip to content

Commit 0bf1bdb

Browse files
committed
Update doc and update async code path to report writeThroughput for non-streaming operations as well
1 parent bfa4b47 commit 0bf1bdb

4 files changed

Lines changed: 77 additions & 14 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
132132

133133
CompletableFuture<Response<OutputT>> responseHandlerFuture = responseHandler.prepare();
134134

135-
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
135+
SdkHttpContentPublisher basePublisher = context.requestProvider() == null
136136
? new SimpleHttpContentPublisher(request)
137-
: createTrackingContentPublisher(context);
137+
: new SdkHttpContentPublisherAdapter(context.requestProvider());
138+
SdkHttpContentPublisher requestProvider = wrapWithMetricsTracking(basePublisher, context);
138139
// Set content length if it hasn't been set already.
139140
SdkHttpFullRequest requestWithContentLength = getRequestWithContentLength(request, requestProvider);
140141

@@ -222,10 +223,11 @@ private boolean isFullDuplex(ExecutionAttributes executionAttributes) {
222223
executionAttributes.getAttribute(SdkInternalExecutionAttribute.IS_FULL_DUPLEX);
223224
}
224225

225-
private SdkHttpContentPublisher createTrackingContentPublisher(RequestExecutionContext context) {
226+
private SdkHttpContentPublisher wrapWithMetricsTracking(SdkHttpContentPublisher publisher,
227+
RequestExecutionContext context) {
226228
RequestBodyMetrics metrics = context.executionAttributes()
227229
.getAttribute(InternalCoreExecutionAttribute.REQUEST_BODY_METRICS);
228-
return new SdkHttpContentPublisherAdapter(context.requestProvider(), metrics);
230+
return new TrackingHttpContentPublisher(publisher, metrics);
229231
}
230232

231233
private SdkHttpFullRequest getRequestWithContentLength(SdkHttpFullRequest request, SdkHttpContentPublisher requestProvider) {
@@ -271,20 +273,40 @@ private void completeResponseFuture(CompletableFuture<Response<OutputT>> respons
271273
* {@link SdkHttpContentPublisher} which the HTTP client SPI expects.
272274
*/
273275
private static final class SdkHttpContentPublisherAdapter implements SdkHttpContentPublisher {
274-
275276
private final AsyncRequestBody asyncRequestBody;
276-
private final Publisher<ByteBuffer> trackingPublisher;
277277

278-
private SdkHttpContentPublisherAdapter(AsyncRequestBody asyncRequestBody, RequestBodyMetrics metrics) {
278+
private SdkHttpContentPublisherAdapter(AsyncRequestBody asyncRequestBody) {
279279
this.asyncRequestBody = asyncRequestBody;
280-
this.trackingPublisher = new BytesWrittenTrackingPublisher(asyncRequestBody, metrics);
281280
}
282281

283282
@Override
284283
public Optional<Long> contentLength() {
285284
return asyncRequestBody.contentLength();
286285
}
287286

287+
@Override
288+
public void subscribe(Subscriber<? super ByteBuffer> s) {
289+
asyncRequestBody.subscribe(s);
290+
}
291+
}
292+
293+
/**
294+
* Wraps an {@link SdkHttpContentPublisher} with write throughput tracking.
295+
*/
296+
private static final class TrackingHttpContentPublisher implements SdkHttpContentPublisher {
297+
private final SdkHttpContentPublisher delegate;
298+
private final Publisher<ByteBuffer> trackingPublisher;
299+
300+
private TrackingHttpContentPublisher(SdkHttpContentPublisher delegate, RequestBodyMetrics metrics) {
301+
this.delegate = delegate;
302+
this.trackingPublisher = new BytesWrittenTrackingPublisher(delegate, metrics);
303+
}
304+
305+
@Override
306+
public Optional<Long> contentLength() {
307+
return delegate.contentLength();
308+
}
309+
288310
@Override
289311
public void subscribe(Subscriber<? super ByteBuffer> s) {
290312
trackingPublisher.subscribe(s);

core/sdk-core/src/main/java/software/amazon/awssdk/core/metrics/CoreMetric.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,17 @@ public final class CoreMetric {
158158

159159
/**
160160
* The write throughput of the client, defined as
161-
* {@code RequestBytesWritten / (LastByteWrittenTime - FirstByteWrittenTime)},
162-
* where FirstByteWrittenTime is when the first byte is read from the request body and LastByteWrittenTime is when
163-
* the last byte is read. This value is in bytes per second.
161+
* {@code RequestBytesWritten / (LastByteWrittenTime - FirstByteWrittenTime)}.
162+
* This value is in bytes per second.
164163
* <p>
165-
* This metric measures the rate at which bytes are read from the request body stream. It excludes connection setup,
166-
* TLS handshake time, and server processing time.
164+
* This metric measures the rate at which the SDK provides the request body to the HTTP client.
165+
* It excludes connection setup, TLS handshake time, and server processing time.
167166
* <p>
168-
* Note: This metric does not account for buffering in the HTTP client layer. The actual network transmission rate may
167+
* Note: This metric only measures the request body, not HTTP headers. For requests with small
168+
* payloads where the body size is comparable to the headers size, this metric may not accurately
169+
* reflect overall network throughput.
170+
* <p>
171+
* Note: This metric does not account for buffering in the HTTP client layer. The actual network transmission rate may
169172
* be lower if the HTTP client buffers data before sending. This metric represents an upper bound of the network
170173
* throughput.
171174
* <p>

test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/metrics/AsyncWriteThroughputMetricTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,23 @@ public void operationWithNoInputOrOutput_noRequestBody_writeThroughputNotReporte
102102
List<Double> writeThroughputValues = attemptMetrics.get(0).metricValues(CoreMetric.WRITE_THROUGHPUT);
103103
assertThat(writeThroughputValues).isEmpty();
104104
}
105+
106+
@Test
107+
public void nonStreamingOperation_withRequestBody_writeThroughputReported() {
108+
stubFor(post(anyUrl())
109+
.willReturn(aResponse().withStatus(200).withBody("{}")));
110+
111+
client.allTypes(r -> r.stringMember("test")).join();
112+
113+
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
114+
verify(mockPublisher).publish(collectionCaptor.capture());
115+
116+
MetricCollection capturedCollection = collectionCaptor.getValue();
117+
List<MetricCollection> attemptMetrics = capturedCollection.children();
118+
119+
assertThat(attemptMetrics).hasSize(1);
120+
List<Double> writeThroughputValues = attemptMetrics.get(0).metricValues(CoreMetric.WRITE_THROUGHPUT);
121+
assertThat(writeThroughputValues).hasSize(1);
122+
assertThat(writeThroughputValues.get(0)).isGreaterThan(0);
123+
}
105124
}

test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/metrics/SyncWriteThroughputMetricTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,23 @@ public void operationWithNoInputOrOutput_noRequestBody_writeThroughputNotReporte
102102
List<Double> writeThroughputValues = attemptMetrics.get(0).metricValues(CoreMetric.WRITE_THROUGHPUT);
103103
assertThat(writeThroughputValues).isEmpty();
104104
}
105+
106+
@Test
107+
public void nonStreamingOperation_withRequestBody_writeThroughputReported() {
108+
stubFor(post(anyUrl())
109+
.willReturn(aResponse().withStatus(200).withBody("{}")));
110+
111+
client.allTypes(r -> r.stringMember("test"));
112+
113+
ArgumentCaptor<MetricCollection> collectionCaptor = ArgumentCaptor.forClass(MetricCollection.class);
114+
verify(mockPublisher).publish(collectionCaptor.capture());
115+
116+
MetricCollection capturedCollection = collectionCaptor.getValue();
117+
List<MetricCollection> attemptMetrics = capturedCollection.children();
118+
119+
assertThat(attemptMetrics).hasSize(1);
120+
List<Double> writeThroughputValues = attemptMetrics.get(0).metricValues(CoreMetric.WRITE_THROUGHPUT);
121+
assertThat(writeThroughputValues).hasSize(1);
122+
assertThat(writeThroughputValues.get(0)).isGreaterThan(0);
123+
}
105124
}

0 commit comments

Comments
 (0)