Skip to content

Commit 231e871

Browse files
committed
review fixes
1 parent 3b39503 commit 231e871

8 files changed

Lines changed: 141 additions & 15 deletions

File tree

argocd/assets/configuration/spec.yaml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ files:
6060
hidden: true
6161
fleet_configurable: false
6262
description: |
63-
Collect the Argo CD Prometheus/OpenMetrics endpoints, the check's
63+
Collect the Argo CD Prometheus (OpenMetrics) endpoints, the check's
6464
default metrics. Enabled by default. Set to false to run only the
6565
generic resources collector (``collect_genresources``) without scraping
6666
metrics; in that mode the ``*_endpoint`` options are not required.
@@ -134,7 +134,10 @@ files:
134134
description: |
135135
Seconds between full Application scrapes, in both streaming and polling
136136
modes. A full scrape re-submits every Application to refresh its
137-
time-to-live (TTL) and backfill anything the stream missed. Minimum of 1.
137+
time-to-live (TTL) and backfill anything the stream missed. While
138+
streaming, the dedup poll is disabled, so during a stream outage a newly
139+
created Application is not collected until the next full scrape -- this
140+
value is the worst-case discovery lag. Minimum of 1.
138141
value:
139142
type: integer
140143
example: 600
@@ -232,6 +235,19 @@ files:
232235
type: integer
233236
example: 30
234237
minimum: 1
238+
- name: genresources_stream_read_timeout_seconds
239+
hidden: true
240+
fleet_configurable: false
241+
description: |
242+
Socket read timeout, in seconds, for the application watch stream. Argo CD
243+
does not send keepalives on an idle stream, so this is effectively how long
244+
an idle but healthy connection is held before it is treated as dead and
245+
reconnected. Larger values reduce reconnect churn on low-traffic instances;
246+
smaller values detect a silently dropped connection sooner. Minimum of 1.
247+
value:
248+
type: integer
249+
example: 300
250+
minimum: 1
235251
- template: instances/openmetrics
236252
overrides:
237253
openmetrics_endpoint.required: false

argocd/datadog_checks/argocd/config_models/defaults.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ def instance_genresources_stream_backoff_max_seconds():
9292
return 30
9393

9494

95+
def instance_genresources_stream_read_timeout_seconds():
96+
return 300
97+
98+
9599
def instance_genresources_ttl_seconds():
96100
return 1800
97101

argocd/datadog_checks/argocd/config_models/instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ class InstanceConfig(BaseModel):
126126
genresources_repository_scrape_interval_seconds: Optional[int] = Field(None, ge=1)
127127
genresources_stream_applications_enabled: Optional[bool] = None
128128
genresources_stream_backoff_max_seconds: Optional[int] = Field(None, ge=1)
129+
genresources_stream_read_timeout_seconds: Optional[int] = Field(None, ge=1)
129130
genresources_ttl_seconds: Optional[int] = Field(None, ge=1)
130131
headers: Optional[MappingProxyType[str, Any]] = None
131132
histogram_buckets_as_distributions: Optional[bool] = None

argocd/datadog_checks/argocd/resources.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ def __init__(self, check: "ArgocdCheck") -> None:
225225
self._submitted: dict[str, str] = {}
226226
self._stream_enabled: bool = bool(config.genresources_stream_applications_enabled)
227227
self._backoff_max: int = config.genresources_stream_backoff_max_seconds
228+
self._stream_read_timeout: int = config.genresources_stream_read_timeout_seconds
228229
self._app_poll_interval: int = config.genresources_application_poll_interval_seconds
229230
self._app_full_scrape_interval: int = config.genresources_application_full_scrape_interval_seconds
230231
self._cluster_scrape_interval: int = config.genresources_cluster_scrape_interval_seconds
@@ -266,8 +267,8 @@ def collect(self) -> None:
266267
return
267268
now = int(time.time())
268269
if self._stream_enabled:
269-
self._ensure_listener()
270-
connected = self._listener is not None and self._listener.is_connected()
270+
listener = self._ensure_listener()
271+
connected = listener is not None and listener.is_connected()
271272
self.check.gauge(GENRESOURCES_STREAM_UP_METRIC, 1 if connected else 0)
272273
self._collect_due_types(now)
273274

@@ -299,17 +300,18 @@ def _collect_due_types(self, now: int) -> None:
299300
self._last_repository_scrape = now
300301
self._collect_type(REPOSITORY_SPEC, seen_at=now, expire_at=expire_at, force_full=True)
301302

