Skip to content

Commit 2711504

Browse files
committed
vortex-row: arithmetic-write fast path for fixed-before-varlen columns
Classify each column in the size pass (`ColKind` + `first_varlen_idx`): a fixed-width column with no varlen column before it has a constant within-row offset, so its write position is pure arithmetic (`i * fixed_per_row + prefix + var_prefix[i]`) with no per-row cursor. Route those columns through `field_encode_fixed_arithmetic`; the cursor path is seeded to start at the first varlen column. Primitive columns in the pure-fixed case use a `chunks_exact_mut` hot loop (matching arrow-row's not-null path); all other fixed types reuse the cursor encoder at the computed offsets, so output is byte-identical. Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 65a24f9 commit 2711504

3 files changed

Lines changed: 229 additions & 21 deletions

File tree

vortex-row/src/codec.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,59 @@ pub(crate) fn field_size(
269269
Ok(())
270270
}
271271

272+
/// Encode a fixed-width column at arithmetic offsets, without reading or writing any per-row
273+
/// cursor.
274+
///
275+
/// For row `i`, the column's bytes are written starting at `i * row_stride + col_prefix
276+
/// (+ var_prefix[i])`, where `var_prefix` is the exclusive prefix sum of the varlen
277+
/// contributions (`None` when the row layout has no variable-length columns). This is the
278+
/// fast path for fixed-width columns that appear before any varlen column, so their
279+
/// within-row position is a constant offset rather than a running cursor.
280+
///
281+
/// For primitive columns in the pure-fixed case it uses a `chunks_exact_mut` hot loop that
282+
/// removes the per-row offset/cursor indirection (matching `arrow-row`'s `encode_not_null`).
283+
/// All other types reuse [`field_encode`] at the materialized offsets, so the bytes written
284+
/// are byte-identical to the cursor path.
285+
#[allow(clippy::too_many_arguments)]
286+
pub(crate) fn field_encode_fixed_arithmetic(
287+
canonical: &Canonical,
288+
field: RowSortField,
289+
col_prefix: u32,
290+
row_stride: u32,
291+
var_prefix: Option<&[u32]>,
292+
nrows: usize,
293+
out: &mut [u8],
294+
ctx: &mut ExecutionCtx,
295+
) -> VortexResult<()> {
296+
if var_prefix.is_none()
297+
&& let Canonical::Primitive(arr) = canonical
298+
{
299+
return encode_primitive_arith(arr, field, col_prefix, row_stride, out, ctx);
300+
}
301+
302+
// General path: materialize this column's per-row start offsets and reuse the cursor
303+
// encoder with zero-initialized cursors, so every row is written at its arithmetic
304+
// offset with the exact same bytes the cursor path would produce.
305+
let mut offsets: Vec<u32> = Vec::with_capacity(nrows);
306+
let mut base = col_prefix;
307+
match var_prefix {
308+
None => {
309+
for _ in 0..nrows {
310+
offsets.push(base);
311+
base = base.wrapping_add(row_stride);
312+
}
313+
}
314+
Some(vp) => {
315+
for &p in vp.iter().take(nrows) {
316+
offsets.push(base.wrapping_add(p));
317+
base = base.wrapping_add(row_stride);
318+
}
319+
}
320+
}
321+
let mut cursors = vec![0u32; nrows];
322+
field_encode(canonical, field, &offsets, &mut cursors, out, ctx)
323+
}
324+
272325
/// Encode each row's bytes for the given canonical view into `out`, writing starting at
273326
/// `offsets[i] + cursors[i]` for row `i` and advancing `cursors[i]` by the number of
274327
/// bytes written.
@@ -958,6 +1011,68 @@ fn encode_extension(
9581011
field_encode(&storage, field, row_offsets, col_offset, out, ctx)
9591012
}
9601013

1014+
/// Arithmetic-write primitive encoder: writes each row's `sentinel + value` slot at a
1015+
/// constant within-row offset, iterating the output in `row_stride`-sized chunks so the
1016+
/// compiler can drop the per-row offset/cursor indirection.
1017+
fn encode_primitive_arith(
1018+
arr: &PrimitiveArray,
1019+
field: RowSortField,
1020+
col_prefix: u32,
1021+
row_stride: u32,
1022+
out: &mut [u8],
1023+
ctx: &mut ExecutionCtx,
1024+
) -> VortexResult<()> {
1025+
match_each_native_ptype!(arr.ptype(), |T| {
1026+
encode_primitive_arith_typed::<T>(arr, field, col_prefix, row_stride, out, ctx)?;
1027+
});
1028+
Ok(())
1029+
}
1030+
1031+
fn encode_primitive_arith_typed<T: NativePType + RowEncode>(
1032+
arr: &PrimitiveArray,
1033+
field: RowSortField,
1034+
col_prefix: u32,
1035+
row_stride: u32,
1036+
out: &mut [u8],
1037+
ctx: &mut ExecutionCtx,
1038+
) -> VortexResult<()> {
1039+
let slice: &[T] = arr.as_slice();
1040+
let non_null = field.non_null_sentinel();
1041+
let value_bytes = size_of::<T>();
1042+
let slot_size = 1 + value_bytes;
1043+
let stride = row_stride as usize;
1044+
let prefix = col_prefix as usize;
1045+
let descending = field.descending;
1046+
1047+
match resolve_validity(arr.as_ref().validity()?, arr.len(), ctx)? {
1048+
ValidityKind::AllValid => {
1049+
// Hot path: each row's slot is a fixed window inside its `stride`-sized chunk,
1050+
// so the inner write vectorizes the same way as `arrow-row`'s not-null path.
1051+
for (chunk, &v) in out.chunks_exact_mut(stride).zip(slice.iter()) {
1052+
let slot = &mut chunk[prefix..prefix + slot_size];
1053+
slot[0] = non_null;
1054+
v.encode_to(&mut slot[1..], descending);
1055+
}
1056+
}
1057+
ValidityKind::Mask(mask) => {
1058+
let null = field.null_sentinel();
1059+
for (i, (chunk, &v)) in out.chunks_exact_mut(stride).zip(slice.iter()).enumerate() {
1060+
let slot = &mut chunk[prefix..prefix + slot_size];
1061+
if mask.value(i) {
1062+
slot[0] = non_null;
1063+
v.encode_to(&mut slot[1..], descending);
1064+
} else {
1065+
slot[0] = null;
1066+
for b in &mut slot[1..] {
1067+
*b = 0;
1068+
}
1069+
}
1070+
}
1071+
}
1072+
}
1073+
Ok(())
1074+
}
1075+
9611076
/// Encode a non-empty variable-length byte slice into `out` in 32-byte blocks with
9621077
/// continuation/length markers. Returns the number of bytes written. Empty values are
9631078
/// encoded by the caller as a single sentinel byte and never reach this function.

vortex-row/src/encode.rs

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::codec;
3434
use crate::options::RowEncodingOptions;
3535
use crate::options::deserialize_row_encoding_options;
3636
use crate::options::serialize_row_encoding_options;
37+
use crate::size::ColKind;
3738
use crate::size::compute_sizes;
3839

3940
/// Variadic scalar function that encodes N input columns into a single `List<u8>`
@@ -112,6 +113,8 @@ fn execute_row_encode(
112113
let crate::size::SizePassResult {
113114
fixed_per_row,
114115
var_lengths,
116+
col_kinds,
117+
first_varlen_idx,
115118
columns,
116119
} = compute_sizes(options, args, ctx)?;
117120

@@ -149,53 +152,107 @@ fn execute_row_encode(
149152
// listview_offsets[i] is the absolute byte offset where row `i` begins.
150153
// For pure-fixed: i * fixed_per_row.
151154
// For mixed: i * fixed_per_row + exclusive prefix sum of var_lengths.
155+
//
156+
// When fixed-before-varlen columns coexist with a varlen column, we additionally build
157+
// `var_prefix_for_arith[i] = exclusive cumsum of var_lengths[..i]` and hand it to the
158+
// arithmetic encoders so they can compute per-row write positions without a cursor.
159+
let need_arith_prefix = first_varlen_idx.is_some()
160+
&& col_kinds.iter().any(|k| {
161+
matches!(
162+
k,
163+
ColKind::Fixed {
164+
before_varlen: true,
165+
..
166+
}
167+
)
168+
});
169+
152170
// Build directly into a BufferMut to avoid a Vec→Buffer copy at the end.
153171
let mut listview_offsets: BufferMut<u32> = BufferMut::with_capacity(nrows);
154172
// SAFETY: `nrows` of capacity reserved above; every index in `[0, nrows)` is written
155173
// before the buffer is read out. `nrows` was validated to fit `u32` at function entry,
156-
// so `i as u32` below is exact and the multiplications can't overflow.
174+
// so the `0u32..` counters below are exact and the multiplications can't overflow.
157175
unsafe { listview_offsets.set_len(nrows) };
158176
let off = listview_offsets.as_mut_slice();
177+
let mut var_prefix_for_arith: Option<Vec<u32>> = None;
159178
match var_lengths.as_ref() {
160179
None => {
161180
// Pure-fixed: offsets[i] = i * fixed_per_row. Zipping against a `u32` counter
162-
// elides per-element bounds checks (and avoids a per-element `usize as u32`
163-
// cast), so LLVM auto-vectorizes this multiply. `nrows` fits u32, so the counter
164-
// never overflows.
181+
// elides per-element bounds checks, so LLVM auto-vectorizes this multiply.
165182
for (slot, i) in off.iter_mut().zip(0u32..) {
166183
*slot = i * fixed_per_row;
167184
}
168185
}
169186
Some(v) => {
170187
// Mixed: offsets[i] = i * fixed_per_row + var_prefix[i], where var_prefix is the
171-
// exclusive cumsum of varlen lengths. `iter_mut().zip` elides per-element bounds
172-
// checks; the total was validated to fit u32 upstream so the wrapping arithmetic
173-
// is exact (it never actually wraps).
188+
// exclusive cumsum of varlen lengths. The total was validated to fit u32 upstream
189+
// so the wrapping arithmetic is exact (it never actually wraps).
190+
let mut vp: Option<Vec<u32>> = need_arith_prefix.then(|| Vec::with_capacity(nrows));
174191
let mut acc: u32 = 0;
175192
for ((slot, &l), i) in off.iter_mut().zip(v.iter()).zip(0u32..) {
193+
if let Some(p) = vp.as_mut() {
194+
p.push(acc);
195+
}
176196
*slot = i.wrapping_mul(fixed_per_row).wrapping_add(acc);
177197
acc = acc.wrapping_add(l);
178198
}
199+
var_prefix_for_arith = vp;
179200
}
180201
}
181202
let listview_offsets_slice: &[u32] = listview_offsets.as_slice();
182203

183204
// Per-row write cursor (also doubles as the ListView `sizes` slot when done). We build
184205
// it as a BufferMut so we can hand it directly to the output PrimitiveArray.
206+
//
207+
// The cursor path begins at the first cursor-path column. Fixed-before-varlen columns
208+
// are written by the arithmetic path and do not touch the cursor, so the cursor is
209+
// pre-seeded with the within-row offset of the first varlen column (its `fixed_prefix`).
210+
// When there are no varlen columns at all, every column takes the arithmetic path and
211+
// the cursor loop runs zero iterations; seeding with `fixed_per_row` then leaves the
212+
// cursors already correct as per-row sizes.
213+
let initial_cursor: u32 = match first_varlen_idx {
214+
Some(idx) => match col_kinds[idx] {
215+
ColKind::Variable { fixed_prefix } => fixed_prefix,
216+
ColKind::Fixed { .. } => unreachable!("first_varlen_idx points at a varlen column"),
217+
},
218+
None => fixed_per_row,
219+
};
185220
let mut row_cursors: BufferMut<u32> = BufferMut::with_capacity(nrows);
186-
row_cursors.push_n(0u32, nrows);
221+
row_cursors.push_n(initial_cursor, nrows);
187222

188-
// ===== Phase 4: encode columns via the cursor path =====
189-
// Each column was canonicalized once during the size pass; reuse that canonical form.
223+
// ===== Phase 4: encode columns =====
224+
// Fixed-before-varlen columns take the arithmetic-write path (constant within-row
225+
// offset, no cursor mutation). Fixed-after-varlen and varlen columns take the cursor
226+
// path. Each column was canonicalized once during the size pass; reuse that form.
190227
for (i, canonical) in columns.iter().enumerate() {
191-
codec::field_encode(
192-
canonical,
193-
options.fields[i],
194-
listview_offsets_slice,
195-
row_cursors.as_mut_slice(),
196-
&mut out_buf,
197-
ctx,
198-
)?;
228+
match col_kinds[i] {
229+
ColKind::Fixed {
230+
prefix,
231+
before_varlen: true,
232+
..
233+
} => {
234+
codec::field_encode_fixed_arithmetic(
235+
canonical,
236+
options.fields[i],
237+
prefix,
238+
fixed_per_row,
239+
var_prefix_for_arith.as_deref(),
240+
nrows,
241+
&mut out_buf,
242+
ctx,
243+
)?;
244+
}
245+
ColKind::Fixed { .. } | ColKind::Variable { .. } => {
246+
codec::field_encode(
247+
canonical,
248+
options.fields[i],
249+
listview_offsets_slice,
250+
row_cursors.as_mut_slice(),
251+
&mut out_buf,
252+
ctx,
253+
)?;
254+
}
255+
}
199256
}
200257

201258
// ===== Phase 5: build ListView output =====

vortex-row/src/size.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,24 @@ use crate::options::RowEncodingOptions;
3636
use crate::options::deserialize_row_encoding_options;
3737
use crate::options::serialize_row_encoding_options;
3838

39+
/// Classification of a single input column for the size pass.
40+
///
41+
/// Tracks each column's within-row byte offset (the constant prefix from all preceding
42+
/// fixed-width columns) and, for fixed columns, whether any variable-length column has
43+
/// appeared yet — the encode pass uses this to choose between the arithmetic-write fast
44+
/// path (no varlen before this column, so the within-row position is constant per row) and
45+
/// the cursor-write path.
46+
#[derive(Clone, Copy, Debug)]
47+
pub(crate) enum ColKind {
48+
/// Fixed-width column. `prefix` is the within-row byte offset of this column's first
49+
/// byte. When `before_varlen` is true no variable-length column precedes this one, so the
50+
/// within-row offset is constant for every row.
51+
Fixed { prefix: u32, before_varlen: bool },
52+
/// Column has variable per-row width. `fixed_prefix` is the sum of widths of all
53+
/// preceding fixed columns; the contribution of earlier varlen columns is added per row.
54+
Variable { fixed_prefix: u32 },
55+
}
56+
3957
/// Result of the size pass: enough information for both [`RowSize::execute`] and the
4058
/// downstream [`RowEncode`](super::encode::RowEncode) pipeline.
4159
///
@@ -45,6 +63,8 @@ use crate::options::serialize_row_encoding_options;
4563
pub(crate) struct SizePassResult {
4664
pub fixed_per_row: u32,
4765
pub var_lengths: Option<Vec<u32>>,
66+
pub col_kinds: Vec<ColKind>,
67+
pub first_varlen_idx: Option<usize>,
4868
pub columns: Vec<Canonical>,
4969
}
5070

@@ -77,8 +97,11 @@ pub(crate) fn compute_sizes(
7797
let nrows = args.row_count();
7898

7999
let mut columns: Vec<Canonical> = Vec::with_capacity(n_inputs);
100+
let mut col_kinds: Vec<ColKind> = Vec::with_capacity(n_inputs);
80101
let mut fixed_per_row: u32 = 0;
81102
let mut var_lengths: Option<Vec<u32>> = None;
103+
let mut first_varlen_idx: Option<usize> = None;
104+
let mut running_fixed_prefix: u32 = 0;
82105

83106
for i in 0..n_inputs {
84107
let col = args.get(i)?;
@@ -95,13 +118,24 @@ pub(crate) fn compute_sizes(
95118
let canonical = col.execute::<Canonical>(ctx)?;
96119
match width {
97120
RowWidth::Fixed(w) => {
98-
fixed_per_row = fixed_per_row.checked_add(w).ok_or_else(|| {
99-
vortex_error::vortex_err!("per-row fixed width overflows u32 at column {}", i)
100-
})?;
121+
col_kinds.push(ColKind::Fixed {
122+
prefix: running_fixed_prefix,
123+
before_varlen: first_varlen_idx.is_none(),
124+
});
125+
let overflow =
126+
|| vortex_error::vortex_err!("per-row fixed width overflows u32 at column {i}");
127+
fixed_per_row = fixed_per_row.checked_add(w).ok_or_else(overflow)?;
128+
running_fixed_prefix = running_fixed_prefix.checked_add(w).ok_or_else(overflow)?;
101129
}
102130
RowWidth::Variable => {
131+
if first_varlen_idx.is_none() {
132+
first_varlen_idx = Some(i);
133+
}
103134
let v = var_lengths.get_or_insert_with(|| vec![0u32; nrows]);
104135
codec::field_size(&canonical, options.fields[i], v, ctx)?;
136+
col_kinds.push(ColKind::Variable {
137+
fixed_prefix: running_fixed_prefix,
138+
});
105139
}
106140
}
107141
columns.push(canonical);
@@ -110,6 +144,8 @@ pub(crate) fn compute_sizes(
110144
Ok(SizePassResult {
111145
fixed_per_row,
112146
var_lengths,
147+
col_kinds,
148+
first_varlen_idx,
113149
columns,
114150
})
115151
}

0 commit comments

Comments
 (0)