44 */
55package com .salesforce .datacloud .jdbc .core ;
66
7- import static org .assertj .core .api .Assertions .*;
8- import static org .mockito .Mockito .*;
7+ import static org .assertj .core .api .Assertions .assertThat ;
8+ import static org .mockito .Mockito .times ;
9+ import static org .mockito .Mockito .verify ;
10+ import static org .mockito .Mockito .when ;
911
12+ import java .io .ByteArrayInputStream ;
13+ import java .io .ByteArrayOutputStream ;
1014import java .time .ZoneId ;
15+ import java .util .Collections ;
1116import java .util .stream .IntStream ;
1217import lombok .SneakyThrows ;
1318import lombok .val ;
19+ import org .apache .arrow .memory .RootAllocator ;
1420import org .apache .arrow .vector .VectorSchemaRoot ;
1521import org .apache .arrow .vector .ipc .ArrowStreamReader ;
22+ import org .apache .arrow .vector .ipc .ArrowStreamWriter ;
23+ import org .apache .arrow .vector .types .pojo .ArrowType ;
24+ import org .apache .arrow .vector .types .pojo .Field ;
25+ import org .apache .arrow .vector .types .pojo .FieldType ;
26+ import org .apache .arrow .vector .types .pojo .Schema ;
1627import org .junit .jupiter .api .Test ;
1728import org .junit .jupiter .api .extension .ExtendWith ;
18- import org .junit .jupiter .params .ParameterizedTest ;
19- import org .junit .jupiter .params .provider .ValueSource ;
2029import org .mockito .Mock ;
2130import org .mockito .junit .jupiter .MockitoExtension ;
2231
@@ -41,7 +50,10 @@ void closesTheReader() {
4150 void incrementsInternalIndexUntilRowsExhaustedThenLoadsNextBatch () {
4251 val times = 5 ;
4352 when (reader .getVectorSchemaRoot ()).thenReturn (root );
44- when (reader .loadNextBatch ()).thenReturn (true );
53+ // First batch has rows; loadNextBatch is consulted only after the per-batch index is
54+ // exhausted. Return false on that single call so the cursor terminates rather than
55+ // looping forever inside loadNextNonEmptyBatch.
56+ when (reader .loadNextBatch ()).thenReturn (false );
4557 when (root .getRowCount ()).thenReturn (times );
4658
4759 val sut = new ArrowStreamReaderCursor (reader , ZoneId .systemDefault ());
@@ -51,16 +63,102 @@ void incrementsInternalIndexUntilRowsExhaustedThenLoadsNextBatch() {
5163 verify (reader , times (1 )).loadNextBatch ();
5264 }
5365
54- @ ParameterizedTest
66+ @ Test
5567 @ SneakyThrows
56- @ ValueSource (booleans = {true , false })
57- void forwardsLoadNextBatch (boolean result ) {
58- when (root .getRowCount ()).thenReturn (-10 );
68+ void firstNextReturnsTrueWhenInitialBatchHasRows () {
69+ when (root .getRowCount ()).thenReturn (1 );
5970 when (reader .getVectorSchemaRoot ()).thenReturn (root );
60- when (reader .loadNextBatch ()).thenReturn (result );
6171
6272 val sut = new ArrowStreamReaderCursor (reader , ZoneId .systemDefault ());
6373
64- assertThat (sut .next ()).isEqualTo (result );
74+ assertThat (sut .next ()).isTrue ();
75+ }
76+
77+ @ Test
78+ @ SneakyThrows
79+ void firstNextReturnsFalseWhenStreamHasNoBatches () {
80+ when (root .getRowCount ()).thenReturn (0 );
81+ when (reader .getVectorSchemaRoot ()).thenReturn (root );
82+ when (reader .loadNextBatch ()).thenReturn (false );
83+
84+ val sut = new ArrowStreamReaderCursor (reader , ZoneId .systemDefault ());
85+
86+ assertThat (sut .next ()).isFalse ();
87+ }
88+
89+ /**
90+ * Pin behavior on a real Arrow IPC stream that emits a zero-row batch followed by a non-empty
91+ * batch. The cursor must skip the zero-row batch (it is valid Arrow IPC, e.g. async queries
92+ * with empty initial chunks or schema-only metadata streams) rather than reporting a phantom
93+ * row, and then surface the actual data on the next call.
94+ */
95+ @ Test
96+ @ SneakyThrows
97+ void skipsZeroRowBatchAndYieldsSubsequentNonEmptyRows () {
98+ val field = new Field ("a" , new FieldType (true , new ArrowType .Int (32 , true ), null ), null );
99+ val schema = new Schema (Collections .singletonList (field ));
100+
101+ byte [] ipc ;
102+ try (RootAllocator writeAlloc = new RootAllocator (Long .MAX_VALUE );
103+ VectorSchemaRoot writeRoot = VectorSchemaRoot .create (schema , writeAlloc )) {
104+ try (val out = new ByteArrayOutputStream ();
105+ ArrowStreamWriter writer = new ArrowStreamWriter (writeRoot , null , out )) {
106+ writer .start ();
107+ writeRoot .allocateNew ();
108+ writeRoot .setRowCount (0 );
109+ writer .writeBatch ();
110+ writeRoot .allocateNew ();
111+ ((org .apache .arrow .vector .IntVector ) writeRoot .getVector ("a" )).setSafe (0 , 7 );
112+ writeRoot .setRowCount (1 );
113+ writer .writeBatch ();
114+ writer .end ();
115+ ipc = out .toByteArray ();
116+ }
117+ }
118+
119+ try (RootAllocator readAlloc = new RootAllocator (Long .MAX_VALUE );
120+ ArrowStreamReader streamReader = new ArrowStreamReader (new ByteArrayInputStream (ipc ), readAlloc )) {
121+ val sut = new ArrowStreamReaderCursor (streamReader , ZoneId .systemDefault ());
122+
123+ assertThat (sut .next ())
124+ .as ("skips zero-row batch, advances to row in second batch" )
125+ .isTrue ();
126+ assertThat (((org .apache .arrow .vector .IntVector )
127+ streamReader .getVectorSchemaRoot ().getVector ("a" ))
128+ .get (0 ))
129+ .isEqualTo (7 );
130+ assertThat (sut .next ()).as ("only one real row across the stream" ).isFalse ();
131+ }
132+ }
133+
134+ /**
135+ * Pin behavior on a stream containing only a zero-row batch. The cursor must not report any
136+ * row. This can occur for empty query results.
137+ */
138+ @ Test
139+ @ SneakyThrows
140+ void zeroRowOnlyBatchYieldsNoRows () {
141+ val field = new Field ("a" , new FieldType (true , new ArrowType .Int (32 , true ), null ), null );
142+ val schema = new Schema (Collections .singletonList (field ));
143+
144+ byte [] ipc ;
145+ try (RootAllocator writeAlloc = new RootAllocator (Long .MAX_VALUE );
146+ VectorSchemaRoot writeRoot = VectorSchemaRoot .create (schema , writeAlloc )) {
147+ writeRoot .allocateNew ();
148+ writeRoot .setRowCount (0 );
149+ try (val out = new ByteArrayOutputStream ();
150+ ArrowStreamWriter writer = new ArrowStreamWriter (writeRoot , null , out )) {
151+ writer .start ();
152+ writer .writeBatch ();
153+ writer .end ();
154+ ipc = out .toByteArray ();
155+ }
156+ }
157+
158+ try (RootAllocator readAlloc = new RootAllocator (Long .MAX_VALUE );
159+ ArrowStreamReader streamReader = new ArrowStreamReader (new ByteArrayInputStream (ipc ), readAlloc )) {
160+ val sut = new ArrowStreamReaderCursor (streamReader , ZoneId .systemDefault ());
161+ assertThat (sut .next ()).isFalse ();
162+ }
65163 }
66164}
0 commit comments