Skip to content

Commit 56c23d7

Browse files
authored
Add tick_delay to mitigate boundary races in cascaded resampling (#1394)
## Problem In cascaded timer-based resamplers (for example `1s -> 10s`), a race can happen at window boundaries: the downstream resampler can tick and compute its window before the upstream resampler has emitted the last sample for the previous window. Example with `max_data_age_in_periods=1`: - The upstream resampler computes `[9,10)` and emits its result (label `9`) slightly **after** `t=10`. - The downstream resampler computes `[0,10)` at `t=10` and does not see that label-`9` sample yet. - At the next tick (`t=20`), that delayed sample is too old and is no longer considered. Result: the affected downstream window stays permanently incomplete. ## Change - Added `tick_delay` to `ResamplerConfig` and `ResamplerConfig2`. - `tick_delay` is keyword-only to avoid positional-argument API breakage. - The resampler can now optionally wait `tick_delay` after a tick, while still computing with the original window boundary. - This delays processing without shifting the actual window definitions. - Added validation: `tick_delay` must be `>= 0` and `< resampling_period`. - Added documentation for the cascaded-resampling use case. - Marked `tick_delay` as experimental (it may be changed or deprecated in the future). ## Tests - Added validation tests for invalid `tick_delay` values (`< 0`, `>= resampling_period`). - Added fixture-based timing tests that separate sample arrival time from timestamp-based window filtering. - Added a with/without-delay late-arrival comparison (same timestamps and arrival timing, only `tick_delay` differs) to make the delay effect explicit.
2 parents 57c8f28 + 56c4868 commit 56c23d7

4 files changed

Lines changed: 383 additions & 2 deletions

File tree

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* A new `tick_delay` option was added to `ResamplerConfig` and `ResamplerConfig2` to delay resampling execution after each timer tick. The delay was designed to postpone processing while keeping window boundaries aligned to the original tick times, which can be used for cascaded resampling pipelines. This option is experimental and may be changed or deprecated in a future release.
1414

1515
## Bug Fixes
1616

src/frequenz/sdk/timeseries/_resampling/_config.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,25 @@ class ResamplerConfig:
201201
time the resampler is created.
202202
"""
203203

204+
tick_delay: timedelta = field(default=timedelta(0), kw_only=True)
205+
"""Delay before processing each resampling tick.
206+
207+
This delays when resampling computation happens, while keeping the
208+
resampling windows aligned to the original timer tick boundaries. This
209+
delay is only a time-based buffer, not a strict synchronization mechanism.
210+
211+
Warning:
212+
This is an experimental option and may be changed or deprecated in
213+
the future.
214+
215+
Example:
216+
This can be used in cascaded resampling setups to reduce timing races
217+
where downstream windows are processed before upstream resampled values
218+
are emitted.
219+
220+
It must be non-negative and smaller than `resampling_period`.
221+
"""
222+
204223
def __post_init__(self) -> None:
205224
"""Check that config values are valid.
206225
@@ -245,6 +264,13 @@ def __post_init__(self) -> None:
245264
raise ValueError(
246265
f"align_to ({self.align_to}) should be a timezone aware datetime"
247266
)
267+
if self.tick_delay < timedelta(0):
268+
raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative")
269+
if self.tick_delay >= self.resampling_period:
270+
raise ValueError(
271+
f"tick_delay ({self.tick_delay}) should be smaller than "
272+
f"resampling_period ({self.resampling_period})"
273+
)
248274

249275

250276
class ResamplingFunction2(Protocol):
@@ -415,3 +441,10 @@ def __post_init__(self) -> None:
415441
raise ValueError(
416442
f"align_to ({self.align_to}) must be specified via timer_config"
417443
)
444+
if self.tick_delay < timedelta(0):
445+
raise ValueError(f"tick_delay ({self.tick_delay}) should be non-negative")
446+
if self.tick_delay >= self.resampling_period:
447+
raise ValueError(
448+
f"tick_delay ({self.tick_delay}) should be smaller than "
449+
f"resampling_period ({self.resampling_period})"
450+
)

src/frequenz/sdk/timeseries/_resampling/_resampler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ async def resample(self, *, one_shot: bool = False) -> None:
200200
case unexpected:
201201
assert_never(unexpected)
202202

203+
# Delay processing to let upstream cascaded resamplers emit their
204+
# boundary samples first; window boundaries still use next_tick_time.
205+
if self._config.tick_delay:
206+
await asyncio.sleep(self._config.tick_delay.total_seconds())
207+
203208
# We need to make a copy here because we need to match the results to the
204209
# current resamplers, and since we await here, new resamplers could be added
205210
# or removed from the dict while we awaiting the resampling, which would

0 commit comments

Comments
 (0)