Skip to content

[bugfix] Kafka: batch + retry offsets_for_times to survive broker timeouts#547

Merged
tiankongdeguiji merged 2 commits into
alibaba:masterfrom
tiankongdeguiji:kafka_offsets_for_times_batch_retry
Jun 23, 2026
Merged

[bugfix] Kafka: batch + retry offsets_for_times to survive broker timeouts#547
tiankongdeguiji merged 2 commits into
alibaba:masterfrom
tiankongdeguiji:kafka_offsets_for_times_batch_retry

Conversation

@tiankongdeguiji

@tiankongdeguiji tiankongdeguiji commented Jun 18, 2026

Copy link
Copy Markdown
Collaborator

Problem

When a Kafka input uses start.timestamp.ms, the reader resolves the timestamp to per-partition offsets inside the on_assign rebalance callback with a single consumer.offsets_for_times(ts_partitions, timeout=30.0) call over all assigned partitions (128 in a production export). When the broker failed to answer the ListOffsetsRequest within the hardcoded 30s, the call raised inside the rebalance callback, killing the DataLoader worker and failing the job:

RuntimeError: DataLoader worker (pid(s) ...) exited unexpectedly
... tzrec.export FAILED

Change

Resolve offsets in a new helper _offsets_for_times_batched:

  • Chunked: split partitions into batches (default 32 per offsets_for_times call) so each broker request is smaller and more likely to complete within the timeout.
  • Retried: each chunk is retried (default 2 retries / 3 attempts) on KafkaException, mirroring the existing _read_rows_arrow_with_retry style in odps_dataset.py.
  • Configurable via env:
    • TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE (default 32)
    • TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT (default 30.0, seconds)
    • TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES (default 2)

The per-partition fallback to auto.offset.reset is unchanged and still applies only to partitions the broker legitimately resolves to -1 (no message at/after the timestamp). A persistent failure after retries re-raises by design: _reader is shared with training, where silently falling back to auto.offset.reset would replay the entire topic from the earliest offset.

Notes / limitation

Chunking + retry materially raises the success probability against transient broker slowness or per-request overload, but does not eliminate failures when the cluster is globally unavailable — those still surface loudly (intended).

Docs

The three env vars are documented in the KafkaDataset section of docs/source/feature/data.md, next to the start.timestamp.ms bullet.

Test

New broker-free, mock-only OffsetsForTimesBatchedTest in kafka_dataset_test.py covering: empty input, single chunk, chunk boundaries (65 parts → 30/30/5), -1 filtering, retry-then-success, retry-exhausted-raises, and per-chunk retry isolation. Existing broker-gated tests are unchanged.

🤖 Generated with Claude Code

@tiankongdeguiji tiankongdeguiji added the claude-review Let Claude Review label Jun 18, 2026
@github-actions github-actions Bot removed the claude-review Let Claude Review label Jun 18, 2026
Comment thread tzrec/datasets/kafka_dataset.py Outdated
Comment on lines +335 to +341
offt_batch_size = max(
1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
)
offt_timeout = float(
os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0")
)
offt_retries = int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_size is clamped with max(1, ...), but timeout and retries are not bounded — and all three raise an uncaught ValueError on a malformed value (e.g. RETRIES=abc or an empty string), crashing the DataLoader worker, which is the exact failure mode this PR sets out to prevent.

  • negative retriesretry_cnt >= max_retries is true on the first failure (0 effective retries) and logs the misleading "failed after -1 retries"
  • timeout <= 0 is passed straight to offsets_for_times, where it has undefined/poll-once semantics and can burn every retry on spurious immediate timeouts

Clamping the two unbounded values (and ideally tolerating non-numeric input via try/except → default) would close this:

Suggested change
offt_batch_size = max(
1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
)
offt_timeout = float(
os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0")
)
offt_retries = int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))
offt_batch_size = max(
1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
)
offt_timeout = max(
0.1, float(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0"))
)
offt_retries = max(
0, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))
)

