@@ -933,45 +933,54 @@ reconciler<Clock>::commit_objects(
933933 // the metastore.
934934 const auto & corrected_next_offsets
935935 = add_objects_result.value ().corrected_next_offsets ;
936- std::optional<reconcile_error> error ;
936+ chunked_vector< const commit_info*> all_commits ;
937937 for (const auto & obj_meta : objects) {
938938 for (const auto & commit : obj_meta.commits ) {
939- auto tidp = commit.source ->topic_id_partition ();
940- kafka::offset lro = commit.metadata .last_offset ;
941- auto it = corrected_next_offsets.find (tidp);
942- if (it != corrected_next_offsets.end ()) {
943- _probe.increment_offset_corrections ();
944- // We want the previous offset, because that is what was last
945- // reconciled. During next reconciliation we should get the
946- // offset *after* the LRO to start reading from.
947- lro = kafka::prev_offset (it->second );
948- }
949- auto result = co_await commit.source ->set_last_reconciled_offset (
950- lro, _as);
951- if (result.has_value ()) {
952- vlog (
953- lg.debug ,
954- " successfully bumped LRO for {} (tidp: {}) to {}" ,
955- commit.source ->ntp (),
956- tidp,
957- lro);
958- } else {
959- // Don't fail early, just keep going until we're done.
960- if (error) {
961- error = error->with_context (
962- " failed to set LRO in L0: {}" , result.error ());
963- } else {
964- error = reconcile_error (
965- " failed to set LRO in L0: {}" , result.error ());
966- }
967- if (result.error () == source::errc::failure) {
968- // Other errors can be expected in normal operating
969- // conditions.
970- error = error->non_benign ();
971- }
972- }
939+ all_commits.push_back (&commit);
973940 }
974941 }
942+ std::optional<reconcile_error> error;
943+ static constexpr size_t max_concurrent_lro_updates = 32 ;
944+ co_await ss::max_concurrent_for_each (
945+ all_commits,
946+ max_concurrent_lro_updates,
947+ [this , &corrected_next_offsets, &error](
948+ this auto , const commit_info* commit) -> ss::future<> {
949+ auto tidp = commit->source ->topic_id_partition ();
950+ kafka::offset lro = commit->metadata .last_offset ;
951+ auto it = corrected_next_offsets.find (tidp);
952+ if (it != corrected_next_offsets.end ()) {
953+ _probe.increment_offset_corrections ();
954+ // We want the previous offset, because that is what was last
955+ // reconciled. During next reconciliation we should get the
956+ // offset *after* the LRO to start reading from.
957+ lro = kafka::prev_offset (it->second );
958+ }
959+ auto result = co_await commit->source ->set_last_reconciled_offset (
960+ lro, _as);
961+ if (result.has_value ()) {
962+ vlog (
963+ lg.debug ,
964+ " successfully bumped LRO for {} (tidp: {}) to {}" ,
965+ commit->source ->ntp (),
966+ tidp,
967+ lro);
968+ co_return ;
969+ }
970+ // Don't fail early, just keep going until we're done.
971+ if (error) {
972+ error = error->with_context (
973+ " failed to set LRO in L0: {}" , result.error ());
974+ } else {
975+ error = reconcile_error (
976+ " failed to set LRO in L0: {}" , result.error ());
977+ }
978+ if (result.error () == source::errc::failure) {
979+ // Other errors can be expected in normal operating
980+ // conditions.
981+ error = error->non_benign ();
982+ }
983+ });
975984 co_return error
976985 .transform (
977986 [](reconcile_error& err) -> std::expected<void , reconcile_error> {
0 commit comments