Skip to content

Commit 1204b1f

Browse files
committed
chore: add Span Links for asynchronous pagination
1 parent c457034 commit 1204b1f

File tree

1 file changed

+66
-58
lines changed

1 file changed

+66
-58
lines changed

java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import com.google.common.util.concurrent.Uninterruptibles;
6060
import io.opentelemetry.api.GlobalOpenTelemetry;
6161
import io.opentelemetry.api.trace.Span;
62+
import io.opentelemetry.api.trace.SpanBuilder;
63+
import io.opentelemetry.api.trace.SpanContext;
6264
import io.opentelemetry.api.trace.StatusCode;
6365
import io.opentelemetry.api.trace.Tracer;
6466
import io.opentelemetry.context.Context;
@@ -1057,72 +1059,78 @@ Thread runNextPageTaskAsync(
10571059
// calls
10581060
populateFirstPage(result, rpcResponseQueue);
10591061

1060-
Context asyncContext = (this.otelContext != null) ? this.otelContext : Context.current();
1062+
SpanContext parentSpanContext = null;
1063+
if (this.otelContext != null) {
1064+
parentSpanContext = Span.fromContext(this.otelContext).getSpanContext();
1065+
}
1066+
final SpanContext finalParentSpanContext = parentSpanContext;
10611067

10621068
// This thread makes the RPC calls and paginates
10631069
Runnable nextPageTask =
1064-
asyncContext.wrap(
1065-
() -> {
1066-
Tracer tracer = getSafeTracer();
1067-
String currentPageToken = firstPageToken;
1068-
TableResult currentResults = result;
1069-
TableId destinationTable = null;
1070-
if (firstPageToken != null) {
1071-
destinationTable = getDestinationTable(jobId);
1072-
}
1070+
() -> {
1071+
Tracer tracer = getSafeTracer();
1072+
String currentPageToken = firstPageToken;
1073+
TableResult currentResults = result;
1074+
TableId destinationTable = null;
1075+
if (firstPageToken != null) {
1076+
destinationTable = getDestinationTable(jobId);
1077+
}
10731078

1074-
try {
1075-
while (currentPageToken != null) {
1076-
// do not process further pages and shutdown
1077-
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
1078-
LOG.warning(
1079-
"%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
1080-
break;
1081-
}
1079+
try {
1080+
while (currentPageToken != null) {
1081+
// do not process further pages and shutdown
1082+
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
1083+
LOG.warning(
1084+
"%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
1085+
break;
1086+
}
10821087

1083-
Span paginationSpan =
1084-
tracer.spanBuilder("BigQueryStatement.pagination").startSpan();
1085-
try (Scope scope = paginationSpan.makeCurrent()) {
1086-
paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
1087-
1088-
long startTime = System.nanoTime();
1089-
currentResults =
1090-
this.bigQuery.listTableData(
1091-
destinationTable,
1092-
TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
1093-
TableDataListOption.pageToken(currentPageToken));
1094-
1095-
long duration = (System.nanoTime() - startTime) / 1000000;
1096-
paginationSpan.setAttribute("db.pagination.duration_ms", duration);
1097-
paginationSpan.setAttribute(
1098-
"db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
1099-
1100-
currentPageToken = currentResults.getNextPageToken();
1101-
// this will be parsed asynchronously without blocking the current thread
1102-
Uninterruptibles.putUninterruptibly(
1103-
rpcResponseQueue, Tuple.of(currentResults, true));
1104-
LOG.fine(
1105-
"Fetched %d results from the server in %d ms.",
1106-
querySettings.getMaxResultPerPage(), (int) duration);
1107-
} catch (Exception e) {
1108-
paginationSpan.recordException(e);
1109-
paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
1110-
throw e;
1111-
} finally {
1112-
paginationSpan.end();
1113-
}
1114-
}
1115-
} catch (Exception ex) {
1088+
SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination");
1089+
if (finalParentSpanContext != null && finalParentSpanContext.isValid()) {
1090+
spanBuilder.addLink(finalParentSpanContext);
1091+
}
1092+
Span paginationSpan = spanBuilder.startSpan();
1093+
try (Scope scope = paginationSpan.makeCurrent()) {
1094+
paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
1095+
1096+
long startTime = System.nanoTime();
1097+
currentResults =
1098+
this.bigQuery.listTableData(
1099+
destinationTable,
1100+
TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
1101+
TableDataListOption.pageToken(currentPageToken));
1102+
1103+
long duration = (System.nanoTime() - startTime) / 1000000;
1104+
paginationSpan.setAttribute("db.pagination.duration_ms", duration);
1105+
paginationSpan.setAttribute(
1106+
"db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
1107+
1108+
currentPageToken = currentResults.getNextPageToken();
1109+
// this will be parsed asynchronously without blocking the current thread
11161110
Uninterruptibles.putUninterruptibly(
1117-
bigQueryFieldValueListWrapperBlockingQueue,
1118-
BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
1111+
rpcResponseQueue, Tuple.of(currentResults, true));
1112+
LOG.fine(
1113+
"Fetched %d results from the server in %d ms.",
1114+
querySettings.getMaxResultPerPage(), (int) duration);
1115+
} catch (Exception e) {
1116+
paginationSpan.recordException(e);
1117+
paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
1118+
throw e;
11191119
} finally {
1120-
// this will stop the parseDataTask as well when the pagination completes
1121-
Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
1120+
paginationSpan.end();
11221121
}
1123-
// We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
1124-
// have finished processing the records and even that will be interrupted
1125-
});
1122+
}
1123+
} catch (Exception ex) {
1124+
Uninterruptibles.putUninterruptibly(
1125+
bigQueryFieldValueListWrapperBlockingQueue,
1126+
BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
1127+
} finally {
1128+
// this will stop the parseDataTask as well when the pagination completes
1129+
Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
1130+
}
1131+
// We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
1132+
// have finished processing the records and even that will be interrupted
1133+
};
11261134

11271135
Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask);
11281136
nextPageWorker.start();

0 commit comments

Comments
 (0)