Skip to content

Commit d9418fc

Browse files
committed
refactor: create functional interface withTracing
1 parent cb578ac commit d9418fc

File tree

1 file changed

+99
-107
lines changed

1 file changed

+99
-107
lines changed

java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 99 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -241,62 +241,48 @@ private BigQuerySettings generateBigQuerySettings() {
241241
@Override
242242
public ResultSet executeQuery(String sql) throws SQLException {
243243
LOG.finest("++enter++");
244-
Tracer tracer = getSafeTracer();
245-
Span span = tracer.spanBuilder("BigQueryStatement.executeQuery").startSpan();
246-
try (Scope scope = span.makeCurrent()) {
247-
span.setAttribute("db.statement", sql);
248-
this.otelContext = Context.current();
249-
logQueryExecutionStart(sql);
250-
try {
251-
QueryJobConfiguration jobConfiguration =
252-
setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
253-
runQuery(sql, jobConfiguration);
254-
} catch (InterruptedException ex) {
255-
throw new BigQueryJdbcException(ex);
256-
}
244+
return withTracing(
245+
"BigQueryStatement.executeQuery",
246+
(span) -> {
247+
span.setAttribute("db.statement", sql);
248+
logQueryExecutionStart(sql);
249+
try {
250+
QueryJobConfiguration jobConfiguration =
251+
setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
252+
runQuery(sql, jobConfiguration);
253+
} catch (InterruptedException ex) {
254+
throw new BigQueryJdbcException(ex);
255+
}
257256

258-
if (!isSingularResultSet()) {
259-
throw new BigQueryJdbcException(
260-
"Query returned more than one or didn't return any ResultSet.");
261-
}
262-
// This contains all the other assertions spec required on this method
263-
return getCurrentResultSet();
264-
} catch (Exception ex) {
265-
span.recordException(ex);
266-
span.setStatus(StatusCode.ERROR, ex.getMessage());
267-
throw ex;
268-
} finally {
269-
span.end();
270-
}
257+
if (!isSingularResultSet()) {
258+
throw new BigQueryJdbcException(
259+
"Query returned more than one or didn't return any ResultSet.");
260+
}
261+
// This contains all the other assertions spec required on this method
262+
return getCurrentResultSet();
263+
});
271264
}
272265

273266
@Override
274267
public long executeLargeUpdate(String sql) throws SQLException {
275268
LOG.finest("++enter++");
276-
Tracer tracer = getSafeTracer();
277-
Span span = tracer.spanBuilder("BigQueryStatement.executeLargeUpdate").startSpan();
278-
try (Scope scope = span.makeCurrent()) {
279-
span.setAttribute("db.statement", sql);
280-
this.otelContext = Context.current();
281-
logQueryExecutionStart(sql);
282-
try {
283-
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
284-
runQuery(sql, jobConfiguration.build());
285-
} catch (InterruptedException ex) {
286-
throw new BigQueryJdbcRuntimeException(ex);
287-
}
288-
if (this.currentUpdateCount == -1) {
289-
throw new BigQueryJdbcException(
290-
"Update query expected to return affected row count. Double check query type.");
291-
}
292-
return this.currentUpdateCount;
293-
} catch (Exception ex) {
294-
span.recordException(ex);
295-
span.setStatus(StatusCode.ERROR, ex.getMessage());
296-
throw ex;
297-
} finally {
298-
span.end();
299-
}
269+
return withTracing(
270+
"BigQueryStatement.executeLargeUpdate",
271+
(span) -> {
272+
span.setAttribute("db.statement", sql);
273+
logQueryExecutionStart(sql);
274+
try {
275+
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
276+
runQuery(sql, jobConfiguration.build());
277+
} catch (InterruptedException ex) {
278+
throw new BigQueryJdbcRuntimeException(ex);
279+
}
280+
if (this.currentUpdateCount == -1) {
281+
throw new BigQueryJdbcException(
282+
"Update query expected to return affected row count. Double check query type.");
283+
}
284+
return this.currentUpdateCount;
285+
});
300286
}
301287

302288
@Override
@@ -318,30 +304,23 @@ int checkUpdateCount(long updateCount) {
318304
@Override
319305
public boolean execute(String sql) throws SQLException {
320306
LOG.finest("++enter++");
321-
Tracer tracer = getSafeTracer();
322-
Span span = tracer.spanBuilder("BigQueryStatement.execute").startSpan();
323-
try (Scope scope = span.makeCurrent()) {
324-
span.setAttribute("db.statement", sql);
325-
this.otelContext = Context.current();
326-
logQueryExecutionStart(sql);
327-
try {
328-
QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
329-
// If Large Results are enabled, ensure query type is SELECT
330-
if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
331-
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
332-
}
333-
runQuery(sql, jobConfiguration);
334-
} catch (InterruptedException ex) {
335-
throw new BigQueryJdbcRuntimeException(ex);
336-
}
337-
return getCurrentResultSet() != null;
338-
} catch (Exception ex) {
339-
span.recordException(ex);
340-
span.setStatus(StatusCode.ERROR, ex.getMessage());
341-
throw ex;
342-
} finally {
343-
span.end();
344-
}
307+
return withTracing(
308+
"BigQueryStatement.execute",
309+
(span) -> {
310+
span.setAttribute("db.statement", sql);
311+
logQueryExecutionStart(sql);
312+
try {
313+
QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
314+
// If Large Results are enabled, ensure query type is SELECT
315+
if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
316+
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
317+
}
318+
runQuery(sql, jobConfiguration);
319+
} catch (InterruptedException ex) {
320+
throw new BigQueryJdbcRuntimeException(ex);
321+
}
322+
return getCurrentResultSet() != null;
323+
});
345324
}
346325

