Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions kafka_actions/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ files:
description: |
Configuration for updating consumer group offsets.
WARNING: Can cause duplicate processing or data loss.
Consumer group should be stopped (no active members).
The consumer group must have no active members before this action runs.
The check enforces this and will fail with a clear error if members are found.
value:
type: object
required:
Expand All @@ -518,7 +519,9 @@ files:
offset: 1000
- topic: orders
partition: 1
offset: 1500
offset: -2
- topic: payments
timestamp: 1735689600000
properties:
- name: cluster
type: string
Expand All @@ -530,30 +533,43 @@ files:
example: order-processor
- name: offsets
type: array
description: List of topic-partition-offset tuples to update
description: |
List of offset specifications. Each entry must specify exactly one of `offset`
or `timestamp`. See the action description for details.
items:
type: object
required:
- topic
- partition
- offset
properties:
- name: topic
type: string
description: Topic name
- name: partition
type: integer
description: Partition number
description: |
Non-negative partition number. Required when `offset` is specified.
Optional when `timestamp` is specified. Omit to target all partitions.
- name: offset
type: integer
description: New offset value
description: |
Offset to commit. Use -2 for earliest (log-start), -1 for latest
(high-watermark), or a non-negative integer for an explicit position.
Mutually exclusive with `timestamp`. Requires `partition`.
- name: timestamp
type: integer
description: |
Milliseconds since epoch. Resets to the first offset at or after this
timestamp. Partitions with no message at or after the timestamp are
reset to latest. Mutually exclusive with `offset`.
example:
- topic: orders
partition: 0
offset: 1000
- topic: orders
partition: 1
offset: 1500
offset: -2
- topic: payments
timestamp: 1735689600000
fleet_configurable: false

