Skip to content

Commit ea23338

Browse files
authored
fix: flush StreamedLogAsync tail when stop() cancels the task (#754)
## Summary

In `StreamedLogAsync` (`src/apify_client/_streamed_log.py`), shutdown is driven by `stop()` cancelling the streaming task. The resulting `CancelledError` unwinds out of the `async for` loop before the trailing `_log_buffer_content(include_last_part=True)` call runs, so whatever is still sitting in the buffer (the tail after the last ISO 8601 timestamp marker) is silently discarded. Wrapping the loop in `try/finally` guarantees that the tail is flushed even when the task is cancelled. A regression test covers the bug.
1 parent 46b4789 commit ea23338

2 files changed

Lines changed: 52 additions & 5 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -218,8 +218,9 @@ async def _stream_log(self) -> None:
218218
async with self._log_client.stream(raw=True) as log_stream:
219219
if not log_stream:
220220
return
221-
async for data in log_stream.aiter_bytes():
222-
self._process_new_data(data)
223-
224-
# If the stream is finished, then the last part will be also processed.
225-
self._log_buffer_content(include_last_part=True)
221+
try:
222+
async for data in log_stream.aiter_bytes():
223+
self._process_new_data(data)
224+
finally:
225+
# Flush the last buffered part even if the task is cancelled by `stop()`.
226+
self._log_buffer_content(include_last_part=True)

tests/unit/test_logging.py

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -719,6 +719,10 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
719719
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
720720

721721

722+
_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
723+
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
724+
725+
722726
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
723727
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
724728
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
@@ -774,6 +778,48 @@ def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
774778

775779

776780
@pytest.mark.usefixtures('propagate_stream_logs')
781+
async def test_streamed_log_async_stop_flushes_buffered_tail(
782+
caplog: LogCaptureFixture,
783+
httpserver: HTTPServer,
784+
) -> None:
785+
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
786+
stop_emitting = threading.Event()
787+
788+
def _tail_handler(_request: Request) -> Response:
789+
def generate_logs() -> Iterator[bytes]:
790+
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
791+
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
792+
yield _TAIL_SECOND_MESSAGE.encode()
793+
# Block until the test tears the server down (or stop releases us).
794+
stop_emitting.wait(timeout=30)
795+
796+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
797+
798+
httpserver.expect_request(
799+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
800+
).respond_with_handler(_tail_handler)
801+
_register_run_and_actor_endpoints(httpserver)
802+
803+
api_url = httpserver.url_for('/').removesuffix('/')
804+
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
805+
streamed_log = await run_client.get_streamed_log()
806+
807+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
808+
809+
try:
810+
with caplog.at_level(logging.DEBUG, logger=logger_name):
811+
async with streamed_log:
812+
# Wait long enough for both chunks to arrive and be processed.
813+
await asyncio.sleep(1)
814+
# Context exit calls stop(), which cancels the task mid-stream.
815+
finally:
816+
stop_emitting.set()
817+
818+
messages = [record.message for record in caplog.records]
819+
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
820+
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
821+
822+
777823
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
778824
httpserver: HTTPServer,
779825
monkeypatch: pytest.MonkeyPatch,

0 commit comments

Comments
 (0)