77
88use vortex:: buffer:: Buffer ;
99use vortex:: buffer:: BufferMut ;
10- use vortex:: buffer:: buffer_mut;
1110use vortex_array:: Canonical ;
1211use vortex_array:: buffer:: BufferHandle ;
1312use vortex_array:: dtype:: IntegerPType ;
@@ -40,19 +39,6 @@ const fn patch_lanes<V: Sized>() -> usize {
4039 if size_of :: < V > ( ) < 8 { 32 } else { 16 }
4140}
4241
43- #[ derive( Clone ) ]
44- struct Chunk < V > {
45- lanes : Vec < Lane < V > > ,
46- }
47-
48- impl < V : Copy + Default > Default for Chunk < V > {
49- fn default ( ) -> Self {
50- Self {
51- lanes : vec ! [ Lane :: <V >:: default ( ) ; patch_lanes:: <V >( ) ] ,
52- }
53- }
54- }
55-
5642/// A set of patches of values `V` existing in host buffers.
5743#[ allow( dead_code) ]
5844pub struct HostPatches < V > {
@@ -96,6 +82,20 @@ impl<V: Copy> HostPatches<V> {
9682 }
9783 }
9884
/// Overlay every stored patch onto `output`.
///
/// Visits each (chunk, lane) pair and, for every patch found there, stores the patch
/// value at position `chunk * 1024 + index`, where `index` is the index recorded
/// alongside that patch value.
#[cfg(test)]
fn apply(&self, output: &mut BufferMut<V>) {
    for chunk in 0..self.n_chunks {
        // Position in `output` of the first element covered by this chunk.
        let chunk_start = chunk * 1024;

        for lane in 0..self.n_lanes {
            let lane_patches = self.patches(chunk, lane);
            let pairs = std::iter::zip(lane_patches.indices, lane_patches.values);

            for (&within_chunk, &patch_value) in pairs {
                output[chunk_start + (within_chunk as usize)] = patch_value;
            }
        }
    }
}
98+
9999 /// Export the patches for use on the device associated with the provided execution context.
100100 pub async fn export_to_device (
101101 mut self ,
@@ -122,23 +122,6 @@ impl<V: Copy> HostPatches<V> {
122122 }
123123}
124124
125- #[ derive( Debug , Default , Clone ) ]
126- struct Lane < V > {
127- indices : Vec < u16 > ,
128- values : Vec < V > ,
129- }
130-
131- impl < V : Copy > Lane < V > {
132- fn push ( & mut self , index : u16 , value : V ) {
133- self . indices . push ( index) ;
134- self . values . push ( value) ;
135- }
136-
137- fn len ( & self ) -> usize {
138- self . indices . len ( )
139- }
140- }
141-
142125/// Transpose a set of patches from the default sorted layout into the data parallel layout.
143126#[ allow( clippy:: cognitive_complexity) ]
144127pub async fn transpose_patches (
@@ -180,8 +163,8 @@ pub async fn transpose_patches(
180163
181164#[ allow( clippy:: cast_possible_truncation) ]
182165fn transpose < I : IntegerPType , V : NativePType > (
183- indices : & [ I ] ,
184- values : & [ V ] ,
166+ indices_in : & [ I ] ,
167+ values_in : & [ V ] ,
185168 offset : usize ,
186169 array_len : usize ,
187170) -> HostPatches < V > {
@@ -193,30 +176,56 @@ fn transpose<I: IntegerPType, V: NativePType>(
193176 ) ;
194177
195178 let n_lanes = patch_lanes :: < V > ( ) ;
196- let mut chunks: Vec < Chunk < V > > = vec ! [ Chunk :: default ( ) ; n_chunks] ;
197179
198- // For each chunk, for each lane, push new values
199- for ( index, & value) in std:: iter:: zip ( indices, values) {
180+ // We know upfront how many indices and values we'll have.
181+ let mut indices_buffer = BufferMut :: with_capacity ( indices_in. len ( ) ) ;
182+ let mut values_buffer = BufferMut :: with_capacity ( values_in. len ( ) ) ;
183+
184+ // One entry per (chunk, lane) pair plus a trailing entry: holds per-lane patch counts first, then offsets after the prefix sum.
185+ let mut lane_offsets: BufferMut < u32 > = BufferMut :: zeroed ( n_chunks * n_lanes + 1 ) ;
186+
187+ // Scan the index/values once to get chunk/lane counts
188+ for index in indices_in {
200189 let index = index. as_ ( ) - offset;
190+ let chunk = index / 1024 ;
191+ let lane = index % n_lanes;
192+
193+ lane_offsets[ chunk * n_lanes + lane + 1 ] += 1 ;
194+ }
195+
196+ // Prefix-sum sizes -> offsets
197+ for index in 1 ..lane_offsets. len ( ) {
198+ lane_offsets[ index] += lane_offsets[ index - 1 ] ;
199+ }
201200
201+ // Loop over patches, writing them to final positions
202+ let indices_out = indices_buffer. spare_capacity_mut ( ) ;
203+ let values_out = values_buffer. spare_capacity_mut ( ) ;
204+ for ( index, & value) in std:: iter:: zip ( indices_in, values_in) {
205+ let index = index. as_ ( ) - offset;
202206 let chunk = index / 1024 ;
203207 let lane = index % n_lanes;
204208
205- chunks[ chunk] . lanes [ lane] . push ( ( index % 1024 ) as u16 , value) ;
209+ let position = & mut lane_offsets[ chunk * n_lanes + lane] ;
210+ indices_out[ * position as usize ] . write ( ( index % 1024 ) as u16 ) ;
211+ values_out[ * position as usize ] . write ( value) ;
212+ * position += 1 ;
206213 }
207214
208- // Reshuffle the different containers into a single contiguous buffer each for indices/values
209- let mut lane_offset = 0 ;
210- let mut lane_offsets = buffer_mut ! [ 0u32 ] ;
211- let mut indices_buffer = BufferMut :: empty ( ) ;
212- let mut values_buffer = BufferMut :: empty ( ) ;
213- for chunk in chunks {
214- for lane in chunk. lanes {
215- indices_buffer. extend_from_slice ( & lane. indices ) ;
216- values_buffer. extend_from_slice ( & lane. values ) ;
217- lane_offset += lane. len ( ) as u32 ;
218- lane_offsets. push ( lane_offset) ;
219- }
215+ // SAFETY: we know there are exactly indices_in.len() indices/values, and we just
216+ // set them to the appropriate values in the loop above.
217+ unsafe {
218+ indices_buffer. set_len ( indices_in. len ( ) ) ;
219+ values_buffer. set_len ( values_in. len ( ) ) ;
220+ }
221+
222+ // Now, pass over all the indices again and undo the position increments made by the write loop, so each lane_offsets entry points back at the start of its lane.
223+ for index in indices_in {
224+ let index = index. as_ ( ) - offset;
225+ let chunk = index / 1024 ;
226+ let lane = index % n_lanes;
227+
228+ lane_offsets[ chunk * n_lanes + lane] -= 1 ;
220229 }
221230
222231 HostPatches {
@@ -232,6 +241,15 @@ fn transpose<I: IntegerPType, V: NativePType>(
232241mod tests {
233242 use vortex:: buffer:: BufferMut ;
234243 use vortex:: buffer:: buffer;
244+ use vortex:: buffer:: buffer_mut;
245+ use vortex_array:: ExecutionCtx ;
246+ use vortex_array:: IntoArray ;
247+ use vortex_array:: LEGACY_SESSION ;
248+ use vortex_array:: arrays:: PrimitiveArray ;
249+ use vortex_array:: assert_arrays_eq;
250+ use vortex_array:: dtype:: NativePType ;
251+ use vortex_array:: patches:: Patches ;
252+ use vortex_error:: VortexResult ;
235253
236254 use crate :: kernel:: patches:: types:: transpose;
237255
@@ -285,4 +303,53 @@ mod tests {
285303 assert_eq ! ( transposed. patches( 3 , 4 ) . values, & [ 80 ] ) ;
286304 assert_eq ! ( transposed. patches( 3 , 4 ) . indices, & [ 4 ] ) ;
287305 }
306+
/// Runs `test_case` over a few hand-written inputs and then over a sweep of array
/// lengths, stopping at the first case that returns an error.
#[test]
#[allow(clippy::cast_possible_truncation)]
fn test_transpose_complex() -> VortexResult<()> {
    // Hand-written cases: a single patch, a non-zero offset, and a longer array.
    test_case(1024, 0, &[0], &[0f32])?;
    test_case(512, 512, &[512, 513, 514], &[10i8, 20, 30])?;
    test_case(10_000, 100, &[500, 1_000, 1_001, 1_002], &[1i16, 2, 3, 4])?;

    // Sweep: for each length, use half the length as the offset and patch every index
    // in `offset..len`, reusing the indices themselves as the patch values.
    (1..4096).step_by(10).try_for_each(|len| {
        let offset = len / 2;
        let indices = (offset..len).map(|x| x as u32).collect::<Vec<u32>>();

        test_case(len, offset, &indices, &indices)
    })
}
324+
325+ fn test_case < V : NativePType > (
326+ len : usize ,
327+ offset : usize ,
328+ patch_indices : & [ u32 ] ,
329+ patch_values : & [ V ] ,
330+ ) -> VortexResult < ( ) > {
331+ let mut data = buffer_mut ! [ V :: default ( ) ; len] ;
332+ let array = PrimitiveArray :: from_iter ( data. iter ( ) . copied ( ) ) ;
333+
334+ let patches = Patches :: new (
335+ len,
336+ offset,
337+ PrimitiveArray :: from_iter ( patch_indices. iter ( ) . copied ( ) ) . into_array ( ) ,
338+ PrimitiveArray :: from_iter ( patch_values. iter ( ) . copied ( ) ) . into_array ( ) ,
339+ None ,
340+ ) ?;
341+
342+ // Verify that the outputs match between Patches and transpose_patches().
343+ let mut ctx = ExecutionCtx :: new ( LEGACY_SESSION . clone ( ) ) ;
344+ let patched = array. patch ( & patches, & mut ctx) ?. into_array ( ) ;
345+
346+ let transposed = transpose ( patch_indices, patch_values, offset, len) ;
347+ transposed. apply ( & mut data) ;
348+
349+ let patched_transposed = data. freeze ( ) . into_array ( ) ;
350+
351+ assert_arrays_eq ! ( patched, patched_transposed) ;
352+
353+ Ok ( ( ) )
354+ }
288355}
0 commit comments