Skip to content

Commit c88e9d4

Browse files
author
michael stack
committed
Fix peer disconnect detection and add a knob-protected KRM uncoalesced handling
tool. Bug fixes: - Detect peer disconnect in waitValueOrSignal (genericactors.actor.h). Adds a when() clause watching peer->disconnect so dead connections (e.g., from NAT timeouts) are detected immediately instead of hanging indefinitely waiting on a connection the lower layer has already replaced. We saw this in an incident in which a caller was waiting on a long-running reply over a network with frequent disconnects: the low-level fdb transport would make a new connection, but the high-level caller kept waiting on the original connection until it timed out. - Add DD_COALESCE_UNCOALESCED_KRM knob to tolerate uncoalesced KRM entries (KeyRangeMap.actor.cpp, MoveKeys.actor.cpp). When enabled, logs a warning instead of crashing with ASSERT on adjacent entries with the same value: krmSetRangeCoalescing_ then skips the update entirely, while unassignServerKeys still writes the entries. Off by default. Tests: - Unit tests for waitValueOrSignal peer disconnect detection - KRMCoalesceTest workload and toml for testing uncoalesced KRM tolerance
1 parent c99365d commit c88e9d4

14 files changed

Lines changed: 527 additions & 12 deletions

fdbclient/KeyRangeMap.actor.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "fdbclient/CommitTransaction.h"
2424
#include "fdbclient/FDBTypes.h"
2525
#include "fdbclient/ReadYourWrites.h"
26+
#include "fdbclient/IKnobCollection.h"
2627
#include "flow/UnitTest.h"
2728
#include "flow/actorcompiler.h" // has to be last include
2829

@@ -292,12 +293,27 @@ static Future<Void> krmSetRangeCoalescing_(Transaction* tr,
292293
endValue = existingValue;
293294
}
294295

295-
tr->clear(KeyRangeRef(beginKey, endKey));
296+
// Check for uncoalesced data: if value equals endValue but we're not at maxWithPrefix.end,
297+
// there are redundant entries beyond what we read. With the knob enabled, skip the operation
298+
// entirely to avoid creating inconsistent state. Without knob, ASSERT as before.
299+
if (value == endValue && endKey != maxWithPrefix.end) {
300+
if (IKnobCollection::getGlobalKnobCollection().getServerKnobs().DD_COALESCE_UNCOALESCED_KRM) {
301+
// Log warning and return early - don't modify metadata when uncoalesced entries exist
302+
TraceEvent(SevWarnAlways, "KRMSkippingUncoalescedEntries")
303+
.detail("MapPrefix", mapPrefix)
304+
.detail("BeginKey", beginKey)
305+
.detail("EndKey", endKey)
306+
.detail("MaxEnd", maxWithPrefix.end)
307+
.detail("Value", value);
308+
return Void(); // Skip this operation entirely
309+
} else {
310+
ASSERT(false); // Uncoalesced KRM entries detected; set DD_COALESCE_UNCOALESCED_KRM=true to skip
311+
}
312+
}
296313

