Skip to content

Commit ddcb81e

Browse files
committed
Fix broker disconnect to better handle SIGPIPE
1 parent 81a495e commit ddcb81e

3 files changed

Lines changed: 332 additions & 0 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,5 @@ src/mqtt_broker
128128

129129
tests/fuzz/broker_fuzz
130130
tests/fuzz/corpus/
131+
tests/test_broker_connect
132+
tests/unit_tests

src/mqtt_broker.c

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ static void MqttBroker_ForceZero(void* mem, word32 len)
6363
#include <arpa/inet.h>
6464
#include <fcntl.h>
6565
#include <netinet/in.h>
66+
#include <signal.h>
6667
#include <sys/select.h>
6768
#include <sys/socket.h>
6869
#include <time.h>
@@ -631,6 +632,9 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock,
631632
BROKER_SOCKET_T* client_sock)
632633
{
633634
BROKER_SOCKET_T fd;
635+
#ifdef SO_NOSIGPIPE
636+
int on = 1;
637+
#endif
634638
(void)ctx;
635639

636640
fd = accept(listen_sock, NULL, NULL);
@@ -644,6 +648,14 @@ static int BrokerPosix_Accept(void* ctx, BROKER_SOCKET_T listen_sock,
644648
close(fd);
645649
return MQTT_CODE_ERROR_SYSTEM;
646650
}
651+
#ifdef SO_NOSIGPIPE
652+
/* macOS / BSDs: suppress SIGPIPE on writes to a peer-closed socket.
653+
* Without this (and without MSG_NOSIGNAL in send()), a client that
654+
* publishes QoS>0 and immediately closes its socket would cause the
655+
* broker's PUBACK/PUBREC write to deliver SIGPIPE, terminating the
656+
* broker. Linux uses MSG_NOSIGNAL in BrokerPosix_Write instead. */
657+
(void)setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
658+
#endif
647659
*client_sock = fd;
648660
return MQTT_CODE_SUCCESS;
649661
}
@@ -714,7 +726,16 @@ static int BrokerPosix_Write(void* ctx, BROKER_SOCKET_T sock,
714726
return MQTT_CODE_ERROR_NETWORK;
715727
}
716728

729+
/* MSG_NOSIGNAL (Linux/BSDs that define it) prevents SIGPIPE delivery when
730+
* the peer has already closed the connection - the syscall just returns
731+
* EPIPE and we treat it as a normal network error. Platforms without
732+
* MSG_NOSIGNAL (e.g. macOS) rely on the SO_NOSIGPIPE socket option set
733+
* in BrokerPosix_Accept. */
734+
#ifdef MSG_NOSIGNAL
735+
rc = (int)send(sock, buf, (size_t)buf_len, MSG_NOSIGNAL);
736+
#else
717737
rc = (int)send(sock, buf, (size_t)buf_len, 0);
738+
#endif
718739
if (rc <= 0) {
719740
if (rc < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
720741
return MQTT_CODE_CONTINUE;
@@ -737,9 +758,28 @@ static int BrokerPosix_Close(void* ctx, BROKER_SOCKET_T sock)
737758

738759
int MqttBrokerNet_Init(MqttBrokerNet* net)
739760
{
761+
#ifdef SIGPIPE
762+
struct sigaction sa;
763+
#endif
740764
if (net == NULL) {
741765
return MQTT_CODE_ERROR_BAD_ARG;
742766
}
767+
#ifdef SIGPIPE
768+
/* Backstop the per-send (MSG_NOSIGNAL) and per-socket (SO_NOSIGPIPE)
769+
* suppression so a write to a peer-closed socket cannot terminate the
770+
* process on platforms that define neither, or if the SO_NOSIGPIPE
771+
* setsockopt in BrokerPosix_Accept fails. Only install the ignore when
772+
* SIGPIPE is still at its default disposition, so we never clobber a
773+
* SIGPIPE handler the embedding application has already chosen. The
774+
* targeted mechanisms remain primary so send() still returns EPIPE for
775+
* clean client teardown. */
776+
if (sigaction(SIGPIPE, NULL, &sa) == 0 && sa.sa_handler == SIG_DFL) {
777+
sa.sa_handler = SIG_IGN;
778+
sigemptyset(&sa.sa_mask);
779+
sa.sa_flags = 0;
780+
(void)sigaction(SIGPIPE, &sa, NULL);
781+
}
782+
#endif
743783
XMEMSET(net, 0, sizeof(*net));
744784
net->listen = BrokerPosix_Listen;
745785
net->accept = BrokerPosix_Accept;
@@ -4906,6 +4946,13 @@ int wolfmqtt_broker(int argc, char** argv)
49064946
g_broker_shutdown = 0;
49074947
signal(SIGINT, broker_signal_handler);
49084948
signal(SIGTERM, broker_signal_handler);
4949+
/* Belt-and-suspenders for the SIGPIPE-on-peer-close path. The socket
4950+
* layer already uses MSG_NOSIGNAL / SO_NOSIGPIPE per platform, but
4951+
* ignore SIGPIPE process-wide too so any reused or custom net callback
4952+
* cannot kill the broker on a write to a closed peer. */
4953+
#ifdef SIGPIPE
4954+
signal(SIGPIPE, SIG_IGN);
4955+
#endif
49094956

49104957
rc = MqttBroker_Start(&broker);
49114958
if (rc == MQTT_CODE_SUCCESS) {

0 commit comments

Comments
 (0)