Skip to content

Commit 1ffc421

Browse files
committed
chore: address pr feedback and refactor
1 parent 3e8ca63 commit 1ffc421

3 files changed

Lines changed: 99 additions & 113 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
4343
import com.google.cloud.http.HttpTransportOptions;
4444
import com.google.cloud.logging.Logging;
45+
import com.google.common.annotations.VisibleForTesting;
4546
import com.google.common.collect.ImmutableSortedSet;
4647
import io.opentelemetry.api.OpenTelemetry;
4748
import io.opentelemetry.api.baggage.Baggage;
@@ -438,6 +439,7 @@ String getConnectionUrl() {
438439
return connectionUrl;
439440
}
440441

442+
@VisibleForTesting
441443
public String getConnectionId() {
442444
return this.connectionId;
443445
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -912,8 +912,7 @@ private void processArrowStream(
912912
enqueueError(arrowBatchWrapperBlockingQueue, e);
913913
Thread.currentThread().interrupt();
914914
} catch (Exception e) {
915-
if (e.getCause() instanceof InterruptedException
916-
|| Thread.currentThread().isInterrupted()) {
915+
if (e.getCause() instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
917916
LOG.log(
918917
Level.WARNING,
919918
"\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
@@ -1684,8 +1683,7 @@ private void parseAndPopulateRpcData(
16841683
}
16851684

16861685
} catch (Exception ex) {
1687-
if (ex.getCause() instanceof InterruptedException
1688-
|| Thread.currentThread().isInterrupted()) {
1686+
if (ex.getCause() instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
16891687
LOG.log(
16901688
Level.WARNING,
16911689
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITOpenTelemetryTest.java

Lines changed: 95 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@
1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.junit.jupiter.api.Assertions.assertFalse;
2121
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
2223
import static org.junit.jupiter.api.Assertions.assertTrue;
2324
import static org.junit.jupiter.api.Assumptions.assumeTrue;
2425

2526
import com.google.api.gax.paging.Page;
2627
import com.google.cloud.ServiceOptions;
2728
import com.google.cloud.bigquery.jdbc.BigQueryConnection;
29+
import com.google.cloud.bigquery.jdbc.DataSource;
2830
import com.google.cloud.logging.LogEntry;
2931
import com.google.cloud.logging.Logging;
3032
import com.google.cloud.logging.LoggingOptions;
3133
import com.google.cloud.trace.v1.TraceServiceClient;
3234
import com.google.devtools.cloudtrace.v1.Trace;
3335
import com.google.devtools.cloudtrace.v1.TraceSpan;
3436
import java.sql.Connection;
35-
import java.sql.DriverManager;
3637
import java.sql.ResultSet;
3738
import java.sql.SQLException;
3839
import java.sql.Statement;
3940
import java.util.ArrayList;
4041
import java.util.List;
41-
import java.util.Properties;
4242
import org.junit.jupiter.api.Test;
4343

4444
public class ITOpenTelemetryTest {
@@ -49,112 +49,89 @@ public class ITOpenTelemetryTest {
4949
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=%s;OAuthType=3;Timeout=3600;",
5050
PROJECT_ID);
5151

52+
private static class TelemetryContext {
53+
final String traceId;
54+
final String spanId;
55+
56+
TelemetryContext(String traceId, String spanId) {
57+
this.traceId = traceId;
58+
this.spanId = spanId;
59+
}
60+
}
61+
5262
@Test
5363
public void testExecute_withOpenTelemetryGcpExporter() throws Exception {
5464
assumeTrue(
5565
PROJECT_ID != null && !PROJECT_ID.trim().isEmpty(),
5666
"Skipping OTel E2E tests because no default Project ID is configured.");
5767

58-
// Step 1: Connect with GCP Exporters enabled
59-
Properties props = new Properties();
60-
props.setProperty("enableGcpTraceExporter", "true");
61-
props.setProperty("enableGcpLogExporter", "true");
62-
props.setProperty("LogLevel", "3"); // Triggers FINE log generation
63-
props.setProperty("gcpTelemetryProjectId", PROJECT_ID);
64-
props.setProperty("EnableHighThroughputAPI", "0");
65-
props.setProperty("MaxResults", "50"); // Forces small page size (50) to trigger pagination
68+
// Step 1: Connect with GCP Exporters enabled via DataSource
69+
DataSource ds = DataSource.fromUrl(CONNECTION_URL);
70+
ds.setEnableGcpTraceExporter(true);
71+
ds.setEnableGcpLogExporter(true);
72+
ds.setLogLevel("5"); // Triggers FINE log generation
73+
ds.setGcpTelemetryProjectId(PROJECT_ID);
74+
ds.setEnableHighThroughputAPI(false);
75+
ds.setMaxResults(50L); // Forces small page size (50) to trigger pagination
6676

6777
String connectionUuid = null;
6878

69-
try (Connection connection = DriverManager.getConnection(CONNECTION_URL, props);
79+
try (Connection connection = ds.getConnection();
7080
Statement statement = connection.createStatement()) {
7181

7282
// Retrieve the Connection UUID programmatically
7383
BigQueryConnection bqConnection = connection.unwrap(BigQueryConnection.class);
7484
connectionUuid = bqConnection.getConnectionId();
7585
assertNotNull(connectionUuid, "Connection UUID should be generated");
7686

77-
// Execute an in-memory array query (scans 0 bytes, extremely fast) and force pagination
78-
String paginationQuery = "SELECT * FROM UNNEST(GENERATE_ARRAY(1, 1000)) AS id;";
87+
// Execute an in-memory array query (scans 0 bytes, extremely fast) and force pagination (3
88+
// pages)
89+
String paginationQuery = "SELECT * FROM UNNEST(GENERATE_ARRAY(1, 150)) AS id;";
7990
try (ResultSet paginatedRs = statement.executeQuery(paginationQuery)) {
8091
int rowCount = 0;
81-
while (paginatedRs.next() && rowCount < 1000) {
92+
while (paginatedRs.next() && rowCount < 150) {
8293
rowCount++;
8394
}
8495
}
8596
}
8697

87-
// Step 2: Retrieve logs from Cloud Logging and extract TraceId
88-
String traceId = null;
89-
String hexSpanId = null;
98+
// Step 2: Retrieve and assert logs
99+
TelemetryContext telCtx = verifyAndFetchLogs(connectionUuid);
90100

91-
try (Logging logging =
92-
LoggingOptions.newBuilder().setProjectId(PROJECT_ID).build().getService()) {
93-
String filter =
94-
"logName:\"projects/"
95-
+ PROJECT_ID
96-
+ "/logs/com.google.cloud.bigquery\" AND labels.\"jdbc.connection_id\"=\""
97-
+ connectionUuid
98-
+ "\"";
101+
// Step 3: Query Cloud Trace and assert parent-child hierarchy
102+
Trace trace = verifyAndFetchTrace(telCtx.traceId);
99103

100-
List<LogEntry> entries = fetchLogsWithRetry(logging, filter);
101-
assertFalse(entries.isEmpty(), "Telemetry logs should be exported to GCP");
104+
boolean foundParentExecuteQuery = false;
105+
boolean foundChildSdkSpans = false;
106+
boolean foundPaginationSpans = false;
107+
long parentSpanId = 0;
102108

103-
LogEntry sampleEntry = entries.get(0);
104-
traceId = sampleEntry.getTrace();
105-
hexSpanId = sampleEntry.getSpanId();
106-
107-
assertNotNull(traceId, "Log entry must contain TraceId");
108-
assertNotNull(hexSpanId, "Log entry must contain SpanId");
109-
110-
// Verify Connection UUID label correlation on all entries
111-
for (LogEntry entry : entries) {
112-
assertEquals(connectionUuid, entry.getLabels().get("jdbc.connection_id"));
109+
for (TraceSpan span : trace.getSpansList()) {
110+
String spanName = span.getName();
111+
if (spanName.equals("BigQueryStatement.executeQuery")) {
112+
foundParentExecuteQuery = true;
113+
parentSpanId = span.getSpanId();
113114
}
114115
}
115116

116-
// Step 3: Query Cloud Trace using TraceId and assert parent-child hierarchy
117-
String hexTraceId = traceId;
118-
if (traceId.contains("/traces/")) {
119-
hexTraceId = traceId.substring(traceId.lastIndexOf("/traces/") + 8);
120-
}
121-
122-
try (TraceServiceClient traceClient = TraceServiceClient.create()) {
123-
Trace trace = fetchTraceWithRetry(traceClient, PROJECT_ID, hexTraceId);
124-
assertNotNull(trace, "Trace must be found in Cloud Trace API: " + hexTraceId);
125-
126-
boolean foundParentExecuteQuery = false;
127-
boolean foundChildSdkSpans = false;
128-
boolean foundPaginationSpans = false;
129-
long parentSpanId = 0;
117+
assertTrue(
118+
foundParentExecuteQuery,
119+
"Traces must contain JDBC parent span 'BigQueryStatement.executeQuery'");
130120

131-
for (TraceSpan span : trace.getSpansList()) {
132-
String spanName = span.getName();
133-
if (spanName.equals("BigQueryStatement.executeQuery")) {
134-
foundParentExecuteQuery = true;
135-
parentSpanId = span.getSpanId();
136-
}
121+
// Verify that we captured child spans or linked pagination spans
122+
for (TraceSpan span : trace.getSpansList()) {
123+
if (span.getParentSpanId() == parentSpanId && parentSpanId != 0) {
124+
foundChildSdkSpans = true;
137125
}
138-
139-
assertTrue(
140-
foundParentExecuteQuery,
141-
"Traces must contain JDBC parent span 'BigQueryStatement.executeQuery'");
142-
143-
// Verify that we captured child spans or linked pagination spans
144-
for (TraceSpan span : trace.getSpansList()) {
145-
if (span.getParentSpanId() == parentSpanId && parentSpanId != 0) {
146-
foundChildSdkSpans = true;
147-
}
148-
if (span.getName().equals("BigQueryStatement.pagination")) {
149-
foundPaginationSpans = true;
150-
}
126+
if (span.getName().equals("BigQueryStatement.pagination")) {
127+
foundPaginationSpans = true;
151128
}
152-
153-
assertTrue(foundPaginationSpans, "OTel pagination must generate pagination spans");
154-
assertTrue(
155-
foundChildSdkSpans,
156-
"OTel context must propagate parent to downstream pagination child spans");
157129
}
130+
131+
assertTrue(foundPaginationSpans, "OTel pagination must generate pagination spans");
132+
assertTrue(
133+
foundChildSdkSpans,
134+
"OTel context must propagate parent to downstream pagination child spans");
158135
}
159136

160137
@Test
@@ -163,37 +140,48 @@ public void testExecute_withErrorCorrelation() throws Exception {
163140
PROJECT_ID != null && !PROJECT_ID.trim().isEmpty(),
164141
"Skipping OTel E2E tests because no default Project ID is configured.");
165142

166-
// Step 1: Connect with GCP Exporters enabled
167-
Properties props = new Properties();
168-
props.setProperty("enableGcpTraceExporter", "true");
169-
props.setProperty("enableGcpLogExporter", "true");
170-
props.setProperty("LogLevel", "3"); // Triggers FINE log generation
171-
props.setProperty("gcpTelemetryProjectId", PROJECT_ID);
143+
// Step 1: Connect with GCP Exporters enabled via DataSource
144+
DataSource ds = DataSource.fromUrl(CONNECTION_URL);
145+
ds.setEnableGcpTraceExporter(true);
146+
ds.setEnableGcpLogExporter(true);
147+
ds.setLogLevel("5"); // Triggers FINE log generation
148+
ds.setGcpTelemetryProjectId(PROJECT_ID);
172149

173150
String connectionUuid = null;
174151

175-
try (Connection connection = DriverManager.getConnection(CONNECTION_URL, props);
152+
try (Connection connection = ds.getConnection();
176153
Statement statement = connection.createStatement()) {
177154

178155
// Retrieve the Connection UUID programmatically
179156
BigQueryConnection bqConnection = connection.unwrap(BigQueryConnection.class);
180157
connectionUuid = bqConnection.getConnectionId();
181158
assertNotNull(connectionUuid, "Connection UUID should be generated");
182159

183-
// Execute a query designed to fail due to non-existent table
184-
boolean caughtException = false;
185-
try {
186-
statement.executeQuery("SELECT * FROM invalid_dataset.invalid_table;");
187-
} catch (SQLException e) {
188-
caughtException = true;
160+
// Execute a query designed to fail instantly due to syntax error (compiler-level failure)
161+
assertThrows(SQLException.class, () -> statement.executeQuery("SELECT * FROM;"));
162+
}
163+
164+
// Step 2: Retrieve and assert logs
165+
TelemetryContext telCtx = verifyAndFetchLogs(connectionUuid);
166+
167+
// Step 3: Query Cloud Trace and assert span status is ERROR
168+
Trace trace = verifyAndFetchTrace(telCtx.traceId);
169+
170+
boolean foundParentExecuteQuery = false;
171+
172+
for (TraceSpan span : trace.getSpansList()) {
173+
String spanName = span.getName();
174+
if (spanName.equals("BigQueryStatement.executeQuery")) {
175+
foundParentExecuteQuery = true;
189176
}
190-
assertTrue(caughtException, "Expected SQLException to be thrown");
191177
}
192178

193-
// Step 2: Retrieve logs from Cloud Logging and assert error logs
194-
String traceId = null;
195-
String hexSpanId = null;
179+
assertTrue(
180+
foundParentExecuteQuery,
181+
"Traces must contain JDBC parent span 'BigQueryStatement.executeQuery'");
182+
}
196183

184+
private TelemetryContext verifyAndFetchLogs(String connectionUuid) throws Exception {
197185
try (Logging logging =
198186
LoggingOptions.newBuilder().setProjectId(PROJECT_ID).build().getService()) {
199187
String filter =
@@ -207,14 +195,22 @@ public void testExecute_withErrorCorrelation() throws Exception {
207195
assertFalse(entries.isEmpty(), "Telemetry logs should be exported to GCP");
208196

209197
LogEntry sampleEntry = entries.get(0);
210-
traceId = sampleEntry.getTrace();
211-
hexSpanId = sampleEntry.getSpanId();
198+
String traceId = sampleEntry.getTrace();
199+
String hexSpanId = sampleEntry.getSpanId();
212200

213201
assertNotNull(traceId, "Log entry must contain TraceId");
214202
assertNotNull(hexSpanId, "Log entry must contain SpanId");
203+
204+
// Verify Connection UUID label correlation on all entries
205+
for (LogEntry entry : entries) {
206+
assertEquals(connectionUuid, entry.getLabels().get("jdbc.connection_id"));
207+
}
208+
209+
return new TelemetryContext(traceId, hexSpanId);
215210
}
211+
}
216212

217-
// Step 3: Query Cloud Trace using TraceId and assert span status is ERROR
213+
private Trace verifyAndFetchTrace(String traceId) throws Exception {
218214
String hexTraceId = traceId;
219215
if (traceId.contains("/traces/")) {
220216
hexTraceId = traceId.substring(traceId.lastIndexOf("/traces/") + 8);
@@ -223,19 +219,7 @@ public void testExecute_withErrorCorrelation() throws Exception {
223219
try (TraceServiceClient traceClient = TraceServiceClient.create()) {
224220
Trace trace = fetchTraceWithRetry(traceClient, PROJECT_ID, hexTraceId);
225221
assertNotNull(trace, "Trace must be found in Cloud Trace API: " + hexTraceId);
226-
227-
boolean foundParentExecuteQuery = false;
228-
229-
for (TraceSpan span : trace.getSpansList()) {
230-
String spanName = span.getName();
231-
if (spanName.equals("BigQueryStatement.executeQuery")) {
232-
foundParentExecuteQuery = true;
233-
}
234-
}
235-
236-
assertTrue(
237-
foundParentExecuteQuery,
238-
"Traces must contain JDBC parent span 'BigQueryStatement.executeQuery'");
222+
return trace;
239223
}
240224
}
241225

@@ -246,7 +230,6 @@ private <T> T pollWithRetry(java.util.concurrent.Callable<T> task) throws Interr
246230

247231
while (attempts < maxAttempts) {
248232
attempts++;
249-
Thread.sleep(delayMs);
250233
try {
251234
T result = task.call();
252235
if (result != null) {
@@ -258,6 +241,9 @@ private <T> T pollWithRetry(java.util.concurrent.Callable<T> task) throws Interr
258241
} catch (Exception e) {
259242
// Ignore exceptions during remote lookup and retry
260243
}
244+
if (attempts < maxAttempts) {
245+
Thread.sleep(delayMs);
246+
}
261247
}
262248
return null;
263249
}

0 commit comments

Comments
 (0)