Skip to content

Commit 8ff82fd

Browse files
authored
Rewrite the parquet output adapter manager (#712)
* Refactor parquet output to a RecordBatch sink architecture ParquetWriter builds RecordBatches and hands them to a pluggable RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++ file-writer hierarchy and unifies output conversion via visitCspValueType. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Clean up parquet output: drop dead code, fix split metadata Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter, de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column files in split mode, and expand output tests. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Move parquet output file I/O into C++ (RecordBatchFileSink) Closure-based C++ sink writes parquet/IPC/split-column files directly, removing the per-batch C++<->Python hop. Adds FileExistsError. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: fix sink bugs, remove orphaned Python sink From the multi-model review: - delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring, TestOutputSinkDirect) since file I/O is all C++ now - RecordBatchFileSink: guard mkdir(""), fix close-path exception safety, resolve compression via Arrow's Codec API (case-insensitive) - honor an explicit writeTimestampColumn - fail fast on single-file + dict basket - add regression tests Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: drop dead manager index-sink, harden stop() order Remove the unused manager-level m_indexSink/setIndexSink (per-basket index sinks are unaffected) and stop all writers before destroying any in stop(). Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: minor cleanups (scratch reset, batch guard, docs) Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a debug-only length assert in buildRecordBatch, and document that file_visitor runs synchronously on the engine thread. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Fix Windows CI: release parquet/IPC file handles before tempdir cleanup Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: remove dead code found in full audit Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: harden teardown, basket metadata, and tests Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Review fixes, and cleanup Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address comments Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Parquet output: value-first scalar writer, drop scratch struct Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Cleanup Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> --------- Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1 parent c256630 commit 8ff82fd

32 files changed

Lines changed: 2914 additions & 1791 deletions

cpp/csp/adapters/arrow/ArrowFieldWriter.cpp

Lines changed: 52 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Concrete FieldWriter implementations for all CSP scalar types.
22
//
3-
// Fixed-length writers use UnsafeWriter<BuilderT> — a single template
4-
// that takes a value-extraction callable at construction (mirrors LambdaReader).
5-
// Variable-length writers (StringLike, Enum) and NestedStruct are separate
6-
// classes because they need safe Append (variable-length) or recursive logic.
3+
// Every scalar/temporal/string/enum column is a ScalarFieldWriter<CspT>: it reads the value out of
4+
// a struct field and appends it through the shared ArrowScalarColumnWriter<CspT> kernel (the single
5+
// home of the per-type Arrow-builder choice + conversion, also used value-first by the scalar output
6+
// path). NestedStruct is a separate class because it is recursive.
77

88
#include <csp/adapters/arrow/ArrowFieldWriter.h>
9+
#include <csp/adapters/arrow/ArrowScalarWriter.h>
910
#include <csp/engine/CspType.h>
1011
#include <csp/engine/CspEnum.h>
1112

