Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 52 additions & 164 deletions cpp/csp/adapters/arrow/ArrowFieldWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Concrete FieldWriter implementations for all CSP scalar types.
//
// Fixed-length writers use UnsafeWriter<BuilderT> — a single template
// that takes a value-extraction callable at construction (mirrors LambdaReader).
// Variable-length writers (StringLike, Enum) and NestedStruct are separate
// classes because they need safe Append (variable-length) or recursive logic.
// Every scalar/temporal/string/enum column is a ScalarFieldWriter<CspT>: it reads the value out of
// a struct field and appends it through the shared ArrowScalarColumnWriter<CspT> kernel (the single
// home of the per-type Arrow-builder choice + conversion, also used value-first by the scalar output
// path). NestedStruct is a separate class because it is recursive.

#include <csp/adapters/arrow/ArrowFieldWriter.h>
#include <csp/adapters/arrow/ArrowScalarWriter.h>
#include <csp/engine/CspType.h>
#include <csp/engine/CspEnum.h>

Expand Down Expand Up @@ -54,147 +55,59 @@ std::vector<std::shared_ptr<::arrow::Array>> FieldWriter::finish()
namespace
{

// --- Generic lambda-based writer for fixed-length types ---
// ValueFn signature: auto(const Struct *) — returns the value to UnsafeAppend/Append.
// Covers: all numeric primitives, bool, DateTime, TimeDelta, Time, Date.
// --- Scalar field writer: reads field->value<CspT>(s), appends via the shared kernel ---
// Replaces the former per-category writers (numeric/temporal/string/enum). The per-type Arrow
// builder + value conversion now live exactly once, in ArrowScalarColumnWriter<CspT>.

template<typename ArrowBuilderT, typename ValueFn>
class UnsafeWriter final : public FieldWriter
template<typename CspT>
class ScalarFieldWriter final : public FieldWriter
{
public:
UnsafeWriter( const std::string & columnName, const StructFieldPtr & field,
std::shared_ptr<ArrowBuilderT> typedBuilder,
std::shared_ptr<::arrow::DataType> dataType, ValueFn fn )
: FieldWriter( columnName, field, typedBuilder, std::move( dataType ) ),
m_typedBuilder( typedBuilder.get() ), m_fn( std::move( fn ) ) {}

void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field, bool isBytes = false )
: ScalarFieldWriter( columnName, field, ArrowScalarColumnWriter<CspT>( isBytes ) )
{
for( int64_t i = offset; i < offset + count; ++i )
{
const Struct * s = structs[i].get();
if( m_field -> isSet( s ) )
m_typedBuilder -> UnsafeAppend( m_fn( s ) );
else
m_typedBuilder -> UnsafeAppendNull();
}
}

protected:
void doWrite( const Struct * s ) override
{
ARROW_OK_OR_THROW( m_typedBuilder -> Append( m_fn( s ) ), "Failed to append value" );
}

private:
ArrowBuilderT * m_typedBuilder;
ValueFn m_fn;
};

// Factory: creates an UnsafeWriter, deducing ValueFn type, and returns CreatedFieldWriter
template<typename ArrowBuilderT, typename ValueFn>
CreatedFieldWriter makeUnsafeWriter( const std::string & name, const StructFieldPtr & field,
std::shared_ptr<ArrowBuilderT> builder,
std::shared_ptr<::arrow::DataType> dataType, ValueFn && fn )
{
auto w = std::make_unique<UnsafeWriter<ArrowBuilderT, std::decay_t<ValueFn>>>(
name, field, builder, std::move( dataType ), std::forward<ValueFn>( fn ) );
return { std::move( w ), std::move( builder ) };
}

// Factory: primitive numeric writer (auto-creates builder from default constructor)
template<typename CspT, typename ArrowBuilderT>
CreatedFieldWriter makePrimitiveWriter( const std::string & name, const StructFieldPtr & f )
{
auto b = std::make_shared<ArrowBuilderT>();
return makeUnsafeWriter( name, f, b, b -> type(), [f]( const Struct * s ) {
return static_cast<typename ArrowBuilderT::value_type>( f -> value<CspT>( s ) );
} );
}

// Factory: nanosecond-based temporal writer (DateTime, TimeDelta, Time)
template<typename CspT, typename ArrowBuilderT>
CreatedFieldWriter makeNanosWriter( const std::string & name, const StructFieldPtr & f,
std::shared_ptr<::arrow::DataType> dataType )
{
auto b = std::make_shared<ArrowBuilderT>( dataType, ::arrow::default_memory_pool() );
return makeUnsafeWriter( name, f, b, std::move( dataType ), [f]( const Struct * s ) {
return f -> value<CspT>( s ).asNanoseconds();
} );
}

