Skip to content

Commit 089434b

Browse files
authored
Core: Fix ByteBufferInputStream.read() to return -1 at EOF (#16167)
* Core: Fix ByteBufferInputStream.read() to return -1 at EOF instead of throwing EOFException (#16127) * Core: Enhance EOF test coverage for ByteBufferInputStream Add read(byte[]) assertion to assertAtEOF helper to pin both overloads, and add testDrainedMultiBufferStream to explicitly exercise the nextBuffer() -> return -1 code path in MultiBufferInputStream.
1 parent 9789f85 commit 089434b

3 files changed

Lines changed: 33 additions & 7 deletions

File tree

core/src/main/java/org/apache/iceberg/io/MultiBufferInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public int read(byte[] bytes) {
263263
@Override
264264
public int read() throws IOException {
265265
if (current == null) {
266-
throw new EOFException();
266+
return -1;
267267
}
268268

269269
while (true) {
@@ -272,7 +272,7 @@ public int read() throws IOException {
272272
return current.get() & 0xFF; // as unsigned
273273
} else if (!nextBuffer()) {
274274
// there are no more buffers
275-
throw new EOFException();
275+
return -1;
276276
}
277277
}
278278
}

core/src/main/java/org/apache/iceberg/io/SingleBufferInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public long getPos() {
5858
@Override
5959
public int read() throws IOException {
6060
if (!buffer.hasRemaining()) {
61-
throw new EOFException();
61+
return -1;
6262
}
6363
return buffer.get() & 0xFF; // as unsigned
6464
}

core/src/test/java/org/apache/iceberg/io/TestByteBufferInputStreams.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ public abstract class TestByteBufferInputStreams {
3535

3636
protected abstract void checkOriginalData();
3737

38+
private static void assertAtEOF(ByteBufferInputStream stream) throws IOException {
39+
long pos = stream.getPos();
40+
assertThat(stream.read()).as("read() at EOF").isEqualTo(-1);
41+
assertThat(stream.read()).as("read() should keep returning -1 at EOF").isEqualTo(-1);
42+
assertThat(stream.read(new byte[1])).as("read(byte[]) at EOF").isEqualTo(-1);
43+
assertThat(stream.getPos()).as("Position should not advance past EOF").isEqualTo(pos);
44+
assertThat(stream.available()).as("available() should be 0 at EOF").isEqualTo(0);
45+
}
46+
3847
@Test
3948
public void testRead0() throws Exception {
4049
byte[] bytes = new byte[0];
@@ -67,7 +76,7 @@ public void testReadAll() throws Exception {
6776

6877
assertThat(stream.read(bytes)).as("Should return -1 at end of stream").isEqualTo(-1);
6978

70-
assertThat(stream.available()).as("Should have no more remaining content").isEqualTo(0);
79+
assertAtEOF(stream);
7180

7281
checkOriginalData();
7382
}
@@ -102,7 +111,7 @@ public void testSmallReads() throws Exception {
102111

103112
assertThat(stream.read(bytes)).as("Should return -1 at end of stream").isEqualTo(-1);
104113

105-
assertThat(stream.available()).as("Should have no more remaining content").isEqualTo(0);
114+
assertAtEOF(stream);
106115
}
107116

108117
checkOriginalData();
@@ -142,7 +151,7 @@ public void testPartialBufferReads() throws Exception {
142151

143152
assertThat(stream.read(bytes)).as("Should return -1 at end of stream").isEqualTo(-1);
144153

145-
assertThat(stream.available()).as("Should have no more remaining content").isEqualTo(0);
154+
assertAtEOF(stream);
146155
}
147156

148157
checkOriginalData();
@@ -158,7 +167,7 @@ public void testReadByte() throws Exception {
158167
assertThat(stream.read()).isEqualTo(i);
159168
}
160169

161-
assertThatThrownBy(stream::read).isInstanceOf(EOFException.class).hasMessage(null);
170+
assertAtEOF(stream);
162171

163172
checkOriginalData();
164173
}
@@ -532,4 +541,21 @@ public void testMarkDoubleReset() throws Exception {
532541
.isInstanceOf(IOException.class)
533542
.hasMessageStartingWith("No mark defined");
534543
}
544+
545+
@Test
546+
public void testEmptyStream() throws Exception {
547+
assertAtEOF(ByteBufferInputStream.wrap(ByteBuffer.allocate(0)));
548+
assertAtEOF(ByteBufferInputStream.wrap(ByteBuffer.allocate(0), ByteBuffer.allocate(0)));
549+
assertAtEOF(ByteBufferInputStream.wrap(Collections.emptyList()));
550+
}
551+
552+
@Test
553+
public void testDrainedMultiBufferStream() throws Exception {
554+
ByteBufferInputStream stream =
555+
ByteBufferInputStream.wrap(
556+
ByteBuffer.wrap(new byte[] {1, 2, 3}), ByteBuffer.wrap(new byte[] {4, 5}));
557+
byte[] buf = new byte[5];
558+
assertThat(stream.read(buf)).as("Should read all bytes").isEqualTo(5);
559+
assertAtEOF(stream);
560+
}
535561
}

0 commit comments

Comments
 (0)