@@ -54,147 +55,59 @@ std::vector<std::shared_ptr<::arrow::Array>> FieldWriter::finish()
5455
namespace
5556
{
5657

57-
// --- Generic lambda-based writer for fixed-length types ---
58-
// ValueFn signature: auto(const Struct *) — returns the value to UnsafeAppend/Append.
59-
// Covers: all numeric primitives, bool, DateTime, TimeDelta, Time, Date.
58+
// --- Scalar field writer: reads field->value<CspT>(s), appends via the shared kernel ---
59+
// Replaces the former per-category writers (numeric/temporal/string/enum). The per-type Arrow
60+
// builder + value conversion now live exactly once, in ArrowScalarColumnWriter<CspT>.
6061

61-
template<typename ArrowBuilderT, typename ValueFn>
62-
class UnsafeWriter final : public FieldWriter
62+
template<typename CspT>
63+
class ScalarFieldWriter final : public FieldWriter
6364
{
6465
public:
65-
UnsafeWriter( const std::string & columnName, const StructFieldPtr & field,
66-
std::shared_ptr<ArrowBuilderT> typedBuilder,
67-
std::shared_ptr<::arrow::DataType> dataType, ValueFn fn )
68-
: FieldWriter( columnName, field, typedBuilder, std::move( dataType ) ),
69-
m_typedBuilder( typedBuilder.get() ), m_fn( std::move( fn ) ) {}
70-
71-
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
66+
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field, bool isBytes = false )
67+
: ScalarFieldWriter( columnName, field, ArrowScalarColumnWriter<CspT>( isBytes ) )
7268
{
73-
for( int64_t i = offset; i < offset + count; ++i )
74-
{
75-
const Struct * s = structs[i].get();
76-
if( m_field -> isSet( s ) )
77-
m_typedBuilder -> UnsafeAppend( m_fn( s ) );
78-
else
79-
m_typedBuilder -> UnsafeAppendNull();
80-
}
8169
}
8270

83-
protected:
84-
void doWrite( const Struct * s ) override
85-
{
86-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( m_fn( s ) ), "Failed to append value" );
87-
}
88-
89-
private:
90-
ArrowBuilderT * m_typedBuilder;
91-
ValueFn m_fn;
92-
};
93-
94-
// Factory: creates an UnsafeWriter, deducing ValueFn type, and returns CreatedFieldWriter
95-
template<typename ArrowBuilderT, typename ValueFn>
96-
CreatedFieldWriter makeUnsafeWriter( const std::string & name, const StructFieldPtr & field,
97-
std::shared_ptr<ArrowBuilderT> builder,
98-
std::shared_ptr<::arrow::DataType> dataType, ValueFn && fn )
99-
{
100-
auto w = std::make_unique<UnsafeWriter<ArrowBuilderT, std::decay_t<ValueFn>>>(
101-
name, field, builder, std::move( dataType ), std::forward<ValueFn>( fn ) );
102-
return { std::move( w ), std::move( builder ) };
103-
}
104-
105-
// Factory: primitive numeric writer (auto-creates builder from default constructor)
106-
template<typename CspT, typename ArrowBuilderT>
107-
CreatedFieldWriter makePrimitiveWriter( const std::string & name, const StructFieldPtr & f )
108-
{
109-
auto b = std::make_shared<ArrowBuilderT>();
110-
return makeUnsafeWriter( name, f, b, b -> type(), [f]( const Struct * s ) {
111-
return static_cast<typename ArrowBuilderT::value_type>( f -> value<CspT>( s ) );
112-
} );
113-
}
114-
115-
// Factory: nanosecond-based temporal writer (DateTime, TimeDelta, Time)
116-
template<typename CspT, typename ArrowBuilderT>
117-
CreatedFieldWriter makeNanosWriter( const std::string & name, const StructFieldPtr & f,
118-
std::shared_ptr<::arrow::DataType> dataType )
119-
{
120-
auto b = std::make_shared<ArrowBuilderT>( dataType, ::arrow::default_memory_pool() );
121-
return makeUnsafeWriter( name, f, b, std::move( dataType ), [f]( const Struct * s ) {
122-
return f -> value<CspT>( s ).asNanoseconds();
123-
} );
124-
}
125-
126-
// --- String / Bytes writer (variable-length: needs safe Append) ---
127-
128-
template<typename ArrowBuilderT>
129-
class StringLikeWriter final : public FieldWriter
130-
{
131-
public:
132-
StringLikeWriter( const std::string & columnName, const StructFieldPtr & field,
133-
std::shared_ptr<::arrow::DataType> dataType )
134-
: FieldWriter( columnName, field, std::make_shared<ArrowBuilderT>(), std::move( dataType ) ),
135-
m_typedBuilder( static_cast<ArrowBuilderT *>( m_builder.get() ) ) {}
136-
71+
// Columnar bulk path: caller has reserved, so set values use the unsafe append.
13772
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
13873
{
13974
for( int64_t i = offset; i < offset + count; ++i )
14075
{
14176
const Struct * s = structs[i].get();
14277
if( m_field -> isSet( s ) )
143-
{
144-
auto & val = m_field -> value<std::string>( s );
145-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
146-
}
78+
m_kernel.appendUnsafe( m_field -> value<CspT>( s ) );
14779
else
148-
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
80+
m_kernel.appendNull();
14981
}
15082
}
15183

15284
protected:
15385
void doWrite( const Struct * s ) override
15486
{
155-
auto & val = m_field -> value<std::string>( s );
156-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
87+
m_kernel.append( m_field -> value<CspT>( s ) );
15788
}
15889

15990
private:
160-
ArrowBuilderT * m_typedBuilder;
161-
};
162-
163-
// --- Enum writer (variable-length string: CspEnum → name()) ---
164-
165-
class EnumWriter final : public FieldWriter
166-
{
167-
public:
168-
EnumWriter( const std::string & columnName, const StructFieldPtr & field )
169-
: FieldWriter( columnName, field, std::make_shared<::arrow::StringBuilder>(), ::arrow::utf8() ),
170-
m_typedBuilder( static_cast<::arrow::StringBuilder *>( m_builder.get() ) ) {}
171-
172-
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
91+
// Delegating ctor: build the kernel first so its builder/dataType can seed the FieldWriter base,
92+
// then adopt the kernel (both hold the same shared builder).
93+
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field,
94+
ArrowScalarColumnWriter<CspT> kernel )
95+
: FieldWriter( columnName, field, kernel.builder(), kernel.dataType() ),
96+
m_kernel( std::move( kernel ) )
17397
{
174-
for( int64_t i = offset; i < offset + count; ++i )
175-
{
176-
const Struct * s = structs[i].get();
177-
if( m_field -> isSet( s ) )
178-
{
179-
auto & n = m_field -> value<CspEnum>( s ).name();
180-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
181-
}
182-
else
183-
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
184-
}
18598
}
18699