// --- String / Bytes writer (variable-length: needs safe Append) ---

template<typename ArrowBuilderT>
class StringLikeWriter final : public FieldWriter
{
public:
StringLikeWriter( const std::string & columnName, const StructFieldPtr & field,
std::shared_ptr<::arrow::DataType> dataType )
: FieldWriter( columnName, field, std::make_shared<ArrowBuilderT>(), std::move( dataType ) ),
m_typedBuilder( static_cast<ArrowBuilderT *>( m_builder.get() ) ) {}

// Columnar bulk path: caller has reserved, so set values use the unsafe append.
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
{
for( int64_t i = offset; i < offset + count; ++i )
{
const Struct * s = structs[i].get();
if( m_field -> isSet( s ) )
{
auto & val = m_field -> value<std::string>( s );
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
}
m_kernel.appendUnsafe( m_field -> value<CspT>( s ) );
else
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
m_kernel.appendNull();
}
}

protected:
void doWrite( const Struct * s ) override
{
auto & val = m_field -> value<std::string>( s );
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
m_kernel.append( m_field -> value<CspT>( s ) );
}

private:
ArrowBuilderT * m_typedBuilder;
};

// --- Enum writer (variable-length string: CspEnum → name()) ---

class EnumWriter final : public FieldWriter
{
public:
EnumWriter( const std::string & columnName, const StructFieldPtr & field )
: FieldWriter( columnName, field, std::make_shared<::arrow::StringBuilder>(), ::arrow::utf8() ),
m_typedBuilder( static_cast<::arrow::StringBuilder *>( m_builder.get() ) ) {}

void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
// Delegating ctor: build the kernel first so its builder/dataType can seed the FieldWriter base,
// then adopt the kernel (both hold the same shared builder).
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field,
ArrowScalarColumnWriter<CspT> kernel )
: FieldWriter( columnName, field, kernel.builder(), kernel.dataType() ),
m_kernel( std::move( kernel ) )
{
for( int64_t i = offset; i < offset + count; ++i )
{
const Struct * s = structs[i].get();
if( m_field -> isSet( s ) )
{
auto & n = m_field -> value<CspEnum>( s ).name();
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
}
else
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
}
}

protected:
void doWrite( const Struct * s ) override
{
auto & n = m_field -> value<CspEnum>( s ).name();
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
}

private:
::arrow::StringBuilder * m_typedBuilder;
ArrowScalarColumnWriter<CspT> m_kernel;
};

template<typename CspT>
CreatedFieldWriter makeScalarFieldWriter( const std::string & name, const StructFieldPtr & f, bool isBytes = false )
{
auto w = std::make_unique<ScalarFieldWriter<CspT>>( name, f, isBytes );
auto b = w -> builder();
return { std::move( w ), std::move( b ) };
}

// --- Nested struct writer (recursive) ---

class NestedStructWriter final : public FieldWriter
Expand Down Expand Up @@ -272,14 +185,6 @@ bool isBytesField( const StructFieldPtr & field )
return strType && strType -> isBytes();
}

template<typename WriterT, typename... Args>
CreatedFieldWriter makeWriter( Args &&... args )
{
auto w = std::make_unique<WriterT>( std::forward<Args>( args )... );
auto b = w -> builder();
return { std::move( w ), std::move( b ) };
}

} // anonymous namespace

