diff --git a/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingLowLevelHttpResponse.java b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingLowLevelHttpResponse.java new file mode 100644 index 000000000000..357ff99475ec --- /dev/null +++ b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingLowLevelHttpResponse.java @@ -0,0 +1,115 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.telemetry; + +import static com.google.cloud.bigquery.telemetry.HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE; + +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.common.io.CountingInputStream; +import io.opentelemetry.api.trace.Span; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Class that wraps a LowLevelHttpResponse to be able to inject wrapper on the delegate InputStream + * that allows us to capture response size for telemetry purposes. + */ +class HttpTracingLowLevelHttpResponse extends LowLevelHttpResponse { + private final LowLevelHttpResponse delegate; + private InputStream countingInputStream; + private final Span span; + + public HttpTracingLowLevelHttpResponse(LowLevelHttpResponse delegate, Span span) { + this.delegate = delegate; + this.span = span; + } + + @Override + public InputStream getContent() throws IOException { + if (countingInputStream == null) { + InputStream originalContent = delegate.getContent(); + if (originalContent != null && span != null && span.getSpanContext().isValid()) { + countingInputStream = getWrappedInputStream(span, originalContent); + } else { + countingInputStream = originalContent; + } + } + return countingInputStream; + } + + @Override + public String getContentEncoding() throws IOException { + return delegate.getContentEncoding(); + } + + @Override + public long getContentLength() throws IOException { + return delegate.getContentLength(); + } + + @Override + public String getContentType() throws IOException { + return delegate.getContentType(); + } + + @Override + public String getStatusLine() throws IOException { + return delegate.getStatusLine(); + } + + @Override + public int getStatusCode() throws IOException { + return delegate.getStatusCode(); + } + + @Override + public String getReasonPhrase() throws IOException { + return delegate.getReasonPhrase(); + } + + @Override + public int getHeaderCount() throws IOException { + return delegate.getHeaderCount(); + } + + @Override + public String getHeaderName(int index) throws IOException { + return delegate.getHeaderName(index); + } + + @Override + public String getHeaderValue(int index) throws IOException { + return delegate.getHeaderValue(index); + } + + @Override + public void disconnect() throws IOException { + delegate.disconnect(); + } + + private InputStream getWrappedInputStream(Span span, InputStream content) { + CountingInputStream counter = new CountingInputStream(content); + return new FilterInputStream(counter) { + @Override + public void close() throws IOException { + super.close(); + span.setAttribute(HTTP_RESPONSE_BODY_SIZE, counter.getCount()); + } + }; + } +} diff --git a/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java index d34409d44028..3f2a5f208e56 100644 --- a/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java +++ b/java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import java.io.IOException; +import java.lang.reflect.Field; /** * HttpRequestInitializer that wraps a delegate initializer, intercepts all HTTP requests, adds @@ -50,6 +51,7 @@ public class HttpTracingRequestInitializer implements HttpRequestInitializer { AttributeKey.longKey("http.response.body.size"); @VisibleForTesting static final String HTTP_RPC_SYSTEM_NAME = "http"; + @VisibleForTesting static final String GZIP_ENCODING = "gzip"; private static final java.util.Set REDACTED_QUERY_PARAMETERS = com.google.common.collect.ImmutableSet.of( @@ -137,7 +139,29 @@ static void addResponseBodySizeToSpan(HttpResponse response, Span span) { if (contentLength != null && contentLength > 0) { span.setAttribute(HTTP_RESPONSE_BODY_SIZE, contentLength); } - // TODO handle chunked responses + // For compressed responses without Content-Length, we need to wrap the response to get the + // actual size + if (GZIP_ENCODING.equals(response.getContentEncoding())) { + getResponseBodySizeForCompressedResponse(response, span); + } + } + + /** + * Wraps the LowLevelHttpResponse with a HttpTracingLowLevelHttpResponse to give us access to get + * response body size for compressed responses. + */ + private static void getResponseBodySizeForCompressedResponse(HttpResponse response, Span span) { + try { + Field lowlevelField = HttpResponse.class.getDeclaredField("response"); + lowlevelField.setAccessible(true); + LowLevelHttpResponse lowLevelHttpResponse = + (LowLevelHttpResponse) lowlevelField.get(response); + HttpTracingLowLevelHttpResponse wrappedResponse = + new HttpTracingLowLevelHttpResponse(lowLevelHttpResponse, span); + lowlevelField.set(response, wrappedResponse); + } catch (NoSuchFieldException | SecurityException | IllegalAccessException e) { + // Ignore - stream wrapping failed, we will not track response size + } } /** Removes credentials from URL. */ diff --git a/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java b/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java index 0bf0012be30e..4f6673f4428e 100644 --- a/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java +++ b/java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java @@ -53,8 +53,11 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.zip.GZIPOutputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -430,4 +433,47 @@ private void closeAndVerifySpanData( assertNull(span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE)); } } + + @Test + public void testCompressedResponseBodySizeTracking() throws IOException { + String responseBody = "chunked response data"; + ByteArrayOutputStream responseStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(responseStream)) { + gzipOut.write(responseBody.getBytes(StandardCharsets.UTF_8)); + } + byte[] compressedData = responseStream.toByteArray(); + + HttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentEncoding(HttpTracingRequestInitializer.GZIP_ENCODING); + response.setStatusCode(200); + response.setContent(new java.io.ByteArrayInputStream(compressedData)); + return response; + } + }; + } + }; + + HttpRequest request = buildGetRequest(transport, initializer, BASE_URL); + HttpResponse response = request.execute(); + // Read the entire response body (this triggers the counting) + response.parseAsString(); + + response.disconnect(); + spanScope.close(); + parentSpan.end(); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(1, spans.size()); + SpanData span = spans.get(0); + + assertEquals( + compressedData.length, + span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE)); + } }