Skip to content

Commit 4101ef4

Browse files
committed
fix: flush StreamedLogAsync tail when stop() cancels the task
1 parent e794411 commit 4101ef4

2 files changed

Lines changed: 108 additions & 5 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,9 @@ async def _stream_log(self) -> None:
214214
async with self._log_client.stream(raw=True) as log_stream:
215215
if not log_stream:
216216
return
217-
async for data in log_stream.aiter_bytes():
218-
self._process_new_data(data)
219-
220-
# If the stream is finished, then the last part will be also processed.
221-
self._log_buffer_content(include_last_part=True)
217+
try:
218+
async for data in log_stream.aiter_bytes():
219+
self._process_new_data(data)
220+
finally:
221+
# Flush the last buffered part even if the task is cancelled by `stop()`.
222+
self._log_buffer_content(include_last_part=True)

tests/unit/test_logging.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -716,3 +717,104 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
716717
elapsed = time.monotonic() - start
717718

718719
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
720+
721+
722+
_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
723+
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
724+
725+
726+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
727+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
728+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
729+
{
730+
'data': {
731+
'id': _MOCKED_RUN_ID,
732+
'actId': _MOCKED_ACTOR_ID,
733+
'userId': 'test_user_id',
734+
'startedAt': '2019-11-30T07:34:24.202Z',
735+
'finishedAt': '2019-12-12T09:30:12.202Z',
736+
'status': 'RUNNING',
737+
'statusMessage': 'Running',
738+
'isStatusMessageTerminal': False,
739+
'meta': {'origin': 'WEB'},
740+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
741+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
742+
'buildId': 'test_build_id',
743+
'generalAccess': 'RESTRICTED',
744+
'defaultKeyValueStoreId': 'test_kvs_id',
745+
'defaultDatasetId': 'test_dataset_id',
746+
'defaultRequestQueueId': 'test_rq_id',
747+
'buildNumber': '0.0.1',
748+
'containerUrl': 'https://test.runs.apify.net',
749+
}
750+
}
751+
)
752+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
753+
{
754+
'data': {
755+
'id': _MOCKED_ACTOR_ID,
756+
'userId': 'test_user_id',
757+
'name': _MOCKED_ACTOR_NAME,
758+
'username': 'test_user',
759+
'isPublic': False,
760+
'createdAt': '2019-07-08T11:27:57.401Z',
761+
'modifiedAt': '2019-07-08T14:01:05.546Z',
762+
'stats': {
763+
'totalBuilds': 0,
764+
'totalRuns': 0,
765+
'totalUsers': 0,
766+
'totalUsers7Days': 0,
767+
'totalUsers30Days': 0,
768+
'totalUsers90Days': 0,
769+
'totalMetamorphs': 0,
770+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
771+
},
772+
'versions': [],
773+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
774+
'deploymentKey': 'test_key',
775+
}
776+
}
777+
)
778+
779+
780+
@pytest.mark.usefixtures('propagate_stream_logs')
781+
async def test_streamed_log_async_stop_flushes_buffered_tail(
782+
caplog: LogCaptureFixture,
783+
httpserver: HTTPServer,
784+
) -> None:
785+
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
786+
stop_emitting = threading.Event()
787+
788+
def _tail_handler(_request: Request) -> Response:
789+
def generate_logs() -> Iterator[bytes]:
790+
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
791+
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
792+
yield _TAIL_SECOND_MESSAGE.encode()
793+
# Block until the test tears the server down (or stop releases us).
794+
stop_emitting.wait(timeout=30)
795+
796+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
797+
798+
httpserver.expect_request(
799+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
800+
).respond_with_handler(_tail_handler)
801+
_register_run_and_actor_endpoints(httpserver)
802+
803+
api_url = httpserver.url_for('/').removesuffix('/')
804+
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
805+
streamed_log = await run_client.get_streamed_log()
806+
807+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
808+
809+
try:
810+
with caplog.at_level(logging.DEBUG, logger=logger_name):
811+
async with streamed_log:
812+
# Wait long enough for both chunks to arrive and be processed.
813+
await asyncio.sleep(1)
814+
# Context exit calls stop(), which cancels the task mid-stream.
815+
finally:
816+
stop_emitting.set()
817+
818+
messages = [record.message for record in caplog.records]
819+
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
820+
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'

0 commit comments

Comments
 (0)