Skip to content

Commit bb1a382

Browse files
authored
新增支持外部设置UDP Socket接收缓冲区 (#289)
1 parent 7302286 commit bb1a382

5 files changed

Lines changed: 192 additions & 7 deletions

File tree

src/Network/BufferSock.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010

1111
#include <assert.h>
12+
#include <limits>
1213
#include "BufferSock.h"
1314
#include "Util/logger.h"
1415
#include "Util/uv_errno.h"
@@ -609,14 +610,37 @@ class SocketRecvFromBuffer : public SocketRecvBuffer {
609610

610611
static constexpr auto kPacketCount = 32;
611612
static constexpr auto kBufferCapacity = 4 * 1024u;
613+
static constexpr auto kMaxTotalBufferBytes = 64 * 1024u * 1024u;
612614

613615
SocketRecvBuffer::Ptr SocketRecvBuffer::create(bool is_udp) {
616+
return create(is_udp, kPacketCount, kBufferCapacity);
617+
}
618+
619+
SocketRecvBuffer::Ptr SocketRecvBuffer::create(bool is_udp, size_t packet_count, size_t buffer_capacity) {
620+
packet_count = packet_count ? packet_count : kPacketCount;
621+
buffer_capacity = buffer_capacity ? buffer_capacity : kBufferCapacity;
622+
623+
auto use_default = false;
624+
if (packet_count < 1 || buffer_capacity < 2) {
625+
use_default = true;
626+
} else if (packet_count > (std::numeric_limits<size_t>::max)() / buffer_capacity) {
627+
use_default = true;
628+
} else if (packet_count * buffer_capacity > kMaxTotalBufferBytes) {
629+
use_default = true;
630+
}
631+
632+
if (use_default) {
633+
WarnL << "Invalid recv buffer config, fallback to defaults: packet_count="
634+
<< packet_count << ", buffer_capacity=" << buffer_capacity;
635+
packet_count = kPacketCount;
636+
buffer_capacity = kBufferCapacity;
637+
}
614638
#if defined(__linux) || defined(__linux__)
615639
if (is_udp) {
616-
return std::make_shared<SocketRecvmmsgBuffer>(kPacketCount, kBufferCapacity);
640+
return std::make_shared<SocketRecvmmsgBuffer>(packet_count, buffer_capacity);
617641
}
618642
#endif
619-
return std::make_shared<SocketRecvFromBuffer>(kPacketCount * kBufferCapacity);
643+
return std::make_shared<SocketRecvFromBuffer>(packet_count * buffer_capacity);
620644
}
621645

622646
} //toolkit

src/Network/BufferSock.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class SocketRecvBuffer {
8181
virtual struct sockaddr_storage &getAddress(size_t index) = 0;
8282

8383
static Ptr create(bool is_udp);
84+
static Ptr create(bool is_udp, size_t packet_count, size_t buffer_capacity);
8485
};
8586

8687
}

src/Network/Socket.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,13 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
277277
// tcp客户端或udp [AUTO-TRANSLATED:00c16e7f]
278278
//TCP client or UDP
279279
auto read_buffer = _poller->getSharedBuffer(sock->type() == SockNum::Sock_UDP);
280+
if (sock->type() == SockNum::Sock_UDP) {
281+
LOCK_GUARD(_mtx_sock_fd);
282+
_udp_recv_buffer_frozen = true;
283+
if (_read_buffer) {
284+
read_buffer = _read_buffer;
285+
}
286+
}
280287
auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) {
281288
auto strong_self = weak_self.lock();
282289
if (!strong_self) {
@@ -298,6 +305,10 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
298305
}
299306
});
300307

308+
if (result == -1 && sock->type() == SockNum::Sock_UDP) {
309+
LOCK_GUARD(_mtx_sock_fd);
310+
_udp_recv_buffer_frozen = false;
311+
}
301312
return -1 != result;
302313
}
303314

@@ -321,7 +332,9 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &bu
321332
if (sock->type() == SockNum::Sock_TCP) {
322333
emitErr(toSockException(err));
323334
} else {
324-
WarnL << "Recv err on udp socket[" << sock->rawFd() << "]: " << uv_strerror(err);
335+
if (!(err == UV_ECONNREFUSED && _ignore_udp_conn_refused.load(std::memory_order_acquire))) {
336+
WarnL << "Recv err on udp socket[" << sock->rawFd() << "]: " << uv_strerror(err);
337+
}
325338
}
326339
}
327340
return ret;
@@ -485,6 +498,7 @@ void Socket::closeSock(bool close_fd) {
485498
if (close_fd) {
486499
_err_emit = false;
487500
_sock_fd = nullptr;
501+
_udp_recv_buffer_frozen = false;
488502
} else if (_sock_fd) {
489503
_sock_fd->delEvent();
490504
}
@@ -703,6 +717,7 @@ void Socket::setSock(SockNum::Ptr sock) {
703717
SockUtil::get_sock_peer_addr(_sock_fd->rawFd(), _peer_addr);
704718
} else {
705719
_sock_fd = nullptr;
720+
_udp_recv_buffer_frozen = false;
706721
}
707722
}
708723

