diff --git a/fdbrpc/HTTPServer.cpp b/fdbrpc/HTTPServer.cpp index 68e877bbf4a..31d19718226 100644 --- a/fdbrpc/HTTPServer.cpp +++ b/fdbrpc/HTTPServer.cpp @@ -23,11 +23,16 @@ #include "flow/Trace.h" #include "fdbrpc/simulator.h" #include "fdbrpc/SimulatorProcessInfo.h" + +struct SharedFlowMutex : ReferenceCounted { + FlowMutex mutex; +}; + Future callbackHandler(Reference conn, Future readRequestDone, Reference requestHandler, Reference req, - FlowMutex* mutex) { + Reference mutexHolder) { auto response = makeReference(); UnsentPacketQueue content; response->data.content = &content; @@ -58,7 +63,7 @@ Future callbackHandler(Reference conn, } // take out response mutex to ensure no parallel writers to response connection // FIXME: is this necessary? I think it is - FlowMutex::Lock lock = co_await mutex->take(); + FlowMutex::Lock lock = co_await mutexHolder->mutex.take(); try { co_await response->write(conn); } catch (Error& e) { @@ -79,7 +84,7 @@ Future connectionHandler(Reference server, Reference requestHandler) { try { // TODO do we actually have multiple requests on a connection? how does this work - FlowMutex responseMutex; + Reference responseMutex = makeReference(); Future readPrevRequest = Future(Void()); co_await conn->acceptHandshake(); while (true) { @@ -88,7 +93,7 @@ Future connectionHandler(Reference server, co_await conn->onReadable(); auto req = makeReference(); readPrevRequest = req->read(conn, false); - server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, &responseMutex)); + server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, responseMutex)); } } catch (Error& e) { if (e.code() != error_code_actor_cancelled) { diff --git a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h index 49cbec09e4d..874805301da 100644 --- a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h +++ b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h @@ -90,6 +90,9 @@ struct ProcessInfo : NonCopyable { } Future onShutdown() { return shutdownSignal.getFuture(); } + // Fires for both hard kills and reboot-style shutdowns. Connection code cannot rely on onShutdown(), because + // KillInstantly sets failed without sending shutdownSignal. + Future onTerminated() { return terminatedSignal.getFuture(); } bool isSpawnedKVProcess() const { // SOMEDAY: use a separate bool may be better? @@ -167,6 +170,7 @@ struct ProcessInfo : NonCopyable { // Members not for external use Promise shutdownSignal; + Promise terminatedSignal; }; } // namespace simulator diff --git a/fdbrpc/sim2.cpp b/fdbrpc/sim2.cpp index 516ac1b2e1d..de7ca65dc59 100644 --- a/fdbrpc/sim2.cpp +++ b/fdbrpc/sim2.cpp @@ -48,6 +48,7 @@ #include "crc32/crc32c.h" #include "fdbrpc/TraceFileIO.h" #include "flow/flow.h" +#include "flow/CoroUtils.h" #include "flow/swift.h" #include "flow/swift/ABI/Task.h" #include "flow/genericactors.actor.h" @@ -302,9 +303,8 @@ SimClogging g_clogging; struct Sim2Conn final : IConnection, ReferenceCounted { explicit Sim2Conn(ISimulator::ProcessInfo* process) : opened(false), closedByCaller(false), stableConnection(false), trustedPeer(true), process(process), - dbgid(deterministicRandom()->randomUniqueID()), stopReceive(Never()) { - pipes = sender(this) && receiver(this); - } + peerProcess(nullptr), dbgid(deterministicRandom()->randomUniqueID()), stopReceive(false), + incomingClosed(false) {} // connect() is called on a pair of connections immediately after creation; logically it is part of the constructor // and no other method may be called previously! @@ -344,6 +344,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { .detail("SendBufSize", sendBufSize) .detail("Latency", latency) .detail("StableConnection", stableConnection); + + // sender()/receiver() dereference peer-side state immediately, so they must not run until connect() has + // published the peer metadata. + pipes = sender(this) && receiver(this); } ~Sim2Conn() { ASSERT_ABORT(!opened || closedByCaller); } @@ -361,7 +365,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { Future onWritable() override { return whenWritable(this); } Future onReadable() override { return whenReadable(this); } - bool isPeerGone() const { return !peer || peerProcess->failed; } + bool isPeerGone() const { return !peer || processTerminating(peerProcess); } bool hasTrustedPeer() const override { return trustedPeer; } @@ -369,7 +373,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted { void peerClosed() { leakedConnectionTracker = trackLeakedConnection(this); - stopReceive = delay(1.0); + armStopReceive(); } // Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might @@ -378,6 +382,13 @@ struct Sim2Conn final : IConnection, ReferenceCounted { rollRandomClose(); int64_t avail = receivedBytes.get() - readBytes.get(); // SOMEDAY: random? + if (avail == 0 && !incomingClosed.get() && peerProcess && processTerminating(peerProcess) && + sentBytes.get() == receivedBytes.get()) { + setIncomingClosed(); + } + if (avail == 0 && incomingClosed.get()) { + throw connection_failed(); + } int toRead = std::min(end - begin, avail); ASSERT(toRead >= 0 && toRead <= recvBuf.size() && toRead <= end - begin); for (int i = 0; i < toRead; i++) @@ -392,6 +403,10 @@ struct Sim2Conn final : IConnection, ReferenceCounted { int write(SendBuffer const* buffer, int limit) override { rollRandomClose(); ASSERT(limit > 0); + Reference targetPeer = peer; + if (writeClosed(targetPeer)) { + throw connection_failed(); + } int toSend = 0; if (BUGGIFY && !stableConnection) { @@ -410,19 +425,18 @@ struct Sim2Conn final : IConnection, ReferenceCounted { if (BUGGIFY && !stableConnection) toSend = std::min(toSend, deterministicRandom()->randomInt(0, 1000)); - if (!peer) - return toSend; - toSend = std::min(toSend, peer->availableSendBufferForPeer()); + toSend = std::min(toSend, targetPeer->availableSendBufferForPeer()); ASSERT(toSend >= 0); int leftToSend = toSend; for (auto p = buffer; p && leftToSend > 0; p = p->next) { int ts = std::min(leftToSend, p->bytes_written - p->bytes_sent); - peer->recvBuf.insert(peer->recvBuf.end(), p->data() + p->bytes_sent, p->data() + p->bytes_sent + ts); + targetPeer->recvBuf.insert( + targetPeer->recvBuf.end(), p->data() + p->bytes_sent, p->data() + p->bytes_sent + ts); leftToSend -= ts; } ASSERT(leftToSend == 0); - peer->writtenBytes.set(peer->writtenBytes.get() + toSend); + targetPeer->writtenBytes.set(targetPeer->writtenBytes.get() + toSend); return toSend; } @@ -447,18 +461,133 @@ struct Sim2Conn final : IConnection, ReferenceCounted { int sendBufSize; Future leakedConnectionTracker; - + AsyncVar stopReceive; + bool stopReceiveArmed = false; + // Becomes true after the peer has closed and no more bytes can be delivered to recvBuf. + AsyncVar incomingClosed; + // Supplementary wakeup source for sender()/receiver()/whenWritable(). + // Treat this as a level-triggered helper only: trigger() can be lost, so every waiter must first check the + // persistent state it cares about (peer, stopReceive, writtenBytes, sentBytes, receivedBytes) before awaiting + // it. + AsyncTrigger connWakeup; + // Must be destroyed before the AsyncVars it mutates or awaits on. + Future stopReceiveTask; Future pipes; - Future stopReceive; int availableSendBufferForPeer() const { return sendBufSize - (writtenBytes.get() - receivedBytes.get()); } // SOMEDAY: acknowledgedBytes instead of receivedBytes + static bool processTerminating(const ISimulator::ProcessInfo* process) { + return !process || process->failed || process->rebooting || process->shutdownSignal.isSet() || + process->terminatedSignal.isSet(); + } + + static Future hopToProcessOrTermination(ISimulator::ProcessInfo* process, TaskPriority taskID) { + if (g_simulator->getCurrentProcess() == process) { + co_return !processTerminating(process); + } + if (processTerminating(process)) { + co_return false; + } + auto result = co_await race(g_simulator->onProcess(process, taskID), process->onTerminated()); + co_return result.index() == 0 && !processTerminating(process); + } + + static Future delayOrTermination(ISimulator::ProcessInfo* process, double seconds, TaskPriority taskID) { + if (processTerminating(process)) { + co_return false; + } + auto result = co_await race(delay(seconds, taskID), process->onTerminated()); + co_return result.index() == 0 && !processTerminating(process); + } + + static Future waitForSenderWakeup(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await race(self->writtenBytes.onChange(), self->connWakeup.onTrigger()); + co_return; + } + (void)co_await race( + self->writtenBytes.onChange(), self->connWakeup.onTrigger(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForReceiverWakeup(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await self->connWakeup.onTrigger(); + co_return; + } + (void)co_await race(self->connWakeup.onTrigger(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForReadableStateChange(Sim2Conn* self) { + (void)co_await race(self->receivedBytes.onChange(), self->incomingClosed.onChange()); + co_return; + } + + static Future waitForReadableStateChangeOrTermination(Sim2Conn* self) { + if (processTerminating(self->peerProcess)) { + (void)co_await race(self->receivedBytes.onChange(), self->incomingClosed.onChange()); + co_return; + } + (void)co_await race( + self->receivedBytes.onChange(), self->incomingClosed.onChange(), self->peerProcess->onTerminated()); + co_return; + } + + static Future waitForWritableStateChangeOrTermination(Reference targetPeer) { + (void)co_await race(targetPeer->receivedBytes.onChange(), + targetPeer->incomingClosed.onChange(), + targetPeer->connWakeup.onTrigger(), + targetPeer->process->onTerminated()); + co_return; + } + + bool writeClosed(const Reference& targetPeer) const { + if (!targetPeer) { + return true; + } + // Once the peer has started shutting down, further writes can no longer be delivered reliably. + return g_clogging.disconnected(process->address.ip, targetPeer->process->address.ip) || + processTerminating(targetPeer->process) || targetPeer->stopReceiveArmed || + targetPeer->incomingClosed.get() || targetPeer->pipes.isReady(); + } + + static Future triggerStopReceive(Sim2Conn* self) { + // armStopReceive() can be called from either endpoint's simulator process, and this delay therefore fires on + // whichever process armed it. Resumed waiters must re-check their state and, if needed, hop back to their + // owning process before mutating connection state. + co_await delay(1.0); + self->stopReceive.set(true); + self->connWakeup.trigger(); + co_return; + } + + void armStopReceive() { + if (!stopReceiveArmed) { + stopReceiveArmed = true; + stopReceiveTask = triggerStopReceive(this); + connWakeup.trigger(); + } + } + + void setIncomingClosed() { + ASSERT(g_simulator->getCurrentProcess() == process); + if (!incomingClosed.get()) { + TraceEvent traceEvent("Sim2IncomingClosed"); + traceEvent.detail("Receiver", process->address).detail("Connection", dbgid); + if (peerProcess) { + traceEvent.detail("Peer", peerProcess->address); + } + incomingClosed.set(true); + } + } + void closeInternal() { if (peer) { peer->peerClosed(); - stopReceive = delay(1.0); + armStopReceive(); } leakedConnectionTracker.cancel(); peer.clear(); @@ -466,56 +595,178 @@ struct Sim2Conn final : IConnection, ReferenceCounted { static Future sender(Sim2Conn* self) { loop { - co_await self->writtenBytes.onChange(); // takes place on peer! + TaskPriority currentTaskID = g_network->getCurrentTask(); + while (self->writtenBytes.get() == self->sentBytes.get()) { + if (self->stopReceive.get() || processTerminating(self->peerProcess)) { + co_return; + } + co_await waitForSenderWakeup(self); + if (processTerminating(self->peerProcess)) { + co_return; + } + if (g_simulator->getCurrentProcess() != self->peerProcess && + !co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { + // stopReceive can wake us on the closer's process, but sentBytes must advance on the sender side. + co_return; + } + } + ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); + if (!co_await delayOrTermination( + self->peerProcess, .002 * deterministicRandom()->random01(), currentTaskID)) { + co_return; + } ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - co_await delay(.002 * deterministicRandom()->random01()); self->sentBytes.set(self->writtenBytes.get()); // or possibly just some sometimes... + self->connWakeup.trigger(); } } static Future receiver(Sim2Conn* self) { - loop { - if (self->sentBytes.get() != self->receivedBytes.get()) - co_await g_simulator->onProcess(self->peerProcess); - while (self->sentBytes.get() == self->receivedBytes.get()) - co_await self->sentBytes.onChange(); - ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); + Optional err; + try { + loop { + TaskPriority currentTaskID = g_network->getCurrentTask(); + if (self->sentBytes.get() != self->receivedBytes.get() && !processTerminating(self->peerProcess)) { + (void)co_await hopToProcessOrTermination(self->peerProcess, currentTaskID); + // If the hop fails because peerProcess terminates, fall through to the processTerminating() + // handling below so already-sent bytes can still be delivered from self->process. + } + while (self->sentBytes.get() == self->receivedBytes.get()) { + if (self->stopReceive.get() || processTerminating(self->peerProcess)) { + // stopReceive can become ready on the peer-closing process, but incomingClosed must notify + // readers on the owning receiver process. + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } + if ((self->stopReceive.get() || processTerminating(self->peerProcess)) && + self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + co_return; + } + if (processTerminating(self->peerProcess) || + !co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { + break; + } + continue; + } + co_await waitForReceiverWakeup(self); + if (g_simulator->getCurrentProcess() != self->peerProcess && + !processTerminating(self->peerProcess)) { + // sentBytes notifications arrive on peerProcess, but stopReceive can resume us on the closer. + if (!co_await hopToProcessOrTermination(self->peerProcess, currentTaskID)) { + break; + } + } + } + if (processTerminating(self->peerProcess)) { + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } + if (self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + co_return; + } + } else { + ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); + } - // Simulated network disconnection. Make sure to only throw connection_failed() on the sender process. - if (g_clogging.disconnected(self->peerProcess->address.ip, self->process->address.ip)) { - TraceEvent("SimulatedDisconnection") - .detail("Phase", "DataTransfer") - .detail("Sender", self->peerProcess->address) - .detail("Receiver", self->process->address); - throw connection_failed(); - } + // Simulated network disconnection. Make sure to only throw connection_failed() on the sender process. + if (!processTerminating(self->peerProcess) && + g_clogging.disconnected(self->peerProcess->address.ip, self->process->address.ip)) { + TraceEvent("SimulatedDisconnection") + .detail("Phase", "DataTransfer") + .detail("Sender", self->peerProcess->address) + .detail("Receiver", self->process->address); + throw connection_failed(); + } - int64_t pos = - deterministicRandom()->random01() < .5 - ? self->sentBytes.get() - : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); - co_await delay(g_clogging.getSendDelay( - self->peerProcess->address, self->process->address, self->isStableConnection())); - co_await g_simulator->onProcess(self->process); - ASSERT(g_simulator->getCurrentProcess() == self->process); - co_await delay(g_clogging.getRecvDelay( - self->peerProcess->address, self->process->address, self->isStableConnection())); - ASSERT(g_simulator->getCurrentProcess() == self->process); - if (self->stopReceive.isReady()) { - co_await Future(Never()); + int64_t pos = + deterministicRandom()->random01() < .5 + ? self->sentBytes.get() + : deterministicRandom()->randomInt64(self->receivedBytes.get(), self->sentBytes.get() + 1); + double sendDelay = g_clogging.getSendDelay( + self->peerProcess->address, self->process->address, self->isStableConnection()); + double sendDelayEnd = now() + sendDelay; + co_await delayOrTermination(self->peerProcess, sendDelay, currentTaskID); + // Once bytes have advanced into sentBytes they are already "on the wire". Preserve the normal + // sender-side delay while the peer is alive, but if the sender dies mid-flight, finish only the + // remaining network delay on the receiver side so already-sent bytes do not get stranded behind a dead + // process. + double remainingSendDelay = std::max(0.0, sendDelayEnd - now()); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } + if (remainingSendDelay > ISimulator::TIME_EPS) { + co_await delay(remainingSendDelay); + } + ASSERT(g_simulator->getCurrentProcess() == self->process); + co_await delay(g_clogging.getRecvDelay( + self->peerProcess->address, self->process->address, self->isStableConnection())); + ASSERT(g_simulator->getCurrentProcess() == self->process); + // A peer close only prevents future bytes from arriving. If stopReceive becomes ready after sentBytes + // moved forward, keep the connection readable until we've delivered through sentBytes. + bool shouldCloseAfterDelivery = + (self->stopReceive.get() || processTerminating(self->peerProcess)) && pos == self->sentBytes.get(); + self->receivedBytes.set(pos); + if (shouldCloseAfterDelivery) { + self->setIncomingClosed(); + co_return; + } + co_await Future(Void()); // Prior notification can delete self and cancel this actor. + ASSERT(g_simulator->getCurrentProcess() == self->process); } - self->receivedBytes.set(pos); - co_await Future(Void()); // Prior notification can delete self and cancel this actor - ASSERT(g_simulator->getCurrentProcess() == self->process); + } catch (Error& e) { + err = e; + } + if (err.present()) { + if (err.get().code() == error_code_connection_failed) { + TaskPriority currentTaskID = g_network->getCurrentTask(); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + // If our own process is terminating too, there is no surviving owner left that could observe a + // re-thrown pipe error, so finishing shutdown here is equivalent. + co_return; + } + self->setIncomingClosed(); + } + throw err.get(); } } static Future whenReadable(Sim2Conn* self) { try { loop { + TaskPriority currentTaskID = g_network->getCurrentTask(); if (self->readBytes.get() != self->receivedBytes.get()) { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; } - co_await self->receivedBytes.onChange(); + if (self->incomingClosed.get()) { + throw connection_failed(); + } + if (processTerminating(self->peerProcess) && self->sentBytes.get() == self->receivedBytes.get()) { + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } + // sentBytes can still advance while we hop back to the owning process, so re-check before + // declaring the connection drained and closed. + if (processTerminating(self->peerProcess) && self->sentBytes.get() == self->receivedBytes.get()) { + self->setIncomingClosed(); + throw connection_failed(); + } + continue; + } + if (!processTerminating(self->peerProcess)) { + co_await waitForReadableStateChangeOrTermination(self); + } else { + co_await waitForReadableStateChange(self); + } + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; + } self->rollRandomClose(); } } catch (Error& e) { @@ -526,20 +777,20 @@ struct Sim2Conn final : IConnection, ReferenceCounted { static Future whenWritable(Sim2Conn* self) { try { loop { - if (!self->peer) - co_return; - if (self->peer->availableSendBufferForPeer() > 0) { + TaskPriority currentTaskID = g_network->getCurrentTask(); + Reference targetPeer = self->peer; + if (self->writeClosed(targetPeer)) { + throw connection_failed(); + } + if (targetPeer->availableSendBufferForPeer() > 0) { ASSERT(g_simulator->getCurrentProcess() == self->process); co_return; } - try { - co_await self->peer->receivedBytes.onChange(); - ASSERT(g_simulator->getCurrentProcess() == self->peerProcess); - } catch (Error& e) { - if (e.code() != error_code_broken_promise) - throw; + co_await waitForWritableStateChangeOrTermination(targetPeer); + if (g_simulator->getCurrentProcess() != self->process && + !co_await hopToProcessOrTermination(self->process, currentTaskID)) { + co_return; } - co_await g_simulator->onProcess(self->process); } } catch (Error& e) { ASSERT(g_simulator->getCurrentProcess() == self->process); @@ -1836,6 +2087,9 @@ class Sim2 final : public ISimulator, public INetworkConnections { if (!machine->isSpawnedKVProcess()) latestEventCache.clear(); machine->failed = true; + if (!machine->terminatedSignal.isSet()) { + machine->terminatedSignal.send(Void()); + } } else if (kt == KillType::InjectFaults) { TraceEvent(SevWarn, "FaultMachine") .detail("Name", machine->name) @@ -2900,6 +3154,9 @@ Future doReboot(Uncancellable, ISimulator::ProcessInfo* p, ISimulator::Kil .detail("Cleared", p->cleared) .backtrace(); p->rebooting = true; + if (!p->terminatedSignal.isSet()) { + p->terminatedSignal.send(Void()); + } if ((kt == ISimulator::KillType::RebootAndDelete) || (kt == ISimulator::KillType::RebootProcessAndDelete)) { p->cleared = true; g_simulator->clearAddress(p->address);