Skip to content

Commit 27fd759

Browse files
committed
feat(bigquery): add response body tracking for compressed response
1 parent 7d21cf4 commit 27fd759

File tree

3 files changed

+123
-30
lines changed

3 files changed

+123
-30
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.telemetry;
18+
19+
import static com.google.cloud.bigquery.telemetry.HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE;
20+
21+
import com.google.api.client.http.LowLevelHttpResponse;
22+
import com.google.common.io.CountingInputStream;
23+
import io.opentelemetry.api.trace.Span;
24+
import java.io.FilterInputStream;
25+
import java.io.IOException;
26+
import java.io.InputStream;
27+
28+
/**
29+
* Class that wraps LowLevelHttpResponse to be able to inject an InputStream wrapper that allows us
30+
* to track response size in trace spans.
31+
*/
32+
class HttpTracingLowLevelHttpResponse extends LowLevelHttpResponse {
33+
private final LowLevelHttpResponse delegate;
34+
private InputStream countingInputStream;
35+
private final Span span;
36+
37+
public HttpTracingLowLevelHttpResponse(LowLevelHttpResponse delegate, Span span) {
38+
this.delegate = delegate;
39+
this.span = span;
40+
}
41+
42+
@Override
43+
public InputStream getContent() throws IOException {
44+
if (countingInputStream == null) {
45+
if (span != null && span.getSpanContext().isValid()) {
46+
countingInputStream = getWrappedInputStream(span, delegate.getContent());
47+
}
48+
}
49+
return countingInputStream;
50+
}
51+
52+
@Override
53+
public String getContentEncoding() throws IOException {
54+
return delegate.getContentEncoding();
55+
}
56+
57+
@Override
58+
public long getContentLength() throws IOException {
59+
return delegate.getContentLength();
60+
}
61+
62+
@Override
63+
public String getContentType() throws IOException {
64+
return delegate.getContentType();
65+
}
66+
67+
@Override
68+
public String getStatusLine() throws IOException {
69+
return delegate.getStatusLine();
70+
}
71+
72+
@Override
73+
public int getStatusCode() throws IOException {
74+
return delegate.getStatusCode();
75+
}
76+
77+
@Override
78+
public String getReasonPhrase() throws IOException {
79+
return delegate.getReasonPhrase();
80+
}
81+
82+
@Override
83+
public int getHeaderCount() throws IOException {
84+
return delegate.getHeaderCount();
85+
}
86+
87+
@Override
88+
public String getHeaderName(int index) throws IOException {
89+
return delegate.getHeaderName(index);
90+
}
91+
92+
@Override
93+
public String getHeaderValue(int index) throws IOException {
94+
return delegate.getHeaderValue(index);
95+
}
96+
97+
@Override
98+
public void disconnect() throws IOException {
99+
delegate.disconnect();
100+
}
101+
102+
private InputStream getWrappedInputStream(Span span, InputStream content) {
103+
CountingInputStream counter = new CountingInputStream(content);
104+
return new FilterInputStream(counter) {
105+
@Override
106+
public void close() throws IOException {
107+
super.close();
108+
span.setAttribute(HTTP_RESPONSE_BODY_SIZE, counter.getCount());
109+
}
110+
};
111+
}
112+
}

java-bigquery/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializer.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,11 @@
2020
import com.google.api.core.BetaApi;
2121
import com.google.api.core.InternalApi;
2222
import com.google.common.annotations.VisibleForTesting;
23-
import com.google.common.io.CountingInputStream;
2423
import io.opentelemetry.api.common.AttributeKey;
2524
import io.opentelemetry.api.trace.Span;
2625
import io.opentelemetry.api.trace.Tracer;
27-
import java.io.FilterInputStream;
2826
import java.io.IOException;
29-
import java.io.InputStream;
3027
import java.lang.reflect.Field;
31-
import org.jspecify.annotations.NonNull;
3228

3329
/**
3430
* HttpRequestInitializer that wraps a delegate initializer, intercepts all HTTP requests, adds
@@ -151,36 +147,23 @@ static void addResponseBodySizeToSpan(HttpResponse response, Span span) {
151147
}
152148

153149
/**
154-
* Wraps the response's input stream with a CountingInputStream to track bytes read. This handles
155-
* compressed transfer encoding where Content-Length is not available.
150+
* Wraps the LowLevelHttpResponse with a HttpTracingLowLevelHttpResponse to give us access to get
151+
* response body size for compressed responses.
156152
*/
157153
private static void getResponseBodySizeForCompressedResponse(HttpResponse response, Span span) {
158154
try {
159-
InputStream content = response.getContent();
160-
if (content == null) {
161-
return;
162-
}
163-
164-
InputStream wrappedStream = getWrappedInputStream(span, content);
165-
Field contentField = HttpResponse.class.getDeclaredField("content");
166-
contentField.setAccessible(true);
167-
contentField.set(response, wrappedStream);
168-
} catch (Exception e) {
155+
Field lowlevelField = HttpResponse.class.getDeclaredField("response");
156+
lowlevelField.setAccessible(true);
157+
LowLevelHttpResponse lowLevelHttpResponse =
158+
(LowLevelHttpResponse) lowlevelField.get(response);
159+
HttpTracingLowLevelHttpResponse wrappedResponse =
160+
new HttpTracingLowLevelHttpResponse(lowLevelHttpResponse, span);
161+
lowlevelField.set(response, wrappedResponse);
162+
} catch (NoSuchFieldException | SecurityException | IllegalAccessException e) {
169163
// Ignore - stream wrapping failed, we will not track response size
170164
}
171165
}
172166

173-
private static @NonNull InputStream getWrappedInputStream(Span span, InputStream content) {
174-
CountingInputStream counter = new CountingInputStream(content);
175-
return new FilterInputStream(counter) {
176-
@Override
177-
public void close() throws IOException {
178-
super.close();
179-
span.setAttribute(HTTP_RESPONSE_BODY_SIZE, counter.getCount());
180-
}
181-
};
182-
}
183-
184167
/** Removes credentials from URL. */
185168
private static String getSanitizedUrl(HttpRequest request) {
186169
GenericUrl clone = request.getUrl().clone();

java-bigquery/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/telemetry/HttpTracingRequestInitializerTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,11 @@
5353
import io.opentelemetry.sdk.trace.SdkTracerProvider;
5454
import io.opentelemetry.sdk.trace.data.SpanData;
5555
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
56-
5756
import java.io.ByteArrayOutputStream;
5857
import java.io.IOException;
5958
import java.nio.charset.StandardCharsets;
6059
import java.util.List;
6160
import java.util.zip.GZIPOutputStream;
62-
6361
import org.junit.jupiter.api.BeforeEach;
6462
import org.junit.jupiter.api.Test;
6563

@@ -475,7 +473,7 @@ public LowLevelHttpResponse execute() {
475473
SpanData span = spans.get(0);
476474

477475
assertEquals(
478-
(long) responseBody.length(),
476+
compressedData.length,
479477
span.getAttributes().get(HttpTracingRequestInitializer.HTTP_RESPONSE_BODY_SIZE));
480478
}
481479
}

0 commit comments

Comments
 (0)