Prerequisites
Bug Description
I'm using the paho-mqtt Python library, and I'm encountering a critical issue with its background thread after network recovery. Here's the scenario I'm testing:
Test Steps:
- Start a simple MQTT client using loop_start() / loop_stop().
- Simulate a network outage for the client process for around 15 minutes.
- Restore the network connection.
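For reference, the client side of these steps looks roughly like the sketch below. This is not the actual test program: `run_publisher` and its parameters are hypothetical names, and `client` is assumed to be a paho-mqtt `Client` that has already been configured and connected. Only `loop_start()`, `publish()` and `loop_stop()` are called on it.

```python
import json
import threading
from datetime import datetime


def run_publisher(client, topic, interval_s, stop_event):
    """Publish a timestamp every interval_s seconds until stop_event is set.

    `client` is assumed to be an already-connected paho-mqtt Client; this
    helper and its parameter names are illustrative only.
    """
    client.loop_start()  # start the background network thread
    sent = 0
    try:
        while not stop_event.is_set():
            payload = json.dumps({"timestamp": datetime.now().isoformat()})
            client.publish(topic, payload)
            sent += 1
            stop_event.wait(interval_s)  # sleep, but wake early on stop
    finally:
        client.loop_stop()  # stop the background thread
    return sent
```

The network outage is applied while this loop keeps running, so publish calls continue during and after the outage.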
To simulate the network outage, I used a script (generated by ChatGPT) that isolates the app process: it places the process in a net_cls cgroup and adds iptables rules that drop all traffic matching that cgroup, both outbound and inbound.
Script
#!/bin/bash
# Block or restore network traffic for the test process via a net_cls cgroup.
ACTION=$1
PID=$(pgrep -f 'python -m tests.worker.mqtt.test_safe_mqtt_client' | head -n 1)
CGROUP_NAME="netblock"
CGROUP_PATH="/sys/fs/cgroup/net_cls/$CGROUP_NAME"
CLASSID_HEX="0x100001"
INTERFACE=$(ip route | grep default | awk '{print $5}')

# The PID is looked up with pgrep above; it is not a command-line argument.
if [ -z "$ACTION" ]; then
    echo "Usage: $0 [enable|disable]"
    exit 1
fi
if [ -z "$PID" ]; then
    echo "No running test process found"
    exit 1
fi

function cleanup_cgroup {
    if [ -f "$CGROUP_PATH/cgroup.procs" ]; then
        echo 0 | sudo tee "$CGROUP_PATH/net_cls.classid" > /dev/null
        # Move the process back to the root cgroup
        echo "$PID" | sudo tee /sys/fs/cgroup/net_cls/cgroup.procs > /dev/null
    fi
    sudo cgdelete -g "net_cls:/$CGROUP_NAME" 2>/dev/null
}

case "$ACTION" in
    enable)
        echo "[*] Blocking network for PID $PID..."
        # Create cgroup
        sudo cgcreate -g "net_cls:/$CGROUP_NAME"
        # Set classid
        echo "$CLASSID_HEX" | sudo tee "$CGROUP_PATH/net_cls.classid" > /dev/null
        # Add process into cgroup
        sudo cgclassify -g "net_cls:/$CGROUP_NAME" "$PID"
        # Configure tc (only if no htb qdisc exists yet)
        sudo tc qdisc show dev "$INTERFACE" | grep -q "htb" || {
            sudo tc qdisc add dev "$INTERFACE" root handle 1: htb
            sudo tc class add dev "$INTERFACE" parent 1: classid 1:1 htb rate 1000Mbps
            sudo tc filter add dev "$INTERFACE" parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1
        }
        # Block by iptables
        sudo iptables -t mangle -A OUTPUT -m cgroup --cgroup "$CLASSID_HEX" -j DROP
        sudo iptables -t mangle -A INPUT -m cgroup --cgroup "$CLASSID_HEX" -j DROP
        echo "[+] Network blocked for PID $PID"
        ;;
    disable)
        echo "[*] Restoring network for PID $PID..."
        # Delete iptables rules
        sudo iptables -t mangle -D OUTPUT -m cgroup --cgroup "$CLASSID_HEX" -j DROP 2>/dev/null
        sudo iptables -t mangle -D INPUT -m cgroup --cgroup "$CLASSID_HEX" -j DROP 2>/dev/null
        # Delete tc
        sudo tc qdisc del dev "$INTERFACE" root 2>/dev/null
        # Remove process from cgroup and delete cgroup
        cleanup_cgroup
        echo "[+] Network restored for PID $PID"
        ;;
    *)
        echo "Invalid action: $ACTION"
        echo "Usage: $0 [enable|disable]"
        exit 1
        ;;
esac
Behavior:
After restoring the network:
- The client successfully reconnects to the MQTT server.
- The on_connect callback is triggered, and inside that callback, I call client.subscribe(...).
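The subscribe call raises a BrokenPipeError inside the Paho MQTT thread. The exception propagates out of the network loop and the thread terminates without notifying the application; the only trace is the traceback printed to stderr.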
Final State of the app:
- The Paho MQTT background thread is no longer running.
- client.publish(...) calls do not raise errors, but messages are not delivered.
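One way to observe the first point from outside the library is to look for the loop thread by name. The sketch below assumes the thread is named `paho-mqtt-client-<client_id>`, which is inferred from the "Exception in thread ..." line in this report and is not a documented API, so it could differ between versions. `loop_thread_alive` is a hypothetical helper.

```python
import threading


def loop_thread_alive(client_id):
    """Return True if a live thread named like paho's loop thread exists.

    Assumption: the thread is named "paho-mqtt-client-<client_id>", as seen
    in this report's traceback. This is not a documented API.
    """
    expected = f"paho-mqtt-client-{client_id}"
    return any(
        t.name == expected and t.is_alive() for t in threading.enumerate()
    )
```

In the final state described above, a check like `loop_thread_alive("client_test_safe_mqtt_client")` would be expected to return False.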
2025-06-23 23:18:50,035 - __main__ - INFO - Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_test_safe_mqtt_client' properties=[SessionExpiryInterval : 120]
2025-06-23 23:18:50,035 - __main__ - INFO - Connection failed, retrying
2025-06-23 23:18:50,233 - __main__ - INFO - Publishing message {'timestamp': '2025-06-23T23:18:50.233779'}, thread 134276862379072
2025-06-23 23:18:50,371 - __main__ - INFO - Received CONNACK (0, Success) properties=[ReceiveMaximum : 10, TopicAliasMaximum : 10]
2025-06-23 23:18:50,372 - __main__ - INFO - Callback on_connect, thread 134276835735104
2025-06-23 23:18:50,372 - __main__ - INFO - Connected to MQTT broker mqtt://test.mosquitto.org:1883 with reason code Success, flags: ConnectFlags(session_present=False), properties: [ReceiveMaximum : 10, TopicAliasMaximum : 10]
2025-06-23 23:18:50,372 - __main__ - INFO - Broker created a new session for this client
2025-06-23 23:18:50,372 - __main__ - INFO - Sending SUBSCRIBE (d0, m2551) [(b'command', {QoS=1, noLocal=False, retainAsPublished=False, retainHandling=0})]
2025-06-23 23:18:50,372 - __main__ - INFO - Caught exception in on_connect: [Errno 32] Broken pipe
Exception in thread paho-mqtt-client-client_test_safe_mqtt_client:
Traceback (most recent call last):
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
self.loop_forever(retry_first_connection=True)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
rc = self._loop(timeout)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 1686, in _loop
rc = self.loop_read()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2100, in loop_read
rc = self._packet_read()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
rc = self._packet_handle()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3814, in _packet_handle
return self._handle_connack()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3934, in _handle_connack
on_connect(
File "/home/user/workspace/surveillance/backend-services/tests/worker/mqtt/test_safe_mqtt_client2.py", line 128, in _on_connect
client.subscribe(list_topics) # exception HERE
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2038, in subscribe
return self._send_subscribe(False, topic_qos_list, properties) # ----- SUBSCRIBE -----
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3652, in _send_subscribe
return (self._packet_queue(command, packet, local_mid, 1), local_mid)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3782, in _packet_queue
self._sockpairW.send(sockpair_data)
BrokenPipeError: [Errno 32] Broken pipe
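Note that the failing call is `self._sockpairW.send(...)`. Judging by its name only, this looks like the write end of an internal socket pair and not the broker connection; if so, the error would mean the other end of that pair was no longer open. The sketch below reproduces the same error class with the stdlib `socket.socketpair()` on Linux. It only illustrates the mechanism and makes no claim about how paho creates or closes its own pair.

```python
import socket


def send_after_peer_closed():
    """Close one end of a socket pair, then write to the other end.

    Returns True if the write fails with BrokenPipeError (EPIPE).
    """
    reader, writer = socket.socketpair()
    reader.close()  # simulate the read end going away
    try:
        writer.send(b"\x00")
    except BrokenPipeError:
        return True
    finally:
        writer.close()
    return False
```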
Case 2:
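If I remove the client.subscribe(...) call from on_connect, the thread still dies. The exception is again a BrokenPipeError raised in _packet_queue, but this time it is reached through _send_publish, called from _handle_connack, instead of through subscribe.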
2025-06-20 09:08:44,719 - INFO - safe_mqtt_client.py - Connected to MQTT broker mqtt://10.124.1.39:1883 with reason code Success, flags: ConnectFlags(session_present=False), properties: [ReceiveMaximum : 32, TopicAliasMaximum : 65535, RetainAvailable : 1, MaximumPacketSize : 1048576, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 1, SharedSubscriptionAvailable : 1]
2025-06-20 09:08:44,719 - INFO - safe_mqtt_client.py - Broker created a new session for this client
2025-06-20 09:08:44,719 - ERROR - safe_mqtt_client.py - Failed to subscribe to topics: [Errno 32] Broken pipe
Exception in thread paho-mqtt-client-client_test_safe_mqtt_client:
Traceback (most recent call last):
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
self.loop_forever(retry_first_connection=True)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
rc = self._loop(timeout)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 1686, in _loop
rc = self.loop_read()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2100, in loop_read
rc = self._packet_read()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
rc = self._packet_handle()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3814, in _packet_handle
return self._handle_connack()
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3976, in _handle_connack
rc = self._send_publish( # ----- PUBLISH -----
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3444, in _send_publish
return self._packet_queue(PUBLISH, packet, mid, qos, info)
File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3782, in _packet_queue
self._sockpairW.send(sockpair_data)
BrokenPipeError: [Errno 32] Broken pipe
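Since the exception escapes the thread's run() in both cases, Python reports it through `threading.excepthook` (available since Python 3.8). The sketch below uses that hook to record when the loop thread dies. `install_loop_crash_hook` is a hypothetical helper, and the `paho-mqtt-client-` name prefix is an assumption taken from the tracebacks above.

```python
import threading


def install_loop_crash_hook(crashed, prefix="paho-mqtt-client-"):
    """Set `crashed` when a thread whose name starts with `prefix` dies
    from an uncaught exception, then defer to the previous hook.

    Returns the previous hook so the caller can restore it.
    """
    previous = threading.excepthook

    def hook(args):
        thread = args.thread
        if thread is not None and thread.name.startswith(prefix):
            crashed.set()
        previous(args)  # keep the default traceback output

    threading.excepthook = hook
    return previous
```

The application can then poll or wait on the `crashed` event and decide how to recover, for example by recreating the client.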
Concern:
This behavior feels unsafe to me. When the MQTT thread crashes:
- There's no built-in health check or recovery mechanism for the MQTT loop thread.
- The client continues to accept publish calls, but they silently fail because the internal thread has died.
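As a stopgap on the application side, a caller can avoid treating a successful `publish()` call as proof of delivery. The sketch below waits for the returned message info to report completion. It assumes paho-mqtt's `MQTTMessageInfo` interface (`wait_for_publish(timeout=...)` and `is_published()`); `publish_confirmed` is a hypothetical wrapper, and QoS 1 and the 5 second default are arbitrary choices.

```python
def publish_confirmed(client, topic, payload, timeout_s=5.0):
    """Return True only if the message was reported as published in time."""
    info = client.publish(topic, payload, qos=1)
    try:
        info.wait_for_publish(timeout=timeout_s)
        return info.is_published()
    except (ValueError, RuntimeError):
        # Assumption: paho signals a message that could not be queued or
        # sent with one of these exceptions.
        return False
```

This does not fix the dead loop thread; it only turns the silent failure into a False return value that the application can act on.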
Environment
- Python version: 3.10
- Library version: 2.1.0
- Operating system (including version): Ubuntu 22.04
- MQTT server (name, version, configuration, hosting details): image emqx/emqx:5.8.6