Skip to content

Commit 0dc8ca5

Browse files
Create (grv|commit)ProxyLoadBalance helper coroutines (#12918)
* Create proxyLoadBalance coroutine * Update snapCreate to use proxyLoadBalance * Remove unnecessary errorOr call * Use proxyLoadBalance in waitDataDistributionMetricsList * Convert 2 proxyLoadBalance users to standard coroutines * Create grvProxyLoadBalance * Rename proxyLoadBalance * Use grvProxyLoadBalance in getHealthMetricsActor * Add comments in ProxyLoadBalance.h * Fix formatting
1 parent 42ee22e commit 0dc8ca5

4 files changed

Lines changed: 116 additions & 63 deletions

File tree

fdbclient/DatabaseContext.actor.cpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include "fdbclient/CommitProxyInterface.h"
6363
#include "fdbclient/MonitorLeader.h"
6464
#include "fdbclient/MutationList.h"
65+
#include "ProxyLoadBalance.h"
6566
#include "fdbclient/ReadYourWrites.h"
6667
#include "fdbclient/SpecialKeySpace.h"
6768
#include "fdbclient/StorageServerInterface.h"
@@ -454,22 +455,17 @@ inline HealthMetrics populateHealthMetrics(const HealthMetrics& detailedMetrics,
454455
}
455456
}
456457

457-
ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext* cx, bool detailed, bool sendDetailedRequest) {
458-
loop {
459-
choose {
460-
when(wait(cx->onProxiesChanged())) {}
461-
when(GetHealthMetricsReply rep = wait(basicLoadBalance(cx->getGrvProxies(UseProvisionalProxies::False),
462-
&GrvProxyInterface::getHealthMetrics,
463-
GetHealthMetricsRequest(sendDetailedRequest)))) {
464-
cx->healthMetrics.update(rep.healthMetrics, sendDetailedRequest, true);
465-
cx->healthMetricsLastUpdated = now();
466-
if (sendDetailedRequest) {
467-
cx->detailedHealthMetricsLastUpdated = now();
468-
}
469-
return populateHealthMetrics(cx->healthMetrics, detailed);
470-
}
471-
}
458+
static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext* cx, bool detailed, bool sendDetailedRequest) {
459+
GetHealthMetricsReply rep =
460+
co_await grvProxyLoadBalance(Database(Reference<DatabaseContext>::addRef(cx)),
461+
makeReqBuilder<GetHealthMetricsRequest>(sendDetailedRequest),
462+
&GrvProxyInterface::getHealthMetrics);
463+
cx->healthMetrics.update(rep.healthMetrics, sendDetailedRequest, true);
464+
cx->healthMetricsLastUpdated = now();
465+
if (sendDetailedRequest) {
466+
cx->detailedHealthMetricsLastUpdated = now();
472467
}
468+
co_return populateHealthMetrics(cx->healthMetrics, detailed);
473469
}
474470

475471
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {

fdbclient/NativeAPI.actor.cpp

Lines changed: 19 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@
9292
#include "flow/network.h"
9393
#include "flow/serialize.h"
9494

95+
#include "ProxyLoadBalance.h"
96+
9597
#ifdef ADDRESS_SANITIZER
9698
#include <sanitizer/lsan_interface.h>
9799
#endif
@@ -5947,23 +5949,12 @@ Future<StorageMetrics> DatabaseContext::getStorageMetrics(KeyRange const& keys,
59475949
}
59485950
}
59495951

5950-
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
5951-
KeyRange keys,
5952-
int shardLimit) {
5953-
loop {
5954-
choose {
5955-
when(wait(cx->onProxiesChanged())) {}
5956-
when(ErrorOr<GetDDMetricsReply> rep =
5957-
wait(errorOr(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
5958-
&CommitProxyInterface::getDDMetrics,
5959-
GetDDMetricsRequest(keys, shardLimit))))) {
5960-
if (rep.isError()) {
5961-
throw rep.getError();
5962-
}
5963-
return rep.get().storageMetricsList;
5964-
}
5965-
}
5966-
}
5952+
Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
5953+
KeyRange keys,
5954+
int shardLimit) {
5955+
GetDDMetricsReply rep = co_await commitProxyLoadBalance(
5956+
cx, makeReqBuilder<GetDDMetricsRequest>(keys, shardLimit), &CommitProxyInterface::getDDMetrics);
5957+
co_return rep.storageMetricsList;
59675958
}
59685959