297-
ASSERT(value != endValue || endKey == maxWithPrefix.end);
314+
tr->clear(KeyRangeRef(beginKey, endKey));
298315
tr->set(beginKey, value);
299316
tr->set(endKey, endValue);
300-
301317
return Void();
302318
}
303319
Future<Void> krmSetRangeCoalescing(Transaction* const& tr,

fdbclient/RandomKeyValueUtils.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@ template <typename T>
2525
void printNextN(T generator, int count = 10) {
2626
fmt::print("Generating from .next() on {}\n", generator.toString());
2727
for (int i = 0; i < count; ++i) {
28-
fmt::print(" {}\n", generator.next());
28+
auto next_val = generator.next();
29+
if constexpr (std::is_same_v<decltype(next_val), unsigned int> || std::is_same_v<decltype(next_val), int>) {
30+
fmt::print(" {}\n", next_val);
31+
} else {
32+
fmt::print(" {}\n", next_val.toString());
33+
}
2934
}
3035
fmt::print("\n");
3136
}

fdbclient/ServerKnobs.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
330330
init( DD_VALIDATE_LOCALITY, true ); if( randomize && BUGGIFY ) DD_VALIDATE_LOCALITY = false;
331331
init( DD_CHECK_INVALID_LOCALITY_DELAY, 60 ); if( randomize && BUGGIFY ) DD_CHECK_INVALID_LOCALITY_DELAY = 1 + deterministicRandom()->random01() * 600;
332332
init( DD_ENABLE_VERBOSE_TRACING, true ); if( randomize && BUGGIFY ) DD_ENABLE_VERBOSE_TRACING = false;
333+
init( DD_COALESCE_UNCOALESCED_KRM, false ); // Off by default; when enabled, uncoalesced KRM entries are logged and tolerated instead of tripping an ASSERT (the entries themselves are not repaired)
333334
init( DD_SS_FAILURE_VERSIONLAG, 250000000 );
334335
init( DD_SS_ALLOWED_VERSIONLAG, 200000000 ); if( randomize && BUGGIFY ) { DD_SS_FAILURE_VERSIONLAG = deterministicRandom()->randomInt(15000000, 500000000); DD_SS_ALLOWED_VERSIONLAG = 0.75 * DD_SS_FAILURE_VERSIONLAG; }
335336
init( DD_SS_STUCK_TIME_LIMIT, 300.0 ); if( randomize && BUGGIFY ) { DD_SS_STUCK_TIME_LIMIT = 200.0 + deterministicRandom()->random01() * 100.0; }

fdbclient/include/fdbclient/DataDistributionConfig.actor.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ struct DDRangeConfig {
6565
bool operator==(DDRangeConfig const& rhs) const = default;
6666

6767
// String description of the range config
68-
std::string toString() const { return fmt::format("replication={} teamID={}", replicationFactor, teamID); }
68+
std::string toString() const {
69+
return fmt::format("replication={} teamID={}",
70+
replicationFactor.present() ? std::to_string(replicationFactor.get()) : "unset",
71+
teamID.present() ? std::to_string(teamID.get()) : "unset");
72+
}
6973

7074
template <class Ar>
7175
void serialize(Ar& ar) {

fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ class ServerKnobs : public KnobsImpl<ServerKnobs> {
310310
bool DD_VALIDATE_LOCALITY;
311311
int DD_CHECK_INVALID_LOCALITY_DELAY;
312312
bool DD_ENABLE_VERBOSE_TRACING;
313+
bool DD_COALESCE_UNCOALESCED_KRM; // If true, log a warning and tolerate uncoalesced KRM entries instead of crashing with ASSERT; the entries are not rewritten or coalesced
313314
int64_t
314315
DD_SS_FAILURE_VERSIONLAG; // Allowed SS version lag from the current read version before marking it as failed.
315316
int64_t DD_SS_ALLOWED_VERSIONLAG; // SS will be marked as healthy if it's version lag goes below this value.

fdbrpc/FlowTests.actor.cpp

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "flow/IThreadPool.h"
2828
#include "flow/WriteOnlySet.h"
2929
#include "fdbrpc/fdbrpc.h"
30+
#include "fdbrpc/FlowTransport.h"
3031
#include "flow/IAsyncFile.h"
3132
#include "flow/TLSConfig.actor.h"
3233
#include "flow/actorcompiler.h" // This must be the last #include.
@@ -1615,3 +1616,126 @@ TEST_CASE("/flow/flow/FlowMutex") {
16151616

16161617
return Void();
16171618
}
1619+
1620+
TEST_CASE("/fdbrpc/waitValueOrSignal/peerDisconnect") {
1621+
// Test that waitValueOrSignal detects peer disconnect and returns request_maybe_delivered.
1622+
// This reproduces the bug where DD would hang forever because waitValueOrSignal didn't
1623+
// watch peer->disconnect, so dead connections (e.g., from K8s NAT timeouts) were never detected.
1624+
1625+
// Construct a minimal Peer. We pass nullptr for TransportData since waitValueOrSignal
1626+
// only accesses peer->disconnect and PeerHolder only touches outstandingReplies.
1627+
NetworkAddress fakeAddr = NetworkAddress::parse("1.2.3.4:1234");
1628+
state Reference<Peer> peer = makeReference<Peer>(nullptr, fakeAddr);
1629+
1630+
// Create a value future that never resolves (simulating a stuck RPC to unreachable storage server)
1631+
state Promise<Void> neverReply;
1632+
1633+
// No failure signal either (simulating failure monitor not yet detecting the failure)
1634+
state Endpoint ep;
1635+
1636+
// Call waitValueOrSignal with the peer
1637+
state Future<ErrorOr<Void>> result =
1638+
waitValueOrSignal(neverReply.getFuture(), Never(), ep, ReplyPromise<Void>(), peer);
1639+
1640+
// Result should not be ready yet - the reply hasn't come and disconnect hasn't fired
1641+
ASSERT(!result.isReady());
1642+
1643+
// Simulate peer disconnection (as would happen when connectionReader/connectionKeeper detects failure)
1644+
peer->disconnect.send(Void());
1645+
1646+
// Now waitValueOrSignal should detect the disconnect and return request_maybe_delivered
1647+
ASSERT(result.isReady());
1648+
ErrorOr<Void> r = result.get();
1649+
ASSERT(r.isError());
1650+
ASSERT(r.getError().code() == error_code_request_maybe_delivered);
1651+
1652+
return Void();
1653+
}
1654+
1655+
TEST_CASE("/fdbrpc/waitValueOrSignal/noPeerFallback") {
1656+
// Test that waitValueOrSignal still works correctly when no peer is provided (the default).
1657+
// The broken_promise path should still be handled: when the reply promise breaks,
1658+
// the endpoint should be marked as not found and value set to Never().
1659+
1660+
state Promise<Void> reply;
1661+
1662+
// Call waitValueOrSignal without a peer (default behavior)
1663+
state Future<ErrorOr<Void>> result = waitValueOrSignal(reply.getFuture(), Never(), Endpoint());
1664+
1665+
ASSERT(!result.isReady());
1666+
1667+
// Break the promise (simulating endpoint failure)
1668+
reply.sendError(broken_promise());
1669+
1670+
// waitValueOrSignal should handle broken_promise by setting value = Never() and looping.
1671+
// Since signal is Never() and there's no peer disconnect, it should now wait forever.
1672+
// We verify it doesn't crash and the result is NOT ready (it's stuck in the loop).
1673+
wait(delay(0.1));
1674+
ASSERT(!result.isReady());
1675+
1676+
return Void();
1677+
}
1678+
1679+
TEST_CASE("/fdbrpc/waitValueOrSignal/retryOnDisconnect") {
1680+
// End-to-end test of the retry pattern that loadBalance uses:
1681+
// 1. First attempt: RPC to peer1, peer1 disconnects → request_maybe_delivered
1682+
// 2. Second attempt: RPC to peer2, peer2 responds → success
1683+
// 3. Verify numAttempts > 1
1684+
//
1685+
// This reproduces the exact scenario in production: DD sends waitMetrics to a storage
1686+
// server, the K8s NAT kills the connection at ~3 minutes, and the fix ensures the
1687+
// request_maybe_delivered error propagates so loadBalance can retry on another server.
1688+
1689+
NetworkAddress addr1 = NetworkAddress::parse("1.2.3.4:1234");
1690+
NetworkAddress addr2 = NetworkAddress::parse("1.2.3.5:1234");
1691+
state Reference<Peer> peer1 = makeReference<Peer>(nullptr, addr1);
1692+
state Reference<Peer> peer2 = makeReference<Peer>(nullptr, addr2);
1693+
1694+
state int numAttempts = 0;
1695+
1696+
// --- Attempt 1: peer1 disconnects mid-request (simulating K8s NAT timeout) ---
1697+
{
1698+
state Promise<Void> reply1;
1699+
state Endpoint ep1;
1700+
1701+
state Future<ErrorOr<Void>> result1 =
1702+
waitValueOrSignal(reply1.getFuture(), Never(), ep1, ReplyPromise<Void>(), peer1);
1703+
1704+
numAttempts++;
1705+
ASSERT(!result1.isReady());
1706+
1707+
// Connection killed by external factor (K8s NAT, network partition, etc.)
1708+
peer1->disconnect.send(Void());
1709+
1710+
// waitValueOrSignal must detect the disconnect and return request_maybe_delivered
1711+
ASSERT(result1.isReady());
1712+
ErrorOr<Void> r1 = result1.get();
1713+
ASSERT(r1.isError());
1714+
ASSERT(r1.getError().code() == error_code_request_maybe_delivered);
1715+
}
1716+
1717+
// --- Attempt 2: retry to peer2, which responds successfully ---
1718+
// This is what loadBalance does: on request_maybe_delivered, pick next alternative and retry
1719+
{
1720+
state Promise<Void> reply2;
1721+
state Endpoint ep2;
1722+
1723+
state Future<ErrorOr<Void>> result2 =
1724+
waitValueOrSignal(reply2.getFuture(), Never(), ep2, ReplyPromise<Void>(), peer2);
1725+
1726+
numAttempts++;
1727+
1728+
// Second server is healthy and responds
1729+
reply2.send(Void());
1730+
1731+
ASSERT(result2.isReady());
1732+
ErrorOr<Void> r2 = result2.get();
1733+
ASSERT(r2.present()); // Success!
1734+
}
1735+
1736+
// The critical assertion: retries happened (numAttempts > 1)
1737+
ASSERT_GE(numAttempts, 2);
1738+
TraceEvent("WaitValueOrSignalRetryTest").detail("NumAttempts", numAttempts);
1739+
1740+
return Void();
1741+
}

fdbrpc/IPAllowList.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ namespace {
3232

3333
template <std::size_t C>
3434
std::string binRep(std::array<unsigned char, C> const& addr) {
35-
return fmt::format("{:02x}", fmt::join(addr, ":"));
35+
std::string result;
36+
for (size_t i = 0; i < addr.size(); ++i) {
37+
if (i > 0)
38+
result += ":";
39+
result += fmt::format("{:02x}", addr[i]);
40+
}
41+
return result;
3642
}
3743

3844
template <std::size_t C>

fdbrpc/TokenCache.actor.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,17 @@ bool TokenCacheImpl::validate(TenantId tenantId, StringRef token) {
300300
}
301301
if (!tenantFound) {
302302
CODE_PROBE(true, "Valid token doesn't reference tenant");
303+
std::string tenantsStr;
304+
for (int i = 0; i < entry->tenants.size(); ++i) {
305+
if (i > 0)
306+
tenantsStr += " ";
307+
tenantsStr += fmt::format("{:#x}", entry->tenants[i]);
308+
}
303309
TraceEvent(SevWarn, "InvalidToken"_audit)
304310
.detail("From", peer)
305311
.detail("Reason", "TenantTokenMismatch")
306312
.detail("RequestedTenant", fmt::format("{:#x}", tenantId))
307-
.detail("TenantsInToken", fmt::format("{:#x}", fmt::join(entry->tenants, " ")));
313+
.detail("TenantsInToken", tenantsStr);
308314
return false;
309315
}
310316
// audit logging

fdbrpc/include/fdbrpc/genericactors.actor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,13 @@ Future<ErrorOr<X>> waitValueOrSignal(Future<X> value,
377377
? unauthorized_attempt()
378378
: request_maybe_delivered());
379379
}
380+
when(wait(peer.isValid() ? peer->disconnect.getFuture() : Never())) {
381+
CODE_PROBE(true, "waitValueOrSignal detected peer disconnect");
382+
TraceEvent("WaitValueOrSignalPeerDisconnect")
383+
.detail("Endpoint", endpoint.getPrimaryAddress())
384+
.detail("Token", endpoint.token);
385+
return ErrorOr<X>(request_maybe_delivered());
386+
}
380387
}
381388
} catch (Error& e) {
382389
if (signal.isError()) {

fdbserver/MoveKeys.actor.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,24 @@ ACTOR Future<Void> unassignServerKeys(Transaction* tr, UID ssId, KeyRange range,
154154

155155
tr->clear(KeyRangeRef(beginKey, endKey));
156156

157+
// Write entries in pairs, checking for uncoalesced entries (adjacent with same value).
158+
// Original loop wrote pairs: kvs[i] and kvs[i+1] for each i < size-1.
159+
// We maintain that behavior but add uncoalesced detection.
157160
for (int i = 0; i < kvs.size() - 1; ++i) {
158-
ASSERT(kvs[i].value != kvs[i + 1].value || kvs[i + 1].key.removePrefix(mapPrefix) == allKeys.end);
161+
// Check for uncoalesced entry (adjacent entries with same value, not at end boundary)
162+
bool atEnd = kvs[i + 1].key.removePrefix(mapPrefix) == allKeys.end;
163+
if (kvs[i].value == kvs[i + 1].value && !atEnd) {
164+
if (SERVER_KNOBS->DD_COALESCE_UNCOALESCED_KRM) {
165+
// Log and continue - the entries will still be written but we don't crash
166+
TraceEvent(SevWarnAlways, "MoveKeysUncoalescedDetected", logId)
167+
.detail("SSID", ssId)
168+
.detail("Key1", kvs[i].key)
169+
.detail("Key2", kvs[i + 1].key)
170+
.detail("Value", kvs[i].value);
171+
} else {
172+
ASSERT(false); // Uncoalesced KRM entries detected; set DD_COALESCE_UNCOALESCED_KRM=true to skip
173+
}
174+
}
159175
tr->set(kvs[i].key, kvs[i].value);
160176
tr->set(kvs[i + 1].key, kvs[i + 1].value);
161177
}
@@ -2796,6 +2812,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
27962812
.detail("End", currentKeys.end);
27972813
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse));
27982814
wait(tr.commit());
2815+
27992816
TraceEvent(SevDebug, "FailedServerCommitSuccess", serverID)
28002817
.detail("Begin", currentKeys.begin)
28012818
.detail("End", currentKeys.end)

0 commit comments

Comments
 (0)