diff --git a/cpp/csp/adapters/arrow/ArrowFieldWriter.cpp b/cpp/csp/adapters/arrow/ArrowFieldWriter.cpp index 53f9979d2..dd381ec12 100644 --- a/cpp/csp/adapters/arrow/ArrowFieldWriter.cpp +++ b/cpp/csp/adapters/arrow/ArrowFieldWriter.cpp @@ -1,11 +1,12 @@ // Concrete FieldWriter implementations for all CSP scalar types. // -// Fixed-length writers use UnsafeWriter — a single template -// that takes a value-extraction callable at construction (mirrors LambdaReader). -// Variable-length writers (StringLike, Enum) and NestedStruct are separate -// classes because they need safe Append (variable-length) or recursive logic. +// Every scalar/temporal/string/enum column is a ScalarFieldWriter: it reads the value out of +// a struct field and appends it through the shared ArrowScalarColumnWriter kernel (the single +// home of the per-type Arrow-builder choice + conversion, also used value-first by the scalar output +// path). NestedStruct is a separate class because it is recursive. #include +#include #include #include @@ -54,147 +55,59 @@ std::vector> FieldWriter::finish() namespace { -// --- Generic lambda-based writer for fixed-length types --- -// ValueFn signature: auto(const Struct *) — returns the value to UnsafeAppend/Append. -// Covers: all numeric primitives, bool, DateTime, TimeDelta, Time, Date. +// --- Scalar field writer: reads field->value(s), appends via the shared kernel --- +// Replaces the former per-category writers (numeric/temporal/string/enum). The per-type Arrow +// builder + value conversion now live exactly once, in ArrowScalarColumnWriter. -template -class UnsafeWriter final : public FieldWriter +template +class ScalarFieldWriter final : public FieldWriter { public: - UnsafeWriter( const std::string & columnName, const StructFieldPtr & field, - std::shared_ptr typedBuilder, - std::shared_ptr<::arrow::DataType> dataType, ValueFn fn ) - : FieldWriter( columnName, field, typedBuilder, std::move( dataType ) ), - m_typedBuilder( typedBuilder.get() ), m_fn( std::move( fn ) ) {} - - void writeAll( const std::vector & structs, int64_t offset, int64_t count ) override + ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field, bool isBytes = false ) + : ScalarFieldWriter( columnName, field, ArrowScalarColumnWriter( isBytes ) ) { - for( int64_t i = offset; i < offset + count; ++i ) - { - const Struct * s = structs[i].get(); - if( m_field -> isSet( s ) ) - m_typedBuilder -> UnsafeAppend( m_fn( s ) ); - else - m_typedBuilder -> UnsafeAppendNull(); - } } -protected: - void doWrite( const Struct * s ) override - { - ARROW_OK_OR_THROW( m_typedBuilder -> Append( m_fn( s ) ), "Failed to append value" ); - } - -private: - ArrowBuilderT * m_typedBuilder; - ValueFn m_fn; -}; - -// Factory: creates an UnsafeWriter, deducing ValueFn type, and returns CreatedFieldWriter -template -CreatedFieldWriter makeUnsafeWriter( const std::string & name, const StructFieldPtr & field, - std::shared_ptr builder, - std::shared_ptr<::arrow::DataType> dataType, ValueFn && fn ) -{ - auto w = std::make_unique>>( - name, field, builder, std::move( dataType ), std::forward( fn ) ); - return { std::move( w ), std::move( builder ) }; -} - -// Factory: primitive numeric writer (auto-creates builder from default constructor) -template -CreatedFieldWriter makePrimitiveWriter( const std::string & name, const StructFieldPtr & f ) -{ - auto b = std::make_shared(); - return makeUnsafeWriter( name, f, b, b -> type(), [f]( const Struct * s ) { - return static_cast( f -> value( s ) ); - } ); -} - -// Factory: nanosecond-based temporal writer (DateTime, TimeDelta, Time) -template -CreatedFieldWriter makeNanosWriter( const std::string & name, const StructFieldPtr & f, - std::shared_ptr<::arrow::DataType> dataType ) -{ - auto b = std::make_shared( dataType, ::arrow::default_memory_pool() ); - return makeUnsafeWriter( name, f, b, std::move( dataType ), [f]( const Struct * s ) { - return f -> value( s ).asNanoseconds(); - } ); -} - -// --- String / Bytes writer (variable-length: needs safe Append) --- - -template -class StringLikeWriter final : public FieldWriter -{ -public: - StringLikeWriter( const std::string & columnName, const StructFieldPtr & field, - std::shared_ptr<::arrow::DataType> dataType ) - : FieldWriter( columnName, field, std::make_shared(), std::move( dataType ) ), - m_typedBuilder( static_cast( m_builder.get() ) ) {} - + // Columnar bulk path: caller has reserved, so set values use the unsafe append. void writeAll( const std::vector & structs, int64_t offset, int64_t count ) override { for( int64_t i = offset; i < offset + count; ++i ) { const Struct * s = structs[i].get(); if( m_field -> isSet( s ) ) - { - auto & val = m_field -> value( s ); - ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" ); - } + m_kernel.appendUnsafe( m_field -> value( s ) ); else - ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" ); + m_kernel.appendNull(); } } protected: void doWrite( const Struct * s ) override { - auto & val = m_field -> value( s ); - ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" ); + m_kernel.append( m_field -> value( s ) ); } private: - ArrowBuilderT * m_typedBuilder; -}; - -// --- Enum writer (variable-length string: CspEnum → name()) --- - -class EnumWriter final : public FieldWriter -{ -public: - EnumWriter( const std::string & columnName, const StructFieldPtr & field ) - : FieldWriter( columnName, field, std::make_shared<::arrow::StringBuilder>(), ::arrow::utf8() ), - m_typedBuilder( static_cast<::arrow::StringBuilder *>( m_builder.get() ) ) {} - - void writeAll( const std::vector & structs, int64_t offset, int64_t count ) override + // Delegating ctor: build the kernel first so its builder/dataType can seed the FieldWriter base, + // then adopt the kernel (both hold the same shared builder). + ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field, + ArrowScalarColumnWriter kernel ) + : FieldWriter( columnName, field, kernel.builder(), kernel.dataType() ), + m_kernel( std::move( kernel ) ) { - for( int64_t i = offset; i < offset + count; ++i ) - { - const Struct * s = structs[i].get(); - if( m_field -> isSet( s ) ) - { - auto & n = m_field -> value( s ).name(); - ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" ); - } - else - ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" ); - } } -protected: - void doWrite( const Struct * s ) override - { - auto & n = m_field -> value( s ).name(); - ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" ); - } - -private: - ::arrow::StringBuilder * m_typedBuilder; + ArrowScalarColumnWriter m_kernel; }; +template +CreatedFieldWriter makeScalarFieldWriter( const std::string & name, const StructFieldPtr & f, bool isBytes = false ) +{ + auto w = std::make_unique>( name, f, isBytes ); + auto b = w -> builder(); + return { std::move( w ), std::move( b ) }; +} + // --- Nested struct writer (recursive) --- class NestedStructWriter final : public FieldWriter @@ -272,14 +185,6 @@ bool isBytesField( const StructFieldPtr & field ) return strType && strType -> isBytes(); } -template -CreatedFieldWriter makeWriter( Args &&... args ) -{ - auto w = std::make_unique( std::forward( args )... ); - auto b = w -> builder(); - return { std::move( w ), std::move( b ) }; -} - } // anonymous namespace CreatedFieldWriter createFieldWriter( @@ -291,50 +196,29 @@ CreatedFieldWriter createFieldWriter( switch( f -> type() -> type() ) { // --- Numeric --- - case CspType::Type::BOOL: - { - auto b = std::make_shared<::arrow::BooleanBuilder>(); - return makeUnsafeWriter( columnName, f, b, ::arrow::boolean(), - [f]( const Struct * s ) { return f -> value( s ); } ); - } - case CspType::Type::INT8: return makePrimitiveWriter( columnName, f ); - case CspType::Type::INT16: return makePrimitiveWriter( columnName, f ); - case CspType::Type::INT32: return makePrimitiveWriter( columnName, f ); - case CspType::Type::INT64: return makePrimitiveWriter( columnName, f ); - case CspType::Type::UINT8: return makePrimitiveWriter( columnName, f ); - case CspType::Type::UINT16: return makePrimitiveWriter( columnName, f ); - case CspType::Type::UINT32: return makePrimitiveWriter( columnName, f ); - case CspType::Type::UINT64: return makePrimitiveWriter( columnName, f ); - case CspType::Type::DOUBLE: return makePrimitiveWriter( columnName, f ); + case CspType::Type::BOOL: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::INT8: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::INT16: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::INT32: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::INT64: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::UINT8: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::UINT16: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::UINT32: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::UINT64: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::DOUBLE: return makeScalarFieldWriter( columnName, f ); // --- String / Bytes --- - case CspType::Type::STRING: - if( isBytesField( f ) ) - return makeWriter>( columnName, f, ::arrow::binary() ); - return makeWriter>( columnName, f, ::arrow::utf8() ); + case CspType::Type::STRING: return makeScalarFieldWriter( columnName, f, isBytesField( f ) ); - case CspType::Type::ENUM: return makeWriter( columnName, f ); + case CspType::Type::ENUM: return makeScalarFieldWriter( columnName, f ); // --- Temporal --- - case CspType::Type::DATETIME: - return makeNanosWriter( - columnName, f, std::make_shared<::arrow::TimestampType>( ::arrow::TimeUnit::NANO, "UTC" ) ); - case CspType::Type::TIMEDELTA: - return makeNanosWriter( - columnName, f, std::make_shared<::arrow::DurationType>( ::arrow::TimeUnit::NANO ) ); - case CspType::Type::TIME: - return makeNanosWriter( - columnName, f, std::make_shared<::arrow::Time64Type>( ::arrow::TimeUnit::NANO ) ); + case CspType::Type::DATETIME: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::TIMEDELTA: return makeScalarFieldWriter( columnName, f ); + case CspType::Type::TIME: return makeScalarFieldWriter