187-
protected:
188-
void doWrite( const Struct * s ) override
189-
{
190-
auto & n = m_field -> value<CspEnum>( s ).name();
191-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
192-
}
193-
194-
private:
195-
::arrow::StringBuilder * m_typedBuilder;
100+
ArrowScalarColumnWriter<CspT> m_kernel;
196101
};
197102

103+
template<typename CspT>
104+
CreatedFieldWriter makeScalarFieldWriter( const std::string & name, const StructFieldPtr & f, bool isBytes = false )
105+
{
106+
auto w = std::make_unique<ScalarFieldWriter<CspT>>( name, f, isBytes );
107+
auto b = w -> builder();
108+
return { std::move( w ), std::move( b ) };
109+
}
110+
198111
// --- Nested struct writer (recursive) ---
199112

200113
class NestedStructWriter final : public FieldWriter
@@ -272,14 +185,6 @@ bool isBytesField( const StructFieldPtr & field )
272185
return strType && strType -> isBytes();
273186
}
274187

275-
template<typename WriterT, typename... Args>
276-
CreatedFieldWriter makeWriter( Args &&... args )
277-
{
278-
auto w = std::make_unique<WriterT>( std::forward<Args>( args )... );
279-
auto b = w -> builder();
280-
return { std::move( w ), std::move( b ) };
281-
}
282-
283188
} // anonymous namespace
284189

