Skip to content

Commit b1dfab1

Browse files
Gautam ManchandaniGautam Manchandani
authored andcommitted
Stop canceled uploads from finishing
1 parent d9f59e2 commit b1dfab1

2 files changed

Lines changed: 27 additions & 0 deletions

File tree

packages/reflex-base/src/reflex_base/event/processor/event_processor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,9 @@ async def _emit_delta_impl(
439439
yield await result
440440
waiting_for.add(asyncio.create_task(deltas.get()))
441441
break
442+
except (asyncio.CancelledError, GeneratorExit):
443+
task_future.cancel()
444+
raise
442445
finally:
443446
for future in waiting_for:
444447
future.cancel()

tests/units/reflex_base/event/processor/test_event_processor.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ async def _multi_delta_handler():
6464
await asyncio.sleep(0.01)
6565

6666

67+
async def _delta_then_slow_logging_handler():
68+
"""Emit an initial delta, then block long enough for cancellation to matter."""
69+
ctx = EventContext.get()
70+
await ctx.emit_delta({"state": {"x": 1}})
71+
await asyncio.sleep(0.5)
72+
_CALL_LOG.append({"value": "completed_after_stream_close"})
73+
74+
6775
async def _slow_logging_handler(value: str = "default"):
6876
"""A slow logging handler that pauses before recording.
6977
@@ -117,6 +125,7 @@ async def _background_slow_logging_handler(value: str = "default"):
117125
chaining_event = EventHandler(fn=_chaining_handler)
118126
delta_event = EventHandler(fn=_delta_handler)
119127
multi_delta_event = EventHandler(fn=_multi_delta_handler)
128+
delta_then_slow_logging_event = EventHandler(fn=_delta_then_slow_logging_handler)
120129
slow_logging_event = EventHandler(fn=_slow_logging_handler)
121130
multi_chaining_event = EventHandler(fn=_multi_chaining_handler)
122131
background_slow_logging_event = EventHandler(fn=_background_slow_logging_handler)
@@ -140,6 +149,7 @@ def _register_handlers(forked_registration_context: RegistrationContext):
140149
chaining_event,
141150
delta_event,
142151
multi_delta_event,
152+
delta_then_slow_logging_event,
143153
slow_logging_event,
144154
multi_chaining_event,
145155
background_slow_logging_event,
@@ -510,6 +520,20 @@ async def test_stream_delta_noop_handler_yields_nothing(token: str):
510520
assert collected == []
511521

512522

523+
async def test_stream_delta_cancels_event_when_generator_closes_early(token: str):
524+
"""Closing enqueue_stream_delta early cancels the in-flight event."""
525+
ep = EventProcessor(graceful_shutdown_timeout=2)
526+
ep.configure()
527+
async with ep:
528+
event = Event.from_event_type(delta_then_slow_logging_event())[0]
529+
stream = ep.enqueue_stream_delta(token, event)
530+
assert await anext(stream) == {"state": {"x": 1}}
531+
await stream.aclose()
532+
await asyncio.sleep(0.1)
533+
534+
assert _CALL_LOG == []
535+
536+
513537
async def test_stream_delta_not_configured_raises():
514538
"""enqueue_stream_delta raises RuntimeError if processor is not configured."""
515539
ep = EventProcessor()

0 commit comments

Comments
 (0)