Paho MQTT client thread crashes after long network disconnection #894

Description

@kietheros

Prerequisites

  • Test the latest release of the library.
  • Search existing issues.
  • Read the relevant documentation.
  • Review your server configuration and logs.
  • Consider testing against a different server (e.g. mqtt.eclipseprojects.io or test.mosquitto.org)
  • If possible, test using another tool (e.g. MQTTX / mosquitto_sub)
    to confirm the issue is specific to this client.
  • If you are unsure if you have found a bug, please consider asking on stackoverflow for a quicker response.

Bug Description

I'm using the paho-mqtt Python library, and I'm encountering a critical issue with its background thread after network recovery. Here's the scenario I'm testing:

Test Steps:

  • Start a simple MQTT client using loop_start() / loop_stop().
  • Simulate a network outage for the client process for around 15 minutes.
  • Restore the network connection.

To simulate the network outage, I used a script (generated by ChatGPT) that isolates the app process: it places the process in a net_cls cgroup and adds iptables rules that drop all inbound and outbound traffic for that cgroup.

Script
#!/bin/bash

ACTION=$1
PID=$(pgrep -f 'python -m tests.worker.mqtt.test_safe_mqtt_client' | head -n 1)

CGROUP_NAME="netblock"
CGROUP_PATH="/sys/fs/cgroup/net_cls/$CGROUP_NAME"
CLASSID_HEX="0x100001"
INTERFACE=$(ip route | grep default | awk '{print $5}')

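# PID is discovered via pgrep above; it is not passed as an argument
if [ -z "$ACTION" ] || [ -z "$PID" ]; then
    echo "Usage: $0 [enable|disable] (the target process must be running)"
    exit 1
fi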

function cleanup_cgroup {
    if [ -f "$CGROUP_PATH/cgroup.procs" ]; then
        echo 0 | sudo tee "$CGROUP_PATH/net_cls.classid" > /dev/null
        echo "$PID" | sudo tee /sys/fs/cgroup/net_cls/cgroup.procs > /dev/null
    fi
    sudo cgdelete -g net_cls:/$CGROUP_NAME 2>/dev/null
}

case "$ACTION" in
  enable)
    echo "[*] Blocking network for PID $PID..."

    # create cgroup
    sudo cgcreate -g net_cls:/$CGROUP_NAME

    # Set classid
    echo $CLASSID_HEX | sudo tee $CGROUP_PATH/net_cls.classid > /dev/null

    # Add process into cgroup
    sudo cgclassify -g net_cls:/$CGROUP_NAME $PID

    # config tc
    sudo tc qdisc show dev $INTERFACE | grep -q "htb" || {
        sudo tc qdisc add dev $INTERFACE root handle 1: htb
        sudo tc class add dev $INTERFACE parent 1: classid 1:1 htb rate 1000Mbps
        sudo tc filter add dev $INTERFACE parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1
    }

    # Block by iptables
    sudo iptables -t mangle -A OUTPUT -m cgroup --cgroup $CLASSID_HEX -j DROP
    sudo iptables -t mangle -A INPUT  -m cgroup --cgroup $CLASSID_HEX -j DROP

    echo "[+] Network blocked for PID $PID"
    ;;
  
  disable)
    echo "[*] Restoring network for PID $PID..."

    # Delete iptables
    sudo iptables -t mangle -D OUTPUT -m cgroup --cgroup $CLASSID_HEX -j DROP 2>/dev/null
    sudo iptables -t mangle -D INPUT  -m cgroup --cgroup $CLASSID_HEX -j DROP 2>/dev/null

    # Delete tc
    sudo tc qdisc del dev $INTERFACE root 2>/dev/null

    # Remove process from cgroup and delete cgroup
    cleanup_cgroup

    echo "[+] Network restored for PID $PID"
    ;;
  
  *)
    echo "Invalid action: $ACTION"
    echo "Usage: $0 [enable|disable]"
    exit 1
    ;;
esac

Behavior:
After restoring the network:

  • The client successfully reconnects to the MQTT server.
  • The on_connect callback is triggered, and inside that callback I call client.subscribe(...).
  • That call raises BrokenPipeError inside the Paho MQTT thread. The exception propagates out of the network loop and the thread terminates; apart from the traceback on stderr, the application is not notified.

Final State of the app:

  • The Paho MQTT background thread is no longer running.
  • client.publish(...) calls do not raise errors, but messages are not delivered.

Log
2025-06-23 23:18:50,035 - __main__ - INFO - Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_test_safe_mqtt_client' properties=[SessionExpiryInterval : 120]
2025-06-23 23:18:50,035 - __main__ - INFO - Connection failed, retrying
2025-06-23 23:18:50,233 - __main__ - INFO - Publishing message {'timestamp': '2025-06-23T23:18:50.233779'}, thread 134276862379072
2025-06-23 23:18:50,371 - __main__ - INFO - Received CONNACK (0, Success) properties=[ReceiveMaximum : 10, TopicAliasMaximum : 10]
2025-06-23 23:18:50,372 - __main__ - INFO - Callback on_connect, thread 134276835735104
2025-06-23 23:18:50,372 - __main__ - INFO - Connected to MQTT broker mqtt://test.mosquitto.org:1883 with reason code Success, flags: ConnectFlags(session_present=False), properties: [ReceiveMaximum : 10, TopicAliasMaximum : 10]
2025-06-23 23:18:50,372 - __main__ - INFO - Broker created a new session for this client
2025-06-23 23:18:50,372 - __main__ - INFO - Sending SUBSCRIBE (d0, m2551) [(b'command', {QoS=1, noLocal=False, retainAsPublished=False, retainHandling=0})]
2025-06-23 23:18:50,372 - __main__ - INFO - Caught exception in on_connect: [Errno 32] Broken pipe
Exception in thread paho-mqtt-client-client_test_safe_mqtt_client:
Traceback (most recent call last):
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
    self.loop_forever(retry_first_connection=True)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
    rc = self._loop(timeout)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 1686, in _loop
    rc = self.loop_read()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2100, in loop_read
    rc = self._packet_read()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
    rc = self._packet_handle()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3814, in _packet_handle
    return self._handle_connack()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3934, in _handle_connack
    on_connect(
  File "/home/user/workspace/surveillance/backend-services/tests/worker/mqtt/test_safe_mqtt_client2.py", line 128, in _on_connect
    client.subscribe(list_topics) # exception HERE

  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2038, in subscribe
    return self._send_subscribe(False, topic_qos_list, properties) # ----- SUBSCRIBE -----

  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3652, in _send_subscribe
    return (self._packet_queue(command, packet, local_mid, 1), local_mid)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3782, in _packet_queue
    self._sockpairW.send(sockpair_data)
BrokenPipeError: [Errno 32] Broken pipe
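
The last frame shows the error coming from self._sockpairW.send(...), which is a write to the client's internal socket pair and not to the broker connection. Below is a minimal, stdlib-only sketch of how a socket pair can produce this error. It assumes (the traceback does not confirm this) that the reading end of the pair had already been closed when the write happened. send_after_peer_closed is a hypothetical helper name and does not use paho at all.

```python
import socket


def send_after_peer_closed() -> str:
    """Write to one end of a socket pair after the other end was closed.

    Returns the name of the exception that was raised, or "no error".
    """
    reader, writer = socket.socketpair()
    reader.close()  # assumption: the reading end is gone before the write
    try:
        writer.send(b"\x00")
    except OSError as exc:  # BrokenPipeError is a subclass of OSError
        return type(exc).__name__
    finally:
        writer.close()
    return "no error"


if __name__ == "__main__":
    print(send_after_peer_closed())
```

On Linux I would expect this to report BrokenPipeError, the same errno 32 as in the traceback above. If that is what happens inside the client, the write end is being used after the read end was torn down during the reconnect.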

Case 2:
If I remove the client.subscribe(...) call from on_connect, the same BrokenPipeError is raised from a different place inside the MQTT thread (_handle_connack calling _send_publish), and the outcome is the same: the thread dies.

2025-06-20 09:08:44,719 -     INFO - safe_mqtt_client.py - Connected to MQTT broker mqtt://10.124.1.39:1883 with reason code Success, flags: ConnectFlags(session_present=False), properties: [ReceiveMaximum : 32, TopicAliasMaximum : 65535, RetainAvailable : 1, MaximumPacketSize : 1048576, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 1, SharedSubscriptionAvailable : 1]
2025-06-20 09:08:44,719 -     INFO - safe_mqtt_client.py - Broker created a new session for this client
2025-06-20 09:08:44,719 -    ERROR - safe_mqtt_client.py - Failed to subscribe to topics: [Errno 32] Broken pipe
Exception in thread paho-mqtt-client-client_test_safe_mqtt_client:
Traceback (most recent call last):
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 4523, in _thread_main
    self.loop_forever(retry_first_connection=True)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2297, in loop_forever
    rc = self._loop(timeout)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 1686, in _loop
    rc = self.loop_read()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 2100, in loop_read
    rc = self._packet_read()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3142, in _packet_read
    rc = self._packet_handle()
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3814, in _packet_handle
    return self._handle_connack()

  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3976, in _handle_connack
    rc = self._send_publish(  # ----- PUBLISH -----

  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3444, in _send_publish
    return self._packet_queue(PUBLISH, packet, mid, qos, info)
  File "/home/user/installed/miniconda3/envs/backend-services/lib/python3.10/site-packages/paho/mqtt/client.py", line 3782, in _packet_queue
    self._sockpairW.send(sockpair_data)
BrokenPipeError: [Errno 32] Broken pipe
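
In Case 2 the exception is raised from library code (_handle_connack calling _send_publish), so a try/except inside my own callbacks cannot contain it. One possible workaround is to run the network loop in a thread owned by the application, instead of using loop_start(), and to restart it when it raises. This is only a sketch: run_supervised_loop and start_supervised_loop are hypothetical helpers, client is assumed to be a paho.mqtt.client.Client on which connect() or connect_async() has already been called, and I have not verified that the client recovers cleanly after such an exception.

```python
import logging
import threading

log = logging.getLogger(__name__)


def run_supervised_loop(client, stop_event, restart_delay=5.0):
    """Run client.loop_forever() and call it again if it raises.

    The loop ends once stop_event is set and loop_forever() has returned.
    """
    while not stop_event.is_set():
        try:
            client.loop_forever(retry_first_connection=True)
        except Exception:
            # Without this handler the exception would end the thread.
            log.exception("MQTT loop died, restarting in %.1fs", restart_delay)
        # Event.wait() returns True as soon as stop_event is set.
        if stop_event.wait(restart_delay):
            break


def start_supervised_loop(client, restart_delay=5.0):
    """Start the supervised loop in a daemon thread (hypothetical helper)."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=run_supervised_loop,
        args=(client, stop_event, restart_delay),
        name="mqtt-supervised-loop",
        daemon=True,
    )
    thread.start()
    return thread, stop_event
```

To shut down, set stop_event first and then call client.disconnect(), so that loop_forever() returns and the supervisor exits instead of restarting it.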

Concern:
I can't rely on the client in this state. When the MQTT thread crashes:

  • There's no built-in health check or recovery mechanism for the MQTT loop thread.
  • The client continues to accept publish calls, but they silently fail because the internal thread has died.
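
Until the library handles this itself, both symptoms can probably be detected from application code. The sketch below is an assumption-laden example, not a library feature: loop_thread_alive and publish_confirmed are hypothetical helpers. The first looks for a live thread by name (the traceback above shows the loop thread named paho-mqtt-client-<client_id>). The second treats a QoS 1 publish as delivered only after the returned message info reports it as published within a timeout.

```python
import threading


def loop_thread_alive(thread_name: str) -> bool:
    """Return True if a live thread with exactly this name exists."""
    return any(
        t.name == thread_name and t.is_alive() for t in threading.enumerate()
    )


def publish_confirmed(client, topic, payload, qos=1, timeout=5.0) -> bool:
    """Publish and wait up to `timeout` seconds for delivery confirmation.

    `client` is assumed to be a paho.mqtt.client.Client. Do not call this
    from inside a paho callback, because it blocks the calling thread.
    """
    info = client.publish(topic, payload, qos=qos)
    if info.rc != 0:  # 0 is MQTT_ERR_SUCCESS
        return False
    try:
        info.wait_for_publish(timeout)
    except (RuntimeError, ValueError):
        return False
    # With a dead loop thread nothing is ever sent, so this stays False.
    return info.is_published()
```

A periodic check such as loop_thread_alive("paho-mqtt-client-" + client_id) returning False, or publish_confirmed(...) returning False several times in a row, could then trigger an alert or a full client restart.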

Environment

  • Python version: 3.10
  • Library version: 2.1.0
  • Operating system (including version): Ubuntu 22.04
  • MQTT server (name, version, configuration, hosting details): image emqx/emqx:5.8.6
