diff --git a/fluxcd/assets/dashboards/fluxcd.json b/fluxcd/assets/dashboards/fluxcd.json index 8b72627e00863..633f7415b912a 100644 --- a/fluxcd/assets/dashboards/fluxcd.json +++ b/fluxcd/assets/dashboards/fluxcd.json @@ -117,7 +117,7 @@ "title_size": "16", "title_align": "left", "type": "check_status", - "check": "cilium.prometheus.health", + "check": "fluxcd.openmetrics.health", "grouping": "cluster", "group_by": [], "tags": [] diff --git a/kafka_consumer/changelog.d/23778.added b/kafka_consumer/changelog.d/23778.added new file mode 100644 index 0000000000000..12073936dc62b --- /dev/null +++ b/kafka_consumer/changelog.d/23778.added @@ -0,0 +1 @@ +Schema Registry: emit per-subject and global compatibility on the Data Streams schema payload, and stop emitting Datadog Events for broker, topic, and schema registry configurations (those payloads continue to flow to the Data Streams intake). diff --git a/kafka_consumer/changelog.d/23902.added b/kafka_consumer/changelog.d/23902.added new file mode 100644 index 0000000000000..bacd1d6b83316 --- /dev/null +++ b/kafka_consumer/changelog.d/23902.added @@ -0,0 +1 @@ +Emit connection_error DSM event when the integration cannot connect to Kafka. diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index dcee07ecfe7c9..9f282e68fb3b9 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -8,8 +8,9 @@ import json import random import time +from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import TypedDict +from typing import Any, NotRequired, TypedDict from urllib.parse import quote from confluent_kafka import IsolationLevel, TopicPartition @@ -26,6 +27,17 @@ class SchemaDefinition(TypedDict): class SubjectVersionInfo(TypedDict): version: int schema_id: int + compatibility: NotRequired[str | None] + + +class SchemaInfo(TypedDict): + schema_content: str + topic_name: str + schema_for: str + schema_version: int | None + schema_id: int | None + schema_type: str + compatibility: str | None class ClusterMetadataCollector: @@ -48,12 +60,14 @@ def __init__(self, check, client, config, log): self.TOPIC_CONFIG_BATCH_SIZE = 100 # Max topics to describe_configs per check run self.SCHEMA_VERSION_CHECK_BATCH_SIZE = 200 # Lightweight calls, can do more per run + self.SCHEMA_COMPATIBILITY_BATCH_SIZE = 200 # Lightweight calls, refreshed on configs cadence self.SCHEMA_FETCH_CONCURRENCY = 10 # Parallel HTTP requests # Cache size limits self.BROKER_CONFIG_CACHE_MAX_SIZE = 1_000 self.TOPIC_CONFIG_CACHE_MAX_SIZE = 20_000 self.SCHEMA_VERSION_CHECK_CACHE_MAX_SIZE = 20_000 + self.SCHEMA_COMPATIBILITY_FETCH_CACHE_MAX_SIZE = 20_000 self.SCHEMA_ID_CACHE_MAX_SIZE = 20_000 self.BROKER_CONFIG_CACHE_KEY = 'kafka_broker_config_cache' @@ -63,8 +77,10 @@ def __init__(self, check, client, config, log): self.TOPIC_HWM_SUM_CACHE_KEY = 'kafka_topic_hwm_sum_cache' self.SCHEMA_CACHE_KEY = 'kafka_schema_cache' self.SCHEMA_VERSION_CHECK_CACHE_KEY = 'kafka_schema_version_check_cache' + self.SCHEMA_COMPATIBILITY_FETCH_CACHE_KEY = 'kafka_schema_compatibility_fetch_cache' self.SCHEMA_LATEST_VERSION_CACHE_KEY = 'kafka_schema_latest_version_cache' self.SCHEMA_ID_CACHE_KEY = 'kafka_schema_id_cache' + self.GLOBAL_COMPATIBILITY_CACHE_KEY = 'kafka_schema_global_compatibility_cache' self._schema_registry_oauth_token = None self._schema_registry_oauth_token_expiry = 0 @@ -156,26 +172,51 @@ def _fetch_oidc_token(self, oauth_config: dict) -> tuple[str, float]: return access_token, expires_at - def _get_schema_registry_subjects(self): - base_url = self.config._collect_schema_registry - response = self.http.get(f"{base_url}/subjects") + def _schema_registry_get(self, path: str, **kwargs: Any) -> Any: + """GET a Schema Registry path and return the parsed JSON body.""" + url = f"{self.config._collect_schema_registry}{path}" + response = self.http.get(url, **kwargs) response.raise_for_status() return response.json() + def _get_schema_registry_subjects(self): + return self._schema_registry_get('/subjects') + def _get_schema_registry_versions(self, subject: str) -> list[int]: """Fetch the list of version numbers for a subject (lightweight call).""" - base_url = self.config._collect_schema_registry encoded_subject = quote(subject, safe='') - response = self.http.get(f"{base_url}/subjects/{encoded_subject}/versions") - response.raise_for_status() - return response.json() + return self._schema_registry_get(f'/subjects/{encoded_subject}/versions') def _get_schema_registry_latest_version(self, subject): - base_url = self.config._collect_schema_registry encoded_subject = quote(subject, safe='') - response = self.http.get(f"{base_url}/subjects/{encoded_subject}/versions/latest") - response.raise_for_status() - return response.json() + return self._schema_registry_get(f'/subjects/{encoded_subject}/versions/latest') + + def _get_schema_registry_global_compatibility(self) -> str | None: + """Return the global compatibility level from the Schema Registry.""" + return self._schema_registry_get('/config').get('compatibilityLevel') + + def _get_schema_registry_subject_compatibility(self, subject: str) -> str | None: + """Return the effective compatibility for a subject, falling back to global.""" + encoded_subject = quote(subject, safe='') + return self._schema_registry_get( + f'/config/{encoded_subject}', + params={'defaultToGlobal': 'true'}, + ).get('compatibilityLevel') + + def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_label: str) -> dict[str, Any]: + """Run fn(subject) for each subject concurrently; drop and log individual failures.""" + results: dict[str, Any] = {} + if not subjects: + return results + with ThreadPoolExecutor(max_workers=self.SCHEMA_FETCH_CONCURRENCY) as executor: + future_to_subject = {executor.submit(fn, subject): subject for subject in subjects} + for future in as_completed(future_to_subject): + subject = future_to_subject[future] + try: + results[subject] = future.result() + except Exception as e: + self.log.warning("Error fetching %s for %s: %s", error_label, subject, e) + return results def collect_all_metadata(self, highwater_offsets): try: @@ -411,7 +452,7 @@ def _collect_broker_metadata(self, metadata=None): if not broker_meta: continue - tags = self._get_tags(cluster_id) + [ + metric_tags = self._get_tags(cluster_id) + [ f'broker_id:{broker_id_str}', f'broker_host:{broker_meta.host}', f'broker_port:{broker_meta.port}', @@ -443,7 +484,7 @@ def _collect_broker_metadata(self, metadata=None): try: value = float(config_data[config_name]) if config_data[config_name] else 0 metric_name = f"broker.config.{config_name.replace('.', '_')}" - self.check.gauge(metric_name, value, tags=tags) + self.check.gauge(metric_name, value, tags=metric_tags) except (ValueError, TypeError): self.log.debug( "Could not convert broker %s config %s value %r to float", @@ -457,7 +498,6 @@ def _collect_broker_metadata(self, metadata=None): fetched_broker_configs[broker_id_str] = { 'event_text': event_text, - 'tags': tags, 'broker_host': broker_meta.host, 'broker_port': broker_meta.port, } @@ -475,19 +515,6 @@ def _collect_broker_metadata(self, metadata=None): for broker_id in brokers_to_emit: info = fetched_broker_configs[broker_id] - self.check.event( - { - 'timestamp': int(time.time()), - 'event_type': 'config_change', - 'source_type_name': 'kafka', - 'msg_title': f'Broker {broker_id} Configuration', - 'msg_text': info['event_text'], - 'tags': info['tags'] + ['event_type:broker_config'], - 'aggregation_key': f'kafka_broker_config_{broker_id}', - 'alert_type': 'info', - } - ) - self.check.event_platform_event( json.dumps( { @@ -758,7 +785,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets): fetched_topic_configs[topic_name] = { 'event_text': event_text, - 'tags': topic_tags, } self._mark_items_fetched( @@ -774,19 +800,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets): for topic_name in topics_to_emit: info = fetched_topic_configs[topic_name] - self.check.event( - { - 'timestamp': int(time.time()), - 'event_type': 'info', - 'source_type_name': 'kafka', - 'msg_title': f'Topic: {topic_name} (custom config)', - 'msg_text': info['event_text'], - 'tags': info['tags'] + ['event_type:topic_config'], - 'aggregation_key': f'kafka_topic_config_{topic_name}', - 'alert_type': 'info', - } - ) - self.check.event_platform_event( json.dumps( { @@ -916,6 +929,21 @@ def _save_latest_version_cache(self, cache: dict[str, SubjectVersionInfo]): except Exception as e: self.log.debug("Could not write schema latest version cache: %s", e) + def _load_global_compatibility_cache(self) -> str | None: + """Return the last successfully fetched global compatibility level.""" + try: + return self.check.read_persistent_cache(self.GLOBAL_COMPATIBILITY_CACHE_KEY) or None + except Exception as e: + self.log.debug("Could not read global compatibility cache: %s", e) + return None + + def _save_global_compatibility_cache(self, value: str) -> None: + """Persist the last known global compatibility level.""" + try: + self.check.write_persistent_cache(self.GLOBAL_COMPATIBILITY_CACHE_KEY, value) + except Exception as e: + self.log.debug("Could not write global compatibility cache: %s", e) + def _collect_schema_registry_info(self, metadata): if not self.config._collect_schema_registry: return @@ -942,6 +970,18 @@ def _collect_schema_registry_info(self, metadata): self.check.gauge('schema_registry.subjects', len(subjects), tags=self._get_tags(cluster_id)) + try: + global_compatibility = self._get_schema_registry_global_compatibility() + if global_compatibility is not None: + self._save_global_compatibility_cache(global_compatibility) + else: + # A transient empty /config response shouldn't drop global_compatibility from + # this cycle's payloads — fall back to the last known value, same as the error path. + global_compatibility = self._load_global_compatibility_cache() + except Exception as e: + self.log.warning("Failed to fetch global compatibility from Schema Registry: %s", e) + global_compatibility = self._load_global_compatibility_cache() + # --- Tier 1: Lightweight version checks --- # GET /subjects/{subject}/versions returns just [1, 2, 3] — very cheap. # We use this to detect if a subject has a new version without fetching schema content. @@ -955,19 +995,7 @@ def _collect_schema_registry_info(self, metadata): latest_version_cache = self._load_latest_version_cache() # Fetch version lists in parallel (lightweight calls) - version_responses = {} - if subjects_to_check: - with ThreadPoolExecutor(max_workers=self.SCHEMA_FETCH_CONCURRENCY) as executor: - future_to_subject = { - executor.submit(self._get_schema_registry_versions, subject): subject - for subject in subjects_to_check - } - for future in as_completed(future_to_subject): - subject = future_to_subject[future] - try: - version_responses[subject] = future.result() - except Exception as e: - self.log.warning("Error getting version list for %s: %s", subject, e) + version_responses = self._parallel_fetch(self._get_schema_registry_versions, subjects_to_check, "version list") # Mark all checked subjects as fetched (even if they errored) self._mark_items_fetched( @@ -1001,19 +1029,23 @@ def _collect_schema_registry_info(self, metadata): schema_id_cache_updated = False # Fetch latest versions in parallel for subjects that changed - schema_responses = {} - if subjects_needing_full_fetch: - with ThreadPoolExecutor(max_workers=self.SCHEMA_FETCH_CONCURRENCY) as executor: - future_to_subject = { - executor.submit(self._get_schema_registry_latest_version, subject): subject - for subject in subjects_needing_full_fetch - } - for future in as_completed(future_to_subject): - subject = future_to_subject[future] - try: - schema_responses[subject] = future.result() - except Exception as e: - self.log.warning("Error getting schema details for %s: %s", subject, e) + schema_responses = self._parallel_fetch( + self._get_schema_registry_latest_version, subjects_needing_full_fetch, "schema details" + ) + + compatibility_responses = self._collect_subject_compatibilities(subjects, subjects_needing_full_fetch) + + # Apply standalone compatibility updates to existing cache entries so a flip alone (without a + # version bump) flows into the next schema emission via the cache_content key. This must run + # before the schema_responses loop below replaces latest_version_cache[subject] for + # version-bumped subjects; the `subject in schema_responses` guard keeps the two write sites + # mutually exclusive per subject. + for subject, compatibility in compatibility_responses.items(): + if compatibility is None or subject in schema_responses: + continue + entry = latest_version_cache.get(subject) + if isinstance(entry, dict): + entry['compatibility'] = compatibility fetched_schemas = {} @@ -1026,8 +1058,19 @@ def _collect_schema_registry_info(self, metadata): self.log.warning("Schema Registry returned incomplete data for %s: %s", subject, latest_schema) continue - # Update the latest version cache with version and schema_id - latest_version_cache[subject] = {'version': schema_version, 'schema_id': schema_id} + # A None here means either the fetch failed or the response carried no + # compatibilityLevel. With defaultToGlobal=true the registry returns the effective + # level for any existing subject, so in practice None signals a fetch failure — fall + # back to the cached value rather than overwriting it with None. + compatibility = compatibility_responses.get(subject) + if compatibility is None: + compatibility = (latest_version_cache.get(subject) or {}).get('compatibility') + + latest_version_cache[subject] = { + 'version': schema_version, + 'schema_id': schema_id, + 'compatibility': compatibility, + } # Use permanent schema ID cache to avoid processing unchanged schemas. schema_id_str = str(schema_id) @@ -1043,29 +1086,13 @@ def _collect_schema_registry_info(self, metadata): } schema_id_cache_updated = True - topic_name, schema_for = self._parse_subject(subject) - - subject_tags = self._get_tags(cluster_id) + [f'subject:{subject}'] - event_tags = subject_tags + [ - f'schema_id:{schema_id}', - f'schema_version:{schema_version}', - f'schema_type:{schema_type}', - f'topic:{topic_name}', - f'schema_for:{schema_for}', - 'event_type:schema_registry', - ] - - cache_content = f"{schema_id}:{schema_version}:{schema_content}" + cache_content = f"{schema_id}:{schema_version}:{compatibility}:{global_compatibility}:{schema_content}" fetched_schemas[subject] = { 'cache_content': cache_content, - 'schema_content': schema_content, - 'topic_name': topic_name, - 'schema_for': schema_for, - 'schema_version': schema_version, - 'schema_id': schema_id, - 'schema_type': schema_type, - 'event_tags': event_tags, + **self._build_schema_info( + subject, schema_content, schema_type, schema_version, schema_id, compatibility + ), } # Persist caches @@ -1075,6 +1102,9 @@ def _collect_schema_registry_info(self, metadata): # Build lightweight cache_content strings for all known subjects (from cache, no extra HTTP calls). # This allows re-emission of unchanged schemas when the event cache TTL expires. + # Note: changing the cache_content format (e.g. adding the compatibility fields) makes every + # cached subject hash differently on the first run after an upgrade, so all known schemas + # re-emit once. This is self-healing and bounded to a single collection cycle. all_schema_cache_contents = {subject: info['cache_content'] for subject, info in fetched_schemas.items()} for subject in subjects: if subject in all_schema_cache_contents: @@ -1094,77 +1124,130 @@ def _collect_schema_registry_info(self, metadata): if not id_entry: continue - all_schema_cache_contents[subject] = f"{schema_id}:{version}:{id_entry['schema']}" + cached_compat = cached_info.get('compatibility') + all_schema_cache_contents[subject] = ( + f"{schema_id}:{version}:{cached_compat}:{global_compatibility}:{id_entry['schema']}" + ) # Determine which subjects need event emission (changed or TTL expired) schemas_to_emit = self._get_events_to_send(self.SCHEMA_CACHE_KEY, all_schema_cache_contents) - # Build full payloads only for subjects that need emission - for subject in schemas_to_emit: + self._emit_schema_registry_events( + schemas_to_emit, + fetched_schemas, + latest_version_cache, + schema_id_cache, + cluster_id, + global_compatibility, + ) + + def _collect_subject_compatibilities( + self, + subjects: list[str], + subjects_needing_full_fetch: list[str], + ) -> dict[str, str | None]: + """Fetch per-subject compatibility on a cadence and return the raw results. + + Compatibility is refreshed on its own cadence so a flip without a version bump is still picked + up; a per-subject flip therefore surfaces only on the next compat-fetch cadence (up to + CONFIGS_REFRESH_INTERVAL + jitter later), while a global flip re-emits immediately via + global_compatibility. + + The SCHEMA_COMPATIBILITY_BATCH_SIZE clamp bounds only the cadence-driven `compat_due` list; + version-bumped subjects are always fetched on top of that budget, so the effective ceiling on + /config/{subject} calls is SCHEMA_VERSION_CHECK_BATCH_SIZE (which bounds + subjects_needing_full_fetch), not SCHEMA_COMPATIBILITY_BATCH_SIZE. They are equal today, so + there is no over-fetch. + """ + compat_due = self._get_items_to_fetch(self.SCHEMA_COMPATIBILITY_FETCH_CACHE_KEY, subjects) + remaining_slots = max(0, self.SCHEMA_COMPATIBILITY_BATCH_SIZE - len(subjects_needing_full_fetch)) + compat_due = compat_due[:remaining_slots] + compat_subjects_to_fetch = list(set(subjects_needing_full_fetch) | set(compat_due)) + + compatibility_responses: dict[str, str | None] = self._parallel_fetch( + self._get_schema_registry_subject_compatibility, compat_subjects_to_fetch, "compatibility" + ) + if compat_subjects_to_fetch: + # Mark all attempted subjects as fetched (even if they errored), mirroring the version + # tier: a subject that fails this run isn't retried until the next configs cadence rather + # than hammered every check, at the cost of holding stale compatibility until then. + self._mark_items_fetched( + self.SCHEMA_COMPATIBILITY_FETCH_CACHE_KEY, + compat_subjects_to_fetch, + ttl_base=self.CONFIGS_REFRESH_INTERVAL, + ttl_jitter=self.CONFIGS_REFRESH_JITTER, + max_cache_size=self.SCHEMA_COMPATIBILITY_FETCH_CACHE_MAX_SIZE, + ) + + return compatibility_responses + + def _build_schema_info( + self, + subject: str, + schema_content: str, + schema_type: str, + schema_version: int | None, + schema_id: int | None, + compatibility: str | None, + ) -> SchemaInfo: + """Assemble the canonical schema info dict used to build a data-streams schema payload.""" + topic_name, schema_for = self._parse_subject(subject) + return { + 'schema_content': schema_content, + 'topic_name': topic_name, + 'schema_for': schema_for, + 'schema_version': schema_version, + 'schema_id': schema_id, + 'schema_type': schema_type, + 'compatibility': compatibility, + } + + def _emit_schema_registry_events( + self, + subjects_to_emit: list[str], + fetched_schemas: dict[str, dict], + latest_version_cache: dict[str, SubjectVersionInfo], + schema_id_cache: dict[str, SchemaDefinition], + cluster_id: str, + global_compatibility: str | None, + ): + """Emit a data-streams-message payload for each subject that changed or whose event TTL expired.""" + for subject in subjects_to_emit: if subject in fetched_schemas: info = fetched_schemas[subject] else: - # Reconstruct from caches cached_info = latest_version_cache.get(subject, {}) - version = cached_info.get('version') schema_id = cached_info.get('schema_id') id_entry = schema_id_cache.get(str(schema_id), {}) - schema_content = id_entry.get('schema', '') - schema_type = id_entry.get('schema_type', 'AVRO') - - topic_name, schema_for = self._parse_subject(subject) - - subject_tags = self._get_tags(cluster_id) + [f'subject:{subject}'] - event_tags = subject_tags + [ - f'schema_id:{schema_id}', - f'schema_version:{version}', - f'schema_type:{schema_type}', - f'topic:{topic_name}', - f'schema_for:{schema_for}', - 'event_type:schema_registry', - ] - - info = { - 'schema_content': schema_content, - 'topic_name': topic_name, - 'schema_for': schema_for, - 'schema_version': version, - 'schema_id': schema_id, - 'schema_type': schema_type, - 'event_tags': event_tags, - } - - self.check.event( - { - 'timestamp': int(time.time()), - 'event_type': 'info', - 'source_type_name': 'kafka', - 'msg_title': f'{info["topic_name"]} ({info["schema_for"]}) - Schema v{info["schema_version"]}', - 'msg_text': info['schema_content'], - 'tags': info['event_tags'], - 'aggregation_key': f'kafka_schema_{subject}_{info["schema_version"]}', - 'alert_type': 'info', - } - ) + info = self._build_schema_info( + subject, + id_entry.get('schema', ''), + id_entry.get('schema_type', 'AVRO'), + cached_info.get('version'), + schema_id, + cached_info.get('compatibility'), + ) + + ds_payload = { + 'collection_timestamp': int(time.time() * 1000), + 'kafka_cluster_id': cluster_id, + **self._original_cluster_id_field(), + 'subject': subject, + 'topic': info['topic_name'], + 'schema_for': info['schema_for'], + 'schema_id': info['schema_id'], + 'schema_version': info['schema_version'], + 'schema_type': info['schema_type'], + 'config_type': 'schema', + 'schema': info['schema_content'], + } + subject_compat = info.get('compatibility') + if subject_compat is not None: + ds_payload['compatibility'] = subject_compat + if global_compatibility is not None: + ds_payload['global_compatibility'] = global_compatibility - self.check.event_platform_event( - json.dumps( - { - 'collection_timestamp': int(time.time() * 1000), - 'kafka_cluster_id': cluster_id, - **self._original_cluster_id_field(), - 'subject': subject, - 'topic': info['topic_name'], - 'schema_for': info['schema_for'], - 'schema_id': info['schema_id'], - 'schema_version': info['schema_version'], - 'schema_type': info['schema_type'], - 'config_type': 'schema', - 'schema': info['schema_content'], - } - ), - "data-streams-message", - ) + self.check.event_platform_event(json.dumps(ds_payload), "data-streams-message") @staticmethod def _parse_subject(subject: str) -> tuple[str, str]: diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 87b5957edeb6d..6dea7e472c240 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -42,10 +42,15 @@ def check(self, _): try: self.client.request_metadata_update() - except: + except Exception as e: + if self.config._cluster_monitoring_enabled: + try: + self._send_cluster_monitoring_connection_error(str(e)) + except Exception: + self.log.warning("Failed to emit connection_error DSM event", exc_info=True) raise Exception( "Unable to connect to the AdminClient. This is likely due to an error in the configuration." - ) + ) from e try: # Fetch consumer offsets @@ -140,19 +145,31 @@ def _get_broker_list(self) -> list[dict]: for broker_meta in cluster_metadata.brokers.values() ] + def _emit_cluster_monitoring_event(self, payload: dict) -> None: + payload.setdefault('collection_timestamp', int(time() * 1000)) + payload.setdefault('bootstrap_servers', self.config._kafka_connect_str) + self.event_platform_event(json.dumps(payload), "data-streams-message") + + def _send_cluster_monitoring_connection_error(self, reason: str) -> None: + self._emit_cluster_monitoring_event( + { + 'kafka_cluster_id': self.config._kafka_cluster_id_override or '', + 'config_type': 'connection_error', + 'reason': reason, + } + ) + def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: str) -> None: payload = { - 'collection_timestamp': int(time() * 1000), 'kafka_cluster_id': cluster_id, 'config_type': 'heartbeat', 'contexts': total_contexts, 'contexts_limit': self._context_limit, - 'bootstrap_servers': self.config._kafka_connect_str, 'brokers': self._get_broker_list(), } if self.config._kafka_cluster_id_override: payload['original_kafka_cluster_id'] = self.config._auto_detected_cluster_id - self.event_platform_event(json.dumps(payload), "data-streams-message") + self._emit_cluster_monitoring_event(payload) def get_consumer_offsets(self): # {(consumer_group, topic, partition): offset} diff --git a/kafka_consumer/tests/test_cluster_metadata.py b/kafka_consumer/tests/test_cluster_metadata.py index 845b90d8e42f9..ea2071bf1c9a5 100644 --- a/kafka_consumer/tests/test_cluster_metadata.py +++ b/kafka_consumer/tests/test_cluster_metadata.py @@ -3,6 +3,7 @@ # Licensed under a 3-clause BSD style license (see LICENSE) """Tests for Kafka cluster metadata collection.""" +import hashlib import json import time from unittest import mock @@ -176,7 +177,7 @@ def mock_list_offsets(requests, **_kwargs): return client -def mock_schema_registry_methods(metadata_collector): +def mock_schema_registry_methods(metadata_collector, global_compat='BACKWARD', subject_compat='BACKWARD'): """Mock Schema Registry methods on the metadata collector.""" metadata_collector._get_schema_registry_subjects = mock.Mock(return_value=['test-topic-value']) @@ -207,6 +208,57 @@ def mock_schema_registry_methods(metadata_collector): } ) + mock_compatibility_methods(metadata_collector, global_compat=global_compat, subject_compat=subject_compat) + + +def mock_compatibility_methods(collector, global_compat='BACKWARD', subject_compat='BACKWARD'): + """Mock the global and per-subject compatibility fetches on the collector.""" + collector._get_schema_registry_global_compatibility = mock.Mock(return_value=global_compat) + collector._get_schema_registry_subject_compatibility = mock.Mock(return_value=subject_compat) + + +def schema_ds_events(check): + """Return the parsed data-streams-message payloads with config_type 'schema' emitted by the check.""" + events = [] + for call in check.event_platform_event.call_args_list: + args = call[0] + if len(args) > 1 and args[1] == 'data-streams-message': + payload = json.loads(args[0]) + if payload.get('config_type') == 'schema': + events.append(payload) + return events + + +def _make_schema_registry_check(check, instance_overrides=None): + """Return a check instance wired with a mock Kafka client and persistent cache mocks.""" + instance = { + 'kafka_connect_str': 'localhost:9092', + 'enable_cluster_monitoring': True, + 'schema_registry_url': 'http://localhost:8081', + 'monitor_unlisted_consumer_groups': True, + } + if instance_overrides: + instance.update(instance_overrides) + kafka_consumer_check = check(instance) + mock_kafka_client = seed_mock_kafka_client() + kafka_consumer_check.client = mock_kafka_client + kafka_consumer_check.metadata_collector.client = mock_kafka_client + return kafka_consumer_check + + +def _wire_cache(kafka_consumer_check, seed=None): + """Wire persistent-cache and event mocks on the check, returning the backing cache_storage dict. + + Each test only declares its seed cache entries; reads and writes go through this in-memory dict. + """ + cache_storage = dict(seed or {}) + kafka_consumer_check.read_persistent_cache = mock.Mock(side_effect=cache_storage.get) + kafka_consumer_check.write_persistent_cache = mock.Mock( + side_effect=lambda key, value: cache_storage.__setitem__(key, value) + ) + kafka_consumer_check.event_platform_event = mock.Mock() + return cache_storage + @pytest.fixture def cluster_config(): @@ -244,7 +296,7 @@ def test_collect_cluster_metadata(check, dd_run_check, aggregator): kafka_consumer_check.metadata_collector.client = mock_kafka_client # Mock schema registry methods on metadata collector - mock_schema_registry_methods(kafka_consumer_check.metadata_collector) + mock_schema_registry_methods(kafka_consumer_check.metadata_collector, subject_compat='FULL') # Mock persistent cache for throughput calculation and schema registry events # Using per-partition format: partition 0 was at 75, partition 1 was at 175 @@ -479,129 +531,12 @@ def mocked_read_cache(key): tags=['test_tag:test_value', 'kafka_cluster_id:test-cluster-id'], ) - # Verify broker configuration event structure and content - broker_config_events = [e for e in aggregator.events if 'event_type:broker_config' in e.get('tags', [])] - assert broker_config_events, f"Expected at least 1 broker config event, found {len(broker_config_events)}" - - # Check broker event structure and content - broker_event = broker_config_events[0] - assert broker_event['event_type'] == 'config_change', "Broker event type should be 'config_change'" - assert broker_event['source_type_name'] == 'kafka', "Broker event source should be 'kafka'" - assert broker_event['msg_title'] == 'Broker 1 Configuration', "Broker event title mismatch" - assert broker_event['alert_type'] == 'info', "Broker event alert type should be 'info'" - assert broker_event['aggregation_key'] == 'kafka_broker_config_1', "Broker event aggregation key mismatch" - - # Verify broker event tags - expected_broker_tags = [ - 'test_tag:test_value', - 'kafka_cluster_id:test-cluster-id', - 'broker_id:1', - 'broker_host:broker1', - 'broker_port:9092', - 'event_type:broker_config', - ] - for tag in expected_broker_tags: - assert tag in broker_event['tags'], f"Missing broker event tag: {tag}" - - # Verify broker config content (msg_text should be JSON with realistic config data) - broker_config_json = json.loads(broker_event['msg_text']) - expected_broker_config = { - 'log.retention.bytes': '1073741824', - 'log.retention.ms': '604800000', - 'log.segment.bytes': '1073741824', - 'num.partitions': '3', - 'num.network.threads': '3', - 'num.io.threads': '8', - 'default.replication.factor': '2', - 'min.insync.replicas': '1', - 'compression.type': 'producer', - } - assert broker_config_json == expected_broker_config, ( - f"Broker config mismatch. Expected {expected_broker_config}, got {broker_config_json}" - ) - - # Verify topic configuration event structure and content - topic_config_events = [e for e in aggregator.events if 'event_type:topic_config' in e.get('tags', [])] - assert topic_config_events, f"Expected at least 1 topic config event, found {len(topic_config_events)}" - - # Check topic event structure and content - topic_event = topic_config_events[0] - assert topic_event['event_type'] == 'info', "Topic event type should be 'info'" - assert topic_event['source_type_name'] == 'kafka', "Topic event source should be 'kafka'" - assert topic_event['msg_title'] == 'Topic: test-topic (custom config)', "Topic event title mismatch" - assert topic_event['alert_type'] == 'info', "Topic event alert type should be 'info'" - assert topic_event['aggregation_key'] == 'kafka_topic_config_test-topic', "Topic event aggregation key mismatch" - - # Verify topic event tags - expected_topic_tags = [ - 'test_tag:test_value', - 'kafka_cluster_id:test-cluster-id', - 'topic:test-topic', - 'event_type:topic_config', - ] - for tag in expected_topic_tags: - assert tag in topic_event['tags'], f"Missing topic event tag: {tag}" - - # Verify topic config content (msg_text should be JSON with realistic config data) - topic_config_json = json.loads(topic_event['msg_text']) - expected_topic_config = { - 'retention.ms': '604800000', - 'retention.bytes': '-1', - 'max.message.bytes': '1048588', - 'compression.type': 'producer', - 'cleanup.policy': 'delete', - } - assert topic_config_json == expected_topic_config, ( - f"Topic config mismatch. Expected {expected_topic_config}, got {topic_config_json}" - ) - - # Verify schema registry event - check complete structure and content - schema_events = [e for e in aggregator.events if 'event_type:schema_registry' in e.get('tags', [])] - assert len(schema_events) == 1, f"Expected 1 schema registry event, found {len(schema_events)}" - - schema_event = schema_events[0] - - # Verify event structure - assert schema_event['event_type'] == 'info', "Schema event type should be 'info'" - assert schema_event['source_type_name'] == 'kafka', "Schema event source should be 'kafka'" - assert schema_event['msg_title'] == 'test-topic (value) - Schema v2', "Schema event title mismatch" - assert schema_event['alert_type'] == 'info', "Schema event alert type should be 'info'" - assert schema_event['aggregation_key'] == 'kafka_schema_test-topic-value_2', "Schema event aggregation key mismatch" - - # Verify schema content (msg_text should be a valid Avro schema JSON) - schema_json = json.loads(schema_event['msg_text']) - expected_schema = { - "type": "record", - "name": "User", - "namespace": "com.example", - "fields": [ - {"name": "id", "type": "long"}, - {"name": "username", "type": "string"}, - {"name": "email", "type": ["null", "string"], "default": None}, - ], - } - assert schema_json == expected_schema, f"Schema mismatch. Expected {expected_schema}, got {schema_json}" - - # Verify the event has a timestamp - assert 'timestamp' in schema_event, "Schema event should have a timestamp" - assert isinstance(schema_event['timestamp'], int), "Schema event timestamp should be an integer" - - # Verify all expected tags are present - expected_schema_tags = [ - 'test_tag:test_value', - 'kafka_cluster_id:test-cluster-id', - 'subject:test-topic-value', - 'schema_id:1', - 'schema_version:2', - 'schema_type:AVRO', - 'topic:test-topic', - 'schema_for:value', - 'event_type:schema_registry', - ] - for tag in expected_schema_tags: - assert tag in schema_event['tags'], f"Missing schema event tag: {tag}" + # Broker, topic, and schema configs are emitted only to the Data Streams intake. + assert not [e for e in aggregator.events if 'event_type:broker_config' in e.get('tags', [])] + assert not [e for e in aggregator.events if 'event_type:topic_config' in e.get('tags', [])] + assert not [e for e in aggregator.events if 'event_type:schema_registry' in e.get('tags', [])] - # Verify events are also sent to Data Streams intake + # Verify events are sent to Data Streams intake ds_calls = kafka_consumer_check.event_platform_event.call_args_list ds_events = [ json.loads(call[0][0]) for call in ds_calls if len(call[0]) > 1 and call[0][1] == "data-streams-message" @@ -618,6 +553,17 @@ def mocked_read_cache(key): assert broker_ds['broker_host'] == 'broker1' assert broker_ds['broker_port'] == 9092 assert 'collection_timestamp' in broker_ds + expected_broker_config = { + 'log.retention.bytes': '1073741824', + 'log.retention.ms': '604800000', + 'log.segment.bytes': '1073741824', + 'num.partitions': '3', + 'num.network.threads': '3', + 'num.io.threads': '8', + 'default.replication.factor': '2', + 'min.insync.replicas': '1', + 'compression.type': 'producer', + } assert broker_ds['config'] == expected_broker_config topic_ds_events = [e for e in ds_events if e.get('config_type') == 'topic'] @@ -626,11 +572,18 @@ def mocked_read_cache(key): assert topic_ds['kafka_cluster_id'] == 'test-cluster-id' assert topic_ds['topic'] == 'test-topic' assert 'collection_timestamp' in topic_ds + expected_topic_config = { + 'retention.ms': '604800000', + 'retention.bytes': '-1', + 'max.message.bytes': '1048588', + 'compression.type': 'producer', + 'cleanup.policy': 'delete', + } assert topic_ds['config'] == expected_topic_config - schema_ds_events = [e for e in ds_events if e.get('config_type') == 'schema'] - assert len(schema_ds_events) >= 1, "Expected at least 1 schema Data Streams event" - schema_ds = schema_ds_events[0] + schema_events = [e for e in ds_events if e.get('config_type') == 'schema'] + assert len(schema_events) >= 1, "Expected at least 1 schema Data Streams event" + schema_ds = schema_events[0] assert schema_ds['kafka_cluster_id'] == 'test-cluster-id' assert schema_ds['subject'] == 'test-topic-value' assert schema_ds['schema_id'] == 1 @@ -638,6 +591,19 @@ def mocked_read_cache(key): assert schema_ds['schema_type'] == 'AVRO' assert 'collection_timestamp' in schema_ds assert 'schema' in schema_ds + assert schema_ds['compatibility'] == 'FULL' + assert schema_ds['global_compatibility'] == 'BACKWARD' + expected_schema = { + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "id", "type": "long"}, + {"name": "username", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": None}, + ], + } + assert json.loads(schema_ds['schema']) == expected_schema def test_throughput_with_offset_decrease(check, dd_run_check, aggregator): @@ -809,17 +775,7 @@ def test_schema_registry_batching(check, dd_run_check, aggregator): With thousands of subjects, only a limited batch should be checked per check run to avoid overwhelming the registry. """ - instance = { - 'kafka_connect_str': 'localhost:9092', - 'enable_cluster_monitoring': True, - 'schema_registry_url': 'http://localhost:8081', - 'monitor_unlisted_consumer_groups': True, - } - - kafka_consumer_check = check(instance) - mock_kafka_client = seed_mock_kafka_client() - kafka_consumer_check.client = mock_kafka_client - kafka_consumer_check.metadata_collector.client = mock_kafka_client + kafka_consumer_check = _make_schema_registry_check(check) collector = kafka_consumer_check.metadata_collector # Set a small batch size for testing @@ -837,17 +793,9 @@ def test_schema_registry_batching(check, dd_run_check, aggregator): return_value={'id': 1, 'version': 1, 'schema': avro_schema, 'schemaType': 'AVRO'} ) - cache_storage = {} - - def mock_read(key): - return cache_storage.get(key) + _wire_cache(kafka_consumer_check) - def mock_write(key, value): - cache_storage[key] = value - - kafka_consumer_check.read_persistent_cache = mock.Mock(side_effect=mock_read) - kafka_consumer_check.write_persistent_cache = mock.Mock(side_effect=mock_write) - kafka_consumer_check.event_platform_event = mock.Mock() + mock_compatibility_methods(collector) # Run 1: first batch of 2 subjects dd_run_check(kafka_consumer_check) @@ -896,17 +844,7 @@ def test_schema_registry_schema_id_cache(check, dd_run_check, aggregator): Schema IDs in the registry are immutable, so once we fetch the content for a given ID we should never need to fetch it again. """ - instance = { - 'kafka_connect_str': 'localhost:9092', - 'enable_cluster_monitoring': True, - 'schema_registry_url': 'http://localhost:8081', - 'monitor_unlisted_consumer_groups': True, - } - - kafka_consumer_check = check(instance) - mock_kafka_client = seed_mock_kafka_client() - kafka_consumer_check.client = mock_kafka_client - kafka_consumer_check.metadata_collector.client = mock_kafka_client + kafka_consumer_check = _make_schema_registry_check(check) collector = kafka_consumer_check.metadata_collector @@ -920,17 +858,9 @@ def test_schema_registry_schema_id_cache(check, dd_run_check, aggregator): return_value={'id': 42, 'version': 3, 'schema': avro_schema, 'schemaType': 'AVRO'} ) - cache_storage = {} - - def mock_read(key): - return cache_storage.get(key) + cache_storage = _wire_cache(kafka_consumer_check) - def mock_write(key, value): - cache_storage[key] = value - - kafka_consumer_check.read_persistent_cache = mock.Mock(side_effect=mock_read) - kafka_consumer_check.write_persistent_cache = mock.Mock(side_effect=mock_write) - kafka_consumer_check.event_platform_event = mock.Mock() + mock_compatibility_methods(collector) dd_run_check(kafka_consumer_check) @@ -972,17 +902,7 @@ def test_schema_registry_two_tier_no_fetch_when_unchanged(check, dd_run_check, a The two-tier approach checks version numbers first (lightweight). If the max version matches what's cached, no full fetch (/versions/latest) should be made. """ - instance = { - 'kafka_connect_str': 'localhost:9092', - 'enable_cluster_monitoring': True, - 'schema_registry_url': 'http://localhost:8081', - 'monitor_unlisted_consumer_groups': True, - } - - kafka_consumer_check = check(instance) - mock_kafka_client = seed_mock_kafka_client() - kafka_consumer_check.client = mock_kafka_client - kafka_consumer_check.metadata_collector.client = mock_kafka_client + kafka_consumer_check = _make_schema_registry_check(check) collector = kafka_consumer_check.metadata_collector @@ -1001,19 +921,9 @@ def test_schema_registry_two_tier_no_fetch_when_unchanged(check, dd_run_check, a 'other-topic-key': {'version': 2, 'schema_id': 11}, } - cache_storage = { - 'kafka_schema_latest_version_cache': json.dumps(latest_version_cache), - } + _wire_cache(kafka_consumer_check, {'kafka_schema_latest_version_cache': json.dumps(latest_version_cache)}) - def mock_read(key): - return cache_storage.get(key) - - def mock_write(key, value): - cache_storage[key] = value - - kafka_consumer_check.read_persistent_cache = mock.Mock(side_effect=mock_read) - kafka_consumer_check.write_persistent_cache = mock.Mock(side_effect=mock_write) - kafka_consumer_check.event_platform_event = mock.Mock() + mock_compatibility_methods(collector) dd_run_check(kafka_consumer_check) @@ -1033,17 +943,7 @@ def test_schema_registry_two_tier_fetch_on_new_version(check, dd_run_check, aggr When a subject has a new version (e.g., max goes from 2 to 3), only that subject should trigger a full /versions/latest fetch. """ - instance = { - 'kafka_connect_str': 'localhost:9092', - 'enable_cluster_monitoring': True, - 'schema_registry_url': 'http://localhost:8081', - 'monitor_unlisted_consumer_groups': True, - } - - kafka_consumer_check = check(instance) - mock_kafka_client = seed_mock_kafka_client() - kafka_consumer_check.client = mock_kafka_client - kafka_consumer_check.metadata_collector.client = mock_kafka_client + kafka_consumer_check = _make_schema_registry_check(check) collector = kafka_consumer_check.metadata_collector @@ -1062,25 +962,17 @@ def mock_versions(subject): return_value={'id': 99, 'version': 3, 'schema': avro_schema, 'schemaType': 'AVRO'} ) + mock_compatibility_methods(collector) + # Pre-populate: both subjects were last seen at version 2 latest_version_cache = { 'unchanged-topic-value': {'version': 2, 'schema_id': 50}, 'changed-topic-value': {'version': 2, 'schema_id': 50}, } - cache_storage = { - 'kafka_schema_latest_version_cache': json.dumps(latest_version_cache), - } - - def mock_read(key): - return cache_storage.get(key) - - def mock_write(key, value): - cache_storage[key] = value - - kafka_consumer_check.read_persistent_cache = mock.Mock(side_effect=mock_read) - kafka_consumer_check.write_persistent_cache = mock.Mock(side_effect=mock_write) - kafka_consumer_check.event_platform_event = mock.Mock() + cache_storage = _wire_cache( + kafka_consumer_check, {'kafka_schema_latest_version_cache': json.dumps(latest_version_cache)} + ) dd_run_check(kafka_consumer_check) @@ -1093,8 +985,102 @@ def mock_write(key, value): # Latest version cache should be updated for the changed subject updated_cache = json.loads(cache_storage['kafka_schema_latest_version_cache']) - assert updated_cache['changed-topic-value'] == {'version': 3, 'schema_id': 99} - assert updated_cache['unchanged-topic-value'] == {'version': 2, 'schema_id': 50} # unchanged + assert updated_cache['changed-topic-value'] == {'version': 3, 'schema_id': 99, 'compatibility': 'BACKWARD'} + # Compatibility is refreshed on its own cadence, so the unchanged subject also picks it up. + assert updated_cache['unchanged-topic-value'] == {'version': 2, 'schema_id': 50, 'compatibility': 'BACKWARD'} + + +def test_schema_registry_compat_not_refetched_when_cache_fresh(check, dd_run_check, aggregator): + """A subject with a fresh compat-fetch cache and no version bump must not refetch compatibility. + + This guards the cadence-skip path: SCHEMA_COMPATIBILITY_FETCH_CACHE_KEY + remaining_slots logic + should keep _get_schema_registry_subject_compatibility from being called every run. + """ + kafka_consumer_check = _make_schema_registry_check(check) + collector = kafka_consumer_check.metadata_collector + + subject = 'my-topic-value' + collector._get_schema_registry_subjects = mock.Mock(return_value=[subject]) + collector._get_schema_registry_versions = mock.Mock(return_value=[1, 2]) + collector._get_schema_registry_latest_version = mock.Mock() + mock_compatibility_methods(collector) + + # Subject is already at version 2 (no bump) and its compat fetch cache is unexpired. + latest_version_cache = {subject: {'version': 2, 'schema_id': 50, 'compatibility': 'BACKWARD'}} + compat_fetch_cache = {subject: time.time() + 3600} + _wire_cache( + kafka_consumer_check, + { + 'kafka_schema_latest_version_cache': json.dumps(latest_version_cache), + 'kafka_schema_compatibility_fetch_cache': json.dumps(compat_fetch_cache), + }, + ) + + dd_run_check(kafka_consumer_check) + + # No version bump → no full fetch, and a fresh compat cache → no compat fetch. + collector._get_schema_registry_latest_version.assert_not_called() + collector._get_schema_registry_subject_compatibility.assert_not_called() + + +@pytest.mark.parametrize( + "global_compat, subject_compat, expected_compat, expected_global", + [ + pytest.param('BACKWARD', 'FULL', 'FULL', 'BACKWARD', id='subject_flip'), + pytest.param('FULL', 'BACKWARD', 'BACKWARD', 'FULL', id='global_flip'), + ], +) +def test_schema_registry_compatibility_flip_triggers_reemission( + check, dd_run_check, aggregator, global_compat, subject_compat, expected_compat, expected_global +): + """A compatibility change without a version bump (subject or global) triggers schema re-emission. + + The cache_content key includes both compatibility fields, so flipping either one causes + re-emission even when the schema version and content are identical and the subject is served + entirely from cache. + """ + kafka_consumer_check = _make_schema_registry_check(check) + collector = kafka_consumer_check.metadata_collector + + subject = 'my-topic-value' + avro_schema = json.dumps({"type": "string"}) + schema_id = 50 + + collector._get_schema_registry_subjects = mock.Mock(return_value=[subject]) + collector._get_schema_registry_versions = mock.Mock(return_value=[1, 2]) + collector._get_schema_registry_latest_version = mock.Mock( + return_value={'id': schema_id, 'version': 2, 'schema': avro_schema, 'schemaType': 'AVRO'} + ) + mock_compatibility_methods(collector, global_compat=global_compat, subject_compat=subject_compat) + + # Pre-populate caches as if a previous run emitted this subject under BACKWARD/BACKWARD. + old_cache_content = f"{schema_id}:2:BACKWARD:BACKWARD:{avro_schema}" + old_hash = hashlib.sha256(old_cache_content.encode()).hexdigest() + + latest_version_cache = {subject: {'version': 2, 'schema_id': schema_id, 'compatibility': 'BACKWARD'}} + schema_id_cache = {str(schema_id): {'schema': avro_schema, 'schema_type': 'AVRO'}} + schema_emit_cache = {subject: {'hash': old_hash, 'expire_at': time.time() + 3600}} + + _wire_cache( + kafka_consumer_check, + { + 'kafka_schema_latest_version_cache': json.dumps(latest_version_cache), + 'kafka_schema_id_cache': json.dumps(schema_id_cache), + 'kafka_schema_cache': json.dumps(schema_emit_cache), + }, + ) + + dd_run_check(kafka_consumer_check) + + # No version bump — full schema fetch should be skipped (subject served from cache). + collector._get_schema_registry_latest_version.assert_not_called() + + # Flipping either compatibility field should have triggered exactly one re-emission. + schema_events = schema_ds_events(kafka_consumer_check) + assert len(schema_events) == 1, f"Expected exactly 1 schema re-emission, got {len(schema_events)}" + assert schema_events[0]['subject'] == subject + assert schema_events[0]['compatibility'] == expected_compat + assert schema_events[0]['global_compatibility'] == expected_global @pytest.mark.parametrize( @@ -1365,6 +1351,14 @@ def test_schema_registry_url_encodes_subject_names(check): 'http://localhost:8081/subjects/google%2Fprotobuf%2Ftimestamp.proto/versions/latest' ) + collector.http.get.reset_mock() + mock_response.json.return_value = {'compatibilityLevel': 'BACKWARD'} + collector._get_schema_registry_subject_compatibility(subject) + collector.http.get.assert_called_with( + 'http://localhost:8081/config/google%2Fprotobuf%2Ftimestamp.proto', + params={'defaultToGlobal': 'true'}, + ) + @pytest.mark.parametrize( "replicas, isrs, expected_oos, expected_under", @@ -1458,3 +1452,98 @@ def test_heartbeat_brokers_empty_when_no_metadata(check): hb_events = [e for e in hb_events if e.get('config_type') == 'heartbeat'] assert len(hb_events) == 1 assert hb_events[0]['brokers'] == [] + + +def test_schema_registry_subject_compat_failure_on_version_bump_preserves_cached_compat( + check, dd_run_check, aggregator +): + """When compat fetch raises for a version-bumped subject, the previous cached value is preserved.""" + kafka_consumer_check = _make_schema_registry_check(check) + + subject = 'my-topic-value' + avro_schema = json.dumps({"type": "string"}) + schema_id = 42 + + collector = kafka_consumer_check.metadata_collector + collector._get_schema_registry_subjects = mock.Mock(return_value=[subject]) + collector._get_schema_registry_versions = mock.Mock(return_value=[1, 2, 3]) + collector._get_schema_registry_latest_version = mock.Mock( + return_value={'id': schema_id, 'version': 3, 'schema': avro_schema, 'schemaType': 'AVRO'} + ) + collector._get_schema_registry_global_compatibility = mock.Mock(return_value='BACKWARD') + collector._get_schema_registry_subject_compatibility = mock.Mock(side_effect=Exception("registry down")) + + # Previous run had version 2 with FULL compatibility cached. + latest_version_cache = {subject: {'version': 2, 'schema_id': 10, 'compatibility': 'FULL'}} + cache_storage = _wire_cache( + kafka_consumer_check, + { + 'kafka_schema_latest_version_cache': json.dumps(latest_version_cache), + 'kafka_schema_id_cache': json.dumps({}), + }, + ) + + dd_run_check(kafka_consumer_check) + + # The new cache entry must preserve the previously known compatibility, not write None. + saved_cache = json.loads(cache_storage.get('kafka_schema_latest_version_cache', '{}')) + assert saved_cache[subject]['compatibility'] == 'FULL' + + # The emitted payload must also carry the preserved compatibility. + schema_events = schema_ds_events(kafka_consumer_check) + assert len(schema_events) == 1 + assert schema_events[0]['compatibility'] == 'FULL' + + +def test_schema_registry_global_compat_failure_uses_last_known_value(check, dd_run_check, aggregator): + """When the global compatibility fetch fails, the last successfully fetched value is used.""" + kafka_consumer_check = _make_schema_registry_check(check) + + subject = 'my-topic-value' + avro_schema = json.dumps({"type": "string"}) + schema_id = 77 + + collector = kafka_consumer_check.metadata_collector + collector._get_schema_registry_subjects = mock.Mock(return_value=[subject]) + collector._get_schema_registry_versions = mock.Mock(return_value=[1]) + collector._get_schema_registry_latest_version = mock.Mock( + return_value={'id': schema_id, 'version': 1, 'schema': avro_schema, 'schemaType': 'AVRO'} + ) + collector._get_schema_registry_global_compatibility = mock.Mock(side_effect=Exception("registry down")) + collector._get_schema_registry_subject_compatibility = mock.Mock(return_value='BACKWARD') + + # Simulate a previously cached global compatibility of 'FULL'. + _wire_cache(kafka_consumer_check, {'kafka_schema_global_compatibility_cache': 'FULL'}) + + dd_run_check(kafka_consumer_check) + + schema_events = schema_ds_events(kafka_consumer_check) + assert len(schema_events) == 1 + assert schema_events[0]['global_compatibility'] == 'FULL' + + +def test_schema_registry_none_compat_in_cache_omits_field(check, dd_run_check, aggregator): + """A cached entry with compatibility=None must not include the field in the DS payload.""" + kafka_consumer_check = _make_schema_registry_check(check) + + subject = 'my-topic-value' + avro_schema = json.dumps({"type": "string"}) + schema_id = 99 + + collector = kafka_consumer_check.metadata_collector + collector._get_schema_registry_subjects = mock.Mock(return_value=[subject]) + collector._get_schema_registry_versions = mock.Mock(return_value=[1, 2]) + collector._get_schema_registry_latest_version = mock.Mock( + return_value={'id': schema_id, 'version': 2, 'schema': avro_schema, 'schemaType': 'AVRO'} + ) + collector._get_schema_registry_global_compatibility = mock.Mock(return_value=None) + collector._get_schema_registry_subject_compatibility = mock.Mock(return_value=None) + + _wire_cache(kafka_consumer_check) + + dd_run_check(kafka_consumer_check) + + schema_events = schema_ds_events(kafka_consumer_check) + assert len(schema_events) == 1 + assert 'compatibility' not in schema_events[0] + assert 'global_compatibility' not in schema_events[0] diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 35bbcacaf1488..0653cbd44a1c4 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2023-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import json import logging from contextlib import nullcontext as does_not_raise @@ -708,3 +709,61 @@ def test_kafka_cluster_id_override(check, kafka_instance, dd_run_check, aggregat for metric in aggregator.metrics(metric_name): for tag in expected_override_tags: assert tag in metric.tags, f"{tag} not in {metric.tags} for {metric_name}" + + +def _connection_error_events(check_instance): + return [ + json.loads(c[0][0]) + for c in check_instance.event_platform_event.call_args_list + if c[0][1] == 'data-streams-message' and json.loads(c[0][0]).get('config_type') == 'connection_error' + ] + + +def _setup_failing_check(check, kafka_instance, dd_run_check): + kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = seed_mock_client() + kafka_consumer_check.client.request_metadata_update.side_effect = Exception('broker down') + kafka_consumer_check.event_platform_event = mock.Mock() + with pytest.raises(Exception, match="Unable to connect to the AdminClient"): + dd_run_check(kafka_consumer_check) + return kafka_consumer_check + + +def test_connection_error_emits_dsm_event(check, kafka_instance, dd_run_check): + """A connection_error event is emitted when request_metadata_update fails and cluster monitoring is on.""" + kafka_instance['enable_cluster_monitoring'] = True + kafka_consumer_check = _setup_failing_check(check, kafka_instance, dd_run_check) + + events = _connection_error_events(kafka_consumer_check) + assert len(events) == 1 + assert events[0]['reason'] == 'broker down' + assert events[0]['bootstrap_servers'] == kafka_instance['kafka_connect_str'] + assert 'collection_timestamp' in events[0] + + +def test_connection_error_includes_cluster_id_override(check, kafka_instance, dd_run_check): + """connection_error event uses kafka_cluster_id_override when configured.""" + kafka_instance['enable_cluster_monitoring'] = True + kafka_instance['kafka_cluster_id_override'] = 'my-cluster' + kafka_consumer_check = _setup_failing_check(check, kafka_instance, dd_run_check) + + events = _connection_error_events(kafka_consumer_check) + assert len(events) == 1 + assert events[0]['kafka_cluster_id'] == 'my-cluster' + + +def test_connection_error_not_emitted_without_cluster_monitoring(check, kafka_instance, dd_run_check): + """No connection_error event is emitted when cluster monitoring is disabled.""" + kafka_consumer_check = _setup_failing_check(check, kafka_instance, dd_run_check) + assert not _connection_error_events(kafka_consumer_check) + + +def test_connection_error_sink_failure_does_not_mask_broker_error(check, kafka_instance, dd_run_check): + """Sink failure during connection_error emission does not mask the original AdminClient error.""" + kafka_instance['enable_cluster_monitoring'] = True + kafka_consumer_check = check(kafka_instance) + kafka_consumer_check.client = seed_mock_client() + kafka_consumer_check.client.request_metadata_update.side_effect = Exception('broker down') + kafka_consumer_check.event_platform_event = mock.Mock(side_effect=Exception('intake unavailable')) + with pytest.raises(Exception, match="Unable to connect to the AdminClient"): + dd_run_check(kafka_consumer_check) diff --git a/litellm/changelog.d/23957.added b/litellm/changelog.d/23957.added new file mode 100644 index 0000000000000..4d56bd7eb8b89 --- /dev/null +++ b/litellm/changelog.d/23957.added @@ -0,0 +1 @@ +add support for new metric names litellm_remaining_tokens_metric and litellm_remaining_requests_metric \ No newline at end of file diff --git a/litellm/datadog_checks/litellm/metrics.py b/litellm/datadog_checks/litellm/metrics.py index 6b4543471d520..1a8d455964185 100644 --- a/litellm/datadog_checks/litellm/metrics.py +++ b/litellm/datadog_checks/litellm/metrics.py @@ -64,6 +64,7 @@ 'litellm_remaining_requests_metric': "remaining_requests.metric", 'litellm_remaining_team_budget_metric': "remaining.team_budget.metric", 'litellm_remaining_tokens': "remaining_tokens", + 'litellm_remaining_tokens_metric': "remaining_tokens.metric", 'litellm_request_total_latency_metric': "request.total_latency.metric", 'litellm_reset_budget_job_failed_requests': "reset_budget_job.failed_requests", 'litellm_reset_budget_job_latency': "reset_budget_job.latency", diff --git a/litellm/tests/common.py b/litellm/tests/common.py index b222a4fe7887c..4637d54f987b1 100644 --- a/litellm/tests/common.py +++ b/litellm/tests/common.py @@ -124,4 +124,6 @@ def get_fixture_path(filename): 'litellm.total.tokens.count', 'litellm.input_tokens.metric.count', 'litellm.output_tokens.metric.count', + 'litellm.remaining_tokens.metric', + 'litellm.remaining_requests.metric', ] diff --git a/litellm/tests/fixtures/renamed_metrics.txt b/litellm/tests/fixtures/renamed_metrics.txt index 0a42b60532d9e..d18ea6985e4fe 100644 --- a/litellm/tests/fixtures/renamed_metrics.txt +++ b/litellm/tests/fixtures/renamed_metrics.txt @@ -6,4 +6,10 @@ litellm_total_tokens_metric_total 0.0 litellm_input_tokens_metric_total 0.0 # HELP litellm_output_tokens_metric_total Total number of output tokens from LLM requests # TYPE litellm_output_tokens_metric_total counter -litellm_output_tokens_metric_total 0.0 \ No newline at end of file +litellm_output_tokens_metric_total 0.0 +# HELP litellm_remaining_requests_metric LLM Deployment Analytics - remaining requests for model, returned from LLM API Provider +# TYPE litellm_remaining_requests_metric gauge +litellm_remaining_requests_metric 0.0 +# HELP litellm_remaining_tokens_metric remaining tokens for model, returned from LLM API Provider +# TYPE litellm_remaining_tokens_metric gauge +litellm_remaining_tokens_metric 0.0 \ No newline at end of file diff --git a/vsphere/assets/dashboards/vsphere_overview.json b/vsphere/assets/dashboards/vsphere_overview.json index f1bb2018c5ff8..39e6eab8edf46 100644 --- a/vsphere/assets/dashboards/vsphere_overview.json +++ b/vsphere/assets/dashboards/vsphere_overview.json @@ -857,7 +857,7 @@ { "data_source": "metrics", "name": "query1", - "query": "avg:vsphere.datastore.numberReadAveraged.avg{vsphere_type:datastore,$vcenter_server,$vcenter_datacenter , $vsphere_host , $VM} by {vsphere_datastore}" + "query": "avg:vsphere.datastore.numberReadAveraged.avg{$vcenter_server,$vcenter_datacenter,$vsphere_host,$VM} by {vmfs_uuid}" } ], "response_format": "timeseries", @@ -912,7 +912,7 @@ { "data_source": "metrics", "name": "query1", - "query": "avg:vsphere.disk.numberWriteAveraged.avg{vsphere_type:datastore,$vcenter_server,$vcenter_datacenter,$vsphere_host,$VM} by {vsphere_datastore}" + "query": "avg:vsphere.datastore.numberWriteAveraged.avg{$vcenter_server,$vcenter_datacenter,$vsphere_host,$VM} by {vmfs_uuid}" } ], "response_format": "timeseries",