Skip to content

Commit ce192d2

Browse files
committed
AVRO-4241: [Java] BinaryDecoder should verify available bytes before reading
Add ensureAvailableBytes() pre-check in readString, readBytes, readArrayStart, arrayNext, readMapStart, and mapNext to verify the source has sufficient data before proceeding. Byte-array-backed sources return an exact remaining count. Stream-backed sources return buffered bytes plus InputStream.available(), which is reliable for the finite streams used by DataFileReader and DataFileStream. Includes regression tests and updated array/map limit tests.
1 parent 7473a6e commit ce192d2

7 files changed

Lines changed: 440 additions & 2 deletions

File tree

lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
*/
1818
package org.apache.avro.generic;
1919

20+
import java.io.EOFException;
2021
import java.io.IOException;
2122
import java.lang.reflect.Constructor;
2223
import java.nio.ByteBuffer;
2324
import java.util.Collection;
25+
import java.util.Collections;
2426
import java.util.HashMap;
27+
import java.util.IdentityHashMap;
2528
import java.util.Map;
29+
import java.util.Set;
2630
import java.util.concurrent.ConcurrentHashMap;
2731
import java.util.function.Function;
2832

@@ -291,6 +295,7 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr
291295
long l = in.readArrayStart();
292296
long base = 0;
293297
if (l > 0) {
298+
ensureAvailableCollectionBytes(in, l, expectedType);
294299
LogicalType logicalType = expectedType.getLogicalType();
295300
Conversion<?> conversion = getData().getConversionFor(logicalType);
296301
Object array = newArray(old, (int) l, expected);
@@ -306,13 +311,25 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr
306311
}
307312
}
308313
base += l;
309-
} while ((l = in.arrayNext()) > 0);
314+
} while ((l = arrayNext(in, expectedType)) > 0);
310315
return pruneArray(array);
311316
} else {
312317
return pruneArray(newArray(old, 0, expected));
313318
}
314319
}
315320

321+
/**
322+
* Reads the next array block count and validates remaining bytes before the
323+
* caller allocates storage.
324+
*/
325+
private long arrayNext(ResolvingDecoder in, Schema elementType) throws IOException {
326+
long l = in.arrayNext();
327+
if (l > 0) {
328+
ensureAvailableCollectionBytes(in, l, elementType);
329+
}
330+
return l;
331+
}
332+
316333
private Object pruneArray(Object object) {
317334
if (object instanceof GenericArray<?>) {
318335
((GenericArray<?>) object).prune();
@@ -348,6 +365,9 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw
348365
long l = in.readMapStart();
349366
LogicalType logicalType = eValue.getLogicalType();
350367
Conversion<?> conversion = getData().getConversionFor(logicalType);
368+
if (l > 0) {
369+
ensureAvailableMapBytes(in, l, eValue);
370+
}
351371
Object map = newMap(old, (int) l);
352372
if (l > 0) {
353373
do {
@@ -361,11 +381,40 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw
361381
addToMap(map, readMapKey(null, expected, in), readWithoutConversion(null, eValue, in));
362382
}
363383
}
364-
} while ((l = in.mapNext()) > 0);
384+
} while ((l = mapNext(in, eValue)) > 0);
365385
}
366386
return map;
367387
}
368388

