Skip to content

Commit a557c8b

Browse files
authored
Merge branch 'release-7.3' into 7.3_DD_init_logging
2 parents 61c6221 + bf78a46 commit a557c8b

22 files changed

Lines changed: 694 additions & 295 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ flow/coveragetool/obj
9494
/.ccls-cache
9595
/.clangd
9696
/.cache
97+
TAGS
9798

9899
# Temporary and user configuration files
99100
*~

fdbclient/ServerKnobs.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
969969
init( DD_PREFER_LOW_READ_UTIL_TEAM, true );
970970
init( DD_TRACE_MOVE_BYTES_AVERAGE_INTERVAL, 120);
971971
init( MOVING_WINDOW_SAMPLE_SIZE, 10000000); // 10MB
972+
init( DD_TEAMS_BY_SERVER_IDS_CONSISTENCY_CHECK_PROB_SIM, 0.0 ); if( isSimulated ) DD_TEAMS_BY_SERVER_IDS_CONSISTENCY_CHECK_PROB_SIM = (deterministicRandom()->random01() * 0.4) + 0.1;
972973

973974
//Storage Server
974975
init( STORAGE_LOGGING_DELAY, 5.0 );
@@ -1132,6 +1133,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
11321133
init( CONFIGURATION_ROWS_TO_FETCH, 20000 );
11331134
init( DISABLE_DUPLICATE_LOG_WARNING, false );
11341135
init( HISTOGRAM_REPORT_INTERVAL, 300.0 );
1136+
init( GENERIC_METRICS_REPORT_INTERVAL, isSimulated ? 10.0 : 300.0 );
11351137

11361138
// Timekeeper
11371139
init( TIME_KEEPER_DELAY, 10 );

fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,9 @@ class ServerKnobs : public KnobsImpl<ServerKnobs> {
986986
// Rolling window duration over which the average bytes moved by DD is calculated for the 'MovingData' trace event.
987987
double DD_TRACE_MOVE_BYTES_AVERAGE_INTERVAL;
988988
int64_t MOVING_WINDOW_SAMPLE_SIZE;
989+
// Probability of running the consistency check between teams and teamsByServerIDs
990+
// in getTeamByServers (simulation only). 0.0 in prod, random [0.1, 0.5) in simulation.
991+
double DD_TEAMS_BY_SERVER_IDS_CONSISTENCY_CHECK_PROB_SIM;
989992

990993
// Storage Server
991994
double STORAGE_LOGGING_DELAY;
@@ -1144,6 +1147,7 @@ class ServerKnobs : public KnobsImpl<ServerKnobs> {
11441147
int CONFIGURATION_ROWS_TO_FETCH;
11451148
bool DISABLE_DUPLICATE_LOG_WARNING;
11461149
double HISTOGRAM_REPORT_INTERVAL;
1150+
double GENERIC_METRICS_REPORT_INTERVAL;
11471151

11481152
// Timekeeper
11491153
int64_t TIME_KEEPER_DELAY;

fdbrpc/FlowTransport.actor.cpp

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -315,16 +315,6 @@ class TransportData {
315315
countConnEstablished.init("Net2.CountConnEstablished"_sr);
316316
countConnClosedWithError.init("Net2.CountConnClosedWithError"_sr);
317317
countConnClosedWithoutError.init("Net2.CountConnClosedWithoutError"_sr);
318-
countConnIncompatible.init("Net2.CountConnIncompatible"_sr);
319-
countConnIncompatibleWithOldClient.init("Net2.CountConnIncompatibleWithOldClient"_sr);
320-
countConnHandshakeAccepted.init("Net2.CountConnHandshakeAccepted"_sr);
321-
countConnHandshakeRequested.init("Net2.CountConnHandshakeRequested"_sr);
322-
countIncomingConnRequested.init("Net2.CountIncomingConnRequested"_sr);
323-
countIncomingConnAccepted.init("Net2.CountIncomingConnAccepted"_sr);
324-
countOutgoingConnHandshakeComplete.init("Net2.CountOutgoingConnHandshakeComplete"_sr);
325-
countOutgoingConnHandshakeRequested.init("Net2.CountOutgoingConnHandshakeRequested"_sr);
326-
countIncomingConnectionTimedout.init("Net2.CountIncomingConnectionTimedout"_sr);
327-
countIncomingConnConnected.init("Net2.CountIncomingConnConnected"_sr);
328318
}
329319

330320
Reference<struct Peer> getPeer(NetworkAddress const& address);
@@ -353,16 +343,6 @@ class TransportData {
353343
Int64MetricHandle countConnEstablished;
354344
Int64MetricHandle countConnClosedWithError;
355345
Int64MetricHandle countConnClosedWithoutError;
356-
Int64MetricHandle countConnIncompatible;
357-
Int64MetricHandle countConnIncompatibleWithOldClient;
358-
Int64MetricHandle countConnHandshakeAccepted;
359-
Int64MetricHandle countConnHandshakeRequested;
360-
Int64MetricHandle countIncomingConnRequested;
361-
Int64MetricHandle countIncomingConnAccepted;
362-
Int64MetricHandle countOutgoingConnHandshakeComplete;
363-
Int64MetricHandle countOutgoingConnHandshakeRequested;
364-
Int64MetricHandle countIncomingConnectionTimedout;
365-
Int64MetricHandle countIncomingConnConnected;
366346

367347
std::map<NetworkAddress, std::pair<uint64_t, double>> incompatiblePeers;
368348
AsyncTrigger incompatiblePeersChanged;
@@ -839,9 +819,14 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
839819
when(Reference<IConnection> _conn =
840820
wait(INetworkConnections::net()->connect(self->destination))) {
841821
conn = _conn;
842-
self->transport->countOutgoingConnHandshakeRequested++;
822+
static SimpleCounter<int64_t>* countOutgoingConnectionCreated =
823+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/OutgoingConnectionCreated");
824+
countOutgoingConnectionCreated->increment(1);
843825
wait(conn->connectHandshake());
844-
self->transport->countOutgoingConnHandshakeComplete++;
826+
static SimpleCounter<int64_t>* countOutgoingConnectionHandshakeComplete =
827+
SimpleCounter<int64_t>::makeCounter(
828+
"/Transport/TLS/OutgoingConnectionHandshakeComplete");
829+
countOutgoingConnectionHandshakeComplete->increment(1);
845830
self->connectLatencies.addSample(now() - self->lastConnectTime);
846831
if (FlowTransport::isClient()) {
847832
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
@@ -1527,12 +1512,17 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
15271512
now() + FLOW_KNOBS->CONNECTION_ID_TIMEOUT;
15281513
}
15291514
compatible = false;
1530-
transport->countConnIncompatible++;
1515+
static SimpleCounter<int64_t>* countConnectionIncompatible =
1516+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/ConnectionIncompatible");
1517+
countConnectionIncompatible->increment(1);
15311518
if (!protocolVersion.hasInexpensiveMultiVersionClient()) {
15321519
if (peer) {
15331520
peer->protocolVersion->set(protocolVersion);
15341521
}
1535-
transport->countConnIncompatibleWithOldClient++;
1522+
static SimpleCounter<int64_t>* countConnectionIncompatibleWithVeryOldClient =
1523+
SimpleCounter<int64_t>::makeCounter(
1524+
"/Transport/TLS/ConnectionIncompatibleWithVeryOldClient");
1525+
countConnectionIncompatibleWithVeryOldClient->increment(1);
15361526
// Older versions expected us to hang up. It may work even if we don't hang up here, but
15371527
// it's safer to keep the old behavior.
15381528
throw incompatible_protocol_version();
@@ -1631,9 +1621,10 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
16311621
entry.time = now();
16321622
entry.addr = conn->getPeerAddress();
16331623
try {
1634-
self->countConnHandshakeRequested++;
16351624
wait(conn->acceptHandshake());
1636-
self->countConnHandshakeAccepted++;
1625+
static SimpleCounter<int64_t>* countIncomingConnectionHandshakeAccepted =
1626+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/IncomingConnectionHandshakeAccepted");
1627+
countIncomingConnectionHandshakeAccepted->increment(1);
16371628
state Promise<Reference<Peer>> onConnected;
16381629
state Future<Void> reader = connectionReader(self, conn, Reference<Peer>(), onConnected);
16391630
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
@@ -1650,17 +1641,24 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
16501641
}
16511642
when(wait(delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
16521643
CODE_PROBE(true, "Incoming connection timed out");
1653-
self->countIncomingConnectionTimedout++;
1644+
static SimpleCounter<int64_t>* countIncomingConnectionTimedout =
1645+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/IncomingConnectionTimedout");
1646+
countIncomingConnectionTimedout->increment(1);
16541647
throw timed_out();
16551648
}
16561649
}
1657-
self->countIncomingConnConnected++;
1650+
static SimpleCounter<int64_t>* countIncomingConnectionConnected =
1651+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/IncomingConnectionConnected");
1652+
countIncomingConnectionConnected->increment(1);
16581653
} catch (Error& e) {
16591654
if (e.code() != error_code_actor_cancelled) {
16601655
TraceEvent("IncomingConnectionError", conn->getDebugID())
16611656
.errorUnsuppressed(e)
16621657
.suppressFor(1.0)
16631658
.detail("FromAddress", conn->getPeerAddress());
1659+
static SimpleCounter<int64_t>* countIncomingConnectionFailed =
1660+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/IncomingConnectionFailed");
1661+
countIncomingConnectionFailed->increment(1);
16641662
if (FLOW_KNOBS->LOG_CONNECTION_ATTEMPTS_ENABLED) {
16651663
entry.failed = true;
16661664
self->connectionHistory.push_back(entry);
@@ -1685,9 +1683,10 @@ ACTOR static Future<Void> listen(TransportData* self, NetworkAddress listenAddr)
16851683
state uint64_t connectionCount = 0;
16861684
try {
16871685
loop {
1688-
self->countIncomingConnRequested++;
16891686
Reference<IConnection> conn = wait(listener->accept());
1690-
self->countIncomingConnAccepted++;
1687+
static SimpleCounter<int64_t>* countIncomingConnectionCreated =
1688+
SimpleCounter<int64_t>::makeCounter("/Transport/TLS/IncomingConnectionCreated");
1689+
countIncomingConnectionCreated->increment(1);
16911690
if (conn) {
16921691
TraceEvent("ConnectionFrom", conn->getDebugID())
16931692
.suppressFor(1.0)

fdbserver/DDTeamCollection.actor.cpp

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,17 +185,43 @@ class DDTeamCollectionImpl {
185185

186186
// Find the team with the exact storage servers as req.src.
187187
static void getTeamByServers(DDTeamCollection* self, GetTeamRequest req) {
188-
const std::string servers = TCTeamInfo::serversToString(req.src);
188+
getTeamByServersConsistencyCheckInSim(self);
189189
Optional<Reference<IDataDistributionTeam>> res;
190-
for (const auto& team : self->teams) {
191-
if (team->getServerIDsStr() == servers) {
192-
res = team;
193-
break;
194-
}
190+
auto it = self->teamsByServerIDs.find(TCTeamInfo::serversToString(req.src));
191+
if (it != self->teamsByServerIDs.end()) {
192+
res = it->second;
195193
}
196194
req.reply.send(std::make_pair(res, false));
197195
}
198196

197+
// Probabilistic consistency check between teams and teamsByServerIDs
198+
// Run only in simulation with a probability of DD_TEAMS_BY_SERVER_IDS_CONSISTENCY_CHECK_PROB_SIM
199+
// We may need to tune this knob if simulation runs too slowly (in real-time) and results in
200+
// ExternalTimeout in Joshua
201+
static void getTeamByServersConsistencyCheckInSim(DDTeamCollection* self) {
202+
// This check can be expensive in prod so only run it in simulation
203+
if (!g_network->isSimulated()) {
204+
return;
205+
}
206+
207+
if (deterministicRandom()->random01() < SERVER_KNOBS->DD_TEAMS_BY_SERVER_IDS_CONSISTENCY_CHECK_PROB_SIM) {
208+
std::unordered_map<std::string, Reference<TCTeamInfo>> expected;
209+
for (const auto& team : self->teams) {
210+
expected[team->getServerIDsStr()] = team;
211+
}
212+
ASSERT(expected.size() == self->teamsByServerIDs.size());
213+
for (const auto& [key, value] : expected) {
214+
auto it = self->teamsByServerIDs.find(key);
215+
ASSERT(it != self->teamsByServerIDs.end());
216+
ASSERT(it->second == value);
217+
}
218+
TraceEvent("TeamByServerIDsConsistencyCheckPassed")
219+
.suppressFor(5.0)
220+
.detail("TeamsSize", self->teams.size())
221+
.detail("MapSize", self->teamsByServerIDs.size());
222+
}
223+
}
224+
199225
// Return a threshold of team queue size which guarantees at least DD_LONG_STORAGE_QUEUE_TEAM_MAJORITY_PERCENTILE
200226
// portion of teams that have longer storage queues
201227
// A team storage queue size is defined as the longest storage queue size among all SSes of the team
@@ -4783,6 +4809,7 @@ void DDTeamCollection::addTeam(const std::vector<Reference<TCServerInfo>>& newTe
47834809

47844810
// For a good team, we add it to teams and create machine team for it when necessary
47854811
teams.push_back(teamInfo);
4812+
teamsByServerIDs[teamInfo->getServerIDsStr()] = teamInfo;
47864813
for (auto& server : newTeamServers) {
47874814
server->addTeam(teamInfo);
47884815
}
@@ -5708,6 +5735,10 @@ void DDTeamCollection::addServer(StorageServerInterface newServer,
57085735

57095736
bool DDTeamCollection::removeTeam(Reference<TCTeamInfo> team) {
57105737
TraceEvent("RemovedServerTeam", distributorId).detail("Team", team->getDesc());
5738+
auto it = teamsByServerIDs.find(team->getServerIDsStr());
5739+
if (it != teamsByServerIDs.end()) {
5740+
teamsByServerIDs.erase(it);
5741+
}
57115742
bool found = false;
57125743
for (int t = 0; t < teams.size(); t++) {
57135744
if (teams[t] == team) {

fdbserver/VersionedBTree.actor.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10008,8 +10008,8 @@ TEST_CASE("Lredwood/correctness/unit/deltaTree/IntIntPair") {
1000810008
return Void();
1000910009
}
1001010010

10011-
struct SimpleCounter {
10012-
SimpleCounter() : x(0), t(timer()), start(t), xt(0) {}
10011+
struct ReallySimpleCounter {
10012+
ReallySimpleCounter() : x(0), t(timer()), start(t), xt(0) {}
1001310013
void operator+=(int n) { x += n; }
1001410014
void operator++() { x++; }
1001510015
int64_t get() { return x; }
@@ -10214,12 +10214,12 @@ TEST_CASE("Lredwood/correctness/btree") {
1021410214

1021510215
state Version version = lastVer + 1;
1021610216

10217-
state SimpleCounter mutationBytes;
10218-
state SimpleCounter keyBytesInserted;
10219-
state SimpleCounter valueBytesInserted;
10220-
state SimpleCounter sets;
10221-
state SimpleCounter rangeClears;
10222-
state SimpleCounter keyBytesCleared;
10217+
state ReallySimpleCounter mutationBytes;
10218+
state ReallySimpleCounter keyBytesInserted;
10219+
state ReallySimpleCounter valueBytesInserted;
10220+
state ReallySimpleCounter sets;
10221+
state ReallySimpleCounter rangeClears;
10222+
state ReallySimpleCounter keyBytesCleared;
1022310223
state int mutationBytesThisCommit = 0;
1022410224
state int mutationBytesTargetThisCommit = randomSize(maxCommitSize);
1022510225

fdbserver/fdbserver.actor.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
#include "flow/FaultInjection.h"
8686
#include "flow/flow.h"
8787
#include "flow/network.h"
88+
#include "flow/SimpleCounter.h"
8889

8990
#if defined(__linux__) || defined(__FreeBSD__)
9091
#include <execinfo.h>
@@ -381,6 +382,14 @@ ACTOR Future<Void> histogramReport() {
381382
}
382383
}
383384

385+
ACTOR Future<Void> metricsReport() {
386+
loop {
387+
wait(delay(SERVER_KNOBS->GENERIC_METRICS_REPORT_INTERVAL));
388+
389+
simpleCounterReport();
390+
}
391+
}
392+
384393
void testSerializationSpeed() {
385394
double tstart;
386395
double build = 0, serialize = 0, deserialize = 0, copy = 0, deallocate = 0;
@@ -1944,11 +1953,6 @@ int main(int argc, char* argv[]) {
19441953
// Enables profiling on this thread (but does not start it)
19451954
registerThreadForProfiling();
19461955

1947-
#ifdef _WIN32
1948-
// Windows needs a gentle nudge to format floats correctly
1949-
//_set_output_format(_TWO_DIGIT_EXPONENT);
1950-
#endif
1951-
19521956
auto opts = CLIOptions::parseArgs(argc, argv);
19531957
const auto role = opts.role;
19541958

@@ -2019,8 +2023,8 @@ int main(int argc, char* argv[]) {
20192023
flushAndExit(FDB_EXIT_SUCCESS);
20202024
}
20212025

2022-
// Initialize the thread pool
20232026
CoroThreadPool::init();
2027+
20242028
// Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents
20252029
// are logged. This thread will eventually run the network, so call it now.
20262030
TraceEvent::setNetworkThread();
@@ -2159,6 +2163,7 @@ int main(int argc, char* argv[]) {
21592163
TraceEvent("Simulation").detail("TestFile", opts.testFile);
21602164

21612165
auto histogramReportActor = histogramReport();
2166+
auto metricsReportActor = metricsReport();
21622167

21632168
CLIENT_KNOBS->trace();
21642169
FLOW_KNOBS->trace();
@@ -2345,7 +2350,14 @@ int main(int argc, char* argv[]) {
23452350
opts.configDBType,
23462351
opts.consistencyCheckUrgentMode));
23472352
actors.push_back(histogramReport());
2348-
// actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement
2353+
actors.push_back(metricsReport());
2354+
2355+
#ifdef FLOW_GRPC_ENABLED
2356+
if (opts.grpcAddressStrs.size() > 0) {
2357+
FlowGrpc::init(&opts.tlsConfig, NetworkAddress::parse(opts.grpcAddressStrs[0]));
2358+
actors.push_back(GrpcServer::instance()->run());
2359+
}
2360+
#endif
23492361

23502362
f = stopAfter(waitForAll(actors));
23512363
g_network->run();
@@ -2397,6 +2409,7 @@ int main(int argc, char* argv[]) {
23972409
setupRunLoopProfiler();
23982410
auto m =
23992411
startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId, opts.localities.dataHallId());
2412+
auto metricsReportActor = metricsReport();
24002413
f = stopAfter(runTests(opts.connectionFile,
24012414
TEST_TYPE_UNIT_TESTS,
24022415
TEST_HERE,

fdbserver/include/fdbserver/DDTeamCollection.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,12 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
681681
std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
682682
std::vector<Reference<TCMachineTeamInfo>> machineTeams; // all machine teams
683683

684+
// IMPORTANT: teams and teamsByServerIDs MUST be consistent, so any time we
685+
// mutate teams, we must also mutate teamsByServerIDs
684686
std::vector<Reference<TCTeamInfo>> teams;
687+
// O(1) hash map from server ID string to team information
688+
// Currently used by getTeamByServers
689+
std::unordered_map<std::string, Reference<TCTeamInfo>> teamsByServerIDs;
685690

686691
std::vector<DDTeamCollection*> teamCollections;
687692
AsyncTrigger printDetailedTeamsInfo;

fdbserver/workloads/UnitTests.actor.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ void forceLinkCommitProxyTests();
5252
void forceLinkWipedStringTests();
5353
void forceLinkRandomKeyValueUtilsTests();
5454
void forceLinkSimKmsVaultTests();
55+
void forceLinkRESTSimKmsVaultTest();
56+
void forceLinkActorFuzzUnitTests();
57+
void forceLinkGrpcTests();
58+
void forceLinkGrpcTests2();
59+
void forceLinkSimpleCounterTests();
5560

5661
struct UnitTestWorkload : TestWorkload {
5762
static constexpr auto NAME = "UnitTests";
@@ -118,7 +123,12 @@ struct UnitTestWorkload : TestWorkload {
118123
forceLinkDDSketchTests();
119124
forceLinkWipedStringTests();
120125
forceLinkRandomKeyValueUtilsTests();
121-
forceLinkSimKmsVaultTests();
126+
forceLinkSimpleCounterTests();
127+
128+
#ifdef FLOW_GRPC_ENABLED
129+
forceLinkGrpcTests();
130+
forceLinkGrpcTests2();
131+
#endif
122132
}
123133

124134
Future<Void> setup(Database const& cx) override {

0 commit comments

Comments
 (0)