59695960
Future<Standalone<VectorRef<ReadHotRangeWithMetrics>>> DatabaseContext::getReadHotRanges(KeyRange const& keys) {
@@ -6330,22 +6321,14 @@ void enableClientInfoLogging() {
63306321
TraceEvent(SevInfo, "ClientInfoLoggingEnabled").log();
63316322
}
63326323

6333-
ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID) {
6324+
Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID) {
63346325
TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd).detail("UID", snapUID);
63356326
try {
6336-
loop {
6337-
choose {
6338-
when(wait(cx->onProxiesChanged())) {}
6339-
when(wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
6340-
&CommitProxyInterface::proxySnapReq,
6341-
ProxySnapRequest(snapCmd, snapUID, snapUID),
6342-
cx->taskID,
6343-
AtMostOnce::True))) {
6344-
TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID);
6345-
return Void();
6346-
}
6347-
}
6348-
}
6327+
co_await commitProxyLoadBalance(cx,
6328+
makeReqBuilder<ProxySnapRequest>(snapCmd, snapUID, snapUID),
6329+
&CommitProxyInterface::proxySnapReq,
6330+
AtMostOnce::True);
6331+
TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID);
63496332
} catch (Error& e) {
63506333
TraceEvent("SnapCreateError").error(e).detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID);
63516334
throw;
@@ -6576,19 +6559,11 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, std::vector<AddressExclusion
65766559
.detail("Exclusions", describe(exclusions));
65776560
state bool ddCheck;
65786561
try {
6579-
loop {
6580-
choose {
6581-
when(wait(cx->onProxiesChanged())) {}
6582-
when(ExclusionSafetyCheckReply _ddCheck =
6583-
wait(basicLoadBalance(cx->getCommitProxies(UseProvisionalProxies::False),
6584-
&CommitProxyInterface::exclusionSafetyCheckReq,
6585-
ExclusionSafetyCheckRequest(exclusions),
6586-
cx->taskID))) {
6587-
ddCheck = _ddCheck.safe;
6588-
break;
6589-
}
6590-
}
6591-
}
6562+
ExclusionSafetyCheckReply _ddCheck =
6563+
wait(commitProxyLoadBalance(cx,
6564+
makeReqBuilder<ExclusionSafetyCheckRequest>(exclusions),
6565+
&CommitProxyInterface::exclusionSafetyCheckReq));
6566+
ddCheck = _ddCheck.safe;
65926567
} catch (Error& e) {
65936568
if (e.code() != error_code_actor_cancelled) {
65946569
TraceEvent("ExclusionSafetyCheckError")

fdbclient/ProxyLoadBalance.h

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* ProxyLoadBalance.h
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2013-2026 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#pragma once
22+
23+
#include <tuple>
24+
#include <type_traits>
25+
#include <utility>
26+
27+
#include "fdbclient/CommitProxyInterface.h"
28+
#include "fdbclient/GrvProxyInterface.h"
29+
#include "fdbclient/NativeAPI.actor.h"
30+
#include "fdbrpc/LoadBalance.actor.h"
31+
32+
// Stores constructor arguments so a request can be rebuilt on each retry.
33+
template <class Req, class... Args>
34+
class ReqBuilder {
35+
std::tuple<Args...> args;
36+
37+
public:
38+
explicit ReqBuilder(Args... args) : args(std::move(args)...) {}
39+
40+
Req build() const {
41+
return std::apply([](auto const&... unpackedArgs) { return Req(unpackedArgs...); }, args);
42+
}
43+
};
44+
45+
// Infers the stored argument types while ensuring the request is constructible.
46+
template <class Req, class... Args>
47+
ReqBuilder<Req, std::decay_t<Args>...> makeReqBuilder(Args&&... args) {
48+
static_assert(std::is_constructible_v<Req, std::decay_t<Args>...>);
49+
return ReqBuilder<Req, std::decay_t<Args>...>(std::forward<Args>(args)...);
50+
}
51+
52+
// Retries a commit-proxy request whenever the proxy set changes before a reply arrives.
53+
template <class Req, class Builder>
54+
Future<REPLY_TYPE(Req)> commitProxyLoadBalance(Database cx,
55+
Builder reqBuilder,
56+
RequestStream<Req> CommitProxyInterface::* channel,
57+
AtMostOnce atMostOnce = AtMostOnce::False,
58+
ExplicitVoid = {}) {
59+
while (true) {
60+
Future<REPLY_TYPE(Req)> replyFuture = basicLoadBalance(
61+
cx->getCommitProxies(UseProvisionalProxies::False), channel, reqBuilder.build(), cx->taskID, atMostOnce);
62+
auto res = co_await race(replyFuture, cx->onProxiesChanged());
63+
if (res.index() == 0) {
64+
co_return std::get<0>(std::move(res));
65+
}
66+
}
67+
}
68+
69+
// Retries a GRV-proxy request whenever the proxy set changes before a reply arrives.
70+
template <class Req, class Builder>
71+
Future<REPLY_TYPE(Req)> grvProxyLoadBalance(Database cx,
72+
Builder reqBuilder,
73+
RequestStream<Req> GrvProxyInterface::* channel,
74+
AtMostOnce atMostOnce = AtMostOnce::False,
75+
ExplicitVoid = {}) {
76+
while (true) {
77+
Future<REPLY_TYPE(Req)> replyFuture = basicLoadBalance(
78+
cx->getGrvProxies(UseProvisionalProxies::False), channel, reqBuilder.build(), cx->taskID, atMostOnce);
79+
auto res = co_await race(replyFuture, cx->onProxiesChanged());
80+
if (res.index() == 0) {
81+
co_return std::get<0>(std::move(res));
82+
}
83+
}
84+
}

fdbclient/include/fdbclient/NativeAPI.actor.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -543,17 +543,15 @@ Future<Void> Database::run(Fun fun) {
543543
}
544544

545545
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext);
546-
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
547-
KeyRange keys,
548-
int shardLimit);
546+
Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit);
549547

550548
int64_t extractIntOption(Optional<StringRef> value,
551549
int64_t minValue = std::numeric_limits<int64_t>::min(),
552550
int64_t maxValue = std::numeric_limits<int64_t>::max());
553551

554552
// Takes a snapshot of the cluster, specifically the following persistent
555553
// states: coordinator, TLog and storage state
556-
ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);
554+
Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);
557555

558556
// Adds necessary mutation(s) to the transaction, so that *one* checkpoint will be created for
559557
// each and every shards overlapping with `ranges`.

0 commit comments

Comments
 (0)