Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class DataCloudPreparedStatement extends DataCloudStatement implements Pr
private String sql;
private final ParameterManager parameterManager;
private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
// True if we are currently fetching metadata from the server, this influences the query param generation
// to not return any data.
private boolean fetchingMetadata = false;

DataCloudPreparedStatement(DataCloudConnection connection, ParameterManager parameterManager) {
super(connection);
Expand Down Expand Up @@ -78,22 +81,26 @@ public boolean execute(String sql) throws SQLException {
}

@Override
protected ExecuteQueryParamBuilder getQueryParamBuilder(QueryTimeout queryTimeout) throws SQLException {
protected QueryParam.Builder getQueryParamBuilder(
String sql, QueryTimeout queryTimeout, QueryParam.TransferMode transferMode) throws SQLException {
val builder = super.getQueryParamBuilder(sql, queryTimeout, transferMode);

final byte[] encodedRow;
try {
encodedRow = toArrowByteArray(parameterManager.getParameters(), calendar);
} catch (IOException e) {
throw new SQLException("Failed to encode parameters on prepared statement", e);
}

val preparedQueryParams = QueryParam.newBuilder()
.setParamStyle(QueryParam.ParameterStyle.QUESTION_MARK)
if (fetchingMetadata) {
// Submit the query as metadata only query, with limit 0 Hyper will skip execution.
builder.setQueryRowLimit(0);
}

return builder.setParamStyle(QueryParam.ParameterStyle.QUESTION_MARK)
.setArrowParameters(QueryParameterArrow.newBuilder()
.setData(ByteString.copyFrom(encodedRow))
.build())
.build();

return super.getQueryParamBuilder(queryTimeout).withQueryParams(preparedQueryParams);
.build());
}

public boolean executeAsyncQuery() throws SQLException {
Expand Down Expand Up @@ -265,7 +272,18 @@ public void setArray(int parameterIndex, Array x) throws SQLException {

@Override
public ResultSetMetaData getMetaData() throws SQLException {
throw new SQLException(NOT_SUPPORTED_IN_DATACLOUD_QUERY, SqlErrorCodes.FEATURE_NOT_SUPPORTED);
if ((resultSet != null) && !resultSet.isClosed()) {
return resultSet.getMetaData();
}
try {
fetchingMetadata = true;
val result = super.executeQuery(sql);
val metadata = result.getMetaData();
result.close();
return metadata;
} finally {
fetchingMetadata = false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.QueryParam;
import salesforce.cdp.hyperdb.v1.ResultRange;

@Slf4j
public class DataCloudStatement implements Statement, AutoCloseable {
Expand Down Expand Up @@ -58,13 +59,23 @@ public DataCloudStatement(@NonNull DataCloudConnection connection) {
this.statementProperties = connection.getConnectionProperties().getStatementProperties();
}

protected ExecuteQueryParamBuilder getQueryParamBuilder(QueryTimeout queryTimeout) throws SQLException {
protected QueryParam.Builder getQueryParamBuilder(
String sql, QueryTimeout queryTimeout, QueryParam.TransferMode transferMode) throws SQLException {
val builder = QueryParam.newBuilder()
.setQuery(sql)
.setOutputFormat(QueryResultArrowStream.OUTPUT_FORMAT)
.setTransferMode(transferMode);

val querySettings = new HashMap<>(statementProperties.getQuerySettings());
if (!queryTimeout.getServerQueryTimeout().isZero()) {
querySettings.put(
"query_timeout", queryTimeout.getServerQueryTimeout().toMillis() + "ms");
}
return ExecuteQueryParamBuilder.of(querySettings);
if (!querySettings.isEmpty()) {
builder.putAllSettings(querySettings);
}

return builder;
}

@Getter
Expand Down Expand Up @@ -127,10 +138,14 @@ public ResultSet executeQuery(String sql) throws SQLException {
private QueryResultIterator executeAdaptiveQuery(String sql) throws SQLException {
val queryTimeout = QueryTimeout.of(
statementProperties.getQueryTimeout(), statementProperties.getQueryTimeoutLocalEnforcementDelay());
val paramBuilder = getQueryParamBuilder(queryTimeout);
val queryParam = targetMaxRows > 0
? paramBuilder.getAdaptiveRowLimitQueryParams(sql, targetMaxRows, targetMaxBytes)
: paramBuilder.getAdaptiveQueryParams(sql);
val paramBuilder = getQueryParamBuilder(sql, queryTimeout, QueryParam.TransferMode.ADAPTIVE);
if (targetMaxRows > 0) {
val range = ResultRange.newBuilder().setRowLimit(targetMaxRows).setByteLimit(targetMaxBytes);
paramBuilder.setResultRange(range);
log.info("setting row limit query. maxRows={}, maxBytes={}", (long) targetMaxRows, (long) targetMaxBytes);
}
QueryParam queryParam = paramBuilder.build();

val stub = connection
.getStub()
.withDeadlineAfter(
Expand All @@ -147,8 +162,8 @@ protected void executeAsyncQueryInternal(String sql) throws SQLException {
try {
val queryTimeout = QueryTimeout.of(
statementProperties.getQueryTimeout(), statementProperties.getQueryTimeoutLocalEnforcementDelay());
val paramBuilder = getQueryParamBuilder(queryTimeout);
val request = paramBuilder.getQueryParams(sql, QueryParam.TransferMode.ASYNC);
val paramBuilder = getQueryParamBuilder(sql, queryTimeout, QueryParam.TransferMode.ASYNC);
QueryParam queryParam = paramBuilder.build();
val stub = connection
.getStub()
.withDeadlineAfter(
Expand All @@ -157,7 +172,7 @@ protected void executeAsyncQueryInternal(String sql) throws SQLException {
// We set the deadline based off the query timeout here as the server-side doesn't properly enforce
// the query timeout during the initial compilation phase. By setting the deadline, we can ensure
// that the query timeout is enforced also when the server hangs during compilation.
queryHandle = AsyncQueryAccessHandle.of(stub, request);
queryHandle = AsyncQueryAccessHandle.of(stub, queryParam);
log.info(
"executeAsyncQuery completed. queryId={}",
queryHandle.getQueryStatus().getQueryId());
Expand Down Expand Up @@ -185,6 +200,7 @@ public void close() throws SQLException {
log.debug("Entering close");
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
log.debug("Exiting close");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.salesforce.datacloud.jdbc.hyper.LocalHyperTestBase;
import com.salesforce.datacloud.jdbc.util.HyperLogScope;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
Expand All @@ -22,6 +26,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -222,4 +227,103 @@ public void testPreparedStatementTimestampWithCalendarRange() {
}
}
}

@Test
@SneakyThrows
public void testGetMetaDataReturnsResultSetMetaData() {
try (HyperLogScope logScope = new HyperLogScope()) {
try (Connection connection = getHyperQueryConnection(logScope.getProperties())) {
try (PreparedStatement preparedStatement =
connection.prepareStatement("select 1 as id, 'test' as name, 3.14 as value, pg_sleep(100000) as"
+ " would_timeout_in_execution")) {
ResultSetMetaData metadata = preparedStatement.getMetaData();

assertThat(metadata).isNotNull();
assertThat(metadata.getColumnCount()).isEqualTo(4);

assertThat(metadata.getColumnName(1)).isEqualTo("id");
assertThat(metadata.getColumnTypeName(1)).isEqualTo("INTEGER");

assertThat(metadata.getColumnName(2)).isEqualTo("name");
assertThat(metadata.getColumnTypeName(2)).isEqualTo("VARCHAR");

assertThat(metadata.getColumnName(3)).isEqualTo("value");
assertThat(metadata.getColumnTypeName(3)).isEqualTo("DECIMAL");

// Verify that the query actually finished
ResultSet resultSet = logScope.executeQuery("SELECT COUNT(*) FROM hyper_log WHERE k='query-end'");
resultSet.next();
Assertions.assertThat(resultSet.getDouble(1)).isEqualTo(1);
}
}
}
}

@Test
@SneakyThrows
public void testGetMetaDataFollowedByExecuteReturnsData() {
try (Connection connection = getHyperQueryConnection()) {
try (PreparedStatement preparedStatement =
connection.prepareStatement("select 1 as id, 'test' as name, 3.14 as value")) {
ResultSetMetaData metadata = preparedStatement.getMetaData();
assertThat(metadata).isNotNull();
assertThat(metadata.getColumnCount()).isEqualTo(3);

try (ResultSet resultSet = preparedStatement.executeQuery()) {
assertThat(resultSet.next()).isTrue();
assertThat(resultSet.getInt("id")).isEqualTo(1);
assertThat(resultSet.getString("name")).isEqualTo("test");
assertThat(resultSet.getBigDecimal("value")).isEqualTo(BigDecimal.valueOf(3.14));
assertThat(resultSet.next()).isFalse();
}
}
}
}

@Test
@SneakyThrows
public void testGetMetaDataWithInvalidQueryThrowsSQLException() {
try (Connection connection = getHyperQueryConnection()) {
try (PreparedStatement preparedStatement =
connection.prepareStatement("select * from non_existent_table")) {
Assertions.assertThatThrownBy(preparedStatement::getMetaData)
.isInstanceOf(SQLException.class)
.hasMessageContaining("table \"non_existent_table\" does not exist");
}
}
}

@Test
@SneakyThrows
public void testGetMetaDataAfterExecuteDoesNotQueryAgain() {
try (HyperLogScope logScope = new HyperLogScope()) {
try (Connection connection = getHyperQueryConnection(logScope.getProperties())) {
try (PreparedStatement preparedStatement =
connection.prepareStatement("select 1 as id, 'test' as name")) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
assertThat(resultSet.next()).isTrue();

ResultSetMetaData metadata = preparedStatement.getMetaData();
assertThat(metadata).isNotNull();
assertThat(metadata.getColumnCount()).isEqualTo(2);
}

ResultSet logResult = logScope.executeQuery("SELECT COUNT(*) FROM hyper_log WHERE k='query-end'");
logResult.next();
Assertions.assertThat(logResult.getDouble(1))
.as("Should only have one query execution, not two")
.isEqualTo(1);

// Test that after closing the resultset it would query again
ResultSetMetaData metadata = preparedStatement.getMetaData();
assertThat(metadata).isNotNull();
assertThat(metadata.getColumnCount()).isEqualTo(2);

ResultSet logResult2 = logScope.executeQuery("SELECT COUNT(*) FROM hyper_log WHERE k='query-end'");
logResult2.next();
Assertions.assertThat(logResult2.getDouble(1)).isEqualTo(2);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ private static Stream<Arguments> unsupported() {
impl("executeUpdate", s -> s.executeUpdate("", Statement.RETURN_GENERATED_KEYS)),
impl("executeUpdate", s -> s.executeUpdate("", new int[] {})),
impl("executeUpdate", s -> s.executeUpdate("", new String[] {})),
impl("getMetaData", DataCloudPreparedStatement::getMetaData),
impl("getParameterMetaData", DataCloudPreparedStatement::getParameterMetaData));
}

Expand Down
Loading
Loading