Skip to content

Commit 40783a6

Browse files
committed
vortex-row: RowEncode ScalarFn
Add the RowEncode variadic scalar function: encode N input columns into a single ListView<u8> in a five-phase pipeline. Phase 1: size pass via `compute_sizes`. Phase 2: allocate a zero-initialized output buffer sized to fit every row's encoded bytes; bail if the total exceeds u32::MAX. Phase 3: build per-row `listview_offsets`: i * fixed_per_row for the pure-fixed case, or i * fixed_per_row + exclusive cumsum of varlen lengths otherwise. Uses the simple `Vec::push` + `checked_add` loop. Phase 4: walk columns left-to-right and call `dispatch_encode` for every column (cursor path for all). Each call writes its per-row bytes at `offsets[i] + cursors[i]` and advances the cursor. Phase 5: build the ListView<u8> via the validating `try_new` constructor. `dispatch_encode` is the canonicalize-then-`codec::field_encode` fallback; in-crate kernel arms and the inventory registry land in PR 3. The `RowEncodeKernel` trait is defined but unused. PR 2 will iterate on this pipeline (skip zero-init, skip ListView validation, auto- vectorize the offsets loop, etc.). Signed-off-by: Claude <noreply@anthropic.com>
1 parent 5374f3b commit 40783a6

4 files changed

Lines changed: 325 additions & 0 deletions

File tree

