Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,17 @@ configuration::configuration()
"if it means returning less bytes in the fetch than are available.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
5s)
, enable_listoffsets_historical_leader_epoch(
*this,
"enable_listoffsets_historical_leader_epoch",
"When enabled, the Kafka ListOffsets API returns the historical "
"(record-time) leader epoch instead of the current leader epoch. "
"Gated as a development feature: not all response paths are fixed "
"yet (CORE-12505), so enabling this property produces internally "
"inconsistent epoch values across paths and must not be enabled in "
"production.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
false)
, fetch_read_strategy(
*this,
"fetch_read_strategy",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> tx_timeout_delay_ms;
property<std::chrono::milliseconds> fetch_reads_debounce_timeout;
property<std::chrono::milliseconds> kafka_fetch_request_timeout_ms;
development_feature_property<bool>
enable_listoffsets_historical_leader_epoch;
enum_property<model::fetch_read_strategy> fetch_read_strategy;
bounded_property<size_t> fetch_max_read_concurrency;
bounded_property<double, numeric_bounds> fetch_pid_p_coeff;
Expand Down
62 changes: 53 additions & 9 deletions src/v/kafka/server/handlers/list_offsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "cluster/metadata_cache.h"
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "config/configuration.h"
#include "container/chunked_vector.h"
#include "kafka/data/partition_proxy.h"
#include "kafka/protocol/errors.h"
Expand Down Expand Up @@ -61,12 +62,38 @@ struct list_offsets_ctx {
, unauthorized_topics(std::move(unauthorized_topics)) {}
};

/// Determine which leader_epoch to report on a ListOffsets response.
///
/// CORE-12505: with `enable_listoffsets_historical_leader_epoch` set
/// (and for topics that are not read replicas), report the leader
/// epoch of the matched record, matching Kafka semantics and enabling
/// KIP-320 truncation detection on consumers. In every other case,
/// report the partition's current leader epoch.
///
/// `historical_term` carries the term of the matched record; it is
/// `nullopt` when no record matched (e.g. a timequery on an empty
/// partition), in which case `kafka::invalid_leader_epoch` is
/// reported.
///
/// Read replicas are excluded pending separate analysis.
static kafka::leader_epoch response_leader_epoch(
  const partition_proxy& kafka_partition,
  bool is_read_replica,
  std::optional<model::term_id> historical_term) {
    // Guard clause: fall back to the current leader epoch whenever the
    // historical-epoch behavior is disabled or the topic is a read
    // replica.
    if (
      is_read_replica
      || !config::shard_local_cfg()
            .enable_listoffsets_historical_leader_epoch()) {
        return kafka_partition.leader_epoch();
    }
    if (!historical_term.has_value()) {
        return kafka::invalid_leader_epoch;
    }
    return kafka::leader_epoch_from_term(historical_term.value());
}

static ss::future<list_offset_partition_response> list_offsets_partition(
list_offsets_ctx& octx,
model::timestamp timestamp,
model::ktp ktp,
model::isolation_level isolation_lvl,
kafka::leader_epoch current_leader_epoch,
bool is_read_replica,
cluster::partition_manager& mgr) {
auto kafka_partition = make_partition_proxy(ktp, mgr);
if (!kafka_partition) {
Expand Down Expand Up @@ -116,6 +143,11 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
ktp.get_partition(), maybe_start_ofs.error());
}

// TODO(CORE-12505 follow-up): route through response_leader_epoch
// once partition_proxy exposes a term-for-offset lookup. When
// the property is true and the topic is not a read replica,
// this should return the historical term for
// maybe_start_ofs.value() instead of the current leader term.
co_return list_offsets_response::make_partition(
ktp.get_partition(),
model::timestamp(-1),
Expand All @@ -138,7 +170,8 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
ktp.get_partition(),
model::timestamp(-1),
model::offset(-1),
kafka_partition->leader_epoch());
response_leader_epoch(
*kafka_partition, is_read_replica, std::nullopt));
}

