Skip to content

Commit 8986424

Browse files
vdusekclaude
andcommitted
fix: prevent StreamedLog stop() from dropping tail or hanging on silent stream
The async variant lost the final buffered log line because the trailing `_log_buffer_content(include_last_part=True)` was skipped when `stop()` cancelled the task; wrapping the loop in try/finally ensures the tail is always flushed. The sync variant could hang in `iter_bytes()` on a silent stream — pass a 30s read timeout (overridable via the new `_read_timeout` class attribute) so `stop()` can unblock within bounded time. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e794411 commit 8986424

2 files changed

Lines changed: 167 additions & 17 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import re
66
import threading
77
from asyncio import Task
8-
from datetime import UTC, datetime
8+
from datetime import UTC, datetime, timedelta
99
from threading import Thread
10-
from typing import TYPE_CHECKING, Self, cast
10+
from typing import TYPE_CHECKING, ClassVar, Self, cast
1111

1212
from apify_client._docs import docs_group
1313

@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
9090
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
9191
"""
9292

93+
# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
94+
# this window instead of waiting for the long-polling default.
95+
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
96+
9397
def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
9498
"""Initialize `StreamedLog`.
9599
@@ -138,17 +142,17 @@ def __exit__(
138142
self.stop()
139143

140144
def _stream_log(self) -> None:
141-
with self._log_client.stream(raw=True) as log_stream:
145+
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
142146
if not log_stream:
143147
return
144-
for data in log_stream.iter_bytes():
145-
self._process_new_data(data)
146-
if self._stop_logging:
147-
break
148-
149-
# If the stream is finished, then the last part will be also processed.
150-
self._log_buffer_content(include_last_part=True)
151-
return
148+
try:
149+
for data in log_stream.iter_bytes():
150+
self._process_new_data(data)
151+
if self._stop_logging:
152+
break
153+
finally:
154+
# Flush the last buffered part even if the read timed out or was stopped.
155+
self._log_buffer_content(include_last_part=True)
152156

153157

154158
@docs_group('Other')
@@ -214,8 +218,9 @@ async def _stream_log(self) -> None:
214218
async with self._log_client.stream(raw=True) as log_stream:
215219
if not log_stream:
216220
return
217-
async for data in log_stream.aiter_bytes():
218-
self._process_new_data(data)
219-
220-
# If the stream is finished, then the last part will be also processed.
221-
self._log_buffer_content(include_last_part=True)
221+
try:
222+
async for data in log_stream.aiter_bytes():
223+
self._process_new_data(data)
224+
finally:
225+
# Flush the last buffered part even if the task is cancelled by `stop()`.
226+
self._log_buffer_content(include_last_part=True)

tests/unit/test_logging.py

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -14,7 +15,7 @@
1415
from apify_client import ApifyClient, ApifyClientAsync
1516
from apify_client._logging import RedirectLogFormatter
1617
from apify_client._status_message_watcher import StatusMessageWatcherBase
17-
from apify_client._streamed_log import StreamedLogBase
18+
from apify_client._streamed_log import StreamedLog, StreamedLogBase
1819

1920
if TYPE_CHECKING:
2021
from collections.abc import Iterator
@@ -716,3 +717,147 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
716717
elapsed = time.monotonic() - start
717718

718719
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
720+
721+
722+
_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
723+
_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
724+
725+
726+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
727+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
728+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
729+
{
730+
'data': {
731+
'id': _MOCKED_RUN_ID,
732+
'actId': _MOCKED_ACTOR_ID,
733+
'userId': 'test_user_id',
734+
'startedAt': '2019-11-30T07:34:24.202Z',
735+
'finishedAt': '2019-12-12T09:30:12.202Z',
736+
'status': 'RUNNING',
737+
'statusMessage': 'Running',
738+
'isStatusMessageTerminal': False,
739+
'meta': {'origin': 'WEB'},
740+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
741+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
742+
'buildId': 'test_build_id',
743+
'generalAccess': 'RESTRICTED',
744+
'defaultKeyValueStoreId': 'test_kvs_id',
745+
'defaultDatasetId': 'test_dataset_id',
746+
'defaultRequestQueueId': 'test_rq_id',
747+
'buildNumber': '0.0.1',
748+
'containerUrl': 'https://test.runs.apify.net',
749+
}
750+
}
751+
)
752+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
753+
{
754+
'data': {
755+
'id': _MOCKED_ACTOR_ID,
756+
'userId': 'test_user_id',
757+
'name': _MOCKED_ACTOR_NAME,
758+
'username': 'test_user',
759+
'isPublic': False,
760+
'createdAt': '2019-07-08T11:27:57.401Z',
761+
'modifiedAt': '2019-07-08T14:01:05.546Z',
762+
'stats': {
763+
'totalBuilds': 0,
764+
'totalRuns': 0,
765+
'totalUsers': 0,
766+
'totalUsers7Days': 0,
767+
'totalUsers30Days': 0,
768+
'totalUsers90Days': 0,
769+
'totalMetamorphs': 0,
770+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
771+
},
772+
'versions': [],
773+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
774+
'deploymentKey': 'test_key',
775+
}
776+
}
777+
)
778+
779+
780+
@pytest.mark.usefixtures('propagate_stream_logs')
781+
async def test_streamed_log_async_stop_flushes_buffered_tail(
782+
caplog: LogCaptureFixture,
783+
httpserver: HTTPServer,
784+
) -> None:
785+
"""Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
786+
stop_emitting = threading.Event()
787+
788+
def _tail_handler(_request: Request) -> Response:
789+
def generate_logs() -> Iterator[bytes]:
790+
yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
791+
# Second marker has no trailing newline/next-marker, so it stays in the buffer.
792+
yield _TAIL_SECOND_MESSAGE.encode()
793+
# Block until the test tears the server down (or stop releases us).
794+
stop_emitting.wait(timeout=30)
795+
796+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
797+
798+
httpserver.expect_request(
799+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
800+
).respond_with_handler(_tail_handler)
801+
_register_run_and_actor_endpoints(httpserver)
802+
803+
api_url = httpserver.url_for('/').removesuffix('/')
804+
run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
805+
streamed_log = await run_client.get_streamed_log()
806+
807+
logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
808+
809+
try:
810+
with caplog.at_level(logging.DEBUG, logger=logger_name):
811+
async with streamed_log:
812+
# Wait long enough for both chunks to arrive and be processed.
813+
await asyncio.sleep(1)
814+
# Context exit calls stop(), which cancels the task mid-stream.
815+
finally:
816+
stop_emitting.set()
817+
818+
messages = [record.message for record in caplog.records]
819+
assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
820+
assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
821+
822+
823+
@pytest.mark.usefixtures('propagate_stream_logs')
824+
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
825+
httpserver: HTTPServer,
826+
monkeypatch: pytest.MonkeyPatch,
827+
) -> None:
828+
"""Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
829+
# Shorten the read timeout so the test doesn't wait for the production default.
830+
monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
831+
832+
release_server = threading.Event()
833+
834+
def _silent_handler(_request: Request) -> Response:
835+
def generate_logs() -> Iterator[bytes]:
836+
# Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
837+
# response; then block without emitting any log data.
838+
yield b''
839+
release_server.wait(timeout=30)
840+
841+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
842+
843+
httpserver.expect_request(
844+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
845+
).respond_with_handler(_silent_handler)
846+
_register_run_and_actor_endpoints(httpserver)
847+
848+
api_url = httpserver.url_for('/').removesuffix('/')
849+
run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
850+
streamed_log = run_client.get_streamed_log()
851+
852+
streamed_log.start()
853+
try:
854+
# Give the streaming thread time to start and block inside iter_bytes.
855+
time.sleep(0.3)
856+
857+
# Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
858+
stop_thread = threading.Thread(target=streamed_log.stop)
859+
stop_thread.start()
860+
stop_thread.join(timeout=5)
861+
assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
862+
finally:
863+
release_server.set()

0 commit comments

Comments
 (0)