Skip to content

Commit c2523f2

Browse files
committed
refactor: unify ResultSet implementations on Arrow-backed path
Collapse the two ResultSet families (streaming Arrow + row-based metadata) into a single Arrow-backed implementation so there is one accessor pipeline, one set of type semantics, and one place to fix bugs.

Changes:

- ArrowStreamReaderCursor becomes source-agnostic: a BatchLoader drives a VectorSchemaRoot, whether sourced from an ArrowStreamReader or a pre-populated in-memory batch. The cursor also owns an AutoCloseable so it is responsible for releasing the allocator + reader on close — the old ArrowStreamReader.close() would only tear down vectors and leak the 100 MB RootAllocator.
- QueryResultArrowStream.toArrowStreamReader returns a Result holder that pairs the reader with the allocator and closes both in the right order so Arrow's accounting invariants hold.
- StreamingResultSet gains ofInMemory(root, owned, queryId, zone, cols) so metadata results funnel through the same result set. A columns override preserves the JDBC-spec typeName labels (e.g. TEXT) that would otherwise be lost when deriving from the Arrow schema.
- MetadataArrowBuilder materialises List<List<Object>> metadata rows into a populated VectorSchemaRoot using the existing HyperTypeToArrow mapping; MetadataResultSets is the factory callers use.
- QueryMetadataUtil and DataCloudDatabaseMetadata route getTables, getColumns, getSchemas, getTypeInfo and empty metadata results through the Arrow-backed StreamingResultSet.
- DataCloudMetadataResultSet, SimpleResultSet, and ColumnAccessor are removed now that no caller depends on them.
- StreamingResultSet.getObject(int, Class) gains an isInstance-based fallback so callers can retrieve String-typed VARCHAR columns without each accessor having to implement typed getObject.
- Tests moved to the unified path; integer-accessor-only assertions in DataCloudDatabaseMetadataTest updated to reflect stricter Arrow accessor semantics.
1 parent ec73677 commit c2523f2

19 files changed

Lines changed: 563 additions & 1220 deletions

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/ArrowStreamReaderCursor.java

Lines changed: 65 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import com.salesforce.datacloud.jdbc.core.accessor.QueryJDBCAccessor;
1010
import com.salesforce.datacloud.jdbc.core.accessor.QueryJDBCAccessorFactory;
11-
import java.io.IOException;
1211
import java.sql.SQLException;
1312
import java.time.ZoneId;
1413
import java.util.List;
@@ -21,31 +20,79 @@
2120
import org.apache.arrow.vector.VectorSchemaRoot;
2221
import org.apache.arrow.vector.ipc.ArrowStreamReader;
2322

