Skip to content

Commit 65ac88b

Browse files
committed
fix: skip zero-row batches in ArrowStreamReaderCursor
The cursor's next() was treating "loadNextBatch returned true" as "I have a row." That's wrong: a zero-row batch is valid Arrow IPC. Hyper sends one as the first chunk on some queries, and the async chunked-query path uses empty batches as keep-alives. As a result, the cursor reported a phantom row. The fix is a new loadNextNonEmptyBatch that calls loadNextBatch in a loop and only returns once it lands on a batch with at least one row. JDBC's ResultSet.next() has no concept of a batch, so swallowing the empty ones has to happen in the cursor; pushing it out to callers would mean every JDBC user has to know about Arrow's batch boundaries. Tests: - skipsZeroRowBatchAndYieldsSubsequentNonEmptyRows: real Arrow IPC stream of {0-row, 1-row}; next() reports the one real row, then false. - zeroRowOnlyBatchYieldsNoRows: real Arrow IPC stream of {0-row}; next() returns false. - firstNextReturnsTrueWhenInitialBatchHasRows / firstNextReturnsFalseWhenStreamHasNoBatches replace the old parameterised forwardsLoadNextBatch test, which would loop forever under the new control flow.
1 parent 092ac4d commit 65ac88b

2 files changed

Lines changed: 119 additions & 16 deletions

File tree

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/ArrowStreamReaderCursor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,15 @@ private QueryJDBCAccessor createAccessor(FieldVector vector) throws SQLException
5454
return QueryJDBCAccessorFactory.createAccessor(vector, currentIndex::get, sessionZone);
5555
}
5656

