Skip to content

Commit 4949bf6

Browse files
committed
fix: apply suggestions for evpp dir from ai code review
1 parent 9536fa1 commit 4949bf6

10 files changed

Lines changed: 82 additions & 38 deletions

evpp/Channel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
namespace hv {
1515

16+
// Channel is a loop-bound wrapper around hio_t.
17+
// The Channel address is stored in hio_context(io), so the object lifetime must cover all pending hio callbacks.
1618
class Channel {
1719
public:
1820
Channel(hio_t* io = NULL) {

evpp/EventLoop.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
namespace hv {
1717

18+
// EventLoop is a loop-bound wrapper around hloop_t.
19+
// When constructed with an external hloop_t, the caller remains responsible for that loop's lifetime.
1820
class EventLoop : public Status {
1921
public:
2022

@@ -104,6 +106,7 @@ class EventLoop : public Status {
104106

105107
// setTimerInLoop thread-safe
106108
TimerID setTimerInLoop(int timeout_ms, TimerCallback cb, uint32_t repeat = INFINITE, TimerID timerID = INVALID_TIMER_ID) {
109+
if (loop_ == NULL) return INVALID_TIMER_ID;
107110
if (timerID == INVALID_TIMER_ID) {
108111
timerID = generateTimerID();
109112
}

evpp/EventLoopThread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
namespace hv {
1111

12+
// EventLoopThread owns a background thread running one EventLoop.
1213
class EventLoopThread : public Status {
1314
public:
1415
// Return 0 means OK, other failed.

evpp/TcpClient.h

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
namespace hv {
1212

1313
template<class TSocketChannel = SocketChannel>
14+
// TcpClientEventLoopTmpl is a loop-bound wrapper around one outbound connection.
15+
// When bound to an external EventLoopPtr, the caller must ensure the object is stopped and destroyed on the owner loop.
16+
// For long-lived async usage, prefer heap allocation and use stop()/closesocket()/deleteInLoop() as the controlled teardown path.
1417
class TcpClientEventLoopTmpl {
1518
public:
1619
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
@@ -23,9 +26,11 @@ class TcpClientEventLoopTmpl {
2326
tls_setting = NULL;
2427
reconn_setting = NULL;
2528
unpack_setting = NULL;
29+
reconn_timer_id = INVALID_TIMER_ID;
2630
}
2731

2832
virtual ~TcpClientEventLoopTmpl() {
33+
cancelReconnectTimer();
2934
HV_FREE(tls_setting);
3035
HV_FREE(reconn_setting);
3136
HV_FREE(unpack_setting);
@@ -36,6 +41,7 @@ class TcpClientEventLoopTmpl {
3641
}
3742

3843
// delete thread-safe
44+
// NOTE: This is intended for heap objects that need to be destroyed on the owner loop.
3945
void deleteInLoop() {
4046
loop_->runInLoop([this](){
4147
delete this;
@@ -104,6 +110,7 @@ class TcpClientEventLoopTmpl {
104110
}
105111

106112
int startConnect() {
113+
loop_->assertInLoopThread();
107114
if (channel == NULL || channel->isClosed()) {
108115
int connfd = -1;
109116
if (reconn_setting && reconn_setting->cur_retry_cnt > 1) {
@@ -172,12 +179,15 @@ class TcpClientEventLoopTmpl {
172179
}
173180

174181
int startReconnect() {
182+
loop_->assertInLoopThread();
175183
if (!reconn_setting) return -1;
176184
if (!reconn_setting_can_retry(reconn_setting)) return -2;
177185
uint32_t delay = reconn_setting_calc_delay(reconn_setting);
178186
hlogi("reconnect... cnt=%d, delay=%d", reconn_setting->cur_retry_cnt, reconn_setting->cur_delay);
179-
loop_->setTimeout(delay, [this](TimerID timerID){
180-
(void)(timerID);
187+
reconn_timer_id = loop_->setTimeout(delay, [this](TimerID timerID){
188+
if (reconn_timer_id == timerID) {
189+
reconn_timer_id = INVALID_TIMER_ID;
190+
}
181191
startConnect();
182192
});
183193
return 0;
@@ -223,6 +233,7 @@ class TcpClientEventLoopTmpl {
223233

224234
void setReconnect(reconn_setting_t* setting) {
225235
if (setting == NULL) {
236+
cancelReconnectTimer();
226237
HV_FREE(reconn_setting);
227238
return;
228239
}
@@ -265,7 +276,16 @@ class TcpClientEventLoopTmpl {
265276
std::function<void(const TSocketChannelPtr&, Buffer*)> onWriteComplete;
266277

267278
private:
268-
EventLoopPtr loop_;
279+
void cancelReconnectTimer() {
280+
if (reconn_timer_id != INVALID_TIMER_ID) {
281+
loop_->killTimer(reconn_timer_id);
282+
reconn_timer_id = INVALID_TIMER_ID;
283+
}
284+
}
285+
286+
private:
287+
EventLoopPtr loop_;
288+
TimerID reconn_timer_id;
269289
};
270290

271291
template<class TSocketChannel = SocketChannel>
@@ -297,6 +317,7 @@ class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl<TSo
297317
}
298318

299319
// stop thread-safe
320+
// NOTE: When constructed with an external loop, this only closes the socket and does not stop that loop.
300321
void stop(bool wait_threads_stopped = true) {
301322
TcpClientEventLoopTmpl<TSocketChannel>::closesocket();
302323
if (is_loop_owner) {

evpp/TcpClient_test.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ int main(int argc, char* argv[]) {
2828
remote_host = argv[2];
2929
}
3030

31-
TcpClient cli;
32-
int connfd = cli.createsocket(remote_port, remote_host);
31+
auto cli = std::make_shared<TcpClient>();
32+
int connfd = cli->createsocket(remote_port, remote_host);
3333
if (connfd < 0) {
3434
return -20;
3535
}
3636
printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd);
37-
cli.onConnection = [&cli](const SocketChannelPtr& channel) {
37+
cli->onConnection = [cli](const SocketChannelPtr& channel) {
3838
std::string peeraddr = channel->peeraddr();
3939
if (channel->isConnected()) {
4040
printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
@@ -54,11 +54,11 @@ int main(int argc, char* argv[]) {
5454
} else {
5555
printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());
5656
}
57-
if (cli.isReconnect()) {
58-
printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay);
57+
if (cli->isReconnect()) {
58+
printf("reconnect cnt=%d, delay=%d\n", cli->reconn_setting->cur_retry_cnt, cli->reconn_setting->cur_delay);
5959
}
6060
};
61-
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
61+
cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
6262
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
6363
};
6464

@@ -69,27 +69,27 @@ int main(int argc, char* argv[]) {
6969
reconn.min_delay = 1000;
7070
reconn.max_delay = 10000;
7171
reconn.delay_policy = 2;
72-
cli.setReconnect(&reconn);
72+
cli->setReconnect(&reconn);
7373
#endif
7474

7575
#if TEST_TLS
76-
cli.withTLS();
76+
cli->withTLS();
7777
#endif
7878

79-
cli.start();
79+
cli->start();
8080

8181
std::string str;
8282
while (std::getline(std::cin, str)) {
8383
if (str == "close") {
84-
cli.closesocket();
84+
cli->closesocket();
8585
} else if (str == "start") {
86-
cli.start();
86+
cli->start();
8787
} else if (str == "stop") {
88-
cli.stop();
88+
cli->stop(true);
8989
break;
9090
} else {
91-
if (!cli.isConnected()) break;
92-
cli.send(str);
91+
if (!cli->isConnected()) break;
92+
cli->send(str);
9393
}
9494
}
9595

evpp/TcpServer.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
namespace hv {
1212

1313
template<class TSocketChannel = SocketChannel>
14+
// TcpServerEventLoopTmpl is a loop-bound wrapper around one listening socket and its accepted channels.
15+
// When an external EventLoopPtr is supplied, the caller remains responsible for owner-loop shutdown and destruction ordering.
1416
class TcpServerEventLoopTmpl {
1517
public:
1618
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
@@ -74,6 +76,7 @@ class TcpServerEventLoopTmpl {
7476
}
7577

7678
int startAccept() {
79+
acceptor_loop->assertInLoopThread();
7780
if (listenfd < 0) {
7881
listenfd = createsocket(port, host.c_str());
7982
if (listenfd < 0) {
@@ -101,6 +104,7 @@ class TcpServerEventLoopTmpl {
101104
}
102105

103106
int stopAccept() {
107+
acceptor_loop->assertInLoopThread();
104108
if (listenfd < 0) return -1;
105109
hloop_t* loop = acceptor_loop->loop();
106110
if (loop == NULL) return -2;
@@ -117,6 +121,7 @@ class TcpServerEventLoopTmpl {
117121
acceptor_loop->runInLoop(std::bind(&TcpServerEventLoopTmpl::startAccept, this));
118122
}
119123
// stop thread-safe
124+
// NOTE: When an external loop is supplied, this closes the listener but does not own that loop's lifetime.
120125
void stop(bool wait_threads_stopped = true) {
121126
closesocket();
122127
if (worker_threads.threadNum() > 0) {
@@ -173,6 +178,7 @@ class TcpServerEventLoopTmpl {
173178
return channels.size();
174179
}
175180

181+
// NOTE: fn is executed while holding mutex_, so it must stay short and must not call server APIs that may lock channels again.
176182
int foreachChannel(std::function<void(const TSocketChannelPtr& channel)> fn) {
177183
std::lock_guard<std::mutex> locker(mutex_);
178184
for (auto& pair : channels) {
@@ -194,16 +200,19 @@ class TcpServerEventLoopTmpl {
194200

195201
private:
196202
static void newConnEvent(hio_t* connio) {
203+
assert(connio != NULL);
197204
TcpServerEventLoopTmpl* server = (TcpServerEventLoopTmpl*)hevent_userdata(connio);
205+
assert(server != NULL);
206+
EventLoop* worker_loop = currentThreadEventLoop;
207+
assert(worker_loop != NULL);
198208
if (server->connectionNum() >= server->max_connections) {
209+
--worker_loop->connectionNum;
199210
hlogw("over max_connections");
200211
hio_close(connio);
201212
return;
202213
}
203214

204215
// NOTE: attach to worker loop
205-
EventLoop* worker_loop = currentThreadEventLoop;
206-
assert(worker_loop != NULL);
207216
hio_attach(worker_loop->loop(), connio);
208217

209218
const TSocketChannelPtr& channel = server->addChannel(connio);
@@ -229,7 +238,7 @@ class TcpServerEventLoopTmpl {
229238
server->onConnection(channel);
230239
}
231240
server->removeChannel(channel);
232-
// NOTE: After removeChannel, channel may be destroyed,
241+
// NOTE: After removeChannel, channel may be destroyed immediately,
233242
// so in this lambda function, no code should be added below.
234243
};
235244

evpp/UdpClient.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
namespace hv {
1010

1111
template<class TSocketChannel = SocketChannel>
12+
// UdpClientEventLoopTmpl is a loop-bound wrapper around one udp client socket.
13+
// When used with an external EventLoopPtr, the caller must stop receiving and destroy the object on the owner loop.
1214
class UdpClientEventLoopTmpl {
1315
public:
1416
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
@@ -72,6 +74,7 @@ class UdpClientEventLoopTmpl {
7274
}
7375

7476
int startRecv() {
77+
loop_->assertInLoopThread();
7578
if (channel == NULL || channel->isClosed()) {
7679
int sockfd = createsocket(remote_port, remote_host.c_str());
7780
if (sockfd < 0) {
@@ -179,6 +182,7 @@ class UdpClientTmpl : private EventLoopThread, public UdpClientEventLoopTmpl<TSo
179182
}
180183

181184
// stop thread-safe
185+
// NOTE: When constructed with an external loop, this closes the socket but does not stop that loop.
182186
void stop(bool wait_threads_stopped = true) {
183187
UdpClientEventLoopTmpl<TSocketChannel>::closesocket();
184188
if (is_loop_owner) {

evpp/UdpClient_test.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,36 +25,36 @@ int main(int argc, char* argv[]) {
2525
remote_host = argv[2];
2626
}
2727

28-
UdpClient cli;
29-
int sockfd = cli.createsocket(remote_port, remote_host);
28+
auto cli = std::make_shared<UdpClient>();
29+
int sockfd = cli->createsocket(remote_port, remote_host);
3030
if (sockfd < 0) {
3131
return -20;
3232
}
3333
printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd);
34-
cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
34+
cli->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
3535
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
3636
};
37-
cli.start();
37+
cli->start();
3838

3939
// sendto(time) every 3s
40-
cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
40+
cli->loop()->setInterval(3000, [cli](TimerID timerID) {
4141
char str[DATETIME_FMT_BUFLEN] = {0};
4242
datetime_t dt = datetime_now();
4343
datetime_fmt(&dt, str);
44-
cli.sendto(str);
44+
cli->sendto(str);
4545
});
4646

4747
std::string str;
4848
while (std::getline(std::cin, str)) {
4949
if (str == "close") {
50-
cli.closesocket();
50+
cli->closesocket();
5151
} else if (str == "start") {
52-
cli.start();
52+
cli->start();
5353
} else if (str == "stop") {
54-
cli.stop();
54+
cli->stop(true);
5555
break;
5656
} else {
57-
cli.sendto(str);
57+
cli->sendto(str);
5858
}
5959
}
6060

evpp/UdpServer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
namespace hv {
1010

1111
template<class TSocketChannel = SocketChannel>
12+
// UdpServerEventLoopTmpl is a loop-bound wrapper around one udp server socket.
13+
// When used with an external EventLoopPtr, stop receiving first and destroy the object on the owner loop.
1214
class UdpServerEventLoopTmpl {
1315
public:
1416
typedef std::shared_ptr<TSocketChannel> TSocketChannelPtr;
@@ -48,6 +50,7 @@ class UdpServerEventLoopTmpl {
4850
}
4951

5052
int startRecv() {
53+
loop_->assertInLoopThread();
5154
if (channel == NULL || channel->isClosed()) {
5255
int bindfd = createsocket(port, host.c_str());
5356
if (bindfd < 0) {
@@ -153,6 +156,7 @@ class UdpServerTmpl : private EventLoopThread, public UdpServerEventLoopTmpl<TSo
153156
}
154157

155158
// stop thread-safe
159+
// NOTE: When constructed with an external loop, this closes the socket but does not stop that loop.
156160
void stop(bool wait_threads_stopped = true) {
157161
UdpServerEventLoopTmpl<TSocketChannel>::closesocket();
158162
if (is_loop_owner) {

evpp/UdpServer_test.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,30 @@ int main(int argc, char* argv[]) {
2020
}
2121
int port = atoi(argv[1]);
2222

23-
UdpServer srv;
24-
int bindfd = srv.createsocket(port);
23+
auto srv = std::make_shared<UdpServer>();
24+
int bindfd = srv->createsocket(port);
2525
if (bindfd < 0) {
2626
return -20;
2727
}
2828
printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);
29-
srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
29+
srv->onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
3030
// echo
3131
printf("< %.*s\n", (int)buf->size(), (char*)buf->data());
3232
channel->write(buf);
3333
};
34-
srv.start();
34+
srv->start();
3535

3636
std::string str;
3737
while (std::getline(std::cin, str)) {
3838
if (str == "close") {
39-
srv.closesocket();
39+
srv->closesocket();
4040
} else if (str == "start") {
41-
srv.start();
41+
srv->start();
4242
} else if (str == "stop") {
43-
srv.stop();
43+
srv->stop(true);
4444
break;
4545
} else {
46-
srv.sendto(str);
46+
srv->sendto(str);
4747
}
4848
}
4949

0 commit comments

Comments
 (0)