389+
/**
390+
* Reads the next map block count and validates remaining bytes before the
391+
* caller allocates storage.
392+
*/
393+
private long mapNext(ResolvingDecoder in, Schema valueType) throws IOException {
394+
long l = in.mapNext();
395+
if (l > 0) {
396+
ensureAvailableMapBytes(in, l, valueType);
397+
}
398+
return l;
399+
}
400+
401+
/**
402+
* Validates remaining bytes for a map block. Each map entry has a string key
403+
* (at least 1 byte for the length varint) plus a value, so the minimum bytes
404+
* per entry is {@code 1 + minBytesPerElement(valueSchema)}.
405+
*/
406+
private static void ensureAvailableMapBytes(Decoder decoder, long count, Schema valueSchema) throws EOFException {
407+
// Map keys are always strings: at least 1 byte for the length varint
408+
int minBytesPerEntry = 1 + minBytesPerElement(valueSchema);
409+
if (count > 0) {
410+
int remaining = decoder.remainingBytes();
411+
if (remaining >= 0 && count * (long) minBytesPerEntry > remaining) {
412+
throw new EOFException("Map claims " + count + " entries with at least " + minBytesPerEntry
413+
+ " bytes each, but only " + remaining + " bytes are available");
414+
}
415+
}
416+
}
417+
369418
/**
370419
* Called by the default implementation of {@link #readMap} to read a key value.
371420
* The default implementation returns delegates to
@@ -384,6 +433,73 @@ protected void addToMap(Object map, Object key, Object value) {
384433
((Map) map).put(key, value);
385434
}
386435

436+
/**
437+
* Returns the minimum number of bytes required to encode a single value of the
438+
* given schema in Avro binary format. Used to validate that the decoder has
439+
* enough data remaining before allocating collection backing arrays.
440+
* <p>
441+
* Returns 0 for types whose binary encoding is empty ({@code null}, zero-length
442+
* {@code fixed}, records with only zero-byte fields). Returns a positive value
443+
* for all other types.
444+
*/
445+
static int minBytesPerElement(Schema schema) {
446+
return minBytesPerElement(schema, Collections.newSetFromMap(new IdentityHashMap<>()));
447+
}
448+
449+
private static int minBytesPerElement(Schema schema, Set<Schema> visited) {
450+
switch (schema.getType()) {
451+
case NULL:
452+
return 0;
453+
case FIXED:
454+
return schema.getFixedSize();
455+
case FLOAT:
456+
return 4;
457+
case DOUBLE:
458+
return 8;
459+
case RECORD:
460+
if (!visited.add(schema)) {
461+
return 0; // break recursion for self-referencing schemas
462+
}
463+
long sum = 0;
464+
for (Schema.Field f : schema.getFields()) {
465+
sum += minBytesPerElement(f.schema(), visited);
466+
if (sum >= Integer.MAX_VALUE) {
467+
sum = Integer.MAX_VALUE;
468+
break;
469+
}
470+
}
471+
visited.remove(schema);
472+
return (int) sum;
473+
case UNION:
474+
// The branch index varint is always at least 1 byte
475+
return 1;
476+
default:
477+
// BOOLEAN, INT, LONG, ENUM, STRING, BYTES, ARRAY, MAP are all >= 1 byte
478+
return 1;
479+
}
480+
}
481+
482+
/**
483+
* Validates that the decoder has enough remaining bytes to hold {@code count}
484+
* elements of the given schema, assuming each element requires at least
485+
* {@link #minBytesPerElement} bytes. Throws {@link EOFException} if the decoder
486+
* reports fewer remaining bytes than required.
487+
* <p>
488+
* This check prevents out-of-memory errors from pre-allocating huge backing
489+
* arrays when the source data is truncated or malicious.
490+
*/
491+
private static void ensureAvailableCollectionBytes(Decoder decoder, long count, Schema elementSchema)
492+
throws EOFException {
493+
int minBytes = minBytesPerElement(elementSchema);
494+
if (minBytes > 0 && count > 0) {
495+
int remaining = decoder.remainingBytes();
496+
if (remaining >= 0 && count * (long) minBytes > remaining) {
497+
throw new EOFException("Collection claims " + count + " elements with at least " + minBytes
498+
+ " bytes each, but only " + remaining + " bytes are available");
499+
}
500+
}
501+
}
502+
387503
/**
388504
* Called to read a fixed value. May be overridden for alternate fixed
389505
* representations. By default, returns {@link GenericFixed}.

lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import org.apache.avro.AvroRuntimeException;
2121
import org.apache.avro.InvalidNumberEncodingException;
2222
import org.apache.avro.SystemLimitException;
23+
import org.apache.avro.util.ByteBufferInputStream;
2324
import org.apache.avro.util.Utf8;
2425

26+
import java.io.ByteArrayInputStream;
2527
import java.io.EOFException;
2628
import java.io.IOException;
2729
import java.io.InputStream;
@@ -295,6 +297,7 @@ public double readDouble() throws IOException {
295297
@Override
296298
public Utf8 readString(Utf8 old) throws IOException {
297299
int length = SystemLimitException.checkMaxStringLength(readLong());
300+
ensureAvailableBytes(length);
298301
Utf8 result = (old != null ? old : new Utf8());
299302
result.setByteLength(length);
300303
if (0 != length) {
@@ -318,6 +321,7 @@ public void skipString() throws IOException {
318321
@Override
319322
public ByteBuffer readBytes(ByteBuffer old) throws IOException {
320323
int length = SystemLimitException.checkMaxBytesLength(readLong());
324+
ensureAvailableBytes(length);
321325
final ByteBuffer result;
322326
if (old != null && length <= old.capacity()) {
323327
result = old;
@@ -508,6 +512,21 @@ public boolean isEnd() throws IOException {
508512
return (0 == read);
509513
}
510514

515+
/**
516+
* Returns the total number of bytes remaining that can be read from this
517+
* decoder (including any buffered bytes), or {@code -1} if the total is
518+
* unknown.
519+
* <p>
520+
* Byte-array-backed decoders return an exact count. InputStream-backed decoders
521+
* return an exact count only when the wrapped stream can report one.
522+
* <p>
523+
* {@link DirectBinaryDecoder} always returns {@code -1}.
524+
*/
525+
@Override
526+
public int remainingBytes() {
527+
return source != null ? source.remainingBytes() : -1;
528+
}
529+
511530
/**
512531
* Ensures that buf[pos + num - 1] is not out of the buffer array bounds.
513532
* However, buf[pos + num -1] may be >= limit if there is not enough data left
@@ -530,6 +549,27 @@ private void ensureBounds(int num) throws IOException {
530549
}
531550
}
532551

552+
/**
553+
* Validates that the source has at least {@code length} bytes remaining before
554+
* proceeding. Throws early if the declared length is inconsistent with the
555+
* available data.
556+
* <p>
557+
* This check is only applied when the decoder knows the exact remaining byte
558+
* count.
559+
*
560+
* @param length the number of bytes expected to be available
561+
* @throws EOFException if the source is known to have fewer bytes remaining
562+
*/
563+
private void ensureAvailableBytes(int length) throws EOFException {
564+
if (source != null && length > 0) {
565+
int remaining = source.remainingBytes();
566+
if (remaining >= 0 && length > remaining) {
567+
throw new EOFException(
568+
"Attempted to read " + length + " bytes, but only " + remaining + " bytes are available");
569+
}
570+
}
571+
}
572+
533573
/**
534574
* Returns an {@link java.io.InputStream} that is aware of any buffering that
535575
* may occur in this BinaryDecoder. Readers that need to interleave decoding
@@ -664,6 +704,12 @@ protected ByteSource() {
664704

665705
abstract boolean isEof();
666706

707+
/**
708+
* Returns the total number of bytes remaining that can be read from this source
709+
* (including any buffered bytes), or {@code -1} if the total is unknown.
710+
*/
711+
protected abstract int remainingBytes();
712+
667713
protected void attach(int bufferSize, BinaryDecoder decoder) {
668714
decoder.buf = new byte[bufferSize];
669715
decoder.pos = 0;
@@ -910,6 +956,19 @@ public boolean isEof() {
910956
return isEof;
911957
}
912958

959+
@Override
960+
protected int remainingBytes() {
961+
int buffered = ba.getLim() - ba.getPos();
962+
try {
963+
if (in.getClass() == ByteArrayInputStream.class || in.getClass() == ByteBufferInputStream.class) {
964+
return buffered + in.available();
965+
}
966+
} catch (IOException e) {
967+
return -1;
968+
}
969+
return -1;
970+
}
971+
913972
@Override
914973
public void close() throws IOException {
915974
in.close();
@@ -1028,5 +1087,10 @@ public boolean isEof() {
10281087
int remaining = ba.getLim() - ba.getPos();
10291088
return (remaining == 0);
10301089
}
1090+
1091+
@Override
1092+
protected int remainingBytes() {
1093+
return ba.getLim() - ba.getPos();
1094+
}
10311095
}
10321096
}

lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,4 +299,14 @@ public void readFixed(byte[] bytes) throws IOException {
299299
* type of the next value to be read
300300
*/
301301
public abstract int readIndex() throws IOException;
302+
303+
/**
304+
* Returns the total number of bytes remaining that can be read from this
305+
* decoder, or {@code -1} if the total is unknown. Implementations that can
306+
* determine remaining capacity (for example, byte-array-backed decoders) should
307+
* override this method. The default returns {@code -1}.
308+
*/
309+
public int remainingBytes() {
310+
return -1;
311+
}
302312
}

lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,9 @@ public int readIndex() throws IOException {
246246
public Symbol doAction(Symbol input, Symbol top) throws IOException {
247247
return null;
248248
}
249+
250+
@Override
251+
public int remainingBytes() {
252+
return in != null ? in.remainingBytes() : -1;
253+
}
249254
}

lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,18 @@ public int read(byte[] b, int off, int len) throws IOException {
6565
}
6666
}
6767

68+
@Override
69+
public int available() throws IOException {
70+
long remaining = 0;
71+
for (int i = current; i < buffers.size(); i++) {
72+
remaining += buffers.get(i).remaining();
73+
if (remaining >= Integer.MAX_VALUE) {
74+
return Integer.MAX_VALUE;
75+
}
76+
}
77+
return (int) remaining;
78+
}
79+
6880
/**
6981
* Read a buffer from the input without copying, if possible.
7082
*/

0 commit comments

Comments
 (0)