302-
def _ensure_listener(self) -> None:
303+
def _ensure_listener(self) -> ArgocdApplicationStreamListener | None:
303304
with self._listener_lock:
304305
if self._stopped:
305-
return
306+
return None
306307
if self._listener is None:
307308
self._listener = ArgocdApplicationStreamListener(
308309
self.check,
309310
self,
310311
endpoint=self._endpoint,
311312
auth_token=self._auth_token,
312313
backoff_max_seconds=self._backoff_max,
314+
read_timeout_seconds=self._stream_read_timeout,
313315
http=RequestsWrapper(
314316
self.check.instance,
315317
self.check.init_config,
@@ -319,6 +321,7 @@ def _ensure_listener(self) -> None:
319321
)
320322
if not self._listener.is_alive():
321323
self._listener.start()
324+
return self._listener
322325

323326
def stop(self) -> None:
324327
"""Signal the stream listener to stop; must not block, per AgentCheck.cancel()'s contract."""
@@ -329,7 +332,10 @@ def stop(self) -> None:
329332
listener.cancel()
330333

331334
def emit_stream_application(self, application: dict) -> None:
332-
"""Emit a single application received from the stream (ADDED/MODIFIED) through the shared pipeline."""
335+
"""Emit a single application received from the stream (ADDED/MODIFIED) through the shared pipeline.
336+
337+
Mutates ``application`` in place (credential sanitization); the caller must not reuse the dict after.
338+
"""
333339
seen_at = int(time.time())
334340
self._emit_item(
335341
application, APPLICATION_SPEC, seen_at=seen_at, expire_at=seen_at + self._ttl_seconds, force_full=False
@@ -382,13 +388,18 @@ def _collect_type(self, spec: ResourceTypeSpec, *, seen_at: int, expire_at: int,
382388
cache_key = self._emit_item(item, spec, seen_at=seen_at, expire_at=expire_at, force_full=force_full)
383389
if cache_key is not None:
384390
seen.add(cache_key)
391+
# A stream frame that lands between _fetch above and this prune isn't in `seen`, so its fresh cache
392+
# entry is dropped here. Harmless: the app was already submitted; it just gets one redundant re-submit
393+
# on its next stream frame. Holding the lock across the fetch would stall real-time emits instead.
385394
namespace = f"{spec.resource_type}{KEY_SEPARATOR}"
386395
with self._submitted_lock:
387396
self._submitted = {k: v for k, v in self._submitted.items() if not k.startswith(namespace) or k in seen}
388397

389398
def _fetch(self, api_path: str) -> list[dict]:
390399
url = self._endpoint.rstrip("/") + api_path
391-
response = self.check.http.get(url, headers=auth_headers(self._auth_token))
400+
# extra_headers (not headers) so a dedicated token is *merged* into the wrapper's configured headers;
401+
# passing headers= would shadow them and drop inherited auth when genresources_auth_token is unset.
402+
response = self.check.http.get(url, extra_headers=auth_headers(self._auth_token))
392403
response.raise_for_status()
393404
payload = response.json()
394405
return list(payload.get("items") or [])

argocd/datadog_checks/argocd/stream_listener.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
STREAM_PATH = "/api/v1/stream/applications"
2828
CONNECT_TIMEOUT_SECONDS = 10
29-
READ_TIMEOUT_SECONDS = 60
3029
INITIAL_BACKOFF_SECONDS = 1
3130

3231

@@ -39,6 +38,10 @@ class ArgocdApplicationStreamListener:
3938
shutdown path is ``cancel()`` (signal the loop and close the socket to unblock ``iter_lines``); it does not
4039
block, and callers may ``join()`` afterwards to wait. ``daemon=True`` is only a backstop for a hard
4140
interpreter exit that never calls ``cancel()``.
41+
42+
Metrics (``count``) and generic resources (``submit_generic_resource`` via the collector) are submitted
43+
directly from this thread; the Datadog aggregator tolerates concurrent submission (same pattern as
44+
``DBMAsyncJob``), and submitting inline is what makes stream updates near-real-time.
4245
"""
4346

4447
def __init__(
@@ -49,6 +52,7 @@ def __init__(
4952
endpoint: str,
5053
auth_token: str | None,
5154
backoff_max_seconds: int,
55+
read_timeout_seconds: int,
5256
http: "RequestsWrapper",
5357
) -> None:
5458
self.check = check
@@ -57,6 +61,7 @@ def __init__(
5761
self._endpoint = endpoint.rstrip("/")
5862
self._auth_token = auth_token
5963
self._backoff_max = max(1, backoff_max_seconds)
64+
self._read_timeout = max(1, read_timeout_seconds)
6065
self._stop = threading.Event()
6166
self._connected = threading.Event()
6267
self._thread: threading.Thread | None = None
@@ -79,6 +84,9 @@ def is_connected(self) -> bool:
7984
def cancel(self) -> None:
8085
"""Signal the loop to stop and close the active connection to unblock ``iter_lines``. Does not block."""
8186
self._stop.set()
87+
# If cancel lands in the narrow window between _stream_once's get() returning and its
88+
# ``self._response = response``, we see None and skip close(); the thread still exits on its next
89+
# _stop check, at worst after the read timeout. Bounded shutdown latency on a daemon thread, not a leak.
8290
response = self._response
8391
if response is not None:
8492
try:
@@ -119,8 +127,10 @@ def _stream_once(self) -> bool:
119127
url = self._endpoint + STREAM_PATH
120128
kwargs: dict = {
121129
"stream": True,
122-
"timeout": (CONNECT_TIMEOUT_SECONDS, READ_TIMEOUT_SECONDS),
123-
"headers": auth_headers(self._auth_token),
130+
"timeout": (CONNECT_TIMEOUT_SECONDS, self._read_timeout),
131+
# extra_headers (not headers): merge the token into the wrapper's configured headers rather than
132+
# shadowing them, so inherited auth survives when genresources_auth_token is unset.
133+
"extra_headers": auth_headers(self._auth_token),
124134
}
125135
got_data = False
126136
response = None

argocd/tests/test_argocd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ def test_app_controller_service_check(dd_run_check, aggregator, mock_http_respon
105105

106106

107107
def test_both_collectors_disabled_raises(dd_run_check):
108+
check = ArgocdCheck('argocd', {}, [{"collect_openmetrics": False, "collect_genresources": False}])
108109
with pytest.raises(Exception, match="Enable at least one of"):
109-
check = ArgocdCheck('argocd', {}, [{"collect_openmetrics": False, "collect_genresources": False}])
110110
dd_run_check(check)
111111

112112

argocd/tests/test_resources.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
GENRESOURCES_API_UP_METRIC,
1515
REPOSITORY_INCLUDE,
1616
)
17+
from datadog_checks.base.utils.http import RequestsWrapper
1718
from datadog_checks.dev.http import MockResponse
1819

1920
from .common import _check as _make_check
@@ -636,3 +637,23 @@ def test_collector_warns_when_exclude_empties_a_type(caplog):
636637
def test_collector_warns_when_ttl_shorter_than_longest_scrape_interval(caplog):
637638
_check(genresources_ttl_seconds=100, genresources_application_full_scrape_interval_seconds=600)
638639
assert any("shorter than the longest scrape interval" in rec.message for rec in caplog.records)
640+
641+
642+
def test_fetch_adds_bearer_via_extra_headers_so_configured_auth_is_preserved():
643+
check = _check(genresources_auth_token="tok")
644+
with patch.object(RequestsWrapper, "get", return_value=MockResponse(json_data={"items": []})) as get:
645+
check._resource_collector._fetch("/api/v1/applications")
646+
647+
kwargs = get.call_args.kwargs
648+
assert kwargs.get("extra_headers") == {"Authorization": "Bearer tok"} # merged into the wrapper's headers
649+
assert "headers" not in kwargs # never pass `headers`; it would shadow the wrapper's configured auth
650+
651+
652+
def test_fetch_inherits_wrapper_auth_when_no_genresources_token():
653+
check = _check(genresources_auth_token=None) # rely on the instance's HTTP auth (headers / auth_token)
654+
with patch.object(RequestsWrapper, "get", return_value=MockResponse(json_data={"items": []})) as get:
655+
check._resource_collector._fetch("/api/v1/applications")
656+
657+
kwargs = get.call_args.kwargs
658+
assert "headers" not in kwargs # must not clobber the wrapper's configured headers
659+
assert not kwargs.get("extra_headers") # nothing added -> the inherited auth headers survive

argocd/tests/test_resources_stream.py

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def _listener(collector, *, check=None) -> ArgocdApplicationStreamListener:
4040
endpoint=GENRESOURCES_ENDPOINT,
4141
auth_token=None,
4242
backoff_max_seconds=10,
43+
read_timeout_seconds=60,
4344
http=Mock(),
4445
)
4546

@@ -174,6 +175,17 @@ def test_ensure_listener_gives_the_listener_its_own_http_session():
174175
assert http is not check.http # a dedicated RequestsWrapper, not the check's shared session
175176

176177

178+
def test_ensure_listener_passes_the_configured_read_timeout():
179+
check = _check(genresources_stream_applications_enabled=True, genresources_stream_read_timeout_seconds=123)
180+
collector = check._resource_collector
181+
182+
with patch("datadog_checks.argocd.resources.ArgocdApplicationStreamListener") as listener_cls:
183+
listener_cls.return_value.is_alive.return_value = False
184+
collector._ensure_listener()
185+
186+
assert listener_cls.call_args.kwargs["read_timeout_seconds"] == 123
187+
188+
177189
def test_collect_with_stream_emits_stream_up_one_when_the_listener_is_connected(aggregator):
178190
check = _check(genresources_stream_applications_enabled=True)
179191
collector = check._resource_collector
@@ -263,7 +275,13 @@ def close(self):
263275
http = Mock()
264276
http.get.return_value = _Resp()
265277
listener = ArgocdApplicationStreamListener(
266-
check, collector, endpoint=GENRESOURCES_ENDPOINT, auth_token=None, backoff_max_seconds=10, http=http
278+
check,
279+
collector,
280+
endpoint=GENRESOURCES_ENDPOINT,
281+
auth_token=None,
282+
backoff_max_seconds=10,
283+
read_timeout_seconds=60,
284+
http=http,
267285
)
268286

269287
with patch.object(check, "submit_generic_resource") as submit:
@@ -293,7 +311,13 @@ def close(self):
293311
http = Mock()
294312
http.get.return_value = _Resp()
295313
listener = ArgocdApplicationStreamListener(
296-
check, collector, endpoint=GENRESOURCES_ENDPOINT, auth_token=None, backoff_max_seconds=10, http=http
314+
check,
315+
collector,
316+
endpoint=GENRESOURCES_ENDPOINT,
317+
auth_token=None,
318+
backoff_max_seconds=10,
319+
read_timeout_seconds=60,
320+
http=http,
297321
)
298322

299323
with patch.object(check, "submit_generic_resource"):
@@ -302,6 +326,39 @@ def close(self):
302326
assert got_data is True # data arrived before the drop; _stream_once caught the error and reported it
303327

304328

329+
def test_stream_once_inherits_wrapper_auth_when_no_token():
330+
check = _check(genresources_stream_applications_enabled=True, genresources_auth_token=None)
331+
collector = check._resource_collector
332+
333+
class _Resp:
334+
def raise_for_status(self):
335+
pass
336+
337+
def iter_lines(self):
338+
return iter(())
339+
340+
def close(self):
341+
pass
342+
343+
http = Mock()
344+
http.get.return_value = _Resp()
345+
listener = ArgocdApplicationStreamListener(
346+
check,
347+
collector,
348+
endpoint=GENRESOURCES_ENDPOINT,
349+
auth_token=None,
350+
backoff_max_seconds=10,
351+
read_timeout_seconds=60,
352+
http=http,
353+
)
354+
355+
listener._stream_once()
356+
357+
kwargs = http.get.call_args.kwargs
358+
assert "headers" not in kwargs # must not clobber the wrapper's configured auth
359+
assert not kwargs.get("extra_headers") # no token -> nothing added, inherited auth survives
360+
361+
305362
def test_handle_line_emits_events_received_metric_matching_metadata(aggregator):
306363
check = _check(genresources_stream_applications_enabled=True)
307364
collector = check._resource_collector
@@ -357,7 +414,13 @@ def close(self):
357414
http = Mock()
358415
http.get.return_value = _Resp()
359416
listener = ArgocdApplicationStreamListener(
360-
check, collector, endpoint=GENRESOURCES_ENDPOINT, auth_token=None, backoff_max_seconds=1, http=http
417+
check,
418+
collector,
419+
endpoint=GENRESOURCES_ENDPOINT,
420+
auth_token=None,
421+
backoff_max_seconds=1,
422+
read_timeout_seconds=60,
423+
http=http,
361424
)
362425

363426
listener.start()

0 commit comments

Comments
 (0)