Skip to content

Commit c15cb1b

Browse files
authored
fix: prevent StreamedLog stop() from hanging on a silent stream (#825)
## Summary In `StreamedLog` (`src/apify_client/_streamed_log.py`), `stop()` sets `_stop_logging = True` and then `join()`s the streaming thread. The thread is parked inside `iter_bytes()` on a blocking socket read, so the flag isn't observed until the next chunk arrives or the long-polling server-side timeout (~360s) elapses — `stop()` can block for minutes on a quiet actor. Passing a `_read_timeout` (30s) to `_log_client.stream()` caps how long `iter_bytes()` can block, so `stop()` unblocks within that window. The timeout is exposed as a `ClassVar[timedelta]` so it can be tuned (or shortened in tests) without monkeypatching a module global. The loop is also wrapped in `try/finally` so the buffered tail is flushed even if a read times out. Tradeoff: the sync stream now ends if the actor goes silent for more than 30s (vs. the prior 360s). This is documented on the class attribute and is the intended cost of bounded `stop()`. A regression test covers the bug.
1 parent e794411 commit c15cb1b

2 files changed

Lines changed: 114 additions & 12 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import re
66
import threading
77
from asyncio import Task
8-
from datetime import UTC, datetime
8+
from datetime import UTC, datetime, timedelta
99
from threading import Thread
10-
from typing import TYPE_CHECKING, Self, cast
10+
from typing import TYPE_CHECKING, ClassVar, Self, cast
1111

1212
from apify_client._docs import docs_group
1313

@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
9090
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
9191
"""
9292

93+
# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
94+
# this window instead of waiting for the long-polling default.
95+
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
96+
9397
def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
9498
"""Initialize `StreamedLog`.
9599
@@ -138,17 +142,17 @@ def __exit__(
138142
self.stop()
139143

140144
def _stream_log(self) -> None:
141-
with self._log_client.stream(raw=True) as log_stream:
145+
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
142146
if not log_stream:
143147
return
144-
for data in log_stream.iter_bytes():
145-
self._process_new_data(data)
146-
if self._stop_logging:
147-
break
148-
149-
# If the stream is finished, then the last part will be also processed.
150-
self._log_buffer_content(include_last_part=True)
151-
return
148+
try:
149+
for data in log_stream.iter_bytes():
150+
self._process_new_data(data)
151+
if self._stop_logging:
152+
break
153+
finally:
154+
# Flush the last buffered part even if the read timed out or was stopped.
155+
self._log_buffer_content(include_last_part=True)
152156

153157

154158
@docs_group('Other')

tests/unit/test_logging.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -14,7 +15,7 @@
1415
from apify_client import ApifyClient, ApifyClientAsync
1516
from apify_client._logging import RedirectLogFormatter
1617
from apify_client._status_message_watcher import StatusMessageWatcherBase
17-
from apify_client._streamed_log import StreamedLogBase
18+
from apify_client._streamed_log import StreamedLog, StreamedLogBase
1819

1920
if TYPE_CHECKING:
2021
from collections.abc import Iterator
@@ -716,3 +717,100 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
716717
elapsed = time.monotonic() - start
717718

718719
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
720+
721+
722+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
723+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
724+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
725+
{
726+
'data': {
727+
'id': _MOCKED_RUN_ID,
728+
'actId': _MOCKED_ACTOR_ID,
729+
'userId': 'test_user_id',
730+
'startedAt': '2019-11-30T07:34:24.202Z',
731+
'finishedAt': '2019-12-12T09:30:12.202Z',
732+
'status': 'RUNNING',
733+
'statusMessage': 'Running',
734+
'isStatusMessageTerminal': False,
735+
'meta': {'origin': 'WEB'},
736+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
737+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
738+
'buildId': 'test_build_id',
739+
'generalAccess': 'RESTRICTED',
740+
'defaultKeyValueStoreId': 'test_kvs_id',
741+
'defaultDatasetId': 'test_dataset_id',
742+
'defaultRequestQueueId': 'test_rq_id',
743+
'buildNumber': '0.0.1',
744+
'containerUrl': 'https://test.runs.apify.net',
745+
}
746+
}
747+
)
748+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
749+
{
750+
'data': {
751+
'id': _MOCKED_ACTOR_ID,
752+
'userId': 'test_user_id',
753+
'name': _MOCKED_ACTOR_NAME,
754+
'username': 'test_user',
755+
'isPublic': False,
756+
'createdAt': '2019-07-08T11:27:57.401Z',
757+
'modifiedAt': '2019-07-08T14:01:05.546Z',
758+
'stats': {
759+
'totalBuilds': 0,
760+
'totalRuns': 0,
761+
'totalUsers': 0,
762+
'totalUsers7Days': 0,
763+
'totalUsers30Days': 0,
764+
'totalUsers90Days': 0,
765+
'totalMetamorphs': 0,
766+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
767+
},
768+
'versions': [],
769+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
770+
'deploymentKey': 'test_key',
771+
}
772+
}
773+
)
774+
775+
776+
@pytest.mark.usefixtures('propagate_stream_logs')
777+
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
778+
httpserver: HTTPServer,
779+
monkeypatch: pytest.MonkeyPatch,
780+
) -> None:
781+
"""Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
782+
# Shorten the read timeout so the test doesn't wait for the production default.
783+
monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
784+
785+
release_server = threading.Event()
786+
787+
def _silent_handler(_request: Request) -> Response:
788+
def generate_logs() -> Iterator[bytes]:
789+
# Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
790+
# response; then block without emitting any log data.
791+
yield b''
792+
release_server.wait(timeout=30)
793+
794+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
795+
796+
httpserver.expect_request(
797+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
798+
).respond_with_handler(_silent_handler)
799+
_register_run_and_actor_endpoints(httpserver)
800+
801+
api_url = httpserver.url_for('/').removesuffix('/')
802+
run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
803+
streamed_log = run_client.get_streamed_log()
804+
805+
streamed_log.start()
806+
try:
807+
# Give the streaming thread time to start and block inside iter_bytes.
808+
time.sleep(0.3)
809+
810+
# Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
811+
stop_thread = threading.Thread(target=streamed_log.stop)
812+
stop_thread.start()
813+
stop_thread.join(timeout=5)
814+
assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
815+
finally:
816+
release_server.set()

0 commit comments

Comments
 (0)