diff --git a/kafka_actions/assets/configuration/spec.yaml b/kafka_actions/assets/configuration/spec.yaml index b43f0955f03b9..2e1a068120148 100644 --- a/kafka_actions/assets/configuration/spec.yaml +++ b/kafka_actions/assets/configuration/spec.yaml @@ -502,7 +502,8 @@ files: description: | Configuration for updating consumer group offsets. WARNING: Can cause duplicate processing or data loss. - Consumer group should be stopped (no active members). + The consumer group must have no active members before this action runs. + The check enforces this and will fail with a clear error if members are found. value: type: object required: @@ -518,7 +519,9 @@ files: offset: 1000 - topic: orders partition: 1 - offset: 1500 + offset: -2 + - topic: payments + timestamp: 1735689600000 properties: - name: cluster type: string @@ -530,30 +533,43 @@ files: example: order-processor - name: offsets type: array - description: List of topic-partition-offset tuples to update + description: | + List of offset specifications. Each entry must specify exactly one of `offset` + or `timestamp`. See the action description for details. items: type: object required: - topic - - partition - - offset properties: - name: topic type: string description: Topic name - name: partition type: integer - description: Partition number + description: | + Non-negative partition number. Required when `offset` is specified. + Optional when `timestamp` is specified. Omit to target all partitions. - name: offset type: integer - description: New offset value + description: | + Offset to commit. Use -2 for earliest (log-start), -1 for latest + (high-watermark), or a non-negative integer for an explicit position. + Mutually exclusive with `timestamp`. Requires `partition`. + - name: timestamp + type: integer + description: | + Milliseconds since epoch. Resets to the first offset at or after this + timestamp. Partitions with no message at or after the timestamp are + reset to latest. Mutually exclusive with `offset`. example: - topic: orders partition: 0 offset: 1000 - topic: orders partition: 1 - offset: 1500 + offset: -2 + - topic: payments + timestamp: 1735689600000 fleet_configurable: false # ======================================================================== diff --git a/kafka_actions/changelog.d/24165.added b/kafka_actions/changelog.d/24165.added new file mode 100644 index 0000000000000..d3bfc98aecee4 --- /dev/null +++ b/kafka_actions/changelog.d/24165.added @@ -0,0 +1 @@ +Added earliest, latest, and timestamp-based offset support to the ``update_consumer_group_offsets`` action, along with an inactive-group precondition check and per-partition error reporting. diff --git a/kafka_actions/datadog_checks/kafka_actions/check.py b/kafka_actions/datadog_checks/kafka_actions/check.py index e5aee37daa984..8f5cb21ca8b5e 100644 --- a/kafka_actions/datadog_checks/kafka_actions/check.py +++ b/kafka_actions/datadog_checks/kafka_actions/check.py @@ -743,10 +743,12 @@ def _action_update_consumer_group_offsets(self): offsets: - topic: orders partition: 0 - offset: 1000 + offset: 1000 # explicit offset - topic: orders partition: 1 - offset: 1500 + offset: -2 # earliest + - topic: payments + timestamp: 1735689600000 # all partitions at/after this timestamp """ config = self.config.update_consumer_group_offsets @@ -755,10 +757,14 @@ def _action_update_consumer_group_offsets(self): consumer_group = config['consumer_group'] offsets = config['offsets'] + self.kafka_client.check_consumer_group_inactive(consumer_group) + self.log.warning( - "Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss", + "Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss. " + "Offsets: %s", consumer_group, self.cluster, + offsets, ) success = self.kafka_client.update_consumer_group_offsets(consumer_group=consumer_group, offsets=offsets) diff --git a/kafka_actions/datadog_checks/kafka_actions/config.py b/kafka_actions/datadog_checks/kafka_actions/config.py index b6f944055636e..3de5b0ba9c374 100644 --- a/kafka_actions/datadog_checks/kafka_actions/config.py +++ b/kafka_actions/datadog_checks/kafka_actions/config.py @@ -2,6 +2,7 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) import os +from typing import Any from datadog_checks.base import ConfigurationError, is_affirmative @@ -289,6 +290,11 @@ def _validate_delete_consumer_group(self): if not config.get('consumer_group'): raise ConfigurationError("delete_consumer_group action requires 'consumer_group' parameter") + def _validate_offset_entry_partition(self, offset_entry: dict[str, Any], index: int) -> None: + """Validate that offsets[index].partition, if present, is a non-negative integer.""" + if not isinstance(offset_entry['partition'], int) or offset_entry['partition'] < 0: + raise ConfigurationError(f"offsets[{index}].partition must be a non-negative integer") + def _validate_update_consumer_group_offsets(self): """Validate update_consumer_group_offsets action configuration.""" config = self.update_consumer_group_offsets @@ -310,17 +316,36 @@ def _validate_update_consumer_group_offsets(self): if not offset_entry.get('topic'): raise ConfigurationError(f"offsets[{i}] requires 'topic' parameter") - if 'partition' not in offset_entry: - raise ConfigurationError(f"offsets[{i}] requires 'partition' parameter") + has_offset = 'offset' in offset_entry + has_timestamp = 'timestamp' in offset_entry + + if not has_offset and not has_timestamp: + raise ConfigurationError(f"offsets[{i}] requires 'offset' or 'timestamp'") + + if has_offset and has_timestamp: + raise ConfigurationError(f"offsets[{i}] cannot specify both 'offset' and 'timestamp'") - if 'offset' not in offset_entry: - raise ConfigurationError(f"offsets[{i}] requires 'offset' parameter") + if has_offset: + if 'partition' not in offset_entry: + raise ConfigurationError(f"offsets[{i}] requires 'partition' when 'offset' is specified") - if not isinstance(offset_entry.get('partition'), int): - raise ConfigurationError(f"offsets[{i}].partition must be an integer") + self._validate_offset_entry_partition(offset_entry, i) + + offset_val = offset_entry['offset'] + if not isinstance(offset_val, int) or offset_val < -2: + raise ConfigurationError( + f"offsets[{i}].offset must be -2 (earliest), -1 (latest), or a non-negative integer" + ) + + if has_timestamp: + ts = offset_entry['timestamp'] + if not isinstance(ts, int) or ts <= 0: + raise ConfigurationError( + f"offsets[{i}].timestamp must be a positive integer (milliseconds since epoch)" + ) - if not isinstance(offset_entry.get('offset'), int): - raise ConfigurationError(f"offsets[{i}].offset must be an integer") + if 'partition' in offset_entry: + self._validate_offset_entry_partition(offset_entry, i) def _validate_produce_message(self): """Validate produce_message action configuration.""" diff --git a/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py b/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py index ac656dd818ecd..4b288a35a2874 100644 --- a/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py +++ b/kafka_actions/datadog_checks/kafka_actions/config_models/instance.py @@ -205,8 +205,18 @@ class Offset(BaseModel): arbitrary_types_allowed=True, frozen=True, ) - offset: int = Field(..., description='New offset value') - partition: int = Field(..., description='Partition number') + offset: Optional[int] = Field( + None, + description='Offset to commit. Use -2 for earliest (log-start), -1 for latest\n(high-watermark), or a non-negative integer for an explicit position.\nMutually exclusive with `timestamp`. Requires `partition`.\n', + ) + partition: Optional[int] = Field( + None, + description='Non-negative partition number. Required when `offset` is specified.\nOptional when `timestamp` is specified. Omit to target all partitions.\n', + ) + timestamp: Optional[int] = Field( + None, + description='Milliseconds since epoch. Resets to the first offset at or after this\ntimestamp. Partitions with no message at or after the timestamp are\nreset to latest. Mutually exclusive with `offset`.\n', + ) topic: str = Field(..., description='Topic name') @@ -219,11 +229,12 @@ class UpdateConsumerGroupOffsets(BaseModel): consumer_group: str = Field(..., description='Consumer group ID to update', examples=['order-processor']) offsets: tuple[Offset, ...] = Field( ..., - description='List of topic-partition-offset tuples to update', + description='List of offset specifications. Each entry must specify exactly one of `offset`\nor `timestamp`. See the action description for details.\n', examples=[ [ {'offset': 1000, 'partition': 0, 'topic': 'orders'}, - {'offset': 1500, 'partition': 1, 'topic': 'orders'}, + {'offset': -2, 'partition': 1, 'topic': 'orders'}, + {'timestamp': 1735689600000, 'topic': 'payments'}, ] ], ) diff --git a/kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example b/kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example index a2528c7c0d42d..514c369b28f94 100644 --- a/kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example +++ b/kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example @@ -259,7 +259,8 @@ instances: ## @param update_consumer_group_offsets - mapping - optional ## Configuration for updating consumer group offsets. ## WARNING: Can cause duplicate processing or data loss. - ## Consumer group should be stopped (no active members). + ## The consumer group must have no active members before this action runs. + ## The check enforces this and will fail with a clear error if members are found. # # update_consumer_group_offsets: # cluster: prod-kafka-1 @@ -270,7 +271,9 @@ instances: # offset: 1000 # - topic: orders # partition: 1 - # offset: 1500 + # offset: -2 + # - topic: payments + # timestamp: 1735689600000 ## @param produce_message - mapping - optional ## Configuration for producing a message to a Kafka topic. diff --git a/kafka_actions/datadog_checks/kafka_actions/kafka_client.py b/kafka_actions/datadog_checks/kafka_actions/kafka_client.py index 0ab39818747af..d9c6e7e76ad9d 100644 --- a/kafka_actions/datadog_checks/kafka_actions/kafka_client.py +++ b/kafka_actions/datadog_checks/kafka_actions/kafka_client.py @@ -8,7 +8,14 @@ import time from typing import TYPE_CHECKING, Any -from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, TopicPartition +from confluent_kafka import ( + Consumer, + ConsumerGroupTopicPartitions, + KafkaError, + KafkaException, + Producer, + TopicPartition, +) from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, OffsetSpec, ResourceType try: @@ -228,11 +235,7 @@ def consume_messages( try: if partition == -1: - metadata = consumer.list_topics(topic, timeout=10) - if topic not in metadata.topics: - raise ValueError(f"Topic '{topic}' not found") - - partition_ids = list(metadata.topics[topic].partitions.keys()) + partition_ids = self._discover_partition_ids(consumer, topic) else: partition_ids = [partition] @@ -308,6 +311,13 @@ def consume_messages( consumer.close() self.consumer = None + def _discover_partition_ids(self, client, topic: str) -> list[int]: + """Look up all partition IDs for a topic via the given client's list_topics.""" + metadata = client.list_topics(topic, timeout=10) + if topic not in metadata.topics: + raise ValueError(f"Topic '{topic}' not found") + return list(metadata.topics[topic].partitions.keys()) + def _resolve_start_offsets( self, consumer, @@ -561,15 +571,117 @@ def delete_consumer_group(self, consumer_group: str) -> bool: self.log.error("Failed to delete consumer group '%s': %s", group_id, e) raise + def check_consumer_group_inactive(self, consumer_group: str) -> None: + """Raise if the consumer group has active members. + + alter_consumer_group_offsets requires a dead or empty group; active members + cause Kafka to return NON_EMPTY_GROUP errors per partition. + """ + admin = self.get_admin_client() + futures = admin.describe_consumer_groups([consumer_group], request_timeout=10) + future = futures[consumer_group] + try: + description = future.result() + except Exception as e: + self.log.error("Failed to describe consumer group '%s': %s", consumer_group, e) + raise + if description.members: + raise Exception( + f"Consumer group '{consumer_group}' has {len(description.members)} active member(s). " + "Stop all consumers in the group before resetting offsets." + ) + + def _resolve_sentinel_offsets( + self, admin: AdminClient, requests: list[tuple[str, int, int]] + ) -> dict[tuple[str, int], int]: + """Resolve a batch of sentinel offset values (-1 latest, -2 earliest) to concrete offsets. + + requests is a list of (topic, partition, offset) tuples; returns a dict keyed by + (topic, partition) mapping to the resolved concrete offset. + """ + tp_by_key = {} + offset_request = {} + for topic, partition, offset in requests: + if offset not in (-1, -2): + raise ValueError(f"Sentinel offset must be -1 (latest) or -2 (earliest), got {offset}") + tp = TopicPartition(topic, partition) + tp_by_key[(topic, partition)] = tp + offset_request[tp] = OffsetSpec.earliest() if offset == -2 else OffsetSpec.latest() + + futures = admin.list_offsets(offset_request, request_timeout=10) + resolved = {} + for (topic, partition), tp in tp_by_key.items(): + try: + resolved[(topic, partition)] = futures[tp].result().offset + except Exception as e: + self.log.error("Failed to resolve sentinel offset for %s[%d]: %s", topic, partition, e) + raise + return resolved + + def _resolve_sentinel_offset(self, admin: AdminClient, topic: str, partition: int, offset: int) -> int: + """Resolve a single sentinel offset value (-1 latest, -2 earliest) to a concrete offset.""" + return self._resolve_sentinel_offsets(admin, [(topic, partition, offset)])[(topic, partition)] + + def _resolve_timestamp_targets( + self, admin: AdminClient, topic: str, partition: int | None, timestamp: int + ) -> list[TopicPartition]: + """Resolve a timestamp offset spec to concrete TopicPartitions for one or all partitions of a topic.""" + partition_ids = [partition] if partition is not None else self._discover_partition_ids(admin, topic) + + offset_request = {TopicPartition(topic, p): OffsetSpec.for_timestamp(timestamp) for p in partition_ids} + futures = admin.list_offsets(offset_request, request_timeout=10) + + resolved_by_partition = {} + for tp, future in futures.items(): + try: + resolved_by_partition[tp.partition] = future.result().offset + except Exception as e: + self.log.error("Failed to resolve timestamp offset for %s[%d]: %s", topic, tp.partition, e) + raise + + no_message_partitions = [p for p, offset in resolved_by_partition.items() if offset == -1] + if no_message_partitions: + fallback = self._resolve_sentinel_offsets(admin, [(topic, p, -1) for p in no_message_partitions]) + for p in no_message_partitions: + resolved_by_partition[p] = fallback[(topic, p)] + self.log.debug( + "Partition %d: no message at timestamp %d, using latest offset %d", + p, + timestamp, + resolved_by_partition[p], + ) + + resolved_partitions = [] + for partition_id, resolved in resolved_by_partition.items(): + if partition_id not in no_message_partitions: + self.log.debug("Partition %d: timestamp %d resolved to offset %d", partition_id, timestamp, resolved) + resolved_partitions.append(TopicPartition(topic, partition_id, resolved)) + return resolved_partitions + + def _resolve_explicit_target(self, admin: AdminClient, topic: str, partition: int, offset: int) -> TopicPartition: + """Resolve an explicit or sentinel offset spec to a concrete TopicPartition.""" + if offset in (-1, -2): + resolved = self._resolve_sentinel_offset(admin, topic, partition, offset) + label = 'earliest' if offset == -2 else 'latest' + self.log.debug("Resolved '%s' for %s[%d] to offset %d", label, topic, partition, resolved) + else: + resolved = offset + return TopicPartition(topic, partition, resolved) + def update_consumer_group_offsets(self, consumer_group: str, offsets: list[dict[str, Any]]) -> bool: """Update consumer group offsets for specific topic-partitions. Args: consumer_group: Consumer group ID offsets: List of offset specifications, each with: - - topic: Topic name - - partition: Partition number - - offset: New offset value + - topic: Topic name (required) + - partition: Partition number. Required when 'offset' is specified; + optional when 'timestamp' is specified (auto-discovers all partitions). + - offset: Offset to commit. Use -2 for earliest, -1 for latest, or a + non-negative integer for an explicit offset. Mutually exclusive with timestamp. + - timestamp: Milliseconds since epoch. Resets to the first offset at or after + this timestamp in each matching partition. When no message exists at or after + the timestamp the partition is reset to latest. Mutually exclusive with offset. Returns: True if successful @@ -577,28 +689,54 @@ def update_consumer_group_offsets(self, consumer_group: str, offsets: list[dict[ admin = self.get_admin_client() topic_partitions = [] + seen_targets = set() for offset_spec in offsets: topic = offset_spec.get('topic') partition = offset_spec.get('partition') offset = offset_spec.get('offset') + timestamp = offset_spec.get('timestamp') - if topic is None or partition is None or offset is None: - raise ValueError("Each offset specification must have 'topic', 'partition', and 'offset'") + if topic is None: + raise ValueError("Each offset specification must have 'topic'") + if offset is not None and timestamp is not None: + raise ValueError(f"offsets entry for topic '{topic}' cannot specify both 'offset' and 'timestamp'") - tp = TopicPartition(topic, partition, offset) - topic_partitions.append(tp) + if timestamp is not None: + targets = self._resolve_timestamp_targets(admin, topic, partition, timestamp) + else: + if partition is None: + raise ValueError("Each offset specification must have 'partition' when 'offset' is specified") + targets = [self._resolve_explicit_target(admin, topic, partition, offset)] + + for tp in targets: + key = (tp.topic, tp.partition) + if key in seen_targets: + raise ValueError( + f"Multiple offset specifications target the same partition: {tp.topic}[{tp.partition}]" + ) + seen_targets.add(key) + topic_partitions.append(tp) - futures = admin.alter_consumer_group_offsets(consumer_group, topic_partitions) + futures = admin.alter_consumer_group_offsets([ConsumerGroupTopicPartitions(consumer_group, topic_partitions)]) for group_id, future in futures.items(): try: - future.result() - self.log.debug("Consumer group '%s' offsets updated for %d partitions", group_id, len(topic_partitions)) - return True + result = future.result() except Exception as e: self.log.error("Failed to update consumer group '%s' offsets: %s", group_id, e) raise + partition_errors = [ + f"{tp.topic}[{tp.partition}]: {tp.error}" for tp in result.topic_partitions if tp.error is not None + ] + if partition_errors: + error_msg = f"Per-partition errors for group '{group_id}': {'; '.join(partition_errors)}" + self.log.error(error_msg) + raise Exception(error_msg) + + self.log.debug("Consumer group '%s' offsets updated for %d partitions", group_id, len(topic_partitions)) + return True + def close(self): """Close all Kafka clients.""" if self.consumer: diff --git a/kafka_actions/tests/test_config_validation.py b/kafka_actions/tests/test_config_validation.py index 29e302f56137f..f3eb44a90b6ae 100644 --- a/kafka_actions/tests/test_config_validation.py +++ b/kafka_actions/tests/test_config_validation.py @@ -3,6 +3,7 @@ # Licensed under a 3-clause BSD style license (see LICENSE) import base64 +from contextlib import nullcontext import pytest @@ -12,6 +13,18 @@ pytestmark = [pytest.mark.unit] +def _update_consumer_group_offsets_instance(offsets=None): + """Build an instance dict for the update_consumer_group_offsets action, omitting 'offsets' when None.""" + action_config = {'cluster': 'test', 'consumer_group': 'test'} + if offsets is not None: + action_config['offsets'] = offsets + return { + 'remote_config_id': 'test-id', + 'kafka_connect_str': 'localhost:9092', + 'update_consumer_group_offsets': action_config, + } + + class TestConfigValidation: """Test configuration validation.""" @@ -237,31 +250,67 @@ def test_missing_topic(self): class TestUpdateConsumerGroupOffsetsValidation: """Test update_consumer_group_offsets action validation.""" - def test_missing_offsets(self): - """Test that missing offsets raises error.""" - instance = { - 'remote_config_id': 'test-id', - 'kafka_connect_str': 'localhost:9092', - 'update_consumer_group_offsets': {'cluster': 'test', 'consumer_group': 'test'}, - } - - with pytest.raises(ConfigurationError, match="update_consumer_group_offsets action requires 'offsets' list"): - config = KafkaActionsConfig(instance, None) - config.validate_config() - - def test_invalid_offset_entry(self): - """Test that invalid offset entry raises error.""" - instance = { - 'remote_config_id': 'test-id', - 'kafka_connect_str': 'localhost:9092', - 'update_consumer_group_offsets': { - 'cluster': 'test', - 'consumer_group': 'test', - 'offsets': [{'topic': 'test'}], # Missing partition and offset - }, - } - - with pytest.raises(ConfigurationError, match="offsets\\[0\\] requires 'partition' parameter"): + @pytest.mark.parametrize( + ('offsets', 'expected_error'), + [ + pytest.param(None, "update_consumer_group_offsets action requires 'offsets' list", id='missing_offsets'), + pytest.param( + [{'topic': 'test'}], + r"offsets\[0\] requires 'offset' or 'timestamp'", + id='missing_offset_and_timestamp', + ), + pytest.param( + [{'topic': 'test', 'partition': 0, 'offset': 100, 'timestamp': 1735689600000}], + r"offsets\[0\] cannot specify both 'offset' and 'timestamp'", + id='both_offset_and_timestamp', + ), + pytest.param( + [{'topic': 'test', 'offset': 100}], + r"offsets\[0\] requires 'partition' when 'offset' is specified", + id='missing_partition_with_offset', + ), + pytest.param( + [{'topic': 'test', 'partition': 0, 'offset': -3}], + r"offsets\[0\].offset must be -2", + id='out_of_range_offset', + ), + pytest.param( + [ + {'topic': 'test', 'partition': 0, 'offset': -2}, + {'topic': 'test', 'partition': 1, 'offset': -1}, + ], + None, + id='sentinel_offsets_are_valid', + ), + pytest.param( + [{'topic': 'test', 'partition': -1, 'offset': 0}], + r"offsets\[0\].partition must be a non-negative integer", + id='negative_partition', + ), + pytest.param( + [{'topic': 'test', 'partition': -1, 'timestamp': 1735689600000}], + r"offsets\[0\].partition must be a non-negative integer", + id='negative_partition_with_timestamp', + ), + pytest.param( + [{'topic': 'test', 'timestamp': -1}], + r"offsets\[0\].timestamp must be a positive integer", + id='invalid_timestamp', + ), + pytest.param([{'topic': 'test', 'timestamp': 1735689600000}], None, id='valid_timestamp_all_partitions'), + pytest.param( + [{'topic': 'test', 'partition': 2, 'timestamp': 1735689600000}], + None, + id='valid_timestamp_specific_partition', + ), + ], + ) + def test_offsets_validation(self, offsets, expected_error): + """Test update_consumer_group_offsets offsets validation across valid and invalid entries.""" + instance = _update_consumer_group_offsets_instance(offsets) + expectation = pytest.raises(ConfigurationError, match=expected_error) if expected_error else nullcontext() + + with expectation: config = KafkaActionsConfig(instance, None) config.validate_config() diff --git a/kafka_actions/tests/test_unit.py b/kafka_actions/tests/test_unit.py index 46ba38d4a3a1f..5c6225e5afdbd 100644 --- a/kafka_actions/tests/test_unit.py +++ b/kafka_actions/tests/test_unit.py @@ -8,7 +8,8 @@ from unittest.mock import MagicMock, patch import pytest -from confluent_kafka import KafkaError +from confluent_kafka import KafkaError, KafkaException, TopicPartition +from confluent_kafka.admin import OffsetSpec from datadog_checks.kafka_actions import KafkaActionsCheck from datadog_checks.kafka_actions.kafka_client import KafkaActionsClient @@ -26,6 +27,28 @@ def _futures(offsets): return out +def _offset_future(offset): + fut = MagicMock() + fut.result.return_value = MagicMock(offset=offset) + return fut + + +def _offset_future_with_topic_partitions(topic_partitions): + """Build a future mimicking AdminClient.alter_consumer_group_offsets results.""" + future = MagicMock() + future.result.return_value = MagicMock(topic_partitions=topic_partitions) + return future + + +def _list_offsets_stub(offsets_by_partition): + """A `list_offsets`-compatible side_effect resolving any requested TopicPartition by partition number.""" + + def _stub(request, **kwargs): + return {tp: _offset_future(offsets_by_partition[tp.partition]) for tp in request} + + return _stub + + def _eof(partition): """A mock poll() result representing a _PARTITION_EOF event.""" msg = MagicMock() @@ -272,7 +295,7 @@ class TestUpdateConsumerGroupOffsetsAction: """Test update_consumer_group_offsets action.""" def test_update_consumer_group_offsets(self, aggregator, dd_run_check): - """Test updating consumer group offsets.""" + """Test updating consumer group offsets with explicit values.""" instance = { 'remote_config_id': 'test-update-offsets-001', 'kafka_connect_str': 'localhost:9092', @@ -291,6 +314,7 @@ def test_update_consumer_group_offsets(self, aggregator, dd_run_check): with ( patch.object(check.kafka_client, 'update_consumer_group_offsets', return_value=True) as mock_update_offsets, patch.object(check.kafka_client, 'get_cluster_id', return_value='test-cluster'), + patch.object(check.kafka_client, 'check_consumer_group_inactive'), ): dd_run_check(check) @@ -300,6 +324,248 @@ def test_update_consumer_group_offsets(self, aggregator, dd_run_check): assert len(call_kwargs['offsets']) == 3 assert call_kwargs['offsets'][0] == {'topic': 'orders', 'partition': 0, 'offset': 1000} + def test_update_consumer_group_offsets_with_sentinels(self, aggregator, dd_run_check): + """Test updating consumer group offsets with sentinel values -2 (earliest) and -1 (latest).""" + instance = { + 'remote_config_id': 'test-update-offsets-002', + 'kafka_connect_str': 'localhost:9092', + 'update_consumer_group_offsets': { + 'cluster': 'test-cluster', + 'consumer_group': 'my-consumer-group', + 'offsets': [ + {'topic': 'orders', 'partition': 0, 'offset': -2}, + {'topic': 'orders', 'partition': 1, 'offset': -1}, + ], + }, + } + + check = KafkaActionsCheck('kafka_actions', {}, [instance]) + with ( + patch.object(check.kafka_client, 'update_consumer_group_offsets', return_value=True) as mock_update_offsets, + patch.object(check.kafka_client, 'get_cluster_id', return_value='test-cluster'), + patch.object(check.kafka_client, 'check_consumer_group_inactive'), + ): + dd_run_check(check) + + mock_update_offsets.assert_called_once() + call_kwargs = mock_update_offsets.call_args[1] + assert call_kwargs['offsets'][0] == {'topic': 'orders', 'partition': 0, 'offset': -2} + assert call_kwargs['offsets'][1] == {'topic': 'orders', 'partition': 1, 'offset': -1} + + def test_update_consumer_group_offsets_with_timestamp(self, aggregator, dd_run_check): + """Test updating consumer group offsets with a timestamp (all partitions).""" + instance = { + 'remote_config_id': 'test-update-offsets-004', + 'kafka_connect_str': 'localhost:9092', + 'update_consumer_group_offsets': { + 'cluster': 'test-cluster', + 'consumer_group': 'my-consumer-group', + 'offsets': [ + {'topic': 'payments', 'timestamp': 1735689600000}, + ], + }, + } + + check = KafkaActionsCheck('kafka_actions', {}, [instance]) + with ( + patch.object(check.kafka_client, 'update_consumer_group_offsets', return_value=True) as mock_update_offsets, + patch.object(check.kafka_client, 'get_cluster_id', return_value='test-cluster'), + patch.object(check.kafka_client, 'check_consumer_group_inactive'), + ): + dd_run_check(check) + + mock_update_offsets.assert_called_once() + call_kwargs = mock_update_offsets.call_args[1] + assert call_kwargs['offsets'][0] == {'topic': 'payments', 'timestamp': 1735689600000} + + def test_update_consumer_group_offsets_blocks_on_active_group(self, aggregator, dd_run_check): + """Test that the check aborts when the consumer group has active members.""" + instance = { + 'remote_config_id': 'test-update-offsets-003', + 'kafka_connect_str': 'localhost:9092', + 'update_consumer_group_offsets': { + 'cluster': 'test-cluster', + 'consumer_group': 'my-consumer-group', + 'offsets': [{'topic': 'orders', 'partition': 0, 'offset': 0}], + }, + } + + check = KafkaActionsCheck('kafka_actions', {}, [instance]) + with ( + patch.object(check.kafka_client, 'get_cluster_id', return_value='test-cluster'), + patch.object( + check.kafka_client, + 'check_consumer_group_inactive', + side_effect=Exception("Consumer group 'my-consumer-group' has 2 active member(s)."), + ), + ): + with pytest.raises(Exception, match="active member"): + dd_run_check(check) + + +class TestCheckConsumerGroupInactive: + """Test the check_consumer_group_inactive guard.""" + + def test_raises_when_group_has_active_members(self): + mock_admin = MagicMock() + description = MagicMock(members=[MagicMock(), MagicMock()]) + future = MagicMock() + future.result.return_value = description + mock_admin.describe_consumer_groups.return_value = {'my-group': future} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + with pytest.raises(Exception, match="2 active member"): + client.check_consumer_group_inactive('my-group') + + mock_admin.describe_consumer_groups.assert_called_once_with(['my-group'], request_timeout=10) + + def test_passes_when_group_has_no_members(self): + mock_admin = MagicMock() + description = MagicMock(members=[]) + future = MagicMock() + future.result.return_value = description + mock_admin.describe_consumer_groups.return_value = {'my-group': future} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + client.check_consumer_group_inactive('my-group') + + +class TestResolveSentinelOffset: + """Test _resolve_sentinel_offset.""" + + def test_resolves_earliest(self): + mock_admin = MagicMock() + mock_admin.list_offsets.side_effect = _list_offsets_stub({0: 42}) + + client = _client() + assert client._resolve_sentinel_offset(mock_admin, 't', 0, -2) == 42 + + def test_resolves_latest(self): + mock_admin = MagicMock() + mock_admin.list_offsets.side_effect = _list_offsets_stub({0: 99}) + + client = _client() + assert client._resolve_sentinel_offset(mock_admin, 't', 0, -1) == 99 + + def test_rejects_non_sentinel_offset_without_calling_admin(self): + mock_admin = MagicMock() + client = _client() + with pytest.raises(ValueError, match="Sentinel offset"): + client._resolve_sentinel_offset(mock_admin, 't', 0, 5) + + mock_admin.list_offsets.assert_not_called() + + def test_propagates_result_error(self): + tp = TopicPartition('t', 0) + future = MagicMock() + future.result.side_effect = KafkaException(MagicMock()) + mock_admin = MagicMock() + mock_admin.list_offsets.return_value = {tp: future} + + client = _client() + with pytest.raises(KafkaException): + client._resolve_sentinel_offset(mock_admin, 't', 0, -1) + + +class TestUpdateConsumerGroupOffsetsClient: + """Test update_consumer_group_offsets client-internal offset resolution logic.""" + + def test_resolves_sentinel_and_explicit_offsets(self): + mock_admin = MagicMock() + mock_admin.list_offsets.side_effect = _list_offsets_stub({0: 500}) + mock_admin.alter_consumer_group_offsets.return_value = {'g': _offset_future_with_topic_partitions([])} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + assert client.update_consumer_group_offsets( + 'g', + [ + {'topic': 't', 'partition': 0, 'offset': -2}, + {'topic': 't', 'partition': 1, 'offset': 1000}, + ], + ) + + committed = mock_admin.alter_consumer_group_offsets.call_args[0][0][0].topic_partitions + by_partition = {tp.partition: tp.offset for tp in committed} + assert by_partition == {0: 500, 1: 1000} + + def test_resolves_timestamp_across_all_partitions(self): + admin_metadata = MagicMock() + admin_metadata.topics = {'t': MagicMock(partitions={0: MagicMock(), 1: MagicMock()})} + + mock_admin = MagicMock() + mock_admin.list_topics.return_value = admin_metadata + mock_admin.list_offsets.side_effect = _list_offsets_stub({0: 10, 1: 20}) + mock_admin.alter_consumer_group_offsets.return_value = {'g': _offset_future_with_topic_partitions([])} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + assert client.update_consumer_group_offsets('g', [{'topic': 't', 'timestamp': 1700000000000}]) + + committed = mock_admin.alter_consumer_group_offsets.call_args[0][0][0].topic_partitions + by_partition = {tp.partition: tp.offset for tp in committed} + assert by_partition == {0: 10, 1: 20} + + def test_resolves_timestamp_falls_back_to_latest_when_no_message_after(self): + admin_metadata = MagicMock() + admin_metadata.topics = {'t': MagicMock(partitions={0: MagicMock(), 1: MagicMock()})} + + # Partition 0 has a message at/after the timestamp; partition 1 doesn't (-1) and + # must fall back to a separate batched `latest` lookup. + timestamp_offsets = {0: 10, 1: -1} + latest_offsets = {1: 30} + + def list_offsets_side_effect(request, **kwargs): + is_latest_batch = all(spec == OffsetSpec.latest() for spec in request.values()) + offsets = latest_offsets if is_latest_batch else timestamp_offsets + return {tp: _offset_future(offsets[tp.partition]) for tp in request} + + mock_admin = MagicMock() + mock_admin.list_topics.return_value = admin_metadata + mock_admin.list_offsets.side_effect = list_offsets_side_effect + mock_admin.alter_consumer_group_offsets.return_value = {'g': _offset_future_with_topic_partitions([])} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + assert client.update_consumer_group_offsets('g', [{'topic': 't', 'timestamp': 1700000000000}]) + + committed = mock_admin.alter_consumer_group_offsets.call_args[0][0][0].topic_partitions + by_partition = {tp.partition: tp.offset for tp in committed} + assert by_partition == {0: 10, 1: 30} + assert mock_admin.list_offsets.call_count == 2 + + def test_rejects_both_offset_and_timestamp(self): + client = _client() + with patch.object(client, 'get_admin_client', return_value=MagicMock()): + with pytest.raises(ValueError, match="cannot specify both"): + client.update_consumer_group_offsets('g', [{'topic': 't', 'partition': 0, 'offset': 1, 'timestamp': 1}]) + + def test_rejects_overlapping_targets(self): + mock_admin = MagicMock() + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + with pytest.raises(ValueError, match="Multiple offset specifications target the same partition"): + client.update_consumer_group_offsets( + 'g', + [ + {'topic': 't', 'partition': 0, 'offset': 1}, + {'topic': 't', 'partition': 0, 'offset': 2}, + ], + ) + + def test_raises_on_per_partition_error(self): + mock_admin = MagicMock() + mock_admin.list_offsets.side_effect = _list_offsets_stub({0: 1}) + failed_tp = MagicMock(topic='t', partition=0, error='UNKNOWN_MEMBER_ID') + mock_admin.alter_consumer_group_offsets.return_value = {'g': _offset_future_with_topic_partitions([failed_tp])} + + client = _client() + with patch.object(client, 'get_admin_client', return_value=mock_admin): + with pytest.raises(Exception, match="Per-partition errors"): + client.update_consumer_group_offsets('g', [{'topic': 't', 'partition': 0, 'offset': 1}]) + class TestProduceMessageAction: """Test produce_message action."""