Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ggml/src/ggml-rpc/ggml-rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,24 @@ static bool parse_endpoint(const std::string & endpoint, std::string & host, int
return true;
}

// Per-socket lock so concurrent set_tensor/etc calls from multiple
// threads don't interleave their bytes on the shared cached socket
// (which corrupts the protocol and trips "Remote RPC server crashed
// or returned malformed response"). Local TCP loopback is fast enough
// that the race window rarely opens, but WAN latency widens it.
// recursive_mutex because the response variant calls the no-response
// variant from inside an already-locked section.
static std::recursive_mutex & socket_send_mutex(const std::shared_ptr<socket_t> & sock) {
static std::mutex map_mutex;
static std::unordered_map<socket_t *, std::recursive_mutex> mutexes;
std::lock_guard<std::mutex> lk(map_mutex);
return mutexes[sock.get()];
}

// RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) |
// No response
static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size) {
std::lock_guard<std::recursive_mutex> lk(socket_send_mutex(sock));
uint8_t cmd_byte = cmd;
if (!sock->send_data(&cmd_byte, sizeof(cmd_byte))) {
return false;
Expand All @@ -323,6 +338,7 @@ static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input,
// RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) |
// RPC response: | response_size (8 bytes) | response_data (response_size bytes) |
static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size, void * output, size_t output_size) {
std::lock_guard<std::recursive_mutex> lk(socket_send_mutex(sock));
if (!send_rpc_cmd(sock, cmd, input, input_size)) {
return false;
}
Expand Down