Skip to content

Commit 9581ca5

Browse files
authored
Merge pull request #84 from sssooonnnggg/master
fix: uv_async_init thread safe issue
2 parents db8d57f + 848fcb9 commit 9581ca5

2 files changed

Lines changed: 52 additions & 31 deletions

File tree

emmy_debugger/include/emmy_debugger/transporter/transporter.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,17 @@
1616
#pragma once
1717

1818
#include <thread>
19+
#include <mutex>
20+
#include <queue>
1921
#include "uv.h"
2022
#include "nlohmann/json_fwd.hpp"
2123

24+
struct WriteRequest {
25+
uv_write_t req;
26+
uv_buf_t buf;
27+
uv_stream_t* handler;
28+
};
29+
2230
class EmmyFacade;
2331

2432
enum class MessageCMD {
@@ -62,6 +70,13 @@ class Transporter {
6270
bool running;
6371
bool connected;
6472
bool serverMode;
73+
74+
uv_async_t writeAsync;
75+
std::mutex writeMtx;
76+
std::queue<WriteRequest*> writeQueue;
77+
78+
static void OnAsyncWrite(uv_async_t* handle);
79+
static void AfterWrite(uv_write_t* req, int status);
6580
protected:
6681
uv_loop_t* loop;
6782
public:

emmy_debugger/src/transporter/transporter.cpp

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ Transporter::Transporter(bool server):
2828
loop = uv_loop_new();
2929
bufSize = 10 * 1024;
3030
buf = static_cast<char*>(malloc(bufSize));
31+
32+
writeAsync.data = this;
33+
uv_async_init(loop, &writeAsync, OnAsyncWrite);
3134
}
3235

3336
Transporter::~Transporter()
@@ -152,30 +155,26 @@ bool Transporter::IsServerMode() const
152155
////////////////////////////////////////////////////////////////////////////////
153156
// send data
154157

155-
typedef struct
156-
{
157-
uv_write_t req;
158-
uv_buf_t buf;
159-
uv_stream_t* handler;
160-
} write_req_t;
161-
162-
static void after_write(uv_write_t* req, int status)
158+
void Transporter::AfterWrite(uv_write_t* req, int status)
163159
{
164-
const auto* writeReq = reinterpret_cast<write_req_t*>(req);
160+
auto* writeReq = reinterpret_cast<WriteRequest*>(req);
165161
free(writeReq->buf.base);
166162
delete writeReq;
167163
}
168164

169-
static void after_async(uv_handle_t* h)
170-
{
171-
delete h;
172-
}
173-
174-
static void async_write(uv_async_t* h)
165+
void Transporter::OnAsyncWrite(uv_async_t* handle)
175166
{
176-
auto* writeReq = (write_req_t*)h->data;
177-
uv_write(&writeReq->req, writeReq->handler, &writeReq->buf, 1, after_write);
178-
uv_close((uv_handle_t*)h, after_async);
167+
auto* self = static_cast<Transporter*>(handle->data);
168+
std::queue<WriteRequest*> pending;
169+
{
170+
std::lock_guard<std::mutex> lock(self->writeMtx);
171+
std::swap(pending, self->writeQueue);
172+
}
173+
while (!pending.empty()) {
174+
auto* writeReq = pending.front();
175+
pending.pop();
176+
uv_write(&writeReq->req, writeReq->handler, &writeReq->buf, 1, AfterWrite);
177+
}
179178
}
180179

181180
void Transporter::Send(uv_stream_t* handler, int cmd, const char* data, size_t len)
@@ -184,7 +183,7 @@ void Transporter::Send(uv_stream_t* handler, int cmd, const char* data, size_t l
184183
{
185184
return;
186185
}
187-
auto* writeReq = new write_req_t();
186+
auto* writeReq = new WriteRequest();
188187
char cmdValue[100];
189188
const int l1 = sprintf(cmdValue, "%d\n", cmd);
190189
const size_t newLen = len + l1 + 1;
@@ -197,11 +196,11 @@ void Transporter::Send(uv_stream_t* handler, int cmd, const char* data, size_t l
197196
writeReq->buf = uv_buf_init(newData, newLen);
198197
writeReq->handler = handler;
199198

200-
// thread safe:
201-
auto* async = new uv_async_t;
202-
async->data = writeReq;
203-
uv_async_init(loop, async, async_write);
204-
uv_async_send(async);
199+
{
200+
std::lock_guard<std::mutex> lock(writeMtx);
201+
writeQueue.push(writeReq);
202+
}
203+
uv_async_send(&writeAsync);
205204
}
206205

207206
void Transporter::Send(uv_stream_t* handler, const char* data, size_t len)
@@ -210,18 +209,17 @@ void Transporter::Send(uv_stream_t* handler, const char* data, size_t len)
210209
{
211210
return;
212211
}
213-
auto* writeReq = new write_req_t();
212+
auto* writeReq = new WriteRequest();
214213
char* newData = static_cast<char*>(malloc(len));
215-
216214
memcpy(newData, data, len);
217215
writeReq->buf = uv_buf_init(newData, len);
218216
writeReq->handler = handler;
219217

220-
// thread safe:
221-
auto* async = new uv_async_t;
222-
async->data = writeReq;
223-
uv_async_init(loop, async, async_write);
224-
uv_async_send(async);
218+
{
219+
std::lock_guard<std::mutex> lock(writeMtx);
220+
writeQueue.push(writeReq);
221+
}
222+
uv_async_send(&writeAsync);
225223
}
226224

227225
void Transporter::StartEventLoop()
@@ -242,6 +240,14 @@ void Transporter::Run()
242240
int Transporter::Stop()
243241
{
244242
running = false;
243+
// 清理未发送的写请求
244+
std::lock_guard<std::mutex> lock(writeMtx);
245+
while (!writeQueue.empty()) {
246+
auto* req = writeQueue.front();
247+
writeQueue.pop();
248+
free(req->buf.base);
249+
delete req;
250+
}
245251
return 0;
246252
}
247253

0 commit comments

Comments
 (0)