diff --git a/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java index d34409d44028..3ff398e2eef0 100644 --- a/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java +++ b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java @@ -20,10 +20,15 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.CountingInputStream; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import org.jspecify.annotations.NonNull; /** * HttpRequestInitializer that wraps a delegate initializer, intercepts all HTTP requests, adds @@ -50,6 +55,7 @@ public class HttpTracingRequestInitializer implements HttpRequestInitializer { AttributeKey.longKey("http.response.body.size"); @VisibleForTesting static final String HTTP_RPC_SYSTEM_NAME = "http"; + @VisibleForTesting static final String GZIP_ENCODING = "gzip"; private static final java.util.Set REDACTED_QUERY_PARAMETERS = com.google.common.collect.ImmutableSet.of( @@ -137,7 +143,42 @@ static void addResponseBodySizeToSpan(HttpResponse response, Span span) { if (contentLength != null && contentLength > 0) { span.setAttribute(HTTP_RESPONSE_BODY_SIZE, contentLength); } - // TODO handle chunked responses + // For compressed responses without Content-Length, we need to wrap the response to get the + // actual size + if (GZIP_ENCODING.equals(response.getContentEncoding())) { + getResponseBodySizeForCompressedResponse(response, span); + } + } + + /** + * Wraps the response's input stream with a CountingInputStream to track bytes read. This handles + * compressed transfer encoding where Content-Length is not available. + */ + private static void getResponseBodySizeForCompressedResponse(HttpResponse response, Span span) { + try { + InputStream content = response.getContent(); + if (content == null) { + return; + } + + InputStream wrappedStream = getWrappedInputStream(span, content); + Field contentField = HttpResponse.class.getDeclaredField("content"); + contentField.setAccessible(true); + contentField.set(response, wrappedStream); + } catch (Exception e) { + // Ignore - stream wrapping failed, we will not track response size + } + } + + private static @NonNull InputStream getWrappedInputStream(Span span, InputStream content) { + CountingInputStream counter = new CountingInputStream(content); + return new FilterInputStream(counter) { + @Override + public void close() throws IOException { + super.close(); + span.setAttribute(HTTP_RESPONSE_BODY_SIZE, counter.getCount()); + } + }; } /** Removes credentials from URL. */ diff --git a/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java b/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java index 0bf0012be30e..c0d8033ab6eb 100644 --- a/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java +++ b/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java @@ -53,8 +53,13 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; + +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.zip.GZIPOutputStream; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -430,4 +435,47 @@ private void closeAndVerifySpanData( assertNull(span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE)); } } + + @Test + public void testCompressedResponseBodySizeTracking() throws IOException { + String responseBody = "chunked response data"; + ByteArrayOutputStream responseStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(responseStream)) { + gzipOut.write(responseBody.getBytes(StandardCharsets.UTF_8)); + } + byte[] compressedData = responseStream.toByteArray(); + + HttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentEncoding(HttpTracingRequestInitializer.GZIP_ENCODING); + response.setStatusCode(200); + response.setContent(new java.io.ByteArrayInputStream(compressedData)); + return response; + } + }; + } + }; + + HttpRequest request = buildGetRequest(transport, initializer, BASE_URL); + HttpResponse response = request.execute(); + // Read the entire response body (this triggers the counting) + response.parseAsString(); + + response.disconnect(); + spanScope.close(); + parentSpan.end(); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(1, spans.size()); + SpanData span = spans.get(0); + + assertEquals( + (long) responseBody.length(), + span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE)); + } }