Skip to content

Commit 7d21cf4

Browse files
committed
feat(bigquery): add response body tracking for compressed response
1 parent 39133b5 commit 7d21cf4

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@
2020
import com.google.api.core.BetaApi;
2121
import com.google.api.core.InternalApi;
2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.io.CountingInputStream;
2324
import io.opentelemetry.api.common.AttributeKey;
2425
import io.opentelemetry.api.trace.Span;
2526
import io.opentelemetry.api.trace.Tracer;
27+
import java.io.FilterInputStream;
2628
import java.io.IOException;
29+
import java.io.InputStream;
30+
import java.lang.reflect.Field;
31+
import org.jspecify.annotations.NonNull;
2732

2833
/**
2934
* HttpRequestInitializer that wraps a delegate initializer, intercepts all HTTP requests, adds
@@ -50,6 +55,7 @@ public class HttpTracingRequestInitializer implements HttpRequestInitializer {
5055
AttributeKey.longKey("http.response.body.size");
5156

5257
@VisibleForTesting static final String HTTP_RPC_SYSTEM_NAME = "http";
58+
@VisibleForTesting static final String GZIP_ENCODING = "gzip";
5359

5460
private static final java.util.Set<String> REDACTED_QUERY_PARAMETERS =
5561
com.google.common.collect.ImmutableSet.of(
@@ -137,7 +143,42 @@ static void addResponseBodySizeToSpan(HttpResponse response, Span span) {
137143
if (contentLength != null && contentLength > 0) {
138144
span.setAttribute(HTTP_RESPONSE_BODY_SIZE, contentLength);
139145
}
140-
// TODO handle chunked responses
146+
// For compressed responses without Content-Length, we need to wrap the response to get the
147+
// actual size
148+
if (GZIP_ENCODING.equals(response.getContentEncoding())) {
149+
getResponseBodySizeForCompressedResponse(response, span);
150+
}
151+
}
152+
153+
/**
154+
* Wraps the response's input stream with a CountingInputStream to track bytes read. This handles
155+
* compressed transfer encoding where Content-Length is not available.
156+
*/
157+
private static void getResponseBodySizeForCompressedResponse(HttpResponse response, Span span) {
158+
try {
159+
InputStream content = response.getContent();
160+
if (content == null) {
161+
return;
162+
}
163+
164+
InputStream wrappedStream = getWrappedInputStream(span, content);
165+
Field contentField = HttpResponse.class.getDeclaredField("content");
166+
contentField.setAccessible(true);
167+
contentField.set(response, wrappedStream);
168+
} catch (Exception e) {
169+
// Ignore - stream wrapping failed, we will not track response size
170+
}
171+
}
172+
173+
private static @NonNull InputStream getWrappedInputStream(Span span, InputStream content) {
174+
CountingInputStream counter = new CountingInputStream(content);
175+
return new FilterInputStream(counter) {
176+
@Override
177+
public void close() throws IOException {
178+
super.close();
179+
span.setAttribute(HTTP_RESPONSE_BODY_SIZE, counter.getCount());
180+
}
181+
};
141182
}
142183

143184
/** Removes credentials from URL. */

java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,13 @@
5353
import io.opentelemetry.sdk.trace.SdkTracerProvider;
5454
import io.opentelemetry.sdk.trace.data.SpanData;
5555
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
56+
57+
import java.io.ByteArrayOutputStream;
5658
import java.io.IOException;
59+
import java.nio.charset.StandardCharsets;
5760
import java.util.List;
61+
import java.util.zip.GZIPOutputStream;
62+
5863
import org.junit.jupiter.api.BeforeEach;
5964
import org.junit.jupiter.api.Test;
6065

@@ -430,4 +435,47 @@ private void closeAndVerifySpanData(
430435
assertNull(span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE));
431436
}
432437
}
438+
439+
@Test
440+
public void testCompressedResponseBodySizeTracking() throws IOException {
441+
String responseBody = "chunked response data";
442+
ByteArrayOutputStream responseStream = new ByteArrayOutputStream();
443+
try (GZIPOutputStream gzipOut = new GZIPOutputStream(responseStream)) {
444+
gzipOut.write(responseBody.getBytes(StandardCharsets.UTF_8));
445+
}
446+
byte[] compressedData = responseStream.toByteArray();
447+
448+
HttpTransport transport =
449+
new MockHttpTransport() {
450+
@Override
451+
public LowLevelHttpRequest buildRequest(String method, String url) {
452+
return new MockLowLevelHttpRequest() {
453+
@Override
454+
public LowLevelHttpResponse execute() {
455+
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
456+
response.setContentEncoding(HttpTracingRequestInitializer.GZIP_ENCODING);
457+
response.setStatusCode(200);
458+
response.setContent(new java.io.ByteArrayInputStream(compressedData));
459+
return response;
460+
}
461+
};
462+
}
463+
};
464+
465+
HttpRequest request = buildGetRequest(transport, initializer, BASE_URL);
466+
HttpResponse response = request.execute();
467+
// Read the entire response body (this triggers the counting)
468+
response.parseAsString();
469+
470+
response.disconnect();
471+
spanScope.close();
472+
parentSpan.end();
473+
List<SpanData> spans = spanExporter.getFinishedSpanItems();
474+
assertEquals(1, spans.size());
475+
SpanData span = spans.get(0);
476+
477+
assertEquals(
478+
(long) responseBody.length(),
479+
span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE));
480+
}
433481
}

0 commit comments

Comments
 (0)