[Repo Assist] fix(operators): forward scheduler in pairwise, to_marbles, delay_with_mapper#757
Merged
dbrattli merged 4 commits intoApr 17, 2026
Conversation
…_mapper Three operators were subscribing to their source without forwarding the scheduler received at subscription time, breaking scheduler propagation for downstream operators. * _pairwise.py: add scheduler=scheduler to source.subscribe() * _tomarbles.py: add scheduler=scheduler to source.subscribe() * _delaywithmapper.py: add scheduler=scheduler to sub_delay.subscribe() in the subscription-delay path (the main source path was already fixed) The other operators originally listed in issue #480 (delay_with_mapper main path, throttle_first, timeout_with_mapper) were already forwarding the scheduler correctly. Closes #480 (partial) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Cover pairwise, to_marbles, and delay_with_mapper (subscription-delay path): each test installs a source whose subscribe captures the scheduler it received, pipes through the operator, subscribes with a known scheduler, and asserts the captured scheduler matches. Verified to fail on the pre-fix code. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Collaborator
ReviewSmall, surgical fix adding the missing Correctness
TestsPushed a follow-up commit adding regression tests for each operator (one test per file). Each test installs a source Full suite: 1495 passed, 15 skipped. Pyright and ruff clean. Taking out of draft. |
dbrattli
approved these changes
Apr 17, 2026
…ding-ea9c5c512fb11be9
3 tasks
…ding-ea9c5c512fb11be9
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🤖 This PR was created by Repo Assist, an automated AI assistant.
Closes #480 (partial)
Root Cause
Three operators were calling
source.subscribe(on_next, ...)without forwarding theschedulerargument received at subscription time:pairwisereactivex/operators/_pairwise.pysource.subscribe()missingscheduler=schedulerto_marblesreactivex/operators/_tomarbles.pysource.subscribe()missingscheduler=schedulerdelay_with_mapperreactivex/operators/_delaywithmapper.pysub_delay.subscribe()in the subscription-delay path missingscheduler=schedulerThis means that when a user composes a pipeline with an explicit scheduler (e.g.,
pipe(pairwise(), observe_on(my_scheduler))), the scheduler is silently dropped at these operators, breaking deterministic time control and scheduler propagation.Fix
Added
scheduler=schedulerto thesource.subscribe(...)/sub_delay.subscribe(...)calls in each affected operator. The fix is minimal and surgical — no behaviour change whenscheduler=None(the default).The other operators originally listed in #480 (
delay_with_mappermain path,throttle_first,timeout_with_mapper) were already forwarding the scheduler correctly in the current codebase.Testing
Existing tests pass:
tests/test_observable/test_pairwise.py— 12 passedtests/test_observable/test_delaywithmapper.py— included in aboveTrade-offs
scheduler=Nonebehaviour is identical to before.changes.mdupdated under Unreleased.Note
🔒 Integrity filter blocked 27 items
The following items were blocked because they don't meet the GitHub integrity level.
list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_pull_requests: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".list_issues: has lower integrity than agent requires. The agent cannot read data with integrity below "approved".To allow these resources, lower
min-integrityin your GitHub frontmatter: