Skip to content

Commit 89737b8

Browse files
committed
update
1 parent 5187eb1 commit 89737b8

1 file changed

Lines changed: 60 additions & 6 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,14 +1804,27 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long
18041804
* @throws IOException if there is an error while writing
18051805
*/
18061806
public void end(Map<String, String> extraMetaData) throws IOException {
1807+
final long footerStart = out.getPos();
1808+
1809+
// Build the footer metadata in memory using the helper stream
1810+
InMemoryPositionOutputStream buffer = new InMemoryPositionOutputStream(footerStart);
1811+
1812+
serializeColumnIndexes(columnIndexes, blocks, buffer, fileEncryptor);
1813+
serializeOffsetIndexes(offsetIndexes, blocks, buffer, fileEncryptor);
1814+
serializeBloomFilters(bloomFilters, blocks, buffer, fileEncryptor);
1815+
1816+
ParquetMetadata localFooter =
1817+
new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
1818+
serializeFooter(localFooter, buffer, fileEncryptor, metadataConverter);
1819+
1820+
byte[] footerBytes = buffer.toByteArray();
1821+
18071822
try {
1823+
out.write(footerBytes);
1824+
out.flush();
1825+
18081826
state = state.end();
1809-
serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
1810-
serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
1811-
serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
1812-
LOG.debug("{}: end", out.getPos());
1813-
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
1814-
serializeFooter(footer, out, fileEncryptor, metadataConverter);
1827+
this.footer = localFooter;
18151828
} finally {
18161829
close();
18171830
}
@@ -2441,4 +2454,45 @@ protected boolean isPaddingNeeded(long remaining) {
24412454
return (remaining <= maxPaddingSize);
24422455
}
24432456
}
2457+
2458+
/**
2459+
* Lightweight {@link PositionOutputStream} that writes into a byte buffer while
2460+
* keeping a virtual position that can be initialised to an arbitrary offset.
2461+
* The position offset lets us build the footer in memory but still record the
2462+
* *final* absolute offsets that will appear once the buffer is flushed to the
2463+
* underlying file.
2464+
*/
2465+
private static final class InMemoryPositionOutputStream extends PositionOutputStream {
2466+
private final java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream(); // holds every byte written through this stream
2467+
private long pos; // virtual position: seeded with startPos, then advanced by the number of bytes written
2468+
2469+
InMemoryPositionOutputStream(long startPos) { // startPos is what getPos() reports before anything is written
2470+
this.pos = startPos;
2471+
}
2472+
2473+
@Override
2474+
public long getPos() { // returns startPos plus the count of bytes written so far
2475+
return pos;
2476+
}
2477+
2478+
@Override
2479+
public void write(int b) { // appends one byte to the in-memory buffer; declares no IOException
2480+
buffer.write(b);
2481+
pos++; // exactly one byte was appended
2482+
}
2483+
2484+
@Override
2485+
public void write(byte[] b, int off, int len) { // appends len bytes of b, starting at index off
2486+
buffer.write(b, off, len);
2487+
pos += len; // keep the virtual position in step with the bytes appended
2488+
}
2489+
2490+
@Override
2491+
public void flush() { // intentionally empty: this class performs no work on flush
2492+
}
2493+
2494+
byte[] toByteArray() { // returns only the bytes written; the startPos offset is never stored in the buffer
2495+
return buffer.toByteArray();
2496+
}
2497+
}
24442498
}

0 commit comments

Comments
 (0)