Skip to content

Commit 1f6beac

Browse files
committed
Fix LocalInputFile
Summary Root cause (parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java) Both read(ByteBuffer) and readFully(ByteBuffer) called: buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining()); Two independent bugs: 1. Wrong put overload semantics. ByteBuffer.put(byte[] src, int offset, int length) reads src[offset..offset+length]. The code passed buf.position() + buf.arrayOffset() as the source offset, but the source is the freshly-allocated buffer array whose indices have no relationship to buf.position() or buf.arrayOffset(). On heap buffers with position=0 and arrayOffset=0 this happens to work; any other state reads from the wrong offset (or throws ArrayIndexOutOfBoundsException). 2. arrayOffset() is not universally defined. Direct buffers, memory-mapped buffers, and read-only views all throw UnsupportedOperationException from arrayOffset(). Any Parquet consumer passing such a buffer (which is exactly what the steampipe integration test triggers via ParquetFileReader.readFooter) blows up at the first call. 3. Partial-read accounting. read(ByteBuffer) unconditionally copied buf.remaining() bytes regardless of how many read(byte[]) actually returned, corrupting the destination buffer on short reads / EOF. Fix - readFully(ByteBuffer) → buf.put(buffer) (straight array-to-buffer copy, no arrayOffset()). - read(ByteBuffer) → only put read bytes when read > 0, return read (including -1 at EOF). Tests added TestLocalInputOutput gained 8 regression cases in parquet-common/src/test/java/org/apache/parquet/io/: - readFullyIntoHeapByteBuffer — baseline happy path. - readFullyIntoHeapByteBufferWithNonZeroPosition — exercises the old wrong-source-offset bug. - readFullyIntoDirectByteBuffer — direct buffers; arrayOffset() would have thrown. - readFullyIntoReadOnlyByteBuffer — read-only views; asserts ReadOnlyBufferException (correct JDK behaviour after the fix). - readIntoHeapByteBuffer — baseline. - readIntoByteBufferAdvancesPositionByBytesRead — exercises the partial-read accounting bug. - readIntoByteBufferReturnsMinusOneAtEof — EOF signalling. - readIntoDirectByteBuffer — direct buffer read path.
1 parent 53d7842 commit 1f6beac

2 files changed

Lines changed: 158 additions & 4 deletions

File tree

parquet-common/src/main/java/org/apache/parquet/io/LocalInputFile.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
8181
@Override
8282
public int read(ByteBuffer buf) throws IOException {
8383
byte[] buffer = new byte[buf.remaining()];
84-
int code = read(buffer);
85-
buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
86-
return code;
84+
int bytesRead = randomAccessFile.read(buffer);
85+
if (bytesRead > 0) {
86+
buf.put(buffer, 0, bytesRead);
87+
}
88+
return bytesRead;
8789
}
8890

8991
@Override
9092
public void readFully(ByteBuffer buf) throws IOException {
9193
byte[] buffer = new byte[buf.remaining()];
9294
readFully(buffer);
93-
buf.put(buffer, buf.position() + buf.arrayOffset(), buf.remaining());
95+
buf.put(buffer);
9496
}
9597

9698
@Override

parquet-common/src/test/java/org/apache/parquet/io/TestLocalInputOutput.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
*/
1919
package org.apache.parquet.io;
2020

21+
import static org.junit.Assert.assertArrayEquals;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertThrows;
2324

