Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import static com.databricks.jdbc.integration.IntegrationTestUtil.getValidJDBCConnection;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

import com.databricks.jdbc.api.impl.DatabricksResultSet;
import com.databricks.jdbc.api.impl.DatabricksResultSetMetaData;
Expand All @@ -15,13 +14,14 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

/** Test SQL execution with results spanning multiple chunks. */
public class MultiChunkExecutionIntegrationTests extends AbstractFakeServiceIntegrationTests {

@Test
void testMultiChunkSelect() throws SQLException {
void testMultiChunkSelect() throws SQLException, InterruptedException {
final String table = "samples.tpch.lineitem";

// To save on the size of stub mappings, the test uses just enough rows to span multiple chunks.
Expand All @@ -36,38 +36,62 @@ void testMultiChunkSelect() throws SQLException {
final Statement statement = connection.createStatement();
statement.setMaxRows(maxRows);

try (ResultSet rs = statement.executeQuery(sql)) {
DatabricksResultSetMetaData metaData = (DatabricksResultSetMetaData) rs.getMetaData();
final AtomicReference<Throwable> threadException = new AtomicReference<>();

int rowCount = 0;
while (rs.next()) {
rowCount++;
}
// Iterate through the result set in a different thread to surface any 1st-level thread-safety
// issues
Thread thread =
new Thread(
() -> {
try (ResultSet rs = statement.executeQuery(sql)) {
DatabricksResultSetMetaData metaData =
(DatabricksResultSetMetaData) rs.getMetaData();

int rowCount = 0;
while (rs.next()) {
rowCount++;
}

// The result should have the same number of rows as the limit
assertEquals(maxRows, rowCount);
assertEquals(maxRows, metaData.getTotalRows());

// The result should be split into multiple chunks
assertTrue(metaData.getChunkCount() > 1, "Chunk count should be greater than 1");

// The number of cloud fetch calls should be equal to the number of chunks
final int cloudFetchCalls =
getCloudFetchApiExtension()
.countRequestsMatching(getRequestedFor(urlPathMatching(".*")).build())
.getCount();
// cloud fetch calls can be retried
assertTrue(cloudFetchCalls >= metaData.getChunkCount());

if (isSqlExecSdkClient()) {
// Number of requests to fetch external links should be one less than the total
// number of chunks as first chunk link is already fetched
final String statementId = ((DatabricksResultSet) rs).getStatementId();
final String resultChunkPathRegex =
String.format(RESULT_CHUNK_PATH, statementId, ".*");
getDatabricksApiExtension()
.verify(
(int) (metaData.getChunkCount() - 1),
getRequestedFor(urlPathMatching(resultChunkPathRegex)));
}
} catch (Throwable e) {
threadException.set(e);
}
});

thread.start();
thread.join(10_000);

// The result should have the same number of rows as the limit
assertEquals(maxRows, rowCount);
assertEquals(maxRows, metaData.getTotalRows());

// The result should be split into multiple chunks
assertTrue(metaData.getChunkCount() > 1, "Chunk count should be greater than 1");

// The number of cloud fetch calls should be equal to the number of chunks
final int cloudFetchCalls =
getCloudFetchApiExtension()
.countRequestsMatching(getRequestedFor(urlPathMatching(".*")).build())
.getCount();
// cloud fetch calls can be retried
assertTrue(cloudFetchCalls >= metaData.getChunkCount());

if (isSqlExecSdkClient()) {
// Number of requests to fetch external links should be one less than the total number of
// chunks as first chunk link is already fetched
final String statementId = ((DatabricksResultSet) rs).getStatementId();
final String resultChunkPathRegex = String.format(RESULT_CHUNK_PATH, statementId, ".*");
getDatabricksApiExtension()
.verify(
(int) (metaData.getChunkCount() - 1),
getRequestedFor(urlPathMatching(resultChunkPathRegex)));
// Check if the thread had an exception
if (threadException.get() != null) {
if (threadException.get() instanceof AssertionError) {
throw (AssertionError) threadException.get();
} else {
fail("Test thread failed with exception: " + threadException.get());
}
}

Expand Down
Loading