CreatedFieldWriter createFieldWriter(
Expand All @@ -291,50 +196,29 @@ CreatedFieldWriter createFieldWriter(
switch( f -> type() -> type() )
{
// --- Numeric ---
case CspType::Type::BOOL:
{
auto b = std::make_shared<::arrow::BooleanBuilder>();
return makeUnsafeWriter( columnName, f, b, ::arrow::boolean(),
[f]( const Struct * s ) { return f -> value<bool>( s ); } );
}
case CspType::Type::INT8: return makePrimitiveWriter<int8_t, ::arrow::Int8Builder>( columnName, f );
case CspType::Type::INT16: return makePrimitiveWriter<int16_t, ::arrow::Int16Builder>( columnName, f );
case CspType::Type::INT32: return makePrimitiveWriter<int32_t, ::arrow::Int32Builder>( columnName, f );
case CspType::Type::INT64: return makePrimitiveWriter<int64_t, ::arrow::Int64Builder>( columnName, f );
case CspType::Type::UINT8: return makePrimitiveWriter<uint8_t, ::arrow::UInt8Builder>( columnName, f );
case CspType::Type::UINT16: return makePrimitiveWriter<uint16_t, ::arrow::UInt16Builder>( columnName, f );
case CspType::Type::UINT32: return makePrimitiveWriter<uint32_t, ::arrow::UInt32Builder>( columnName, f );
case CspType::Type::UINT64: return makePrimitiveWriter<uint64_t, ::arrow::UInt64Builder>( columnName, f );
case CspType::Type::DOUBLE: return makePrimitiveWriter<double, ::arrow::DoubleBuilder>( columnName, f );
case CspType::Type::BOOL: return makeScalarFieldWriter<bool>( columnName, f );
case CspType::Type::INT8: return makeScalarFieldWriter<int8_t>( columnName, f );
case CspType::Type::INT16: return makeScalarFieldWriter<int16_t>( columnName, f );
case CspType::Type::INT32: return makeScalarFieldWriter<int32_t>( columnName, f );
case CspType::Type::INT64: return makeScalarFieldWriter<int64_t>( columnName, f );
case CspType::Type::UINT8: return makeScalarFieldWriter<uint8_t>( columnName, f );
case CspType::Type::UINT16: return makeScalarFieldWriter<uint16_t>( columnName, f );
case CspType::Type::UINT32: return makeScalarFieldWriter<uint32_t>( columnName, f );
case CspType::Type::UINT64: return makeScalarFieldWriter<uint64_t>( columnName, f );
case CspType::Type::DOUBLE: return makeScalarFieldWriter<double>( columnName, f );

// --- String / Bytes ---
case CspType::Type::STRING:
if( isBytesField( f ) )
return makeWriter<StringLikeWriter<::arrow::BinaryBuilder>>( columnName, f, ::arrow::binary() );
return makeWriter<StringLikeWriter<::arrow::StringBuilder>>( columnName, f, ::arrow::utf8() );
case CspType::Type::STRING: return makeScalarFieldWriter<std::string>( columnName, f, isBytesField( f ) );

case CspType::Type::ENUM: return makeWriter<EnumWriter>( columnName, f );
case CspType::Type::ENUM: return makeScalarFieldWriter<CspEnum>( columnName, f );

// --- Temporal ---
case CspType::Type::DATETIME:
return makeNanosWriter<DateTime, ::arrow::TimestampBuilder>(
columnName, f, std::make_shared<::arrow::TimestampType>( ::arrow::TimeUnit::NANO, "UTC" ) );
case CspType::Type::TIMEDELTA:
return makeNanosWriter<TimeDelta, ::arrow::DurationBuilder>(
columnName, f, std::make_shared<::arrow::DurationType>( ::arrow::TimeUnit::NANO ) );
case CspType::Type::TIME:
return makeNanosWriter<Time, ::arrow::Time64Builder>(
columnName, f, std::make_shared<::arrow::Time64Type>( ::arrow::TimeUnit::NANO ) );
case CspType::Type::DATETIME: return makeScalarFieldWriter<DateTime>( columnName, f );
case CspType::Type::TIMEDELTA: return makeScalarFieldWriter<TimeDelta>( columnName, f );
case CspType::Type::TIME: return makeScalarFieldWriter<Time>( columnName, f );

// --- Date (days since epoch) ---
case CspType::Type::DATE:
{
auto b = std::make_shared<::arrow::Date32Builder>();
return makeUnsafeWriter( columnName, f, b, ::arrow::date32(), [f]( const Struct * s ) {
auto & d = f -> value<Date>( s );
return static_cast<int32_t>( DateTime( d.year(), d.month(), d.day() ).asNanoseconds() / csp::NANOS_PER_DAY );
} );
}
case CspType::Type::DATE: return makeScalarFieldWriter<Date>( columnName, f );

// --- Nested struct ---
case CspType::Type::STRUCT:
Expand All @@ -345,7 +229,11 @@ CreatedFieldWriter createFieldWriter(
std::vector<std::shared_ptr<::arrow::ArrayBuilder>> childBuilders;
std::vector<std::unique_ptr<FieldWriter>> childWriters;

// Use fieldNames() for stable insertion order (fields() is sorted for memory layout)
// Ordering convention: csp writes struct child columns in declaration order, using
// fieldNames() rather than fields() (which is sorted by struct memory layout). This keeps
// nested-struct child order consistent with top-level column order and stable against
// internal field packing. The reader matches children by name, so either order round-trips,
// but declaration order is the on-disk contract.
for( auto & subFieldName : nestedMeta -> fieldNames() )
{
auto subField = nestedMeta -> field( subFieldName );
Expand Down
Loading
Loading