Skip to content

Commit 7820458

Browse files
github-actions[bot]Copilot
authored andcommitted
fix(operators): reset retry budget per subscription to fix retry+repeat
The retry(n) generator was created once at observable creation time and shared across all subscriptions. When combined with repeat(), the retry pool was exhausted after n total subscriptions regardless of errors, causing repeat() to resubscribe to an empty generator that immediately completed. Fix: create the generator inside the subscribe closure so each subscription gets a fresh retry budget. This matches the expected ReactiveX semantics where retry(n) means 'on error, resubscribe up to n times per subscription'. Closes #712 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 49e9a89 commit 7820458

2 files changed

Lines changed: 45 additions & 7 deletions

File tree

reactivex/operators/_retry.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import TypeVar
22

33
import reactivex
4-
from reactivex import Observable
4+
from reactivex import Observable, abc
55
from reactivex.internal import curry_flip
66
from reactivex.internal.utils import infinite
77

@@ -17,6 +17,10 @@ def retry_(
1717
times or until it successfully terminates. If the retry count is
1818
not specified, it retries indefinitely.
1919
20+
The retry budget is per-subscription, so combining ``retry(n)`` with
21+
``repeat()`` works as expected: each resubscription by ``repeat()``
22+
starts with a fresh retry allowance.
23+
2024
Examples:
2125
>>> result = source.pipe(retry())
2226
>>> result = retry()(source)
@@ -32,12 +36,21 @@ def retry_(
3236
sequence repeatedly until it terminates successfully.
3337
"""
3438

35-
if retry_count is None:
36-
gen = infinite()
37-
else:
38-
gen = range(retry_count)
39-
40-
return reactivex.catch_with_iterable(source for _ in gen)
39+
def subscribe(
40+
observer: abc.ObserverBase[_T], scheduler_: abc.SchedulerBase | None = None
41+
) -> abc.DisposableBase:
42+
# Create a fresh generator on every subscription so that the retry
43+
# budget is not shared across resubscriptions (e.g. via repeat()).
44+
if retry_count is None:
45+
gen = infinite()
46+
else:
47+
gen = range(retry_count)
48+
49+
return reactivex.catch_with_iterable(source for _ in gen).subscribe(
50+
observer, scheduler=scheduler_
51+
)
52+
53+
return Observable(subscribe)
4154

4255

4356
__all__ = ["retry_"]

tests/test_observable/test_retry.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,31 @@ def dispose(_, __):
193193
with pytest.raises(Exception):
194194
xss.subscribe()
195195

196+
def test_retry_with_count_combined_with_repeat(self):
197+
"""retry(n) should reset its budget per subscription so repeat() works correctly.
198+
199+
Regression test for https://github.com/ReactiveX/RxPY/issues/712.
200+
"""
201+
scheduler = TestScheduler()
202+
xs = scheduler.create_cold_observable(on_next(90, 42), on_completed(200))
203+
204+
result = scheduler.start(
205+
lambda: xs.pipe(ops.retry(2), ops.repeat()),
206+
disposed=1000,
207+
)
208+
assert result.messages == [
209+
on_next(290, 42),
210+
on_next(490, 42),
211+
on_next(690, 42),
212+
on_next(890, 42),
213+
]
214+
assert xs.subscriptions == [
215+
subscribe(200, 400),
216+
subscribe(400, 600),
217+
subscribe(600, 800),
218+
subscribe(800, 1000),
219+
]
220+
196221

197222
if __name__ == "__main__":
198223
unittest.main()

0 commit comments

Comments
 (0)