Skip to content

Commit 99279c5

Browse files
committed
fix: prevent StreamedLog stop() from hanging on a silent stream
1 parent e794411 commit 99279c5

2 files changed

Lines changed: 114 additions & 12 deletions

File tree

src/apify_client/_streamed_log.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import re
66
import threading
77
from asyncio import Task
8-
from datetime import UTC, datetime
8+
from datetime import UTC, datetime, timedelta
99
from threading import Thread
10-
from typing import TYPE_CHECKING, Self, cast
10+
from typing import TYPE_CHECKING, ClassVar, Self, cast
1111

1212
from apify_client._docs import docs_group
1313

@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
9090
call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
9191
"""
9292

93+
# Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
94+
# this window instead of waiting for the long-polling default.
95+
_read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
96+
9397
def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
9498
"""Initialize `StreamedLog`.
9599
@@ -138,17 +142,17 @@ def __exit__(
138142
self.stop()
139143

140144
def _stream_log(self) -> None:
141-
with self._log_client.stream(raw=True) as log_stream:
145+
with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
142146
if not log_stream:
143147
return
144-
for data in log_stream.iter_bytes():
145-
self._process_new_data(data)
146-
if self._stop_logging:
147-
break
148-
149-
# If the stream is finished, then the last part will be also processed.
150-
self._log_buffer_content(include_last_part=True)
151-
return
148+
try:
149+
for data in log_stream.iter_bytes():
150+
self._process_new_data(data)
151+
if self._stop_logging:
152+
break
153+
finally:
154+
# Flush the last buffered part even if the read timed out or was stopped.
155+
self._log_buffer_content(include_last_part=True)
152156

153157

154158
@docs_group('Other')

tests/unit/test_logging.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import json
55
import logging
6+
import threading
67
import time
78
from datetime import datetime, timedelta
89
from typing import TYPE_CHECKING
@@ -14,7 +15,7 @@
1415
from apify_client import ApifyClient, ApifyClientAsync
1516
from apify_client._logging import RedirectLogFormatter
1617
from apify_client._status_message_watcher import StatusMessageWatcherBase
17-
from apify_client._streamed_log import StreamedLogBase
18+
from apify_client._streamed_log import StreamedLog, StreamedLogBase
1819

1920
if TYPE_CHECKING:
2021
from collections.abc import Iterator
@@ -716,3 +717,100 @@ async def test_async_watcher_aexit_skips_final_sleep_on_exception(
716717
elapsed = time.monotonic() - start
717718

718719
assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
720+
721+
722+
def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
723+
"""Register the minimal run and actor endpoints required by `get_streamed_log`."""
724+
httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
725+
{
726+
'data': {
727+
'id': _MOCKED_RUN_ID,
728+
'actId': _MOCKED_ACTOR_ID,
729+
'userId': 'test_user_id',
730+
'startedAt': '2019-11-30T07:34:24.202Z',
731+
'finishedAt': '2019-12-12T09:30:12.202Z',
732+
'status': 'RUNNING',
733+
'statusMessage': 'Running',
734+
'isStatusMessageTerminal': False,
735+
'meta': {'origin': 'WEB'},
736+
'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
737+
'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
738+
'buildId': 'test_build_id',
739+
'generalAccess': 'RESTRICTED',
740+
'defaultKeyValueStoreId': 'test_kvs_id',
741+
'defaultDatasetId': 'test_dataset_id',
742+
'defaultRequestQueueId': 'test_rq_id',
743+
'buildNumber': '0.0.1',
744+
'containerUrl': 'https://test.runs.apify.net',
745+
}
746+
}
747+
)
748+
httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
749+
{
750+
'data': {
751+
'id': _MOCKED_ACTOR_ID,
752+
'userId': 'test_user_id',
753+
'name': _MOCKED_ACTOR_NAME,
754+
'username': 'test_user',
755+
'isPublic': False,
756+
'createdAt': '2019-07-08T11:27:57.401Z',
757+
'modifiedAt': '2019-07-08T14:01:05.546Z',
758+
'stats': {
759+
'totalBuilds': 0,
760+
'totalRuns': 0,
761+
'totalUsers': 0,
762+
'totalUsers7Days': 0,
763+
'totalUsers30Days': 0,
764+
'totalUsers90Days': 0,
765+
'totalMetamorphs': 0,
766+
'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
767+
},
768+
'versions': [],
769+
'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
770+
'deploymentKey': 'test_key',
771+
}
772+
}
773+
)
774+
775+
776+
@pytest.mark.usefixtures('propagate_stream_logs')
777+
def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
778+
httpserver: HTTPServer,
779+
monkeypatch: pytest.MonkeyPatch,
780+
) -> None:
781+
"""Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
782+
# Shorten the read timeout so the test doesn't wait for the production default.
783+
monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
784+
785+
release_server = threading.Event()
786+
787+
def _silent_handler(_request: Request) -> Response:
788+
def generate_logs() -> Iterator[bytes]:
789+
# Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
790+
# response; then block without emitting any log data.
791+
yield b''
792+
release_server.wait(timeout=30)
793+
794+
return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
795+
796+
httpserver.expect_request(
797+
f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
798+
).respond_with_handler(_silent_handler)
799+
_register_run_and_actor_endpoints(httpserver)
800+
801+
api_url = httpserver.url_for('/').removesuffix('/')
802+
run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
803+
streamed_log = run_client.get_streamed_log()
804+
805+
streamed_log.start()
806+
try:
807+
# Give the streaming thread time to start and block inside iter_bytes.
808+
time.sleep(0.3)
809+
810+
# Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
811+
stop_thread = threading.Thread(target=streamed_log.stop)
812+
stop_thread.start()
813+
stop_thread.join(timeout=5)
814+
assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
815+
finally:
816+
release_server.set()

0 commit comments

Comments
 (0)