Skip to content

Commit 4f868ef

Browse files
authored
apacheGH-3011: Deny further writes after InternalParquetRecordWriter is aborted (apache#3450)
1 parent c3d95de commit 4f868ef

2 files changed

Lines changed: 49 additions & 0 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -152,6 +152,9 @@ public void close() throws IOException, InterruptedException {
152152
}
153153

154154
public void write(T value) throws IOException, InterruptedException {
155+
if (aborted) {
156+
throw new IOException("Writer has been aborted due to a previous error and cannot accept further writes");
157+
}
155158
try {
156159
writeSupport.write(value);
157160
++recordCount;

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,14 +29,18 @@
2929
import java.util.Random;
3030
import org.apache.hadoop.conf.Configuration;
3131
import org.apache.parquet.bytes.DirectByteBufferAllocator;
32+
import org.apache.parquet.bytes.HeapByteBufferAllocator;
3233
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
3334
import org.apache.parquet.column.ParquetProperties;
3435
import org.apache.parquet.example.data.Group;
36+
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
3537
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
3638
import org.apache.parquet.hadoop.codec.CleanUtil;
3739
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
3840
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
3941
import org.apache.parquet.io.LocalOutputFile;
42+
import org.apache.parquet.schema.MessageType;
43+
import org.apache.parquet.schema.MessageTypeParser;
4044
import org.junit.Assert;
4145
import org.junit.Rule;
4246
import org.junit.Test;
@@ -58,6 +62,48 @@ public class TestParquetWriterError {
5862
@Rule
5963
public TemporaryFolder tmpFolder = new TemporaryFolder();
6064

65+
@Test
66+
public void testWriteAfterAbortShouldThrow() throws Exception {
67+
java.nio.file.Path outputFile = tmpFolder.newFile("abort_test.parquet").toPath();
68+
MessageType schema =
69+
MessageTypeParser.parseMessageType("message test { required binary name; required int32 age; }");
70+
SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
71+
72+
try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
73+
ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(outputFile))
74+
.withType(schema)
75+
.withAllocator(allocator)
76+
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
77+
.build();
78+
79+
// Write one valid record
80+
writer.write(groupFactory.newGroup().append("name", "Alice").append("age", 30));
81+
82+
// Simulate an aborted state by reflectively setting the aborted flag
83+
// on the internal writer. This mirrors what happens when a write fails
84+
// with an exception (e.g. OOM during page flush).
85+
Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
86+
internalWriterField.setAccessible(true);
87+
InternalParquetRecordWriter<?> internalWriter =
88+
(InternalParquetRecordWriter<?>) internalWriterField.get(writer);
89+
Field abortedField = InternalParquetRecordWriter.class.getDeclaredField("aborted");
90+
abortedField.setAccessible(true);
91+
abortedField.setBoolean(internalWriter, true);
92+
93+
// Now try to write again - this should throw IOException
94+
IOException e = Assert.assertThrows(
95+
"Expected IOException when writing to an aborted writer",
96+
IOException.class,
97+
() -> writer.write(
98+
groupFactory.newGroup().append("name", "Charlie").append("age", 25)));
99+
Assert.assertTrue(
100+
"Error message should mention aborted state", e.getMessage().contains("aborted"));
101+
102+
// Close should not throw (it should silently skip flushing due to aborted state)
103+
writer.close();
104+
}
105+
}
106+
61107
@Test
62108
public void testInSeparateProcess() throws IOException, InterruptedException {
63109
String outputFile = tmpFolder.newFile("out.parquet").toString();

0 commit comments

Comments
 (0)