57-
private boolean loadNextBatch() throws IOException {
58-
if (reader.loadNextBatch()) {
59-
currentIndex.set(0);
60-
return true;
57+
/**
58+
* Load the next batch that has at least one row, skipping any zero-row batches in between.
59+
*/
60+
private boolean loadNextNonEmptyBatch() throws IOException {
61+
while (reader.loadNextBatch()) {
62+
if (getSchemaRoot().getRowCount() > 0) {
63+
currentIndex.set(0);
64+
return true;
65+
}
6166
}
6267
return false;
6368
}
@@ -68,7 +73,7 @@ public boolean next() {
6873
val total = getSchemaRoot().getRowCount();
6974

7075
try {
71-
val next = current < total || loadNextBatch();
76+
val next = current < total || loadNextNonEmptyBatch();
7277
if (next) {
7378
rowsSeen++;
7479
}

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/ArrowStreamReaderCursorTest.java

Lines changed: 109 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,28 @@
44
*/
55
package com.salesforce.datacloud.jdbc.core;
66

7-
import static org.assertj.core.api.Assertions.*;
8-
import static org.mockito.Mockito.*;
7+
import static org.assertj.core.api.Assertions.assertThat;
8+
import static org.mockito.Mockito.times;
9+
import static org.mockito.Mockito.verify;
10+
import static org.mockito.Mockito.when;
911

12+
import java.io.ByteArrayInputStream;
13+
import java.io.ByteArrayOutputStream;
1014
import java.time.ZoneId;
15+
import java.util.Collections;
1116
import java.util.stream.IntStream;
1217
import lombok.SneakyThrows;
1318
import lombok.val;
19+
import org.apache.arrow.memory.RootAllocator;
1420
import org.apache.arrow.vector.VectorSchemaRoot;
1521
import org.apache.arrow.vector.ipc.ArrowStreamReader;
22+
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
23+
import org.apache.arrow.vector.types.pojo.ArrowType;
24+
import org.apache.arrow.vector.types.pojo.Field;
25+
import org.apache.arrow.vector.types.pojo.FieldType;
26+
import org.apache.arrow.vector.types.pojo.Schema;
1627
import org.junit.jupiter.api.Test;
1728
import org.junit.jupiter.api.extension.ExtendWith;
18-
import org.junit.jupiter.params.ParameterizedTest;
19-
import org.junit.jupiter.params.provider.ValueSource;
2029
import org.mockito.Mock;
2130
import org.mockito.junit.jupiter.MockitoExtension;
2231

@@ -41,7 +50,10 @@ void closesTheReader() {
4150
void incrementsInternalIndexUntilRowsExhaustedThenLoadsNextBatch() {
4251
val times = 5;
4352
when(reader.getVectorSchemaRoot()).thenReturn(root);
44-
when(reader.loadNextBatch()).thenReturn(true);
53+
// First batch has rows; loadNextBatch is consulted only after the per-batch index is
54+
// exhausted. Return false on that single call so the cursor terminates rather than
55+
// looping forever inside loadNextNonEmptyBatch.
56+
when(reader.loadNextBatch()).thenReturn(false);
4557
when(root.getRowCount()).thenReturn(times);
4658

4759
val sut = new ArrowStreamReaderCursor(reader, ZoneId.systemDefault());
@@ -51,16 +63,102 @@ void incrementsInternalIndexUntilRowsExhaustedThenLoadsNextBatch() {
5163
verify(reader, times(1)).loadNextBatch();
5264
}
5365

54-
@ParameterizedTest
66+
@Test
5567
@SneakyThrows
56-
@ValueSource(booleans = {true, false})
57-
void forwardsLoadNextBatch(boolean result) {
58-
when(root.getRowCount()).thenReturn(-10);
68+
void firstNextReturnsTrueWhenInitialBatchHasRows() {
69+
when(root.getRowCount()).thenReturn(1);
5970
when(reader.getVectorSchemaRoot()).thenReturn(root);
60-
when(reader.loadNextBatch()).thenReturn(result);
6171

6272
val sut = new ArrowStreamReaderCursor(reader, ZoneId.systemDefault());
6373

64-
assertThat(sut.next()).isEqualTo(result);
74+
assertThat(sut.next()).isTrue();
75+
}
76+
77+
@Test
78+
@SneakyThrows
79+
void firstNextReturnsFalseWhenStreamHasNoBatches() {
80+
when(root.getRowCount()).thenReturn(0);
81+
when(reader.getVectorSchemaRoot()).thenReturn(root);
82+
when(reader.loadNextBatch()).thenReturn(false);
83+
84+
val sut = new ArrowStreamReaderCursor(reader, ZoneId.systemDefault());
85+
86+
assertThat(sut.next()).isFalse();
87+
}
88+
89+
/**
90+
* Pin behavior on a real Arrow IPC stream that emits a zero-row batch followed by a non-empty
91+
* batch. The cursor must skip the zero-row batch (it is valid Arrow IPC, e.g. async queries
92+
* with empty initial chunks or schema-only metadata streams) rather than reporting a phantom
93+
* row, and then surface the actual data on the next call.
94+
*/
95+
@Test
96+
@SneakyThrows
97+
void skipsZeroRowBatchAndYieldsSubsequentNonEmptyRows() {
98+
val field = new Field("a", new FieldType(true, new ArrowType.Int(32, true), null), null);
99+
val schema = new Schema(Collections.singletonList(field));
100+
101+
byte[] ipc;
102+
try (RootAllocator writeAlloc = new RootAllocator(Long.MAX_VALUE);
103+
VectorSchemaRoot writeRoot = VectorSchemaRoot.create(schema, writeAlloc)) {
104+
try (val out = new ByteArrayOutputStream();
105+
ArrowStreamWriter writer = new ArrowStreamWriter(writeRoot, null, out)) {
106+
writer.start();
107+
writeRoot.allocateNew();
108+
writeRoot.setRowCount(0);
109+
writer.writeBatch();
110+
writeRoot.allocateNew();
111+
((org.apache.arrow.vector.IntVector) writeRoot.getVector("a")).setSafe(0, 7);
112+
writeRoot.setRowCount(1);
113+
writer.writeBatch();
114+
writer.end();
115+
ipc = out.toByteArray();
116+
}
117+
}
118+
119+
try (RootAllocator readAlloc = new RootAllocator(Long.MAX_VALUE);
120+
ArrowStreamReader streamReader = new ArrowStreamReader(new ByteArrayInputStream(ipc), readAlloc)) {
121+
val sut = new ArrowStreamReaderCursor(streamReader, ZoneId.systemDefault());
122+
123+
assertThat(sut.next())
124+
.as("skips zero-row batch, advances to row in second batch")
125+
.isTrue();
126+
assertThat(((org.apache.arrow.vector.IntVector)
127+
streamReader.getVectorSchemaRoot().getVector("a"))
128+
.get(0))
129+
.isEqualTo(7);
130+
assertThat(sut.next()).as("only one real row across the stream").isFalse();
131+
}
132+
}
133+
134+
/**
135+
* Pin behavior on a stream containing only a zero-row batch. The cursor must not report any
136+
* row.
137+
*/
138+
@Test
139+
@SneakyThrows
140+
void zeroRowOnlyBatchYieldsNoRows() {
141+
val field = new Field("a", new FieldType(true, new ArrowType.Int(32, true), null), null);
142+
val schema = new Schema(Collections.singletonList(field));
143+
144+
byte[] ipc;
145+
try (RootAllocator writeAlloc = new RootAllocator(Long.MAX_VALUE);
146+
VectorSchemaRoot writeRoot = VectorSchemaRoot.create(schema, writeAlloc)) {
147+
writeRoot.allocateNew();
148+
writeRoot.setRowCount(0);
149+
try (val out = new ByteArrayOutputStream();
150+
ArrowStreamWriter writer = new ArrowStreamWriter(writeRoot, null, out)) {
151+
writer.start();
152+
writer.writeBatch();
153+
writer.end();
154+
ipc = out.toByteArray();
155+
}
156+
}
157+
158+
try (RootAllocator readAlloc = new RootAllocator(Long.MAX_VALUE);
159+
ArrowStreamReader streamReader = new ArrowStreamReader(new ByteArrayInputStream(ipc), readAlloc)) {
160+
val sut = new ArrowStreamReaderCursor(streamReader, ZoneId.systemDefault());
161+
assertThat(sut.next()).isFalse();
162+
}
65163
}
66164
}

0 commit comments

Comments (0)