@@ -1021,6 +1036,25 @@ void Socket::setSendFlags(int flags) {
10211036
_sock_flags = flags;
10221037
}
10231038

1039+
bool Socket::setUdpRecvBuffer(const SocketRecvBuffer::Ptr &buffer) {
1040+
// This hook is setup-time only. UdpServer creation callbacks may run
1041+
// before the owner poller starts processing the fd, so the hard
1042+
// requirement here is "before fd creation/attach", not "already on
1043+
// poller thread". The customization itself is only honored for UDP
1044+
// sockets.
1045+
LOCK_GUARD(_mtx_sock_fd);
1046+
if (_sock_fd || _udp_recv_buffer_frozen) {
1047+
WarnL << "setUdpRecvBuffer must be called before the socket fd is created and UDP IO is attached";
1048+
return false;
1049+
}
1050+
_read_buffer = buffer;
1051+
return true;
1052+
}
1053+
1054+
void Socket::setIgnoreUdpConnRefused(bool ignore) {
1055+
_ignore_udp_conn_refused.store(ignore, std::memory_order_release);
1056+
}
1057+
10241058
///////////////SockSender///////////////////
10251059

10261060
SockSender &SockSender::operator<<(const char *buf) {

src/Network/Socket.h

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ namespace toolkit {
4545

4646
//默认的socket flags:不触发SIGPIPE,非阻塞发送 [AUTO-TRANSLATED:fefc4946]
4747
//Default socket flags: do not trigger SIGPIPE, non-blocking send
48-
#define SOCKET_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT )
48+
#define SOCKET_DEFAULT_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT )
49+
// Backward compatibility alias for the legacy misspelled macro name.
50+
#ifndef SOCKET_DEFAULE_FLAGS
51+
#define SOCKET_DEFAULE_FLAGS SOCKET_DEFAULT_FLAGS
52+
#endif
4953

5054
//发送超时时间,如果在规定时间内一直没有发送数据成功,那么将触发onErr事件 [AUTO-TRANSLATED:9c5d8d87]
5155
//Send timeout time, if no data is sent successfully within the specified time, the onErr event will be triggered
@@ -649,7 +653,36 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
649653
650654
* [AUTO-TRANSLATED:2b11445c]
651655
*/
652-
void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS);
656+
void setSendFlags(int flags = SOCKET_DEFAULT_FLAGS);
657+
658+
// Install a UDP-specific recv buffer before the socket starts receiving.
659+
// This is intended for setup-time tuning, not runtime reconfiguration
660+
// after IO callbacks are active.
661+
/**
662+
* Replace the UDP recv buffer before the socket fd is created/attached.
663+
* This is intended for setup-time customization of special UDP transports
664+
* and must not be used as a runtime reconfiguration hook.
665+
*
666+
* IMPORTANT: custom SocketRecvBuffer implementations must remain
667+
* compatible with the batched UDP receive path used by Socket::onRead().
668+
* For the active receive batch, both &buffer->getBuffer(0) and
669+
* &buffer->getAddress(0) are treated as pointers to contiguous arrays
670+
* that can be indexed up to count - 1, and the referenced storage must
671+
* remain valid for the duration of the receive operation.
672+
*
673+
* Passing a SocketRecvBuffer that does not satisfy this layout/lifetime
674+
* contract is unsupported and may lead to undefined behavior.
675+
*
676+
* @return Whether the configuration was accepted.
677+
*/
678+
bool setUdpRecvBuffer(const SocketRecvBuffer::Ptr &buffer);
679+
680+
// Suppress the UDP ECONNREFUSED read warning on sockets that intentionally
681+
// communicate with transient peers, such as QUIC sessions that may receive
682+
// a late ICMP port-unreachable after the peer has already closed. This is
683+
// a narrow transport-specific knob and should not be enabled casually by
684+
// ordinary upper-layer business code.
685+
void setIgnoreUdpConnRefused(bool ignore);
653686

654687
/**
655688
* 关闭套接字
@@ -734,11 +767,10 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
734767
ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true);
735768
void connect_l(const std::string &url, uint16_t port, const onErrCB &con_cb_in, float timeout_sec, const std::string &local_ip, uint16_t local_port);
736769
bool fromSock_l(SockNum::Ptr sock);
737-
738770
private:
739771
// send socket时的flag [AUTO-TRANSLATED:e364a1bf]
740772
//Flag for sending socket
741-
int _sock_flags = SOCKET_DEFAULE_FLAGS;
773+
int _sock_flags = SOCKET_DEFAULT_FLAGS;
742774
// 最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数 [AUTO-TRANSLATED:3bd6dba3]
743775
//Maximum send buffer, in milliseconds, the time since the last send buffer was cleared cannot exceed this parameter
744776
uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
@@ -823,6 +855,13 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
823855
//Object count statistics
824856
ObjectStatistic<Socket> _statistic;
825857

858+
// Optional per-socket recv path used by a small number of UDP transports.
859+
// This must be configured before the socket fd is created or any IO
860+
// callbacks are attached.
861+
SocketRecvBuffer::Ptr _read_buffer;
862+
bool _udp_recv_buffer_frozen = false;
863+
std::atomic<bool> _ignore_udp_conn_refused{false};
864+
826865
// 链接缓存地址,防止tcp reset 导致无法获取对端的地址 [AUTO-TRANSLATED:f8847463]
827866
//Connection cache address, to prevent TCP reset from causing the inability to obtain the peer's address
828867
struct sockaddr_storage _local_addr;
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
3+
*
4+
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
5+
*
6+
* Use of this source code is governed by MIT license that can be found in the
7+
* LICENSE file in the root of the source tree. All contributing project authors
8+
* may be found in the AUTHORS file in the root of the source tree.
9+
*/
10+
11+
#include <csignal>
12+
#include <iostream>
13+
#include <string>
14+
15+
#include "Network/Socket.h"
16+
#include "Thread/semaphore.h"
17+
#include "Util/logger.h"
18+
#include "Util/util.h"
19+
20+
using namespace std;
21+
using namespace toolkit;
22+
23+
int main() {
24+
Logger::Instance().add(std::make_shared<ConsoleChannel>());
25+
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
26+
27+
auto recv_sock = Socket::createSocket();
28+
auto send_sock = Socket::createSocket();
29+
if (!recv_sock || !send_sock) {
30+
cerr << "create socket failed" << endl;
31+
return 1;
32+
}
33+
34+
auto invalid_cfg_buffer = SocketRecvBuffer::create(true, 0, 1);
35+
if (!invalid_cfg_buffer) {
36+
cerr << "create invalid_cfg_buffer failed" << endl;
37+
return 2;
38+
}
39+
40+
if (!recv_sock->setUdpRecvBuffer(invalid_cfg_buffer)) {
41+
cerr << "setUdpRecvBuffer should succeed before fd creation" << endl;
42+
return 3;
43+
}
44+
45+
recv_sock->setIgnoreUdpConnRefused(true);
46+
47+
if (!recv_sock->bindUdpSock(0, "127.0.0.1")) {
48+
cerr << "bind recv socket failed" << endl;
49+
return 4;
50+
}
51+
if (!send_sock->bindUdpSock(0, "127.0.0.1")) {
52+
cerr << "bind send socket failed" << endl;
53+
return 5;
54+
}
55+
56+
if (recv_sock->setUdpRecvBuffer(SocketRecvBuffer::create(true, 1, 4096))) {
57+
cerr << "setUdpRecvBuffer should fail after fd creation" << endl;
58+
return 6;
59+
}
60+
61+
semaphore sem;
62+
string received;
63+
recv_sock->setOnRead([&](const Buffer::Ptr &buf, struct sockaddr *, int) {
64+
received.assign(buf ? buf->data() : "", buf ? buf->size() : 0);
65+
sem.post();
66+
});
67+
68+
auto dst = SockUtil::make_sockaddr("127.0.0.1", recv_sock->get_local_port());
69+
const string payload = "udp-buffer-config-smoke";
70+
if (send_sock->send(payload, reinterpret_cast<struct sockaddr *>(&dst)) <= 0) {
71+
cerr << "send payload failed" << endl;
72+
return 7;
73+
}
74+
75+
if (!sem.wait(3000)) {
76+
cerr << "recv timeout" << endl;
77+
return 8;
78+
}
79+
80+
if (received != payload) {
81+
cerr << "unexpected payload: " << received << endl;
82+
return 9;
83+
}
84+
85+
cout << "udp socket buffer config regression passed" << endl;
86+
return 0;
87+
}

0 commit comments

Comments
 (0)