[bugfix] Kafka: batch + retry offsets_for_times to survive broker timeouts #547
offt_batch_size = max(
    1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
)
offt_timeout = float(
    os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0")
)
offt_retries = int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))
batch_size is clamped with max(1, ...), but timeout and retries are not bounded — and all three raise an uncaught ValueError on a malformed value (e.g. RETRIES=abc or an empty string), crashing the DataLoader worker, which is the exact failure mode this PR sets out to prevent.
- negative retries → retry_cnt >= max_retries is true on the first failure (0 effective retries) and logs the misleading "failed after -1 retries"
- timeout <= 0 is passed straight to offsets_for_times, where it has undefined/poll-once semantics and can burn every retry on spurious immediate timeouts
Clamping the two unbounded values (and ideally tolerating non-numeric input via try/except → default) would close this:
- offt_batch_size = max(
-     1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
- )
- offt_timeout = float(
-     os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0")
- )
- offt_retries = int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))
+ offt_batch_size = max(
+     1, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", "30"))
+ )
+ offt_timeout = max(
+     0.1, float(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", "30.0"))
+ )
+ offt_retries = max(
+     0, int(os.environ.get("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", "3"))
+ )
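The suggestion above clamps the values but would still raise on non-numeric input. A minimal sketch of the try/except → default variant follows; the helper name env_number is hypothetical and not part of the PR, and the minimums (1, 0.1, 0) are the ones proposed in the suggestion.

```python
import os


def env_number(name, default, cast, minimum):
    """Read a numeric env var, tolerating malformed input.

    Hypothetical helper (not in the PR): falls back to `default` when the
    variable is unset, empty, or not parseable, then clamps to `minimum`.
    """
    raw = os.environ.get(name, "")
    try:
        value = cast(raw)
    except ValueError:
        # Covers "abc", "" and, for int, values such as "3.5".
        value = default
    return max(minimum, value)


# Same three knobs as in the diff, with the defaults shown there.
offt_batch_size = env_number("TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE", 30, int, 1)
offt_timeout = env_number("TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT", 30.0, float, 0.1)
offt_retries = env_number("TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES", 3, int, 0)
```

With this shape, RETRIES=abc yields the default 3 and RETRIES=-1 yields 0, so neither can crash the worker or produce the "failed after -1 retries" log.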
resolved_map = _offsets_for_times_batched(
    consumer,
    ts_partitions,
    start_timestamp_ms,
    offt_batch_size,
    offt_timeout,
    offt_retries,
)
Worth sanity-checking the default time budget against max.poll.interval.ms (default 300s, read just above at line 333). This converts one call into ceil(N/batch_size) sequential chunks, each attempted up to max_retries+1 times with backoff between attempts. With the production N=128 and the defaults (batch=30, timeout=30, retries=3), the worst case is:
5 chunks × (4 × 30s + (2+4+6)s backoff) ≈ 660s, more than 2× the 300s poll interval.
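The estimate above can be reproduced with a few lines. This is only a sketch: worst_case_seconds is a hypothetical name, and the 2s/4s/6s backoff schedule is taken from the arithmetic in this comment rather than verified against the helper.

```python
import math


def worst_case_seconds(num_partitions, batch_size, timeout_s, retries, backoffs_s):
    """Upper bound on wall time if every attempt of every chunk times out."""
    chunks = math.ceil(num_partitions / batch_size)
    attempts = retries + 1  # first try plus the retries
    per_chunk = attempts * timeout_s + sum(backoffs_s[:retries])
    return chunks * per_chunk


budget = worst_case_seconds(128, 30, 30.0, 3, [2, 4, 6])
print(budget)  # 660.0
print(budget > 300.0)  # True: exceeds the 300s default poll interval
```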
Two concerns under the sustained broker slowness this PR targets:
- on_assign runs synchronously nested inside consume() while the main thread holds consumer_lock, so the heartbeat thread (which also needs consumer_lock) is blocked the entire time and can't keep the consumer alive.
- exceeding max.poll.interval.ms inside the callback risks the coordinator evicting the consumer → reassignment → on_assign re-runs from scratch, i.e. a rebalance loop rather than the clean bounded failure intended.
Consider bounding the helper with an overall deadline derived from max_poll_interval_ms (already computed at line 333), and/or lowering the default timeout/retries so the documented defaults comfortably fit inside the poll interval.
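One possible shape for the overall deadline is sketched below. Everything here is an assumption for illustration: call_with_deadline is a hypothetical name, the 0.5 safety factor and the 2s-per-attempt backoff are arbitrary, and the real helper would catch KafkaException rather than Exception.

```python
import time


def call_with_deadline(fn, per_call_timeout_s, max_retries, deadline_s,
                       clock=time.monotonic, sleep=time.sleep):
    """Call fn(timeout) with retries, never running past deadline_s.

    deadline_s is an absolute time in the units returned by clock().
    """
    attempt = 0
    while True:
        remaining = deadline_s - clock()
        if remaining <= 0:
            raise TimeoutError(
                "overall deadline exhausted before attempt %d" % (attempt + 1)
            )
        try:
            # Never wait longer than what is left of the overall budget.
            return fn(min(per_call_timeout_s, remaining))
        except Exception:  # assumption: the real code would catch KafkaException
            if attempt >= max_retries:
                raise
            attempt += 1
            # Backoff is also clipped so it cannot overshoot the deadline.
            sleep(min(2.0 * attempt, max(0.0, deadline_s - clock())))


# Deriving the deadline from the poll interval (300s is the default cited above).
max_poll_interval_ms = 300_000
deadline = time.monotonic() + 0.5 * max_poll_interval_ms / 1000.0
```

Sharing one deadline across all chunks keeps the total time inside the callback bounded regardless of how many partitions are assigned.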
Code review summary
Solid, well-targeted bugfix. Chunking + per-chunk retry with capped backoff, an injectable …
Two noteworthy items posted inline:
Minor (non-blocking):
…eouts
Resolving start.timestamp.ms to offsets issued a single offsets_for_times
call for all assigned partitions (128 in prod) with a hardcoded 30s
timeout inside the on_assign rebalance callback. When the broker did not
answer in time the call raised, killing the DataLoader worker and failing
export.
Resolve offsets in chunks (default 32 partitions per call) and retry each
chunk (default 2) so a smaller per-call broker request and per-chunk retry
survive transient slowness. Chunk size, per-call timeout, and retry count
are env-configurable via TZREC_KAFKA_OFFSETS_FOR_TIMES_{BATCH_SIZE,TIMEOUT,
RETRIES}. A persistent failure still re-raises rather than silently falling
back to auto.offset.reset, because _reader is shared with training where
that would replay the whole topic.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
f0a1f49 to
a6fd6a4
Add the three offsets_for_times tuning knobs (batch size, timeout, retries) to the KafkaDataset section next to the start.timestamp.ms bullet, so the hint in the runtime error message points somewhere.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
a6fd6a4 to
cbf7a8f
Problem
When a Kafka input uses start.timestamp.ms, the reader resolves the timestamp to per-partition offsets inside the on_assign rebalance callback with a single consumer.offsets_for_times(ts_partitions, timeout=30.0) call over all assigned partitions (128 in a production export). When the broker failed to answer the ListOffsetsRequest within the hardcoded 30s, the call raised inside the rebalance callback, killing the DataLoader worker and failing the job:
Change
Resolve offsets in a new helper _offsets_for_times_batched:
- Resolve offsets in chunks (default 32 partitions per offsets_for_times call) so each broker request is smaller and more likely to complete within the timeout.
- Retry each chunk (default 2 retries) on KafkaException, mirroring the existing _read_rows_arrow_with_retry style in odps_dataset.py.
- Chunk size, per-call timeout, and retry count are env-configurable:
  - TZREC_KAFKA_OFFSETS_FOR_TIMES_BATCH_SIZE (default 32)
  - TZREC_KAFKA_OFFSETS_FOR_TIMES_TIMEOUT (default 30.0, seconds)
  - TZREC_KAFKA_OFFSETS_FOR_TIMES_RETRIES (default 2)
The per-partition fallback to auto.offset.reset is unchanged and still applies only to partitions the broker legitimately resolves to -1 (no message at/after the timestamp). A persistent failure after retries re-raises by design: _reader is shared with training, where silently falling back to auto.offset.reset would replay the entire topic from the earliest offset.
Notes / limitation
Chunking + retry materially raises the success probability against transient broker slowness or per-request overload, but does not eliminate failures when the cluster is globally unavailable — those still surface loudly (intended).
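For readers skimming the description, the chunk-and-retry flow can be illustrated roughly as follows. This is a simplified sketch, not the code in this PR: resolve_in_chunks and FlakyConsumer are hypothetical names, the backoff values are assumed, partitions are plain integers rather than TopicPartition objects, and the real helper retries on KafkaException and filters -1 offsets.

```python
import time


def resolve_in_chunks(consumer, partitions, batch_size, timeout, max_retries,
                      retry_on=(Exception,), sleep=time.sleep):
    """Resolve partitions chunk by chunk, retrying each chunk independently."""
    resolved = []
    for start in range(0, len(partitions), batch_size):
        chunk = partitions[start:start + batch_size]
        retry_cnt = 0
        while True:
            try:
                resolved.extend(consumer.offsets_for_times(chunk, timeout=timeout))
                break
            except retry_on:
                if retry_cnt >= max_retries:
                    raise  # persistent failure surfaces loudly, no silent fallback
                retry_cnt += 1
                sleep(min(2.0 * retry_cnt, 10.0))  # assumed capped backoff
    return resolved


class FlakyConsumer:
    """Broker-free stand-in that fails the first `failures` calls."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = []

    def offsets_for_times(self, partitions, timeout):
        self.calls.append(len(partitions))
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("simulated broker timeout")
        return list(partitions)


consumer = FlakyConsumer(failures=1)
out = resolve_in_chunks(consumer, list(range(65)), 30, 30.0, 2, sleep=lambda s: None)
print(consumer.calls)  # [30, 30, 30, 5]: first chunk retried once, then 30/30/5
```

Because the retry counter resets per chunk, one slow chunk does not consume the retry budget of the chunks that follow it.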
Docs
The three env vars are documented in the KafkaDataset section of docs/source/feature/data.md, next to the start.timestamp.ms bullet.
Test
New broker-free, mock-only OffsetsForTimesBatchedTest in kafka_dataset_test.py covering: empty input, single chunk, chunk boundaries (65 parts → 30/30/5), -1 filtering, retry-then-success, retry-exhausted-raises, and per-chunk retry isolation. Existing broker-gated tests are unchanged.
🤖 Generated with Claude Code