6262import java .util .UUID ;
6363import java .util .concurrent .ConcurrentHashMap ;
6464import java .util .concurrent .Executor ;
65+ import java .util .concurrent .ExecutorService ;
6566import java .util .concurrent .TimeUnit ;
6667
6768/**
@@ -209,6 +210,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
209210 Boolean reqGoogleDriveScope ;
210211 private final Properties clientInfo = new Properties ();
211212 private boolean isReadOnlyTokenUsed = false ;
213+ private final ExecutorService metadataExecutor ;
214+ private final ExecutorService queryExecutor ;
212215
213216 BigQueryConnection (String url ) throws IOException {
214217 this (url , DataSource .fromUrl (url ));
@@ -344,6 +347,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
344347
345348 this .headerProvider = createHeaderProvider ();
346349 this .bigQuery = getBigQueryConnection ();
350+ this .metadataExecutor = BigQueryJdbcMdc .newFixedThreadPool (metadataFetchThreadCount );
351+ this .queryExecutor = BigQueryJdbcMdc .newCachedThreadPool ();
347352 }
348353 }
349354
@@ -937,23 +942,91 @@ public void close() throws SQLException {
937942 }
938943
939944 private void closeImpl () throws SQLException {
945+ SQLException exceptionToThrow = null ;
940946 try {
941947 if (this .bigQueryReadClient != null ) {
942948 this .bigQueryReadClient .shutdown ();
943- this .bigQueryReadClient .awaitTermination (1 , TimeUnit .MINUTES );
944- this .bigQueryReadClient .close ();
945949 }
946-
947950 if (this .bigQueryWriteClient != null ) {
948951 this .bigQueryWriteClient .shutdown ();
949- this .bigQueryWriteClient .awaitTermination (1 , TimeUnit .MINUTES );
950- this .bigQueryWriteClient .close ();
952+ }
953+ if (this .metadataExecutor != null ) {
954+ this .metadataExecutor .shutdown ();
955+ }
956+ if (this .queryExecutor != null ) {
957+ this .queryExecutor .shutdown ();
951958 }
952959
953960 for (Statement statement : this .openStatements ) {
954- statement .close ();
961+ try {
962+ statement .close ();
963+ } catch (SQLException e ) {
964+ if (exceptionToThrow == null ) {
965+ exceptionToThrow = e ;
966+ } else {
967+ exceptionToThrow .addSuppressed (e );
968+ }
969+ }
955970 }
956971 this .openStatements .clear ();
972+
973+ boolean interrupted = Thread .currentThread ().isInterrupted ();
974+
975+ try {
976+ if (this .bigQueryReadClient != null ) {
977+ if (interrupted ) {
978+ this .bigQueryReadClient .shutdownNow ();
979+ } else {
980+ this .bigQueryReadClient .awaitTermination (1 , TimeUnit .MINUTES );
981+ }
982+ }
983+ if (this .bigQueryWriteClient != null ) {
984+ if (interrupted ) {
985+ this .bigQueryWriteClient .shutdownNow ();
986+ } else {
987+ this .bigQueryWriteClient .awaitTermination (1 , TimeUnit .MINUTES );
988+ }
989+ }
990+ if (this .metadataExecutor != null ) {
991+ if (interrupted || !this .metadataExecutor .awaitTermination (10 , TimeUnit .SECONDS )) {
992+ this .metadataExecutor .shutdownNow ();
993+ }
994+ }
995+ if (this .queryExecutor != null ) {
996+ if (interrupted || !this .queryExecutor .awaitTermination (10 , TimeUnit .SECONDS )) {
997+ this .queryExecutor .shutdownNow ();
998+ }
999+ }
1000+ } catch (InterruptedException e ) {
1001+ interrupted = true ;
1002+ if (this .bigQueryReadClient != null ) {
1003+ this .bigQueryReadClient .shutdownNow ();
1004+ }
1005+ if (this .bigQueryWriteClient != null ) {
1006+ this .bigQueryWriteClient .shutdownNow ();
1007+ }
1008+ if (this .metadataExecutor != null ) {
1009+ this .metadataExecutor .shutdownNow ();
1010+ }
1011+ if (this .queryExecutor != null ) {
1012+ this .queryExecutor .shutdownNow ();
1013+ }
1014+ } finally {
1015+ try {
1016+ if (this .bigQueryReadClient != null ) {
1017+ this .bigQueryReadClient .close ();
1018+ }
1019+ } finally {
1020+ if (this .bigQueryWriteClient != null ) {
1021+ this .bigQueryWriteClient .close ();
1022+ }
1023+ }
1024+ }
1025+
1026+ if (interrupted ) {
1027+ Thread .currentThread ().interrupt ();
1028+ throw new InterruptedException ("Interrupted awaiting executor termination" );
1029+ }
9571030 } catch (ConcurrentModificationException ex ) {
9581031 throw new BigQueryJdbcException ("Concurrent modification during close" , ex );
9591032 } catch (InterruptedException e ) {
@@ -962,9 +1035,20 @@ private void closeImpl() throws SQLException {
9621035 BigQueryJdbcMdc .clear ();
9631036 BigQueryJdbcRootLogger .closeConnectionHandler (this .connectionId );
9641037 }
1038+ if (exceptionToThrow != null ) {
1039+ throw exceptionToThrow ;
1040+ }
9651041 this .isClosed = true ;
9661042 }
9671043
1044+ ExecutorService getExecutorService () {
1045+ return this .queryExecutor ;
1046+ }
1047+
1048+ ExecutorService getMetadataExecutor () {
1049+ return this .metadataExecutor ;
1050+ }
1051+
9681052 @ Override
9691053 public boolean isClosed () {
9701054 return this .isClosed ;
0 commit comments