From 720af66a77d32f7d875345a61bac1f6893ecb6a6 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:28:13 +0800 Subject: [PATCH 1/4] Fix Sim2Conn read hanging after peer close When a peer closed a simulated connection, the receiver coroutine would co_await Never() after stopReceive fired, leaving readers with no way to discover the connection was done. Add an incomingClosed AsyncVar that is set after all in-flight bytes have been delivered, so read() and whenReadable() can surface connection_failed instead of blocking forever. Care is taken to switch to self->process before setting incomingClosed, since stopReceive's delay may fire on the closing peer's process. --- fdbrpc/sim2.cpp | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/fdbrpc/sim2.cpp b/fdbrpc/sim2.cpp index 516ac1b2e1d..f0601f9f71d 100644 --- a/fdbrpc/sim2.cpp +++ b/fdbrpc/sim2.cpp @@ -302,7 +302,7 @@ SimClogging g_clogging; struct Sim2Conn final : IConnection, ReferenceCounted { explicit Sim2Conn(ISimulator::ProcessInfo* process) : opened(false), closedByCaller(false), stableConnection(false), trustedPeer(true), process(process), - dbgid(deterministicRandom()->randomUniqueID()), stopReceive(Never()) { + dbgid(deterministicRandom()->randomUniqueID()), stopReceive(Never()), incomingClosed(false) { pipes = sender(this) && receiver(this); } @@ -378,6 +378,9 @@ struct Sim2Conn final : IConnection, ReferenceCounted { rollRandomClose(); int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random? + if (avail == 0 && incomingClosed.get()) { + throw connection_failed(); + } int toRead = std::min(end - begin, avail); ASSERT(toRead >= 0 && toRead <= recvBuf.size() && toRead <= end - begin); for (int i = 0; i < toRead; i++) @@ -450,6 +453,8 @@ struct Sim2Conn final : IConnection, ReferenceCounted { Future pipes; Future stopReceive; + // Becomes true after the peer has closed and no more bytes can be delivered to recvBuf. + AsyncVar incomingClosed; int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); @@ -476,8 +481,24 @@ struct Sim2Conn final : IConnection, ReferenceCounted { loop { if (self->sentBytes.get() != self->receivedBytes.get()) co_await g_simulator->onProcess(self->peerProcess); - while (self->sentBytes.get() == self->receivedBytes.get()) - co_await self->sentBytes.onChange(); + while (self->sentBytes.get() == self->receivedBytes.get()) { + if (self->stopReceive.isReady()) { + // stopReceive can become ready on the peer-closing process, but incomingClosed must notify readers + // on the owning receiver process. + co_await g_simulator->onProcess(self->process); + if (self->stopReceive.isReady() && self->sentBytes.get() == self->receivedBytes.get()) { + self->incomingClosed.set(true); + co_return; + } + co_await g_simulator->onProcess(self->peerProcess); + continue; + } + co_await (self->sentBytes.onChange() || self->stopReceive); + if (g_simulator->getCurrentProcess() != self->peerProcess) { + // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. + co_await g_simulator->onProcess(self->peerProcess); + } + } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); // Simulated network disconnection. Make sure to only throw connection_failed() on the sender process. @@ -500,10 +521,13 @@ struct Sim2Conn final : IConnection, ReferenceCounted { co_await delay(g_clogging.getRecvDelay( self->peerProcess->address, self->process->address, self->isStableConnection())); ASSERT(g_simulator->getCurrentProcess() == self->process); - if (self->stopReceive.isReady()) { - co_await Future(Never()); - } self->receivedBytes.set(pos); + // A peer close only prevents future bytes from arriving. If stopReceive becomes ready after sentBytes moved + // forward, keep the connection readable until we've delivered through sentBytes. + if (self->stopReceive.isReady() && self->receivedBytes.get() == self->sentBytes.get()) { + self->incomingClosed.set(true); + co_return; + } co_await Future(Void()); // Prior notification can delete self and cancel this actor ASSERT(g_simulator->getCurrentProcess() == self->process); } @@ -515,7 +539,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; } - co_await self->receivedBytes.onChange(); + if (self->incomingClosed.get()) { + throw connection_failed(); + } + co_await (self->receivedBytes.onChange() || self->incomingClosed.onChange()); self->rollRandomClose(); } } catch (Error& e) { From 546a81241d4f0c81dc6d9187b615a539645fd455 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Thu, 2 Apr 2026 14:41:52 +0800 Subject: [PATCH 2/4] Preserve Sim2Conn task priority across process hops --- fdbrpc/sim2.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/fdbrpc/sim2.cpp b/fdbrpc/sim2.cpp index f0601f9f71d..9cc3d512b4c 100644 --- a/fdbrpc/sim2.cpp +++ b/fdbrpc/sim2.cpp @@ -479,24 +479,25 @@ struct Sim2Conn final : IConnection, ReferenceCounted { } static Future receiver(Sim2Conn* self) { loop { + TaskPriority currentTaskID = g_network->getCurrentTask(); if (self->sentBytes.get() != self->receivedBytes.get()) - co_await g_simulator->onProcess(self->peerProcess); + co_await g_simulator->onProcess(self->peerProcess, currentTaskID); while (self->sentBytes.get() == self->receivedBytes.get()) { if (self->stopReceive.isReady()) { // stopReceive can become ready on the peer-closing process, but incomingClosed must notify readers // on the owning receiver process. - co_await g_simulator->onProcess(self->process); + co_await g_simulator->onProcess(self->process, currentTaskID); if (self->stopReceive.isReady() && self->sentBytes.get() == self->receivedBytes.get()) { self->incomingClosed.set(true); co_return; } - co_await g_simulator->onProcess(self->peerProcess); + co_await g_simulator->onProcess(self->peerProcess, currentTaskID); continue; } co_await (self->sentBytes.onChange() || self->stopReceive); if (g_simulator->getCurrentProcess() != self->peerProcess) { // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. - co_await g_simulator->onProcess(self->peerProcess); + co_await g_simulator->onProcess(self->peerProcess, currentTaskID); } } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); @@ -516,7 +517,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); co_await delay(g_clogging.getSendDelay( self->peerProcess->address, self->process->address, self->isStableConnection())); - co_await g_simulator->onProcess(self->process); + co_await g_simulator->onProcess(self->process, currentTaskID); ASSERT(g_simulator->getCurrentProcess() == self->process); co_await delay(g_clogging.getRecvDelay( self->peerProcess->address, self->process->address, self->isStableConnection())); From feaddc52c3128d1b575097f09abc44ad8a9d57b3 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:13:31 +0800 Subject: [PATCH 3/4] Fix Sim2Conn close and termination handling --- fdbrpc/include/fdbrpc/SimulatorProcessInfo.h | 4 + fdbrpc/sim2.cpp | 307 ++++++++++++++----- 2 files changed, 238 insertions(+), 73 deletions(-) diff --git a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h index 49cbec09e4d..874805301da 100644 --- a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h +++ b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h @@ -90,6 +90,9 @@ struct ProcessInfo : NonCopyable { } Future onShutdown() { return shutdownSignal.getFuture(); } + // Fires for both hard kills and reboot-style shutdowns. Connection code cannot rely on onShutdown(), because + // KillInstantly sets failed without sending shutdownSignal. + Future onTerminated() { return terminatedSignal.getFuture(); } bool isSpawnedKVProcess() const { // SOMEDAY: use a separate bool may be better? @@ -167,6 +170,7 @@ struct ProcessInfo : NonCopyable { // Members not for external use Promise shutdownSignal; + Promise terminatedSignal; }; } // namespace simulator diff --git a/fdbrpc/sim2.cpp b/fdbrpc/sim2.cpp index 9cc3d512b4c..9678964b335 100644 --- a/fdbrpc/sim2.cpp +++ b/fdbrpc/sim2.cpp @@ -302,9 +302,8 @@ SimClogging g_clogging; struct Sim2Conn final : IConnection, ReferenceCounted { explicit Sim2Conn(ISimulator::ProcessInfo* process) : opened(false), closedByCaller(false), stableConnection(false), trustedPeer(true), process(process), - dbgid(deterministicRandom()->randomUniqueID()), stopReceive(Never()), incomingClosed(false) { - pipes = sender(this) && receiver(this); - } + peerProcess(nullptr), dbgid(deterministicRandom()->randomUniqueID()), stopReceive(false), + incomingClosed(false) {} // connect() is called on a pair of connections immediately after creation; logically it is part of the constructor // and no other method may be called previously! @@ -344,6 +343,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { .detail("SendBufSize", sendBufSize) .detail("Latency", latency) .detail("StableConnection", stableConnection); + + // sender()/receiver() dereference peer-side state immediately, so they must not run until connect() has + // published the peer metadata. + pipes = sender(this) && receiver(this); } ~Sim2Conn() { ASSERT_ABORT(!opened || closedByCaller); } @@ -361,7 +364,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { Future onWritable() override { return whenWritable(this); } Future onReadable() override { return whenReadable(this); } - bool isPeerGone() const { return !peer || peerProcess->failed; } + bool isPeerGone() const { return !peer || processTerminating(peerProcess); } bool hasTrustedPeer() const override { return trustedPeer; } @@ -369,7 +372,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { void peerClosed() { leakedConnectionTracker = trackLeakedConnection(this); - stopReceive = delay(1.0); + armStopReceive(); } // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might @@ -378,6 +381,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { rollRandomClose(); int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random? + if (avail == 0 && !incomingClosed.get() && processTerminating(peerProcess) && + sentBytes.get() == receivedBytes.get()) { + setIncomingClosed(); + } if (avail == 0 && incomingClosed.get()) { throw connection_failed(); } @@ -395,6 +402,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { int write(SendBuffer const* buffer, int limit) override { rollRandomClose(); ASSERT(limit > 0); + Reference targetPeer = peer; + if (writeClosed(targetPeer)) { + throw connection_failed(); + } int toSend = 0; if (BUGGIFY && !stableConnection) { @@ -413,19 +424,18 @@ struct Sim2Conn final : IConnection, ReferenceCounted { if (BUGGIFY && !stableConnection) toSend = std::min(toSend, deterministicRandom()->randomInt(0, 1000)); - if (!peer) - return toSend; - toSend = std::min(toSend, peer->availableSendBufferForPeer()); + toSend = std::min(toSend, targetPeer->availableSendBufferForPeer()); ASSERT(toSend >= 0); int leftToSend = toSend; for (auto p = buffer; p && leftToSend > 0; p = p->next) { int ts = std::min(leftToSend, p->bytes_written - p->bytes_sent); - peer->recvBuf.insert(peer->recvBuf.end(), p->data() + p->bytes_sent, p->data() + p->bytes_sent + ts); + targetPeer->recvBuf.insert( + targetPeer->recvBuf.end(), p->data() + p->bytes_sent, p->data() + p->bytes_sent + ts); leftToSend -= ts; } ASSERT(leftToSend == 0); - peer->writtenBytes.set(peer->writtenBytes.get() + toSend); + targetPeer->writtenBytes.set(targetPeer->writtenBytes.get() + toSend); return toSend; } @@ -450,20 +460,69 @@ struct Sim2Conn final : IConnection, ReferenceCounted { int sendBufSize; Future leakedConnectionTracker; - - Future pipes; - Future stopReceive; + AsyncVar stopReceive; + bool stopReceiveArmed = false; // Becomes true after the peer has closed and no more bytes can be delivered to recvBuf. AsyncVar incomingClosed; + // Supplementary wakeup source for sender()/receiver()/whenWritable(). + // Treat this as a level-triggered helper only: trigger() can be lost, so every waiter must first check the + // persistent state it cares about (peer, stopReceive, writtenBytes, sentBytes, receivedBytes) before awaiting + // it. + AsyncTrigger connWakeup; + // Must be destroyed before the AsyncVars it mutates or awaits on. + Future stopReceiveTask; + Future pipes; int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); } // SOMEDAY: acknowledgedBytes instead of receivedBytes + static bool processTerminating(const ISimulator::ProcessInfo* process) { + return process->failed || process->rebooting || process->shutdownSignal.isSet() || + process->terminatedSignal.isSet(); + } + + bool writeClosed(const Reference& targetPeer) const { + if (!targetPeer) { + return true; + } + // Once the peer has started shutting down, further writes can no longer be delivered reliably. + return g_clogging.disconnected(process->address.ip, targetPeer->process->address.ip) || + processTerminating(targetPeer->process) || targetPeer->stopReceiveArmed || + targetPeer->incomingClosed.get() || targetPeer->pipes.isReady(); + } + + static Future triggerStopReceive(Sim2Conn* self) { + // armStopReceive() can be called from either endpoint's simulator process. The resumed waiters must therefore + // re-check and, if needed, hop back to their owning process before mutating connection state. + co_await delay(1.0); + self->stopReceive.set(true); + self->connWakeup.trigger(); + } + + void armStopReceive() { + if (!stopReceiveArmed) { + stopReceiveArmed = true; + stopReceiveTask = triggerStopReceive(this); + connWakeup.trigger(); + } + } + + void setIncomingClosed() { + ASSERT(g_simulator->getCurrentProcess() == process); + if (!incomingClosed.get()) { + TraceEvent("Sim2IncomingClosed") + .detail("Receiver", process->address) + .detail("Peer", peerProcess->address) + .detail("Connection", dbgid); + incomingClosed.set(true); + } + } + void closeInternal() { if (peer) { peer->peerClosed(); - stopReceive = delay(1.0); + armStopReceive(); } leakedConnectionTracker.cancel(); peer.clear(); @@ -471,71 +530,154 @@ struct Sim2Conn final : IConnection, ReferenceCounted { static Future sender(Sim2Conn* self) { loop { - co_await self->writtenBytes.onChange(); // takes place on peer! + TaskPriority currentTaskID = g_network->getCurrentTask(); + while (self->writtenBytes.get() == self->sentBytes.get()) { + if (self->stopReceive.get() || processTerminating(self->peerProcess)) { + co_return; + } + co_await ((self->writtenBytes.onChange() || self->connWakeup.onTrigger()) || + self->peerProcess->onTerminated()); + if (processTerminating(self->peerProcess)) { + co_return; + } + if (g_simulator->getCurrentProcess() != self->peerProcess) { + // stopReceive can wake us on the closer's process, but sentBytes must advance on the sender side. + co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || + self->peerProcess->onTerminated()); + if (processTerminating(self->peerProcess)) { + co_return; + } + } + } + ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); + co_await (delay(.002 * deterministicRandom()->random01(), currentTaskID) || + self->peerProcess->onTerminated()); + if (processTerminating(self->peerProcess)) { + co_return; + } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - co_await delay(.002 * deterministicRandom()->random01()); self->sentBytes.set(self->writtenBytes.get()); // or possibly just some sometimes... + self->connWakeup.trigger(); } } static Future receiver(Sim2Conn* self) { - loop { - TaskPriority currentTaskID = g_network->getCurrentTask(); - if (self->sentBytes.get() != self->receivedBytes.get()) - co_await g_simulator->onProcess(self->peerProcess, currentTaskID); - while (self->sentBytes.get() == self->receivedBytes.get()) { - if (self->stopReceive.isReady()) { - // stopReceive can become ready on the peer-closing process, but incomingClosed must notify readers - // on the owning receiver process. - co_await g_simulator->onProcess(self->process, currentTaskID); - if (self->stopReceive.isReady() && self->sentBytes.get() == self->receivedBytes.get()) { - self->incomingClosed.set(true); + Optional err; + try { + loop { + TaskPriority currentTaskID = g_network->getCurrentTask(); + if (self->sentBytes.get() != self->receivedBytes.get() && !processTerminating(self->peerProcess)) { + co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || + self->peerProcess->onTerminated()); + } + while (self->sentBytes.get() == self->receivedBytes.get()) { + if (self->stopReceive.get() || processTerminating(self->peerProcess)) { + // stopReceive can become ready on the peer-closing process, but incomingClosed must notify + // readers on the owning receiver process. + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); + } + if ((self->stopReceive.get() || processTerminating(self->peerProcess)) && + self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + co_return; + } + if (!processTerminating(self->peerProcess)) { + co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || + self->peerProcess->onTerminated()); + } + if (processTerminating(self->peerProcess)) { + break; + } + continue; + } + co_await (self->connWakeup.onTrigger() || self->peerProcess->onTerminated()); + if (g_simulator->getCurrentProcess() != self->peerProcess) { + // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. + if (processTerminating(self->peerProcess)) { + break; + } + co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || + self->peerProcess->onTerminated()); + if (processTerminating(self->peerProcess)) { + break; + } + } + } + if (processTerminating(self->peerProcess)) { + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); + } + if (self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); co_return; } - co_await g_simulator->onProcess(self->peerProcess, currentTaskID); - continue; + } else { + ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); } - co_await (self->sentBytes.onChange() || self->stopReceive); - if (g_simulator->getCurrentProcess() != self->peerProcess) { - // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. - co_await g_simulator->onProcess(self->peerProcess, currentTaskID); + + // Simulated network disconnection. Make sure to only throw connection_failed() on the sender process. + if (!processTerminating(self->peerProcess) && + g_clogging.disconnected(self->peerProcess->address.ip, self->process->address.ip)) { + TraceEvent("SimulatedDisconnection") + .detail("Phase", "DataTransfer") + .detail("Sender", self->peerProcess->address) + .detail("Receiver", self->process->address); + throw connection_failed(); } - } - ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - // Simulated network disconnection. Make sure to only throw connection_failed() on the sender process. - if (g_clogging.disconnected(self->peerProcess->address.ip, self->process->address.ip)) { - TraceEvent("SimulatedDisconnection") - .detail("Phase", "DataTransfer") - .detail("Sender", self->peerProcess->address) - .detail("Receiver", self->process->address); - throw connection_failed(); + int64_t pos = + deterministicRandom()->random01() < .5 + ? self->sentBytes.get() + : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); + double sendDelay = g_clogging.getSendDelay( + self->peerProcess->address, self->process->address, self->isStableConnection()); + double sendDelayEnd = now() + sendDelay; + co_await (delay(sendDelay, currentTaskID) || self->peerProcess->onTerminated()); + // Once bytes have advanced into sentBytes they are already "on the wire". Preserve the normal + // sender-side delay while the peer is alive, but if the sender dies mid-flight, finish only the + // remaining network delay on the receiver side so already-sent bytes do not get stranded behind a dead + // process. + double remainingSendDelay = std::max(0.0, sendDelayEnd - now()); + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); + } + if (remainingSendDelay > ISimulator::TIME_EPS) { + co_await delay(remainingSendDelay); + } + ASSERT(g_simulator->getCurrentProcess() == self->process); + co_await delay(g_clogging.getRecvDelay( + self->peerProcess->address, self->process->address, self->isStableConnection())); + ASSERT(g_simulator->getCurrentProcess() == self->process); + // A peer close only prevents future bytes from arriving. If stopReceive becomes ready after sentBytes + // moved forward, keep the connection readable until we've delivered through sentBytes. + bool shouldCloseAfterDelivery = + (self->stopReceive.get() || processTerminating(self->peerProcess)) && pos == self->sentBytes.get(); + self->receivedBytes.set(pos); + if (shouldCloseAfterDelivery) { + self->setIncomingClosed(); + co_return; + } + co_await Future(Void()); // Prior notification can delete self and cancel this actor. + ASSERT(g_simulator->getCurrentProcess() == self->process); } - - int64_t pos = - deterministicRandom()->random01() < .5 - ? self->sentBytes.get() - : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); - co_await delay(g_clogging.getSendDelay( - self->peerProcess->address, self->process->address, self->isStableConnection())); - co_await g_simulator->onProcess(self->process, currentTaskID); - ASSERT(g_simulator->getCurrentProcess() == self->process); - co_await delay(g_clogging.getRecvDelay( - self->peerProcess->address, self->process->address, self->isStableConnection())); - ASSERT(g_simulator->getCurrentProcess() == self->process); - self->receivedBytes.set(pos); - // A peer close only prevents future bytes from arriving. If stopReceive becomes ready after sentBytes moved - // forward, keep the connection readable until we've delivered through sentBytes. - if (self->stopReceive.isReady() && self->receivedBytes.get() == self->sentBytes.get()) { - self->incomingClosed.set(true); - co_return; + } catch (Error& e) { + err = e; + } + if (err.present()) { + if (err.get().code() == error_code_connection_failed) { + TaskPriority currentTaskID = g_network->getCurrentTask(); + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); + } + self->setIncomingClosed(); } - co_await Future(Void()); // Prior notification can delete self and cancel this actor - ASSERT(g_simulator->getCurrentProcess() == self->process); + throw err.get(); } } static Future whenReadable(Sim2Conn* self) { try { loop { + TaskPriority currentTaskID = g_network->getCurrentTask(); if (self->readBytes.get() != self->receivedBytes.get()) { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; @@ -543,7 +685,19 @@ struct Sim2Conn final : IConnection, ReferenceCounted { if (self->incomingClosed.get()) { throw connection_failed(); } - co_await (self->receivedBytes.onChange() || self->incomingClosed.onChange()); + if (processTerminating(self->peerProcess) && self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + throw connection_failed(); + } + if (!processTerminating(self->peerProcess)) { + co_await ((self->receivedBytes.onChange() || self->incomingClosed.onChange()) || + self->peerProcess->onTerminated()); + } else { + co_await (self->receivedBytes.onChange() || self->incomingClosed.onChange()); + } + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); + } self->rollRandomClose(); } } catch (Error& e) { @@ -554,20 +708,21 @@ struct Sim2Conn final : IConnection, ReferenceCounted { static Future whenWritable(Sim2Conn* self) { try { loop { - if (!self->peer) - co_return; - if (self->peer->availableSendBufferForPeer() > 0) { + TaskPriority currentTaskID = g_network->getCurrentTask(); + Reference targetPeer = self->peer; + if (self->writeClosed(targetPeer)) { + throw connection_failed(); + } + if (targetPeer->availableSendBufferForPeer() > 0) { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; } - try { - co_await self->peer->receivedBytes.onChange(); - ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - } catch (Error& e) { - if (e.code() != error_code_broken_promise) - throw; + co_await (((targetPeer->receivedBytes.onChange() || targetPeer->incomingClosed.onChange()) || + targetPeer->connWakeup.onTrigger()) || + targetPeer->process->onTerminated()); + if (g_simulator->getCurrentProcess() != self->process) { + co_await g_simulator->onProcess(self->process, currentTaskID); } - co_await g_simulator->onProcess(self->process); } } catch (Error& e) { ASSERT(g_simulator->getCurrentProcess() == self->process); @@ -1864,6 +2019,9 @@ class Sim2 final : public ISimulator, public INetworkConnections { if (!machine->isSpawnedKVProcess()) latestEventCache.clear(); machine->failed = true; + if (!machine->terminatedSignal.isSet()) { + machine->terminatedSignal.send(Void()); + } } else if (kt == KillType::InjectFaults) { TraceEvent(SevWarn, "FaultMachine") .detail("Name", machine->name) @@ -2928,6 +3086,9 @@ Future doReboot(Uncancellable, ISimulator::ProcessInfo* p, ISimulator::Kil .detail("Cleared", p->cleared) .backtrace(); p->rebooting = true; + if (!p->terminatedSignal.isSet()) { + p->terminatedSignal.send(Void()); + } if ((kt == ISimulator::KillType::RebootAndDelete) || (kt == ISimulator::KillType::RebootProcessAndDelete)) { p->cleared = true; g_simulator->clearAddress(p->address); From 6a37bd5a9b5dd49776baa37fd0d803da16bae5c2 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:56:25 +0800 Subject: [PATCH 4/4] Fix simulator connection and HTTP response lifetime races Rewrite Sim2Conn termination-sensitive waits and process hops with race-based helpers, keep in-flight bytes deliverable after peer death, and clarify shutdown semantics in the receiver/readable paths. Also make the simulator HTTP response mutex ref-counted so callback actors cannot outlive a stack-allocated FlowMutex when a connection closes early. --- fdbrpc/HTTPServer.cpp | 13 ++- fdbrpc/sim2.cpp | 178 +++++++++++++++++++++++++++++------------- 2 files changed, 132 insertions(+), 59 deletions(-) diff --git a/fdbrpc/HTTPServer.cpp b/fdbrpc/HTTPServer.cpp index 68e877bbf4a..31d19718226 100644 --- a/fdbrpc/HTTPServer.cpp +++ b/fdbrpc/HTTPServer.cpp @@ -23,11 +23,16 @@ #include "flow/Trace.h" #include "fdbrpc/simulator.h" #include "fdbrpc/SimulatorProcessInfo.h" + +struct SharedFlowMutex : ReferenceCounted { + FlowMutex mutex; +}; + Future callbackHandler(Reference conn, Future readRequestDone, Reference requestHandler, Reference req, - FlowMutex* mutex) { + Reference mutexHolder) { auto response = makeReference(); UnsentPacketQueue content; response->data.content = &content; @@ -58,7 +63,7 @@ Future callbackHandler(Reference conn, } // take out response mutex to ensure no parallel writers to response connection // FIXME: is this necessary? I think it is - FlowMutex::Lock lock = co_await mutex->take(); + FlowMutex::Lock lock = co_await mutexHolder->mutex.take(); try { co_await response->write(conn); } catch (Error& e) { @@ -79,7 +84,7 @@ Future connectionHandler(Reference server, Reference requestHandler) { try { // TODO do we actually have multiple requests on a connection? how does this work - FlowMutex responseMutex; + Reference responseMutex = makeReference(); Future readPrevRequest = Future(Void()); co_await conn->acceptHandshake(); while (true) { @@ -88,7 +93,7 @@ Future connectionHandler(Reference server, co_await conn->onReadable(); auto req = makeReference(); readPrevRequest = req->read(conn, false); - server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, &responseMutex)); + server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, responseMutex)); } } catch (Error& e) { if (e.code() != error_code_actor_cancelled) { diff --git a/fdbrpc/sim2.cpp b/fdbrpc/sim2.cpp index 9678964b335..de7ca65dc59 100644 --- a/fdbrpc/sim2.cpp +++ b/fdbrpc/sim2.cpp @@ -48,6 +48,7 @@ #include "crc32/crc32c.h" #include "fdbrpc/TraceFileIO.h" #include "flow/flow.h" +#include "flow/CoroUtils.h" #include "flow/swift.h" #include "flow/swift/ABI/Task.h" #include "flow/genericactors.actor.h" @@ -381,7 +382,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { rollRandomClose(); int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random? - if (avail == 0 && !incomingClosed.get() && processTerminating(peerProcess) && + if (avail == 0 && !incomingClosed.get() && peerProcess && processTerminating(peerProcess) && sentBytes.get() == receivedBytes.get()) { setIncomingClosed(); } @@ -478,10 +479,71 @@ struct Sim2Conn final : IConnection, ReferenceCounted { } // SOMEDAY: acknowledgedBytes instead of receivedBytes static bool processTerminating(const ISimulator::ProcessInfo* process) { - return process->failed || process->rebooting || process->shutdownSignal.isSet() || + return !process || process->failed || process->rebooting || process->shutdownSignal.isSet() || process->terminatedSignal.isSet(); } + static Future hopToProcessOrTermination(ISimulator::ProcessInfo* process, TaskPriority taskID) { + if (g_simulator->getCurrentProcess() == process) { + co_return !processTerminating(process); + } + if (processTerminating(process)) { + co_return false; + } + auto result = co_await race(g_simulator->onProcess(process, taskID), process->onTerminated()); + co_return result.index() == 0 && !processTerminating(process); + } + + static Future delayOrTermination(ISimulator::ProcessInfo* process, double seconds, TaskPriority taskID) { + if (processTerminating(process)) { + co_return false; + } + auto result = co_await race(delay(seconds, taskID), process->onTerminated()); + co_return result.index() == 0 && !processTerminating(process); + } + + static Future waitForSenderWakeup(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await race(self->writtenBytes.onChange(), self->connWakeup.onTrigger()); + co_return; + } + (void)co_await race( + self->writtenBytes.onChange(), self->connWakeup.onTrigger(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForReceiverWakeup(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await self->connWakeup.onTrigger(); + co_return; + } + (void)co_await race(self->connWakeup.onTrigger(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForReadableStateChange(Sim2Conn* self) { + (void)co_await race(self->receivedBytes.onChange(), self->incomingClosed.onChange()); + co_return; + } + + static Future waitForReadableStateChangeOrTermination(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await race(self->receivedBytes.onChange(), self->incomingClosed.onChange()); + co_return; + } + (void)co_await race( + self->receivedBytes.onChange(), self->incomingClosed.onChange(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForWritableStateChangeOrTermination(Reference targetPeer) { + (void)co_await race(targetPeer->receivedBytes.onChange(), + targetPeer->incomingClosed.onChange(), + targetPeer->connWakeup.onTrigger(), + targetPeer->process->onTerminated()); + co_return; + } + bool writeClosed(const Reference& targetPeer) const { if (!targetPeer) { return true; @@ -493,11 +555,13 @@ struct Sim2Conn final : IConnection, ReferenceCounted { } static Future triggerStopReceive(Sim2Conn* self) { - // armStopReceive() can be called from either endpoint's simulator process. The resumed waiters must therefore - // re-check and, if needed, hop back to their owning process before mutating connection state. + // armStopReceive() can be called from either endpoint's simulator process, and this delay therefore fires on + // whichever process armed it. Resumed waiters must re-check their state and, if needed, hop back to their + // owning process before mutating connection state. co_await delay(1.0); self->stopReceive.set(true); self->connWakeup.trigger(); + co_return; } void armStopReceive() { @@ -511,10 +575,11 @@ struct Sim2Conn final : IConnection, ReferenceCounted { void setIncomingClosed() { ASSERT(g_simulator->getCurrentProcess() == process); if (!incomingClosed.get()) { - TraceEvent("Sim2IncomingClosed") - .detail("Receiver", process->address) - .detail("Peer", peerProcess->address) - .detail("Connection", dbgid); + TraceEvent traceEvent("Sim2IncomingClosed"); + traceEvent.detail("Receiver", process->address).detail("Connection", dbgid); + if (peerProcess) { + traceEvent.detail("Peer", peerProcess->address); + } incomingClosed.set(true); } } @@ -535,24 +600,19 @@ struct Sim2Conn final : IConnection, ReferenceCounted { if (self->stopReceive.get() || processTerminating(self->peerProcess)) { co_return; } - co_await ((self->writtenBytes.onChange() || self->connWakeup.onTrigger()) || - self->peerProcess->onTerminated()); + co_await waitForSenderWakeup(self); if (processTerminating(self->peerProcess)) { co_return; } - if (g_simulator->getCurrentProcess() != self->peerProcess) { + if (g_simulator->getCurrentProcess() != self->peerProcess && + !co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { // stopReceive can wake us on the closer's process, but sentBytes must advance on the sender side. - co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || - self->peerProcess->onTerminated()); - if (processTerminating(self->peerProcess)) { - co_return; - } + co_return; } } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - co_await (delay(.002 * deterministicRandom()->random01(), currentTaskID) || - self->peerProcess->onTerminated()); - if (processTerminating(self->peerProcess)) { + if (!co_await delayOrTermination( + self->peerProcess, .002 * deterministicRandom()->random01(), currentTaskID)) { co_return; } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); @@ -566,46 +626,42 @@ struct Sim2Conn final : IConnection, ReferenceCounted { loop { TaskPriority currentTaskID = g_network->getCurrentTask(); if (self->sentBytes.get() != self->receivedBytes.get() && !processTerminating(self->peerProcess)) { - co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || - self->peerProcess->onTerminated()); + (void)co_await hopToProcessOrTermination(self->peerProcess, currentTaskID); + // If the hop fails because peerProcess terminates, fall through to the processTerminating() + // handling below so already-sent bytes can still be delivered from self->process. } while (self->sentBytes.get() == self->receivedBytes.get()) { if (self->stopReceive.get() || processTerminating(self->peerProcess)) { // stopReceive can become ready on the peer-closing process, but incomingClosed must notify // readers on the owning receiver process. - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } if ((self->stopReceive.get() || processTerminating(self->peerProcess)) && self->sentBytes.get() == self->receivedBytes.get()) { self->setIncomingClosed(); co_return; } - if (!processTerminating(self->peerProcess)) { - co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || - self->peerProcess->onTerminated()); - } - if (processTerminating(self->peerProcess)) { + if (processTerminating(self->peerProcess) || + !co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { break; } continue; } - co_await (self->connWakeup.onTrigger() || self->peerProcess->onTerminated()); - if (g_simulator->getCurrentProcess() != self->peerProcess) { + co_await waitForReceiverWakeup(self); + if (g_simulator->getCurrentProcess() != self->peerProcess && + !processTerminating(self->peerProcess)) { // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. - if (processTerminating(self->peerProcess)) { - break; - } - co_await (g_simulator->onProcess(self->peerProcess, currentTaskID) || - self->peerProcess->onTerminated()); - if (processTerminating(self->peerProcess)) { + if (!co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { break; } } } if (processTerminating(self->peerProcess)) { - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } if (self->sentBytes.get() == self->receivedBytes.get()) { self->setIncomingClosed(); @@ -632,14 +688,15 @@ struct Sim2Conn final : IConnection, ReferenceCounted { double sendDelay = g_clogging.getSendDelay( self->peerProcess->address, self->process->address, self->isStableConnection()); double sendDelayEnd = now() + sendDelay; - co_await (delay(sendDelay, currentTaskID) || self->peerProcess->onTerminated()); + co_await delayOrTermination(self->peerProcess, sendDelay, currentTaskID); // Once bytes have advanced into sentBytes they are already "on the wire". Preserve the normal // sender-side delay while the peer is alive, but if the sender dies mid-flight, finish only the // remaining network delay on the receiver side so already-sent bytes do not get stranded behind a dead // process. double remainingSendDelay = std::max(0.0, sendDelayEnd - now()); - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } if (remainingSendDelay > ISimulator::TIME_EPS) { co_await delay(remainingSendDelay); @@ -666,8 +723,11 @@ struct Sim2Conn final : IConnection, ReferenceCounted { if (err.present()) { if (err.get().code() == error_code_connection_failed) { TaskPriority currentTaskID = g_network->getCurrentTask(); - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + // If our own process is terminating too, there is no surviving owner left that could observe a + // re-thrown pipe error, so finishing shutdown here is equivalent. + co_return; } self->setIncomingClosed(); } @@ -686,17 +746,26 @@ struct Sim2Conn final : IConnection, ReferenceCounted { throw connection_failed(); } if (processTerminating(self->peerProcess) && self->sentBytes.get() == self->receivedBytes.get()) { - self->setIncomingClosed(); - throw connection_failed(); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } + // sentBytes can still advance while we hop back to the owning process, so re-check before + // declaring the connection drained and closed. + if (processTerminating(self->peerProcess) && self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + throw connection_failed(); + } + continue; } if (!processTerminating(self->peerProcess)) { - co_await ((self->receivedBytes.onChange() || self->incomingClosed.onChange()) || - self->peerProcess->onTerminated()); + co_await waitForReadableStateChangeOrTermination(self); } else { - co_await (self->receivedBytes.onChange() || self->incomingClosed.onChange()); + co_await waitForReadableStateChange(self); } - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } self->rollRandomClose(); } @@ -717,11 +786,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; } - co_await (((targetPeer->receivedBytes.onChange() || targetPeer->incomingClosed.onChange()) || - targetPeer->connWakeup.onTrigger()) || - targetPeer->process->onTerminated()); - if (g_simulator->getCurrentProcess() != self->process) { - co_await g_simulator->onProcess(self->process, currentTaskID); + co_await waitForWritableStateChangeOrTermination(targetPeer); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } } } catch (Error& e) {