Skip to content

Commit e7dd9e4

Browse files
committed
refactor: address PR review — share vector builder and use in-memory ArrowStreamReader
Rework the ResultSet unification to address two reviewer requests on #175: 1. Share the vector-building code with the parameter-encoding path instead of having a dedicated MetadataArrowBuilder. VectorPopulator now exposes a row-indexed primitive (setCell) used by both callers. The existing single-row parameter-binding overload and a new many-row metadata overload both funnel through it, and all the individual vector setters are parameterised by row index. 2. Keep ArrowStreamReaderCursor on its original ArrowStreamReader-only interface. The metadata path now serialises a populated VSR to Arrow IPC bytes and wraps the result in a ByteArrayInputStream-backed ArrowStreamReader, so both streaming and metadata result sets travel through exactly the same reader/cursor plumbing. Supporting changes: - typeName overrides (e.g. "TEXT" for JDBC-spec metadata columns) now round-trip through Arrow via a jdbc:type_name field-metadata key rather than a columns-override parameter on StreamingResultSet. HyperTypeToArrow stamps the key on write; ArrowToHyperTypeMapper.toColumnMetadata reads it back. - StreamingResultSet drops the ofInMemory(...) factory and the columns override; callers construct an ArrowStreamReader + BufferAllocator pair and hand them to of(reader, allocator, queryId, zone). The cursor owns both and closes reader-then-allocator on close. - QueryResultArrowStream.toArrowStreamReader returns a simple Result holder (reader + allocator) instead of an AutoCloseable bundle. - MetadataResultSets is the single entry point for Arrow-backed metadata result sets; MetadataArrowBuilder is deleted. - Empty metadata results skip writeBatch() entirely so ArrowStreamReaderCursor doesn't interpret a zero-row batch as "at least one row available". - Tests updated to the new API; StreamingResultSetMethodTest builds its in-memory ResultSet the same way as the metadata path (IPC round-trip).
1 parent c2523f2 commit e7dd9e4

14 files changed

Lines changed: 358 additions & 491 deletions

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/ArrowStreamReaderCursor.java

Lines changed: 30 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import com.salesforce.datacloud.jdbc.core.accessor.QueryJDBCAccessor;
1010
import com.salesforce.datacloud.jdbc.core.accessor.QueryJDBCAccessorFactory;
11+
import java.io.IOException;
1112
import java.sql.SQLException;
1213
import java.time.ZoneId;
1314
import java.util.List;
@@ -16,83 +17,47 @@
1617
import lombok.SneakyThrows;
1718
import lombok.extern.slf4j.Slf4j;
1819
import lombok.val;
20+
import org.apache.arrow.memory.BufferAllocator;
1921
import org.apache.arrow.vector.FieldVector;
2022
import org.apache.arrow.vector.VectorSchemaRoot;
2123
import org.apache.arrow.vector.ipc.ArrowStreamReader;
2224

