From e46818c2f99ba8884ded7c35d6a50e5bc5c89ea3 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Tue, 28 Apr 2026 21:33:43 +0000 Subject: [PATCH 1/4] config: add enable_listoffsets_historical_leader_epoch dev property Foundation for CORE-12505 ListOffsets leader epoch fix. When on, ListOffsets returns the historical (record-time) leader epoch on non-read-replica topics, matching Kafka. Gated as a development_feature_property because this PR only fixes two of the three response paths (timequery-match and empty-partition; earliest ships in a follow-up). With the earliest path unfixed, enabling produces inconsistent epochs across paths: the start offset reports the current epoch while later offsets report historical, worse than the original bug. A follow-up will promote to a regular property, add legacy_default to preserve existing-cluster behavior, and flip the default to true for new clusters. --- src/v/config/configuration.cc | 11 +++++++++++ src/v/config/configuration.h | 2 ++ 2 files changed, 13 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 7025246fda37b..934d9db3e4e2f 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -706,6 +706,17 @@ configuration::configuration() "if it means returning less bytes in the fetch than are available.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 5s) + , enable_listoffsets_historical_leader_epoch( + *this, + "enable_listoffsets_historical_leader_epoch", + "When enabled, the Kafka ListOffsets API returns the historical " + "(record-time) leader epoch instead of the current leader epoch. " + "Gated as a development feature: not all response paths are fixed " + "yet (CORE-12505), so enabling this property produces internally " + "inconsistent epoch values across paths and must not be enabled in " + "production.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + false) , fetch_read_strategy( *this, "fetch_read_strategy", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 424eaba82a623..2d4a7ebdeb003 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -199,6 +199,8 @@ struct configuration final : public config_store { property tx_timeout_delay_ms; property fetch_reads_debounce_timeout; property kafka_fetch_request_timeout_ms; + development_feature_property + enable_listoffsets_historical_leader_epoch; enum_property fetch_read_strategy; bounded_property fetch_max_read_concurrency; bounded_property fetch_pid_p_coeff; From 8f7b8e2b9118702d132667fe7a2b7df2b827792b Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 29 Apr 2026 01:36:30 +0000 Subject: [PATCH 2/4] kafka: fix empty-partition timequery leader_epoch in ListOffsets When enable_listoffsets_historical_leader_epoch is on and the topic is not a read replica, a timequery on an empty partition now returns kafka::invalid_leader_epoch (Kafka spec compliant). Earliest and latest queries on empty partitions still return the partition's current leader epoch, matching Kafka. Introduces response_leader_epoch as the canonical decision point for historical-vs-current epoch on ListOffsets responses. Takes is_read_replica (plumbed from a per-topic get_topic_cfg lookup) and an optional historical_term (term of the matched record, or nullopt when no record was located). If the topic_cfg lookup is empty, every partition returns unknown_topic_or_partition rather than silently being treated as non-read-replica. Later commits route timequery-match through the same helper. Test parameterized on the property, with the legacy variant pinning the bug and the fixed variant asserting the sentinel. --- src/v/kafka/server/handlers/list_offsets.cc | 52 ++++++++++++++++--- tests/rptest/tests/list_offsets_epoch_test.py | 15 +++++- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index 71ef5a92d9e4f..8a42b3a74f12f 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -12,6 +12,7 @@ #include "cluster/metadata_cache.h" #include "cluster/partition_manager.h" #include "cluster/shard_table.h" +#include "config/configuration.h" #include "container/chunked_vector.h" #include "kafka/data/partition_proxy.h" #include "kafka/protocol/errors.h" @@ -61,12 +62,38 @@ struct list_offsets_ctx { , unauthorized_topics(std::move(unauthorized_topics)) {} }; +/// Compute the leader_epoch to put on a ListOffsets response. +/// +/// CORE-12505: when `enable_listoffsets_historical_leader_epoch` is on +/// (and the topic is not a read replica), return the record's leader +/// epoch, to match Kafka and support KIP-320 truncation detection on +/// consumers. Otherwise, return the partition's current leader epoch. +/// +/// `historical_term` is the term of the matched record, or `nullopt` +/// when no record was matched (e.g., timequery on an empty partition), +/// in which case we return `kafka::invalid_leader_epoch`. +/// +/// Read replicas are excluded pending separate analysis. +static kafka::leader_epoch response_leader_epoch( + const partition_proxy& kafka_partition, + bool is_read_replica, + std::optional historical_term) { + const bool correct_epoch_enabled + = config::shard_local_cfg().enable_listoffsets_historical_leader_epoch(); + if (!correct_epoch_enabled || is_read_replica) { + return kafka_partition.leader_epoch(); + } + return historical_term ? kafka::leader_epoch_from_term(*historical_term) + : kafka::invalid_leader_epoch; +} + static ss::future list_offsets_partition( list_offsets_ctx& octx, model::timestamp timestamp, model::ktp ktp, model::isolation_level isolation_lvl, kafka::leader_epoch current_leader_epoch, + bool is_read_replica, cluster::partition_manager& mgr) { auto kafka_partition = make_partition_proxy(ktp, mgr); if (!kafka_partition) { @@ -138,7 +165,8 @@ static ss::future list_offsets_partition( ktp.get_partition(), model::timestamp(-1), model::offset(-1), - kafka_partition->leader_epoch()); + response_leader_epoch( + *kafka_partition, is_read_replica, std::nullopt)); } auto res_fut = co_await ss::coroutine::as_future(kafka_partition->timequery( @@ -170,7 +198,8 @@ static ss::future list_offsets_partition( list_offsets_ctx& octx, model::timestamp timestamp, list_offset_topic& topic, - list_offset_partition& part) { + list_offset_partition& part, + bool is_read_replica) { model::ktp ktp(topic.name, part.partition_index); auto shard = octx.rctx.shards().shard_for(ktp); @@ -188,14 +217,15 @@ static ss::future list_offsets_partition( ntp = std::move(ktp), isolation_lvl = model::isolation_level( octx.request.data.isolation_level), - current_leader_epoch = part.current_leader_epoch]( - cluster::partition_manager& mgr) mutable { + current_leader_epoch = part.current_leader_epoch, + is_read_replica](cluster::partition_manager& mgr) mutable { return list_offsets_partition( octx, timestamp, std::move(ntp), isolation_lvl, current_leader_epoch, + is_read_replica, mgr); }); } @@ -209,6 +239,9 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { = octx.rctx.metadata_cache().get_topic_disabled_set( model::topic_namespace_view{model::kafka_namespace, topic.name}); + const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg( + model::topic_namespace_view{model::kafka_namespace, topic.name}); + for (auto& part : topic.partitions) { if (octx.request.duplicate_tp(topic.name, part.partition_index)) { partitions.push_back( @@ -218,9 +251,11 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { continue; } - if (!octx.rctx.metadata_cache().contains( - model::topic_namespace_view(model::kafka_namespace, topic.name), - part.partition_index)) { + if ( + !octx.rctx.metadata_cache().contains( + model::topic_namespace_view(model::kafka_namespace, topic.name), + part.partition_index) + || !topic_cfg.has_value()) { partitions.push_back( ss::make_ready_future( list_offsets_response::make_partition( @@ -237,7 +272,8 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) { continue; } - auto pr = list_offsets_partition(octx, part.timestamp, topic, part); + auto pr = list_offsets_partition( + octx, part.timestamp, topic, part, topic_cfg->is_read_replica()); partitions.push_back(std::move(pr)); } diff --git a/tests/rptest/tests/list_offsets_epoch_test.py b/tests/rptest/tests/list_offsets_epoch_test.py index 89ab86f77373a..d54dc7608770d 100644 --- a/tests/rptest/tests/list_offsets_epoch_test.py +++ b/tests/rptest/tests/list_offsets_epoch_test.py @@ -32,6 +32,7 @@ from rptest.services.rpk_consumer import RpkConsumer from rptest.tests.redpanda_test import RedpandaTest from rptest.util import wait_until_result +from ducktape.mark import parametrize # --- ListOffsets v4 (API key 2) --- @@ -641,9 +642,19 @@ def test_list_offsets_epoch(self): self._test_list_offsets_epoch(self.redpanda, expect_incorrect_behavior=True) @cluster(num_nodes=3) - def test_list_offsets_epoch_empty_partition(self): + @parametrize(correct_epoch=False) + @parametrize(correct_epoch=True) + def test_list_offsets_epoch_empty_partition(self, correct_epoch): + if correct_epoch: + # enable_listoffsets_historical_leader_epoch is gated as a + # development feature; opt in to dev features before flipping it. + self.redpanda.enable_development_feature_support() + self.redpanda.set_cluster_config( + {"enable_listoffsets_historical_leader_epoch": True} + ) self._test_empty_partition_list_offsets_epoch( - self.redpanda, expect_incorrect_behavior=True + self.redpanda, + expect_incorrect_behavior=not correct_epoch, ) @cluster(num_nodes=4) From 05400a29d5819678e8d8504acc43ff155a2e9191 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 29 Apr 2026 02:05:52 +0000 Subject: [PATCH 3/4] kafka: fix timequery-match leader_epoch in ListOffsets When enable_listoffsets_historical_leader_epoch is on and the topic is not a read replica, ListOffsets timequery match now returns the historical leader epoch from the matched batch (sourced from storage::timequery_result.term, already populated by batch_timequery and previously discarded by the handler). Routes through response_leader_epoch (introduced in the previous commit), passing res->term as the historical_term. Refactors _test_list_offsets_epoch to accept per-path expectations (expect_incorrect_earliest and expect_incorrect_timequery) so the test can pin earliest as still-buggy while asserting the corrected timequery behavior under the partial fix. --- src/v/kafka/server/handlers/list_offsets.cc | 5 ++- tests/rptest/tests/list_offsets_epoch_test.py | 35 +++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index 8a42b3a74f12f..716ac9bdf0be8 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -189,7 +189,10 @@ static ss::future list_offsets_partition( auto res = res_fut.get(); if (res) { co_return list_offsets_response::make_partition( - id, res->time, res->offset, kafka_partition->leader_epoch()); + id, + res->time, + res->offset, + response_leader_epoch(*kafka_partition, is_read_replica, res->term)); } co_return list_offsets_response::make_partition(id, error_code::none); } diff --git a/tests/rptest/tests/list_offsets_epoch_test.py b/tests/rptest/tests/list_offsets_epoch_test.py index d54dc7608770d..0de0c85dc12e7 100644 --- a/tests/rptest/tests/list_offsets_epoch_test.py +++ b/tests/rptest/tests/list_offsets_epoch_test.py @@ -372,7 +372,9 @@ def _get_committed(self, cluster, group, topic, partition): return (-1, -1) - def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): + def _test_list_offsets_epoch( + self, cluster, expect_incorrect_earliest, expect_incorrect_timequery + ): """Verify ListOffsets returns the correct leader epoch for each timestamp query type. @@ -380,6 +382,10 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): times. The earliest and timequery paths should return the initial epoch (the record epoch), while the latest path should return the current leader epoch (correct per Kafka). + + Earliest and timequery are gated by separate flags because + Redpanda's partial fix (CORE-12505) addresses timequery before + earliest — the earliest-path lookup ships in a follow-up. """ initial_epoch, current_epoch = self._setup_topic_with_epoch_gap(cluster) @@ -388,7 +394,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): self.logger.info( f"Earliest: offset={offset}, epoch={epoch}, current_epoch={current_epoch}" ) - if expect_incorrect_behavior: + if expect_incorrect_earliest: assert epoch == current_epoch, ( f"Bug expected: earliest epoch should be current " f"({current_epoch}), got {epoch}" @@ -416,7 +422,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior): self.logger.info( f"Timequery: offset={offset}, epoch={epoch}, current_epoch={current_epoch}" ) - if expect_incorrect_behavior: + if expect_incorrect_timequery: assert epoch == current_epoch, ( f"Bug expected: timequery epoch should be current " f"({current_epoch}), got {epoch}" @@ -638,8 +644,21 @@ def _apply_throwaway_hack(self, real_group, throwaway_group, topic, partition): ) @cluster(num_nodes=3) - def test_list_offsets_epoch(self): - self._test_list_offsets_epoch(self.redpanda, expect_incorrect_behavior=True) + @parametrize(correct_epoch=False) + @parametrize(correct_epoch=True) + def test_list_offsets_epoch(self, correct_epoch): + if correct_epoch: + # enable_listoffsets_historical_leader_epoch is gated as a + # development feature; opt in to dev features before flipping it. + self.redpanda.enable_development_feature_support() + self.redpanda.set_cluster_config( + {"enable_listoffsets_historical_leader_epoch": True} + ) + self._test_list_offsets_epoch( + self.redpanda, + expect_incorrect_earliest=True, # earliest still buggy until follow-up PR + expect_incorrect_timequery=not correct_epoch, + ) @cluster(num_nodes=3) @parametrize(correct_epoch=False) @@ -871,7 +890,11 @@ def _restart_leader(self, cluster, topic, partition): @ducktape_cluster(num_nodes=4) def test_list_offsets_epoch(self): # Kafka defines the correct behavior we compare against. - self._test_list_offsets_epoch(self.kafka, expect_incorrect_behavior=False) + self._test_list_offsets_epoch( + self.kafka, + expect_incorrect_earliest=False, + expect_incorrect_timequery=False, + ) @ducktape_cluster(num_nodes=4) def test_list_offsets_epoch_empty_partition(self): From 2dda91defcef492dbbffd681047886d8a9991430 Mon Sep 17 00:00:00 2001 From: Andrew Nguyen Date: Wed, 29 Apr 2026 02:09:19 +0000 Subject: [PATCH 4/4] kafka: add TODO marker for ListOffsets earliest-path fix handoff Marks where the follow-up should route earliest through response_leader_epoch once partition_proxy exposes a term-for-offset lookup. No behavior change. --- src/v/kafka/server/handlers/list_offsets.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc index 716ac9bdf0be8..7a593c75e07f0 100644 --- a/src/v/kafka/server/handlers/list_offsets.cc +++ b/src/v/kafka/server/handlers/list_offsets.cc @@ -143,6 +143,11 @@ static ss::future list_offsets_partition( ktp.get_partition(), maybe_start_ofs.error()); } + // TODO(CORE-12505 follow-up): route through response_leader_epoch + // once partition_proxy exposes a term-for-offset lookup. When + // the property is true and the topic is not a read replica, + // this should return the historical term for + // maybe_start_ofs.value() instead of the current leader term. co_return list_offsets_response::make_partition( ktp.get_partition(), model::timestamp(-1),