diff --git a/ggml/src/ggml-rpc/ggml-rpc.cpp b/ggml/src/ggml-rpc/ggml-rpc.cpp index 505bec73d37..7b0a74b6d50 100644 --- a/ggml/src/ggml-rpc/ggml-rpc.cpp +++ b/ggml/src/ggml-rpc/ggml-rpc.cpp @@ -304,9 +304,24 @@ static bool parse_endpoint(const std::string & endpoint, std::string & host, int return true; } +// Per-socket lock so concurrent set_tensor/etc calls from multiple +// threads don't interleave their bytes on the shared cached socket +// (which corrupts the protocol and trips "Remote RPC server crashed +// or returned malformed response"). Local TCP loopback is fast enough +// that the race window rarely opens, but WAN latency widens it. +// recursive_mutex because the response variant calls the no-response +// variant from inside an already-locked section. +static std::recursive_mutex & socket_send_mutex(const std::shared_ptr & sock) { + static std::mutex map_mutex; + static std::unordered_map mutexes; + std::lock_guard lk(map_mutex); + return mutexes[sock.get()]; +} + // RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) | // No response static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size) { + std::lock_guard lk(socket_send_mutex(sock)); uint8_t cmd_byte = cmd; if (!sock->send_data(&cmd_byte, sizeof(cmd_byte))) { return false; @@ -323,6 +338,7 @@ static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, // RPC request : | rpc_cmd (1 byte) | request_size (8 bytes) | request_data (request_size bytes) | // RPC response: | response_size (8 bytes) | response_data (response_size bytes) | static bool send_rpc_cmd(socket_ptr sock, enum rpc_cmd cmd, const void * input, size_t input_size, void * output, size_t output_size) { + std::lock_guard lk(socket_send_mutex(sock)); if (!send_rpc_cmd(sock, cmd, input, input_size)) { return false; }