2325
/**
24-
* Row cursor over a {@link VectorSchemaRoot} that drives the {@link StreamingResultSet}.
26+
* Row cursor over an {@link ArrowStreamReader} that drives the {@link StreamingResultSet}.
2527
*
26-
* <p>The cursor is source-agnostic: a {@link BatchLoader} loads the next batch into the vector
27-
* schema root, whether that comes from an Arrow IPC stream or a pre-populated in-memory batch.
28-
* This is the single codepath that the driver exposes to JDBC callers — streaming query results
29-
* and materialised metadata results both funnel through here.
30-
*
31-
* <p>The cursor owns an optional {@link AutoCloseable} holding the resources that back the
32-
* vector schema root (allocator, underlying reader, etc.). Closing the cursor closes that holder,
33-
* guaranteeing root-allocator hygiene without requiring each call site to manage the allocator
34-
* separately.
28+
* <p>The cursor owns the supplied {@link BufferAllocator} alongside the reader: closing the
29+
* cursor closes the reader (which releases ArrowBuf accounting) and then the allocator (which
30+
* returns its budget). This is the single place that guarantees root-allocator hygiene for the
31+
* driver; callers of {@link StreamingResultSet#of} hand ownership over and do not close the
32+
* allocator themselves.
3533
*/
3634
@Slf4j
3735
class ArrowStreamReaderCursor implements AutoCloseable {
3836

3937
private static final int INIT_ROW_NUMBER = -1;
4038

41-
private final VectorSchemaRoot root;
42-
private final BatchLoader batchLoader;
43-
private final AutoCloseable ownedResources;
39+
private final ArrowStreamReader reader;
40+
private final BufferAllocator allocator;
4441
private final ZoneId sessionZone;
4542

4643
@lombok.Getter
4744
private int rowsSeen = 0;
4845

4946
private final AtomicInteger currentIndex = new AtomicInteger(INIT_ROW_NUMBER);
5047

51-
/**
52-
* Loads the next batch of rows into the vector schema root.
53-
*
54-
* <p>Implementations should return {@code true} if the vector schema root now holds rows from
55-
* a newly-loaded batch, and {@code false} if the source has no more data.
56-
*/
57-
@FunctionalInterface
58-
interface BatchLoader {
59-
boolean loadNextBatch() throws Exception;
48+
ArrowStreamReaderCursor(ArrowStreamReader reader, BufferAllocator allocator, ZoneId sessionZone) {
49+
this.reader = reader;
50+
this.allocator = allocator;
51+
this.sessionZone = sessionZone;
6052
}
6153

62-
/**
63-
* Create a cursor that pulls batches from an {@link ArrowStreamReader}. The reader (and the
64-
* allocator it was constructed with) are owned by the cursor — closing the cursor closes the
65-
* supplied {@code ownedResources}.
66-
*/
6754
@SneakyThrows
68-
static ArrowStreamReaderCursor streaming(
69-
ArrowStreamReader reader, AutoCloseable ownedResources, ZoneId sessionZone) {
70-
val root = reader.getVectorSchemaRoot();
71-
BatchLoader loader = reader::loadNextBatch;
72-
return new ArrowStreamReaderCursor(root, loader, ownedResources, sessionZone);
73-
}
74-
75-
/**
76-
* Create a cursor over a single pre-populated {@link VectorSchemaRoot}. The root (and any
77-
* backing allocator wrapped in {@code ownedResources}) are owned by the cursor — closing the
78-
* cursor closes the supplied {@code ownedResources}.
79-
*/
80-
static ArrowStreamReaderCursor inMemory(VectorSchemaRoot root, AutoCloseable ownedResources, ZoneId sessionZone) {
81-
// The VSR is already populated, so there is nothing more to load — the cursor walks the
82-
// row count until exhausted and then reports end-of-stream.
83-
return new ArrowStreamReaderCursor(root, () -> false, ownedResources, sessionZone);
84-
}
85-
86-
private ArrowStreamReaderCursor(
87-
VectorSchemaRoot root, BatchLoader batchLoader, AutoCloseable ownedResources, ZoneId sessionZone) {
88-
this.root = root;
89-
this.batchLoader = batchLoader;
90-
this.ownedResources = ownedResources;
91-
this.sessionZone = sessionZone;
55+
private VectorSchemaRoot getSchemaRoot() {
56+
return reader.getVectorSchemaRoot();
9257
}
9358

9459
List<QueryJDBCAccessor> createAccessors() {
95-
return root.getFieldVectors().stream()
60+
return getSchemaRoot().getFieldVectors().stream()
9661
.map(rethrowFunction(this::createAccessor))
9762
.collect(Collectors.toList());
9863
}
@@ -106,13 +71,13 @@ private QueryJDBCAccessor createAccessor(FieldVector vector) throws SQLException
10671
*
10772
* <p>Empty IPC batches are valid; Hyper can send an empty first chunk. JDBC has no notion of
10873
* a "batch": {@link java.sql.ResultSet#next()} only knows "is there another row?". This
109-
* cursor is the single place that translates batch-level signals from {@link BatchLoader}
110-
* into row-level advances, so it must consume the empty batches itself rather than push
111-
* that responsibility outward.
74+
* cursor is the single place that translates batch-level signals from {@link
75+
* ArrowStreamReader#loadNextBatch()} into row-level advances, so it must consume the empty
76+
* batches itself rather than push that responsibility outward.
11277
*/
113-
private boolean loadNextNonEmptyBatch() throws Exception {
114-
while (batchLoader.loadNextBatch()) {
115-
if (root.getRowCount() > 0) {
78+
private boolean loadNextNonEmptyBatch() throws IOException {
79+
while (reader.loadNextBatch()) {
80+
if (getSchemaRoot().getRowCount() > 0) {
11681
currentIndex.set(0);
11782
return true;
11883
}
@@ -123,7 +88,7 @@ private boolean loadNextNonEmptyBatch() throws Exception {
12388
@SneakyThrows
12489
public boolean next() {
12590
val current = currentIndex.incrementAndGet();
126-
val total = root.getRowCount();
91+
val total = getSchemaRoot().getRowCount();
12792

12893
try {
12994
val next = current < total || loadNextNonEmptyBatch();
@@ -144,8 +109,12 @@ public boolean next() {
144109
@SneakyThrows
145110
@Override
146111
public void close() {
147-
if (ownedResources != null) {
148-
ownedResources.close();
112+
// Close the reader first: it releases the buffers accounted against the allocator, so the
113+
// allocator's closing budget check passes. Reversing the order trips a leak detector.
114+
try {
115+
reader.close();
116+
} finally {
117+
allocator.close();
149118
}
150119
}
151120
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long
220220
QueryResultArrowStream.OUTPUT_FORMAT);
221221
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
222222
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
223-
return StreamingResultSet.of(arrowStream, queryId);
223+
return StreamingResultSet.of(arrowStream.getReader(), arrowStream.getAllocator(), queryId);
224224
} catch (StatusRuntimeException ex) {
225225
throw QueryExceptionHandler.createException(
226226
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);
@@ -263,7 +263,7 @@ public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, l
263263
QueryResultArrowStream.OUTPUT_FORMAT);
264264
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
265265
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
266-
return StreamingResultSet.of(arrowStream, queryId);
266+
return StreamingResultSet.of(arrowStream.getReader(), arrowStream.getAllocator(), queryId);
267267
} catch (StatusRuntimeException ex) {
268268
throw QueryExceptionHandler.createException(
269269
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,11 @@ public ResultSet executeQuery(String sql) throws SQLException {
196196
val iterator = executeAdaptiveQuery(sql);
197197
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
198198
iterator, includeCustomerDetail, iterator.getQueryStatus().getQueryId(), sql);
199-
resultSet =
200-
StreamingResultSet.of(arrowStream, iterator.getQueryStatus().getQueryId(), sessionZone);
199+
resultSet = StreamingResultSet.of(
200+
arrowStream.getReader(),
201+
arrowStream.getAllocator(),
202+
iterator.getQueryStatus().getQueryId(),
203+
sessionZone);
201204
log.info(
202205
"executeAdaptiveQuery completed. queryId={}, sessionZone={}",
203206
queryHandle.getQueryStatus().getQueryId(),
@@ -437,7 +440,8 @@ public ResultSet getResultSet() throws SQLException {
437440
adaptiveIterator.getQueryStatus().getQueryId(),
438441
null);
439442
resultSet = StreamingResultSet.of(
440-
arrowStream,
443+
arrowStream.getReader(),
444+
arrowStream.getAllocator(),
441445
adaptiveIterator.getQueryStatus().getQueryId(),
442446
sessionZone);
443447
} else if (resultSet == null) {

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 29 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import com.salesforce.datacloud.jdbc.core.resultset.ForwardOnlyResultSet;
1010
import com.salesforce.datacloud.jdbc.core.resultset.ReadOnlyResultSet;
1111
import com.salesforce.datacloud.jdbc.core.resultset.ResultSetWithPositionalGetters;
12-
import com.salesforce.datacloud.jdbc.protocol.QueryResultArrowStream;
1312
import com.salesforce.datacloud.jdbc.protocol.data.ArrowToHyperTypeMapper;
14-
import com.salesforce.datacloud.jdbc.protocol.data.ColumnMetadata;
1513
import com.salesforce.datacloud.jdbc.util.ThrowingJdbcSupplier;
1614
import com.salesforce.datacloud.query.v3.QueryStatus;
1715
import java.io.IOException;
@@ -38,13 +36,13 @@
3836
import java.sql.Types;
3937
import java.time.ZoneId;
4038
import java.util.Calendar;
41-
import java.util.List;
4239
import java.util.Map;
4340
import java.util.stream.Collectors;
4441
import lombok.Getter;
4542
import lombok.extern.slf4j.Slf4j;
4643
import lombok.val;
47-
import org.apache.arrow.vector.VectorSchemaRoot;
44+
import org.apache.arrow.memory.BufferAllocator;
45+
import org.apache.arrow.vector.ipc.ArrowStreamReader;
4846

4947
@Slf4j
5048
public class StreamingResultSet
@@ -75,28 +73,41 @@ private StreamingResultSet(
7573
this.closed = false;
7674
}
7775

78-
public static StreamingResultSet of(QueryResultArrowStream.Result arrowStream, String queryId) throws SQLException {
79-
return of(arrowStream, queryId, ZoneId.systemDefault());
76+
public static StreamingResultSet of(ArrowStreamReader reader, BufferAllocator allocator, String queryId)
77+
throws SQLException {
78+
return of(reader, allocator, queryId, ZoneId.systemDefault());
8079
}
8180

8281
/**
83-
* Creates a StreamingResultSet with a specified session timezone.
82+
* Creates a StreamingResultSet from an {@link ArrowStreamReader} and its backing allocator.
83+
*
84+
* <p>Ownership of both the reader and the allocator transfers to the returned result set —
85+
* closing the result set closes the reader and then the allocator, in that order, so Arrow's
86+
* buffer accounting clears before the allocator's budget check. Callers must not close
87+
* either separately.
8488
*
85-
* <p>Ownership of {@code arrowStream} (both the reader and its backing allocator) transfers
86-
* to the returned result set — callers must not close it separately.
89+
* <p>The column metadata (including any {@link com.salesforce.datacloud.jdbc.protocol.data.ColumnMetadata#getTypeName()} override
90+
* stamped under {@link com.salesforce.datacloud.jdbc.protocol.data.HyperTypeToArrow#JDBC_TYPE_NAME_METADATA_KEY})
91+
* is derived from the Arrow schema via {@link ArrowToHyperTypeMapper#toColumnMetadata(org.apache.arrow.vector.types.pojo.Field)}.
8792
*
88-
* @param arrowStream The Arrow stream containing query results, owned by the result set
89-
* @param queryId The query identifier
90-
* @param sessionZone The session timezone to use for timestamp conversions
91-
* @return A new StreamingResultSet
92-
* @throws SQLException If an error occurs during ResultSet creation
93+
* @param reader The Arrow stream, owned by the result set.
94+
* @param allocator The allocator backing the reader, owned by the result set.
95+
* @param queryId The query identifier.
96+
* @param sessionZone The session timezone used for timestamp conversions.
9397
*/
94-
public static StreamingResultSet of(QueryResultArrowStream.Result arrowStream, String queryId, ZoneId sessionZone)
98+
public static StreamingResultSet of(
99+
ArrowStreamReader reader, BufferAllocator allocator, String queryId, ZoneId sessionZone)
95100
throws SQLException {
96101
try {
97-
val schemaRoot = arrowStream.getReader().getVectorSchemaRoot();
98-
val cursor = ArrowStreamReaderCursor.streaming(arrowStream.getReader(), arrowStream, sessionZone);
99-
return build(cursor, schemaRoot, queryId, null);
102+
val schemaRoot = reader.getVectorSchemaRoot();
103+
val columns = schemaRoot.getSchema().getFields().stream()
104+
.map(ArrowToHyperTypeMapper::toColumnMetadata)
105+
.collect(Collectors.toList());
106+
val metadata = new DataCloudResultSetMetaData(columns);
107+
val cursor = new ArrowStreamReaderCursor(reader, allocator, sessionZone);
108+
val accessors = cursor.createAccessors().toArray(new QueryJDBCAccessor[0]);
109+
val columnNameResolver = new ColumnNameResolver(columns);
110+
return new StreamingResultSet(cursor, queryId, metadata, accessors, columnNameResolver);
100111
} catch (IOException ex) {
101112
throw new SQLException("Unexpected error during ResultSet creation", "XX000", ex);
102113
} catch (IllegalArgumentException ex) {
@@ -105,66 +116,6 @@ public static StreamingResultSet of(QueryResultArrowStream.Result arrowStream, S
105116
}
106117
}
107118

108-
/**
109-
* Creates a StreamingResultSet over a pre-populated in-memory {@link VectorSchemaRoot}.
110-
*
111-
* <p>Used by the metadata path (e.g. {@code DatabaseMetaData.getTables}), where the rows are
112-
* materialised into Arrow up front rather than streamed from the server. Ownership of
113-
* {@code ownedResources} — typically the {@link org.apache.arrow.memory.BufferAllocator} and
114-
* {@link VectorSchemaRoot} — transfers to the returned result set, which closes them when
115-
* it is closed.
116-
*
117-
* @param columns optional column-metadata override. When non-null it takes precedence over
118-
* what would be derived from the Arrow schema so that {@link ColumnMetadata#getTypeName()}
119-
* overrides (e.g. {@code "TEXT"} for {@code getTables} rather than the derived
120-
* {@code "VARCHAR"}) are preserved on the way through.
121-
*/
122-
public static StreamingResultSet ofInMemory(
123-
VectorSchemaRoot schemaRoot,
124-
AutoCloseable ownedResources,
125-
String queryId,
126-
ZoneId sessionZone,
127-
List<ColumnMetadata> columns)
128-
throws SQLException {
129-
try {
130-
val cursor = ArrowStreamReaderCursor.inMemory(schemaRoot, ownedResources, sessionZone);
131-
return build(cursor, schemaRoot, queryId, columns);
132-
} catch (IllegalArgumentException ex) {
133-
throw new SQLException("Unsupported column type in result set: " + ex.getMessage(), "0A000", ex);
134-
}
135-
}
136-
137-
/**
138-
* Overload that derives the column metadata from the Arrow schema. Prefer the overload that
139-
* takes an explicit {@code columns} list if callers need to preserve JDBC-spec type names
140-
* (e.g. {@code "TEXT"}).
141-
*/
142-
public static StreamingResultSet ofInMemory(
143-
VectorSchemaRoot schemaRoot, AutoCloseable ownedResources, String queryId, ZoneId sessionZone)
144-
throws SQLException {
145-
return ofInMemory(schemaRoot, ownedResources, queryId, sessionZone, null);
146-
}
147-
148-
private static StreamingResultSet build(
149-
ArrowStreamReaderCursor cursor,
150-
VectorSchemaRoot schemaRoot,
151-
String queryId,
152-
List<ColumnMetadata> columnOverride)
153-
throws SQLException {
154-
final List<ColumnMetadata> columns;
155-
if (columnOverride != null) {
156-
columns = columnOverride;
157-
} else {
158-
columns = schemaRoot.getSchema().getFields().stream()
159-
.map(field -> new ColumnMetadata(field.getName(), ArrowToHyperTypeMapper.toHyperType(field)))
160-
.collect(Collectors.toList());
161-
}
162-
val metadata = new DataCloudResultSetMetaData(columns);
163-
val accessors = cursor.createAccessors().toArray(new QueryJDBCAccessor[0]);
164-
val columnNameResolver = new ColumnNameResolver(columns);
165-
return new StreamingResultSet(cursor, queryId, metadata, accessors, columnNameResolver);
166-
}
167-
168119
// --- Core ResultSet navigation ---
169120

170121
@Override

0 commit comments

Comments
 (0)