Skip to content

Commit 0e64d5e

Browse files
committed
reverse array access
1 parent c91d833 commit 0e64d5e

2 files changed

Lines changed: 49 additions & 3 deletions

File tree

vortex-layout/src/scan/repeated_scan.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub struct RepeatedScan<A: 'static + Send> {
3939
projection: Expression,
4040
filter: Option<Expression>,
4141
ordered: bool,
42+
/// Whether to iterate chunks in reverse order (last chunk first).
43+
reversed: bool,
4244
/// Optionally read a subset of the rows in the file.
4345
row_range: Option<Range<u64>>,
4446
/// The selection mask to apply to the selected row range.
@@ -100,13 +102,15 @@ impl<A: 'static + Send> RepeatedScan<A> {
100102
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
101103
limit: Option<u64>,
102104
dtype: DType,
105+
reversed: bool,
103106
) -> Self {
104107
Self {
105108
session,
106109
layout_reader,
107110
projection,
108111
filter,
109112
ordered,
113+
reversed,
110114
row_range,
111115
selection,
112116
splits,
@@ -167,6 +171,12 @@ impl<A: 'static + Send> RepeatedScan<A> {
167171
let mut limit = self.limit;
168172
let mut tasks = Vec::new();
169173

174+
let ranges = if self.reversed {
175+
Either::Left(ranges.collect::<Vec<_>>().into_iter().rev())
176+
} else {
177+
Either::Right(ranges)
178+
};
179+
170180
for range in ranges {
171181
if range.start >= range.end {
172182
continue;
@@ -196,7 +206,7 @@ impl<A: 'static + Send> RepeatedScan<A> {
196206
let stream =
197207
futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
198208

199-
let stream = if self.ordered {
209+
let stream = if self.ordered || self.reversed {
200210
stream.buffered(concurrency).boxed()
201211
} else {
202212
stream.buffer_unordered(concurrency).boxed()

vortex-layout/src/scan/scan_builder.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use futures::future::BoxFuture;
1414
use futures::stream::BoxStream;
1515
use itertools::Itertools;
1616
use vortex_array::ArrayRef;
17+
use vortex_array::IntoArray;
18+
use vortex_array::arrays::PrimitiveArray;
1719
use vortex_array::dtype::DType;
1820
use vortex_array::dtype::Field;
1921
use vortex_array::dtype::FieldMask;
@@ -55,6 +57,10 @@ pub struct ScanBuilder<A> {
5557
filter: Option<Expression>,
5658
/// Whether the scan needs to return splits in the order they appear in the file.
5759
ordered: bool,
60+
/// Whether to yield chunks in reverse file order, with rows within each chunk also reversed.
61+
///
62+
/// Implies ordered output (chunks are emitted in strict reverse sequence, not interleaved).
63+
reversed: bool,
5864
/// Optionally read a subset of the rows in the file.
5965
row_range: Option<Range<u64>>,
6066
/// The selection mask to apply to the selected row range.
@@ -95,6 +101,7 @@ impl ScanBuilder<ArrayRef> {
95101
file_stats: None,
96102
limit: None,
97103
row_offset: 0,
104+
reversed: false,
98105
}
99106
}
100107

@@ -146,6 +153,20 @@ impl<A: 'static + Send> ScanBuilder<A> {
146153
self
147154
}
148155

156+
pub fn reversed(&self) -> bool {
157+
self.reversed
158+
}
159+
160+
/// Reverse the scan order: chunks are yielded last-to-first, and rows within each chunk are
161+
/// also reversed. This produces a globally reversed row sequence without reading the whole
162+
/// file first.
163+
///
164+
/// Reversed scans always produce ordered output (equivalent to `with_ordered(true)`).
165+
pub fn with_reversed(mut self, reversed: bool) -> Self {
166+
self.reversed = reversed;
167+
self
168+
}
169+
149170
pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
150171
self.row_range = Some(row_range);
151172
self
@@ -233,6 +254,7 @@ impl<A: 'static + Send> ScanBuilder<A> {
233254
file_stats: self.file_stats,
234255
limit: self.limit,
235256
row_offset: self.row_offset,
257+
reversed: self.reversed,
236258
map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
237259
}
238260
}
@@ -285,6 +307,19 @@ impl<A: 'static + Send> ScanBuilder<A> {
285307
)?)
286308
};
287309

310+
// If reversed, wrap the map_fn to reverse row order within each chunk via a lazy
311+
// `DictArray` take. Chunk order reversal is handled by `RepeatedScan::execute`.
312+
let map_fn = if self.reversed {
313+
let original = self.map_fn;
314+
Arc::new(move |array: ArrayRef| {
315+
let n = array.len() as u64;
316+
let indices = PrimitiveArray::from_iter((0..n).rev()).into_array();
317+
original(array.take(indices)?)
318+
}) as Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>
319+
} else {
320+
self.map_fn
321+
};
322+
288323
Ok(RepeatedScan::new(
289324
self.session.clone(),
290325
layout_reader,
@@ -295,9 +330,10 @@ impl<A: 'static + Send> ScanBuilder<A> {
295330
self.selection,
296331
splits,
297332
self.concurrency,
298-
self.map_fn,
333+
map_fn,
299334
self.limit,
300335
dtype,
336+
self.reversed,
301337
))
302338
}
303339

@@ -366,7 +402,7 @@ impl<A: 'static + Send> Stream for LazyScanStream<A> {
366402
match &mut self.state {
367403
LazyScanState::Builder(builder) => {
368404
let builder = builder.take().vortex_expect("polled after completion");
369-
let ordered = builder.ordered;
405+
let ordered = builder.ordered || builder.reversed;
370406
let num_workers = std::thread::available_parallelism()
371407
.map(|n| n.get())
372408
.unwrap_or(1);

0 commit comments

Comments
 (0)