Skip to content

Commit 43998a9

Browse files
committed
k/d/rpc/client: Introduce get_single_partition_offsets
Reliable version that tries to mitigate not_leader errors Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent cad3b64 commit 43998a9

3 files changed

Lines changed: 117 additions & 0 deletions

File tree

src/v/kafka/data/rpc/client.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,64 @@ client::get_remote_partition_offsets(
524524
co_return ret_t(std::move(result.value().partition_offsets));
525525
}
526526

527+
ss::future<result<partition_offsets, cluster::errc>>
528+
client::get_single_partition_offsets(model::topic_partition tp) {
529+
co_return co_await retry_with_leader_mitigation(
530+
[this, tp]() { return do_get_single_partition_offsets_once(tp); });
531+
}
532+
533+
ss::future<result<partition_offsets, cluster::errc>>
534+
client::do_get_single_partition_offsets_once(model::topic_partition tp) {
535+
using ret_t = result<partition_offsets, cluster::errc>;
536+
model::topic_namespace_view tp_ns(model::kafka_namespace, tp.topic);
537+
538+
auto topic_cfg = _metadata_cache->find_topic_cfg(tp_ns);
539+
if (!topic_cfg) {
540+
co_return ret_t(cluster::errc::topic_not_exists);
541+
}
542+
543+
auto leader = _leaders->get_leader_node(tp_ns, tp.partition);
544+
if (!leader) {
545+
co_return ret_t(cluster::errc::not_leader);
546+
}
547+
548+
// Build the single-partition request in the same wire format the
549+
// existing get_offsets RPC expects.
550+
chunked_vector<topic_partitions> topics;
551+
topic_partitions tps;
552+
tps.topic = tp.topic;
553+
tps.partitions.push_back(tp.partition);
554+
topics.push_back(std::move(tps));
555+
556+
partition_offsets_map offsets_map;
557+
if (*leader == _self) {
558+
offsets_map = co_await _local_service->local().get_offsets(
559+
std::move(topics));
560+
} else {
561+
auto remote_result = co_await get_remote_partition_offsets(
562+
*leader, std::move(topics));
563+
if (remote_result.has_error()) {
564+
co_return ret_t(remote_result.error());
565+
}
566+
offsets_map = std::move(remote_result.value());
567+
}
568+
569+
// Extract the single result.
570+
auto topic_it = offsets_map.find(tp.topic);
571+
if (topic_it == offsets_map.end()) {
572+
co_return ret_t(cluster::errc::not_leader);
573+
}
574+
auto part_it = topic_it->second.find(tp.partition);
575+
if (part_it == topic_it->second.end()) {
576+
co_return ret_t(cluster::errc::not_leader);
577+
}
578+
auto& por = part_it->second;
579+
if (por.err != cluster::errc::success) {
580+
co_return ret_t(por.err);
581+
}
582+
co_return ret_t(por.offsets);
583+
}
584+
527585
ss::future<result<consume_reply, cluster::errc>> client::consume(
528586
model::topic_partition tp,
529587
kafka::offset start_offset,

src/v/kafka/data/rpc/client.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ class client {
9494
ss::future<result<partition_offsets_map, cluster::errc>>
9595
get_partition_offsets(chunked_vector<topic_partitions>);
9696

97+
/// Get offsets for a single partition. Retries with leadership
98+
/// mitigation — suitable for latency-sensitive callers like schema
99+
/// registry that always target a single known partition.
100+
ss::future<result<partition_offsets, cluster::errc>>
101+
get_single_partition_offsets(model::topic_partition);
102+
97103
ss::future<result<consume_reply, cluster::errc>> consume(
98104
model::topic_partition,
99105
kafka::offset start_offset,
@@ -108,6 +114,9 @@ class client {
108114
ss::future<produce_reply>
109115
do_remote_produce(model::node_id, produce_request);
110116

117+
ss::future<result<partition_offsets, cluster::errc>>
118+
do_get_single_partition_offsets_once(model::topic_partition);
119+
111120
ss::future<result<partition_offsets_map, cluster::errc>>
112121
get_remote_partition_offsets(
113122
model::node_id, chunked_vector<topic_partitions> topics);

src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,14 @@ class KafkaDataRpcTest : public ::testing::TestWithParam<test_parameters> {
174174
.offsets.high_watermark;
175175
}
176176

177+
result<partition_offsets, cluster::errc>
178+
get_single_partition_offsets(model::topic_partition tp) {
179+
return _kd->client()
180+
.local()
181+
.get_single_partition_offsets(std::move(tp))
182+
.get();
183+
}
184+
177185
result<consume_reply, cluster::errc> consume(
178186
model::topic_partition tp,
179187
kafka::offset start_offset,
@@ -405,6 +413,48 @@ TEST_P(KafkaDataRpcTest, ProduceWithLeaderMitigationRetries) {
405413
ASSERT_TRUE(r.last_offset.has_value());
406414
EXPECT_EQ(get_hwm(ntp), kafka::offset(model::next_offset(*r.last_offset)));
407415
}
416+
417+
TEST_P(KafkaDataRpcTest, ClientCanGetSinglePartitionOffsets) {
418+
auto ntp = make_ntp("single_offsets");
419+
create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic));
420+
421+
auto res = get_single_partition_offsets(ntp.tp);
422+
ASSERT_TRUE(res.has_value());
423+
// No records produced — HWM is 0 (next of -1), LSO is -1
424+
EXPECT_EQ(res.value().high_watermark, kafka::offset(0));
425+
EXPECT_EQ(res.value().last_stable_offset, kafka::offset(-1));
426+
427+
// After producing a record, offsets should advance.
428+
auto batch = model::test::make_random_batch({.count = 1, .records = 1});
429+
auto pr = produce_with_leader_mitigation(ntp, std::move(batch));
430+
ASSERT_EQ(pr.ec, cluster::errc::success);
431+
432+
auto res2 = get_single_partition_offsets(ntp.tp);
433+
ASSERT_TRUE(res2.has_value());
434+
EXPECT_EQ(
435+
res2.value().high_watermark,
436+
kafka::offset(model::next_offset(*pr.last_offset)));
437+
}
438+
439+
TEST_P(KafkaDataRpcTest, GetSinglePartitionOffsetsReturnsTopicNotExists) {
440+
auto ntp = make_ntp("does-not-exist");
441+
auto res = get_single_partition_offsets(ntp.tp);
442+
ASSERT_TRUE(res.has_error());
443+
EXPECT_EQ(res.error(), cluster::errc::topic_not_exists);
444+
}
445+
446+
TEST_P(KafkaDataRpcTest, GetSinglePartitionOffsetsRetries) {
447+
auto ntp = make_ntp("retry_offsets");
448+
create_topic(model::topic_namespace(ntp.ns, ntp.tp.topic));
449+
450+
// Inject timeouts on the first 2 attempts; the basic retry policy
451+
// should back off and succeed on the 3rd attempt.
452+
set_errors_to_inject(2);
453+
auto res = get_single_partition_offsets(ntp.tp);
454+
ASSERT_TRUE(res.has_value());
455+
EXPECT_EQ(res.value().high_watermark, kafka::offset(0));
456+
EXPECT_EQ(res.value().last_stable_offset, kafka::offset(-1));
457+
}
408458
INSTANTIATE_TEST_SUITE_P(
409459
WorksLocallyAndRemotely,
410460
KafkaDataRpcTest,

0 commit comments

Comments
 (0)