diff --git a/source/AgentCommon/Transceiver.cpp b/source/AgentCommon/Transceiver.cpp index 505e33cb8e..756b2d2ddf 100644 --- a/source/AgentCommon/Transceiver.cpp +++ b/source/AgentCommon/Transceiver.cpp @@ -1,6 +1,7 @@ #include "MaaAgent/Transceiver.h" #include +#include #include #include #include @@ -22,6 +23,17 @@ MAA_AGENT_NS_BEGIN +template +static int retry_on_eintr(Func&& func) +{ + int rc; + int retries = 0; + do { + rc = func(); + } while (rc < 0 && zmq_errno() == EINTR && ++retries < 10); + return rc; +} + Transceiver::~Transceiver() { LogFunc; @@ -249,7 +261,8 @@ void Transceiver::uninit_socket() bool Transceiver::alive() { std::unique_lock lock(socket_mutex_); - return zmq_sock_.handle() != nullptr && zmq::detail::poll(&zmq_pollitem_send_, 1, 0); + return zmq_sock_.handle() != nullptr + && retry_on_eintr([&] { return zmq_poll(&zmq_pollitem_send_, 1, 0); }) > 0; } void Transceiver::set_timeout(const std::chrono::milliseconds& timeout) @@ -267,7 +280,8 @@ bool Transceiver::poll(zmq::pollitem_t& pollitem) auto remaining_time = timeout_ > elapsed ? timeout_ - elapsed : std::chrono::milliseconds(0); auto interval = std::min(remaining_time, std::chrono::milliseconds(1000)); - if (zmq::poll(&pollitem, 1, interval)) { + int rc = retry_on_eintr([&] { return zmq_poll(&pollitem, 1, static_cast(interval.count())); }); + if (rc > 0) { return true; } @@ -294,8 +308,8 @@ bool Transceiver::send(const json::value& j) std::string jstr = j.dumps(); zmq::message_t msg(jstr.data(), jstr.size()); - bool sent = zmq_sock_.send(std::move(msg), zmq::send_flags::dontwait).has_value(); - if (!sent) { + int rc = retry_on_eintr([&] { return zmq_msg_send(msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); }); + if (rc < 0) { LogError << "failed to send msg" << VAR(j); return false; } @@ -313,8 +327,8 @@ std::optional Transceiver::recv() zmq::message_t msg; - auto size_opt = zmq_sock_.recv(msg, zmq::recv_flags::dontwait); - if (!size_opt || *size_opt == 0) { + int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); }); + if (rc <= 0) { LogError << "failed to recv msg" << VAR(ipc_addr_); return std::nullopt; } @@ -355,16 +369,16 @@ std::string Transceiver::send_image(const cv::Mat& mat) } std::string jstr = json::value(header).dumps(); zmq::message_t header_msg(jstr.data(), jstr.size()); - bool sent = zmq_sock_.send(std::move(header_msg), zmq::send_flags::dontwait).has_value(); - if (!sent) { + int rc = retry_on_eintr([&] { return zmq_msg_send(header_msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); }); + if (rc < 0) { LogError << "failed to send header" << VAR(header) << VAR(ipc_addr_); return { }; } // send image data zmq::message_t img_msg(mat.data, mat.total() * mat.elemSize()); - sent = zmq_sock_.send(img_msg, zmq::send_flags::none).has_value(); - if (!sent) { + rc = retry_on_eintr([&] { return zmq_msg_send(img_msg.handle(), zmq_sock_.handle(), 0); }); + if (rc < 0) { LogError << "failed to send msg" << VAR(ipc_addr_); return { }; } @@ -392,16 +406,16 @@ std::string Transceiver::send_image_encoded(const ImageEncodedBuffer& encoded_da } std::string jstr = json::value(header).dumps(); zmq::message_t header_msg(jstr.data(), jstr.size()); - bool sent = zmq_sock_.send(std::move(header_msg), zmq::send_flags::dontwait).has_value(); - if (!sent) { + int rc = retry_on_eintr([&] { return zmq_msg_send(header_msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); }); + if (rc < 0) { LogError << "failed to send encoded header" << VAR(header) << VAR(ipc_addr_); return { }; } // send encoded image data zmq::message_t img_msg(encoded_data.data(), encoded_data.size()); - sent = zmq_sock_.send(img_msg, zmq::send_flags::none).has_value(); - if (!sent) { + rc = retry_on_eintr([&] { return zmq_msg_send(img_msg.handle(), zmq_sock_.handle(), 0); }); + if (rc < 0) { LogError << "failed to send encoded image data" << VAR(ipc_addr_); return { }; } @@ -453,8 +467,8 @@ void Transceiver::handle_image(const ImageHeader& header) std::unique_lock lock(socket_mutex_); zmq::message_t msg; - auto size_opt = zmq_sock_.recv(msg); - if (!size_opt || *size_opt == 0) { + int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), 0); }); + if (rc <= 0) { LogError << "failed to recv msg" << VAR(ipc_addr_); return; } @@ -475,8 +489,8 @@ void Transceiver::handle_image_encoded(const ImageEncodedHeader& header) std::unique_lock lock(socket_mutex_); zmq::message_t msg; - auto size_opt = zmq_sock_.recv(msg); - if (!size_opt || *size_opt == 0) { + int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), 0); }); + if (rc <= 0) { LogError << "failed to recv encoded image data" << VAR(ipc_addr_); return; }