Skip to content

Commit ef9c52a

Browse files
committed
AVRO-4241: [Java] BinaryDecoder should verify available bytes before reading
Add ensureAvailableBytes() pre-check in readString, readBytes, readArrayStart, arrayNext, readMapStart, and mapNext to verify the source has sufficient data before proceeding. Byte-array-backed sources return an exact remaining count. Stream-backed sources return buffered bytes plus InputStream.available(), which is reliable for the finite streams used by DataFileReader and DataFileStream. Includes regression tests and updated array/map limit tests.
1 parent 7473a6e commit ef9c52a

File tree

6 files changed

+378
-2
lines changed

6 files changed

+378
-2
lines changed

lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
*/
1818
package org.apache.avro.generic;
1919

20+
import java.io.EOFException;
2021
import java.io.IOException;
2122
import java.lang.reflect.Constructor;
2223
import java.nio.ByteBuffer;
2324
import java.util.Collection;
2425
import java.util.HashMap;
26+
import java.util.HashSet;
2527
import java.util.Map;
28+
import java.util.Set;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.function.Function;
2831

@@ -291,6 +294,7 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr
291294
long l = in.readArrayStart();
292295
long base = 0;
293296
if (l > 0) {
297+
ensureAvailableCollectionBytes(in, l, expectedType);
294298
LogicalType logicalType = expectedType.getLogicalType();
295299
Conversion<?> conversion = getData().getConversionFor(logicalType);
296300
Object array = newArray(old, (int) l, expected);
@@ -306,13 +310,25 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr
306310
}
307311
}
308312
base += l;
309-
} while ((l = in.arrayNext()) > 0);
313+
} while ((l = arrayNext(in, expectedType)) > 0);
310314
return pruneArray(array);
311315
} else {
312316
return pruneArray(newArray(old, 0, expected));
313317
}
314318
}
315319

320+
/**
321+
* Reads the next array block count and validates remaining bytes before the
322+
* caller allocates storage.
323+
*/
324+
private long arrayNext(ResolvingDecoder in, Schema elementType) throws IOException {
325+
long l = in.arrayNext();
326+
if (l > 0) {
327+
ensureAvailableCollectionBytes(in, l, elementType);
328+
}
329+
return l;
330+
}
331+
316332
private Object pruneArray(Object object) {
317333
if (object instanceof GenericArray<?>) {
318334
((GenericArray<?>) object).prune();
@@ -348,6 +364,9 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw
348364
long l = in.readMapStart();
349365
LogicalType logicalType = eValue.getLogicalType();
350366
Conversion<?> conversion = getData().getConversionFor(logicalType);
367+
if (l > 0) {
368+
ensureAvailableMapBytes(in, l, eValue);
369+
}
351370
Object map = newMap(old, (int) l);
352371
if (l > 0) {
353372
do {
@@ -361,11 +380,40 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw
361380
addToMap(map, readMapKey(null, expected, in), readWithoutConversion(null, eValue, in));
362381
}
363382
}
364-
} while ((l = in.mapNext()) > 0);
383+
} while ((l = mapNext(in, eValue)) > 0);
365384
}
366385
return map;
367386
}
368387

