Skip to content

Commit ca72772

Browse files
authored
AVRO-4240: [Java] Size DataFileWriter output buffer to fit entire block frame (#3705)
1 parent 7473a6e commit ca72772

1 file changed

Lines changed: 22 additions & 3 deletions

File tree

lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,20 +246,34 @@ public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOE
246246
return this;
247247
}
248248

249-
private void init(OutputStream outs) throws IOException {
249+
private void init(OutputStream outs) {
250250
this.underlyingStream = outs;
251-
this.out = new BufferedFileOutputStream(outs);
251+
// Size the output buffer to fit an entire block frame in a single flush:
252+
// maxBlockSize() for compressed data + 20 bytes for two varint-encoded longs
253+
// (up to 10 bytes each) + sync.length for the sync marker
254+
this.out = new BufferedFileOutputStream(outs, maxBlockSize() + 20 + sync.length);
252255
EncoderFactory efactory = new EncoderFactory();
253256
this.vout = efactory.directBinaryEncoder(out, null);
254257
dout.setSchema(schema);
255-
buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
258+
buffer = new NonCopyingByteArrayOutputStream(maxBlockSize());
256259
this.bufOut = this.initEncoder.apply(buffer);
257260
if (this.codec == null) {
258261
this.codec = CodecFactory.nullCodec().createInstance();
259262
}
260263
this.isOpen = true;
261264
}
262265

266+
/**
267+
* Returns the estimated maximum compressed block size. Blocks are flushed when
268+
* uncompressed data reaches {@link #syncInterval}, but compression may increase
269+
* size (e.g. uncompressible data with codec framing overhead), so we allow 25%
270+
* headroom. The result is clamped to avoid integer overflow when used for
271+
* buffer allocation.
272+
*/
273+
private int maxBlockSize() {
274+
return Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1);
275+
}
276+
263277
private static final SecureRandom RNG = new SecureRandom();
264278

265279
private static byte[] generateSync() {
@@ -494,6 +508,11 @@ public BufferedFileOutputStream(OutputStream out) {
494508
this.out = new PositionFilter(out);
495509
}
496510

511+
public BufferedFileOutputStream(OutputStream out, int bufferSize) {
512+
super(null, bufferSize);
513+
this.out = new PositionFilter(out);
514+
}
515+
497516
public long tell() {
498517
return position + count;
499518
}

0 commit comments

Comments
 (0)