Skip to content

Commit fa752ed

Browse files
arhamchopra and Copilot committed
Parquet output: value-first scalar writer, drop scratch struct
Scalar output columns (the timestamp column, every plain publish(), and the dict-basket symbol/value_count columns) no longer write into a synthetic one-field scratch Struct just to reuse the struct FieldWriter. Instead, the per-type Arrow conversion lives once in a new ArrowScalarColumnWriter<CspT> kernel, shared by:
- the struct field path (ScalarFieldWriter<CspT>: read field->value<CspT>(s), then append), and
- the scalar path (ScalarColumnArrayBuilder<CspT>: append the ticked value directly).

ArrowBackedArrayBuilder is reduced to external (struct-field) mode only. The public FieldWriter interface is unchanged, so the struct_to_record_batches node and numpy writers only recompile. Removes the scratch struct's per-row setValue/isSet/clearIsSet on the scalar/timestamp hot path (~8-10% faster there); output is byte-identical.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 04445fc commit fa752ed

9 files changed

Lines changed: 299 additions & 331 deletions

cpp/csp/adapters/arrow/ArrowFieldWriter.cpp

Lines changed: 47 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Concrete FieldWriter implementations for all CSP scalar types.
22
//
3-
// Fixed-length writers use UnsafeWriter<BuilderT> — a single template
4-
// that takes a value-extraction callable at construction (mirrors LambdaReader).
5-
// Variable-length writers (StringLike, Enum) and NestedStruct are separate
6-
// classes because they need safe Append (variable-length) or recursive logic.
3+
// Every scalar/temporal/string/enum column is a ScalarFieldWriter<CspT>: it reads the value out of
4+
// a struct field and appends it through the shared ArrowScalarColumnWriter<CspT> kernel (the single
5+
// home of the per-type Arrow-builder choice + conversion, also used value-first by the scalar output
6+
// path). NestedStruct is a separate class because it is recursive.
77

88
#include <csp/adapters/arrow/ArrowFieldWriter.h>
9+
#include <csp/adapters/arrow/ArrowScalarWriter.h>
910
#include <csp/engine/CspType.h>
1011
#include <csp/engine/CspEnum.h>
1112

