-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathtest_full_scan_retry.py
More file actions
285 lines (223 loc) · 9.72 KB
/
Copy pathtest_full_scan_retry.py
File metadata and controls
285 lines (223 loc) · 9.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""Tests for the full-scan upload retry on transient gateway/connection failures.
A `POST /orgs/<org>/full-scans` upload can fail transiently (an HTTP 502/503/504/408, a
dropped or reset connection, or a timeout) without the server having created the scan.
`Core.create_full_scan` retries the failures the SDK classifies as transient
(`APIFailure.is_transient_error()`, socketdev>=3.3.0); these tests cover the retry
decision, the loop bounds, and that the temporary brotli-compressed facts files survive
until every attempt has finished.
"""
import logging
from unittest.mock import MagicMock
import pytest
from socketdev.exceptions import (
APIAccessDenied,
APIBadGateway,
APIConnectionError,
APIFailure,
APIResourceNotFound,
APITimeout,
)
from socketdev.fullscans import FullScanMetadata
from socketsecurity.core import (
FULL_SCAN_UPLOAD_MAX_ATTEMPTS,
SOCKET_FACTS_BROTLI_FILENAME,
SOCKET_FACTS_FILENAME,
Core,
)
def _success_response():
metadata = FullScanMetadata(
id="scan-1",
created_at="2026-01-01T00:00:00Z",
updated_at="2026-01-01T00:00:00Z",
organization_id="org-1",
repository_id="repo-1",
branch="main",
html_report_url="https://socket.dev/report",
)
response = MagicMock()
response.success = True
response.data = metadata
return response
# Catch-all APIFailure as the SDK raises it for statuses without a dedicated class
# (socketdev/core/api.py); the recorded status_code drives is_transient_error().
def _catch_all_failure(status_code: int) -> APIFailure:
return APIFailure(
f"Bad Request: HTTP original_status_code:{status_code}\n"
f"Path: https://api.socket.dev/v0/orgs/org/full-scans\n\n"
f"Headers:\ncf-ray: abc123",
status_code=status_code,
)
@pytest.fixture
def core_with_mock_sdk():
# Build a Core without running org setup; we only exercise create_full_scan.
core = Core.__new__(Core)
core.sdk = MagicMock()
core.cli_config = None # skip the tier1 finalize branch
return core
@pytest.fixture(autouse=True)
def no_sleep(mocker):
return mocker.patch("socketsecurity.core.time.sleep")
def test_upload_succeeds_first_try(core_with_mock_sdk, tmp_path, no_sleep):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.return_value = _success_response()
full_scan = core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert full_scan.id == "scan-1"
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 1
no_sleep.assert_not_called()
def test_upload_retries_on_502_then_succeeds(
core_with_mock_sdk, tmp_path, no_sleep, caplog
):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = [
APIBadGateway(),
APIBadGateway(),
_success_response(),
]
with caplog.at_level(logging.WARNING, logger="socketdev"):
full_scan = core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert full_scan.id == "scan-1"
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 3
assert no_sleep.call_count == 2 # waits before attempts 2 and 3
retry_warnings = [r for r in caplog.records if "retrying in" in r.message]
assert len(retry_warnings) == 2
assert "APIBadGateway" in retry_warnings[0].message
assert f"(attempt 2/{FULL_SCAN_UPLOAD_MAX_ATTEMPTS})" in retry_warnings[0].message
def test_upload_raises_after_exhausting_attempts(
core_with_mock_sdk, tmp_path, no_sleep
):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = APIBadGateway()
with pytest.raises(APIBadGateway):
core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert (
core_with_mock_sdk.sdk.fullscans.post.call_count
== FULL_SCAN_UPLOAD_MAX_ATTEMPTS
)
@pytest.mark.parametrize("status_code", [408, 503, 504])
def test_upload_retries_on_catch_all_transient_statuses(
core_with_mock_sdk, tmp_path, no_sleep, status_code
):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = [
_catch_all_failure(status_code),
_success_response(),
]
full_scan = core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert full_scan.id == "scan-1"
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 2
@pytest.mark.parametrize("error_class", [APIConnectionError, APITimeout])
def test_upload_retries_on_connection_level_errors(
core_with_mock_sdk, tmp_path, no_sleep, error_class
):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = [
error_class(),
_success_response(),
]
full_scan = core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert full_scan.id == "scan-1"
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 2
def test_upload_does_not_retry_on_400(core_with_mock_sdk, tmp_path, no_sleep):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = _catch_all_failure(400)
with pytest.raises(APIFailure):
core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 1
no_sleep.assert_not_called()
@pytest.mark.parametrize(
"error_class,status_code", [(APIAccessDenied, 401), (APIResourceNotFound, 404)]
)
def test_upload_does_not_retry_on_dedicated_4xx_classes(
core_with_mock_sdk, tmp_path, no_sleep, error_class, status_code
):
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = error_class(
status_code=status_code
)
with pytest.raises(error_class):
core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 1
no_sleep.assert_not_called()
def test_upload_does_not_retry_on_error_payload(core_with_mock_sdk, tmp_path, no_sleep):
# A response that came back but reports failure (res.success False) is not transient.
manifest = tmp_path / "package.json"
manifest.write_text("{}")
failed = MagicMock()
failed.success = False
failed.message = "tarball too large"
failed.status = 200
core_with_mock_sdk.sdk.fullscans.post.return_value = failed
with pytest.raises(Exception, match="tarball too large"):
core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert core_with_mock_sdk.sdk.fullscans.post.call_count == 1
no_sleep.assert_not_called()
def test_temp_br_file_survives_retries_and_is_cleaned_after(
core_with_mock_sdk, tmp_path, no_sleep
):
# The brotli-compressed facts sibling must stay on disk across every retry attempt
# (the SDK re-reads it per call) and only be deleted once all attempts finished.
facts = tmp_path / SOCKET_FACTS_FILENAME
facts.write_text('{"components": []}')
compressed = tmp_path / SOCKET_FACTS_BROTLI_FILENAME
br_present_per_attempt = []
def post_side_effect(upload_files, *args, **kwargs):
br_present_per_attempt.append(compressed.is_file())
assert str(compressed) in upload_files
if len(br_present_per_attempt) < 3:
raise APIBadGateway()
return _success_response()
core_with_mock_sdk.sdk.fullscans.post.side_effect = post_side_effect
full_scan = core_with_mock_sdk.create_full_scan([str(facts)], MagicMock())
assert full_scan.id == "scan-1"
assert br_present_per_attempt == [True, True, True]
assert not compressed.exists() # cleaned up after the attempts finished
assert facts.is_file() # the original facts file is never touched
def test_temp_br_file_cleaned_after_exhausted_retries(
core_with_mock_sdk, tmp_path, no_sleep
):
facts = tmp_path / SOCKET_FACTS_FILENAME
facts.write_text('{"components": []}')
compressed = tmp_path / SOCKET_FACTS_BROTLI_FILENAME
core_with_mock_sdk.sdk.fullscans.post.side_effect = APIBadGateway()
with pytest.raises(APIBadGateway):
core_with_mock_sdk.create_full_scan([str(facts)], MagicMock())
assert (
core_with_mock_sdk.sdk.fullscans.post.call_count
== FULL_SCAN_UPLOAD_MAX_ATTEMPTS
)
assert not compressed.exists()
class _StubFailure(APIFailure):
"""An APIFailure whose transience is fixed, regardless of class or status code."""
def __init__(self, transient: bool):
super().__init__("stub failure")
self._transient = transient
def is_transient_error(self) -> bool:
return self._transient
@pytest.mark.parametrize("transient,expected_calls", [(True, 2), (False, 1)])
def test_retry_decision_delegates_to_sdk_classification(
core_with_mock_sdk, tmp_path, no_sleep, transient, expected_calls
):
# The CLI encodes no knowledge of the SDK's exception hierarchy or status codes:
# the retry decision is exactly APIFailure.is_transient_error(). (The transient /
# non-transient truth table itself is tested in the SDK, next to the code that
# raises the exceptions.)
manifest = tmp_path / "package.json"
manifest.write_text("{}")
core_with_mock_sdk.sdk.fullscans.post.side_effect = [
_StubFailure(transient),
_success_response(),
]
if transient:
full_scan = core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert full_scan.id == "scan-1"
else:
with pytest.raises(_StubFailure):
core_with_mock_sdk.create_full_scan([str(manifest)], MagicMock())
assert core_with_mock_sdk.sdk.fullscans.post.call_count == expected_calls