Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions source/AgentCommon/Transceiver.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "MaaAgent/Transceiver.h"

#include <algorithm>
#include <cerrno>
#include <cctype>
#include <cstdlib>
#include <format>
Expand All @@ -22,6 +23,17 @@

MAA_AGENT_NS_BEGIN

template <typename Func>
static int retry_on_eintr(Func&& func)
{
int rc;
int retries = 0;
do {
rc = func();
} while (rc < 0 && zmq_errno() == EINTR && ++retries < 10);
return rc;
}

Transceiver::~Transceiver()
{
LogFunc;
Expand Down Expand Up @@ -249,7 +261,8 @@ void Transceiver::uninit_socket()
bool Transceiver::alive()
{
std::unique_lock lock(socket_mutex_);
return zmq_sock_.handle() != nullptr && zmq::detail::poll(&zmq_pollitem_send_, 1, 0);
return zmq_sock_.handle() != nullptr
&& retry_on_eintr([&] { return zmq_poll(&zmq_pollitem_send_, 1, 0); }) > 0;
}

void Transceiver::set_timeout(const std::chrono::milliseconds& timeout)
Expand All @@ -267,7 +280,8 @@ bool Transceiver::poll(zmq::pollitem_t& pollitem)
auto remaining_time = timeout_ > elapsed ? timeout_ - elapsed : std::chrono::milliseconds(0);
auto interval = std::min(remaining_time, std::chrono::milliseconds(1000));

if (zmq::poll(&pollitem, 1, interval)) {
int rc = retry_on_eintr([&] { return zmq_poll(&pollitem, 1, static_cast<long>(interval.count())); });
if (rc > 0) {
return true;
}

Expand All @@ -294,8 +308,8 @@ bool Transceiver::send(const json::value& j)
std::string jstr = j.dumps();
zmq::message_t msg(jstr.data(), jstr.size());

bool sent = zmq_sock_.send(std::move(msg), zmq::send_flags::dontwait).has_value();
if (!sent) {
int rc = retry_on_eintr([&] { return zmq_msg_send(msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); });
if (rc < 0) {
LogError << "failed to send msg" << VAR(j);
return false;
}
Expand All @@ -313,8 +327,8 @@ std::optional<json::value> Transceiver::recv()

zmq::message_t msg;

auto size_opt = zmq_sock_.recv(msg, zmq::recv_flags::dontwait);
if (!size_opt || *size_opt == 0) {
int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); });
if (rc <= 0) {
LogError << "failed to recv msg" << VAR(ipc_addr_);
return std::nullopt;
}
Expand Down Expand Up @@ -355,16 +369,16 @@ std::string Transceiver::send_image(const cv::Mat& mat)
}
std::string jstr = json::value(header).dumps();
zmq::message_t header_msg(jstr.data(), jstr.size());
bool sent = zmq_sock_.send(std::move(header_msg), zmq::send_flags::dontwait).has_value();
if (!sent) {
int rc = retry_on_eintr([&] { return zmq_msg_send(header_msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); });
if (rc < 0) {
LogError << "failed to send header" << VAR(header) << VAR(ipc_addr_);
return { };
}

// send image data
zmq::message_t img_msg(mat.data, mat.total() * mat.elemSize());
sent = zmq_sock_.send(img_msg, zmq::send_flags::none).has_value();
if (!sent) {
rc = retry_on_eintr([&] { return zmq_msg_send(img_msg.handle(), zmq_sock_.handle(), 0); });
if (rc < 0) {
LogError << "failed to send msg" << VAR(ipc_addr_);
return { };
}
Expand Down Expand Up @@ -392,16 +406,16 @@ std::string Transceiver::send_image_encoded(const ImageEncodedBuffer& encoded_da
}
std::string jstr = json::value(header).dumps();
zmq::message_t header_msg(jstr.data(), jstr.size());
bool sent = zmq_sock_.send(std::move(header_msg), zmq::send_flags::dontwait).has_value();
if (!sent) {
int rc = retry_on_eintr([&] { return zmq_msg_send(header_msg.handle(), zmq_sock_.handle(), ZMQ_DONTWAIT); });
if (rc < 0) {
LogError << "failed to send encoded header" << VAR(header) << VAR(ipc_addr_);
return { };
}

// send encoded image data
zmq::message_t img_msg(encoded_data.data(), encoded_data.size());
sent = zmq_sock_.send(img_msg, zmq::send_flags::none).has_value();
if (!sent) {
rc = retry_on_eintr([&] { return zmq_msg_send(img_msg.handle(), zmq_sock_.handle(), 0); });
if (rc < 0) {
LogError << "failed to send encoded image data" << VAR(ipc_addr_);
return { };
}
Expand Down Expand Up @@ -453,8 +467,8 @@ void Transceiver::handle_image(const ImageHeader& header)
std::unique_lock lock(socket_mutex_);

zmq::message_t msg;
auto size_opt = zmq_sock_.recv(msg);
if (!size_opt || *size_opt == 0) {
int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), 0); });
if (rc <= 0) {
LogError << "failed to recv msg" << VAR(ipc_addr_);
return;
}
Expand All @@ -475,8 +489,8 @@ void Transceiver::handle_image_encoded(const ImageEncodedHeader& header)
std::unique_lock lock(socket_mutex_);

zmq::message_t msg;
auto size_opt = zmq_sock_.recv(msg);
if (!size_opt || *size_opt == 0) {
int rc = retry_on_eintr([&] { return zmq_msg_recv(msg.handle(), zmq_sock_.handle(), 0); });
if (rc <= 0) {
LogError << "failed to recv encoded image data" << VAR(ipc_addr_);
return;
}
Expand Down
Loading