Skip to content

Commit 047ede0

Browse files
committed
PatchedArray: basics and wiring
Signed-off-by: Andrew Duffy <andrew@a10y.dev>
1 parent 20070d2 commit 047ede0

File tree

12 files changed

+1419
-0
lines changed

12 files changed

+1419
-0
lines changed

vortex-array/src/arrays/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ pub mod null;
6666
pub use null::Null;
6767
pub use null::NullArray;
6868

69+
pub mod patched;
70+
pub use patched::Patched;
71+
pub use patched::PatchedArray;
72+
6973
pub mod primitive;
7074
pub use primitive::Primitive;
7175
pub use primitive::PrimitiveArray;
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::ops::Range;
5+
6+
use vortex_buffer::Buffer;
7+
use vortex_buffer::BufferMut;
8+
use vortex_error::VortexResult;
9+
use vortex_error::vortex_ensure;
10+
11+
use crate::ArrayRef;
12+
use crate::Canonical;
13+
use crate::DynArray;
14+
use crate::ExecutionCtx;
15+
use crate::arrays::patched::PatchAccessor;
16+
use crate::arrays::patched::TransposedPatches;
17+
use crate::arrays::patched::patch_lanes;
18+
use crate::buffer::BufferHandle;
19+
use crate::dtype::IntegerPType;
20+
use crate::dtype::NativePType;
21+
use crate::dtype::PType;
22+
use crate::match_each_native_ptype;
23+
use crate::match_each_unsigned_integer_ptype;
24+
use crate::patches::Patches;
25+
use crate::stats::ArrayStats;
26+
27+
/// An array that partially "patches" another array with new values.
28+
///
29+
/// Patched arrays implement the set of nodes that do this instead here...I think?
30+
#[derive(Debug, Clone)]
31+
pub struct PatchedArray {
32+
/// The inner array that is being patched. This is the zeroth child.
33+
pub(super) inner: ArrayRef,
34+
35+
/// Number of 1024-element chunks. Pre-computed for convenience.
36+
pub(super) n_chunks: usize,
37+
38+
/// Number of lanes the patch indices and values have been split into. Each of the `n_chunks`
39+
/// of 1024 values is split into `n_lanes` lanes horizontally, each lane having 1024 / n_lanes
40+
/// values that might be patched.
41+
pub(super) n_lanes: usize,
42+
43+
/// Offset into the first chunk
44+
pub(super) offset: usize,
45+
/// Total length.
46+
pub(super) len: usize,
47+
48+
/// lane offsets. The PType of these MUST be u32
49+
pub(super) lane_offsets: BufferHandle,
50+
/// indices within a 1024-element chunk. The PType of these MUST be u16
51+
pub(super) indices: BufferHandle,
52+
/// patch values corresponding to the indices. The ptype is specified by `values_ptype`.
53+
pub(super) values: BufferHandle,
54+
/// PType of the scalars in `values`. Can be any native type.
55+
pub(super) values_ptype: PType,
56+
57+
pub(super) stats_set: ArrayStats,
58+
}
59+
60+
impl PatchedArray {
61+
pub fn from_array_and_patches(
62+
inner: ArrayRef,
63+
patches: &Patches,
64+
ctx: &mut ExecutionCtx,
65+
) -> VortexResult<Self> {
66+
vortex_ensure!(
67+
inner.dtype().eq_with_nullability_superset(patches.dtype()),
68+
"array DType must match patches DType"
69+
);
70+
71+
let values_ptype = patches.dtype().as_ptype();
72+
73+
let TransposedPatches {
74+
n_chunks,
75+
n_lanes,
76+
lane_offsets,
77+
indices,
78+
values,
79+
} = transpose_patches(patches, ctx)?;
80+
81+
let len = inner.len();
82+
83+
Ok(Self {
84+
inner,
85+
n_chunks,
86+
n_lanes,
87+
values_ptype,
88+
offset: 0,
89+
len,
90+
lane_offsets: BufferHandle::new_host(lane_offsets),
91+
indices: BufferHandle::new_host(indices),
92+
values: BufferHandle::new_host(values),
93+
stats_set: ArrayStats::default(),
94+
})
95+
}
96+
97+
/// Get an accessor, which allows ranged access to patches by chunk/lane.
98+
pub fn accessor<V: NativePType>(&self) -> PatchAccessor<'_, V> {
99+
PatchAccessor {
100+
n_lanes: self.n_lanes,
101+
lane_offsets: self.lane_offsets.as_host().reinterpret::<u32>(),
102+
indices: self.indices.as_host().reinterpret::<u16>(),
103+
values: self.values.as_host().reinterpret::<V>(),
104+
}
105+
}
106+
107+
/// Slice the array to just the patches and inner values that are within the chunk range.
108+
pub(crate) fn slice_chunks(&self, chunks: Range<usize>) -> VortexResult<Self> {
109+
let lane_offsets_start = chunks.start * self.n_lanes;
110+
let lane_offsets_stop = chunks.end * self.n_lanes + 1;
111+
112+
let sliced_lane_offsets = self
113+
.lane_offsets
114+
.slice_typed::<u32>(lane_offsets_start..lane_offsets_stop);
115+
let indices = self.indices.clone();
116+
let values = self.values.clone();
117+
118+
let begin = (chunks.start * 1024).max(self.offset);
119+
let end = (chunks.end * 1024).min(self.len);
120+
121+
let offset = begin % 1024;
122+
123+
let inner = self.inner.slice(begin..end)?;
124+
125+
let len = end - begin;
126+
let n_chunks = (end - begin).div_ceil(1024);
127+
128+
Ok(PatchedArray {
129+
inner,
130+
n_chunks,
131+
n_lanes: self.n_lanes,
132+
offset,
133+
len,
134+
indices,
135+
values,
136+
values_ptype: self.values_ptype,
137+
lane_offsets: sliced_lane_offsets,
138+
stats_set: ArrayStats::default(),
139+
})
140+
}
141+
}
142+
143+
/// Transpose a set of patches from the default sorted layout into the data parallel layout.
144+
#[allow(clippy::cognitive_complexity)]
145+
fn transpose_patches(patches: &Patches, ctx: &mut ExecutionCtx) -> VortexResult<TransposedPatches> {
146+
let array_len = patches.array_len();
147+
let offset = patches.offset();
148+
149+
let indices = patches
150+
.indices()
151+
.clone()
152+
.execute::<Canonical>(ctx)?
153+
.into_primitive();
154+
155+
let values = patches
156+
.values()
157+
.clone()
158+
.execute::<Canonical>(ctx)?
159+
.into_primitive();
160+
161+
let indices_ptype = indices.ptype();
162+
let values_ptype = values.ptype();
163+
164+
let indices = indices.buffer_handle().clone().unwrap_host();
165+
let values = values.buffer_handle().clone().unwrap_host();
166+
167+
match_each_unsigned_integer_ptype!(indices_ptype, |I| {
168+
match_each_native_ptype!(values_ptype, |V| {
169+
let indices: Buffer<I> = Buffer::from_byte_buffer(indices);
170+
let values: Buffer<V> = Buffer::from_byte_buffer(values);
171+
172+
Ok(transpose(
173+
indices.as_slice(),
174+
values.as_slice(),
175+
offset,
176+
array_len,
177+
))
178+
})
179+
})
180+
}
181+
182+
#[allow(clippy::cast_possible_truncation)]
183+
fn transpose<I: IntegerPType, V: NativePType>(
184+
indices_in: &[I],
185+
values_in: &[V],
186+
offset: usize,
187+
array_len: usize,
188+
) -> TransposedPatches {
189+
// Total number of slots is number of chunks times number of lanes.
190+
let n_chunks = array_len.div_ceil(1024);
191+
assert!(
192+
n_chunks <= u32::MAX as usize,
193+
"Cannot transpose patches for array with >= 4 trillion elements"
194+
);
195+
196+
let n_lanes = patch_lanes::<V>();
197+
198+
// We know upfront how many indices and values we'll have.
199+
let mut indices_buffer = BufferMut::with_capacity(indices_in.len());
200+
let mut values_buffer = BufferMut::with_capacity(values_in.len());
201+
202+
// number of patches in each chunk.
203+
let mut lane_offsets: BufferMut<u32> = BufferMut::zeroed(n_chunks * n_lanes + 1);
204+
205+
// Scan the index/values once to get chunk/lane counts
206+
for index in indices_in {
207+
let index = index.as_() - offset;
208+
let chunk = index / 1024;
209+
let lane = index % n_lanes;
210+
211+
lane_offsets[chunk * n_lanes + lane + 1] += 1;
212+
}
213+
214+
// Prefix-sum sizes -> offsets
215+
for index in 1..lane_offsets.len() {
216+
lane_offsets[index] += lane_offsets[index - 1];
217+
}
218+
219+
// Loop over patches, writing thme to final positions
220+
let indices_out = indices_buffer.spare_capacity_mut();
221+
let values_out = values_buffer.spare_capacity_mut();
222+
for (index, &value) in std::iter::zip(indices_in, values_in) {
223+
let index = index.as_() - offset;
224+
let chunk = index / 1024;
225+
let lane = index % n_lanes;
226+
227+
let position = &mut lane_offsets[chunk * n_lanes + lane];
228+
indices_out[*position as usize].write((index % 1024) as u16);
229+
values_out[*position as usize].write(value);
230+
*position += 1;
231+
}
232+
233+
// SAFETY: we know there are exactly indices_in.len() indices/values, and we just
234+
// set them to the appropriate values in the loop above.
235+
unsafe {
236+
indices_buffer.set_len(indices_in.len());
237+
values_buffer.set_len(values_in.len());
238+
}
239+
240+
// Now, pass over all the indices and values again and subtract out the position increments.
241+
for index in indices_in {
242+
let index = index.as_() - offset;
243+
let chunk = index / 1024;
244+
let lane = index % n_lanes;
245+
246+
lane_offsets[chunk * n_lanes + lane] -= 1;
247+
}
248+
249+
TransposedPatches {
250+
n_chunks,
251+
n_lanes,
252+
lane_offsets: lane_offsets.freeze().into_byte_buffer(),
253+
indices: indices_buffer.freeze().into_byte_buffer(),
254+
values: values_buffer.freeze().into_byte_buffer(),
255+
}
256+
}

0 commit comments

Comments
 (0)