Skip to content

Commit 59706e6

Browse files
committed
refactor: use Context.current().wrap() instead of Span Links
1 parent 602b916 commit 59706e6

File tree

1 file changed

+66
-70
lines changed

1 file changed

+66
-70
lines changed

java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 66 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import io.opentelemetry.api.GlobalOpenTelemetry;
6161
import io.opentelemetry.api.trace.Span;
6262
import io.opentelemetry.api.trace.SpanBuilder;
63-
import io.opentelemetry.api.trace.SpanContext;
6463
import io.opentelemetry.api.trace.StatusCode;
6564
import io.opentelemetry.api.trace.Tracer;
6665
import io.opentelemetry.context.Context;
@@ -1059,78 +1058,75 @@ Thread runNextPageTaskAsync(
10591058
// calls
10601059
populateFirstPage(result, rpcResponseQueue);
10611060

1062-
SpanContext parentSpanContext = null;
1063-
if (this.otelContext != null) {
1064-
parentSpanContext = Span.fromContext(this.otelContext).getSpanContext();
1065-
}
1066-
final SpanContext finalParentSpanContext = parentSpanContext;
1067-
10681061
// This thread makes the RPC calls and paginates
10691062
Runnable nextPageTask =
1070-
() -> {
1071-
Tracer tracer = getSafeTracer();
1072-
String currentPageToken = firstPageToken;
1073-
TableResult currentResults = result;
1074-
TableId destinationTable = null;
1075-
if (firstPageToken != null) {
1076-
destinationTable = getDestinationTable(jobId);
1077-
}
1063+
Context.current()
1064+
.wrap(
1065+
() -> {
1066+
Tracer tracer = getSafeTracer();
1067+
String currentPageToken = firstPageToken;
1068+
TableResult currentResults = result;
1069+
TableId destinationTable = null;
1070+
if (firstPageToken != null) {
1071+
destinationTable = getDestinationTable(jobId);
1072+
}
10781073

1079-
try {
1080-
while (currentPageToken != null) {
1081-
// do not process further pages and shutdown
1082-
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
1083-
LOG.warning(
1084-
"%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
1085-
break;
1086-
}
1087-
1088-
SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination");
1089-
if (finalParentSpanContext != null && finalParentSpanContext.isValid()) {
1090-
spanBuilder.addLink(finalParentSpanContext);
1091-
}
1092-
Span paginationSpan = spanBuilder.startSpan();
1093-
try (Scope scope = paginationSpan.makeCurrent()) {
1094-
paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
1095-
1096-
long startTime = System.nanoTime();
1097-
currentResults =
1098-
this.bigQuery.listTableData(
1099-
destinationTable,
1100-
TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
1101-
TableDataListOption.pageToken(currentPageToken));
1102-
1103-
long duration = (System.nanoTime() - startTime) / 1000000;
1104-
paginationSpan.setAttribute("db.pagination.duration_ms", duration);
1105-
paginationSpan.setAttribute(
1106-
"db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
1107-
1108-
currentPageToken = currentResults.getNextPageToken();
1109-
// this will be parsed asynchronously without blocking the current thread
1110-
Uninterruptibles.putUninterruptibly(
1111-
rpcResponseQueue, Tuple.of(currentResults, true));
1112-
LOG.fine(
1113-
"Fetched %d results from the server in %d ms.",
1114-
querySettings.getMaxResultPerPage(), (int) duration);
1115-
} catch (Exception e) {
1116-
paginationSpan.recordException(e);
1117-
paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
1118-
throw e;
1119-
} finally {
1120-
paginationSpan.end();
1121-
}
1122-
}
1123-
} catch (Exception ex) {
1124-
Uninterruptibles.putUninterruptibly(
1125-
bigQueryFieldValueListWrapperBlockingQueue,
1126-
BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
1127-
} finally {
1128-
// this will stop the parseDataTask as well when the pagination completes
1129-
Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
1130-
}
1131-
// We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
1132-
// have finished processing the records and even that will be interrupted
1133-
};
1074+
try {
1075+
while (currentPageToken != null) {
1076+
// do not process further pages and shutdown
1077+
if (Thread.currentThread().isInterrupted()
1078+
|| queryTaskExecutor.isShutdown()) {
1079+
LOG.warning(
1080+
"%s Interrupted @ runNextPageTaskAsync",
1081+
Thread.currentThread().getName());
1082+
break;
1083+
}
1084+
1085+
SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination");
1086+
Span paginationSpan = spanBuilder.startSpan();
1087+
try (Scope scope = paginationSpan.makeCurrent()) {
1088+
paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
1089+
1090+
long startTime = System.nanoTime();
1091+
currentResults =
1092+
this.bigQuery.listTableData(
1093+
destinationTable,
1094+
TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
1095+
TableDataListOption.pageToken(currentPageToken));
1096+
1097+
long duration = (System.nanoTime() - startTime) / 1000000;
1098+
paginationSpan.setAttribute("db.pagination.duration_ms", duration);
1099+
paginationSpan.setAttribute(
1100+
"db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
1101+
1102+
currentPageToken = currentResults.getNextPageToken();
1103+
// this will be parsed asynchronously without blocking the current thread
1104+
Uninterruptibles.putUninterruptibly(
1105+
rpcResponseQueue, Tuple.of(currentResults, true));
1106+
LOG.fine(
1107+
"Fetched %d results from the server in %d ms.",
1108+
querySettings.getMaxResultPerPage(), (int) duration);
1109+
} catch (Exception e) {
1110+
paginationSpan.recordException(e);
1111+
paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
1112+
throw e;
1113+
} finally {
1114+
paginationSpan.end();
1115+
}
1116+
}
1117+
} catch (Exception ex) {
1118+
Uninterruptibles.putUninterruptibly(
1119+
bigQueryFieldValueListWrapperBlockingQueue,
1120+
BigQueryFieldValueListWrapper.ofError(
1121+
new BigQueryJdbcRuntimeException(ex)));
1122+
} finally {
1123+
// this will stop the parseDataTask as well when the pagination completes
1124+
Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
1125+
}
1126+
// We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may
1127+
// not
1128+
// have finished processing the records and even that will be interrupted
1129+
});
11341130

11351131
Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask);
11361132
nextPageWorker.start();

0 commit comments

Comments
 (0)