Rewrite the parquet output adapter manager #712
ParquetWriter builds RecordBatches and hands them to a pluggable RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++ file-writer hierarchy and unifies output conversion via visitCspValueType. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter, de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column files in split mode, and expand output tests. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Closure-based C++ sink writes parquet/IPC/split-column files directly, removing the per-batch C++<->Python hop. Adds FileExistsError. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove the unused manager-level m_indexSink/setIndexSink (per-basket index sinks are unaffected) and stop all writers before destroying any in stop(). Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a debug-only length assert in buildRecordBatch, and document that file_visitor runs synchronously on the engine thread. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
ptomecek
left a comment
I went through the whole output path (ParquetWriter -> RecordBatchSink -> makeFileSink), the dict-basket teardown, the Python wiring, and compared everything against the old FileWriterWrapper hierarchy. This is a clean rewrite. Each of the hardening fixes is real and correctly done: the mkdir guard for bare filenames, the close-path exception safety (no double-close, all sub-writers and the stream get closed even if one throws), compression resolved through Arrow's codec API, the explicit timestamp column being honored, the single-file + dict-basket fast failure, FileExistsError on overwrite, and the stop()-stops-everything-before-destroying-anything teardown. Arrow Status/Result values are checked at every call site. I didn't find any correctness, resource, or concurrency bugs.
Two low-severity things inline, plus one parity note I couldn't attach to a line:
Nested-struct field ordering: ArrowFieldWriter::NestedStructWriter (in ArrowFieldWriter.cpp, which isn't part of this PR) orders the arrow struct's child fields by declaration order, whereas the old code used the struct's memory-layout order. It reads back fine through the new name-based reader, but the on-disk child order differs from files written by older csp, which matters for anything reading those columns positionally. A test that pins down the struct-within-struct schema shape would be worth adding.
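A small sketch of why the child order matters for positional readers. It is plain Python with made-up field names and values (price, symbol), and it does not use csp or Arrow; the actual order the old writer produced is an assumption made only for illustration.

```python
# Hypothetical nested struct with two children, laid out in two different orders.
declaration_order = [("price", 101.5), ("symbol", "ABC")]    # new writer (assumed example)
memory_layout_order = [("symbol", "ABC"), ("price", 101.5)]  # old writer (assumed example)


def read_by_name(children, name):
    """Name-based lookup: the result does not depend on child order."""
    return dict(children)[name]


def read_by_position(children, index):
    """Positional lookup: the result depends on child order."""
    return children[index][1]
```

Reading "price" by name gives the same value for both layouts, while reading child 0 by position gives a different field for each, which is the case a schema-shape test would guard.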
Overall this looks good to merge. The one thing I'd sort out first is the silently-dropped column below, since it turns an error into missing data.
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Looks good to me now
AdamGlustein
left a comment
As a mere human who attempted reading through this code, I find it very difficult to follow with the lambda-based approach. First, figuring out what function is actually being called is not easy, and deciphering what actually makes it into the closure of that function is even harder. It would be much simpler (in my opinion) to make these callbacks member functions and replace the lambda capture variables with member variables for these classes.
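To make the contrast concrete, here is a minimal sketch in Python (the PR's sink is C++; make_closure_sink, MemberSink, and their fields are hypothetical names, not code from this PR). Both variants count rows per file, but only the class spells out its state as named members.

```python
def make_closure_sink():
    """Closure style: the state lives in variables captured by the callbacks."""
    state = {"path": None, "rows": 0}  # captured by every callback below
    written = []                       # (path, row count) for each finished file

    def on_file_change(path):
        if state["path"] is not None:
            written.append((state["path"], state["rows"]))
        state["path"], state["rows"] = path, 0
        return path is not None  # True if a file is open after the call

    def on_batch(batch):
        state["rows"] += len(batch)

    def on_stop():
        on_file_change(None)  # finish the current file, if any
        return written

    return {"on_file_change": on_file_change, "on_batch": on_batch, "on_stop": on_stop}


class MemberSink:
    """Member style: the same state as explicit attributes."""

    def __init__(self):
        self.path = None
        self.rows = 0
        self.written = []

    def on_file_change(self, path):
        if self.path is not None:
            self.written.append((self.path, self.rows))
        self.path, self.rows = path, 0
        return path is not None  # True if a file is open after the call

    def on_batch(self, batch):
        self.rows += len(batch)

    def on_stop(self):
        self.on_file_change(None)  # finish the current file, if any
        return self.written
```

The behaviour is identical; the difference is that a reader of MemberSink can see all of the state in `__init__`, whereas the closure version requires tracing which variables each callback captures.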
Also, I think some of the complexity here was necessitated by being backwards compatible with the original Parquet adapter. In my opinion, I think we can relax that constraint. There are a lot of overly specific things (like symbol columns, etc.) that we don't need to support and can just drop completely from the code. With a minor version bump we're not going to break any contracts, and I highly doubt some of these vestigial features are used by anyone anymore.
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
fa752ed to 60aa0bf
Rewrite the parquet output adapter for RecordBatch-based writing
Replaces the old per-format C++ file-writer hierarchy (FileWriterWrapper/ParquetFileWriterWrapper/ArrowIPCFileWriterWrapper/FileWriterWrapperContainer) with a RecordBatch sink architecture: ParquetWriter builds Arrow RecordBatches and hands them to a single concrete C++ RecordBatchSink that writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.

Motivation
The old output path had:
- FileWriterWrapperContainer to fan out to split-column files
- DialectGenericListWriterInterface for list columns, duplicating type dispatch already implemented in the shared Arrow layer
- a dedicated builder (StructColumnArrayBuilder) that re-implemented nested-struct serialization the Arrow nodes already do
- parquet_dict_basket_output_adapter (dead since basket output goes through the parquet_dict_basket_writer node)

The new implementation:
- builds an arrow::RecordBatch per batch_size rows and writes it through a single RecordBatchSink, so the file backend is one self-contained component
- shares the Arrow type machinery that the struct_to_record_batches Arrow node and the input adapter use: struct columns go through the shared ArrowFieldWriter, and scalar columns append values directly through an ArrowScalarColumnWriter<T> kernel that the field writer is itself built on

Architecture
- RecordBatchSink (new, RecordBatchSink.cpp) is a single concrete class with ordinary member functions (onStart/onBatch/onFileChange/onStop) and member state. It owns overwrite checks, parent-directory creation, file rotation, compression, and the optional file_visitor. onFileChange returns whether a file is open after the call, so the writer tracks open/closed state directly. Compression is resolved through Arrow's own arrow::util::Codec API rather than a hardcoded name map.
- ArrowScalarColumnWriter<T> (new, ArrowScalarWriter.h) is the single home of the per-type Arrow-builder choice and value conversion for scalar/temporal/string/enum types. It is shared two ways: the scalar output column (ScalarColumnArrayBuilder<T>) appends ticked values to it directly, and the struct field writer (ScalarFieldWriter<T> in the shared ArrowFieldWriter) reads a struct field then appends through the same kernel. csp's row-at-a-time "may-not-tick → null" model is handled by appending value-or-null per row; no synthetic scratch struct is needed for scalar columns.
- ArrowBackedArrayBuilder (new) bridges published struct columns onto the shared ArrowFieldWriter: each row it is handed the source Struct* and the field writer reads the field (or appends null if the struct did not tick).
- ParquetOutputAdapterManager is simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.

What's removed
- FileWriterWrapper/ParquetFileWriterWrapper/ArrowIPCFileWriterWrapper/FileWriterWrapperContainer: the old per-format C++ file-writer hierarchy
- DialectGenericListWriterInterface: list columns now go through ListColumnArrayBuilder
- StructColumnArrayBuilder: nested-struct columns are written by the shared ArrowFieldWriter::NestedStructWriter
- the synthetic scratch struct: scalar columns no longer populate a Struct to reuse the struct serializer; they append values directly through ArrowScalarColumnWriter<T>
- parquet_dict_basket_output_adapter: registered but unreachable dead adapter
- ParquetOutputAdapter.cpp/ArrowSingleColumnArrayBuilder.h per-type boilerplate, folded into the shared Arrow writer

Bug fixes / hardening
Surfaced by review of the new sink:
- mkdir("") on a bare filename: writing to a relative path with no directory component (e.g. "out.parquet") no longer fails with Invalid argument; the empty dirname is guarded.
- Close-path exception safety: no double-close if file_visitor throws when it runs, and the output stream / all split sub-writers are always closed even if one close fails (no leaked file descriptors).
- Compression is resolved via arrow::util::Codec::GetCompressionType + IsAvailable (case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.
- Explicit writeTimestampColumn honored: an explicitly requested timestamp column is no longer silently downgraded.
- Single-file + dict basket: publishing a dict basket without split_columns_to_files=True now raises a clear error instead of a low-level IOError.
- FileExistsError: writing over an existing file with allow_overwrite=False raises Python FileExistsError.
- stop() teardown: flush/close all writers before destroying any of them.

API compatibility
The public Python API (ParquetWriter, ParquetOutputConfig, publish, publish_struct, publish_dict_basket, filename_provider, file_visitor, file_metadata/column_metadata) is unchanged.
csp/tests/adapters/test_parquet_output.py (77 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, nested-struct schema shape, overwrite/FileExistsError) and the existing test_parquet.py suite pass.
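As a rough illustration of two of the hardening fixes listed above, the sketch below shows a bare-filename directory guard and a close path that closes every sub-writer even if one close fails. It is written in Python with hypothetical helper names (ensure_parent_dir, close_all, FakeWriter); the actual fix lives in the C++ sink and may be structured differently.

```python
import os


def ensure_parent_dir(path: str) -> None:
    """Create the parent directory of `path`, skipping bare filenames."""
    parent = os.path.dirname(path)
    if parent:  # "" for a bare filename such as "out.parquet": nothing to create
        os.makedirs(parent, exist_ok=True)


def close_all(writers) -> None:
    """Close every writer; re-raise the first error only after all were tried."""
    first_error = None
    for writer in writers:
        try:
            writer.close()
        except Exception as exc:  # keep going so later writers are not leaked
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error


class FakeWriter:
    """Stand-in for a file writer, used only to exercise close_all."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.closed = False

    def close(self) -> None:
        self.closed = True
        if self.fail:
            raise IOError("close failed")
```

With `[FakeWriter(), FakeWriter(fail=True), FakeWriter()]`, close_all still reaches the third writer before the error from the second one propagates.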