347326
StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException {
@@ -1476,42 +1455,35 @@ public void clearBatch() {
14761455
@Override
14771456
public int[] executeBatch() throws SQLException {
14781457
LOG.finest("++enter++");
1479-
Tracer tracer = getSafeTracer();
1480-
Span span = tracer.spanBuilder("BigQueryStatement.executeBatch").startSpan();
1481-
try (Scope scope = span.makeCurrent()) {
1482-
span.setAttribute("db.statement.count", this.batchQueries.size());
1483-
this.otelContext = Context.current();
1484-
1485-
int[] result = new int[this.batchQueries.size()];
1486-
if (this.batchQueries.isEmpty()) {
1487-
return result;
1488-
}
1458+
return withTracing(
1459+
"BigQueryStatement.executeBatch",
1460+
(span) -> {
1461+
span.setAttribute("db.statement.count", this.batchQueries.size());
1462+
1463+
int[] result = new int[this.batchQueries.size()];
1464+
if (this.batchQueries.isEmpty()) {
1465+
return result;
1466+
}
14891467

1490-
try {
1491-
String combinedQueries = String.join("", this.batchQueries);
1492-
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries);
1493-
jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH);
1494-
runQuery(combinedQueries, jobConfiguration.build());
1495-
} catch (InterruptedException ex) {
1496-
throw new BigQueryJdbcRuntimeException(ex);
1497-
}
1468+
try {
1469+
String combinedQueries = String.join("", this.batchQueries);
1470+
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries);
1471+
jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH);
1472+
runQuery(combinedQueries, jobConfiguration.build());
1473+
} catch (InterruptedException ex) {
1474+
throw new BigQueryJdbcRuntimeException(ex);
1475+
}
14981476

1499-
int i = 0;
1500-
while (getUpdateCount() != -1 && i < this.batchQueries.size()) {
1501-
result[i] = getUpdateCount();
1502-
getMoreResults();
1503-
i++;
1504-
}
1477+
int i = 0;
1478+
while (getUpdateCount() != -1 && i < this.batchQueries.size()) {
1479+
result[i] = getUpdateCount();
1480+
getMoreResults();
1481+
i++;
1482+
}
15051483

1506-
clearBatch();
1507-
return result;
1508-
} catch (Exception e) {
1509-
span.recordException(e);
1510-
span.setStatus(StatusCode.ERROR, e.getMessage());
1511-
throw e;
1512-
} finally {
1513-
span.end();
1514-
}
1484+
clearBatch();
1485+
return result;
1486+
});
15151487
}
15161488

15171489
@Override
@@ -1661,6 +1633,26 @@ private void enqueueBufferEndOfStream(BlockingQueue<BigQueryFieldValueListWrappe
16611633
Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true));
16621634
}
16631635

1636+
@FunctionalInterface
1637+
private interface TracedOperation<T> {
1638+
T run(Span span) throws SQLException;
1639+
}
1640+
1641+
private <T> T withTracing(String spanName, TracedOperation<T> operation) throws SQLException {
1642+
Tracer tracer = getSafeTracer();
1643+
Span span = tracer.spanBuilder(spanName).startSpan();
1644+
try (Scope scope = span.makeCurrent()) {
1645+
this.otelContext = Context.current();
1646+
return operation.run(span);
1647+
} catch (Exception ex) {
1648+
span.recordException(ex);
1649+
span.setStatus(StatusCode.ERROR, ex.getMessage());
1650+
throw ex;
1651+
} finally {
1652+
span.end();
1653+
}
1654+
}
1655+
16641656
/**
16651657
* Gets the OpenTelemetry Context from the statement execution. Used by ResultSet for pagination
16661658
* span context.

0 commit comments

Comments
 (0)