Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions fdbrpc/HTTPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
#include "flow/Trace.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/SimulatorProcessInfo.h"

struct SharedFlowMutex : ReferenceCounted<SharedFlowMutex> {
FlowMutex mutex;
};

Future<Void> callbackHandler(Reference<IConnection> conn,
Future<Void> readRequestDone,
Reference<HTTP::IRequestHandler> requestHandler,
Reference<HTTP::IncomingRequest> req,
FlowMutex* mutex) {
Reference<SharedFlowMutex> mutexHolder) {
auto response = makeReference<HTTP::OutgoingResponse>();
UnsentPacketQueue content;
response->data.content = &content;
Expand Down Expand Up @@ -58,7 +63,7 @@ Future<Void> callbackHandler(Reference<IConnection> conn,
}
// take out response mutex to ensure no parallel writers to response connection
// FIXME: is this necessary? I think it is
FlowMutex::Lock lock = co_await mutex->take();
FlowMutex::Lock lock = co_await mutexHolder->mutex.take();
try {
co_await response->write(conn);
} catch (Error& e) {
Expand All @@ -79,7 +84,7 @@ Future<Void> connectionHandler(Reference<HTTP::SimServerContext> server,
Reference<HTTP::IRequestHandler> requestHandler) {
try {
// TODO do we actually have multiple requests on a connection? how does this work
FlowMutex responseMutex;
Reference<SharedFlowMutex> responseMutex = makeReference<SharedFlowMutex>();
Future<Void> readPrevRequest = Future<Void>(Void());
co_await conn->acceptHandshake();
while (true) {
Expand All @@ -88,7 +93,7 @@ Future<Void> connectionHandler(Reference<HTTP::SimServerContext> server,
co_await conn->onReadable();
auto req = makeReference<HTTP::IncomingRequest>();
readPrevRequest = req->read(conn, false);
server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, &responseMutex));
server->actors.add(callbackHandler(conn, readPrevRequest, requestHandler, req, responseMutex));
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
Expand Down
4 changes: 4 additions & 0 deletions fdbrpc/include/fdbrpc/SimulatorProcessInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ struct ProcessInfo : NonCopyable {
}

Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }
// Fires for both hard kills and reboot-style shutdowns. Connection code cannot rely on onShutdown(), because
// KillInstantly sets failed without sending shutdownSignal.
Future<Void> onTerminated() { return terminatedSignal.getFuture(); }

bool isSpawnedKVProcess() const {
// SOMEDAY: use a separate bool may be better?
Expand Down Expand Up @@ -167,6 +170,7 @@ struct ProcessInfo : NonCopyable {

// Members not for external use
Promise<KillType> shutdownSignal;
Promise<Void> terminatedSignal;
};

} // namespace simulator
Expand Down
Loading