Skip to content

Commit 5169b06

Browse files
kaarolchcursoragent
andcommitted
fix(aggregate transform): use Euclidean division for event-time buckets
Rust integer `/` truncates toward zero, so negative millis (e.g. just before the Unix epoch) were aligned to bucket 0 instead of the correct window start. Use `div_euclid` for consistent half-open window alignment. Adds a regression test for timestamp -1 ms at interval 10s. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 718b502 commit 5169b06

1 file changed

Lines changed: 43 additions & 2 deletions

File tree

src/transforms/aggregate.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,17 @@ impl Aggregate {
256256
})
257257
}
258258

259-
const fn bucket_key(&self, timestamp: DateTime<Utc>) -> BucketKey {
259+
/// Start of the half-open window `[bucket_key, bucket_key + interval_ms)` containing
260+
/// `timestamp`, aligned to multiples of `interval_ms` from the Unix epoch.
261+
///
262+
/// Euclidean division (`div_euclid`) is required: Rust's truncating `/`
263+
/// rounds toward zero, so timestamps just before the epoch (negative
264+
/// millis) would incorrectly map into the non-negative bucket `[0, interval)`
265+
/// instead of `[-interval, 0)`.
266+
fn bucket_key(&self, timestamp: DateTime<Utc>) -> BucketKey {
260267
let timestamp_ms = timestamp.timestamp_millis();
261268
let interval_ms = self.interval.as_millis() as i64;
262-
(timestamp_ms / interval_ms) * interval_ms
269+
timestamp_ms.div_euclid(interval_ms).saturating_mul(interval_ms)
263270
}
264271

265272
/// Returns `true` if `bucket_key` belongs to a window that has already
@@ -1763,6 +1770,40 @@ interval_ms = 999999
17631770
}
17641771
}
17651772

1773+
/// Rust truncating `/` rounds toward zero, so `-1 / 10000 == 0` and the
1774+
/// bucket anchor would wrongly be `0`. Euclidean alignment places
1775+
/// `-1ms` in `[-interval_ms, 0)` anchored at `-interval_ms`.
1776+
#[test]
1777+
fn event_time_pre_epoch_buckets_use_floor_division() {
1778+
let mut agg = Aggregate::new(&AggregateConfig {
1779+
interval_ms: 10_000_u64,
1780+
mode: AggregationMode::Auto,
1781+
time_source: TimeSource::EventTime,
1782+
allowed_lateness_ms: 0,
1783+
use_system_time_for_missing_timestamps: false,
1784+
max_future_ms: 10_000,
1785+
})
1786+
.unwrap();
1787+
1788+
let ts = Utc
1789+
.timestamp_millis_opt(-1)
1790+
.latest()
1791+
.expect("valid millis near epoch");
1792+
1793+
agg.record(make_metric_with_timestamp(
1794+
"pre_epoch",
1795+
MetricKind::Incremental,
1796+
MetricValue::Counter { value: 1.0 },
1797+
ts,
1798+
));
1799+
1800+
assert_eq!(
1801+
agg.event_time_buckets.keys().next().copied(),
1802+
Some(-10_000),
1803+
"-1 ms must bucket to [-10000, 0), not [0, 10000)"
1804+
);
1805+
}
1806+
17661807
#[test]
17671808
fn event_time_out_of_order_rejection() {
17681809
let mut agg = Aggregate::new(&AggregateConfig {

0 commit comments

Comments
 (0)