kafka: foundation for ListOffsets leader epoch fix (CORE-12505) #30347

Open
nguyen-andrew wants to merge 4 commits into redpanda-data:dev from nguyen-andrew:listoffsets-fix-foundation

Member

@nguyen-andrew nguyen-andrew commented Apr 30, 2026

CORE-12505: Redpanda's ListOffsets API returns the current leader epoch instead of the historical (record-time) one on three response paths (earliest, timequery-match, empty-partition timequery). This PR introduces a gating property and fixes two of the three on non-read-replica topics; the earliest path ships in a follow-up.

Mechanism. Adds enable_listoffsets_historical_leader_epoch as a development_feature_property<bool>, defaulting to false. When on, ListOffsets returns the historical leader epoch instead of the current one, matching Kafka (required for KIP-320 truncation detection on consumers).

Gated as a development_feature_property because the partial fix is internally inconsistent: the start offset reports the current epoch while later offsets report historical epochs, which is worse than the original always-current bug.

A follow-up will promote this to a regular property, add legacy_default<bool> to preserve existing-cluster behavior on upgrade, and flip the default to true so new clusters get the fix automatically.

Delegated (earliest path). Requires a new partition_proxy method to surface the segment-level term for a given offset. Left for a follow-up PR; a TODO marker in list_offsets.cc points at response_leader_epoch (introduced here) as the integration target.
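
One way to picture the term-for-offset lookup that the earliest path needs is a search over per-segment metadata. The sketch below is a hypothetical Python model, not Redpanda code: `Segment`, `term_for_offset`, and the sample segment layout are all assumptions made for illustration, and the real method on partition_proxy is left to the follow-up.

```python
from bisect import bisect_right
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Segment:
    """Hypothetical segment metadata: first offset and the term it was written in."""
    base_offset: int
    term: int


def term_for_offset(segments: list[Segment], offset: int) -> Optional[int]:
    """Return the term of the segment containing `offset`, or None if the
    offset precedes the first segment (for example, it was truncated away).

    Assumes `segments` is sorted by base_offset. No upper bound is checked,
    so offsets past the log end resolve to the last segment's term.
    """
    bases = [s.base_offset for s in segments]
    idx = bisect_right(bases, offset) - 1
    return segments[idx].term if idx >= 0 else None


# Illustrative layout: three segments written in terms 1, 1 and 3.
log = [Segment(0, 1), Segment(100, 1), Segment(250, 3)]
```

In this model the lookup touches only segment metadata, which is the property that would keep an earliest query cheap; whether the real storage layer can offer the same is for the follow-up to establish.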

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@nguyen-andrew nguyen-andrew requested a review from a team as a code owner April 30, 2026 01:02
Copilot AI review requested due to automatic review settings April 30, 2026 01:02
@nguyen-andrew nguyen-andrew self-assigned this Apr 30, 2026
Contributor

Copilot AI left a comment

Pull request overview

Establishes an opt-in, hidden cluster property that gates a fix for CORE-12505 (ListOffsets leader epoch) and wires the corrected leader-epoch behavior into two response paths (timequery-match and empty-partition) while leaving the earliest path for follow-up.

Changes:

  • Introduces enable_listoffsets_correct_leader_epoch as a hidden-when-default cluster property with a legacy default anchor for safe future default flips.
  • Updates the ListOffsets handler to return historical leader epoch for timequery-match and invalid_leader_epoch for empty-partition when gated on and topic is not a read replica.
  • Adds/updates ducktape tests to validate epoch behavior across paths and parameterize expectations based on the new gate; extends rpk test helpers (group_seek_to, RpkConsumer clean shutdown).

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| tools/type-checking/type-check-strictness.json | Adds the new test file to pyright “standard” strictness tracking. |
| tests/rptest/tests/list_offsets_epoch_test.py | New end-to-end and protocol-level tests for ListOffsets leader-epoch behavior (Kafka vs Redpanda), including gated expectations. |
| tests/rptest/services/rpk_consumer.py | Adds clean_shutdown option to allow SIGTERM-based shutdown for commit/LeaveGroup flushing. |
| tests/rptest/clients/rpk.py | Extends group_seek_to to optionally scope to topics and permit new topics. |
| src/v/kafka/server/handlers/list_offsets.cc | Gates and returns corrected leader epochs for timequery-match and empty-partition paths; threads read-replica check into per-partition handling. |
| src/v/config/property.h | Adds a hidden_when_default_property constructor overload supporting legacy_default. |
| src/v/config/configuration.h | Declares the new cluster property. |
| src/v/config/configuration.cc | Defines the new cluster property and its legacy default anchor. |

Collaborator

vbotbuildovich commented Apr 30, 2026

CI test results

test results on build#83861

| test_status | test_class | test_method | test_arguments | test_kind | job_url | passed | reason | test_history |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| FLAKY(PASS) | DebugBundleTest | test_delete_cancelled_job | null | integration | https://buildkite.com/redpanda/redpanda/builds/83861#019ddc04-60c1-4275-b6aa-9fe055284e9d | 10/11 | Test PASSES after retries. No significant increase in flaky rate (baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DebugBundleTest&test_method=test_delete_cancelled_job |

test results on build#84195

| test_status | test_class | test_method | test_arguments | test_kind | job_url | passed | reason | test_history |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| FLAKY(PASS) | InternalTopicProtectionLargeClusterTest | test_consumer_offset_topic | null | integration | https://buildkite.com/redpanda/redpanda/builds/84195#019e04fd-1171-4ac6-b0bd-12a9118f3802 | 10/11 | Test PASSES after retries. No significant increase in flaky rate (baseline=0.0047, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=InternalTopicProtectionLargeClusterTest&test_method=test_consumer_offset_topic |

@nguyen-andrew nguyen-andrew force-pushed the listoffsets-fix-foundation branch from 11fe1a0 to aa789f5 on May 6, 2026 22:51
@nguyen-andrew
Member Author

Force push to rebase on dev (collapse merged stacked PR).

= octx.rctx.metadata_cache().get_topic_disabled_set(
model::topic_namespace_view{model::kafka_namespace, topic.name});

const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg(
Contributor

We should error out if topic_cfg is missing. I suggest making is_rr an optional and using it in the per-partition conditional below:

if (!octx.rctx.metadata_cache().contains(
              model::topic_namespace_view(model::kafka_namespace, topic.name),
              part.partition_index) || !is_rr.has_value()) {

Wdyt?

Member Author

Makes sense, I added a check for topic_cfg in the existing per-partition not-found branch. After that point, we can use is_read_replica as a plain bool.
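
The resolution described above can be modelled as a single not-found check per partition. This is a hedged Python sketch of the control flow only: `partition_error`, `is_read_replica`, their parameters, the dict-shaped topic config, and the string error name are stand-ins for illustration, not the handler's actual C++ signatures.

```python
from typing import Optional


def partition_error(
    topic_cfg: Optional[dict],
    cached_partitions: set[int],
    partition_index: int,
) -> Optional[str]:
    """Return an error name for the partition, or None if it can be served.

    A missing topic config is treated like a missing partition, so the
    caller never has to guess whether the topic is a read replica.
    """
    if topic_cfg is None or partition_index not in cached_partitions:
        return "unknown_topic_or_partition"
    return None


def is_read_replica(topic_cfg: dict) -> bool:
    # Only meant to be called after partition_error() returned None, so
    # topic_cfg is known to exist and the result can be a plain bool.
    return bool(topic_cfg.get("read_replica", False))
```

Folding the config check into the not-found branch keeps the optional from leaking into the rest of the per-partition code, which is the trade-off the reply settles on.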

Comment thread src/v/config/configuration.cc Outdated
"if it means returning less bytes in the fetch than are available.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
5s)
, enable_listoffsets_correct_leader_epoch(
Contributor

nit: the name is vague, and the first line of the description describes the property much better than the name does. Take that description and put it in the name: enable_listoffsets_historical_leader_epoch or enable_listoffsets_record_leader_epoch

Member Author

Renamed to enable_listoffsets_historical_leader_epoch.

model::ktp ktp,
model::isolation_level isolation_lvl,
kafka::leader_epoch current_leader_epoch,
bool is_rr,
Contributor

nit: bool is_read_replica - not too many characters compared to other parameters - easier to read for everyone - including agents

self.redpanda.set_cluster_config(
{"enable_listoffsets_correct_leader_epoch": True}
)
self._test_empty_partition_list_offsets_epoch(
Contributor

Missing coverage for earliest/latest queries with empty partition? These return leader_epoch()

Member Author

Yeah, those are tested in _test_empty_partition_list_offsets_epoch (which the parameterized test calls into): earliest and latest assert they return the current epoch on empty partitions.
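
The expectations discussed in this thread can be summarized as a small decision table. The sketch below is an illustrative Python model, not the body of _test_empty_partition_list_offsets_epoch: `expected_empty_partition_epoch`, the query labels, and the -1 sentinel value are assumptions, and the topic is assumed not to be a read replica.

```python
INVALID_LEADER_EPOCH = -1  # assumed numeric value of kafka::invalid_leader_epoch


def expected_empty_partition_epoch(
    query: str, current_epoch: int, gate_enabled: bool
) -> int:
    """Expected ListOffsets leader epoch for an empty, non-read-replica partition."""
    if query in ("earliest", "latest"):
        # Unchanged by the gate: both report the current leader epoch.
        return current_epoch
    if query == "timequery":
        # With the gate on there is no record to take an epoch from.
        return INVALID_LEADER_EPOCH if gate_enabled else current_epoch
    raise ValueError(f"unknown query type: {query}")
```

Only the timequery row changes with the property, which is why the parameterized test can share the earliest and latest assertions between both variants.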


const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg(
model::topic_namespace_view{model::kafka_namespace, topic.name});
const auto is_rr = topic_cfg.has_value() && topic_cfg->is_read_replica();
Contributor

ditto: we should error if we don't have topic cfg

model::timestamp(-1),
model::offset(-1),
kafka_partition->leader_epoch());
(correct_epoch_enabled && !is_rr) ? kafka::invalid_leader_epoch
Contributor

todo comment? consider extracting this logic (repeated a couple of times) to a function where you can document why we have this !is_rr condition. Extracting as a function will give you this one single canonical place which hopefully everyone in the future wondering about the behavior will reach. Sprinkling the check everywhere will make it harder to find the reasoning.

Member Author

Done, extracted to response_leader_epoch with the rationale documented in the doc comment.
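
As a rough picture of what such a canonical decision point looks like, here is a minimal Python model of the helper. It is a sketch under stated assumptions rather than the C++ in list_offsets.cc: the commit message only says the helper takes is_read_replica and an optional historical_term, so passing the gate and the current epoch as parameters, and the -1 sentinel value, are assumptions made here.

```python
from typing import Optional

INVALID_LEADER_EPOCH = -1  # assumed numeric value of kafka::invalid_leader_epoch


def response_leader_epoch(
    gate_enabled: bool,
    is_read_replica: bool,
    current_epoch: int,
    historical_term: Optional[int],
) -> int:
    """Pick the leader epoch to put in a ListOffsets response."""
    if not gate_enabled or is_read_replica:
        # Legacy behavior, also kept for read-replica topics.
        return current_epoch
    if historical_term is None:
        # No record was located (for example, a timequery on an empty partition).
        return INVALID_LEADER_EPOCH
    # Term of the matched record.
    return historical_term
```

Keeping the gate and the read-replica condition in one function means each response path only has to supply the term it found, if any.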

ktp.get_partition(), error_code::unknown_topic_or_partition);
}

const auto correct_epoch_enabled
Contributor

nit: const bool - same number of characters; so much better to read

ktp.get_partition(), maybe_start_ofs.error());
}

// TODO(CORE-12505 follow-up): wire the correct_epoch_enabled
Contributor

This will be ugly... Querying earliest is usually a metadata call, but now it will require either reading a batch of data OR updating all the prefix truncation paths and making them do the read, so that the start epoch (or start epoch override) is stored together with the "kafka offset override".

If we take the first route (easy) I'm worried about listing metadata for topics with 1000s of partitions.

Not for this PR but something to keep in mind.

Member Author

Good point, the easy route would scale poorly on high-partition topics.

Comment thread src/v/config/configuration.h Outdated
property<std::chrono::milliseconds> tx_timeout_delay_ms;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
property<std::chrono::milliseconds> kafka_fetch_request_timeout_ms;
hidden_when_default_property<bool> enable_listoffsets_correct_leader_epoch;
Contributor

Shall we mark this as a development property until we are happy with implementation? Right now it has weird behavior where it will return a higher leader_epoch for partition start offset than for later offsets.

Member Author

Done, switched to development_feature_property.
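
A small worked example makes the inconsistency raised in this thread concrete. The Python sketch below is illustrative only: the record terms, the current epoch, and the helper names are made up, and it models the partial fix as described in this PR (earliest still reports the current epoch, timequery matches report the matched record's term).

```python
# Hypothetical log: record_terms[i] is the term in which offset i was written.
record_terms = [1, 1, 2, 4]
current_epoch = 4


def earliest_epoch_partial_fix() -> int:
    # The earliest path is not fixed yet, so it still reports the current epoch.
    return current_epoch


def timequery_epoch_partial_fix(matched_offset: int) -> int:
    # The timequery-match path reports the term of the matched record.
    return record_terms[matched_offset]


start = (0, earliest_epoch_partial_fix())
later = (1, timequery_epoch_partial_fix(1))

# Epochs should never decrease as offsets increase; with the partial fix they do.
is_monotonic = start[1] <= later[1]
```

Here offset 0 is reported with epoch 4 while offset 1 is reported with epoch 1, so a client comparing the two responses sees the epoch go backwards.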

Contributor

@nvartolomei nvartolomei left a comment

Looks good.

> Empty partition: returns kafka::invalid_leader_epoch.

The second commit in the chain and the PR description describe the expected behavior incorrectly/incompletely. Empty partitions do return the current leader epoch for earliest/latest timequeries. That's what the test asserts too.

Please update the text to avoid confusion.

Foundation for CORE-12505 ListOffsets leader epoch fix. When on,
ListOffsets returns the historical (record-time) leader epoch on
non-read-replica topics, matching Kafka.

Gated as a development_feature_property because this PR only fixes
two of the three response paths (timequery-match and empty-partition;
earliest ships in a follow-up). With the earliest path unfixed,
enabling produces inconsistent epochs across paths: the start offset
reports the current epoch while later offsets report historical,
worse than the original bug.

A follow-up will promote to a regular property, add legacy_default
to preserve existing-cluster behavior, and flip the default to true
for new clusters.

When enable_listoffsets_historical_leader_epoch is on
and the topic is not a read replica, a timequery on an empty
partition now returns kafka::invalid_leader_epoch (Kafka spec
compliant). Earliest and latest queries on empty partitions still
return the partition's current leader epoch, matching Kafka.

Introduces response_leader_epoch as the canonical decision point
for historical-vs-current epoch on ListOffsets responses. Takes
is_read_replica (plumbed from a per-topic get_topic_cfg lookup) and
an optional historical_term (term of the matched record, or nullopt
when no record was located). If the topic_cfg lookup is empty,
every partition returns unknown_topic_or_partition rather than
silently being treated as non-read-replica. Later commits route
timequery-match through the same helper.

Test parameterized on the property, with the legacy variant pinning
the bug and the fixed variant asserting the sentinel.

When enable_listoffsets_historical_leader_epoch is on
and the topic is not a read replica, ListOffsets timequery match now
returns the historical leader epoch from the matched batch (sourced
from storage::timequery_result.term, already populated by
batch_timequery and previously discarded by the handler). Routes
through response_leader_epoch (introduced in the previous commit),
passing res->term as the historical_term.

Refactors _test_list_offsets_epoch to accept per-path expectations
(expect_incorrect_earliest and expect_incorrect_timequery) so the
test can pin earliest as still-buggy while asserting the corrected
timequery behavior under the partial fix.

Marks where the follow-up should route earliest through
response_leader_epoch once partition_proxy exposes a term-for-offset
lookup. No behavior change.

@nguyen-andrew nguyen-andrew force-pushed the listoffsets-fix-foundation branch from aa789f5 to 2dda91d on May 7, 2026 23:59
@nguyen-andrew
Member Author

Force push to address PR review comments.

@nguyen-andrew nguyen-andrew requested a review from nvartolomei May 8, 2026 00:56