
Commit 02be074

Forward-port to 7.4: Add DD init and team collection logging for diagnosing slow startups (#13002)
* Add DD init visibility, metrics retries, shard tracking, scan progress, and team collection logging (#12913)

  Add logging throughout DD startup and runtime to diagnose slow startups, stuck data moves, and undesired server classification.

  - DDTxnProcessor: Log elapsed time for the server list + data move read transaction and the keyServer scan. Warn when getRange(dataMoveKeys) takes over 5 seconds.
  - DataDistribution: Add NumShards and NumServers to DDInitGotInitialDD. Add a DDInitResumedDataMoves summary with ValidMoves, CancelledMoves, and EmptyMoves counts and elapsed time. Log the DD exit reason as DDExiting at SevWarn. Add DDInit-prefixed trace events throughout the startup sequence.
  - DDTeamCollection: Add Reason and Address details to UndesiredStorageServer trace events to distinguish version-lag, same-address, wrong-class, and exclusion causes.
  - DDShardTracker: Log TrackInitialShardsComplete with the shard count and TrackInitialShardsMetricsComplete with elapsed time.
  - NativeAPI: Add retry counting and logging for getStorageMetrics timeouts.

* Remove double trace

* Fix DD trace event issues: rate-limit warns, deduplicate events, fix DDExiting

  - Wire up the previously unused lastLogTime in waitStorageMetrics to rate-limit SevWarn events to once per 10s after the 60s threshold. Previously, every retry (up to 100/s with the 10ms WRONG_SHARD_SERVER_DELAY) emitted a SevWarn.
  - Remove the redundant DataDistributorRunning and DDInitRunning events inside the try block. Rename the original DataDistributorRunning to DDInitRunning so a single event serves both purposes and fits the DDInit* query pattern.
  - Move DDExiting to the three actual throw sites so it only fires on terminal exits, not on retries of movekeys_conflict/dd_config_changed when DD is disabled.

* Trim over-apologetic DDInitDone comment
1 parent e2be8c1 commit 02be074

6 files changed

Lines changed: 138 additions & 9 deletions


fdbclient/NativeAPI.actor.cpp

Lines changed: 18 additions & 1 deletion
@@ -8144,6 +8144,9 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
     int expectedShardCount,
     Optional<Reference<TransactionState>> trState) {
     state Span span("NAPI:WaitStorageMetrics"_loc, generateSpanID(cx->transactionTracingSample));
+    state double startTime = now();
+    state double lastLogTime = 0;
+    state int retryCount = 0;
     loop {
         if (trState.present()) {
             wait(trState.get()->startTransaction());
@@ -8191,7 +8194,21 @@ ACTOR Future<std::pair<Optional<StorageMetrics>, int>> waitStorageMetrics(
                 return std::make_pair(res, -1);
             }
         } catch (Error& e) {
-            TraceEvent(SevDebug, "WaitStorageMetricsHandleError").error(e);
+            retryCount++;
+            // Upgrade from SevDebug to SevWarn after 60 seconds of retrying,
+            // but rate-limit warns to avoid flooding the trace log on fast retries.
+            Severity sev = SevDebug;
+            if (now() - startTime > 60.0) {
+                if (now() - lastLogTime >= 10.0) {
+                    sev = SevWarn;
+                    lastLogTime = now();
+                }
+            }
+            TraceEvent(sev, "WaitStorageMetricsHandleError")
+                .error(e)
+                .detail("Keys", keys)
+                .detail("Elapsed", now() - startTime)
+                .detail("Retries", retryCount);
             if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
                 cx->invalidateCache(tenantInfo.prefix, keys);
                 wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskPriority::DataDistribution));
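The escalation logic above is easy to get wrong (warn too early, or flood the log). Below is a standalone sketch of the same pattern in plain C++, with the thresholds from the diff (escalate after 60s, then warn at most once per 10s); Flow's now() is replaced by an explicit time parameter, and the severity enum values are illustrative stand-ins, not Flow's.

    #include <cstdio>

    // Illustrative stand-ins for Flow's trace severities.
    enum Severity { SevDebug, SevWarn };

    struct RetryLogger {
        double startTime;       // time of the first attempt
        double lastLogTime = 0; // last time a SevWarn was emitted
        int retryCount = 0;
        explicit RetryLogger(double start) : startTime(start) {}

        // Mirrors the diff: SevDebug by default, upgraded to SevWarn at most
        // once per 10s once 60s have elapsed since the first attempt.
        Severity onRetry(double now) {
            retryCount++;
            Severity sev = SevDebug;
            if (now - startTime > 60.0 && now - lastLogTime >= 10.0) {
                sev = SevWarn;
                lastLogTime = now;
            }
            return sev;
        }
    };

    int main() {
        RetryLogger logger(0.0);
        // Simulate a retry every 10ms (the WRONG_SHARD_SERVER_DELAY cadence):
        // warnings appear roughly once per 10s after the one-minute mark.
        for (double t = 0.01; t <= 90.0; t += 0.01) {
            if (logger.onRetry(t) == SevWarn)
                std::printf("SevWarn at t=%.2f (retry %d)\n", t, logger.retryCount);
        }
    }

Run against that simulated cadence, this prints only a handful of warnings instead of the thousands the pre-fix code would have emitted, which is exactly the behavior the commit message describes.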

fdbserver/DDShardTracker.actor.cpp

Lines changed: 10 additions & 0 deletions
@@ -1426,9 +1426,19 @@ ACTOR Future<Void> trackInitialShards(DataDistributionTracker* self, Reference<I
         wait(yield(TaskPriority::DataDistribution));
     }

+    TraceEvent("TrackInitialShardsComplete", self->distributorId).detail("ShardsTracked", s);
+
+    state double changeSizesStart = now();
     Future<Void> initialSize = changeSizes(self, KeyRangeRef(allKeys.begin, allKeys.end), 0, "ShardInit");
     self->readyToStart.send(Void());
     wait(initialSize);
+
+    TraceEvent("TrackInitialShardsMetricsComplete", self->distributorId)
+        .detail("ElapsedSeconds", now() - changeSizesStart);
+
+    // DDInitDone bookends DDInitRunning — marks DD fully operational.
+    TraceEvent("DDInitDone", self->distributorId);
+
    self->maxShardSizeUpdater = updateMaxShardSize(self->dbSizeEstimate, self->maxShardSize);

    return Void();
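Note the ordering in the hunk above: readyToStart is signalled before wait(initialSize), so downstream consumers start while the slow metrics pass is still running, and DDInitDone only fires once it finishes. A rough analogue using std::promise/std::future (the real code uses Flow's Promise/Future, not std's):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
        std::promise<void> readyToStart;
        std::future<void> ready = readyToStart.get_future();

        // Downstream consumer: blocked only until readiness is signalled.
        std::thread consumer([&] {
            ready.wait();
            std::cout << "downstream started\n";
        });

        // Stand-in for the slow initial metrics pass (changeSizes).
        auto initialSize = std::async(std::launch::async, [] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        });

        readyToStart.set_value(); // signal readiness first...
        initialSize.wait();       // ...then wait for the slow work
        std::cout << "metrics complete\n"; // the DDInitDone point
        consumer.join();
    }

This is why the startup-order comment later in the commit distinguishes DDInitDataLoaded from DDInitDone: the first marks data loaded, the second marks metrics fully collected.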

fdbserver/DDTeamCollection.actor.cpp

Lines changed: 22 additions & 4 deletions
@@ -1376,9 +1376,17 @@ class DDTeamCollectionImpl {
     try {
         loop {
             state bool isBm = BlobMigratorInterface::isBlobMigrator(server->getLastKnownInterface().id());
-            status.isUndesired =
-                (!self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get()) || isBm;
-
+            {
+                bool versionLagUndesired =
+                    !self->disableFailingLaggingServers.get() && server->ssVersionTooFarBehind.get();
+                if ((versionLagUndesired || isBm) && !status.isUndesired) {
+                    TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
+                        .detail("Server", server->getId())
+                        .detail("Address", server->getLastKnownInterface().address())
+                        .detail("Reason", isBm ? "BlobMigrator" : "VersionLag");
+                }
+                status.isUndesired = versionLagUndesired || isBm;
+            }
             status.isWrongConfiguration = isBm;
             status.isWiggling = false;
             hasWrongDC = !self->isCorrectDC(*server);
@@ -1414,6 +1422,7 @@ class DDTeamCollectionImpl {
                     TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
                         .detail("Server", server->getId())
                         .detail("Address", server->getLastKnownInterface().address())
+                        .detail("Reason", "SameAddress")
                         .detail("OtherServer", i.second->getId())
                         .detail("NumShards",
                                 self->shardsAffectedByTeamFailure->getNumberOfShards(server->getId()))
@@ -1439,6 +1448,8 @@ class DDTeamCollectionImpl {
                 if (self->optimalTeamCount > 0) {
                     TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
                         .detail("Server", server->getId())
+                        .detail("Address", server->getLastKnownInterface().address())
+                        .detail("Reason", "WrongMachineClass")
                         .detail("OptimalTeamCount", self->optimalTeamCount)
                         .detail("Fitness", server->getLastKnownClass().machineClassFitness(ProcessClass::Storage));
                     status.isUndesired = true;
@@ -1515,9 +1526,16 @@ class DDTeamCollectionImpl {
             }

             if (worstStatus != DDTeamCollection::Status::NONE) {
+                const char* exclusionType = worstStatus == DDTeamCollection::Status::WIGGLING ? "Wiggling"
+                                            : worstStatus == DDTeamCollection::Status::FAILED ? "Failed"
+                                            : worstStatus == DDTeamCollection::Status::EXCLUDED ? "Excluded"
+                                                                                                : "Unknown";
                 TraceEvent(SevWarn, "UndesiredStorageServer", self->distributorId)
                     .detail("Server", server->getId())
-                    .detail("Excluded", worstAddr.toString());
+                    .detail("Address", server->getLastKnownInterface().address())
+                    .detail("Reason", "Excluded")
+                    .detail("ExclusionType", exclusionType)
+                    .detail("ExcludedAddress", worstAddr.toString());
                 status.isUndesired = true;
                 status.isWrongConfiguration = true;
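The `&& !status.isUndesired` guard in the first hunk makes the new trace edge-triggered: it fires when a server becomes undesired, not on every tracker iteration while it stays undesired. A minimal sketch of that guard in isolation (ServerStatus and classify are hypothetical names, not the real tracker API):

    #include <cstdio>

    // Persistent per-server status, as in DDTeamCollection's tracker loop.
    struct ServerStatus {
        bool isUndesired = false;
    };

    // Log only on the false -> true transition, then record the new state.
    void classify(ServerStatus& status, bool undesiredNow, const char* reason) {
        if (undesiredNow && !status.isUndesired)
            std::printf("UndesiredStorageServer Reason=%s\n", reason);
        status.isUndesired = undesiredNow;
    }

    int main() {
        ServerStatus s;
        classify(s, true, "VersionLag");  // logs: new episode
        classify(s, true, "VersionLag");  // silent: already undesired
        classify(s, false, "VersionLag"); // silent: recovered
        classify(s, true, "VersionLag");  // logs: new episode
    }

Combined with the Reason/ExclusionType details, one UndesiredStorageServer event per episode is enough to tell version lag, same-address, wrong-class, and exclusion causes apart.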

fdbserver/DDTxnProcessor.actor.cpp

Lines changed: 29 additions & 0 deletions
@@ -261,6 +261,8 @@ class DDTxnProcessorImpl {
     CODE_PROBE((bool)skipDDModeCheck, "DD Mode won't prevent read initial data distribution.");
     // Get the server list in its own try/catch block since it modifies result. We don't want a subsequent failure
     // causing entries to be duplicated
+    // Phase 1: Single transaction to read server list and all persisted data moves
+    state double serverListAndDataMoveReadStart = now();
     loop {
         numDataMoves = 0;
         server_dc.clear();
@@ -334,7 +336,12 @@ class DDTxnProcessorImpl {
            }
        }

+        state double dataMoveReadStart = now();
         RangeResult dms = wait(tr.getRange(dataMoveKeys, CLIENT_KNOBS->TOO_MANY));
+        if (now() - dataMoveReadStart > 5.0) {
+            TraceEvent(SevWarn, "DDInitSlowDataMoveRead", distributorId)
+                .detail("ElapsedSeconds", now() - dataMoveReadStart);
+        }
         ASSERT(!dms.more && dms.size() < CLIENT_KNOBS->TOO_MANY);
         // For each data move, find out the src or dst servers are in primary or remote DC.
         for (int i = 0; i < dms.size(); ++i) {
@@ -382,6 +389,11 @@ class DDTxnProcessorImpl {

         succeeded = true;

+        TraceEvent("DDInitServerListAndDataMoveReadComplete", distributorId)
+            .detail("NumDataMoves", numDataMoves)
+            .detail("NumServers", result->allServers.size())
+            .detail("ElapsedSeconds", now() - serverListAndDataMoveReadStart);
+
         break;
     } catch (Error& e) {
         TraceEvent("GetInitialTeamsRetry", distributorId).error(e);
@@ -393,6 +405,10 @@ class DDTxnProcessorImpl {

    // If keyServers is too large to read in a single transaction, then we will have to break this process up into
    // multiple transactions. In that case, each iteration should begin where the previous left off
+    // Scan keyServers in batches to build the shard map
+    state double keyServerScanStart = now();
+    state double lastScanLogTime = now();
+    state int scanBatchCount = 0;
    while (beginKey < allKeys.end) {
        CODE_PROBE(beginKey > allKeys.begin, "Multi-transactional getInitialDataDistribution");
        loop {
@@ -479,6 +495,15 @@ class DDTxnProcessorImpl {

            ASSERT_GT(keyServers.size(), 0);
            beginKey = keyServers.end()[-1].key;
+            scanBatchCount++;
+            if (now() - lastScanLogTime >= 30.0) {
+                lastScanLogTime = now();
+                TraceEvent("DDInitKeyServerScanProgress", distributorId)
+                    .detail("BeginKey", beginKey)
+                    .detail("Batches", scanBatchCount)
+                    .detail("ShardsScanned", result->shards.size())
+                    .detail("ElapsedSeconds", now() - keyServerScanStart);
+            }
            break;
        } catch (Error& e) {
            TraceEvent("GetInitialTeamsKeyServersRetry", distributorId).error(e);
@@ -495,6 +520,10 @@ class DDTxnProcessorImpl {
    // a dummy shard at the end with no keys or servers makes life easier for trackInitialShards()
    result->shards.push_back(DDShardInfo(allKeys.end));

+    TraceEvent("DDInitKeyServerScanComplete", distributorId)
+        .detail("NumShards", result->shards.size())
+        .detail("ElapsedSeconds", now() - keyServerScanStart);
+
    if (SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA && numDataMoves > 0) {
        for (int shard = 0; shard < result->shards.size() - 1; ++shard) {
            const DDShardInfo& iShard = result->shards[shard];
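The scan instrumented above pages through keyServers across multiple transactions, resuming each batch from the last key returned. A schematic of that resume-from-last-key pattern over an in-memory map (readBatch and the zero-byte successor trick are stand-ins for illustration; the real code resumes via FDB key selectors and re-reads the boundary key):

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    using Store = std::map<std::string, int>;

    // Read up to `limit` pairs with key in [begin, end).
    std::vector<std::pair<std::string, int>> readBatch(const Store& s,
                                                       const std::string& begin,
                                                       const std::string& end,
                                                       size_t limit) {
        std::vector<std::pair<std::string, int>> out;
        for (auto it = s.lower_bound(begin); it != s.end() && it->first < end; ++it) {
            out.push_back({ it->first, it->second });
            if (out.size() == limit)
                break;
        }
        return out;
    }

    int main() {
        Store store;
        for (int i = 0; i < 10; ++i)
            store["key" + std::to_string(i)] = i;

        std::string begin = "";
        const std::string end = "\xff";
        int batches = 0, keys = 0;
        while (true) {
            auto batch = readBatch(store, begin, end, 3);
            if (batch.empty())
                break;
            ++batches;
            keys += (int)batch.size();
            // The diff resumes at keyServers.end()[-1].key; here we append a
            // zero byte so the next lower_bound starts strictly after it.
            begin = batch.back().first + '\0';
        }
        std::cout << "scanned " << keys << " keys in " << batches << " batches\n";
    }

Because each batch boundary is a durable resume point, the new DDInitKeyServerScanProgress event can cheaply report BeginKey and the batch count every 30 seconds without adding any extra reads.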

fdbserver/DataDistribution.actor.cpp

Lines changed: 57 additions & 2 deletions
@@ -592,6 +592,31 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
    // Initialize the required internal states of DataDistributor from system metadata. It's necessary before
    // DataDistributor start working. Doesn't include initialization of optional components, like TenantCache, DDQueue,
    // Tracker, TeamCollection. The components should call its own ::init methods.
+    //
+    // DD Startup Progress (trace events in order):
+    //   DDInitRunning - DD process recruited and starting init
+    //   DDInitTakingMoveKeysLock - Acquiring move keys lock
+    //   DDInitTookMoveKeysLock - Lock acquired
+    //   DDInitGotConfiguration - Database configuration loaded
+    //   DDInitUpdatedReplicaKeys - Replica keys updated
+    //   DDInitSlowDataMoveRead - (SevWarn) dataMoveKeys read taking >5s
+    //   DDInitServerListAndDataMoveReadComplete - Server list + data moves read: NumDataMoves, NumServers,
+    //       ElapsedSeconds
+    //   DDInitKeyServerScanProgress - (every 30s) keyServer scan: BeginKey, Batches, ShardsScanned
+    //   DDInitKeyServerScanComplete - keyServer scan done: NumShards, ElapsedSeconds
+    //   DDInitGotInitialDD - Init data loaded: NumShards, NumServers
+    //   DDInitDataLoaded - Init data loaded, ElapsedSeconds (does NOT mean DD is fully operational)
+    //
+    // After init(), the following startup events fire from other components:
+    //   DDInitResumeDataMovesProgress - (every 30s) data move resume: ValidMoves, CancelledMoves, EmptyMoves
+    //   DDInitResumedDataMoves - Data move resume complete with counts
+    //   TrackInitialShards - Shard tracker setup started with InitialShardCount
+    //   TrackInitialShardsComplete - Shard trackers created: ShardsTracked
+    //   DDTrackerStarting - Teams ready (fires from DDTeamCollection after readyToStart + delay)
+    //   TrackInitialShardsMetricsComplete - All shard metrics received: ElapsedSeconds
+    //       WaitStorageMetricsHandleError may fire (SevWarn after 60s) if a
+    //       shard's metrics read is stuck retrying: Keys, Retries
+    //   DDInitDone - DD is fully operational with all shard sizes loaded
    ACTOR static Future<Void> init(Reference<DataDistributor> self) {
        loop {
            wait(self->waitDataDistributorEnabled());
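Since every startup phase now emits a DDInit* or TrackInitialShards* event, the timeline can be reconstructed from a trace file. Below is a rough sketch, not part of the commit, assuming the default single-line XML trace format with Time="..." and Type="..." attributes; a real tool should use a proper XML parser rather than string matching.

    #include <fstream>
    #include <iostream>
    #include <string>

    // Crude extraction of `name="value"` from a one-line XML event.
    static std::string attr(const std::string& line, const std::string& name) {
        const std::string needle = " " + name + "=\"";
        auto pos = line.find(needle);
        if (pos == std::string::npos)
            return "";
        pos += needle.size();
        auto end = line.find('"', pos);
        return line.substr(pos, end - pos);
    }

    int main(int argc, char** argv) {
        if (argc < 2) {
            std::cerr << "usage: ddinit-timeline <trace.xml>\n";
            return 1;
        }
        std::ifstream in(argv[1]);
        std::string line;
        double t0 = -1;
        while (std::getline(in, line)) {
            const std::string type = attr(line, "Type");
            if (type.rfind("DDInit", 0) != 0 && type.rfind("TrackInitialShards", 0) != 0)
                continue;
            const std::string t = attr(line, "Time");
            if (t.empty())
                continue;
            double time = std::stod(t);
            if (t0 < 0)
                t0 = time; // first matching event anchors the timeline
            std::cout << type << " +" << (time - t0) << "s\n";
        }
        return 0;
    }

Run over the trace logs of a slow startup, the gaps between consecutive events show where the time went; for example, a large gap before DDInitKeyServerScanComplete points at the keyServer scan.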
@@ -650,13 +675,17 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
                .detail("E", self->initData->shards.end()[-1].key)
                .detail("Src", describe(self->initData->shards.end()[-2].primarySrc))
                .detail("Dest", describe(self->initData->shards.end()[-2].primaryDest))
+                .detail("NumShards", self->initData->shards.size())
+                .detail("NumServers", self->initData->allServers.size())
                .trackLatest(self->initialDDEventHolder->trackingKey);
        } else {
            TraceEvent("DDInitGotInitialDD", self->ddId)
                .detail("B", "")
                .detail("E", "")
                .detail("Src", "[no items]")
                .detail("Dest", "[no items]")
+                .detail("NumShards", self->initData->shards.size())
+                .detail("NumServers", self->initData->allServers.size())
                .trackLatest(self->initialDDEventHolder->trackingKey);
        }

@@ -853,6 +882,11 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
    // TODO: unit test needed
    ACTOR static Future<Void> resumeFromDataMoves(Reference<DataDistributor> self, Future<Void> readyToStart) {
        state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();
+        state int validMoves = 0;
+        state int cancelledMoves = 0;
+        state int emptyMoves = 0;
+        state double resumeStart = now();
+        state double lastLogTime = now();

        wait(readyToStart);

@@ -861,6 +895,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
            DataMoveType dataMoveType = getDataMoveTypeFromDataMoveId(meta.id);
            if (meta.ranges.empty()) {
                TraceEvent(SevInfo, "EmptyDataMoveRange", self->ddId).detail("DataMoveMetaData", meta.toString());
+                emptyMoves++;
                continue;
            }
            if (meta.bulkLoadTaskState.present()) {
@@ -891,6 +926,7 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
                rs.cancelled = true;
                self->relocationProducer.send(rs);
                TraceEvent("DDInitScheduledCancelDataMove", self->ddId).detail("DataMove", meta.toString());
+                cancelledMoves++;
            } else if (it.value()->valid) {
                TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString());
                ASSERT(meta.ranges.front() == it.range());
@@ -914,9 +950,24 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
                self->shardsAffectedByTeamFailure->moveShard(rs.keys, teams);
                self->relocationProducer.send(rs);
                wait(yield(TaskPriority::DataDistribution));
+                validMoves++;
+            }
+            if (now() - lastLogTime >= 30.0) {
+                lastLogTime = now();
+                TraceEvent("DDInitResumeDataMovesProgress", self->ddId)
+                    .detail("ValidMoves", validMoves)
+                    .detail("CancelledMoves", cancelledMoves)
+                    .detail("EmptyMoves", emptyMoves)
+                    .detail("ElapsedSeconds", now() - resumeStart);
            }
        }

+        TraceEvent("DDInitResumedDataMoves", self->ddId)
+            .detail("ValidMoves", validMoves)
+            .detail("CancelledMoves", cancelledMoves)
+            .detail("EmptyMoves", emptyMoves)
+            .detail("ElapsedSeconds", now() - resumeStart);
+
        // Trigger background cleanup for datamove tombstones
        if (!self->txnProcessor->isMocked()) {
            self->addActor.send(self->removeDataMoveTombstoneBackground(self));
@@ -2588,6 +2639,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,

    loop {
        self->context->trackerCancelled = false;
+        state double ddStartTime = now();
        // whether all initial shard are tracked
        self->initialized = Promise<Void>();

@@ -2599,7 +2651,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,

        wait(DataDistributor::init(self));

-        TraceEvent(SevInfo, "DataDistributionInitProgress", self->ddId).detail("Phase", "Metadata Initialized");
+        TraceEvent("DDInitDataLoaded", self->ddId).detail("ElapsedSeconds", now() - ddStartTime);

        // When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
        ASSERT(self->configuration.storageTeamSize > 0);
@@ -2853,6 +2905,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
            TraceEvent(SevWarn, "DataDistributorCancelled");
        }
        shards.clear();
+        TraceEvent(SevWarn, "DDExiting", self->ddId).error(e);
        throw e;
    } else {
        wait(shards.clearAsync());
@@ -2864,12 +2917,14 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
        wait(self->removeStorageServer(removeFailedServer.getFuture().get()));
    } else {
        if (err.code() != error_code_movekeys_conflict && err.code() != error_code_dd_config_changed) {
+            TraceEvent(SevWarn, "DDExiting", self->ddId).error(err);
            throw err;
        }

        bool ddEnabled = wait(self->isDataDistributionEnabled());
        TraceEvent("DataDistributionError", self->ddId).error(err).detail("DataDistributionEnabled", ddEnabled);
        if (ddEnabled) {
+            TraceEvent(SevWarn, "DDExiting", self->ddId).error(err);
            throw err;
        }
    }
@@ -5112,7 +5167,7 @@ ACTOR Future<Void> dataDistributor_impl(DataDistributorInterface di,
    state std::map<UID, DistributorSnapRequest> ddSnapReqMap;
    state std::map<UID, ErrorOr<Void>> ddSnapReqResultMap;

-    TraceEvent("DataDistributorRunning", di.id()).detail("IsMocked", isMocked);
+    TraceEvent("DDInitRunning", di.id()).detail("IsMocked", isMocked);
    self->addActor.send(actors.getResult());
    self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));
    self->addActor.send(waitFailureServer(di.waitFailure.getFuture()));

flow/include/flow/IndexedSet.h

Lines changed: 2 additions & 2 deletions
@@ -344,11 +344,11 @@ struct IndexedSet {
    // direction 0 = left, 1 = right
    template <int direction>
    static void moveIterator(Node const*& node) {
-        moveIteratorImpl<direction, true>(node);
+        IndexedSet::template moveIteratorImpl<direction, true>(node);
    }
    template <int direction>
    static void moveIterator(Node*& node) {
-        moveIteratorImpl<direction, false>(node);
+        IndexedSet::template moveIteratorImpl<direction, false>(node);
    }

public: // but testonly
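For context on this drive-by fix: inside a class template, some toolchains mis-parse an unqualified call to one of the class's own member templates, reading the `<` as a less-than operator; qualifying the call through the injected class name with the `template` keyword removes the ambiguity. A toy illustration (Holder and impl are hypothetical names standing in for IndexedSet and moveIteratorImpl):

    template <class T>
    struct Holder {
        template <int direction, bool isConst>
        static void impl(T& v) { v += direction + (isConst ? 1 : 0); }

        template <int direction>
        static void call(T& v) {
            // An unqualified impl<direction, true>(v) is valid standard C++,
            // but qualifying through the injected class name with `template`
            // is unambiguous on compilers that mis-parse the '<'.
            Holder::template impl<direction, true>(v);
        }
    };

    int main() {
        int x = 0;
        Holder<int>::call<2>(x); // x becomes 2 + 1 = 3
        return x == 3 ? 0 : 1;
    }

The behavior is identical either way; the change only affects how the call is parsed.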
