 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
 import org.apache.parquet.hadoop.codec.CleanUtil;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.LocalOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,6 +62,49 @@ public class TestParquetWriterError {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Test
+  public void testWriteAfterAbortShouldThrow() throws Exception {
+    java.nio.file.Path outputFile = tmpFolder.newFile("abort_test.parquet").toPath();
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message test { required binary name; required int32 age; }");
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+
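+    // TrackingByteBufferAllocator fails at close() if any allocated buffer was
+    // not released, so it doubles as a leak check for the aborted-writer path.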
+    try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(outputFile))
+          .withType(schema)
+          .withAllocator(allocator)
+          .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+          .build();
+
+      // Write one valid record
+      writer.write(groupFactory.newGroup().append("name", "Alice").append("age", 30));
+
+      // Simulate an aborted state by reflectively setting the aborted flag
+      // on the internal writer. This mirrors what happens when a write fails
+      // with an exception (e.g. OOM during page flush).
+      Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+      internalWriterField.setAccessible(true);
+      InternalParquetRecordWriter<?> internalWriter =
+          (InternalParquetRecordWriter<?>) internalWriterField.get(writer);
+      Field abortedField = InternalParquetRecordWriter.class.getDeclaredField("aborted");
+      abortedField.setAccessible(true);
+      abortedField.setBoolean(internalWriter, true);
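+      // Subsequent writes should now observe the flag, matching the behavior
+      // after a real write failure.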
+
+      // Now try to write again - this should throw an IOException
+      try {
+        writer.write(groupFactory.newGroup().append("name", "Charlie").append("age", 25));
+        Assert.fail("Expected IOException when writing to an aborted writer");
+      } catch (IOException e) {
+        Assert.assertTrue(
+            "Error message should mention the aborted state",
+            e.getMessage().contains("aborted"));
+      }
+
+      // Close should not throw (it should silently skip flushing due to the aborted state)
+      writer.close();
+    }
+  }
+
   @Test
   public void testInSeparateProcess() throws IOException, InterruptedException {
     String outputFile = tmpFolder.newFile("out.parquet").toString();