@@ -583,43 +583,56 @@ client::do_get_single_partition_offsets_once(model::topic_partition tp) {
583583}
584584
585585ss::future<result<consume_reply, cluster::errc>> client::consume (
586+ model::topic_partition tp,
587+ kafka::offset start_offset,
588+ kafka::offset max_offset,
589+ size_t min_bytes,
590+ size_t max_bytes,
591+ model::timeout_clock::duration timeout) {
592+ co_return co_await retry_with_leader_mitigation (
593+ [this , tp, start_offset, max_offset, min_bytes, max_bytes, timeout]() {
594+ return do_consume_once (
595+ tp, start_offset, max_offset, min_bytes, max_bytes, timeout);
596+ });
597+ }
598+
599+ ss::future<result<consume_reply, cluster::errc>> client::do_consume_once (
586600 model::topic_partition tp,
587601 kafka::offset start_offset,
588602 kafka::offset max_offset,
589603 size_t min_bytes,
590604 size_t max_bytes,
591605 model::timeout_clock::duration timeout) {
592606 using ret_t = result<consume_reply, cluster::errc>;
607+ model::topic_namespace_view tp_ns (model::kafka_namespace, tp.topic );
593608
594- // Check if topic exists first
595- auto topic_cfg = _metadata_cache->find_topic_cfg (
596- model::topic_namespace_view (model::kafka_namespace, tp.topic ));
609+ auto topic_cfg = _metadata_cache->find_topic_cfg (tp_ns);
597610 if (!topic_cfg) {
598611 consume_reply reply;
599612 reply.tp = tp;
600613 reply.err = cluster::errc::topic_not_exists;
601614 co_return ret_t (std::move (reply));
602615 }
603616
604- // Find the leader for this partition
605- auto ktp = model::ktp (tp.topic , tp.partition );
606- auto leader = _leaders->get_leader_node (
607- model::topic_namespace_view (model::kafka_namespace, tp.topic ),
608- tp.partition );
609-
617+ auto leader = _leaders->get_leader_node (tp_ns, tp.partition );
610618 if (!leader) {
611- consume_reply reply;
612- reply.tp = tp;
613- reply.err = cluster::errc::not_leader;
614- co_return ret_t (std::move (reply));
619+ co_return ret_t (cluster::errc::not_leader);
615620 }
616621
617622 consume_request req (
618623 tp, start_offset, max_offset, min_bytes, max_bytes, timeout);
619624
625+ auto is_retriable = [](cluster::errc ec) -> bool {
626+ return ec == cluster::errc::not_leader || ec == cluster::errc::timeout
627+ || ec == cluster::errc::partition_operation_failed;
628+ };
629+
620630 // If leader is local, call local service
621631 if (*leader == _self) {
622632 auto reply = co_await _local_service->local ().consume (std::move (req));
633+ if (is_retriable (reply.err )) {
634+ co_return ret_t (reply.err );
635+ }
623636 co_return ret_t (std::move (reply));
624637 }
625638
@@ -643,12 +656,13 @@ ss::future<result<consume_reply, cluster::errc>> client::consume(
643656 });
644657
645658 if (result.has_error ()) {
646- consume_reply reply;
647- reply.tp = tp;
648- reply.err = map_errc (result.assume_error ());
649- co_return ret_t (std::move (reply));
659+ co_return ret_t (map_errc (result.assume_error ()));
650660 }
651661
652- co_return ret_t (std::move (result.value ()));
662+ auto reply = std::move (result).value ();
663+ if (is_retriable (reply.err )) {
664+ co_return ret_t (reply.err );
665+ }
666+ co_return ret_t (std::move (reply));
653667}
654668} // namespace kafka::data::rpc
0 commit comments