@@ -54,147 +55,59 @@ std::vector<std::shared_ptr<::arrow::Array>> FieldWriter::finish()
5455
namespace
5556
{
5657

57-
// --- Generic lambda-based writer for fixed-length types ---
58-
// ValueFn signature: auto(const Struct *) — returns the value to UnsafeAppend/Append.
59-
// Covers: all numeric primitives, bool, DateTime, TimeDelta, Time, Date.
58+
// --- Scalar field writer: reads field->value<CspT>(s), appends via the shared kernel ---
59+
// Replaces the former per-category writers (numeric/temporal/string/enum). The per-type Arrow
60+
// builder + value conversion now live exactly once, in ArrowScalarColumnWriter<CspT>.
6061

61-
template<typename ArrowBuilderT, typename ValueFn>
62-
class UnsafeWriter final : public FieldWriter
62+
template<typename CspT>
63+
class ScalarFieldWriter final : public FieldWriter
6364
{
6465
public:
65-
UnsafeWriter( const std::string & columnName, const StructFieldPtr & field,
66-
std::shared_ptr<ArrowBuilderT> typedBuilder,
67-
std::shared_ptr<::arrow::DataType> dataType, ValueFn fn )
68-
: FieldWriter( columnName, field, typedBuilder, std::move( dataType ) ),
69-
m_typedBuilder( typedBuilder.get() ), m_fn( std::move( fn ) ) {}
70-
71-
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
66+
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field, bool isBytes = false )
67+
: ScalarFieldWriter( columnName, field, ArrowScalarColumnWriter<CspT>( isBytes ) )
7268
{
73-
for( int64_t i = offset; i < offset + count; ++i )
74-
{
75-
const Struct * s = structs[i].get();
76-
if( m_field -> isSet( s ) )
77-
m_typedBuilder -> UnsafeAppend( m_fn( s ) );
78-
else
79-
m_typedBuilder -> UnsafeAppendNull();
80-
}
8169
}
8270

83-
protected:
84-
void doWrite( const Struct * s ) override
85-
{
86-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( m_fn( s ) ), "Failed to append value" );
87-
}
88-
89-
private:
90-
ArrowBuilderT * m_typedBuilder;
91-
ValueFn m_fn;
92-
};
93-
94-
// Factory: creates an UnsafeWriter, deducing ValueFn type, and returns CreatedFieldWriter
95-
template<typename ArrowBuilderT, typename ValueFn>
96-
CreatedFieldWriter makeUnsafeWriter( const std::string & name, const StructFieldPtr & field,
97-
std::shared_ptr<ArrowBuilderT> builder,
98-
std::shared_ptr<::arrow::DataType> dataType, ValueFn && fn )
99-
{
100-
auto w = std::make_unique<UnsafeWriter<ArrowBuilderT, std::decay_t<ValueFn>>>(
101-
name, field, builder, std::move( dataType ), std::forward<ValueFn>( fn ) );
102-
return { std::move( w ), std::move( builder ) };
103-
}
104-
105-
// Factory: primitive numeric writer (auto-creates builder from default constructor)
106-
template<typename CspT, typename ArrowBuilderT>
107-
CreatedFieldWriter makePrimitiveWriter( const std::string & name, const StructFieldPtr & f )
108-
{
109-
auto b = std::make_shared<ArrowBuilderT>();
110-
return makeUnsafeWriter( name, f, b, b -> type(), [f]( const Struct * s ) {
111-
return static_cast<typename ArrowBuilderT::value_type>( f -> value<CspT>( s ) );
112-
} );
113-
}
114-
115-
// Factory: nanosecond-based temporal writer (DateTime, TimeDelta, Time)
116-
template<typename CspT, typename ArrowBuilderT>
117-
CreatedFieldWriter makeNanosWriter( const std::string & name, const StructFieldPtr & f,
118-
std::shared_ptr<::arrow::DataType> dataType )
119-
{
120-
auto b = std::make_shared<ArrowBuilderT>( dataType, ::arrow::default_memory_pool() );
121-
return makeUnsafeWriter( name, f, b, std::move( dataType ), [f]( const Struct * s ) {
122-
return f -> value<CspT>( s ).asNanoseconds();
123-
} );
124-
}
125-
126-
// --- String / Bytes writer (variable-length: needs safe Append) ---
127-
128-
template<typename ArrowBuilderT>
129-
class StringLikeWriter final : public FieldWriter
130-
{
131-
public:
132-
StringLikeWriter( const std::string & columnName, const StructFieldPtr & field,
133-
std::shared_ptr<::arrow::DataType> dataType )
134-
: FieldWriter( columnName, field, std::make_shared<ArrowBuilderT>(), std::move( dataType ) ),
135-
m_typedBuilder( static_cast<ArrowBuilderT *>( m_builder.get() ) ) {}
136-
71+
// Columnar bulk path: caller has reserved, so set values use the unsafe append.
13772
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
13873
{
13974
for( int64_t i = offset; i < offset + count; ++i )
14075
{
14176
const Struct * s = structs[i].get();
14277
if( m_field -> isSet( s ) )
143-
{
144-
auto & val = m_field -> value<std::string>( s );
145-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
146-
}
78+
m_kernel.appendUnsafe( m_field -> value<CspT>( s ) );
14779
else
148-
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
80+
m_kernel.appendNull();
14981
}
15082
}
15183

15284
protected:
15385
void doWrite( const Struct * s ) override
15486
{
155-
auto & val = m_field -> value<std::string>( s );
156-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( val.c_str(), val.length() ), "Failed to append string/bytes" );
87+
m_kernel.append( m_field -> value<CspT>( s ) );
15788
}
15889