auto res_fut = co_await ss::coroutine::as_future(kafka_partition->timequery(
Expand All @@ -161,7 +194,10 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
auto res = res_fut.get();
if (res) {
co_return list_offsets_response::make_partition(
id, res->time, res->offset, kafka_partition->leader_epoch());
id,
res->time,
res->offset,
response_leader_epoch(*kafka_partition, is_read_replica, res->term));
}
co_return list_offsets_response::make_partition(id, error_code::none);
}
Expand All @@ -170,7 +206,8 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
list_offsets_ctx& octx,
model::timestamp timestamp,
list_offset_topic& topic,
list_offset_partition& part) {
list_offset_partition& part,
bool is_read_replica) {
model::ktp ktp(topic.name, part.partition_index);

auto shard = octx.rctx.shards().shard_for(ktp);
Expand All @@ -188,14 +225,15 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
ntp = std::move(ktp),
isolation_lvl = model::isolation_level(
octx.request.data.isolation_level),
current_leader_epoch = part.current_leader_epoch](
cluster::partition_manager& mgr) mutable {
current_leader_epoch = part.current_leader_epoch,
is_read_replica](cluster::partition_manager& mgr) mutable {
return list_offsets_partition(
octx,
timestamp,
std::move(ntp),
isolation_lvl,
current_leader_epoch,
is_read_replica,
mgr);
});
}
Expand All @@ -209,6 +247,9 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) {
= octx.rctx.metadata_cache().get_topic_disabled_set(
model::topic_namespace_view{model::kafka_namespace, topic.name});

const auto topic_cfg = octx.rctx.metadata_cache().get_topic_cfg(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should error out if topic_cfg is missing. Suggest making is_rr an optional and using it in the per-partition conditional below

if (!octx.rctx.metadata_cache().contains(
              model::topic_namespace_view(model::kafka_namespace, topic.name),
              part.partition_index) || !is_rr.has_value()) {

Wdyt?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I added a check for topic_cfg in the existing per-partition not-found branch. After that point, we can use is_read_replica as a plain bool.

model::topic_namespace_view{model::kafka_namespace, topic.name});

for (auto& part : topic.partitions) {
if (octx.request.duplicate_tp(topic.name, part.partition_index)) {
partitions.push_back(
Expand All @@ -218,9 +259,11 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) {
continue;
}

if (!octx.rctx.metadata_cache().contains(
model::topic_namespace_view(model::kafka_namespace, topic.name),
part.partition_index)) {
if (
!octx.rctx.metadata_cache().contains(
model::topic_namespace_view(model::kafka_namespace, topic.name),
part.partition_index)
|| !topic_cfg.has_value()) {
partitions.push_back(
ss::make_ready_future<list_offset_partition_response>(
list_offsets_response::make_partition(
Expand All @@ -237,7 +280,8 @@ list_offsets_topic(list_offsets_ctx& octx, list_offset_topic& topic) {
continue;
}

auto pr = list_offsets_partition(octx, part.timestamp, topic, part);
auto pr = list_offsets_partition(
octx, part.timestamp, topic, part, topic_cfg->is_read_replica());
partitions.push_back(std::move(pr));
}

Expand Down
50 changes: 42 additions & 8 deletions tests/rptest/tests/list_offsets_epoch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from rptest.services.rpk_consumer import RpkConsumer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until_result
from ducktape.mark import parametrize


# --- ListOffsets v4 (API key 2) ---
Expand Down Expand Up @@ -371,14 +372,20 @@ def _get_committed(self, cluster, group, topic, partition):

return (-1, -1)

def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior):
def _test_list_offsets_epoch(
self, cluster, expect_incorrect_earliest, expect_incorrect_timequery
):
"""Verify ListOffsets returns the correct leader epoch for each
timestamp query type.

All records are produced before leadership is transferred 3
times. The earliest and timequery paths should return the
initial epoch (the record epoch), while the latest path should
return the current leader epoch (correct per Kafka).

Earliest and timequery are gated by separate flags because
Redpanda's partial fix (CORE-12505) addresses timequery before
earliest — the earliest-path lookup ships in a follow-up.
"""
initial_epoch, current_epoch = self._setup_topic_with_epoch_gap(cluster)

Expand All @@ -387,7 +394,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior):
self.logger.info(
f"Earliest: offset={offset}, epoch={epoch}, current_epoch={current_epoch}"
)
if expect_incorrect_behavior:
if expect_incorrect_earliest:
assert epoch == current_epoch, (
f"Bug expected: earliest epoch should be current "
f"({current_epoch}), got {epoch}"
Expand Down Expand Up @@ -415,7 +422,7 @@ def _test_list_offsets_epoch(self, cluster, expect_incorrect_behavior):
self.logger.info(
f"Timequery: offset={offset}, epoch={epoch}, current_epoch={current_epoch}"
)
if expect_incorrect_behavior:
if expect_incorrect_timequery:
assert epoch == current_epoch, (
f"Bug expected: timequery epoch should be current "
f"({current_epoch}), got {epoch}"
Expand Down Expand Up @@ -637,13 +644,36 @@ def _apply_throwaway_hack(self, real_group, throwaway_group, topic, partition):
)

@cluster(num_nodes=3)
def test_list_offsets_epoch(self):
self._test_list_offsets_epoch(self.redpanda, expect_incorrect_behavior=True)
@parametrize(correct_epoch=False)
@parametrize(correct_epoch=True)
def test_list_offsets_epoch(self, correct_epoch):
if correct_epoch:
# enable_listoffsets_historical_leader_epoch is gated as a
# development feature; opt in to dev features before flipping it.
self.redpanda.enable_development_feature_support()
self.redpanda.set_cluster_config(
{"enable_listoffsets_historical_leader_epoch": True}
)
self._test_list_offsets_epoch(
self.redpanda,
expect_incorrect_earliest=True, # earliest still buggy until follow-up PR
expect_incorrect_timequery=not correct_epoch,
)

@cluster(num_nodes=3)
def test_list_offsets_epoch_empty_partition(self):
@parametrize(correct_epoch=False)
@parametrize(correct_epoch=True)
def test_list_offsets_epoch_empty_partition(self, correct_epoch):
if correct_epoch:
# enable_listoffsets_historical_leader_epoch is gated as a
# development feature; opt in to dev features before flipping it.
self.redpanda.enable_development_feature_support()
self.redpanda.set_cluster_config(
{"enable_listoffsets_historical_leader_epoch": True}
)
self._test_empty_partition_list_offsets_epoch(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing coverage for earliest/latest queries with empty partition? These return leader_epoch()

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, those are tested in _test_empty_partition_list_offsets_epoch (which the parameterized test calls into): earliest and latest assert they return the current epoch on empty partitions.

self.redpanda, expect_incorrect_behavior=True
self.redpanda,
expect_incorrect_behavior=not correct_epoch,
)

@cluster(num_nodes=4)
Expand Down Expand Up @@ -860,7 +890,11 @@ def _restart_leader(self, cluster, topic, partition):
@ducktape_cluster(num_nodes=4)
def test_list_offsets_epoch(self):
# Kafka defines the correct behavior we compare against.
self._test_list_offsets_epoch(self.kafka, expect_incorrect_behavior=False)
self._test_list_offsets_epoch(
self.kafka,
expect_incorrect_earliest=False,
expect_incorrect_timequery=False,
)

@ducktape_cluster(num_nodes=4)
def test_list_offsets_epoch_empty_partition(self):
Expand Down
Loading