Skip to content

Commit 2c8cf23

Browse files
Dandandanclaude
andauthored
Fix: lead/lag extreme offsets handling (#22243)
## Which issue does this PR close? - Closes #22231 - Closes #22221 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7b4d2e6 commit 2c8cf23

2 files changed

Lines changed: 99 additions & 17 deletions

File tree

datafusion/functions-window/src/lead_lag.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3636
use std::cmp::min;
3737
use std::collections::VecDeque;
3838
use std::hash::Hash;
39-
use std::ops::{Neg, Range};
39+
use std::ops::Range;
4040
use std::sync::{Arc, LazyLock};
4141

4242
get_or_init_udwf!(
@@ -116,7 +116,7 @@ impl WindowShiftKind {
116116
fn shift_offset(&self, value: Option<i64>) -> i64 {
117117
match self {
118118
WindowShiftKind::Lag => value.unwrap_or(1),
119-
WindowShiftKind::Lead => value.map(|v| v.neg()).unwrap_or(-1),
119+
WindowShiftKind::Lead => value.map_or(-1, |v| v.wrapping_neg()),
120120
}
121121
}
122122
}
@@ -266,7 +266,7 @@ impl WindowUDFImpl for WindowShift {
266266
.map(|n| self.kind.shift_offset(n))
267267
.map(|offset| {
268268
if partition_evaluator_args.is_reversed() {
269-
-offset
269+
offset.wrapping_neg()
270270
} else {
271271
offset
272272
}
@@ -410,6 +410,15 @@ struct WindowShiftEvaluator {
410410
non_null_offsets: VecDeque<usize>,
411411
}
412412

413+
fn offset_magnitude(offset: i64) -> usize {
414+
let offset = offset.unsigned_abs();
415+
if offset > usize::MAX as u64 {
416+
usize::MAX
417+
} else {
418+
offset as usize
419+
}
420+
}
421+
413422
impl WindowShiftEvaluator {
414423
fn is_lag(&self) -> bool {
415424
// Mode is LAG, when shift_offset is positive
@@ -503,27 +512,27 @@ fn shift_with_default_value(
503512

504513
impl PartitionEvaluator for WindowShiftEvaluator {
505514
fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
515+
let offset = offset_magnitude(self.shift_offset);
516+
506517
if self.is_lag() {
507-
let start = if self.non_null_offsets.len() == self.shift_offset as usize {
518+
let start = if self.non_null_offsets.len() == offset {
508519
// How many rows needed previous than the current row to get necessary lag result
509520
let offset: usize = self.non_null_offsets.iter().sum();
510521
idx.saturating_sub(offset)
511522
} else if !self.ignore_nulls {
512-
let offset = self.shift_offset as usize;
513523
idx.saturating_sub(offset)
514524
} else {
515525
0
516526
};
517527
let end = idx + 1;
518528
Ok(Range { start, end })
519529
} else {
520-
let end = if self.non_null_offsets.len() == (-self.shift_offset) as usize {
530+
let end = if self.non_null_offsets.len() == offset {
521531
// How many rows needed further than the current row to get necessary lead result
522532
let offset: usize = self.non_null_offsets.iter().sum();
523-
min(idx + offset + 1, n_rows)
533+
min(idx.saturating_add(offset).saturating_add(1), n_rows)
524534
} else if !self.ignore_nulls {
525-
let offset = (-self.shift_offset) as usize;
526-
min(idx + offset, n_rows)
535+
min(idx.saturating_add(offset), n_rows)
527536
} else {
528537
n_rows
529538
};
@@ -546,20 +555,26 @@ impl PartitionEvaluator for WindowShiftEvaluator {
546555

547556
// LAG mode
548557
let i = if self.is_lag() {
549-
(range.end as i64 - self.shift_offset - 1) as usize
558+
range
559+
.end
560+
.checked_sub(1)
561+
.and_then(|end| (end as i64).checked_sub(self.shift_offset))
562+
.and_then(|value| usize::try_from(value).ok())
550563
} else {
551564
// LEAD mode
552-
(range.start as i64 - self.shift_offset) as usize
565+
(range.start as i64)
566+
.checked_sub(self.shift_offset)
567+
.and_then(|value| usize::try_from(value).ok())
553568
};
554-
555-
let mut idx: Option<usize> = if i < len { Some(i) } else { None };
569+
let mut idx: Option<usize> = i.filter(|i| *i < len);
556570

557571
// LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows
558572
// If current row index points to NULL value the row is NOT counted
559573
if self.ignore_nulls && self.is_lag() {
560574
// LAG when NULLS are ignored.
561575
// Find the nonNULL row index that shifted by offset comparing to current row index
562-
idx = if self.non_null_offsets.len() == self.shift_offset as usize {
576+
let shift_offset = offset_magnitude(self.shift_offset);
577+
idx = if self.non_null_offsets.len() == shift_offset {
563578
let total_offset: usize = self.non_null_offsets.iter().sum();
564579
Some(range.end - 1 - total_offset)
565580
} else {
@@ -570,7 +585,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
570585
if array.is_valid(range.end - 1) {
571586
// Non-null add new offset
572587
self.non_null_offsets.push_back(1);
573-
if self.non_null_offsets.len() > self.shift_offset as usize {
588+
if self.non_null_offsets.len() > shift_offset {
574589
// WE do not need to keep track of more than `lag number of offset` values.
575590
self.non_null_offsets.pop_front();
576591
}
@@ -582,7 +597,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
582597
} else if self.ignore_nulls && !self.is_lag() {
583598
// LEAD when NULLS are ignored.
584599
// Stores the necessary non-null entry number further than the current row.
585-
let non_null_row_count = (-self.shift_offset) as usize;
600+
let non_null_row_count = offset_magnitude(self.shift_offset);
586601

587602
if self.non_null_offsets.is_empty() {
588603
// When empty, fill non_null offsets with the data further than the current row.
@@ -596,7 +611,8 @@ impl PartitionEvaluator for WindowShiftEvaluator {
596611
}
597612
// It is enough to keep track of `non_null_row_count + 1` non-null offset.
598613
// further data is unnecessary for the result.
599-
if self.non_null_offsets.len() == non_null_row_count + 1 {
614+
if self.non_null_offsets.len() == non_null_row_count.saturating_add(1)
615+
{
600616
break;
601617
}
602618
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Tests for extreme lead offsets regression cases (#22221, #22231)
2+
3+
statement ok
4+
CREATE TABLE lead_lag_extreme_offsets(id INT, value INT) AS VALUES
5+
(1, 10),
6+
(2, 20),
7+
(3, 30);
8+
9+
# i64::MIN lead offset should not panic and should produce the provided default
10+
query I
11+
SELECT lead(value, -9223372036854775808, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
12+
----
13+
0
14+
0
15+
0
16+
17+
# Very large lead offset should not panic and should produce the provided default
18+
query I
19+
SELECT lead(value, 9223372036854775807, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
20+
----
21+
0
22+
0
23+
0
24+
25+
# i64::MIN lead offset without explicit default should return NULL
26+
query I
27+
SELECT lead(value, -9223372036854775808) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
28+
----
29+
NULL
30+
NULL
31+
NULL
32+
33+
# Out-of-range offset literal should fail as a non-integer offset
34+
query error DataFusion error: Execution error: Expected an integer value
35+
SELECT lead(value, -9223372036854775809, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets;
36+
37+
# Very large lag offset should not panic and should produce the provided default
38+
query I
39+
SELECT lag(value, 9223372036854775807, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
40+
----
41+
0
42+
0
43+
0
44+
45+
# i64::MIN lag offset should not panic and should produce the provided default
46+
query I
47+
SELECT lag(value, -9223372036854775808, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
48+
----
49+
0
50+
0
51+
0
52+
53+
# Very large lag offset without explicit default should return NULL
54+
query I
55+
SELECT lag(value, 9223372036854775807) OVER (ORDER BY id) FROM lead_lag_extreme_offsets ORDER BY id;
56+
----
57+
NULL
58+
NULL
59+
NULL
60+
61+
# Out-of-range lag offset literal should fail as a non-integer offset
62+
query error DataFusion error: Execution error: Expected an integer value
63+
SELECT lag(value, -9223372036854775809, 0) OVER (ORDER BY id) FROM lead_lag_extreme_offsets;
64+
65+
statement ok
66+
DROP TABLE lead_lag_extreme_offsets;

0 commit comments

Comments
 (0)