Skip to content

Commit 2c5abdb

Browse files
authored
fix: graceful stream shutdown on exceptions in streaming actions (#680)
* fix: allow streaming actions to gracefully handle raised exceptions When a streaming action catches an exception and yields a final state in a try/except/finally block, the stream now completes gracefully instead of propagating the exception and killing the connection. If the generator yields a valid state_update before the exception propagates, the exception is suppressed and the stream terminates normally. If no state was yielded, the exception propagates as before. Closes #581 * fix: add logging for suppressed exceptions and tests for streaming graceful shutdown - Add logger.warning with exc_info in all 4 streaming except blocks - Remove dead caught_exc variable in multi-step functions - Fix count increment asymmetry between sync and async single-step - Add 8 tests covering graceful exception handling and propagation * style: apply black formatting * style: apply pre-commit formatting (black 23.11.0, isort 5.12.0)
1 parent f884adf commit 2c5abdb

2 files changed

Lines changed: 565 additions & 114 deletions

File tree

burr/core/application.py

Lines changed: 155 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -338,31 +338,44 @@ def _run_single_step_streaming_action(
338338
result = None
339339
state_update = None
340340
count = 0
341-
for item in generator:
342-
if not isinstance(item, tuple):
343-
# TODO -- consider adding support for just returning a result.
344-
raise ValueError(
345-
f"Action {action.name} must yield a tuple of (result, state_update). "
346-
f"For all non-final results (intermediate),"
347-
f"the state update must be None"
348-
)
349-
result, state_update = item
350-
count += 1
341+
try:
342+
for item in generator:
343+
if not isinstance(item, tuple):
344+
# TODO -- consider adding support for just returning a result.
345+
raise ValueError(
346+
f"Action {action.name} must yield a tuple of (result, state_update). "
347+
f"For all non-final results (intermediate),"
348+
f"the state update must be None"
349+
)
350+
result, state_update = item
351+
if state_update is None:
352+
count += 1
353+
if first_stream_start_time is None:
354+
first_stream_start_time = system.now()
355+
lifecycle_adapters.call_all_lifecycle_hooks_sync(
356+
"post_stream_item",
357+
item=result,
358+
item_index=count,
359+
stream_initialize_time=stream_initialize_time,
360+
first_stream_item_start_time=first_stream_start_time,
361+
action=action.name,
362+
app_id=app_id,
363+
partition_key=partition_key,
364+
sequence_id=sequence_id,
365+
)
366+
yield result, None
367+
except Exception as e:
351368
if state_update is None:
352-
if first_stream_start_time is None:
353-
first_stream_start_time = system.now()
354-
lifecycle_adapters.call_all_lifecycle_hooks_sync(
355-
"post_stream_item",
356-
item=result,
357-
item_index=count,
358-
stream_initialize_time=stream_initialize_time,
359-
first_stream_item_start_time=first_stream_start_time,
360-
action=action.name,
361-
app_id=app_id,
362-
partition_key=partition_key,
363-
sequence_id=sequence_id,
364-
)
365-
yield result, None
369+
raise
370+
logger.warning(
371+
"Streaming action '%s' raised %s after yielding %d items. "
372+
"Proceeding with final state from generator cleanup. Original error: %s",
373+
action.name,
374+
type(e).__name__,
375+
count,
376+
e,
377+
exc_info=True,
378+
)
366379

367380
if state_update is None:
368381
raise ValueError(
@@ -391,31 +404,45 @@ async def _arun_single_step_streaming_action(
391404
result = None
392405
state_update = None
393406
count = 0
394-
async for item in generator:
395-
if not isinstance(item, tuple):
396-
# TODO -- consider adding support for just returning a result.
397-
raise ValueError(
398-
f"Action {action.name} must yield a tuple of (result, state_update). "
399-
f"For all non-final results (intermediate),"
400-
f"the state update must be None"
401-
)
402-
result, state_update = item
407+
try:
408+
async for item in generator:
409+
if not isinstance(item, tuple):
410+
# TODO -- consider adding support for just returning a result.
411+
raise ValueError(
412+
f"Action {action.name} must yield a tuple of (result, state_update). "
413+
f"For all non-final results (intermediate),"
414+
f"the state update must be None"
415+
)
416+
result, state_update = item
417+
if state_update is None:
418+
count += 1
419+
if first_stream_start_time is None:
420+
first_stream_start_time = system.now()
421+
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
422+
"post_stream_item",
423+
item=result,
424+
item_index=count,
425+
stream_initialize_time=stream_initialize_time,
426+
first_stream_item_start_time=first_stream_start_time,
427+
action=action.name,
428+
app_id=app_id,
429+
partition_key=partition_key,
430+
sequence_id=sequence_id,
431+
)
432+
yield result, None
433+
except Exception as e:
403434
if state_update is None:
404-
if first_stream_start_time is None:
405-
first_stream_start_time = system.now()
406-
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
407-
"post_stream_item",
408-
item=result,
409-
item_index=count,
410-
stream_initialize_time=stream_initialize_time,
411-
first_stream_item_start_time=first_stream_start_time,
412-
action=action.name,
413-
app_id=app_id,
414-
partition_key=partition_key,
415-
sequence_id=sequence_id,
416-
)
417-
count += 1
418-
yield result, None
435+
raise
436+
logger.warning(
437+
"Streaming action '%s' raised %s after yielding %d items. "
438+
"Proceeding with final state from generator cleanup. Original error: %s",
439+
action.name,
440+
type(e).__name__,
441+
count,
442+
e,
443+
exc_info=True,
444+
)
445+
419446
if state_update is None:
420447
raise ValueError(
421448
f"Action {action.name} did not return a state update. For async actions, the last yield "
@@ -450,28 +477,42 @@ def _run_multi_step_streaming_action(
450477
result = None
451478
first_stream_start_time = None
452479
count = 0
453-
for item in generator:
454-
# We want to peek ahead so we can return the last one
455-
# This is slightly eager, but only in the case in which we
456-
# are using a multi-step streaming action
457-
next_result = result
458-
result = item
459-
if next_result is not None:
460-
if first_stream_start_time is None:
461-
first_stream_start_time = system.now()
462-
lifecycle_adapters.call_all_lifecycle_hooks_sync(
463-
"post_stream_item",
464-
item=next_result,
465-
item_index=count,
466-
stream_initialize_time=stream_initialize_time,
467-
first_stream_item_start_time=first_stream_start_time,
468-
action=action.name,
469-
app_id=app_id,
470-
partition_key=partition_key,
471-
sequence_id=sequence_id,
472-
)
473-
count += 1
474-
yield next_result, None
480+
try:
481+
for item in generator:
482+
# We want to peek ahead so we can return the last one
483+
# This is slightly eager, but only in the case in which we
484+
# are using a multi-step streaming action
485+
next_result = result
486+
result = item
487+
if next_result is not None:
488+
if first_stream_start_time is None:
489+
first_stream_start_time = system.now()
490+
lifecycle_adapters.call_all_lifecycle_hooks_sync(
491+
"post_stream_item",
492+
item=next_result,
493+
item_index=count,
494+
stream_initialize_time=stream_initialize_time,
495+
first_stream_item_start_time=first_stream_start_time,
496+
action=action.name,
497+
app_id=app_id,
498+
partition_key=partition_key,
499+
sequence_id=sequence_id,
500+
)
501+
count += 1
502+
yield next_result, None
503+
except Exception as e:
504+
if result is None:
505+
raise
506+
logger.warning(
507+
"Streaming action '%s' raised %s after yielding %d items. "
508+
"Proceeding with last yielded result for reducer. "
509+
"Note: the reducer will run on potentially partial data. Original error: %s",
510+
action.name,
511+
type(e).__name__,
512+
count,
513+
e,
514+
exc_info=True,
515+
)
475516
state_update = _run_reducer(action, state, result, action.name)
476517
_validate_result(result, action.name, action.schema)
477518
_validate_reducer_writes(action, state_update, action.name)
@@ -494,28 +535,42 @@ async def _arun_multi_step_streaming_action(
494535
result = None
495536
first_stream_start_time = None
496537
count = 0
497-
async for item in generator:
498-
# We want to peek ahead so we can return the last one
499-
# This is slightly eager, but only in the case in which we
500-
# are using a multi-step streaming action
501-
next_result = result
502-
result = item
503-
if next_result is not None:
504-
if first_stream_start_time is None:
505-
first_stream_start_time = system.now()
506-
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
507-
"post_stream_item",
508-
item=next_result,
509-
stream_initialize_time=stream_initialize_time,
510-
item_index=count,
511-
first_stream_item_start_time=first_stream_start_time,
512-
action=action.name,
513-
app_id=app_id,
514-
partition_key=partition_key,
515-
sequence_id=sequence_id,
516-
)
517-
count += 1
518-
yield next_result, None
538+
try:
539+
async for item in generator:
540+
# We want to peek ahead so we can return the last one
541+
# This is slightly eager, but only in the case in which we
542+
# are using a multi-step streaming action
543+
next_result = result
544+
result = item
545+
if next_result is not None:
546+
if first_stream_start_time is None:
547+
first_stream_start_time = system.now()
548+
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
549+
"post_stream_item",
550+
item=next_result,
551+
stream_initialize_time=stream_initialize_time,
552+
item_index=count,
553+
first_stream_item_start_time=first_stream_start_time,
554+
action=action.name,
555+
app_id=app_id,
556+
partition_key=partition_key,
557+
sequence_id=sequence_id,
558+
)
559+
count += 1
560+
yield next_result, None
561+
except Exception as e:
562+
if result is None:
563+
raise
564+
logger.warning(
565+
"Streaming action '%s' raised %s after yielding %d items. "
566+
"Proceeding with last yielded result for reducer. "
567+
"Note: the reducer will run on potentially partial data. Original error: %s",
568+
action.name,
569+
type(e).__name__,
570+
count,
571+
e,
572+
exc_info=True,
573+
)
519574
state_update = _run_reducer(action, state, result, action.name)
520575
_validate_result(result, action.name, action.schema)
521576
_validate_reducer_writes(action, state_update, action.name)
@@ -1862,7 +1917,9 @@ def stream_iterate(
18621917
halt_before: Optional[Union[str, List[str]]] = None,
18631918
inputs: Optional[Dict[str, Any]] = None,
18641919
) -> Generator[
1865-
Tuple[Action, StreamingResultContainer[ApplicationStateType, Union[dict, Any]]], None, None
1920+
Tuple[Action, StreamingResultContainer[ApplicationStateType, Union[dict, Any]]],
1921+
None,
1922+
None,
18661923
]:
18671924
"""Produces an iterator that iterates through intermediate streams. You may want
18681925
to use this in something like deep research mode in which:
@@ -1905,7 +1962,11 @@ async def astream_iterate(
19051962
halt_before: Optional[Union[str, List[str]]] = None,
19061963
inputs: Optional[Dict[str, Any]] = None,
19071964
) -> AsyncGenerator[
1908-
Tuple[Action, AsyncStreamingResultContainer[ApplicationStateType, Union[dict, Any]]], None
1965+
Tuple[
1966+
Action,
1967+
AsyncStreamingResultContainer[ApplicationStateType, Union[dict, Any]],
1968+
],
1969+
None,
19091970
]:
19101971
"""Async version of stream_iterate. Produces an async generator that iterates
19111972
through intermediate streams. See stream_iterate for more details.

0 commit comments

Comments
 (0)