Skip to content

Commit d3e7a19

Browse files
authored
fix(bigquery-jdbc): ensure largeResults are handled in PreparedStatement (#13496)
1 parent aa3262a commit d3e7a19

4 files changed

Lines changed: 122 additions & 52 deletions

File tree

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryParameterHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,4 +284,8 @@ StandardSQLTypeName getSqlType(String name) {
284284
}
285285
return null;
286286
}
287+
288+
int getParametersArraySize() {
289+
return this.parametersArraySize;
290+
}
287291
}

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryPreparedStatement.java

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@
6464
class BigQueryPreparedStatement extends BigQueryStatement implements PreparedStatement {
6565
private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString());
6666
private static final char POSITIONAL_PARAMETER_CHAR = '?';
67-
// Making this protected so BigQueryCallableStatement subclass can access the parameters.
68-
protected final BigQueryParameterHandler parameterHandler;
67+
// parameterHandler is inherited from BigQueryStatement
6968
protected int parameterCount = 0;
7069
protected String currentQuery;
7170
private Queue<ArrayList<BigQueryJdbcParameter>> batchParameters = new LinkedList<>();
@@ -90,49 +89,22 @@ private int getParameterCount(String query) {
9089

9190
@Override
9291
public ResultSet executeQuery() throws SQLException {
93-
logQueryExecutionStart(this.currentQuery);
94-
try {
95-
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
96-
jobConfiguration.setParameterMode("POSITIONAL");
97-
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
98-
runQuery(this.currentQuery, jobConfiguration.build());
99-
} catch (InterruptedException ex) {
100-
throw new BigQueryJdbcRuntimeException("Interrupted during executeQuery", ex);
101-
}
102-
return getCurrentResultSet();
92+
return super.executeQuery(this.currentQuery);
10393
}
10494

10595
@Override
10696
public long executeLargeUpdate() throws SQLException {
107-
logQueryExecutionStart(this.currentQuery);
108-
try {
109-
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
110-
jobConfiguration.setParameterMode("POSITIONAL");
111-
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
112-
runQuery(this.currentQuery, jobConfiguration.build());
113-
} catch (InterruptedException ex) {
114-
throw new BigQueryJdbcRuntimeException("Interrupted during executeLargeUpdate", ex);
115-
}
116-
return this.currentUpdateCount;
97+
return super.executeLargeUpdate(this.currentQuery);
11798
}
11899

119100
@Override
120101
public int executeUpdate() throws SQLException {
121-
return checkUpdateCount(executeLargeUpdate());
102+
return super.executeUpdate(this.currentQuery);
122103
}
123104

124105
@Override
125106
public boolean execute() throws SQLException {
126-
logQueryExecutionStart(this.currentQuery);
127-
try {
128-
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery);
129-
jobConfiguration.setParameterMode("POSITIONAL");
130-
jobConfiguration = this.parameterHandler.configureParameters(jobConfiguration);
131-
runQuery(this.currentQuery, jobConfiguration.build());
132-
} catch (InterruptedException ex) {
133-
throw new BigQueryJdbcRuntimeException("Interrupted during execute", ex);
134-
}
135-
return getCurrentResultSet() != null;
107+
return super.execute(this.currentQuery);
136108
}
137109

138110
@Override

