Skip to content

Commit acc241e

Browse files
committed
refactor: collapse StreamingResultSet.of factories into one
StreamingResultSet had two public factories — of(reader, allocator, queryId[, zone]) (4 callers) and ofClosingOnFailure(Result, queryId, zone) (5 callers). Every production caller wanted the close-on-failure behavior; only tests and the metadata helper used the bare of(). Two factories with overlapping responsibilities is one too many — a caller hitting the bare of() and not knowing about ofClosingOnFailure would silently leak the 100 MB RootAllocator on construction failure. Collapse to one public factory: - of(QueryResultArrowStream.Result, queryId, sessionZone) — the only factory callers see, always closes both reader and allocator on failure. Name is the unambiguous "of" because there is no other. - create(reader, allocator, queryId, sessionZone) — private; just the construction body the factory wraps. Production call sites (DataCloudConnection, DataCloudStatement) and MetadataResultSets were already passing a (reader, allocator) pair, so the call shape collapses to passing the Result holder. Tests that were building the pair locally now wrap it in a Result the same way.
1 parent 7228a66 commit acc241e

6 files changed

Lines changed: 38 additions & 49 deletions

File tree

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long
221221
QueryResultArrowStream.OUTPUT_FORMAT);
222222
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
223223
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
224-
return StreamingResultSet.ofClosingOnFailure(arrowStream, queryId, ZoneId.systemDefault());
224+
return StreamingResultSet.of(arrowStream, queryId, ZoneId.systemDefault());
225225
} catch (StatusRuntimeException ex) {
226226
throw QueryExceptionHandler.createException(
227227
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);
@@ -264,7 +264,7 @@ public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, l
264264
QueryResultArrowStream.OUTPUT_FORMAT);
265265
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
266266
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
267-
return StreamingResultSet.ofClosingOnFailure(arrowStream, queryId, ZoneId.systemDefault());
267+
return StreamingResultSet.of(arrowStream, queryId, ZoneId.systemDefault());
268268
} catch (StatusRuntimeException ex) {
269269
throw QueryExceptionHandler.createException(
270270
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ public ResultSet executeQuery(String sql) throws SQLException {
196196
val iterator = executeAdaptiveQuery(sql);
197197
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
198198
iterator, includeCustomerDetail, iterator.getQueryStatus().getQueryId(), sql);
199-
resultSet = StreamingResultSet.ofClosingOnFailure(
200-
arrowStream, iterator.getQueryStatus().getQueryId(), sessionZone);
199+
resultSet =
200+
StreamingResultSet.of(arrowStream, iterator.getQueryStatus().getQueryId(), sessionZone);
201201
log.info(
202202
"executeAdaptiveQuery completed. queryId={}, sessionZone={}",
203203
queryHandle.getQueryStatus().getQueryId(),
@@ -436,7 +436,7 @@ public ResultSet getResultSet() throws SQLException {
436436
includeCustomerDetail,
437437
adaptiveIterator.getQueryStatus().getQueryId(),
438438
null);
439-
resultSet = StreamingResultSet.ofClosingOnFailure(
439+
resultSet = StreamingResultSet.of(
440440
arrowStream,
441441
adaptiveIterator.getQueryStatus().getQueryId(),
442442
sessionZone);

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -74,25 +74,38 @@ private StreamingResultSet(
7474
this.closed = false;
7575
}
7676

77-
public static StreamingResultSet of(ArrowStreamReader reader, BufferAllocator allocator, String queryId)
78-
throws SQLException {
79-
return of(reader, allocator, queryId, ZoneId.systemDefault());
80-
}
81-
8277
/**
83-
* Creates a StreamingResultSet from an {@link ArrowStreamReader} and its backing allocator.
78+
* Creates a StreamingResultSet from a {@link QueryResultArrowStream.Result} (reader paired
79+
* with its backing allocator).
8480
*
8581
* <p>Ownership of both the reader and the allocator transfers to the returned result set —
8682
* closing the result set closes the reader and then the allocator, in that order, so Arrow's
87-
* buffer accounting clears before the allocator's budget check. Callers must not close
88-
* either separately.
83+
* buffer accounting clears before the allocator's budget check. If construction itself
84+
* throws (for example a {@link SQLException} wrapping an unsupported Arrow type), this
85+
* method closes both before re-throwing so the 100 MB {@link
86+
* org.apache.arrow.memory.RootAllocator} does not leak. Callers must not close either
87+
* separately on success.
8988
*
90-
* @param reader The Arrow stream, owned by the result set.
91-
* @param allocator The allocator backing the reader, owned by the result set.
92-
* @param queryId The query identifier.
89+
* @param arrowStream The Arrow stream + allocator pair, both owned by the result set.
90+
* @param queryId The query identifier (may be {@code null} for synthesized result sets).
9391
* @param sessionZone The session timezone used for timestamp conversions.
9492
*/
95-
public static StreamingResultSet of(
93+
public static StreamingResultSet of(QueryResultArrowStream.Result arrowStream, String queryId, ZoneId sessionZone)
94+
throws SQLException {
95+
try {
96+
return create(arrowStream.getReader(), arrowStream.getAllocator(), queryId, sessionZone);
97+
} catch (SQLException | RuntimeException ex) {
98+
try {
99+
arrowStream.getReader().close();
100+
} catch (Exception suppressed) {
101+
ex.addSuppressed(suppressed);
102+
}
103+
arrowStream.getAllocator().close();
104+
throw ex;
105+
}
106+
}
107+
108+
private static StreamingResultSet create(
96109
ArrowStreamReader reader, BufferAllocator allocator, String queryId, ZoneId sessionZone)
97110
throws SQLException {
98111
try {
@@ -113,28 +126,6 @@ public static StreamingResultSet of(
113126
}
114127
}
115128

116-
/**
117-
* Hand the reader + allocator pair from {@link QueryResultArrowStream.Result} to {@link
118-
* #of(ArrowStreamReader, BufferAllocator, String, ZoneId)} and close both on construction
119-
* failure. Without this, an {@code of} call that throws (for example {@code SQLException}
120-
* wrapping an unsupported Arrow type) would leak the 100 MB
121-
* {@link org.apache.arrow.memory.RootAllocator} held by {@code Result}.
122-
*/
123-
public static StreamingResultSet ofClosingOnFailure(
124-
QueryResultArrowStream.Result arrowStream, String queryId, ZoneId sessionZone) throws SQLException {
125-
try {
126-
return of(arrowStream.getReader(), arrowStream.getAllocator(), queryId, sessionZone);
127-
} catch (SQLException | RuntimeException ex) {
128-
try {
129-
arrowStream.getReader().close();
130-
} catch (Exception suppressed) {
131-
ex.addSuppressed(suppressed);
132-
}
133-
arrowStream.getAllocator().close();
134-
throw ex;
135-
}
136-
}
137-
138129
// --- Core ResultSet navigation ---
139130

140131
@Override

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/metadata/MetadataResultSets.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package com.salesforce.datacloud.jdbc.core.metadata;
66

77
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
8+
import com.salesforce.datacloud.jdbc.protocol.QueryResultArrowStream;
89
import com.salesforce.datacloud.jdbc.protocol.data.ColumnMetadata;
910
import com.salesforce.datacloud.jdbc.protocol.data.HyperTypeToArrow;
1011
import com.salesforce.datacloud.jdbc.protocol.data.VectorPopulator;
@@ -62,13 +63,9 @@ public static StreamingResultSet of(List<ColumnMetadata> columns, List<List<Obje
6263
// Allocator is handed to StreamingResultSet along with the reader; the result set owns
6364
// its lifecycle and closes it when close() is called.
6465
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
65-
try {
66-
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(ipcBytes), allocator);
67-
return StreamingResultSet.of(reader, allocator, /*queryId=*/ null, ZoneId.systemDefault());
68-
} catch (SQLException | RuntimeException ex) {
69-
allocator.close();
70-
throw ex;
71-
}
66+
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(ipcBytes), allocator);
67+
return StreamingResultSet.of(
68+
new QueryResultArrowStream.Result(reader, allocator), /*queryId=*/ null, ZoneId.systemDefault());
7269
}
7370

7471
/**

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/StreamCloseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void closingResultSetClosesUnderlyingIterator() {
8181
// ByteStringReadableByteChannel(iterator, resource) → ArrowStreamReader
8282
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
8383
tracked, false, "test-query", null);
84-
val resultSet = StreamingResultSet.of(arrowStream.getReader(), arrowStream.getAllocator(), "test-query");
84+
val resultSet = StreamingResultSet.of(arrowStream, "test-query", java.time.ZoneId.systemDefault());
8585

8686
// Read one row — stream is still open with remaining rows
8787
assertThat(resultSet.next()).isTrue();

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/StreamingResultSetMethodTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ private StreamingResultSet createSingleVarCharResultSet(boolean nullValue) {
8484

8585
RootAllocator readerAllocator = new RootAllocator(Long.MAX_VALUE);
8686
ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), readerAllocator);
87-
return StreamingResultSet.of(reader, readerAllocator, QUERY_ID);
87+
return StreamingResultSet.of(
88+
new QueryResultArrowStream.Result(reader, readerAllocator), QUERY_ID, ZoneId.systemDefault());
8889
}
8990

9091
// --- Unsupported methods ---
@@ -167,7 +168,7 @@ void ofClosingOnFailureClosesAllocatorWhenSchemaIsUnsupported() {
167168
val reader = spy(new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), readerAllocator));
168169
val arrowStream = new QueryResultArrowStream.Result(reader, readerAllocator);
169170

170-
assertThatThrownBy(() -> StreamingResultSet.ofClosingOnFailure(arrowStream, QUERY_ID, ZoneId.systemDefault()))
171+
assertThatThrownBy(() -> StreamingResultSet.of(arrowStream, QUERY_ID, ZoneId.systemDefault()))
171172
.isInstanceOf(SQLException.class)
172173
.hasMessageContaining("Unsupported column type");
173174

0 commit comments

Comments
 (0)