285190
CreatedFieldWriter createFieldWriter(
@@ -291,50 +196,29 @@ CreatedFieldWriter createFieldWriter(
291196
switch( f -> type() -> type() )
292197
{
293198
// --- Numeric ---
294-
case CspType::Type::BOOL:
295-
{
296-
auto b = std::make_shared<::arrow::BooleanBuilder>();
297-
return makeUnsafeWriter( columnName, f, b, ::arrow::boolean(),
298-
[f]( const Struct * s ) { return f -> value<bool>( s ); } );
299-
}
300-
case CspType::Type::INT8: return makePrimitiveWriter<int8_t, ::arrow::Int8Builder>( columnName, f );
301-
case CspType::Type::INT16: return makePrimitiveWriter<int16_t, ::arrow::Int16Builder>( columnName, f );
302-
case CspType::Type::INT32: return makePrimitiveWriter<int32_t, ::arrow::Int32Builder>( columnName, f );
303-
case CspType::Type::INT64: return makePrimitiveWriter<int64_t, ::arrow::Int64Builder>( columnName, f );
304-
case CspType::Type::UINT8: return makePrimitiveWriter<uint8_t, ::arrow::UInt8Builder>( columnName, f );
305-
case CspType::Type::UINT16: return makePrimitiveWriter<uint16_t, ::arrow::UInt16Builder>( columnName, f );
306-
case CspType::Type::UINT32: return makePrimitiveWriter<uint32_t, ::arrow::UInt32Builder>( columnName, f );
307-
case CspType::Type::UINT64: return makePrimitiveWriter<uint64_t, ::arrow::UInt64Builder>( columnName, f );
308-
case CspType::Type::DOUBLE: return makePrimitiveWriter<double, ::arrow::DoubleBuilder>( columnName, f );
199+
case CspType::Type::BOOL: return makeScalarFieldWriter<bool>( columnName, f );
200+
case CspType::Type::INT8: return makeScalarFieldWriter<int8_t>( columnName, f );
201+
case CspType::Type::INT16: return makeScalarFieldWriter<int16_t>( columnName, f );
202+
case CspType::Type::INT32: return makeScalarFieldWriter<int32_t>( columnName, f );
203+
case CspType::Type::INT64: return makeScalarFieldWriter<int64_t>( columnName, f );
204+
case CspType::Type::UINT8: return makeScalarFieldWriter<uint8_t>( columnName, f );
205+
case CspType::Type::UINT16: return makeScalarFieldWriter<uint16_t>( columnName, f );
206+
case CspType::Type::UINT32: return makeScalarFieldWriter<uint32_t>( columnName, f );
207+
case CspType::Type::UINT64: return makeScalarFieldWriter<uint64_t>( columnName, f );
208+
case CspType::Type::DOUBLE: return makeScalarFieldWriter<double>( columnName, f );
309209

310210
// --- String / Bytes ---
311-
case CspType::Type::STRING:
312-
if( isBytesField( f ) )
313-
return makeWriter<StringLikeWriter<::arrow::BinaryBuilder>>( columnName, f, ::arrow::binary() );
314-
return makeWriter<StringLikeWriter<::arrow::StringBuilder>>( columnName, f, ::arrow::utf8() );
211+
case CspType::Type::STRING: return makeScalarFieldWriter<std::string>( columnName, f, isBytesField( f ) );
315212

316-
case CspType::Type::ENUM: return makeWriter<EnumWriter>( columnName, f );
213+
case CspType::Type::ENUM: return makeScalarFieldWriter<CspEnum>( columnName, f );
317214

318215
// --- Temporal ---
319-
case CspType::Type::DATETIME:
320-
return makeNanosWriter<DateTime, ::arrow::TimestampBuilder>(
321-
columnName, f, std::make_shared<::arrow::TimestampType>( ::arrow::TimeUnit::NANO, "UTC" ) );
322-
case CspType::Type::TIMEDELTA:
323-
return makeNanosWriter<TimeDelta, ::arrow::DurationBuilder>(
324-
columnName, f, std::make_shared<::arrow::DurationType>( ::arrow::TimeUnit::NANO ) );
325-
case CspType::Type::TIME:
326-
return makeNanosWriter<Time, ::arrow::Time64Builder>(
327-
columnName, f, std::make_shared<::arrow::Time64Type>( ::arrow::TimeUnit::NANO ) );
216+
case CspType::Type::DATETIME: return makeScalarFieldWriter<DateTime>( columnName, f );
217+
case CspType::Type::TIMEDELTA: return makeScalarFieldWriter<TimeDelta>( columnName, f );
218+
case CspType::Type::TIME: return makeScalarFieldWriter<Time>( columnName, f );
328219

329220
// --- Date (days since epoch) ---
330-
case CspType::Type::DATE:
331-
{
332-
auto b = std::make_shared<::arrow::Date32Builder>();
333-
return makeUnsafeWriter( columnName, f, b, ::arrow::date32(), [f]( const Struct * s ) {
334-
auto & d = f -> value<Date>( s );
335-
return static_cast<int32_t>( DateTime( d.year(), d.month(), d.day() ).asNanoseconds() / csp::NANOS_PER_DAY );
336-
} );
337-
}
221+
case CspType::Type::DATE: return makeScalarFieldWriter<Date>( columnName, f );
338222

339223
// --- Nested struct ---
340224
case CspType::Type::STRUCT:
@@ -345,7 +229,11 @@ CreatedFieldWriter createFieldWriter(
345229
std::vector<std::shared_ptr<::arrow::ArrayBuilder>> childBuilders;
346230
std::vector<std::unique_ptr<FieldWriter>> childWriters;
347231

348-
// Use fieldNames() for stable insertion order (fields() is sorted for memory layout)
232+
// Ordering convention: csp writes struct child columns in declaration order, using
233+
// fieldNames() rather than fields() (which is sorted by struct memory layout). This keeps
234+
// nested-struct child order consistent with top-level column order and stable against
235+
// internal field packing. The reader matches children by name, so either order round-trips,
236+
// but declaration order is the on-disk contract.
349237
for( auto & subFieldName : nestedMeta -> fieldNames() )
350238
{
351239
auto subField = nestedMeta -> field( subFieldName );

0 commit comments

Comments
 (0)