|
15 | 15 | from apify_client import ApifyClient, ApifyClientAsync |
16 | 16 | from apify_client._logging import RedirectLogFormatter |
17 | 17 | from apify_client._status_message_watcher import StatusMessageWatcherBase |
18 | | -from apify_client._streamed_log import StreamedLogBase |
| 18 | +from apify_client._streamed_log import StreamedLog, StreamedLogBase |
19 | 19 |
|
20 | 20 | if TYPE_CHECKING: |
21 | 21 | from collections.abc import Iterator |
@@ -818,3 +818,45 @@ def generate_logs() -> Iterator[bytes]: |
818 | 818 | messages = [record.message for record in caplog.records] |
819 | 819 | assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}' |
820 | 820 | assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}' |
| 821 | + |
| 822 | + |
| 823 | +def test_streamed_log_sync_stop_does_not_hang_on_silent_stream( |
| 824 | + httpserver: HTTPServer, |
| 825 | + monkeypatch: pytest.MonkeyPatch, |
| 826 | +) -> None: |
| 827 | + """Verify `stop()` returns promptly even when the underlying stream is silent (no chunks).""" |
| 828 | + # Shorten the read timeout so the test doesn't wait for the production default. |
| 829 | + monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1)) |
| 830 | + |
| 831 | + release_server = threading.Event() |
| 832 | + |
| 833 | + def _silent_handler(_request: Request) -> Response: |
| 834 | + def generate_logs() -> Iterator[bytes]: |
| 835 | + # Yield an empty chunk so werkzeug flushes headers and the client sees a streaming |
| 836 | + # response; then block without emitting any log data. |
| 837 | + yield b'' |
| 838 | + release_server.wait(timeout=30) |
| 839 | + |
| 840 | + return Response(response=generate_logs(), status=200, mimetype='application/octet-stream') |
| 841 | + |
| 842 | + httpserver.expect_request( |
| 843 | + f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true' |
| 844 | + ).respond_with_handler(_silent_handler) |
| 845 | + _register_run_and_actor_endpoints(httpserver) |
| 846 | + |
| 847 | + api_url = httpserver.url_for('/').removesuffix('/') |
| 848 | + run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID) |
| 849 | + streamed_log = run_client.get_streamed_log() |
| 850 | + |
| 851 | + streamed_log.start() |
| 852 | + try: |
| 853 | + # Give the streaming thread time to start and block inside iter_bytes. |
| 854 | + time.sleep(0.3) |
| 855 | + |
| 856 | + # Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses. |
| 857 | + stop_thread = threading.Thread(target=streamed_log.stop) |
| 858 | + stop_thread.start() |
| 859 | + stop_thread.join(timeout=5) |
| 860 | + assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent' |
| 861 | + finally: |
| 862 | + release_server.set() |
0 commit comments