# ========================================================================
Expand Down
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/24165.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added earliest, latest, and timestamp-based offset support to the ``update_consumer_group_offsets`` action, along with an inactive-group precondition check and per-partition error reporting.
12 changes: 9 additions & 3 deletions kafka_actions/datadog_checks/kafka_actions/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,10 +743,12 @@ def _action_update_consumer_group_offsets(self):
offsets:
- topic: orders
partition: 0
offset: 1000
offset: 1000 # explicit offset
- topic: orders
partition: 1
offset: 1500
offset: -2 # earliest
- topic: payments
timestamp: 1735689600000 # all partitions at/after this timestamp
"""
config = self.config.update_consumer_group_offsets

Expand All @@ -755,10 +757,14 @@ def _action_update_consumer_group_offsets(self):
consumer_group = config['consumer_group']
offsets = config['offsets']

self.kafka_client.check_consumer_group_inactive(consumer_group)

self.log.warning(
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss",
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss. "
"Offsets: %s",
consumer_group,
self.cluster,
offsets,
)

success = self.kafka_client.update_consumer_group_offsets(consumer_group=consumer_group, offsets=offsets)
Expand Down
41 changes: 33 additions & 8 deletions kafka_actions/datadog_checks/kafka_actions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import os
from typing import Any

from datadog_checks.base import ConfigurationError, is_affirmative

Expand Down Expand Up @@ -289,6 +290,11 @@ def _validate_delete_consumer_group(self):
if not config.get('consumer_group'):
raise ConfigurationError("delete_consumer_group action requires 'consumer_group' parameter")

def _validate_offset_entry_partition(self, offset_entry: dict[str, Any], index: int) -> None:
"""Validate that offsets[index].partition, if present, is a non-negative integer."""
if not isinstance(offset_entry['partition'], int) or offset_entry['partition'] < 0:
raise ConfigurationError(f"offsets[{index}].partition must be a non-negative integer")

def _validate_update_consumer_group_offsets(self):
"""Validate update_consumer_group_offsets action configuration."""
config = self.update_consumer_group_offsets
Expand All @@ -310,17 +316,36 @@ def _validate_update_consumer_group_offsets(self):
if not offset_entry.get('topic'):
raise ConfigurationError(f"offsets[{i}] requires 'topic' parameter")

if 'partition' not in offset_entry:
raise ConfigurationError(f"offsets[{i}] requires 'partition' parameter")
has_offset = 'offset' in offset_entry
has_timestamp = 'timestamp' in offset_entry

if not has_offset and not has_timestamp:
raise ConfigurationError(f"offsets[{i}] requires 'offset' or 'timestamp'")

if has_offset and has_timestamp:
raise ConfigurationError(f"offsets[{i}] cannot specify both 'offset' and 'timestamp'")

if 'offset' not in offset_entry:
raise ConfigurationError(f"offsets[{i}] requires 'offset' parameter")
if has_offset:
if 'partition' not in offset_entry:
raise ConfigurationError(f"offsets[{i}] requires 'partition' when 'offset' is specified")

if not isinstance(offset_entry.get('partition'), int):
raise ConfigurationError(f"offsets[{i}].partition must be an integer")
self._validate_offset_entry_partition(offset_entry, i)

offset_val = offset_entry['offset']
if not isinstance(offset_val, int) or offset_val < -2:
raise ConfigurationError(
f"offsets[{i}].offset must be -2 (earliest), -1 (latest), or a non-negative integer"
)

if has_timestamp:
ts = offset_entry['timestamp']
if not isinstance(ts, int) or ts <= 0:
raise ConfigurationError(
f"offsets[{i}].timestamp must be a positive integer (milliseconds since epoch)"
)

if not isinstance(offset_entry.get('offset'), int):
raise ConfigurationError(f"offsets[{i}].offset must be an integer")
if 'partition' in offset_entry:
self._validate_offset_entry_partition(offset_entry, i)

def _validate_produce_message(self):
"""Validate produce_message action configuration."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,18 @@ class Offset(BaseModel):
arbitrary_types_allowed=True,
frozen=True,
)
offset: int = Field(..., description='New offset value')
partition: int = Field(..., description='Partition number')
offset: Optional[int] = Field(
None,
description='Offset to commit. Use -2 for earliest (log-start), -1 for latest\n(high-watermark), or a non-negative integer for an explicit position.\nMutually exclusive with `timestamp`. Requires `partition`.\n',
)
partition: Optional[int] = Field(
None,
description='Non-negative partition number. Required when `offset` is specified.\nOptional when `timestamp` is specified. Omit to target all partitions.\n',
)
timestamp: Optional[int] = Field(
None,
description='Milliseconds since epoch. Resets to the first offset at or after this\ntimestamp. Partitions with no message at or after the timestamp are\nreset to latest. Mutually exclusive with `offset`.\n',
)
topic: str = Field(..., description='Topic name')


Expand All @@ -219,11 +229,12 @@ class UpdateConsumerGroupOffsets(BaseModel):
consumer_group: str = Field(..., description='Consumer group ID to update', examples=['order-processor'])
offsets: tuple[Offset, ...] = Field(
...,
description='List of topic-partition-offset tuples to update',
description='List of offset specifications. Each entry must specify exactly one of `offset`\nor `timestamp`. See the action description for details.\n',
examples=[
[
{'offset': 1000, 'partition': 0, 'topic': 'orders'},
{'offset': 1500, 'partition': 1, 'topic': 'orders'},
{'offset': -2, 'partition': 1, 'topic': 'orders'},
{'timestamp': 1735689600000, 'topic': 'payments'},
]
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ instances:
## @param update_consumer_group_offsets - mapping - optional
## Configuration for updating consumer group offsets.
## WARNING: Can cause duplicate processing or data loss.
## Consumer group should be stopped (no active members).
## The consumer group must have no active members before this action runs.
## The check enforces this and will fail with a clear error if members are found.
#
# update_consumer_group_offsets:
# cluster: prod-kafka-1
Expand All @@ -270,7 +271,9 @@ instances:
# offset: 1000
# - topic: orders
# partition: 1
# offset: 1500
# offset: -2
# - topic: payments
# timestamp: 1735689600000

## @param produce_message - mapping - optional
## Configuration for producing a message to a Kafka topic.
Expand Down
Loading
Loading