diff --git a/fdbclient/DatabaseContext.actor.cpp b/fdbclient/DatabaseContext.actor.cpp index 7eea7dbf16b..e5543d547d5 100644 --- a/fdbclient/DatabaseContext.actor.cpp +++ b/fdbclient/DatabaseContext.actor.cpp @@ -62,6 +62,7 @@ #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/MonitorLeader.h" #include "fdbclient/MutationList.h" +#include "ProxyLoadBalance.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SpecialKeySpace.h" #include "fdbclient/StorageServerInterface.h" @@ -454,22 +455,17 @@ inline HealthMetrics populateHealthMetrics(const HealthMetrics& detailedMetrics, } } -ACTOR static Future getHealthMetricsActor(DatabaseContext* cx, bool detailed, bool sendDetailedRequest) { - loop { - choose { - when(wait(cx->onProxiesChanged())) {} - when(GetHealthMetricsReply rep = wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies::False), - &GrvProxyInterface::getHealthMetrics, - GetHealthMetricsRequest(sendDetailedRequest)))) { - cx->healthMetrics.update(rep.healthMetrics, sendDetailedRequest, true); - cx->healthMetricsLastUpdated = now(); - if (sendDetailedRequest) { - cx->detailedHealthMetricsLastUpdated = now(); - } - return populateHealthMetrics(cx->healthMetrics, detailed); - } - } +static Future getHealthMetricsActor(DatabaseContext* cx, bool detailed, bool sendDetailedRequest) { + GetHealthMetricsReply rep = + co_await grvProxyLoadBalance(Database(Reference::addRef(cx)), + makeReqBuilder(sendDetailedRequest), + &GrvProxyInterface::getHealthMetrics); + cx->healthMetrics.update(rep.healthMetrics, sendDetailedRequest, true); + cx->healthMetricsLastUpdated = now(); + if (sendDetailedRequest) { + cx->detailedHealthMetricsLastUpdated = now(); } + co_return populateHealthMetrics(cx->healthMetrics, detailed); } Future DatabaseContext::getHealthMetrics(bool detailed = false) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index c23beabb7ba..a6e77253a54 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -92,6 +92,8 @@ #include "flow/network.h" #include "flow/serialize.h" +#include "ProxyLoadBalance.h" + #ifdef ADDRESS_SANITIZER #include #endif @@ -5947,23 +5949,12 @@ Future DatabaseContext::getStorageMetrics(KeyRange const& keys, } } -ACTOR Future>> waitDataDistributionMetricsList(Database cx, - KeyRange keys, - int shardLimit) { - loop { - choose { - when(wait(cx->onProxiesChanged())) {} - when(ErrorOr rep = - wait(errorOr(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), - &CommitProxyInterface::getDDMetrics, - GetDDMetricsRequest(keys, shardLimit))))) { - if (rep.isError()) { - throw rep.getError(); - } - return rep.get().storageMetricsList; - } - } - } +Future>> waitDataDistributionMetricsList(Database cx, + KeyRange keys, + int shardLimit) { + GetDDMetricsReply rep = co_await commitProxyLoadBalance( + cx, makeReqBuilder(keys, shardLimit), &CommitProxyInterface::getDDMetrics); + co_return rep.storageMetricsList; } Future>> DatabaseContext::getReadHotRanges(KeyRange const& keys) { @@ -6330,22 +6321,14 @@ void enableClientInfoLogging() { TraceEvent(SevInfo, "ClientInfoLoggingEnabled").log(); } -ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID snapUID) { +Future snapCreate(Database cx, Standalone snapCmd, UID snapUID) { TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd).detail("UID", snapUID); try { - loop { - choose { - when(wait(cx->onProxiesChanged())) {} - when(wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), - &CommitProxyInterface::proxySnapReq, - ProxySnapRequest(snapCmd, snapUID, snapUID), - cx->taskID, - AtMostOnce::True))) { - TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID); - return Void(); - } - } - } + co_await commitProxyLoadBalance(cx, + makeReqBuilder(snapCmd, snapUID, snapUID), + &CommitProxyInterface::proxySnapReq, + AtMostOnce::True); + TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID); } catch (Error& e) { TraceEvent("SnapCreateError").error(e).detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID); throw; @@ -6576,19 +6559,11 @@ ACTOR Future checkSafeExclusions(Database cx, std::vectoronProxiesChanged())) {} - when(ExclusionSafetyCheckReply _ddCheck = - wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False), - &CommitProxyInterface::exclusionSafetyCheckReq, - ExclusionSafetyCheckRequest(exclusions), - cx->taskID))) { - ddCheck = _ddCheck.safe; - break; - } - } - } + ExclusionSafetyCheckReply _ddCheck = + wait(commitProxyLoadBalance(cx, + makeReqBuilder(exclusions), + &CommitProxyInterface::exclusionSafetyCheckReq)); + ddCheck = _ddCheck.safe; } catch (Error& e) { if (e.code() != error_code_actor_cancelled) { TraceEvent("ExclusionSafetyCheckError") diff --git a/fdbclient/ProxyLoadBalance.h b/fdbclient/ProxyLoadBalance.h new file mode 100644 index 00000000000..76113226758 --- /dev/null +++ b/fdbclient/ProxyLoadBalance.h @@ -0,0 +1,84 @@ +/* + * ProxyLoadBalance.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2026 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "fdbclient/CommitProxyInterface.h" +#include "fdbclient/GrvProxyInterface.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbrpc/LoadBalance.actor.h" + +// Stores constructor arguments so a request can be rebuilt on each retry. +template +class ReqBuilder { + std::tuple args; + +public: + explicit ReqBuilder(Args... args) : args(std::move(args)...) {} + + Req build() const { + return std::apply([](auto const&... unpackedArgs) { return Req(unpackedArgs...); }, args); + } +}; + +// Infers the stored argument types while ensuring the request is constructible. +template +ReqBuilder...> makeReqBuilder(Args&&... args) { + static_assert(std::is_constructible_v...>); + return ReqBuilder...>(std::forward(args)...); +} + +// Retries a commit-proxy request whenever the proxy set changes before a reply arrives. +template +Future commitProxyLoadBalance(Database cx, + Builder reqBuilder, + RequestStream CommitProxyInterface::* channel, + AtMostOnce atMostOnce = AtMostOnce::False, + ExplicitVoid = {}) { + while (true) { + Future replyFuture = basicLoadBalance( + cx->getCommitProxies(UseProvisionalProxies::False), channel, reqBuilder.build(), cx->taskID, atMostOnce); + auto res = co_await race(replyFuture, cx->onProxiesChanged()); + if (res.index() == 0) { + co_return std::get<0>(std::move(res)); + } + } +} + +// Retries a GRV-proxy request whenever the proxy set changes before a reply arrives. +template +Future grvProxyLoadBalance(Database cx, + Builder reqBuilder, + RequestStream GrvProxyInterface::* channel, + AtMostOnce atMostOnce = AtMostOnce::False, + ExplicitVoid = {}) { + while (true) { + Future replyFuture = basicLoadBalance( + cx->getGrvProxies(UseProvisionalProxies::False), channel, reqBuilder.build(), cx->taskID, atMostOnce); + auto res = co_await race(replyFuture, cx->onProxiesChanged()); + if (res.index() == 0) { + co_return std::get<0>(std::move(res)); + } + } +} diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h b/fdbclient/include/fdbclient/NativeAPI.actor.h index ca6bb747870..ce6f3e88355 100644 --- a/fdbclient/include/fdbclient/NativeAPI.actor.h +++ b/fdbclient/include/fdbclient/NativeAPI.actor.h @@ -543,9 +543,7 @@ Future Database::run(Fun fun) { } ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanContext spanContext); -ACTOR Future>> waitDataDistributionMetricsList(Database cx, - KeyRange keys, - int shardLimit); +Future>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit); int64_t extractIntOption(Optional value, int64_t minValue = std::numeric_limits::min(), @@ -553,7 +551,7 @@ int64_t extractIntOption(Optional value, // Takes a snapshot of the cluster, specifically the following persistent // states: coordinator, TLog and storage state -ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID snapUID); +Future snapCreate(Database cx, Standalone snapCmd, UID snapUID); // Adds necessary mutation(s) to the transaction, so that *one* checkpoint will be created for // each and every shards overlapping with `ranges`.