java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement {
105105
protected int currentJobIdIndex = -1;
106106
protected List<String> batchQueries = new ArrayList<>();
107107
protected BigQueryConnection connection;
108+
protected BigQueryParameterHandler parameterHandler = null;
108109
protected String connectionId;
109110
protected int maxFieldSize = 0;
110111
protected int maxRows = 0;
@@ -242,9 +243,10 @@ public ResultSet executeQuery(String sql) throws SQLException {
242243
private ResultSet executeQueryImpl(String sql) throws SQLException {
243244
logQueryExecutionStart(sql);
244245
try {
245-
QueryJobConfiguration jobConfiguration =
246-
setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
247-
runQuery(sql, jobConfiguration);
246+
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
247+
jobConfiguration = applyParametersIfPresent(jobConfiguration);
248+
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
249+
runQuery(sql, jobConfiguration.build());
248250
} catch (InterruptedException ex) {
249251
throw new BigQueryJdbcException("Interrupted during executeQuery", ex);
250252
}
@@ -266,6 +268,7 @@ private long executeLargeUpdateImpl(String sql) throws SQLException {
266268
logQueryExecutionStart(sql);
267269
try {
268270
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
271+
jobConfiguration = applyParametersIfPresent(jobConfiguration);
269272
runQuery(sql, jobConfiguration.build());
270273
} catch (InterruptedException ex) {
271274
throw new BigQueryJdbcRuntimeException("Interrupted during executeLargeUpdate", ex);
@@ -301,12 +304,13 @@ public boolean execute(String sql) throws SQLException {
301304
private boolean executeImpl(String sql) throws SQLException {
302305
logQueryExecutionStart(sql);
303306
try {
304-
QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
305-
// If Large Results are enabled, ensure query type is SELECT
306-
if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
307+
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
308+
jobConfiguration = applyParametersIfPresent(jobConfiguration);
309+
if (isLargeResultsEnabled()
310+
&& getQueryType(jobConfiguration.build(), null) == SqlType.SELECT) {
307311
jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
308312
}
309-
runQuery(sql, jobConfiguration);
313+
runQuery(sql, jobConfiguration.build());
310314
} catch (InterruptedException ex) {
311315
throw new BigQueryJdbcRuntimeException("Interrupted during execute", ex);
312316
}
@@ -623,35 +627,43 @@ void runQuery(String query, QueryJobConfiguration jobConfiguration)
623627
}
624628
}
625629

626-
private boolean isLargeResultsEnabled() {
630+
protected QueryJobConfiguration.Builder applyParametersIfPresent(
631+
QueryJobConfiguration.Builder jobConfigurationBuilder) throws SQLException {
632+
if (this.parameterHandler != null && this.parameterHandler.getParametersArraySize() > 0) {
633+
jobConfigurationBuilder.setParameterMode("POSITIONAL");
634+
jobConfigurationBuilder = this.parameterHandler.configureParameters(jobConfigurationBuilder);
635+
}
636+
return jobConfigurationBuilder;
637+
}
638+
639+
boolean isLargeResultsEnabled() {
627640
String destinationTable = this.querySettings.getDestinationTable();
628641
String destinationDataset = this.querySettings.getDestinationDataset();
629642
return destinationDataset != null || destinationTable != null;
630643
}
631644

632-
private QueryJobConfiguration setDestinationDatasetAndTableInJobConfig(
633-
QueryJobConfiguration jobConfiguration) {
645+
QueryJobConfiguration.Builder setDestinationDatasetAndTableInJobConfig(
646+
QueryJobConfiguration.Builder jobConfigurationBuilder) {
634647
String destinationTable = this.querySettings.getDestinationTable();
635648
String destinationDataset = this.querySettings.getDestinationDataset();
636649
if (destinationDataset != null || destinationTable != null) {
637650
if (destinationDataset != null) {
638651
checkIfDatasetExistElseCreate(destinationDataset);
639652
}
640-
if (jobConfiguration.useLegacySql() && destinationDataset == null) {
653+
if (getUseLegacySql() && destinationDataset == null) {
641654
checkIfDatasetExistElseCreate(DEFAULT_DATASET_NAME);
642655
destinationDataset = DEFAULT_DATASET_NAME;
643656
}
644657
if (destinationTable == null) {
645658
destinationTable = getDefaultDestinationTable();
646659
}
647-
return jobConfiguration.toBuilder()
660+
return jobConfigurationBuilder
648661
.setAllowLargeResults(this.querySettings.getAllowLargeResults())
649662
.setDestinationTable(TableId.of(destinationDataset, destinationTable))
650663
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
651-
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
652-
.build();
664+
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE);
653665
}
654-
return jobConfiguration;
666+
return jobConfigurationBuilder;
655667
}
656668

657669
Job getNextJob() {
@@ -1407,14 +1419,16 @@ QueryJobConfiguration.Builder getJobConfig(String query) {
14071419
if (this.querySettings.getQueryProperties() != null) {
14081420
queryConfigBuilder.setConnectionProperties(this.querySettings.getQueryProperties());
14091421
}
1410-
boolean useLegacy =
1411-
QueryDialectType.BIG_QUERY.equals(
1412-
QueryDialectType.valueOf(this.querySettings.getQueryDialect()));
1413-
queryConfigBuilder.setUseLegacySql(useLegacy);
1422+
queryConfigBuilder.setUseLegacySql(getUseLegacySql());
14141423

14151424
return queryConfigBuilder;
14161425
}
14171426

1427+
private boolean getUseLegacySql() {
1428+
return QueryDialectType.BIG_QUERY.equals(
1429+
QueryDialectType.valueOf(this.querySettings.getQueryDialect()));
1430+
}
1431+
14181432
private void checkIfDatasetExistElseCreate(String datasetName) {
14191433
Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetName));
14201434
if (dataset == null) {

java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,86 @@ public void testWrapperMethods() throws SQLException {
702702
assertTrue(e.getMessage().contains("Cannot unwrap to java.sql.Connection"));
703703
}
704704

705+
@Test
706+
public void testPreparedStatementExecuteQueryWithLargeResults() throws Exception {
707+
// Setup connection mocks to return large results settings
708+
doReturn(true).when(bigQueryConnection).isAllowLargeResults();
709+
doReturn("test_dataset").when(bigQueryConnection).getDestinationDataset();
710+
doReturn("test_table").when(bigQueryConnection).getDestinationTable();
711+
712+
com.google.cloud.bigquery.Dataset dataset = mock(com.google.cloud.bigquery.Dataset.class);
713+
doReturn(dataset).when(bigquery).getDataset(any(com.google.cloud.bigquery.DatasetId.class));
714+
715+
// Create PreparedStatement
716+
BigQueryPreparedStatement preparedStatement =
717+
new BigQueryPreparedStatement(bigQueryConnection, query);
718+
BigQueryPreparedStatement preparedStatementSpy = Mockito.spy(preparedStatement);
719+
720+
TableResult result = Mockito.mock(TableResult.class);
721+
BigQueryJsonResultSet jsonResultSet = mock(BigQueryJsonResultSet.class);
722+
QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder(query).build();
723+
Job job = getJobMock(result, jobConfiguration, StatementType.SELECT);
724+
725+
doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());
726+
doReturn(jsonResultSet).when(preparedStatementSpy).processJsonResultSet(eq(result), any());
727+
728+
Job dryRunJob = getJobMock(null, jobConfiguration, StatementType.SELECT);
729+
doReturn(dryRunJob).when(bigquery).create(any(JobInfo.class));
730+
731+
// Act
732+
preparedStatementSpy.executeQuery();
733+
734+
// Assert
735+
ArgumentCaptor<QueryJobConfiguration> captor =
736+
ArgumentCaptor.forClass(QueryJobConfiguration.class);
737+
verify(bigquery).queryWithTimeout(captor.capture(), any(), any());
738+
QueryJobConfiguration capturedConfig = captor.getValue();
739+
740+
assertThat(capturedConfig.getDestinationTable())
741+
.isEqualTo(TableId.of("test_dataset", "test_table"));
742+
assertThat(capturedConfig.allowLargeResults()).isTrue();
743+
}
744+
745+
@Test
746+
public void testPreparedStatementExecuteWithLargeResults() throws Exception {
747+
// Setup connection mocks to return large results settings
748+
doReturn(true).when(bigQueryConnection).isAllowLargeResults();
749+
doReturn("test_dataset").when(bigQueryConnection).getDestinationDataset();
750+
doReturn("test_table").when(bigQueryConnection).getDestinationTable();
751+
752+
com.google.cloud.bigquery.Dataset dataset = mock(com.google.cloud.bigquery.Dataset.class);
753+
doReturn(dataset).when(bigquery).getDataset(any(com.google.cloud.bigquery.DatasetId.class));
754+
755+
// Create PreparedStatement
756+
BigQueryPreparedStatement preparedStatement =
757+
new BigQueryPreparedStatement(bigQueryConnection, query);
758+
BigQueryPreparedStatement preparedStatementSpy = Mockito.spy(preparedStatement);
759+
760+
TableResult result = Mockito.mock(TableResult.class);
761+
BigQueryJsonResultSet jsonResultSet = mock(BigQueryJsonResultSet.class);
762+
QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder(query).build();
763+
Job job = getJobMock(result, jobConfiguration, StatementType.SELECT);
764+
765+
doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());
766+
doReturn(jsonResultSet).when(preparedStatementSpy).processJsonResultSet(eq(result), any());
767+
768+
Job dryRunJob = getJobMock(null, jobConfiguration, StatementType.SELECT);
769+
doReturn(dryRunJob).when(bigquery).create(any(JobInfo.class));
770+
771+
// Act
772+
preparedStatementSpy.execute();
773+
774+
// Assert
775+
ArgumentCaptor<QueryJobConfiguration> captor =
776+
ArgumentCaptor.forClass(QueryJobConfiguration.class);
777+
verify(bigquery).queryWithTimeout(captor.capture(), any(), any());
778+
QueryJobConfiguration capturedConfig = captor.getValue();
779+
780+
assertThat(capturedConfig.getDestinationTable())
781+
.isEqualTo(TableId.of("test_dataset", "test_table"));
782+
assertThat(capturedConfig.allowLargeResults()).isTrue();
783+
}
784+
705785
@Test
706786
public void testSetFetchSizeNegativeThrows() {
707787
org.junit.jupiter.api.Assertions.assertThrows(

0 commit comments

Comments
 (0)