From b905c78c35c29a31a8ecd4a70a93d3fb5ac8574e Mon Sep 17 00:00:00 2001 From: Eric Weaver Date: Mon, 15 Jun 2026 18:00:19 -0400 Subject: [PATCH 1/2] Cache ignored DDIGNORE queries in incremental query metrics collector (#24056) * Properly cache and ignore DDIGNORE labeled queries * partial commit miss * remove test dashboard * Fix edge case on cache correctness --- postgres/changelog.d/24056.fixed | 1 + .../postgres/obfuscation_lookup.py | 74 +++++++++++++-- .../datadog_checks/postgres/statements_v2.py | 25 ++--- postgres/tests/test_statements_v2.py | 93 +++++++++++++++++++ .../tests/test_statements_v2_integration.py | 68 ++++++++++++++ 5 files changed, 241 insertions(+), 20 deletions(-) create mode 100644 postgres/changelog.d/24056.fixed diff --git a/postgres/changelog.d/24056.fixed b/postgres/changelog.d/24056.fixed new file mode 100644 index 0000000000000..4b1ee4bc29558 --- /dev/null +++ b/postgres/changelog.d/24056.fixed @@ -0,0 +1 @@ +Cache ignored ``/* DDIGNORE */`` queries in incremental query metrics collector to avoid repeated lookups. diff --git a/postgres/datadog_checks/postgres/obfuscation_lookup.py b/postgres/datadog_checks/postgres/obfuscation_lookup.py index 328e3c3c42937..03fcd7954dca2 100644 --- a/postgres/datadog_checks/postgres/obfuscation_lookup.py +++ b/postgres/datadog_checks/postgres/obfuscation_lookup.py @@ -25,11 +25,22 @@ class ObfuscationResult: class ObfuscationLookup: - """Two-tier LRU cache: (queryid, dbid, userid) -> query_signature -> ObfuscationResult. - - Cache hit avoids both PG text fetch and FFI obfuscation. On miss the - caller supplies raw text; we obfuscate, store both mappings, and discard - the raw text. Multiple pgss keys sharing a query_signature share one result. + """LRU cache mapping pg_stat_statements keys to obfuscated query results. + + A lookup resolves a (queryid, dbid, userid) key to one of three outcomes: + + - hit: the obfuscated result is cached, avoiding both PG text fetch and FFI + obfuscation. 
Stored as two tiers, key -> query_signature -> result, so + multiple keys sharing a query_signature share one result. + - miss: nothing is cached for the key; the caller must fetch its text and + pass it to :meth:`populate` to obfuscate, store, and discard the raw text. + - ignored: the key's text is known to be non-cacheable (e.g. the agent's own + /* DDIGNORE */ queries). These are neither hit nor miss; lookup skips them + so they never trigger a repeated text fetch. + + The caller decides what is non-cacheable (via :meth:`mark_ignored`); the + cache only owns storage and lifecycle. All three tiers are LRU-bounded by + ``maxsize`` and cleared for a key by :meth:`evict` when it leaves pgss. """ def __init__(self, maxsize: int, obfuscate_options: str, log_unobfuscated_queries: bool = False): @@ -39,6 +50,8 @@ def __init__(self, maxsize: int, obfuscate_options: str, log_unobfuscated_querie self._key_to_sig: OrderedDict[PgssKey, str] = OrderedDict() self._sig_to_result: OrderedDict[str, ObfuscationResult] = OrderedDict() + # Negative cache: keys we have learned resolve to nothing cacheable. + self._ignored_keys: OrderedDict[PgssKey, None] = OrderedDict() self._hits = 0 self._misses = 0 @@ -51,6 +64,10 @@ def queryid_map_size(self) -> int: def signature_map_size(self) -> int: return len(self._sig_to_result) + @property + def ignored_map_size(self) -> int: + return len(self._ignored_keys) + @property def hits(self) -> int: return self._hits @@ -64,11 +81,20 @@ def reset_stats(self): self._misses = 0 def lookup(self, keys: set[PgssKey]) -> tuple[dict[PgssKey, ObfuscationResult], set[PgssKey]]: - """Return (hits, misses) for the given pg_stat_statements row keys.""" + """Return (hits, misses) for the given pg_stat_statements row keys. + + Keys in the negative cache are excluded from both: they are neither a hit + (no result to return) nor a miss (must not be re-fetched). 
+ """ hits: dict[PgssKey, ObfuscationResult] = {} misses: set[PgssKey] = set() + ignored = 0 for pgss_key in keys: + if pgss_key in self._ignored_keys: + self._ignored_keys.move_to_end(pgss_key) + ignored += 1 + continue sig = self._key_to_sig.get(pgss_key) if sig is not None: self._key_to_sig.move_to_end(pgss_key) @@ -82,15 +108,35 @@ def lookup(self, keys: set[PgssKey]) -> tuple[dict[PgssKey, ObfuscationResult], misses.add(pgss_key) logger.debug( - "lookup: requested=%d hits=%d misses=%d key_map=%d sig_map=%d", + "lookup: requested=%d hits=%d misses=%d ignored=%d key_map=%d sig_map=%d ignored_map=%d", len(keys), len(hits), len(misses), + ignored, len(self._key_to_sig), len(self._sig_to_result), + len(self._ignored_keys), ) return hits, misses + def mark_ignored(self, keys: set[PgssKey]) -> None: + """Record keys whose text is non-cacheable so future lookups skip them. + + The caller is responsible for deciding what is non-cacheable (e.g. the + agent's own /* DDIGNORE */ queries). Entries are forgotten via + :meth:`evict` when their key disappears from pg_stat_statements. + """ + for pgss_key in keys: + # Drop any stale positive mapping so an ignored key can never resurface as a + # hit (e.g. if its signature is later repopulated by another key after this + # negative entry is LRU-trimmed). 
+ self._key_to_sig.pop(pgss_key, None) + self._ignored_keys[pgss_key] = None + self._ignored_keys.move_to_end(pgss_key) + if keys: + self._trim_ignored() + logger.debug("mark_ignored: added=%d ignored_map=%d", len(keys), len(self._ignored_keys)) + def populate(self, raw_texts: dict[PgssKey, str]) -> dict[PgssKey, ObfuscationResult]: """Obfuscate raw texts, store results, and return pgss_key -> ObfuscationResult.""" results: dict[PgssKey, ObfuscationResult] = {} @@ -121,11 +167,17 @@ def populate(self, raw_texts: dict[PgssKey, str]) -> dict[PgssKey, ObfuscationRe return results def evict(self, keys: set[PgssKey]) -> None: - """Remove tier-1 entries for keys evicted from pgss.""" + """Forget all state (positive and negative) for keys evicted from pgss.""" for pgss_key in keys: self._key_to_sig.pop(pgss_key, None) + self._ignored_keys.pop(pgss_key, None) if keys: - logger.debug("evict: removed=%d key_map=%d", len(keys), len(self._key_to_sig)) + logger.debug( + "evict: removed=%d key_map=%d ignored_map=%d", + len(keys), + len(self._key_to_sig), + len(self._ignored_keys), + ) def _obfuscate_single(self, raw_text: str) -> ObfuscationResult | None: try: @@ -154,3 +206,7 @@ def _trim_keys(self): def _trim_sig(self): while len(self._sig_to_result) > self._maxsize: self._sig_to_result.popitem(last=False) + + def _trim_ignored(self): + while len(self._ignored_keys) > self._maxsize: + self._ignored_keys.popitem(last=False) diff --git a/postgres/datadog_checks/postgres/statements_v2.py b/postgres/datadog_checks/postgres/statements_v2.py index 0b10853459ea1..85843b0318591 100644 --- a/postgres/datadog_checks/postgres/statements_v2.py +++ b/postgres/datadog_checks/postgres/statements_v2.py @@ -356,17 +356,20 @@ def _resolve_obfuscations( if misses: raw_texts = self._fetch_query_texts(misses) - filtered = { - pgss_key: text - for pgss_key, text in raw_texts.items() - if text and text != '<insufficient privilege>' and not text.startswith('/* DDIGNORE */') - } - self._log.debug( - "resolve: fetched=%d 
filtered=%d for %d misses", - len(raw_texts), - len(filtered), - len(misses), - ) + filtered = {} + ignorable: set[PgssKey] = set() + for pgss_key, text in raw_texts.items(): + if not text: + continue + if text.startswith('/* DDIGNORE */'): + # We want to ignore tracking query metrics for queries with the /* DDIGNORE */ comment. + ignorable.add(pgss_key) + continue + if text == '<insufficient privilege>': + continue + filtered[pgss_key] = text + if ignorable: + self._obfuscation_lookup.mark_ignored(ignorable) populated = self._populate_obfuscation_lookup(filtered) hits.update(populated) diff --git a/postgres/tests/test_statements_v2.py b/postgres/tests/test_statements_v2.py index 1f942e8d36c1c..4f4e8dc0990d5 100644 --- a/postgres/tests/test_statements_v2.py +++ b/postgres/tests/test_statements_v2.py @@ -208,6 +208,63 @@ def test_lookup_updates_lru_order(self): _, misses = lk.lookup({(2, 2, 2)}) assert (2, 2, 2) in misses + # --- negative cache (ignored keys) --- + + def test_mark_ignored_excludes_from_hits_and_misses(self): + """A negatively-cached key is neither a hit nor a miss on lookup.""" + lk = self._make_lookup() + lk.mark_ignored({(1, 1, 1)}) + hits, misses = lk.lookup({(1, 1, 1), (2, 1, 1)}) + assert (1, 1, 1) not in hits + assert (1, 1, 1) not in misses + assert misses == {(2, 1, 1)} + assert lk.ignored_map_size == 1 + + def test_ignored_keys_do_not_increment_miss_counter(self): + lk = self._make_lookup() + lk.mark_ignored({(1, 1, 1)}) + lk.reset_stats() + lk.lookup({(1, 1, 1)}) + assert lk.misses == 0 + assert lk.hits == 0 + + def test_evict_forgets_ignored_key(self): + """Evicting a vanished key clears its negative-cache entry so it can be re-evaluated.""" + lk = self._make_lookup() + lk.mark_ignored({(1, 1, 1)}) + lk.evict({(1, 1, 1)}) + assert lk.ignored_map_size == 0 + _, misses = lk.lookup({(1, 1, 1)}) + assert (1, 1, 1) in misses + + def test_ignored_keys_lru_trimmed_to_maxsize(self): + lk = self._make_lookup(maxsize=2) + lk.mark_ignored({(1, 1, 1), (2, 2, 2), (3, 3, 3)}) 
+ assert lk.ignored_map_size == 2 + + def test_mark_ignored_drops_stale_positive_mapping(self): + """An ignored key must not resurface as a hit via a stale tier-1 mapping. + + Reproduces the case where a key keeps its tier-1 mapping after its tier-2 + signature was evicted: marking it ignored must drop the tier-1 entry so that, + even after the negative entry is trimmed and the signature is repopulated by + another key, the ignored key never produces a positive hit. + """ + lk = self._make_lookup() + # Two keys share the same normalized SQL (one signature). + lk.populate({(1, 1, 1): 'SELECT 1', (2, 1, 1): 'SELECT 1'}) + assert lk.queryid_map_size == 2 + + # Key (1, 1, 1) turns out to be ignorable; its tier-1 mapping must be dropped. + lk.mark_ignored({(1, 1, 1)}) + assert (1, 1, 1) not in lk._key_to_sig + + # The shared signature is still cached (via the other key), but the ignored key + # must not hit it. + hits, misses = lk.lookup({(1, 1, 1)}) + assert (1, 1, 1) not in hits + assert (1, 1, 1) not in misses + # --------------------------------------------------------------------------- # PostgresStatementMetricsV2 — unit tests (no live database) @@ -300,6 +357,42 @@ def test_resolve_obfuscations_partial_filter(self): assert bad_key not in result assert good_key in result + def test_resolve_obfuscations_skips_known_ddignore_keys_on_later_cycles(self): + """A DDIGNORE key is fetched once, negative-cached, then skipped (no fetch) on later cycles.""" + v2 = self._make() + ddignore_key = (1, 1, 1) + + with mock.patch.object( + v2, '_fetch_query_texts', return_value={ddignore_key: '/* DDIGNORE */ SELECT 1'} + ) as fetch: + v2._resolve_obfuscations({ddignore_key}, set()) + assert fetch.call_count == 1 + assert ddignore_key in v2._obfuscation_lookup._ignored_keys + + # Second cycle: same key changes again but is now skipped before the fetch. 
+ result = v2._resolve_obfuscations({ddignore_key}, set()) + assert result == {} + assert fetch.call_count == 1 + + def test_resolve_obfuscations_does_not_fetch_when_all_keys_ignored(self): + """When every changed key is already negative-cached, no text fetch is issued.""" + v2 = self._make() + ddignore_key = (1, 1, 1) + v2._obfuscation_lookup.mark_ignored({ddignore_key}) + with mock.patch.object(v2, '_fetch_query_texts') as fetch: + result = v2._resolve_obfuscations({ddignore_key}, set()) + assert result == {} + fetch.assert_not_called() + + def test_resolve_obfuscations_forgets_ignored_key_when_vanished(self): + """An ignored key that vanishes from pgss is dropped from the negative cache via evict.""" + v2 = self._make() + ddignore_key = (1, 1, 1) + v2._obfuscation_lookup.mark_ignored({ddignore_key}) + with mock.patch.object(v2, '_fetch_query_texts', return_value={}): + v2._resolve_obfuscations(set(), {ddignore_key}) + assert ddignore_key not in v2._obfuscation_lookup._ignored_keys + # --- execute query cancel event --- def test_execute_query_raises_when_cancelled(self): diff --git a/postgres/tests/test_statements_v2_integration.py b/postgres/tests/test_statements_v2_integration.py index b985bed13f096..4b8fac864ec7e 100644 --- a/postgres/tests/test_statements_v2_integration.py +++ b/postgres/tests/test_statements_v2_integration.py @@ -8,6 +8,7 @@ from datadog_checks.base.utils.db.sql import compute_sql_signature from datadog_checks.postgres.statements import PG_STAT_STATEMENTS_METRICS_COLUMNS +from datadog_checks.postgres.statements_v2 import PostgresStatementMetricsV2 from .common import ( DB_NAME, @@ -518,3 +519,70 @@ def test_fqt_cache_deduplication_v2(aggregator, integration_check, dbm_instance_ assert len(matching) == 1, ( f"Expected exactly 1 FQT event across 3 cycles but got {len(matching)}; TTL cache deduplication may be broken" ) + + +# --------------------------------------------------------------------------- +# Ignored (/* DDIGNORE */) queries are 
learned once and never re-fetched +# --------------------------------------------------------------------------- + + +@requires_over_10 +def test_ignored_queries_do_not_cause_lookup_cycles_v2(aggregator, integration_check, dbm_instance_v2): + """The check's own /* DDIGNORE */ queries run every cycle and would otherwise trigger a query-text + fetch each time. Once classified as DDIGNORE, a key must be remembered and skipped so it never costs + another fetch on a later cycle (unless it genuinely vanished from pg_stat_statements in between).""" + conn = psycopg.connect( + host=HOST, dbname=DB_NAME, user="bob", password="bob", autocommit=True, cursor_factory=ClientCursor + ) + + check = integration_check(dbm_instance_v2) + check._connect() + + # First cycle selects and instantiates the V2 collector; capture it before installing the spy. + conn.cursor().execute("SELECT city FROM persons WHERE city = %s", ("hello",)) + run_one_check(check, cancel=False) + job = check.statement_metrics + assert isinstance(job, PostgresStatementMetricsV2) + + # Spy on the text fetch across every cycle, recording which keys were classified as DDIGNORE and + # which keys vanished from pgss before each fetch (legitimate re-fetches if they later return). + original_fetch = job._fetch_query_texts + ddignore_keys_seen: set = set() + refetched_ddignore: set = set() + vanished_before_fetch: set = set() + + def _spy(keys): + # A key already known to be DDIGNORE that is fetched again — and did not vanish in the + # meantime — means it slipped past the skip and is still costing a lookup every cycle. 
+ replayed = (set(keys) & ddignore_keys_seen) - vanished_before_fetch + refetched_ddignore.update(replayed) + texts = original_fetch(keys) + for key, text in texts.items(): + if text and text.startswith('/* DDIGNORE */'): + ddignore_keys_seen.add(key) + return texts + + original_resolve = job._resolve_obfuscations + + def _resolve_spy(changed_pgss_keys, vanished_pgss_keys): + vanished_before_fetch.update(vanished_pgss_keys) + return original_resolve(changed_pgss_keys, vanished_pgss_keys) + + with ( + mock.patch.object(job, '_fetch_query_texts', side_effect=_spy), + mock.patch.object(job, '_resolve_obfuscations', side_effect=_resolve_spy), + ): + for _ in range(8): + conn.cursor().execute("SELECT city FROM persons WHERE city = %s", ("hello",)) + run_one_check(check, cancel=False) + + conn.close() + + if not ddignore_keys_seen: + # Some Postgres versions (e.g. 18) normalize the leading comment out of the stored + # pg_stat_statements text, so /* DDIGNORE */ queries never reach the fetch path and + # there is nothing to assert about skipping them here. 
+ pytest.skip("No /* DDIGNORE */ queries surfaced in pg_stat_statements on this version") + assert not refetched_ddignore, ( + f"DDIGNORE keys were re-fetched on later cycles instead of being skipped: {sorted(refetched_ddignore)}" + ) From 3101d3fdfb630d0fee51fca05829a92a58d0335a Mon Sep 17 00:00:00 2001 From: Steven Yuen Date: Mon, 15 Jun 2026 22:07:39 -0400 Subject: [PATCH 2/2] Collect ArgoCD entities for applications, cluster and repos (#23917) * test argocd submission * add unique identifier * better unique identifier * allow list approach * validations * argocd to submit entity metadata * Update 23917.added * remove public plication * Update metadata.csv * use a different separator * pin dcb * remove review files * fix naming * clean up * more knobs * nits * Read genresources config from pydantic and clean up reviewer findings - Defer ArgocdResourceCollector construction to the first check() call so it observes the populated pydantic config (self.config) instead of the raw instance dict; drops the hardcoded TTL / interval / cap literals that were duplicating the spec defaults. - Read collect_genresources from self.config. - Swap the volume-cap log args to read type/fetched/cap in order and use log.exception for the fetch-failure path so tracebacks survive. - Spec wording: ArgoCD (one word) and allowlist / allowlisted per the style guide. - Tests: add a _check() helper that loads the config models and attaches the collector, add coverage for the missing-endpoint path and for credential scrubbing on cluster.connectionState.message, tighten the volume-cap assertion against the new message format, and mark the file as pytest.mark.unit. 
* Update argocd/datadog_checks/argocd/resources.py Co-authored-by: Kyle Neale * Update argocd/datadog_checks/argocd/resources.py Co-authored-by: Kyle Neale * Update argocd/datadog_checks/argocd/resources.py Co-authored-by: Kyle Neale * Update argocd/datadog_checks/argocd/resources.py Co-authored-by: Kyle Neale --------- Co-authored-by: Kyle-Neale --- argocd/assets/configuration/spec.yaml | 103 ++++ argocd/changelog.d/23917.added | 1 + argocd/datadog_checks/argocd/check.py | 14 +- .../argocd/config_models/defaults.py | 16 + .../argocd/config_models/instance.py | 8 + argocd/datadog_checks/argocd/resources.py | 315 ++++++++++ .../argocd/resources_constants.py | 99 ++++ argocd/metadata.csv | 1 + argocd/pyproject.toml | 2 +- argocd/tests/test_resources.py | 542 ++++++++++++++++++ 10 files changed, 1099 insertions(+), 2 deletions(-) create mode 100644 argocd/changelog.d/23917.added create mode 100644 argocd/datadog_checks/argocd/resources.py create mode 100644 argocd/datadog_checks/argocd/resources_constants.py create mode 100644 argocd/tests/test_resources.py diff --git a/argocd/assets/configuration/spec.yaml b/argocd/assets/configuration/spec.yaml index 1a4176a33383e..acde7cd14b641 100644 --- a/argocd/assets/configuration/spec.yaml +++ b/argocd/assets/configuration/spec.yaml @@ -51,11 +51,114 @@ files: - name: commit_server_endpoint description: | Endpoint exposing Commit Server's Prometheus metrics. + fleet_configurable: false value: display_default: null example: http://argocd-commit-server:8087/metrics type: string + - name: collect_genresources + hidden: true + fleet_configurable: false + description: | + Enable the generic resources pilot collector that ships ArgoCD + Applications, Clusters, and Repositories to Datadog as generic + resources. Disabled by default. 
+ value: + type: boolean + example: false + - name: genresources_endpoint + hidden: true + fleet_configurable: false + description: | + Base URL of the ArgoCD REST API (for example, ``https://argocd.example.com``). + Required when ``collect_genresources`` is set to ``true``. + value: + display_default: null + example: https://<ARGOCD_SERVER> + type: string + - name: genresources_auth_token + hidden: true + fleet_configurable: false + description: | + Raw bearer token used to authenticate against the ArgoCD REST API. + When set, the collector adds ``Authorization: Bearer <TOKEN>`` to + each REST request. Leave unset to inherit the request authentication + configured on the instance (for example, the structured ``auth_token`` + config object handled by the HTTP wrapper). + secret: true + value: + display_default: null + example: <TOKEN> + type: string + - name: genresources_ttl_seconds + hidden: true fleet_configurable: false + description: | + Time-to-live in seconds applied to every emitted resource. + Resources expire ``ttl_seconds`` after the last observation. + Minimum of 1. + value: + type: integer + example: 21600 + minimum: 1 + - name: genresources_collection_interval_seconds + hidden: true + fleet_configurable: false + description: | + Minimum number of seconds between generic resources collection cycles. + The collector polls the ArgoCD API at most once per interval, + independent of the check's metrics scrape frequency, to limit load on + the ArgoCD API server in large deployments. Minimum of 1. + value: + type: integer + example: 120 + minimum: 1 + - name: genresources_max_resources_per_cycle + hidden: true + fleet_configurable: false + description: | + Maximum number of items emitted per resource type per check cycle. + When an ArgoCD API endpoint returns more than this, the excess is + dropped and a warning is logged. Applied independently to + Applications, Clusters, and Repositories. 
+ value: + type: integer + example: 10000 + minimum: 1 + - name: genresources_extra_include_paths + hidden: true + fleet_configurable: false + description: | + Additional dotted JSON paths appended to the baseline allowlist of + every collected resource type. Only enumerated paths are shipped; + anything not listed never leaves the Agent. ``[*]`` denotes array + iteration. A path that does not match any field for a given type is + silently skipped. This list is additive; it can add fields to ship but + cannot remove baseline fields. Each path must resolve to a value, or a + list of values, not a whole object: a path that lands on an object + causes that resource to be dropped, so enumerate leaf fields rather + than their parents. + value: + type: array + items: + type: string + example: [] + - name: genresources_exclude_paths + hidden: true + fleet_configurable: false + description: | + Dotted JSON paths to remove from the baseline allowlist of every + collected resource type, applied after ``genresources_extra_include_paths``. + An entry removes any allowlisted path equal to it or nested beneath it; + for example, ``status.history`` drops every ``status.history[*]`` field + while ``status.conditions[*].message`` drops only that leaf. Use this to + stop shipping a field without a code change. Removing fields only ever + ships less, never more. + value: + type: array + items: + type: string + example: [] - template: instances/openmetrics overrides: openmetrics_endpoint.required: false diff --git a/argocd/changelog.d/23917.added b/argocd/changelog.d/23917.added new file mode 100644 index 0000000000000..9608db909fd24 --- /dev/null +++ b/argocd/changelog.d/23917.added @@ -0,0 +1 @@ +Add a collector that submits ArgoCD applications, clusters, and repositories to Datadog as generic resources. 
diff --git a/argocd/datadog_checks/argocd/check.py b/argocd/datadog_checks/argocd/check.py index 2a447744de798..e5279883159fa 100644 --- a/argocd/datadog_checks/argocd/check.py +++ b/argocd/datadog_checks/argocd/check.py @@ -15,6 +15,7 @@ NOTIFICATIONS_CONTROLLER_METRICS, REPO_SERVER_METRICS, ) +from .resources import ArgocdResourceCollector ( API_SERVER_NAMESPACE, @@ -40,6 +41,17 @@ def __init__(self, name, init_config, instances): super(ArgocdCheck, self).__init__(name, init_config, instances) self.check_initializations.appendleft(self.parse_config) self.check_initializations.append(self.configure_additional_transformers) + self._resource_collector: ArgocdResourceCollector | None = None + + def check(self, instance): + if self.config.collect_genresources: + if self._resource_collector is None: + self._resource_collector = ArgocdResourceCollector(self) + try: + self._resource_collector.collect() + except Exception: + self.log.exception("genresources: collection cycle failed") + super().check(instance) def parse_config(self): endpoint_configs = [ @@ -97,7 +109,7 @@ def argocd_cluster_connection_status_transformer(_metric, sample_data, _runtime_ return argocd_cluster_connection_status_transformer def configure_additional_transformers(self): - endpoints = [key for key in self.instance.keys() if "_endpoint" in key] + endpoints = [key for key in self.instance.keys() if "_endpoint" in key and self.instance[key] in self.scrapers] for endpoint in endpoints: if endpoint == "app_controller_endpoint": self.scrapers[self.instance[endpoint]].metric_transformer.add_custom_transformer( diff --git a/argocd/datadog_checks/argocd/config_models/defaults.py b/argocd/datadog_checks/argocd/config_models/defaults.py index d7fc7782360ba..f8cfa47506b5b 100644 --- a/argocd/datadog_checks/argocd/config_models/defaults.py +++ b/argocd/datadog_checks/argocd/config_models/defaults.py @@ -36,6 +36,10 @@ def instance_collect_counters_with_distributions(): return False +def 
instance_collect_genresources(): + return False + + def instance_collect_histogram_buckets(): return True @@ -56,6 +60,18 @@ def instance_enable_legacy_tags_normalization(): return True +def instance_genresources_collection_interval_seconds(): + return 120 + + +def instance_genresources_max_resources_per_cycle(): + return 10000 + + +def instance_genresources_ttl_seconds(): + return 21600 + + def instance_histogram_buckets_as_distributions(): return False diff --git a/argocd/datadog_checks/argocd/config_models/instance.py b/argocd/datadog_checks/argocd/config_models/instance.py index 82de77384f133..b6a7332153a54 100644 --- a/argocd/datadog_checks/argocd/config_models/instance.py +++ b/argocd/datadog_checks/argocd/config_models/instance.py @@ -101,6 +101,7 @@ class InstanceConfig(BaseModel): cache_metric_wildcards: Optional[bool] = None cache_shared_labels: Optional[bool] = None collect_counters_with_distributions: Optional[bool] = None + collect_genresources: Optional[bool] = None collect_histogram_buckets: Optional[bool] = None commit_server_endpoint: Optional[str] = None connect_timeout: Optional[float] = None @@ -113,6 +114,13 @@ class InstanceConfig(BaseModel): exclude_metrics_by_labels: Optional[MappingProxyType[str, Union[bool, tuple[str, ...]]]] = None extra_headers: Optional[MappingProxyType[str, Any]] = None extra_metrics: Optional[tuple[Union[str, MappingProxyType[str, Union[str, ExtraMetrics]]], ...]] = None + genresources_auth_token: Optional[str] = None + genresources_collection_interval_seconds: Optional[int] = Field(None, ge=1) + genresources_endpoint: Optional[str] = None + genresources_exclude_paths: Optional[tuple[str, ...]] = None + genresources_extra_include_paths: Optional[tuple[str, ...]] = None + genresources_max_resources_per_cycle: Optional[int] = Field(None, ge=1) + genresources_ttl_seconds: Optional[int] = Field(None, ge=1) headers: Optional[MappingProxyType[str, Any]] = None histogram_buckets_as_distributions: Optional[bool] = None 
hostname_format: Optional[str] = None diff --git a/argocd/datadog_checks/argocd/resources.py b/argocd/datadog_checks/argocd/resources.py new file mode 100644 index 0000000000000..9a972779445c5 --- /dev/null +++ b/argocd/datadog_checks/argocd/resources.py @@ -0,0 +1,315 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +"""Generic resources pilot collector for ArgoCD.""" + +from __future__ import annotations + +import hashlib +import json +import os +import time +from collections.abc import Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING +from urllib.parse import urlparse, urlunparse + +try: + import datadog_agent +except ImportError: + datadog_agent = None + +from .resources_constants import ( + APPLICATION_INCLUDE, + CLUSTER_INCLUDE, + GENRESOURCES_API_UP_METRIC, + KEY_SEPARATOR, + REPOSITORY_INCLUDE, + URL_CREDENTIALS_PATTERN, +) + +if TYPE_CHECKING: + from .check import ArgocdCheck + + +def _instance_prefix(endpoint: str | None) -> str: + """Build a multi-part prefix that disambiguates resources across clusters and envs. + + Order: kube_cluster_name : env. The argocd hostname is used only as a + last-resort fallback when neither cluster name nor env is available + (e.g. an out-of-k8s agent monitoring a remote argocd without DD_ENV set). 
+ """ + parts: list[str] = [] + if datadog_agent is not None: + cluster = "" + try: + cluster = datadog_agent.get_clustername() or "" + except Exception: + pass + if not cluster: + try: + cluster = datadog_agent.get_config("cluster_name") or "" + except Exception: + pass + if cluster: + parts.append(cluster) + try: + tags = datadog_agent.get_config("tags") or [] + except Exception: + tags = [] + env = next((t.split(":", 1)[1] for t in tags if t.startswith("env:")), "") or os.environ.get("DD_ENV", "") + if env: + parts.append(env) + if not parts and endpoint: + host = urlparse(endpoint).hostname or "" + if host: + parts.append(host) + return KEY_SEPARATOR.join(parts) + + +@dataclass(frozen=True) +class ResourceTypeSpec: + resource_type: str + api_path: str + include: dict[str, tuple[str, ...]] + key_builder: Callable[[dict], str] + + +def _strip_url_userinfo(url: str) -> str: + """Drop any embedded ``user:token@`` credentials from a URL.""" + parsed = urlparse(url) + if not (parsed.username or parsed.password): + return url + netloc = parsed.hostname or "" + if parsed.port: + netloc = f"{netloc}:{parsed.port}" + return urlunparse(parsed._replace(netloc=netloc)) + + +def _scrub_url_credentials(text: str) -> str: + """Strip ``user:token@`` userinfo from any URL embedded in free text (e.g. 
error messages).""" + return URL_CREDENTIALS_PATTERN.sub(r"\1", text) + + +def _sanitize_item(item: dict, resource_type: str) -> None: + """Strip embedded credentials from repo URLs and free-text messages in place before they ship.""" + if resource_type == "argocd_application": + spec = item.get("spec") or {} + source = spec.get("source") + if isinstance(source, dict) and isinstance(source.get("repoURL"), str): + source["repoURL"] = _strip_url_userinfo(source["repoURL"]) + for src in spec.get("sources") or []: + if isinstance(src, dict) and isinstance(src.get("repoURL"), str): + src["repoURL"] = _strip_url_userinfo(src["repoURL"]) + for condition in (item.get("status") or {}).get("conditions") or []: + if isinstance(condition, dict) and isinstance(condition.get("message"), str): + condition["message"] = _scrub_url_credentials(condition["message"]) + elif resource_type == "argocd_repository": + if isinstance(item.get("repo"), str): + item["repo"] = _strip_url_userinfo(item["repo"]) + connection = item.get("connectionState") + if isinstance(connection, dict) and isinstance(connection.get("message"), str): + connection["message"] = _scrub_url_credentials(connection["message"]) + + +def _change_token(item: dict) -> str: + """A value that changes whenever the resource changes: k8s resourceVersion if present, else a content hash.""" + version = (item.get("metadata") or {}).get("resourceVersion") + if isinstance(version, str) and version: + return version + encoded = json.dumps(item, sort_keys=True, default=str).encode("utf-8") + return hashlib.sha1(encoded, usedforsecurity=False).hexdigest() + + +def _application_key(item: dict) -> str: + metadata = item.get("metadata") or {} + namespace = metadata.get("namespace") or "default" + name = metadata.get("name") + if not name: + raise ValueError("argocd_application is missing metadata.name") + return f"{namespace}{KEY_SEPARATOR}{name}" + + +def _cluster_key(item: dict) -> str: + server = item.get("server") + if not server: + 
raise ValueError("argocd_cluster is missing server") + return server + + +def _repository_key(item: dict) -> str: + repo = item.get("repo") + if not repo: + raise ValueError("argocd_repository is missing repo") + return repo + + +RESOURCE_TYPE_SPECS: tuple[ResourceTypeSpec, ...] = ( + ResourceTypeSpec( + resource_type="argocd_application", + api_path="/api/v1/applications", + include=APPLICATION_INCLUDE, + key_builder=_application_key, + ), + ResourceTypeSpec( + resource_type="argocd_cluster", + api_path="/api/v1/clusters", + include=CLUSTER_INCLUDE, + key_builder=_cluster_key, + ), + ResourceTypeSpec( + resource_type="argocd_repository", + api_path="/api/v1/repositories", + include=REPOSITORY_INCLUDE, + key_builder=_repository_key, + ), +) + + +def _is_excluded(path: str, exclude_paths: tuple[str, ...]) -> bool: + """True if a path equals an exclude entry or is nested beneath one (subtree match).""" + return any(path == ex or path.startswith(f"{ex}.") or path.startswith(f"{ex}[") for ex in exclude_paths) + + +def _build_include( + spec_include: dict[str, tuple[str, ...]], extra_paths: list[str], exclude_paths: tuple[str, ...] 
+) -> dict[str, list[str]]: + """Compose a type's final allow-list: baseline paths plus extras, minus any excluded paths and maps.""" + paths = list(spec_include["paths"]) + extra_paths + return { + "paths": [p for p in paths if not _is_excluded(p, exclude_paths)], + "map_paths": [p for p in spec_include["map_paths"] if not _is_excluded(p, exclude_paths)], + "annotation_keys": list(spec_include["annotation_keys"]), + } + + +class ArgocdResourceCollector: + """Fetches ArgoCD Applications, Clusters, and Repositories and ships them as generic resources.""" + + def __init__(self, check: "ArgocdCheck") -> None: + self.check = check + config = check.config + self._endpoint: str | None = config.genresources_endpoint + self._ttl_seconds: int = config.genresources_ttl_seconds + self._max_resources: int = config.genresources_max_resources_per_cycle + self._extra_paths: list[str] = list(config.genresources_extra_include_paths or []) + self._exclude_paths: tuple[str, ...] = tuple(config.genresources_exclude_paths or []) + self._auth_token: str | None = config.genresources_auth_token + self._instance_prefix: str = _instance_prefix(self._endpoint) + self._submitted: dict[str, str] = {} + self._last_full_submit: float = 0.0 + self._resubmit_interval: int = max(1, self._ttl_seconds // 2) + self._collection_interval: int = config.genresources_collection_interval_seconds + self._last_collect: float = 0.0 + self._includes: dict[str, dict[str, list[str]]] = { + spec.resource_type: _build_include(spec.include, self._extra_paths, self._exclude_paths) + for spec in RESOURCE_TYPE_SPECS + } + for resource_type, include in self._includes.items(): + if not include["paths"] and not include["map_paths"]: + self.check.log.warning( + "genresources: genresources_exclude_paths emptied the allow-list for %s; " + "nothing will be collected for it", + resource_type, + ) + if self._ttl_seconds < self._collection_interval: + self.check.log.warning( + "genresources: genresources_ttl_seconds (%d) is 
shorter than " + "genresources_collection_interval_seconds (%d); resources will expire " + "before the next refresh and cause entity churn", + self._ttl_seconds, + self._collection_interval, + ) + + def collect(self) -> None: + seen_at = int(time.time()) + if seen_at - self._last_collect < self._collection_interval: + return + self._last_collect = seen_at + + if not self._endpoint: + self.check.log.warning("collect_genresources is enabled but genresources_endpoint is not set; skipping") + for spec in RESOURCE_TYPE_SPECS: + self.check.gauge(GENRESOURCES_API_UP_METRIC, 0, tags=[f"resource_type:{spec.resource_type}"]) + return + + expire_at = seen_at + self._ttl_seconds + force_full = (seen_at - self._last_full_submit) >= self._resubmit_interval + if force_full: + self._last_full_submit = seen_at + + for spec in RESOURCE_TYPE_SPECS: + self._collect_type(spec, seen_at=seen_at, expire_at=expire_at, force_full=force_full) + + def _collect_type(self, spec: ResourceTypeSpec, *, seen_at: int, expire_at: int, force_full: bool) -> None: + tags = [f"resource_type:{spec.resource_type}"] + try: + items = self._fetch(spec.api_path) + except Exception: + self.check.log.exception("genresources: failed to fetch %s", spec.resource_type) + self.check.gauge(GENRESOURCES_API_UP_METRIC, 0, tags=tags) + return + + self.check.gauge(GENRESOURCES_API_UP_METRIC, 1, tags=tags) + + if len(items) > self._max_resources: + self.check.log.warning( + "genresources: volume cap hit for type=%s: fetched %d, capped at %d; " + "increase genresources_max_resources_per_cycle if expected", + spec.resource_type, + len(items), + self._max_resources, + ) + items = items[: self._max_resources] + + seen = set() + for item in items: + cache_key = self._emit_item(item, spec, seen_at=seen_at, expire_at=expire_at, force_full=force_full) + if cache_key is not None: + seen.add(cache_key) + namespace = f"{spec.resource_type}{KEY_SEPARATOR}" + self._submitted = {k: v for k, v in self._submitted.items() if not 
k.startswith(namespace) or k in seen} + + def _fetch(self, api_path: str) -> list[dict]: + url = self._endpoint.rstrip("/") + api_path + kwargs: dict = {} + if self._auth_token: + kwargs["headers"] = {"Authorization": f"Bearer {self._auth_token}"} + response = self.check.http.get(url, **kwargs) + response.raise_for_status() + payload = response.json() + return list(payload.get("items") or []) + + def _emit_item( + self, item: dict, spec: ResourceTypeSpec, *, seen_at: int, expire_at: int, force_full: bool + ) -> str | None: + token = _change_token(item) + try: + _sanitize_item(item, spec.resource_type) + except Exception: + self.check.log.exception("genresources: sanitize failed for %s", spec.resource_type) + return None + try: + key = spec.key_builder(item) + except Exception: + self.check.log.warning("genresources: skipping malformed %s", spec.resource_type, exc_info=True) + return None + if self._instance_prefix: + key = f"{self._instance_prefix}{KEY_SEPARATOR}{key}" + include = self._includes[spec.resource_type] + cache_key = f"{spec.resource_type}{KEY_SEPARATOR}{key}" + if force_full or self._submitted.get(cache_key) != token: + try: + self.check.submit_generic_resource( + type=spec.resource_type, + key=key, + fields=item, + include=include, + seen_at=seen_at, + expire_at=expire_at, + ) + self._submitted[cache_key] = token + except Exception: + self.check.log.exception("genresources: failed to submit %s (key=%s)", spec.resource_type, key) + return cache_key diff --git a/argocd/datadog_checks/argocd/resources_constants.py b/argocd/datadog_checks/argocd/resources_constants.py new file mode 100644 index 0000000000000..28b50fca3921b --- /dev/null +++ b/argocd/datadog_checks/argocd/resources_constants.py @@ -0,0 +1,99 @@ +# (C) Datadog, Inc. 
2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +"""Constants for the ArgoCD generic resources collector.""" + +from __future__ import annotations + +import re + +# Resource-key delimiter. "|" rather than ":" because ":" appears inside cluster/repo server URLs +# (https://host:port). "|" can't occur in a k8s name/namespace, hostname, URL, or DD tag value, so keys +# stay unambiguously splittable. +KEY_SEPARATOR = "|" + +GENRESOURCES_API_UP_METRIC = "argocd.genresources.api.up" + +APPLICATION_INCLUDE: dict[str, tuple[str, ...]] = { + "paths": ( + "metadata.name", + "metadata.namespace", + "spec.project", + "spec.source.repoURL", + "spec.source.path", + "spec.source.targetRevision", + "spec.source.chart", + "spec.sources[*].repoURL", + "spec.sources[*].path", + "spec.sources[*].targetRevision", + "spec.sources[*].chart", + "spec.destination.server", + "spec.destination.namespace", + "spec.destination.name", + "spec.syncPolicy.automated.prune", + "spec.syncPolicy.automated.selfHeal", + "status.sync.status", + "status.sync.revision", + "status.health.status", + "status.health.message", + "status.conditions[*].type", + "status.conditions[*].message", + "status.conditions[*].lastTransitionTime", + "status.operationState.phase", + "status.operationState.startedAt", + "status.operationState.finishedAt", + "status.operationState.operation.initiatedBy.username", + "status.operationState.operation.initiatedBy.automated", + "status.sourceType", + "status.reconciledAt", + "status.summary.images[*]", + "status.history[*].id", + "status.history[*].revision", + "status.history[*].deployedAt", + "status.history[*].deployStartedAt", + "status.history[*].initiatedBy.username", + "status.history[*].initiatedBy.automated", + "status.resources[*].kind", + "status.resources[*].name", + "status.resources[*].namespace", + "status.resources[*].group", + "status.resources[*].version", + "status.resources[*].status", + 
"status.resources[*].health.status", + "status.resources[*].requiresPruning", + ), + "map_paths": ("metadata.labels",), + "annotation_keys": (), +} + +CLUSTER_INCLUDE: dict[str, tuple[str, ...]] = { + "paths": ( + "name", + "server", + "serverVersion", + "namespaces[*]", + "connectionState.status", + "connectionState.message", + "info.applicationsCount", + "info.serverVersion", + "info.cacheInfo.resourcesCount", + ), + "map_paths": ("labels",), + "annotation_keys": (), +} + +REPOSITORY_INCLUDE: dict[str, tuple[str, ...]] = { + "paths": ( + "repo", + "type", + "name", + "project", + "connectionState.status", + "connectionState.message", + ), + "map_paths": (), + "annotation_keys": (), +} + +URL_CREDENTIALS_PATTERN = re.compile(r"([a-zA-Z][a-zA-Z0-9+.\-]*://)[^/\s@]+@") diff --git a/argocd/metadata.csv b/argocd/metadata.csv index d6a921b23ee8e..d47f5a2c774c2 100644 --- a/argocd/metadata.csv +++ b/argocd/metadata.csv @@ -242,6 +242,7 @@ argocd.commit_server.process.virtual_memory.max_bytes,gauge,,byte,,The maximum a argocd.commit_server.userinfo.request.duration.seconds.bucket,count,,,,The userinfo requests duration histogram buckets,0,argocd,commit_server userinfo request duration seconds bucket,, argocd.commit_server.userinfo.request.duration.seconds.count,count,,,,The count aggregation of the userinfo requests duration histogram,0,argocd,commit_server userinfo request duration seconds count,, argocd.commit_server.userinfo.request.duration.seconds.sum,count,,second,,The sum aggregation of the userinfo requests duration histogram,0,argocd,commit_server userinfo request duration seconds sum,, +argocd.genresources.api.up,gauge,,,,Reachability of the ArgoCD REST API used by the generic resources collector. 
Emits 1 per resource_type tag on success and 0 on failure.,0,argocd,genresources api up,, argocd.notifications_controller.go.gc.duration.seconds.count,count,,second,,The summary count of garbage collection cycles in the API Server,0,argocd,notifications_controller go gc duration seconds count,, argocd.notifications_controller.go.gc.duration.seconds.quantile,gauge,,second,,A summary of the pause duration of garbage collection cycles in the API Server,0,argocd,notifications_controller go gc duration seconds,, argocd.notifications_controller.go.gc.duration.seconds.sum,count,,second,,The sum of the pause duration of garbage collection cycles in the API Server,0,argocd,notifications_controller go gc duration seconds sum,, diff --git a/argocd/pyproject.toml b/argocd/pyproject.toml index 3ec48e2323383..1cdee59d594a4 100644 --- a/argocd/pyproject.toml +++ b/argocd/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ "Topic :: System :: Monitoring", ] dependencies = [ - "datadog-checks-base>=37.33.0", + "datadog-checks-base>=37.40.0", ] dynamic = [ "version", diff --git a/argocd/tests/test_resources.py b/argocd/tests/test_resources.py new file mode 100644 index 0000000000000..5de8355b5bfe7 --- /dev/null +++ b/argocd/tests/test_resources.py @@ -0,0 +1,542 @@ +# (C) Datadog, Inc. 
2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from datadog_checks.argocd import ArgocdCheck +from datadog_checks.argocd.resources import ArgocdResourceCollector +from datadog_checks.argocd.resources_constants import ( + APPLICATION_INCLUDE, + CLUSTER_INCLUDE, + GENRESOURCES_API_UP_METRIC, +) +from datadog_checks.dev.http import MockResponse + +pytestmark = pytest.mark.unit + +ARGOCD_ENDPOINT = "https://argocd.example.com" +APPLICATIONS_URL = f"{ARGOCD_ENDPOINT}/api/v1/applications" +CLUSTERS_URL = f"{ARGOCD_ENDPOINT}/api/v1/clusters" +REPOSITORIES_URL = f"{ARGOCD_ENDPOINT}/api/v1/repositories" + + +def _application( + name: str, + *, + namespace: str = "argocd", + cluster: str = "https://kubernetes.default.svc", + dest_namespace: str | None = None, +) -> dict: + return { + "metadata": {"name": name, "namespace": namespace}, + "spec": {"destination": {"server": cluster, "namespace": dest_namespace or namespace}, "source": {}}, + "status": {"sync": {"status": "Synced"}}, + } + + +def _cluster(server: str, *, name: str = "prod", username: str = "", password: str = "") -> dict: + return { + "server": server, + "name": name, + "config": {"username": username, "password": password}, + } + + +def _repository(repo: str, *, username: str = "", password: str = "", ssh_key: str = "") -> dict: + return {"repo": repo, "username": username, "password": password, "sshPrivateKey": ssh_key, "type": "git"} + + +def _instance(**overrides) -> dict: + instance = { + "app_controller_endpoint": "http://app_controller:8082", + "collect_genresources": True, + "genresources_endpoint": ARGOCD_ENDPOINT, + "genresources_auth_token": "test-token", + } + instance.update(overrides) + return instance + + +def _items_response(items: list[dict], status_code: int = 200) -> MockResponse: + return MockResponse(json_data={"items": items}, 
status_code=status_code) + + +def _check(**instance_overrides) -> ArgocdCheck: + """Build an ArgocdCheck with config models loaded and the genresources collector attached. + + Production builds the collector lazily on the first ``check()`` call once + ``check_initializations`` has populated ``self.config``. Tests reach into + ``check._resource_collector`` directly, so we mirror that bootstrap here. + """ + check = ArgocdCheck("argocd", {}, [_instance(**instance_overrides)]) + check.load_configuration_models() + check._resource_collector = ArgocdResourceCollector(check) + return check + + +def test_collect_emits_applications_clusters_and_repositories(aggregator, mock_http_response_per_endpoint): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([_cluster("https://cluster-a.example")])], + REPOSITORIES_URL: [_items_response([_repository("https://github.com/team/repo")])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + by_type = {call.kwargs["type"]: call.kwargs for call in submit.call_args_list} + assert by_type["argocd_application"]["key"] == "argocd.example.com|argocd|checkout" + assert by_type["argocd_cluster"]["key"] == "argocd.example.com|https://cluster-a.example" + assert by_type["argocd_repository"]["key"] == "argocd.example.com|https://github.com/team/repo" + for spec_type in ("argocd_application", "argocd_cluster", "argocd_repository"): + aggregator.assert_metric(GENRESOURCES_API_UP_METRIC, value=1, tags=[f"resource_type:{spec_type}"]) + + +def test_application_include_contains_prd_fields_required_for_idp_catalog(): + assert { + "spec.sources[*].chart", + "status.conditions[*].lastTransitionTime", + "status.history[*].id", + "status.history[*].revision", + "status.history[*].deployedAt", + "status.history[*].deployStartedAt", + "status.history[*].initiatedBy.username", + 
"status.history[*].initiatedBy.automated", + "status.operationState.startedAt", + "status.operationState.finishedAt", + "status.summary.images[*]", + } <= set(APPLICATION_INCLUDE["paths"]) + + +def test_application_include_contains_kubernetes_resource_identity_for_automatic_mapping(): + assert { + "status.resources[*].group", + "status.resources[*].version", + "status.resources[*].requiresPruning", + } <= set(APPLICATION_INCLUDE["paths"]) + + +def test_application_key_uses_app_identity_not_destination(mock_http_response_per_endpoint): + apps = [ + _application("web", namespace="team-a", cluster="https://remote", dest_namespace="prod"), + _application("web", namespace="team-b", cluster="https://remote", dest_namespace="prod"), + ] + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response(apps)], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + keys = {c.kwargs["key"] for c in submit.call_args_list if c.kwargs["type"] == "argocd_application"} + assert keys == {"argocd.example.com|team-a|web", "argocd.example.com|team-b|web"} + + +def test_collect_appends_extra_include_paths_to_every_type(mock_http_response_per_endpoint): + extra = ["metadata.generation"] + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([_cluster("https://cluster-a.example")])], + REPOSITORIES_URL: [_items_response([_repository("https://github.com/team/repo")])], + } + ) + check = _check(genresources_extra_include_paths=extra) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + for call in submit.call_args_list: + assert call.kwargs["include"]["paths"][-1] == extra[0] + + +def test_collect_isolates_per_endpoint_failures(aggregator, mock_http_response_per_endpoint): + 
mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([], status_code=403)], + REPOSITORIES_URL: [_items_response([_repository("https://github.com/team/repo")])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + emitted_types = {call.kwargs["type"] for call in submit.call_args_list} + assert emitted_types == {"argocd_application", "argocd_repository"} + aggregator.assert_metric(GENRESOURCES_API_UP_METRIC, value=1, tags=["resource_type:argocd_application"]) + aggregator.assert_metric(GENRESOURCES_API_UP_METRIC, value=0, tags=["resource_type:argocd_cluster"]) + aggregator.assert_metric(GENRESOURCES_API_UP_METRIC, value=1, tags=["resource_type:argocd_repository"]) + + +def test_collect_skips_malformed_items_without_poisoning_cycle(mock_http_response_per_endpoint, caplog): + malformed = {"spec": {}} + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout"), malformed, _application("payments")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + application_emits = [c for c in submit.call_args_list if c.kwargs["type"] == "argocd_application"] + assert len(application_emits) == 2 + assert any("skipping malformed argocd_application" in rec.message for rec in caplog.records) + + +def test_collect_caps_per_type_with_warning(mock_http_response_per_endpoint, caplog): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application(f"app-{i}") for i in range(7)])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check(genresources_max_resources_per_cycle=3) + + with patch.object(check, "submit_generic_resource") as submit: + 
check._resource_collector.collect() + + application_emits = [c for c in submit.call_args_list if c.kwargs["type"] == "argocd_application"] + assert len(application_emits) == 3 + warning_messages = [rec.message for rec in caplog.records if "volume cap hit" in rec.message] + assert warning_messages, "expected a volume-cap warning" + warning = warning_messages[0] + assert "type=argocd_application" in warning + assert "fetched 7" in warning + assert "capped at 3" in warning + + +def test_collect_logs_submit_failures_distinctly_from_malformed(mock_http_response_per_endpoint, caplog): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource", side_effect=RuntimeError("helper boom")): + check._resource_collector.collect() + + assert any("failed to submit argocd_application" in rec.message for rec in caplog.records) + assert not any("skipping malformed" in rec.message for rec in caplog.records) + + +def test_collect_strips_credentials_from_repo_urls(mock_http_response_per_endpoint): + app = _application("web") + app["spec"]["source"] = {"repoURL": "https://user:t0ken@github.com/org/repo"} + repo = {"repo": "https://user:t0ken@github.com/org/repo", "type": "git"} + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([repo])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + by_type = {c.kwargs["type"]: c.kwargs for c in submit.call_args_list} + assert by_type["argocd_application"]["fields"]["spec"]["source"]["repoURL"] == "https://github.com/org/repo" + assert by_type["argocd_repository"]["fields"]["repo"] == "https://github.com/org/repo" + assert 
by_type["argocd_repository"]["key"] == "argocd.example.com|https://github.com/org/repo" + + +def test_collect_scrubs_credentials_from_condition_messages(mock_http_response_per_endpoint): + app = _application("broken") + app["status"]["conditions"] = [ + { + "type": "ComparisonError", + "message": "failed to fetch https://oauth2:t0ken@github.com/org/repo: auth required", + } + ] + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + app_call = next(c for c in submit.call_args_list if c.kwargs["type"] == "argocd_application") + conditions = app_call.kwargs["fields"]["status"]["conditions"] + assert conditions[0]["type"] == "ComparisonError" + assert "t0ken" not in conditions[0]["message"] + assert "https://github.com/org/repo" in conditions[0]["message"] + + +def test_collect_scrubs_credentials_from_cluster_connection_state(mock_http_response_per_endpoint): + cluster = _cluster("https://cluster-a.example") + cluster["connectionState"] = { + "status": "Failed", + "message": "dial https://oauth2:t0ken@cluster-a.example: connection refused", + } + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([])], + CLUSTERS_URL: [_items_response([cluster])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + cluster_call = next(c for c in submit.call_args_list if c.kwargs["type"] == "argocd_cluster") + connection = cluster_call.kwargs["fields"]["connectionState"] + assert connection["status"] == "Failed" + assert "t0ken" not in connection["message"] + assert "https://cluster-a.example" in connection["message"] + + +def test_collect_emits_api_up_zero_when_endpoint_missing(aggregator, 
mock_http_response_per_endpoint, caplog): + captured = mock_http_response_per_endpoint({}) + check = _check(genresources_endpoint=None) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + assert submit.call_count == 0 + assert captured.call_count == 0 # no HTTP calls attempted when the endpoint is unset + for resource_type in ("argocd_application", "argocd_cluster", "argocd_repository"): + aggregator.assert_metric(GENRESOURCES_API_UP_METRIC, value=0, tags=[f"resource_type:{resource_type}"]) + assert any( + "collect_genresources is enabled but genresources_endpoint is not set" in rec.message for rec in caplog.records + ) + + +def test_collect_skips_unchanged_resources_on_second_cycle(mock_http_response_per_endpoint): + app = _application("checkout") + app["metadata"]["resourceVersion"] = "100" + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + after_first = submit.call_count + check._resource_collector._last_collect = 0.0 # simulate the collection interval elapsing + check._resource_collector.collect() + after_second = submit.call_count + + assert after_first == 1 + assert after_second == 1 # unchanged app is not re-submitted on the next cycle + + +def test_collect_resubmits_application_when_resource_version_changes(mock_http_response_per_endpoint): + app_v1 = _application("checkout") + app_v1["metadata"]["resourceVersion"] = "100" + app_v2 = _application("checkout") + app_v2["metadata"]["resourceVersion"] = "200" + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app_v1]), _items_response([app_v2])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + + with patch.object(check, 
"submit_generic_resource") as submit: + check._resource_collector.collect() + after_first = submit.call_count + check._resource_collector._last_collect = 0.0 # simulate the collection interval elapsing + check._resource_collector.collect() + after_second = submit.call_count + + assert after_first == 1 + assert after_second == 2 # resourceVersion changed, so the app is re-submitted + + +def test_collect_resubmits_unchanged_resources_on_ttl_sweep(mock_http_response_per_endpoint): + app = _application("checkout") + app["metadata"]["resourceVersion"] = "100" + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() + collector = check._resource_collector + + with patch.object(check, "submit_generic_resource") as submit: + collector.collect() + after_first = submit.call_count + collector._last_full_submit = 0.0 # force the next cycle to be a full TTL-refresh sweep + collector._last_collect = 0.0 + collector.collect() + after_second = submit.call_count + + assert after_first == 1 + assert after_second == 2 # the sweep re-submits even unchanged resources to refresh their TTL + + +def test_collect_respects_collection_interval(mock_http_response_per_endpoint): + app_v1 = _application("checkout") + app_v1["metadata"]["resourceVersion"] = "100" + app_v2 = _application("checkout") + app_v2["metadata"]["resourceVersion"] = "200" + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app_v1]), _items_response([app_v2])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check() # default 120s collection interval + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + after_first = submit.call_count + check._resource_collector.collect() + after_second = submit.call_count + + assert after_first == 1 + assert after_second == 1 
# second call is within the interval -> skipped, the changed v2 is never fetched + + +def _application_include(submit) -> dict: + return next(c.kwargs["include"] for c in submit.call_args_list if c.kwargs["type"] == "argocd_application") + + +def test_collect_excludes_listed_leaf_path_from_allow_list(mock_http_response_per_endpoint): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check(genresources_exclude_paths=["status.conditions[*].message"]) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + paths = _application_include(submit)["paths"] + assert "status.conditions[*].message" not in paths + assert "status.conditions[*].type" in paths # sibling leaf is untouched + + +def test_collect_exclude_drops_whole_subtree_given_parent_path(mock_http_response_per_endpoint): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check(genresources_exclude_paths=["status.history"]) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + paths = _application_include(submit)["paths"] + assert not any(p.startswith("status.history") for p in paths) + assert "status.sync.status" in paths # unrelated path is retained + + +def test_collect_exclude_removes_map_path(mock_http_response_per_endpoint): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check(genresources_exclude_paths=["metadata.labels"]) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + assert 
_application_include(submit)["map_paths"] == [] + + +def test_collect_exclude_overrides_extra_include_path(mock_http_response_per_endpoint): + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([_application("checkout")])], + CLUSTERS_URL: [_items_response([])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check( + genresources_extra_include_paths=["metadata.generation"], + genresources_exclude_paths=["metadata.generation"], + ) + + with patch.object(check, "submit_generic_resource") as submit: + check._resource_collector.collect() + + assert "metadata.generation" not in _application_include(submit)["paths"] + + +def test_real_helper_strips_secrets_and_excluded_fields(aggregator, mock_http_response_per_endpoint): + # Exercises the real submit_generic_resource (unmocked): proves the allow-list projection strips + # secrets and that genresources_exclude_paths actually removes a field from the shipped payload. + cluster = _cluster("https://k8s.example", name="prod") + cluster["config"] = {"bearerToken": "SUPERSECRETTOKEN"} # config is not in CLUSTER_INCLUDE + cluster["annotations"] = { # how GitOps stores it: the whole manifest, token and all + "kubectl.kubernetes.io/last-applied-configuration": '{"config":{"bearerToken":"SUPERSECRETTOKEN"}}' + } + cluster["connectionState"] = {"status": "Successful", "message": ""} + app = _application("checkout") + app["status"]["conditions"] = [{"type": "ComparisonError", "message": "EXCLUDEDMARKER"}] + mock_http_response_per_endpoint( + { + APPLICATIONS_URL: [_items_response([app])], + CLUSTERS_URL: [_items_response([cluster])], + REPOSITORIES_URL: [_items_response([])], + } + ) + check = _check(genresources_exclude_paths=["status.conditions"]) + + check._resource_collector.collect() + + blob = b"".join( + p if isinstance(p, bytes) else p.encode() + for p in aggregator.get_event_platform_events("genresources", parse_json=False) + ) + assert b"checkout" in blob # the application shipped + 
assert b"https://k8s.example" in blob # the cluster shipped + assert b"SUPERSECRETTOKEN" not in blob # allow-list dropped config + the last-applied-config annotation + assert b"EXCLUDEDMARKER" not in blob # genresources_exclude_paths dropped status.conditions + + +def test_collector_warns_when_exclude_empties_a_type(caplog): + nuke = list(CLUSTER_INCLUDE["paths"]) + list(CLUSTER_INCLUDE["map_paths"]) + _check(genresources_exclude_paths=nuke) + assert any("emptied the allow-list for argocd_cluster" in rec.message for rec in caplog.records)