15990
private:
160-
ArrowBuilderT * m_typedBuilder;
161-
};
162-
163-
// --- Enum writer (variable-length string: CspEnum → name()) ---
164-
165-
class EnumWriter final : public FieldWriter
166-
{
167-
public:
168-
EnumWriter( const std::string & columnName, const StructFieldPtr & field )
169-
: FieldWriter( columnName, field, std::make_shared<::arrow::StringBuilder>(), ::arrow::utf8() ),
170-
m_typedBuilder( static_cast<::arrow::StringBuilder *>( m_builder.get() ) ) {}
171-
172-
void writeAll( const std::vector<StructPtr> & structs, int64_t offset, int64_t count ) override
91+
// Delegating ctor: build the kernel first so its builder/dataType can seed the FieldWriter base,
92+
// then adopt the kernel (both hold the same shared builder).
93+
ScalarFieldWriter( const std::string & columnName, const StructFieldPtr & field,
94+
ArrowScalarColumnWriter<CspT> kernel )
95+
: FieldWriter( columnName, field, kernel.builder(), kernel.dataType() ),
96+
m_kernel( std::move( kernel ) )
17397
{
174-
for( int64_t i = offset; i < offset + count; ++i )
175-
{
176-
const Struct * s = structs[i].get();
177-
if( m_field -> isSet( s ) )
178-
{
179-
auto & n = m_field -> value<CspEnum>( s ).name();
180-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
181-
}
182-
else
183-
ARROW_OK_OR_THROW( m_typedBuilder -> AppendNull(), "Failed to append null" );
184-
}
18598
}
18699

187-
protected:
188-
void doWrite( const Struct * s ) override
189-
{
190-
auto & n = m_field -> value<CspEnum>( s ).name();
191-
ARROW_OK_OR_THROW( m_typedBuilder -> Append( n.c_str(), n.length() ), "Failed to append enum" );
192-
}
193-
194-
private:
195-
::arrow::StringBuilder * m_typedBuilder;
100+
ArrowScalarColumnWriter<CspT> m_kernel;
196101
};
197102

103+
template<typename CspT>
104+
CreatedFieldWriter makeScalarFieldWriter( const std::string & name, const StructFieldPtr & f, bool isBytes = false )
105+
{
106+
auto w = std::make_unique<ScalarFieldWriter<CspT>>( name, f, isBytes );
107+
auto b = w -> builder();
108+
return { std::move( w ), std::move( b ) };
109+
}
110+
198111
// --- Nested struct writer (recursive) ---
199112

200113
class NestedStructWriter final : public FieldWriter
@@ -272,14 +185,6 @@ bool isBytesField( const StructFieldPtr & field )
272185
return strType && strType -> isBytes();
273186
}
274187

275-
template<typename WriterT, typename... Args>
276-
CreatedFieldWriter makeWriter( Args &&... args )
277-
{
278-
auto w = std::make_unique<WriterT>( std::forward<Args>( args )... );
279-
auto b = w -> builder();
280-
return { std::move( w ), std::move( b ) };
281-
}
282-
283188
} // anonymous namespace
284189