vortex-row/public-api.lock

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,46 @@ pub fn vortex_row::codec::field_size(&vortex_array::canonical::Canonical, vortex
102102

103103
pub fn vortex_row::codec::row_width_for_dtype(&vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_row::codec::RowWidth>
104104

105+
pub mod vortex_row::encode
106+
107+
pub struct vortex_row::encode::RowEncode
108+
109+
impl core::clone::Clone for vortex_row::encode::RowEncode
110+
111+
pub fn vortex_row::encode::RowEncode::clone(&self) -> vortex_row::encode::RowEncode
112+
113+
impl core::fmt::Debug for vortex_row::encode::RowEncode
114+
115+
pub fn vortex_row::encode::RowEncode::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result
116+
117+
impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::encode::RowEncode
118+
119+
pub type vortex_row::encode::RowEncode::Options = vortex_row::options::RowEncodeOptions
120+
121+
pub fn vortex_row::encode::RowEncode::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity
122+
123+
pub fn vortex_row::encode::RowEncode::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName
124+
125+
pub fn vortex_row::encode::RowEncode::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
126+
127+
pub fn vortex_row::encode::RowEncode::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>
128+
129+
pub fn vortex_row::encode::RowEncode::id(&self) -> vortex_array::scalar_fn::ScalarFnId
130+
131+
pub fn vortex_row::encode::RowEncode::is_fallible(&self, &Self::Options) -> bool
132+
133+
pub fn vortex_row::encode::RowEncode::is_null_sensitive(&self, &Self::Options) -> bool
134+
135+
pub fn vortex_row::encode::RowEncode::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>
136+
137+
pub fn vortex_row::encode::RowEncode::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
138+
139+
pub trait vortex_row::encode::RowEncodeKernel: vortex_array::array::vtable::VTable
140+
141+
pub fn vortex_row::encode::RowEncodeKernel::row_encode_into(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>
142+
143+
pub fn vortex_row::encode::dispatch_encode(&vortex_array::array::erased::ArrayRef, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>
144+
105145
pub mod vortex_row::options
106146

107147
pub struct vortex_row::options::RowEncodeOptions
@@ -222,6 +262,38 @@ pub fn vortex_row::size::RowSizeKernel::row_size_contribution(vortex_array::arra
222262

223263
pub fn vortex_row::size::dispatch_size(&vortex_array::array::erased::ArrayRef, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>
224264

265+
pub struct vortex_row::RowEncode
266+
267+
impl core::clone::Clone for vortex_row::encode::RowEncode
268+
269+
pub fn vortex_row::encode::RowEncode::clone(&self) -> vortex_row::encode::RowEncode
270+
271+
impl core::fmt::Debug for vortex_row::encode::RowEncode
272+
273+
pub fn vortex_row::encode::RowEncode::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result
274+
275+
impl vortex_array::scalar_fn::vtable::ScalarFnVTable for vortex_row::encode::RowEncode
276+
277+
pub type vortex_row::encode::RowEncode::Options = vortex_row::options::RowEncodeOptions
278+
279+
pub fn vortex_row::encode::RowEncode::arity(&self, &Self::Options) -> vortex_array::scalar_fn::vtable::Arity
280+
281+
pub fn vortex_row::encode::RowEncode::child_name(&self, &Self::Options, usize) -> vortex_array::scalar_fn::vtable::ChildName
282+
283+
pub fn vortex_row::encode::RowEncode::deserialize(&self, &[u8], &vortex_session::VortexSession) -> vortex_error::VortexResult<Self::Options>
284+
285+
pub fn vortex_row::encode::RowEncode::execute(&self, &Self::Options, &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>
286+
287+
pub fn vortex_row::encode::RowEncode::id(&self) -> vortex_array::scalar_fn::ScalarFnId
288+
289+
pub fn vortex_row::encode::RowEncode::is_fallible(&self, &Self::Options) -> bool
290+
291+
pub fn vortex_row::encode::RowEncode::is_null_sensitive(&self, &Self::Options) -> bool
292+
293+
pub fn vortex_row::encode::RowEncode::return_dtype(&self, &Self::Options, &[vortex_array::dtype::DType]) -> vortex_error::VortexResult<vortex_array::dtype::DType>
294+
295+
pub fn vortex_row::encode::RowEncode::serialize(&self, &Self::Options) -> vortex_error::VortexResult<core::option::Option<alloc::vec::Vec<u8>>>
296+
225297
pub struct vortex_row::RowEncodeOptions
226298

227299
pub vortex_row::RowEncodeOptions::fields: smallvec::SmallVec<[vortex_row::options::SortField; 4]>
@@ -330,6 +402,10 @@ impl core::marker::Copy for vortex_row::options::SortField
330402

331403
impl core::marker::StructuralPartialEq for vortex_row::options::SortField
332404

405+
pub trait vortex_row::RowEncodeKernel: vortex_array::array::vtable::VTable
406+
407+
pub fn vortex_row::RowEncodeKernel::row_encode_into(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &[u32], &mut [u32], &mut [u8], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>
408+
333409
pub trait vortex_row::RowSizeKernel: vortex_array::array::vtable::VTable
334410

335411
pub fn vortex_row::RowSizeKernel::row_size_contribution(vortex_array::array::view::ArrayView<'_, Self>, vortex_row::options::SortField, &mut [u32], &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<core::option::Option<()>>

vortex-row/src/encode.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![allow(
5+
clippy::cast_possible_truncation,
6+
reason = "row encoding indexes into u32-sized buffers; lengths are validated to fit in u32"
7+
)]
8+
9+
//! `RowEncode` variadic scalar function: encode N input columns into a single `ListView<u8>`.
10+
//!
11+
//! The output's `(elements, offsets, sizes)` triple is built up in a single left-to-right
12+
//! pass over the input columns. The `sizes` array doubles as the per-row write cursor, so
13+
//! when the last column finishes encoding, the accumulator is the final array - no separate
14+
//! conversion step is needed.
15+
16+
use std::sync::Arc;
17+
18+
use vortex_array::ArrayRef;
19+
use vortex_array::ArrayView;
20+
use vortex_array::Canonical;
21+
use vortex_array::ExecutionCtx;
22+
use vortex_array::IntoArray;
23+
use vortex_array::VTable;
24+
use vortex_array::arrays::ListViewArray;
25+
use vortex_array::arrays::PrimitiveArray;
26+
use vortex_array::dtype::DType;
27+
use vortex_array::dtype::Nullability;
28+
use vortex_array::dtype::PType;
29+
use vortex_array::scalar_fn::Arity;
30+
use vortex_array::scalar_fn::ChildName;
31+
use vortex_array::scalar_fn::ExecutionArgs;
32+
use vortex_array::scalar_fn::ScalarFnId;
33+
use vortex_array::scalar_fn::ScalarFnVTable;
34+
use vortex_array::validity::Validity;
35+
use vortex_buffer::Buffer;
36+
use vortex_buffer::BufferMut;
37+
use vortex_error::VortexExpect;
38+
use vortex_error::VortexResult;
39+
use vortex_error::vortex_bail;
40+
use vortex_session::VortexSession;
41+
42+
use crate::codec;
43+
use crate::options::RowEncodeOptions;
44+
use crate::options::SortField;
45+
use crate::options::deserialize_row_encode_options;
46+
use crate::options::serialize_row_encode_options;
47+
use crate::size::compute_sizes;
48+
49+
/// Variadic scalar function that encodes N input columns into a single `List<u8>`
50+
/// [`ListViewArray`] where row `i` contains the row-encoded bytes for column values
51+
/// `cols[0][i], cols[1][i], ...` concatenated left-to-right.
52+
#[derive(Clone, Debug)]
53+
pub struct RowEncode;
54+
55+
impl ScalarFnVTable for RowEncode {
56+
type Options = RowEncodeOptions;
57+
58+
fn id(&self) -> ScalarFnId {
59+
ScalarFnId::from("vortex.row_encode")
60+
}
61+
62+
fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
63+
Ok(Some(serialize_row_encode_options(options)))
64+
}
65+
66+
fn deserialize(
67+
&self,
68+
metadata: &[u8],
69+
_session: &VortexSession,
70+
) -> VortexResult<Self::Options> {
71+
deserialize_row_encode_options(metadata)
72+
}
73+
74+
fn arity(&self, _options: &Self::Options) -> Arity {
75+
Arity::Variadic { min: 1, max: None }
76+
}
77+
78+
fn child_name(&self, _options: &Self::Options, child_idx: usize) -> ChildName {
79+
ChildName::from(Arc::from(format!("col_{}", child_idx)))
80+
}
81+
82+
fn return_dtype(&self, _options: &Self::Options, _args: &[DType]) -> VortexResult<DType> {
83+
Ok(DType::List(
84+
Arc::new(DType::Primitive(PType::U8, Nullability::NonNullable)),
85+
Nullability::NonNullable,
86+
))
87+
}
88+
89+
fn execute(
90+
&self,
91+
options: &Self::Options,
92+
args: &dyn ExecutionArgs,
93+
ctx: &mut ExecutionCtx,
94+
) -> VortexResult<ArrayRef> {
95+
execute_row_encode(options, args, ctx)
96+
}
97+
98+
fn is_null_sensitive(&self, _options: &Self::Options) -> bool {
99+
true
100+
}
101+
102+
fn is_fallible(&self, _options: &Self::Options) -> bool {
103+
false
104+
}
105+
}
106+
107+
fn execute_row_encode(
108+
options: &RowEncodeOptions,
109+
args: &dyn ExecutionArgs,
110+
ctx: &mut ExecutionCtx,
111+
) -> VortexResult<ArrayRef> {
112+
let nrows = args.row_count();
113+
114+
// ===== Phase 1: classify + size pass =====
115+
let crate::size::SizePassResult {
116+
fixed_per_row,
117+
var_lengths,
118+
col_kinds: _,
119+
first_varlen_idx: _,
120+
columns,
121+
} = compute_sizes(options, args, ctx, "RowEncode")?;
122+
123+
// ===== Phase 2: totals + buffer =====
124+
let var_total: u64 = var_lengths
125+
.as_ref()
126+
.map_or(0, |v| v.iter().map(|&x| u64::from(x)).sum());
127+
let total: u64 = (nrows as u64)
128+
.checked_mul(u64::from(fixed_per_row))
129+
.and_then(|t| t.checked_add(var_total))
130+
.vortex_expect("row-encoded total bytes overflow");
131+
if total > u32::MAX as u64 {
132+
vortex_bail!("row-encoded output size {} bytes exceeds u32::MAX", total);
133+
}
134+
let total_len = total as usize;
135+
136+
// Allocate the elements buffer (zero-initialized). The zero-init lets every encoder
137+
// assume previously-untouched bytes are zero, simplifying the null-row fill paths.
138+
// PR 2 skips this memset because every byte in the output range is written by some
139+
// encoder.
140+
let mut out_buf: BufferMut<u8> = BufferMut::with_capacity(total_len);
141+
out_buf.push_n(0u8, total_len);
142+
143+
// ===== Phase 3: per-row offsets =====
144+
// listview_offsets[i] is the absolute byte offset where row `i` begins.
145+
// For pure-fixed: i * fixed_per_row.
146+
// For mixed: i * fixed_per_row + exclusive prefix sum of var_lengths.
147+
let mut listview_offsets: Vec<u32> = Vec::with_capacity(nrows);
148+
match var_lengths.as_ref() {
149+
None => {
150+
for i in 0..nrows {
151+
listview_offsets.push(
152+
(i as u32)
153+
.checked_mul(fixed_per_row)
154+
.vortex_expect("row offset overflow (already validated total fits in u32)"),
155+
);
156+
}
157+
}
158+
Some(v) => {
159+
let mut acc: u32 = 0;
160+
for (i, &l) in v.iter().enumerate() {
161+
let off = (i as u32)
162+
.checked_mul(fixed_per_row)
163+
.and_then(|t| t.checked_add(acc))
164+
.vortex_expect("row offset overflow");
165+
listview_offsets.push(off);
166+
acc = acc.checked_add(l).vortex_expect("varlen prefix overflow");
167+
}
168+
}
169+
}
170+
171+
// Per-row write cursor (also doubles as the ListView `sizes` slot when done).
172+
let mut row_cursors = vec![0u32; nrows];
173+
174+
// ===== Phase 4: encode columns via the cursor path =====
175+
for (i, col) in columns.iter().enumerate() {
176+
dispatch_encode(
177+
col,
178+
options.fields[i],
179+
&listview_offsets,
180+
&mut row_cursors,
181+
&mut out_buf,
182+
ctx,
183+
)?;
184+
}
185+
186+
// ===== Phase 5: build ListView output =====
187+
let elements = PrimitiveArray::new(out_buf.freeze(), Validity::NonNullable).into_array();
188+
let offsets_arr = PrimitiveArray::new(
189+
Buffer::<u32>::copy_from(&listview_offsets),
190+
Validity::NonNullable,
191+
)
192+
.into_array();
193+
let sizes_arr = PrimitiveArray::new(
194+
Buffer::<u32>::copy_from(&row_cursors),
195+
Validity::NonNullable,
196+
)
197+
.into_array();
198+
Ok(
199+
ListViewArray::try_new(elements, offsets_arr, sizes_arr, Validity::NonNullable)?
200+
.into_array(),
201+
)
202+
}
203+
204+
/// Dispatch a single column's encoding into the shared `out` buffer.
205+
///
206+
/// For PR 1 this is just the canonicalize-then-`codec::field_encode` fallback path.
207+
/// In-crate fast paths for `Constant`/`Dict`/`Patched` and the inventory-based registry
208+
/// for downstream encodings are added in PR 3.
209+
pub fn dispatch_encode(
210+
col: &ArrayRef,
211+
field: SortField,
212+
offsets: &[u32],
213+
cursors: &mut [u32],
214+
out: &mut [u8],
215+
ctx: &mut ExecutionCtx,
216+
) -> VortexResult<()> {
217+
let canonical = col.clone().execute::<Canonical>(ctx)?;
218+
codec::field_encode(&canonical, field, offsets, cursors, out, ctx)
219+
}
220+
221+
/// Mutate-buffer kernel: write this column's per-row bytes into `out` at
222+
/// `offsets[i] + cursors[i]`, advancing `cursors[i]` by the bytes written.
223+
///
224+
/// Return `Ok(None)` to decline and fall back to the canonical path.
225+
///
226+
/// Trait is defined now; per-encoding impls and dispatch wiring land in PR 3.
227+
pub trait RowEncodeKernel: VTable {
228+
/// Write this column's per-row bytes into `out` at `offsets[i] + cursors[i]`, advancing
229+
/// `cursors[i]` by the bytes written.
230+
fn row_encode_into(
231+
column: ArrayView<'_, Self>,
232+
field: SortField,
233+
offsets: &[u32],
234+
cursors: &mut [u32],
235+
out: &mut [u8],
236+
ctx: &mut ExecutionCtx,
237+
) -> VortexResult<Option<()>>;
238+
}

vortex-row/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
//! This commit only establishes the crate skeleton and an `initialize` stub.
88
99
pub mod codec;
10+
pub mod encode;
1011
pub mod options;
1112
pub mod size;
1213

14+
pub use encode::RowEncode;
15+
pub use encode::RowEncodeKernel;
1316
pub use options::RowEncodeOptions;
1417
pub use options::SortField;
1518
pub use size::RowSize;

vortex-row/src/size.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,15 @@ pub(crate) enum ColKind {
7272
pub(crate) struct SizePassResult {
7373
pub fixed_per_row: u32,
7474
pub var_lengths: Option<Vec<u32>>,
75+
#[allow(
76+
dead_code,
77+
reason = "consumed by the arithmetic-write fast path added in PR 2"
78+
)]
7579
pub col_kinds: Vec<ColKind>,
80+
#[allow(
81+
dead_code,
82+
reason = "consumed by the arithmetic-write fast path added in PR 2"
83+
)]
7684
pub first_varlen_idx: Option<usize>,
7785
pub columns: Vec<ArrayRef>,
7886
}

0 commit comments

Comments
 (0)