Comment on lines +365 to +372
resolved_map = _offsets_for_times_batched(
consumer,
ts_partitions,
start_timestamp_ms,
offt_batch_size,
offt_timeout,
offt_retries,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth sanity-checking the default time budget against max.poll.interval.ms (default 300s, read just above at line 333). This converts one call into ceil(N/batch_size) sequential chunks, each retried max_retries+1 times with backoff. With the production N=128 and the defaults (batch=30, timeout=30, retries=3):

5 chunks × (4 × 30s + (2+4+6)s backoff) ≈ 660s — more than 2× the 300s poll interval.

Two concerns under the sustained broker slowness this PR targets:

  • on_assign runs synchronously nested inside consume() while the main thread holds consumer_lock, so the heartbeat thread (which also needs consumer_lock) is blocked the entire time and can't keep the consumer alive.
  • exceeding max.poll.interval.ms inside the callback risks the coordinator evicting the consumer → reassignment → on_assign re-runs from scratch, i.e. a rebalance loop rather than the clean bounded failure intended.

Consider bounding the helper with an overall deadline derived from max_poll_interval_ms (already computed at line 333), and/or lowering the default timeout/retries so the documented defaults comfortably fit inside the poll interval.

@github-actions

Copy link
Copy Markdown

Code review summary

Solid, well-targeted bugfix. Chunking + per-chunk retry with capped backoff, an injectable sleep_fn for broker-free tests, and the deliberate re-raise (instead of silently falling back to auto.offset.reset, which would replay the whole topic) are all good calls. Test coverage of the happy paths, chunk boundaries (65→30/30/5), -1 filtering, and retry-then-success/exhaustion is thorough.

Two noteworthy items posted inline:

  1. Default time budget vs max.poll.interval.ms — worst-case ~660s inside on_assign (N=128, defaults) exceeds the default 300s poll interval, and the heartbeat is blocked during the callback, so under the very broker slowness this targets it can self-induce a rebalance loop rather than fail cleanly.
  2. Env-var clamping/robustnesstimeout/retries aren't bounded (unlike batch_size), and malformed values raise an uncaught ValueError in the DataLoader worker.

Minor (non-blocking):

  • Docs: the three TZREC_KAFKA_OFFSETS_FOR_TIMES_* env vars aren't documented anywhere user-facing, even though the error message at kafka_dataset.py:148 tells users to tune them. The ## KafkaDataset section in docs/source/feature/data.md (near the start.timestamp.ms bullet) is the natural home, matching the existing env-var doc pattern.
  • Tests: the backoff value is never asserted (only sleep_fn.call_count), so a regression in the min(2.0 * retry_cnt, 10.0) formula would pass — consider asserting sleep_fn.call_args_list (e.g. [2.0, 4.0, 6.0]) plus the 10s cap.
  • Helper docstring: _offsets_for_times_batched mutates the caller's TopicPartition.offset in place; correct today (the call site overwrites it immediately after) but worth a one-line note since the function returns a dict and otherwise reads as pure.

…eouts

Resolving start.timestamp.ms to offsets issued a single offsets_for_times
call for all assigned partitions (128 in prod) with a hardcoded 30s
timeout inside the on_assign rebalance callback. When the broker did not
answer in time the call raised, killing the DataLoader worker and failing
export.

Resolve offsets in chunks (default 32 partitions per call) and retry each
chunk (default 2) so a smaller per-call broker request and per-chunk retry
survive transient slowness. Chunk size, per-call timeout, and retry count
are env-configurable via TZREC_KAFKA_OFFSETS_FOR_TIMES_{BATCH_SIZE,TIMEOUT,
RETRIES}. A persistent failure still re-raises rather than silently falling
back to auto.offset.reset, because _reader is shared with training where
that would replay the whole topic.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@tiankongdeguiji tiankongdeguiji force-pushed the kafka_offsets_for_times_batch_retry branch 2 times, most recently from f0a1f49 to a6fd6a4 Compare June 18, 2026 12:17
Add the three offsets_for_times tuning knobs (batch size, timeout,
retries) to the KafkaDataset section next to the start.timestamp.ms
bullet, so the hint in the runtime error message points somewhere.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@tiankongdeguiji tiankongdeguiji force-pushed the kafka_offsets_for_times_batch_retry branch from a6fd6a4 to cbf7a8f Compare June 18, 2026 12:18
@tiankongdeguiji tiankongdeguiji merged commit 5166440 into alibaba:master Jun 23, 2026
7 checks passed
@tiankongdeguiji tiankongdeguiji deleted the kafka_offsets_for_times_batch_retry branch June 23, 2026 09:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants