-
Notifications
You must be signed in to change notification settings - Fork 148
Expand file tree
/
Copy pathtest_dapr_runner.py
More file actions
138 lines (100 loc) · 4.51 KB
/
Copy pathtest_dapr_runner.py
File metadata and controls
138 lines (100 loc) · 4.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import subprocess
import time
from pathlib import Path
from typing import IO
import pytest
from tests.examples.conftest import DaprRunner
class FakeProcess:
def __init__(self, output: str, returncode: int = 1) -> None:
self.stdout = iter(output.splitlines(keepends=True))
self.returncode = returncode
def poll(self) -> int:
return self.returncode
def wait(self, timeout: int | None = None) -> int:
return self.returncode
class FakeBackgroundProcess:
"""Stand-in for a ``dapr run`` background process started via ``start()``.
A real background sidecar writes to the file object passed as ``stdout`` and
keeps running (``poll()`` returns ``None``); a sidecar that died on a port
bind has exited (``poll()`` returns a non-``None`` code).
"""
def __init__(self, output: str, returncode: int | None, stdout: IO[str]) -> None:
stdout.write(output)
stdout.flush()
self._returncode = returncode
def poll(self) -> int | None:
return self._returncode
def wait(self, timeout: int | None = None) -> int:
return self._returncode or 0
def test_run_retries_transient_dapr_port_bind_failure(
    monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
    """``run()`` launches again after an "address already in use" failure.

    The first fake launch emits port-bind failure output and the second emits
    the expected application output. The test expects two launches, a single
    one-second sleep, the second launch's output as the result, and a retry
    notice on stdout.
    """
    bind_failure_output = (
        'level=error msg="Failed to listen for gRPC server on TCP address :33223 '
        'with error: listen tcp :33223: bind: address already in use"\n'
        'level=fatal msg="Fatal error from runtime: failed to start internal gRPC '
        'server: could not listen on any endpoint"\n'
    )
    success_output = "{'secretKey': 'secretValue'}\n"

    # Outputs are consumed front to back, one per fake launch.
    pending_outputs = [bind_failure_output, success_output]
    launches = []
    recorded_sleeps: list[int] = []

    def fake_popen(*args, **kwargs) -> FakeProcess:
        launches.append((args, kwargs))
        return FakeProcess(pending_outputs.pop(0))

    monkeypatch.setattr(subprocess, 'Popen', fake_popen)
    # Record requested sleeps instead of actually sleeping.
    monkeypatch.setattr(time, 'sleep', recorded_sleeps.append)

    runner = DaprRunner(tmp_path)
    result = runner.run('--app-id=secretsapp -- python3 example.py', timeout=1)

    assert result == success_output
    assert len(launches) == 2
    assert recorded_sleeps == [1]

    captured = capsys.readouterr()
    assert (
        'Dapr sidecar failed to bind a random port; retrying startup after 1s'
        in captured.out
    )
def test_run_does_not_retry_non_port_bind_failure(monkeypatch, tmp_path: Path) -> None:
    """``run()`` launches only once when the failure output is not a port-bind error."""
    failure_output = 'application failed before printing expected output\n'
    launches = []

    def fake_popen(*args, **kwargs) -> FakeProcess:
        launches.append((args, kwargs))
        return FakeProcess(failure_output)

    monkeypatch.setattr(subprocess, 'Popen', fake_popen)

    runner = DaprRunner(tmp_path)
    result = runner.run('--app-id=secretsapp -- python3 example.py', timeout=1)

    # The failing output is returned as-is and no second launch happened.
    assert result == failure_output
    assert len(launches) == 1
# Canned two-line log output: an error line for a gRPC listen failure with
# "bind: address already in use", followed by the fatal line that follows it.
PORT_BIND_FAILURE_OUTPUT = ''.join(
    (
        'level=error msg="Failed to listen for gRPC server on TCP address :38779 '
        'with error: listen tcp :38779: bind: address already in use"\n',
        'level=fatal msg="Fatal error from runtime: failed to start internal gRPC '
        'server: could not listen on any endpoint"\n',
    )
)
def test_start_retries_transient_dapr_port_bind_failure(
    monkeypatch, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
    """``start()`` launches again when the background process died on a port bind.

    The first fake process has exited (code 1) after writing port-bind failure
    output; the second is still running (``poll()`` is ``None``). The test
    expects two launches, the sleep sequence ``[0, 1, 0]`` and a retry notice
    on stdout.
    """
    # (output written to stdout, value reported by poll()) for each launch, in order.
    scripted_attempts = [
        (PORT_BIND_FAILURE_OUTPUT, 1),
        ('INFO: Application startup complete.\n', None),
    ]
    launches = []
    recorded_sleeps: list[int] = []

    def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
        launches.append((args, kwargs))
        text, exit_code = scripted_attempts.pop(0)
        # The fake writes into whatever file object the caller passed as ``stdout``.
        return FakeBackgroundProcess(text, exit_code, kwargs['stdout'])

    monkeypatch.setattr(subprocess, 'Popen', fake_popen)
    # Record requested sleeps instead of actually sleeping.
    monkeypatch.setattr(time, 'sleep', recorded_sleeps.append)

    runner = DaprRunner(tmp_path)
    runner.start('--app-id demo-actor -- uvicorn demo:app', wait=0)

    assert len(launches) == 2
    # NOTE(review): presumably the zeros are the ``wait=0`` pause after each launch and
    # the 1 is the delay before retrying -- confirm against DaprRunner.start.
    assert recorded_sleeps == [0, 1, 0]

    captured = capsys.readouterr()
    assert (
        'Dapr background sidecar failed to bind a random port; retrying startup after 1s'
        in captured.out
    )
def test_start_does_not_retry_non_port_bind_failure(monkeypatch, tmp_path: Path) -> None:
    """``start()`` launches only once when the process exited for a non-port-bind reason."""
    crash_output = 'app crashed for an unrelated reason\n'
    launches = []

    def fake_popen(*args, **kwargs) -> FakeBackgroundProcess:
        launches.append((args, kwargs))
        # Exited with code 1, but the output carries no port-bind error.
        return FakeBackgroundProcess(crash_output, 1, kwargs['stdout'])

    monkeypatch.setattr(subprocess, 'Popen', fake_popen)

    runner = DaprRunner(tmp_path)
    runner.start('--app-id demo-actor -- uvicorn demo:app', wait=0)

    assert len(launches) == 1