25+
import java.io.EOFException;
2426
import java.io.File;
2527
import java.io.IOException;
28+
import java.nio.ByteBuffer;
2629
import java.nio.file.FileAlreadyExistsException;
2730
import java.nio.file.Path;
2831
import java.nio.file.Paths;
@@ -89,4 +92,153 @@ private File createTempFile() throws IOException {
8992
tmp.delete();
9093
return tmp;
9194
}
95+
96+
@Test
97+
public void readFullyIntoHeapByteBuffer() throws IOException {
98+
Path path = writeBytes(new byte[] {1, 2, 3, 4, 5});
99+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
100+
ByteBuffer buf = ByteBuffer.allocate(5);
101+
stream.readFully(buf);
102+
assertEquals(5, buf.position());
103+
buf.flip();
104+
byte[] out = new byte[5];
105+
buf.get(out);
106+
assertArrayEquals(new byte[] {1, 2, 3, 4, 5}, out);
107+
}
108+
}
109+
110+
@Test
111+
public void readFullyIntoHeapByteBufferWithNonZeroPosition() throws IOException {
112+
// Regression: the buggy implementation passed buf.position() as the src offset to
113+
// ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array.
114+
Path path = writeBytes(new byte[] {10, 20, 30, 40});
115+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
116+
ByteBuffer buf = ByteBuffer.allocate(6);
117+
buf.put(new byte[] {99, 99}); // advance position to 2
118+
stream.readFully(buf);
119+
assertEquals(6, buf.position());
120+
buf.flip();
121+
byte[] out = new byte[6];
122+
buf.get(out);
123+
assertArrayEquals(new byte[] {99, 99, 10, 20, 30, 40}, out);
124+
}
125+
}
126+
127+
@Test
128+
public void readFullyIntoDirectByteBuffer() throws IOException {
129+
// Regression: the buggy implementation called arrayOffset() which throws
130+
// UnsupportedOperationException on direct buffers.
131+
Path path = writeBytes(new byte[] {7, 8, 9});
132+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
133+
ByteBuffer buf = ByteBuffer.allocateDirect(3);
134+
stream.readFully(buf);
135+
assertEquals(3, buf.position());
136+
buf.flip();
137+
byte[] out = new byte[3];
138+
buf.get(out);
139+
assertArrayEquals(new byte[] {7, 8, 9}, out);
140+
}
141+
}
142+
143+
@Test
144+
public void readFullyIntoReadOnlyByteBuffer() throws IOException {
145+
// Read-only views also throw from arrayOffset().
146+
Path path = writeBytes(new byte[] {7, 8, 9});
147+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
148+
ByteBuffer backing = ByteBuffer.allocate(3);
149+
ByteBuffer buf = backing.asReadOnlyBuffer();
150+
assertThrows(java.nio.ReadOnlyBufferException.class, () -> stream.readFully(buf));
151+
}
152+
}
153+
154+
@Test
155+
public void readIntoHeapByteBuffer() throws IOException {
156+
Path path = writeBytes(new byte[] {1, 2, 3, 4});
157+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
158+
ByteBuffer buf = ByteBuffer.allocate(4);
159+
int read = stream.read(buf);
160+
assertEquals(4, read);
161+
assertEquals(4, buf.position());
162+
buf.flip();
163+
byte[] out = new byte[4];
164+
buf.get(out);
165+
assertArrayEquals(new byte[] {1, 2, 3, 4}, out);
166+
}
167+
}
168+
169+
@Test
170+
public void readIntoByteBufferAdvancesPositionByBytesRead() throws IOException {
171+
// Regression: the buggy implementation always advanced by buf.remaining() regardless of how
172+
// many bytes were actually read, leaving the destination buffer inconsistent on partial reads.
173+
Path path = writeBytes(new byte[] {1, 2, 3});
174+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
175+
ByteBuffer buf = ByteBuffer.allocate(10);
176+
int read = stream.read(buf);
177+
assertEquals(3, read);
178+
assertEquals(3, buf.position());
179+
}
180+
}
181+
182+
@Test
183+
public void readIntoByteBufferReturnsMinusOneAtEof() throws IOException {
184+
Path path = writeBytes(new byte[] {1});
185+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
186+
assertEquals(1, stream.read());
187+
ByteBuffer buf = ByteBuffer.allocate(4);
188+
int read = stream.read(buf);
189+
assertEquals(-1, read);
190+
assertEquals(0, buf.position());
191+
}
192+
}
193+
194+
@Test
195+
public void readIntoDirectByteBuffer() throws IOException {
196+
Path path = writeBytes(new byte[] {7, 8, 9});
197+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
198+
ByteBuffer buf = ByteBuffer.allocateDirect(3);
199+
int read = stream.read(buf);
200+
assertEquals(3, read);
201+
assertEquals(3, buf.position());
202+
buf.flip();
203+
byte[] out = new byte[3];
204+
buf.get(out);
205+
assertArrayEquals(new byte[] {7, 8, 9}, out);
206+
}
207+
}
208+
209+
@Test
210+
public void readIntoByteBufferWithNonZeroPosition() throws IOException {
211+
// Regression: the buggy implementation passed buf.position() as the src offset to
212+
// ByteBuffer.put(byte[], int, int), which reads from the wrong location in the source array.
213+
Path path = writeBytes(new byte[] {10, 20, 30});
214+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
215+
ByteBuffer buf = ByteBuffer.allocate(5);
216+
buf.put(new byte[] {99, 99}); // advance position to 2
217+
int read = stream.read(buf);
218+
assertEquals(3, read);
219+
assertEquals(5, buf.position());
220+
buf.flip();
221+
byte[] out = new byte[5];
222+
buf.get(out);
223+
assertArrayEquals(new byte[] {99, 99, 10, 20, 30}, out);
224+
}
225+
}
226+
227+
@Test
228+
public void readFullyThrowsEofWhenStreamTooShort() throws IOException {
229+
Path path = writeBytes(new byte[] {1, 2});
230+
try (SeekableInputStream stream = new LocalInputFile(path).newStream()) {
231+
ByteBuffer buf = ByteBuffer.allocate(10);
232+
assertThrows(EOFException.class, () -> stream.readFully(buf));
233+
}
234+
}
235+
236+
private Path writeBytes(byte[] data) throws IOException {
237+
Path path = Paths.get(createTempFile().getPath());
238+
OutputFile write = new LocalOutputFile(path);
239+
try (PositionOutputStream stream = write.createOrOverwrite(512)) {
240+
stream.write(data);
241+
}
242+
return path;
243+
}
92244
}

0 commit comments

Comments
 (0)