Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fluxcd/assets/dashboards/fluxcd.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"title_size": "16",
"title_align": "left",
"type": "check_status",
"check": "cilium.prometheus.health",
"check": "fluxcd.openmetrics.health",
"grouping": "cluster",
"group_by": [],
"tags": []
Expand Down
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/23778.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Schema Registry: emit per-subject and global compatibility on the Data Streams schema payload, and stop emitting Datadog Events for broker, topic, and schema registry configurations (those payloads continue to flow to the Data Streams intake).
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/23902.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Emit connection_error DSM event when the integration cannot connect to Kafka.
383 changes: 233 additions & 150 deletions kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ def check(self, _):

try:
self.client.request_metadata_update()
except:
except Exception as e:
if self.config._cluster_monitoring_enabled:
try:
self._send_cluster_monitoring_connection_error(str(e))
except Exception:
self.log.warning("Failed to emit connection_error DSM event", exc_info=True)
raise Exception(
"Unable to connect to the AdminClient. This is likely due to an error in the configuration."
)
) from e

try:
# Fetch consumer offsets
Expand Down Expand Up @@ -140,19 +145,31 @@ def _get_broker_list(self) -> list[dict]:
for broker_meta in cluster_metadata.brokers.values()
]

def _emit_cluster_monitoring_event(self, payload: dict) -> None:
payload.setdefault('collection_timestamp', int(time() * 1000))
payload.setdefault('bootstrap_servers', self.config._kafka_connect_str)
self.event_platform_event(json.dumps(payload), "data-streams-message")

def _send_cluster_monitoring_connection_error(self, reason: str) -> None:
self._emit_cluster_monitoring_event(
{
'kafka_cluster_id': self.config._kafka_cluster_id_override or '',
'config_type': 'connection_error',
'reason': reason,
}
)

def _send_cluster_monitoring_heartbeat(self, total_contexts: int, cluster_id: str) -> None:
payload = {
'collection_timestamp': int(time() * 1000),
'kafka_cluster_id': cluster_id,
'config_type': 'heartbeat',
'contexts': total_contexts,
'contexts_limit': self._context_limit,
'bootstrap_servers': self.config._kafka_connect_str,
'brokers': self._get_broker_list(),
}
if self.config._kafka_cluster_id_override:
payload['original_kafka_cluster_id'] = self.config._auto_detected_cluster_id
self.event_platform_event(json.dumps(payload), "data-streams-message")
self._emit_cluster_monitoring_event(payload)

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
Expand Down
Loading
Loading