23+
/**
24+
* Row cursor over a {@link VectorSchemaRoot} that drives the {@link StreamingResultSet}.
25+
*
26+
* <p>The cursor is source-agnostic: a {@link BatchLoader} loads the next batch into the vector
27+
* schema root, whether that comes from an Arrow IPC stream or a pre-populated in-memory batch.
28+
* This is the single codepath that the driver exposes to JDBC callers — streaming query results
29+
* and materialised metadata results both funnel through here.
30+
*
31+
* <p>The cursor owns an optional {@link AutoCloseable} holding the resources that back the
32+
* vector schema root (allocator, underlying reader, etc.). Closing the cursor closes that holder,
33+
* guaranteeing root-allocator hygiene without requiring each call site to manage the allocator
34+
* separately.
35+
*/
2436
@Slf4j
2537
class ArrowStreamReaderCursor implements AutoCloseable {
2638

2739
private static final int INIT_ROW_NUMBER = -1;
2840

29-
private final ArrowStreamReader reader;
41+
private final VectorSchemaRoot root;
42+
private final BatchLoader batchLoader;
43+
private final AutoCloseable ownedResources;
3044
private final ZoneId sessionZone;
3145

3246
@lombok.Getter
3347
private int rowsSeen = 0;
3448

3549
private final AtomicInteger currentIndex = new AtomicInteger(INIT_ROW_NUMBER);
3650

37-
ArrowStreamReaderCursor(ArrowStreamReader reader, ZoneId sessionZone) {
38-
this.reader = reader;
39-
this.sessionZone = sessionZone;
51+
/**
52+
* Loads the next batch of rows into the vector schema root.
53+
*
54+
* <p>Implementations should return {@code true} if the vector schema root now holds rows from
55+
* a newly-loaded batch, and {@code false} if the source has no more data.
56+
*/
57+
@FunctionalInterface
58+
interface BatchLoader {
59+
boolean loadNextBatch() throws Exception;
4060
}
4161

62+
/**
63+
* Create a cursor that pulls batches from an {@link ArrowStreamReader}. The reader (and the
64+
* allocator it was constructed with) are owned by the cursor — closing the cursor closes the
65+
* supplied {@code ownedResources}.
66+
*/
4267
@SneakyThrows
43-
private VectorSchemaRoot getSchemaRoot() {
44-
return reader.getVectorSchemaRoot();
68+
static ArrowStreamReaderCursor streaming(
69+
ArrowStreamReader reader, AutoCloseable ownedResources, ZoneId sessionZone) {
70+
val root = reader.getVectorSchemaRoot();
71+
BatchLoader loader = reader::loadNextBatch;
72+
return new ArrowStreamReaderCursor(root, loader, ownedResources, sessionZone);
73+
}
74+
75+
/**
76+
* Create a cursor over a single pre-populated {@link VectorSchemaRoot}. The root (and any
77+
* backing allocator wrapped in {@code ownedResources}) are owned by the cursor — closing the
78+
* cursor closes the supplied {@code ownedResources}.
79+
*/
80+
static ArrowStreamReaderCursor inMemory(VectorSchemaRoot root, AutoCloseable ownedResources, ZoneId sessionZone) {
81+
// The VSR is already populated, so there is nothing more to load — the cursor walks the
82+
// row count until exhausted and then reports end-of-stream.
83+
return new ArrowStreamReaderCursor(root, () -> false, ownedResources, sessionZone);
84+
}
85+
86+
private ArrowStreamReaderCursor(
87+
VectorSchemaRoot root, BatchLoader batchLoader, AutoCloseable ownedResources, ZoneId sessionZone) {
88+
this.root = root;
89+
this.batchLoader = batchLoader;
90+
this.ownedResources = ownedResources;
91+
this.sessionZone = sessionZone;
4592
}
4693

4794
List<QueryJDBCAccessor> createAccessors() {
48-
return getSchemaRoot().getFieldVectors().stream()
95+
return root.getFieldVectors().stream()
4996
.map(rethrowFunction(this::createAccessor))
5097
.collect(Collectors.toList());
5198
}
@@ -59,13 +106,13 @@ private QueryJDBCAccessor createAccessor(FieldVector vector) throws SQLException
59106
*
60107
* <p>Empty IPC batches are valid; Hyper can send an empty first chunk. JDBC has no notion of
61108
* a "batch": {@link java.sql.ResultSet#next()} only knows "is there another row?". This
62-
* cursor is the single place that translates batch-level signals from {@link
63-
* ArrowStreamReader#loadNextBatch()} into row-level advances, so it must consume the empty
64-
* batches itself rather than push that responsibility outward.
109+
* cursor is the single place that translates batch-level signals from {@link BatchLoader}
110+
* into row-level advances, so it must consume the empty batches itself rather than push
111+
* that responsibility outward.
65112
*/
66-
private boolean loadNextNonEmptyBatch() throws IOException {
67-
while (reader.loadNextBatch()) {
68-
if (getSchemaRoot().getRowCount() > 0) {
113+
private boolean loadNextNonEmptyBatch() throws Exception {
114+
while (batchLoader.loadNextBatch()) {
115+
if (root.getRowCount() > 0) {
69116
currentIndex.set(0);
70117
return true;
71118
}
@@ -76,7 +123,7 @@ private boolean loadNextNonEmptyBatch() throws IOException {
76123
@SneakyThrows
77124
public boolean next() {
78125
val current = currentIndex.incrementAndGet();
79-
val total = getSchemaRoot().getRowCount();
126+
val total = root.getRowCount();
80127

81128
try {
82129
val next = current < total || loadNextNonEmptyBatch();
@@ -97,6 +144,8 @@ public boolean next() {
97144
@SneakyThrows
98145
@Override
99146
public void close() {
100-
reader.close();
147+
if (ownedResources != null) {
148+
ownedResources.close();
149+
}
101150
}
102151
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudDatabaseMetadata.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import com.google.common.collect.ImmutableList;
1515
import com.salesforce.datacloud.jdbc.config.DriverVersion;
16-
import com.salesforce.datacloud.jdbc.core.metadata.DataCloudResultSetMetaData;
16+
import com.salesforce.datacloud.jdbc.core.metadata.MetadataResultSets;
1717
import com.salesforce.datacloud.jdbc.core.types.HyperTypes;
1818
import com.salesforce.datacloud.jdbc.util.JdbcURL;
1919
import com.salesforce.datacloud.jdbc.util.ThrowingJdbcSupplier;
@@ -706,39 +706,39 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa
706706
@Override
707707
public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern)
708708
throws SQLException {
709-
return DataCloudMetadataResultSet.empty();
709+
return MetadataResultSets.emptyNoColumns();
710710
}
711711

712712
@Override
713713
public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern)
714714
throws SQLException {
715-
return DataCloudMetadataResultSet.empty();
715+
return MetadataResultSets.emptyNoColumns();
716716
}
717717

718718
@Override
719719
public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable)
720720
throws SQLException {
721-
return DataCloudMetadataResultSet.empty();
721+
return MetadataResultSets.emptyNoColumns();
722722
}
723723

724724
@Override
725725
public ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException {
726-
return DataCloudMetadataResultSet.empty();
726+
return MetadataResultSets.emptyNoColumns();
727727
}
728728

729729
@Override
730730
public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException {
731-
return DataCloudMetadataResultSet.empty();
731+
return MetadataResultSets.emptyNoColumns();
732732
}
733733

734734
@Override
735735
public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
736-
return DataCloudMetadataResultSet.empty();
736+
return MetadataResultSets.emptyNoColumns();
737737
}
738738

739739
@Override
740740
public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
741-
return DataCloudMetadataResultSet.empty();
741+
return MetadataResultSets.emptyNoColumns();
742742
}
743743

744744
@Override
@@ -750,19 +750,18 @@ public ResultSet getCrossReference(
750750
String foreignSchema,
751751
String foreignTable)
752752
throws SQLException {
753-
return DataCloudMetadataResultSet.empty();
753+
return MetadataResultSets.emptyNoColumns();
754754
}
755755

756756
@Override
757757
public ResultSet getTypeInfo() throws SQLException {
758-
return DataCloudMetadataResultSet.of(
759-
new DataCloudResultSetMetaData(MetadataSchemas.TYPE_INFO), HyperTypes.typeInfoRows());
758+
return MetadataResultSets.ofRawRows(MetadataSchemas.TYPE_INFO, HyperTypes.typeInfoRows());
760759
}
761760

762761
@Override
763762
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate)
764763
throws SQLException {
765-
return DataCloudMetadataResultSet.empty();
764+
return MetadataResultSets.emptyNoColumns();
766765
}
767766

768767
@Override

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudMetadataResultSet.java

Lines changed: 0 additions & 164 deletions
This file was deleted.

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/QueryMetadataUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import com.google.common.base.Strings;
1313
import com.google.common.collect.ImmutableList;
1414
import com.google.common.collect.ImmutableMap;
15-
import com.salesforce.datacloud.jdbc.core.metadata.DataCloudResultSetMetaData;
15+
import com.salesforce.datacloud.jdbc.core.metadata.MetadataResultSets;
1616
import com.salesforce.datacloud.jdbc.core.types.HyperTypes;
1717
import com.salesforce.datacloud.jdbc.protocol.data.ColumnMetadata;
1818
import com.salesforce.datacloud.jdbc.protocol.data.HyperType;
@@ -75,7 +75,7 @@ public static ResultSet createTableResultSet(
7575
}
7676

7777
static ResultSet getMetadataResultSet(List<ColumnMetadata> columns, List<Object> data) throws SQLException {
78-
return DataCloudMetadataResultSet.of(new DataCloudResultSetMetaData(columns), data);
78+
return MetadataResultSets.ofRawRows(columns, data);
7979
}
8080

8181
private static List<Object> constructTableData(ResultSet resultSet) throws SQLException {

0 commit comments

Comments
 (0)