388+
/**
389+
* Reads the next map block count and validates remaining bytes before the
390+
* caller allocates storage.
391+
*/
392+
private long mapNext(ResolvingDecoder in, Schema valueType) throws IOException {
393+
long l = in.mapNext();
394+
if (l > 0) {
395+
ensureAvailableMapBytes(in, l, valueType);
396+
}
397+
return l;
398+
}
399+
400+
/**
401+
* Validates remaining bytes for a map block. Each map entry has a string key
402+
* (at least 1 byte for the length varint) plus a value, so the minimum bytes
403+
* per entry is {@code 1 + minBytesPerElement(valueSchema)}.
404+
*/
405+
private static void ensureAvailableMapBytes(Decoder decoder, long count, Schema valueSchema) throws EOFException {
406+
// Map keys are always strings: at least 1 byte for the length varint
407+
int minBytesPerEntry = 1 + minBytesPerElement(valueSchema);
408+
if (count > 0) {
409+
int remaining = decoder.remainingBytes();
410+
if (remaining >= 0 && count * (long) minBytesPerEntry > remaining) {
411+
throw new EOFException("Map claims " + count + " entries with at least " + minBytesPerEntry
412+
+ " bytes each, but only " + remaining + " bytes are available");
413+
}
414+
}
415+
}
416+
369417
/**
370418
* Called by the default implementation of {@link #readMap} to read a key value.
371419
* The default implementation returns delegates to
@@ -384,6 +432,73 @@ protected void addToMap(Object map, Object key, Object value) {
384432
((Map) map).put(key, value);
385433
}
386434

435+
/**
436+
* Returns the minimum number of bytes required to encode a single value of the
437+
* given schema in Avro binary format. Used to validate that the decoder has
438+
* enough data remaining before allocating collection backing arrays.
439+
* <p>
440+
* Returns 0 for types whose binary encoding is empty ({@code null}, zero-length
441+
* {@code fixed}, records with only zero-byte fields). Returns a positive value
442+
* for all other types.
443+
*/
444+
static int minBytesPerElement(Schema schema) {
445+
return minBytesPerElement(schema, new HashSet<>());
446+
}
447+
448+
private static int minBytesPerElement(Schema schema, Set<String> visited) {
449+
switch (schema.getType()) {
450+
case NULL:
451+
return 0;
452+
case FIXED:
453+
return schema.getFixedSize();
454+
case FLOAT:
455+
return 4;
456+
case DOUBLE:
457+
return 8;
458+
case RECORD:
459+
String fullName = schema.getFullName();
460+
if (!visited.add(fullName)) {
461+
return 0; // break recursion for self-referencing schemas
462+
}
463+
int sum = 0;
464+
for (Schema.Field f : schema.getFields()) {
465+
sum += minBytesPerElement(f.schema(), visited);
466+
if (sum > 0) {
467+
break; // at least 1 byte is enough to confirm non-zero
468+
}
469+
}
470+
visited.remove(fullName);
471+
return sum;
472+
case UNION:
473+
// The branch index varint is always at least 1 byte
474+
return 1;
475+
default:
476+
// BOOLEAN, INT, LONG, ENUM, STRING, BYTES, ARRAY, MAP are all >= 1 byte
477+
return 1;
478+
}
479+
}
480+
481+
/**
482+
* Validates that the decoder has enough remaining bytes to hold {@code count}
483+
* elements of the given schema, assuming each element requires at least
484+
* {@link #minBytesPerElement} bytes. Throws {@link EOFException} if the decoder
485+
* reports fewer remaining bytes than required.
486+
* <p>
487+
* This check prevents out-of-memory errors from pre-allocating huge backing
488+
* arrays when the source data is truncated or malicious.
489+
*/
490+
private static void ensureAvailableCollectionBytes(Decoder decoder, long count, Schema elementSchema)
491+
throws EOFException {
492+
int minBytes = minBytesPerElement(elementSchema);
493+
if (minBytes > 0 && count > 0) {
494+
int remaining = decoder.remainingBytes();
495+
if (remaining >= 0 && count * (long) minBytes > remaining) {
496+
throw new EOFException("Collection claims " + count + " elements with at least " + minBytes
497+
+ " bytes each, but only " + remaining + " bytes are available");
498+
}
499+
}
500+
}
501+
387502
/**
388503
* Called to read a fixed value. May be overridden for alternate fixed
389504
* representations. By default, returns {@link GenericFixed}.

lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ public double readDouble() throws IOException {
295295
@Override
296296
public Utf8 readString(Utf8 old) throws IOException {
297297
int length = SystemLimitException.checkMaxStringLength(readLong());
298+
ensureAvailableBytes(length);
298299
Utf8 result = (old != null ? old : new Utf8());
299300
result.setByteLength(length);
300301
if (0 != length) {
@@ -318,6 +319,7 @@ public void skipString() throws IOException {
318319
@Override
319320
public ByteBuffer readBytes(ByteBuffer old) throws IOException {
320321
int length = SystemLimitException.checkMaxBytesLength(readLong());
322+
ensureAvailableBytes(length);
321323
final ByteBuffer result;
322324
if (old != null && length <= old.capacity()) {
323325
result = old;
@@ -508,6 +510,23 @@ public boolean isEnd() throws IOException {
508510
return (0 == read);
509511
}
510512

513+
/**
514+
* Returns the total number of bytes remaining that can be read from this
515+
* decoder (including any buffered bytes), or {@code -1} if the total is
516+
* unknown.
517+
* <p>
518+
* Byte-array-backed decoders return an exact count. Stream-backed decoders
519+
* return the sum of buffered bytes and {@link InputStream#available()}, which
520+
* is a reliable lower bound for the finite, in-memory or file-backed streams
521+
* used by {@code DataFileReader} and {@code DataFileStream}.
522+
* <p>
523+
* {@link DirectBinaryDecoder} always returns {@code -1}.
524+
*/
525+
@Override
526+
public int remainingBytes() {
527+
return source != null ? source.remainingBytes() : -1;
528+
}
529+
511530
/**
512531
* Ensures that buf[pos + num - 1] is not out of the buffer array bounds.
513532
* However, buf[pos + num -1] may be >= limit if there is not enough data left
@@ -530,6 +549,29 @@ private void ensureBounds(int num) throws IOException {
530549
}
531550
}
532551

552+
/**
553+
* Validates that the source has at least {@code length} bytes remaining before
554+
* proceeding. Throws early if the declared length is inconsistent with the
555+
* available data.
556+
* <p>
557+
* For byte-array-backed sources the remaining count is exact, so this check is
558+
* definitive. For stream-backed sources the check uses buffered bytes plus
559+
* {@link InputStream#available()}, which is a reliable lower bound for the
560+
* finite streams used by {@code DataFileReader} and {@code DataFileStream}.
561+
*
562+
* @param length the number of bytes expected to be available
563+
* @throws EOFException if the source is known to have fewer bytes remaining
564+
*/
565+
private void ensureAvailableBytes(int length) throws EOFException {
566+
if (source != null && length > 0) {
567+
int remaining = source.remainingBytes();
568+
if (remaining >= 0 && length > remaining) {
569+
throw new EOFException(
570+
"Attempted to read " + length + " bytes, but only " + remaining + " bytes are available");
571+
}
572+
}
573+
}
574+
533575
/**
534576
* Returns an {@link java.io.InputStream} that is aware of any buffering that
535577
* may occur in this BinaryDecoder. Readers that need to interleave decoding
@@ -664,6 +706,17 @@ protected ByteSource() {
664706

665707
abstract boolean isEof();
666708

709+
/**
710+
* Returns the total number of bytes remaining that can be read from this source
711+
* (including any buffered bytes), or {@code -1} if the total is unknown.
712+
* Byte-array-backed sources return an exact count. Stream-backed sources return
713+
* the sum of buffered bytes and {@link InputStream#available()}, which is a
714+
* reliable lower bound for the finite, in-memory or file-backed streams used by
715+
* {@code DataFileReader} and {@code DataFileStream}. Returns {@code -1} only
716+
* when {@code available()} throws.
717+
*/
718+
protected abstract int remainingBytes();
719+
667720
protected void attach(int bufferSize, BinaryDecoder decoder) {
668721
decoder.buf = new byte[bufferSize];
669722
decoder.pos = 0;
@@ -910,6 +963,15 @@ public boolean isEof() {
910963
return isEof;
911964
}
912965

966+
@Override
967+
protected int remainingBytes() {
968+
try {
969+
return (ba.getLim() - ba.getPos()) + in.available();
970+
} catch (IOException e) {
971+
return -1;
972+
}
973+
}
974+
913975
@Override
914976
public void close() throws IOException {
915977
in.close();
@@ -1028,5 +1090,10 @@ public boolean isEof() {
10281090
int remaining = ba.getLim() - ba.getPos();
10291091
return (remaining == 0);
10301092
}
1093+
1094+
@Override
1095+
protected int remainingBytes() {
1096+
return ba.getLim() - ba.getPos();
1097+
}
10311098
}
10321099
}

lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,4 +299,14 @@ public void readFixed(byte[] bytes) throws IOException {
299299
* type of the next value to be read
300300
*/
301301
public abstract int readIndex() throws IOException;
302+
303+
/**
304+
* Returns the total number of bytes remaining that can be read from this
305+
* decoder, or {@code -1} if the total is unknown. Implementations that can
306+
* determine remaining capacity (for example, byte-array-backed decoders) should
307+
* override this method. The default returns {@code -1}.
308+
*/
309+
public int remainingBytes() {
310+
return -1;
311+
}
302312
}

lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,9 @@ public int readIndex() throws IOException {
246246
public Symbol doAction(Symbol input, Symbol top) throws IOException {
247247
return null;
248248
}
249+
250+
@Override
251+
public int remainingBytes() {
252+
return in != null ? in.remainingBytes() : -1;
253+
}
249254
}

0 commit comments

Comments
 (0)