285190
CreatedFieldWriter createFieldWriter(
@@ -291,50 +196,29 @@ CreatedFieldWriter createFieldWriter(
291196
switch( f -> type() -> type() )
292197
{
293198
// --- Numeric ---
294-
case CspType::Type::BOOL:
295-
{
296-
auto b = std::make_shared<::arrow::BooleanBuilder>();
297-
return makeUnsafeWriter( columnName, f, b, ::arrow::boolean(),
298-
[f]( const Struct * s ) { return f -> value<bool>( s ); } );
299-
}
300-
case CspType::Type::INT8: return makePrimitiveWriter<int8_t, ::arrow::Int8Builder>( columnName, f );
301-
case CspType::Type::INT16: return makePrimitiveWriter<int16_t, ::arrow::Int16Builder>( columnName, f );
302-
case CspType::Type::INT32: return makePrimitiveWriter<int32_t, ::arrow::Int32Builder>( columnName, f );
303-
case CspType::Type::INT64: return makePrimitiveWriter<int64_t, ::arrow::Int64Builder>( columnName, f );
304-
case CspType::Type::UINT8: return makePrimitiveWriter<uint8_t, ::arrow::UInt8Builder>( columnName, f );
305-
case CspType::Type::UINT16: return makePrimitiveWriter<uint16_t, ::arrow::UInt16Builder>( columnName, f );
306-
case CspType::Type::UINT32: return makePrimitiveWriter<uint32_t, ::arrow::UInt32Builder>( columnName, f );
307-
case CspType::Type::UINT64: return makePrimitiveWriter<uint64_t, ::arrow::UInt64Builder>( columnName, f );
308-
case CspType::Type::DOUBLE: return makePrimitiveWriter<double, ::arrow::DoubleBuilder>( columnName, f );
199+
case CspType::Type::BOOL: return makeScalarFieldWriter<bool>( columnName, f );
200+
case CspType::Type::INT8: return makeScalarFieldWriter<int8_t>( columnName, f );
201+
case CspType::Type::INT16: return makeScalarFieldWriter<int16_t>( columnName, f );
202+
case CspType::Type::INT32: return makeScalarFieldWriter<int32_t>( columnName, f );
203+
case CspType::Type::INT64: return makeScalarFieldWriter<int64_t>( columnName, f );
204+
case CspType::Type::UINT8: return makeScalarFieldWriter<uint8_t>( columnName, f );
205+
case CspType::Type::UINT16: return makeScalarFieldWriter<uint16_t>( columnName, f );
206+
case CspType::Type::UINT32: return makeScalarFieldWriter<uint32_t>( columnName, f );
207+
case CspType::Type::UINT64: return makeScalarFieldWriter<uint64_t>( columnName, f );
208+
case CspType::Type::DOUBLE: return makeScalarFieldWriter<double>( columnName, f );
309209

310210
// --- String / Bytes ---
311-
case CspType::Type::STRING:
312-
if( isBytesField( f ) )
313-
return makeWriter<StringLikeWriter<::arrow::BinaryBuilder>>( columnName, f, ::arrow::binary() );
314-
return makeWriter<StringLikeWriter<::arrow::StringBuilder>>( columnName, f, ::arrow::utf8() );
211+
case CspType::Type::STRING: return makeScalarFieldWriter<std::string>( columnName, f, isBytesField( f ) );
315212

316-
case CspType::Type::ENUM: return makeWriter<EnumWriter>( columnName, f );
213+
case CspType::Type::ENUM: return makeScalarFieldWriter<CspEnum>( columnName, f );
317214

318215
// --- Temporal ---
319-
case CspType::Type::DATETIME:
320-
return makeNanosWriter<DateTime, ::arrow::TimestampBuilder>(
321-
columnName, f, std::make_shared<::arrow::TimestampType>( ::arrow::TimeUnit::NANO, "UTC" ) );
322-
case CspType::Type::TIMEDELTA:
323-
return makeNanosWriter<TimeDelta, ::arrow::DurationBuilder>(
324-
columnName, f, std::make_shared<::arrow::DurationType>( ::arrow::TimeUnit::NANO ) );
325-
case CspType::Type::TIME:
326-
return makeNanosWriter<Time, ::arrow::Time64Builder>(
327-
columnName, f, std::make_shared<::arrow::Time64Type>( ::arrow::TimeUnit::NANO ) );
216+
case CspType::Type::DATETIME: return makeScalarFieldWriter<DateTime>( columnName, f );
217+
case CspType::Type::TIMEDELTA: return makeScalarFieldWriter<TimeDelta>( columnName, f );
218+
case CspType::Type::TIME: return makeScalarFieldWriter<Time>( columnName, f );
328219

329220
// --- Date (days since epoch) ---
330-
case CspType::Type::DATE:
331-
{
332-
auto b = std::make_shared<::arrow::Date32Builder>();
333-
return makeUnsafeWriter( columnName, f, b, ::arrow::date32(), [f]( const Struct * s ) {
334-
auto & d = f -> value<Date>( s );
335-
return static_cast<int32_t>( DateTime( d.year(), d.month(), d.day() ).asNanoseconds() / csp::NANOS_PER_DAY );
336-
} );
337-
}
221+
case CspType::Type::DATE: return makeScalarFieldWriter<Date>( columnName, f );
338222

339223
// --- Nested struct ---
340224
case CspType::Type::STRUCT:

0 commit comments

Comments
 (0)