Skip to content

Commit 577f6e8

Browse files
authored
chore(bigquery-jdbc): add the bridge Thread wrap ahead of Executor migration (#13482)
1 parent 872d7b7 commit 577f6e8

2 files changed

Lines changed: 202 additions & 0 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.concurrent.Future;
7373
import java.util.concurrent.LinkedBlockingQueue;
7474
import java.util.concurrent.TimeUnit;
75+
import java.util.concurrent.TimeoutException;
7576
import java.util.concurrent.atomic.AtomicReference;
7677
import java.util.function.Function;
7778
import java.util.function.Supplier;
@@ -5264,4 +5265,80 @@ private void loadDriverVersionProperties() {
52645265
throw ex;
52655266
}
52665267
}
5268+
5269+
// TODO(keshav): This is a temporary compatibility bridge to wrap raw Threads into Futures.
5270+
// This should be removed when BigQueryDatabaseMetaData is refactored to use the ExecutorService
5271+
// directly.
5272+
static Future<?>[] wrapThread(final Thread thread) {
5273+
if (thread == null) {
5274+
return null;
5275+
}
5276+
return new Future<?>[] {
5277+
new Future<Object>() {
5278+
private volatile boolean cancelled = false;
5279+
5280+
@Override
5281+
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
5282+
if (cancelled || thread.getState() == Thread.State.TERMINATED) {
5283+
return false;
5284+
}
5285+
cancelled = true;
5286+
if (mayInterruptIfRunning) {
5287+
thread.interrupt();
5288+
}
5289+
return true;
5290+
}
5291+
5292+
@Override
5293+
public boolean isCancelled() {
5294+
return cancelled;
5295+
}
5296+
5297+
@Override
5298+
public boolean isDone() {
5299+
return cancelled || thread.getState() == Thread.State.TERMINATED;
5300+
}
5301+
5302+
@Override
5303+
public Object get() throws InterruptedException, CancellationException {
5304+
try {
5305+
return get(365, TimeUnit.DAYS);
5306+
} catch (TimeoutException e) {
5307+
throw new RuntimeException(e);
5308+
}
5309+
}
5310+
5311+
@Override
5312+
public Object get(long timeout, TimeUnit unit)
5313+
throws InterruptedException, CancellationException, TimeoutException {
5314+
if (isCancelled()) {
5315+
throw new CancellationException();
5316+
}
5317+
long remainingNanos = unit.toNanos(timeout);
5318+
long deadline = System.nanoTime() + remainingNanos;
5319+
while (thread.getState() != Thread.State.TERMINATED) {
5320+
if (isCancelled()) {
5321+
throw new CancellationException();
5322+
}
5323+
if (remainingNanos <= 0) {
5324+
throw new TimeoutException();
5325+
}
5326+
long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
5327+
if (remainingMillis == 0) {
5328+
remainingMillis = 1;
5329+
}
5330+
5331+
long delay = Math.min(remainingMillis, 50);
5332+
if (thread.getState() == Thread.State.NEW) {
5333+
Thread.sleep(delay);
5334+
} else {
5335+
thread.join(delay);
5336+
}
5337+
remainingNanos = deadline - System.nanoTime();
5338+
}
5339+
return null;
5340+
}
5341+
}
5342+
};
5343+
}
52675344
}

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@
4444
import java.sql.Types;
4545
import java.util.*;
4646
import java.util.concurrent.Callable;
47+
import java.util.concurrent.CancellationException;
48+
import java.util.concurrent.CountDownLatch;
4749
import java.util.concurrent.ExecutionException;
4850
import java.util.concurrent.ExecutorService;
4951
import java.util.concurrent.Future;
52+
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.TimeoutException;
5054
import java.util.regex.Pattern;
5155
import org.junit.jupiter.api.BeforeEach;
5256
import org.junit.jupiter.api.Test;
@@ -3308,4 +3312,125 @@ public void testMetadataAndResultSetMetadataTypeMappingConsistency(StandardSQLTy
33083312
assertEquals(
33093313
metadataTypeInfo.jdbcType, (int) resultSetType, "Type mapping mismatch for " + type);
33103314
}
3315+
3316+
@Test
3317+
public void testWrapThread_NullThread() {
3318+
assertNull(BigQueryDatabaseMetaData.wrapThread(null));
3319+
}
3320+
3321+
@Test
3322+
public void testWrapThread_BasicLifecycle() throws Exception {
3323+
CountDownLatch startLatch = new CountDownLatch(1);
3324+
CountDownLatch finishLatch = new CountDownLatch(1);
3325+
Thread t =
3326+
new Thread(
3327+
() -> {
3328+
try {
3329+
startLatch.countDown();
3330+
finishLatch.await();
3331+
} catch (InterruptedException e) {
3332+
// ignore
3333+
}
3334+
});
3335+
3336+
Future<?>[] futures = BigQueryDatabaseMetaData.wrapThread(t);
3337+
assertNotNull(futures);
3338+
assertEquals(1, futures.length);
3339+
Future<?> f = futures[0];
3340+
3341+
// Thread is NEW (not started yet).
3342+
assertFalse(f.isDone());
3343+
assertFalse(f.isCancelled());
3344+
3345+
t.start();
3346+
startLatch.await();
3347+
3348+
// Thread is running.
3349+
assertFalse(f.isDone());
3350+
assertFalse(f.isCancelled());
3351+
3352+
finishLatch.countDown();
3353+
t.join();
3354+
3355+
// Thread is terminated.
3356+
assertTrue(f.isDone());
3357+
assertFalse(f.isCancelled());
3358+
assertNull(f.get());
3359+
}
3360+
3361+
@Test
3362+
public void testWrapThread_CancelBeforeStart() throws Exception {
3363+
Thread t =
3364+
new Thread(
3365+
() -> {
3366+
try {
3367+
Thread.sleep(1000);
3368+
} catch (InterruptedException e) {
3369+
// ignore
3370+
}
3371+
});
3372+
3373+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3374+
assertTrue(f.cancel(true));
3375+
assertTrue(f.isCancelled());
3376+
assertTrue(f.isDone());
3377+
3378+
// cancel on already cancelled should return false
3379+
assertFalse(f.cancel(true));
3380+
3381+
assertThrows(CancellationException.class, () -> f.get());
3382+
assertThrows(CancellationException.class, () -> f.get(1, TimeUnit.SECONDS));
3383+
}
3384+
3385+
@Test
3386+
public void testWrapThread_CancelRunningWithInterrupt() throws Exception {
3387+
CountDownLatch startLatch = new CountDownLatch(1);
3388+
CountDownLatch interruptedLatch = new CountDownLatch(1);
3389+
Thread t =
3390+
new Thread(
3391+
() -> {
3392+
startLatch.countDown();
3393+
try {
3394+
Thread.sleep(10000);
3395+
} catch (InterruptedException e) {
3396+
interruptedLatch.countDown();
3397+
}
3398+
});
3399+
3400+
t.start();
3401+
startLatch.await();
3402+
3403+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3404+
assertTrue(f.cancel(true));
3405+
assertTrue(f.isCancelled());
3406+
assertTrue(f.isDone());
3407+
3408+
assertTrue(interruptedLatch.await(5, TimeUnit.SECONDS));
3409+
assertThrows(CancellationException.class, () -> f.get());
3410+
}
3411+
3412+
@Test
3413+
public void testWrapThread_GetTimeout() throws Exception {
3414+
CountDownLatch startLatch = new CountDownLatch(1);
3415+
Thread t =
3416+
new Thread(
3417+
() -> {
3418+
startLatch.countDown();
3419+
try {
3420+
Thread.sleep(10000);
3421+
} catch (InterruptedException e) {
3422+
// ignore
3423+
}
3424+
});
3425+
3426+
t.start();
3427+
startLatch.await();
3428+
3429+
Future<?> f = BigQueryDatabaseMetaData.wrapThread(t)[0];
3430+
assertThrows(TimeoutException.class, () -> f.get(100, TimeUnit.MILLISECONDS));
3431+
3432+
// Cleanup: stop the thread
3433+
t.interrupt();
3434+
t.join();
3435+
}
33113436
}

0 commit comments

Comments
 (0)