Skip to content

Commit fac4bc9

Browse files
kafka_actions: harden reset-consumer-offsets action (DataDog#24165)
* kafka_actions: harden reset-consumer-offsets action - Abort before altering offsets when the consumer group still has active members (describe_consumer_groups pre-flight check) so the Kafka NON_EMPTY_GROUP error never surfaces as a cryptic partial failure. - Inspect per-partition TopicPartition error codes in the alter_consumer_group_offsets result; previously only group-level futures were checked, letting silent per-partition failures through. - Add reset_to: earliest|latest support: resolves the log-start or high-watermark via list_offsets at runtime, then commits the concrete offset. alter_consumer_group_offsets does not accept symbolic offsets. - Validate that partition and offset are non-negative; reject entries that set both offset and reset_to or neither. - Clarify in spec.yaml that the cluster field must be the Kafka-internal UUID (from AdminClient.list_topics().cluster_id), not a human-readable name; update examples and error messages accordingly. - Log the full offset list in the pre-execution warning for audit trail. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_actions: add changelog entry for PR DataDog#24165 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_actions: fix ruff formatting Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_actions: fix ruff import sort in remaining test files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix update_consumer_group_offsets in kafka_actions The confluent-kafka AdminClient.alter_consumer_group_offsets() API takes a single list of ConsumerGroupTopicPartitions, not (group, partitions) as two positional arguments, which raised "takes 2 positional arguments but 3 were given". ConsumerGroupTopicPartitions is also exported from confluent_kafka, not confluent_kafka.admin. Additionally, future.result() returns a single ConsumerGroupTopicPartitions object rather than a list of partitions, so iterate its topic_partitions to collect per-partition errors. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * kafka_actions: add timestamp-based offset reset and sentinel offsets Replace reset_to: earliest/latest with offset: -2/-1 sentinel values and add a timestamp field that resolves all partitions of a topic to the first offset at or after the given millisecond timestamp, with automatic partition discovery when partition is omitted. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * kafka_actions: remove unrelated cluster-UUID doc changes from spec.yaml Reverts stray documentation claiming the 'cluster' field for update_consumer_group_offsets must be the Kafka-internal UUID rather than a human-readable name. That claim is unrelated to this PR's hardening work and inconsistent with every other action's cluster docs/examples (prod-kafka-1). * kafka_actions: sync changelog entry with current update_consumer_group_offsets behavior * kafka_actions: restore autogenerated file headers and fix formatting Regenerated config_models were missing their license headers (a stray regeneration artifact), and kafka_client.py / test_unit.py had two small ruff formatting/import-order issues. * kafka_actions: address round-1 review feedback on offset-reset hardening Adds request timeouts and error handling around list_offsets/describe_consumer_groups calls, de-dups overlapping offset targets, guards against both offset and timestamp being set, and splits update_consumer_group_offsets into smaller resolver methods. Extracts a shared partition-discovery helper and de-dups the config.py partition validation. Adds unit test coverage for the client-internal sentinel/timestamp/ per-partition-error/active-member logic and parameterizes the offsets validation tests. Cleans up doc formatting in spec.yaml and splits the changelog entry into .fixed/.added to reflect the net-new sentinel/timestamp capabilities. * [kafka_actions] Address round-2 review feedback Batch the timestamp-fallback offset lookup instead of issuing one list_offsets RPC per partition, pass the admin client into the resolver helpers consistently, wrap the describe_consumer_groups result with the same log-and-reraise pattern as the other admin calls, and tighten a couple of doc/type-hint/test gaps flagged by the second review pass. Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com> * [kafka_actions] Fix import grouping in test files (ruff I001) CI's ruff check flagged missing blank-line separation between third-party and first-party import groups across the test suite. * [kafka_actions] Address round-3 review nits Type-hint the admin param on the offset resolver helpers, and key the fallback test's list_offsets stub on OffsetSpec type instead of request length so it stays correct if a second fallback partition is added. * [kafka_actions] Fix import grouping (ruff I001) Restore the blank line between third-party and first-party import groups after adding the OffsetSpec import. * [kafka_actions] Trim redundant offsets doc and merge changelog entries Remove the duplicated offset/timestamp usage block from the action description (already covered in the per-field docs), and consolidate the two changelog fragments into a single added entry. --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent ac244fb commit fac4bc9

9 files changed

Lines changed: 584 additions & 69 deletions

File tree

kafka_actions/assets/configuration/spec.yaml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,8 @@ files:
502502
description: |
503503
Configuration for updating consumer group offsets.
504504
WARNING: Can cause duplicate processing or data loss.
505-
Consumer group should be stopped (no active members).
505+
The consumer group must have no active members before this action runs.
506+
The check enforces this and will fail with a clear error if members are found.
506507
value:
507508
type: object
508509
required:
@@ -518,7 +519,9 @@ files:
518519
offset: 1000
519520
- topic: orders
520521
partition: 1
521-
offset: 1500
522+
offset: -2
523+
- topic: payments
524+
timestamp: 1735689600000
522525
properties:
523526
- name: cluster
524527
type: string
@@ -530,30 +533,43 @@ files:
530533
example: order-processor
531534
- name: offsets
532535
type: array
533-
description: List of topic-partition-offset tuples to update
536+
description: |
537+
List of offset specifications. Each entry must specify exactly one of `offset`
538+
or `timestamp`. See the action description for details.
534539
items:
535540
type: object
536541
required:
537542
- topic
538-
- partition
539-
- offset
540543
properties:
541544
- name: topic
542545
type: string
543546
description: Topic name
544547
- name: partition
545548
type: integer
546-
description: Partition number
549+
description: |
550+
Non-negative partition number. Required when `offset` is specified.
551+
Optional when `timestamp` is specified. Omit to target all partitions.
547552
- name: offset
548553
type: integer
549-
description: New offset value
554+
description: |
555+
Offset to commit. Use -2 for earliest (log-start), -1 for latest
556+
(high-watermark), or a non-negative integer for an explicit position.
557+
Mutually exclusive with `timestamp`. Requires `partition`.
558+
- name: timestamp
559+
type: integer
560+
description: |
561+
Milliseconds since epoch. Resets to the first offset at or after this
562+
timestamp. Partitions with no message at or after the timestamp are
563+
reset to latest. Mutually exclusive with `offset`.
550564
example:
551565
- topic: orders
552566
partition: 0
553567
offset: 1000
554568
- topic: orders
555569
partition: 1
556-
offset: 1500
570+
offset: -2
571+
- topic: payments
572+
timestamp: 1735689600000
557573
fleet_configurable: false
558574

559575
# ========================================================================
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added earliest, latest, and timestamp-based offset support to the ``update_consumer_group_offsets`` action, along with an inactive-group precondition check and per-partition error reporting.

kafka_actions/datadog_checks/kafka_actions/check.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,10 +743,12 @@ def _action_update_consumer_group_offsets(self):
743743
offsets:
744744
- topic: orders
745745
partition: 0
746-
offset: 1000
746+
offset: 1000 # explicit offset
747747
- topic: orders
748748
partition: 1
749-
offset: 1500
749+
offset: -2 # earliest
750+
- topic: payments
751+
timestamp: 1735689600000 # all partitions at/after this timestamp
750752
"""
751753
config = self.config.update_consumer_group_offsets
752754

@@ -755,10 +757,14 @@ def _action_update_consumer_group_offsets(self):
755757
consumer_group = config['consumer_group']
756758
offsets = config['offsets']
757759

760+
self.kafka_client.check_consumer_group_inactive(consumer_group)
761+
758762
self.log.warning(
759-
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss",
763+
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss. "
764+
"Offsets: %s",
760765
consumer_group,
761766
self.cluster,
767+
offsets,
762768
)
763769

764770
success = self.kafka_client.update_consumer_group_offsets(consumer_group=consumer_group, offsets=offsets)

kafka_actions/datadog_checks/kafka_actions/config.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# All rights reserved
33
# Licensed under a 3-clause BSD style license (see LICENSE)
44
import os
5+
from typing import Any
56

67
from datadog_checks.base import ConfigurationError, is_affirmative
78

@@ -289,6 +290,11 @@ def _validate_delete_consumer_group(self):
289290
if not config.get('consumer_group'):
290291
raise ConfigurationError("delete_consumer_group action requires 'consumer_group' parameter")
291292

293+
def _validate_offset_entry_partition(self, offset_entry: dict[str, Any], index: int) -> None:
294+
"""Validate that offsets[index].partition, if present, is a non-negative integer."""
295+
if not isinstance(offset_entry['partition'], int) or offset_entry['partition'] < 0:
296+
raise ConfigurationError(f"offsets[{index}].partition must be a non-negative integer")
297+
292298
def _validate_update_consumer_group_offsets(self):
293299
"""Validate update_consumer_group_offsets action configuration."""
294300
config = self.update_consumer_group_offsets
@@ -310,17 +316,36 @@ def _validate_update_consumer_group_offsets(self):
310316
if not offset_entry.get('topic'):
311317
raise ConfigurationError(f"offsets[{i}] requires 'topic' parameter")
312318

313-
if 'partition' not in offset_entry:
314-
raise ConfigurationError(f"offsets[{i}] requires 'partition' parameter")
319+
has_offset = 'offset' in offset_entry
320+
has_timestamp = 'timestamp' in offset_entry
321+
322+
if not has_offset and not has_timestamp:
323+
raise ConfigurationError(f"offsets[{i}] requires 'offset' or 'timestamp'")
324+
325+
if has_offset and has_timestamp:
326+
raise ConfigurationError(f"offsets[{i}] cannot specify both 'offset' and 'timestamp'")
315327

316-
if 'offset' not in offset_entry:
317-
raise ConfigurationError(f"offsets[{i}] requires 'offset' parameter")
328+
if has_offset:
329+
if 'partition' not in offset_entry:
330+
raise ConfigurationError(f"offsets[{i}] requires 'partition' when 'offset' is specified")
318331

319-
if not isinstance(offset_entry.get('partition'), int):
320-
raise ConfigurationError(f"offsets[{i}].partition must be an integer")
332+
self._validate_offset_entry_partition(offset_entry, i)
333+
334+
offset_val = offset_entry['offset']
335+
if not isinstance(offset_val, int) or offset_val < -2:
336+
raise ConfigurationError(
337+
f"offsets[{i}].offset must be -2 (earliest), -1 (latest), or a non-negative integer"
338+
)
339+
340+
if has_timestamp:
341+
ts = offset_entry['timestamp']
342+
if not isinstance(ts, int) or ts <= 0:
343+
raise ConfigurationError(
344+
f"offsets[{i}].timestamp must be a positive integer (milliseconds since epoch)"
345+
)
321346

322-
if not isinstance(offset_entry.get('offset'), int):
323-
raise ConfigurationError(f"offsets[{i}].offset must be an integer")
347+
if 'partition' in offset_entry:
348+
self._validate_offset_entry_partition(offset_entry, i)
324349

325350
def _validate_produce_message(self):
326351
"""Validate produce_message action configuration."""

kafka_actions/datadog_checks/kafka_actions/config_models/instance.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,18 @@ class Offset(BaseModel):
205205
arbitrary_types_allowed=True,
206206
frozen=True,
207207
)
208-
offset: int = Field(..., description='New offset value')
209-
partition: int = Field(..., description='Partition number')
208+
offset: Optional[int] = Field(
209+
None,
210+
description='Offset to commit. Use -2 for earliest (log-start), -1 for latest\n(high-watermark), or a non-negative integer for an explicit position.\nMutually exclusive with `timestamp`. Requires `partition`.\n',
211+
)
212+
partition: Optional[int] = Field(
213+
None,
214+
description='Non-negative partition number. Required when `offset` is specified.\nOptional when `timestamp` is specified. Omit to target all partitions.\n',
215+
)
216+
timestamp: Optional[int] = Field(
217+
None,
218+
description='Milliseconds since epoch. Resets to the first offset at or after this\ntimestamp. Partitions with no message at or after the timestamp are\nreset to latest. Mutually exclusive with `offset`.\n',
219+
)
210220
topic: str = Field(..., description='Topic name')
211221

212222

@@ -219,11 +229,12 @@ class UpdateConsumerGroupOffsets(BaseModel):
219229
consumer_group: str = Field(..., description='Consumer group ID to update', examples=['order-processor'])
220230
offsets: tuple[Offset, ...] = Field(
221231
...,
222-
description='List of topic-partition-offset tuples to update',
232+
description='List of offset specifications. Each entry must specify exactly one of `offset`\nor `timestamp`. See the action description for details.\n',
223233
examples=[
224234
[
225235
{'offset': 1000, 'partition': 0, 'topic': 'orders'},
226-
{'offset': 1500, 'partition': 1, 'topic': 'orders'},
236+
{'offset': -2, 'partition': 1, 'topic': 'orders'},
237+
{'timestamp': 1735689600000, 'topic': 'payments'},
227238
]
228239
],
229240
)

kafka_actions/datadog_checks/kafka_actions/data/conf.yaml.example

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,8 @@ instances:
259259
## @param update_consumer_group_offsets - mapping - optional
260260
## Configuration for updating consumer group offsets.
261261
## WARNING: Can cause duplicate processing or data loss.
262-
## Consumer group should be stopped (no active members).
262+
## The consumer group must have no active members before this action runs.
263+
## The check enforces this and will fail with a clear error if members are found.
263264
#
264265
# update_consumer_group_offsets:
265266
# cluster: prod-kafka-1
@@ -270,7 +271,9 @@ instances:
270271
# offset: 1000
271272
# - topic: orders
272273
# partition: 1
273-
# offset: 1500
274+
# offset: -2
275+
# - topic: payments
276+
# timestamp: 1735689600000
274277

275278
## @param produce_message - mapping - optional
276279
## Configuration for producing a message to a Kafka topic.

0 commit comments

Comments
 (0)