kafka: foundation for ListOffsets leader epoch fix (CORE-12505) #30347
nguyen-andrew wants to merge 4 commits into redpanda-data:dev
Pull request overview
Establishes an opt-in, hidden cluster property that gates a fix for CORE-12505 (ListOffsets leader epoch) and wires the corrected leader-epoch behavior into two response paths (timequery-match and empty-partition) while leaving the earliest path for follow-up.
Changes:
- Introduces `enable_listoffsets_correct_leader_epoch` as a hidden-when-default cluster property with a legacy default anchor for safe future default flips.
- Updates the ListOffsets handler to return historical leader epoch for timequery-match and `invalid_leader_epoch` for empty-partition when gated on and topic is not a read replica.
- Adds/updates ducktape tests to validate epoch behavior across paths and parameterize expectations based on the new gate; extends rpk test helpers (`group_seek_to`, `RpkConsumer` clean shutdown).
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| tools/type-checking/type-check-strictness.json | Adds the new test file to pyright “standard” strictness tracking. |
| tests/rptest/tests/list_offsets_epoch_test.py | New end-to-end and protocol-level tests for ListOffsets leader-epoch behavior (Kafka vs Redpanda), including gated expectations. |
| tests/rptest/services/rpk_consumer.py | Adds clean_shutdown option to allow SIGTERM-based shutdown for commit/LeaveGroup flushing. |
| tests/rptest/clients/rpk.py | Extends group_seek_to to optionally scope to topics and permit new topics. |
| src/v/kafka/server/handlers/list_offsets.cc | Gates and returns corrected leader epochs for timequery-match and empty-partition paths; threads read-replica check into per-partition handling. |
| src/v/config/property.h | Adds a hidden_when_default_property constructor overload supporting legacy_default. |
| src/v/config/configuration.h | Declares the new cluster property. |
| src/v/config/configuration.cc | Defines the new cluster property and its legacy default anchor. |
CI test results
- test results on build#83861
- test results on build#84195
Force-pushed from 11fe1a0 to aa789f5.
Force push to rebase on dev (collapse merged stacked PR).
      = octx.rctx.metadata_cache().get_topic_disabled_set(
        model::topic_namespace_view{model::kafka_namespace, topic.name});

    const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg(
We should error out if topic_cfg is missing. Suggest making is_rr an optional and using it in the per-partition conditional below:
if (!octx.rctx.metadata_cache().contains(
model::topic_namespace_view(model::kafka_namespace, topic.name),
part.partition_index) || !is_rr.has_value()) {
Wdyt?
Makes sense, I added a check for topic_cfg in the existing per-partition not-found branch. After that point, we can use is_read_replica as a plain bool.
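For readers following the thread, here is a minimal sketch of the resolution described above: the missing-topic_cfg case is folded into the per-partition not-found branch, after which the read-replica flag can be read as a plain bool. The names `error_code`, `topic_cfg_sketch`, and `partition_lookup_error` are hypothetical stand-ins for illustration, not the handler's actual code.

```cpp
#include <cassert>
#include <optional>

// Hypothetical stand-ins: the real handler uses kafka::error_code and the
// topic configuration returned by metadata_cache().get_topic_cfg().
enum class error_code { none, unknown_topic_or_partition };

struct topic_cfg_sketch {
    bool read_replica = false;
    bool is_read_replica() const { return read_replica; }
};

// Treats a missing topic_cfg the same way as a partition that is absent
// from the metadata cache: both yield unknown_topic_or_partition.
inline error_code partition_lookup_error(
  bool partition_in_metadata_cache,
  const std::optional<topic_cfg_sketch>& topic_cfg) {
    if (!partition_in_metadata_cache || !topic_cfg.has_value()) {
        return error_code::unknown_topic_or_partition;
    }
    return error_code::none;
}

inline void partition_lookup_error_examples() {
    std::optional<topic_cfg_sketch> cfg = topic_cfg_sketch{true};
    assert(partition_lookup_error(true, cfg) == error_code::none);
    // Past the error check, the optional is known to hold a value.
    const bool is_read_replica = cfg->is_read_replica();
    assert(is_read_replica);
    assert(
      partition_lookup_error(true, std::nullopt)
      == error_code::unknown_topic_or_partition);
}
```

The point of the shape is that no partition is silently treated as non-read-replica when the configuration lookup comes back empty.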
      "if it means returning less bytes in the fetch than are available.",
      {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
      5s)
  , enable_listoffsets_correct_leader_epoch(
nit: the name is vague, and the first line of the description describes the property much better than the name does. Take that description and put it in the name: enable_listoffsets_historical_leader_epoch or enable_listoffsets_record_leader_epoch
Renamed to enable_listoffsets_historical_leader_epoch.
    model::ktp ktp,
    model::isolation_level isolation_lvl,
    kafka::leader_epoch current_leader_epoch,
    bool is_rr,
nit: bool is_read_replica. It is not many more characters than the other parameters, and it is easier to read for everyone, including agents.
    self.redpanda.set_cluster_config(
        {"enable_listoffsets_correct_leader_epoch": True}
    )
    self._test_empty_partition_list_offsets_epoch(
Missing coverage for earliest/latest queries on an empty partition? These return leader_epoch().

    const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg(
      model::topic_namespace_view{model::kafka_namespace, topic.name});
    const auto is_rr = topic_cfg.has_value() && topic_cfg->is_read_replica();
ditto: we should error if we don't have topic cfg
    model::timestamp(-1),
    model::offset(-1),
    kafka_partition->leader_epoch());
    (correct_epoch_enabled && !is_rr) ? kafka::invalid_leader_epoch
todo comment? consider extracting this logic (repeated a couple of times) to a function where you can document why we have this !is_rr condition. Extracting as a function will give you this one single canonical place which hopefully everyone in the future wondering about the behavior will reach. Sprinkling the check everywhere will make it harder to find the reasoning.
Done, extracted to response_leader_epoch with the rationale documented in the doc comment.
        ktp.get_partition(), error_code::unknown_topic_or_partition);
    }

    const auto correct_epoch_enabled
nit: const bool - same number of characters; so much better to read
        ktp.get_partition(), maybe_start_ofs.error());
    }

    // TODO(CORE-12505 follow-up): wire the correct_epoch_enabled
This will be ugly... Querying earliest is usually a metadata-only call, but now it will require either reading a batch of data or updating all the prefix truncation paths to do the read, so that the start epoch (or start epoch override) is stored together with the "kafka offset override".
If we take the first route (easy) I'm worried about listing metadata for topics with 1000s of partitions.
Not for this PR but something to keep in mind.
Good point, the easy route would scale poorly on high-partition topics.
    property<std::chrono::milliseconds> tx_timeout_delay_ms;
    property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
    property<std::chrono::milliseconds> kafka_fetch_request_timeout_ms;
    hidden_when_default_property<bool> enable_listoffsets_correct_leader_epoch;
Shall we mark this as a development property until we are happy with implementation? Right now it has weird behavior where it will return a higher leader_epoch for partition start offset than for later offsets.
Done, switched to development_feature_property.
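To make the inconsistency raised in this thread concrete, the sketch below checks whether the epochs reported across ListOffsets answers stay non-decreasing as the offset grows. The log layout and epoch values are made up for illustration, and `offset_epoch` and `epochs_non_decreasing` are hypothetical names, not part of the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One ListOffsets answer: the offset returned and the epoch reported for it.
struct offset_epoch {
    std::int64_t offset;
    std::int32_t leader_epoch;
};

// True when the reported epoch never decreases as the offset grows, a
// property that record-time epochs in a single log always satisfy.
inline bool epochs_non_decreasing(const std::vector<offset_epoch>& by_offset) {
    for (std::size_t i = 1; i < by_offset.size(); ++i) {
        if (by_offset[i].leader_epoch < by_offset[i - 1].leader_epoch) {
            return false;
        }
    }
    return true;
}

inline void epoch_consistency_examples() {
    // Illustrative log: offsets 0-49 written in epoch 1, offsets 50 and up
    // in epoch 2, current leader epoch 3.

    // Legacy: every path reports the current epoch (wrong but consistent).
    assert(epochs_non_decreasing({{0, 3}, {50, 3}, {120, 3}}));
    // Partial fix: earliest still reports 3, timequery matches report 2.
    assert(!epochs_non_decreasing({{0, 3}, {50, 2}, {120, 2}}));
    // Full fix: every path reports the record-time epoch.
    assert(epochs_non_decreasing({{0, 1}, {50, 2}, {120, 2}}));
}
```

Only the partial-fix row violates the ordering, which is the reason given for keeping the gate behind a development property until the earliest path is wired up.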
nvartolomei left a comment
Looks good.
> Empty partition: returns kafka::invalid_leader_epoch.
The second commit in the chain and the PR description describe the expected behavior incorrectly/incompletely. Empty partitions do return the current leader epoch for earliest/latest timequeries. That's what the test asserts too.
Please update the text to avoid confusion.
Foundation for CORE-12505 ListOffsets leader epoch fix. When on, ListOffsets returns the historical (record-time) leader epoch on non-read-replica topics, matching Kafka. Gated as a development_feature_property because this PR only fixes two of the three response paths (timequery-match and empty-partition; earliest ships in a follow-up). With the earliest path unfixed, enabling produces inconsistent epochs across paths: the start offset reports the current epoch while later offsets report historical, worse than the original bug. A follow-up will promote to a regular property, add legacy_default to preserve existing-cluster behavior, and flip the default to true for new clusters.
When enable_listoffsets_historical_leader_epoch is on and the topic is not a read replica, a timequery on an empty partition now returns kafka::invalid_leader_epoch (Kafka spec compliant). Earliest and latest queries on empty partitions still return the partition's current leader epoch, matching Kafka. Introduces response_leader_epoch as the canonical decision point for historical-vs-current epoch on ListOffsets responses. Takes is_read_replica (plumbed from a per-topic get_topic_cfg lookup) and an optional historical_term (term of the matched record, or nullopt when no record was located). If the topic_cfg lookup is empty, every partition returns unknown_topic_or_partition rather than silently being treated as non-read-replica. Later commits route timequery-match through the same helper. Test parameterized on the property, with the legacy variant pinning the bug and the fixed variant asserting the sentinel.
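A minimal sketch of the decision this commit message describes, under stated assumptions: the commit only says the helper takes `is_read_replica` and an optional `historical_term`, so the other parameters, the plain-integer `leader_epoch` alias, and the `-1` sentinel value are assumptions made to keep the example self-contained. It is not the code in `list_offsets.cc`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-ins for kafka::leader_epoch and
// kafka::invalid_leader_epoch (assumed here to be -1).
using leader_epoch = std::int32_t;
inline constexpr leader_epoch invalid_leader_epoch = -1;

// Single decision point for historical-vs-current epoch in a response.
inline leader_epoch response_leader_epoch(
  bool historical_epoch_enabled,
  bool is_read_replica,
  leader_epoch current_leader_epoch,
  std::optional<leader_epoch> historical_term) {
    // Gate off, or read-replica topic: keep the legacy behavior.
    if (!historical_epoch_enabled || is_read_replica) {
        return current_leader_epoch;
    }
    // Gate on: report the matched record's term, or the sentinel when no
    // record was located (a timequery on an empty partition).
    return historical_term.value_or(invalid_leader_epoch);
}

inline void response_leader_epoch_examples() {
    // Timequery match with the gate on: the record's term wins.
    assert(response_leader_epoch(true, false, 7, 3) == 3);
    // Timequery on an empty partition with the gate on: sentinel.
    assert(
      response_leader_epoch(true, false, 7, std::nullopt)
      == invalid_leader_epoch);
    // Gate off or read replica: current epoch, as before.
    assert(response_leader_epoch(false, false, 7, 3) == 7);
    assert(response_leader_epoch(true, true, 7, 3) == 7);
}
```

In this sketch, earliest and latest queries on an empty partition are assumed to bypass the helper and keep returning the current epoch, as the commit message states.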
When enable_listoffsets_historical_leader_epoch is on and the topic is not a read replica, ListOffsets timequery match now returns the historical leader epoch from the matched batch (sourced from storage::timequery_result.term, already populated by batch_timequery and previously discarded by the handler). Routes through response_leader_epoch (introduced in the previous commit), passing res->term as the historical_term. Refactors _test_list_offsets_epoch to accept per-path expectations (expect_incorrect_earliest and expect_incorrect_timequery) so the test can pin earliest as still-buggy while asserting the corrected timequery behavior under the partial fix.
Marks where the follow-up should route earliest through response_leader_epoch once partition_proxy exposes a term-for-offset lookup. No behavior change.
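As a rough illustration of what a term-for-offset lookup could look like, the sketch below keeps a sorted map from the first offset of each term to that term and answers queries with a binary search. This is purely hypothetical: the map, the `term_for_offset` name, and the types are assumptions for illustration, and the PR does not specify how `partition_proxy` will expose the lookup.

```cpp
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>
#include <optional>

using offset_t = std::int64_t;
using term_t = std::int64_t;

// Key: first offset written in a term. Value: that term.
// Returns the term covering `offset`, or nullopt when the offset precedes
// every known term start (for example after prefix truncation).
inline std::optional<term_t> term_for_offset(
  const std::map<offset_t, term_t>& term_starts, offset_t offset) {
    // First entry whose start offset is strictly greater than `offset`.
    auto it = term_starts.upper_bound(offset);
    if (it == term_starts.begin()) {
        return std::nullopt;
    }
    return std::prev(it)->second;
}

inline void term_for_offset_examples() {
    const std::map<offset_t, term_t> starts{{0, 1}, {40, 2}, {100, 3}};
    assert(term_for_offset(starts, 0) == 1);
    assert(term_for_offset(starts, 39) == 1);
    assert(term_for_offset(starts, 40) == 2);
    assert(term_for_offset(starts, 150) == 3);
    assert(!term_for_offset({}, 5).has_value());
}
```

A lookup of this shape stays a metadata operation, which speaks to the review concern about reading a batch per partition on topics with thousands of partitions; whether such an index is available or cheap to maintain in the real storage layer is left open by the PR.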
Force-pushed from aa789f5 to 2dda91d.
Force push to address PR review comments.
CORE-12505: Redpanda's ListOffsets API returns the current leader epoch instead of the historical (record-time) one on three response paths (earliest, timequery-match, empty-partition timequery). This PR introduces a gating property and fixes two of the three on non-read-replica topics; the earliest path ships in a follow-up.
Mechanism. Adds `enable_listoffsets_historical_leader_epoch` as a `development_feature_property<bool>`, defaulting to `false`. When on, ListOffsets returns the historical leader epoch instead of the current one, matching Kafka (required for KIP-320 truncation detection on consumers).
Gated as a `development_feature_property` because the partial fix is internally inconsistent: the start offset reports the current epoch while later offsets report historical, worse than the original always-current bug.
A follow-up will promote this to a regular property, add `legacy_default<bool>` to preserve existing-cluster behavior on upgrade, and flip the default to `true` so new clusters get the fix automatically.
Delegated (earliest path). Requires a new `partition_proxy` method to surface the segment-level term for a given offset. Left for a follow-up PR; a TODO marker in `list_offsets.cc` points at `response_leader_epoch` (introduced here) as the integration target.
Backports Required
Release Notes