Skip to content

Commit 5a0a752

Browse files
committed
add support for mqtt v5, upgrade RabbitMQ and paho-mqtt versions
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent d0bcb95 commit 5a0a752

11 files changed

Lines changed: 76 additions & 48 deletions

File tree

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
services:
55
broker:
6-
image: "bitnami/rabbitmq:3.13.3"
6+
image: "bitnami/rabbitmq:4.1"
77
ports:
88
- "1883:1883" # MQTT port
99
- "5672:5672" # AMQP port

docs/core_concepts.rst

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,16 @@ You can emit an event by calling ``self.intersect_sdk_emit_event(event_name, eve
4747

4848
A simple example of how to configure this:
4949

50-
```python
51-
class YourCapability(IntersectBaseCapabilityImplementation):
52-
# You should configure it on the class itself. Do NOT configure it on the instance.
53-
intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
54-
'my_integer_event': IntersectEventDefinition(event_type=int),
55-
'my_str_event': IntersectEventDefinition(event_type=str),
56-
'my_float_event': IntersectEventDefinition(event_type=float),
57-
}
58-
```
50+
.. code-block:: python
51+
52+
class YourCapability(IntersectBaseCapabilityImplementation):
53+
# You should configure it on the class itself. Do NOT configure it on the instance.
54+
intersect_sdk_events: ClassVar[dict[str, IntersectEventDefinition]] = {
55+
'my_integer_event': IntersectEventDefinition(event_type=int),
56+
'my_str_event': IntersectEventDefinition(event_type=str),
57+
'my_float_event': IntersectEventDefinition(event_type=float),
58+
}
59+
5960
6061
Now this capability can call ``self.intersect_sdk_emit_event('my_integer_event', value)`` as long as "value" is actually an integer.
6162

examples/2_counting/counting_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def client_callback(
141141
'username': 'intersect_username',
142142
'password': 'intersect_password',
143143
'port': 1883,
144-
'protocol': 'mqtt3.1.1',
144+
'protocol': 'mqtt5.0',
145145
},
146146
],
147147
}

examples/2_counting/counting_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def _run_count(self) -> None:
168168
'username': 'intersect_username',
169169
'password': 'intersect_password',
170170
'port': 1883,
171-
'protocol': 'mqtt3.1.1',
171+
'protocol': 'mqtt5.0',
172172
},
173173
],
174174
}

examples/2_counting_events/counting_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def event_callback(
6363
'username': 'intersect_username',
6464
'password': 'intersect_password',
6565
'port': 1883,
66-
'protocol': 'mqtt3.1.1',
66+
'protocol': 'mqtt5.0',
6767
},
6868
],
6969
}

examples/2_counting_events/counting_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def increment_counter_function(self) -> None:
6262
'username': 'intersect_username',
6363
'password': 'intersect_password',
6464
'port': 1883,
65-
'protocol': 'mqtt3.1.1',
65+
'protocol': 'mqtt5.0',
6666
},
6767
],
6868
}

pdm.lock

Lines changed: 1 addition & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ lint = [
178178
"pre-commit>=3.3.1",
179179
"ruff==0.12.7",
180180
"mypy>=1.10.0",
181-
"types-paho-mqtt>=1.6.0.20240106",
182181
"codespell>=2.3.0",
183182
]
184183
test = ["pytest>=7.3.2", "pytest-cov>=4.1.0", "httpretty>=1.1.4"]

src/intersect_sdk/_internal/control_plane/brokers/mqtt_client.py

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import TYPE_CHECKING, Any
66

77
import paho.mqtt.client as paho_client
8+
from paho.mqtt.enums import CallbackAPIVersion
89
from retrying import retry
910

1011
from ...logger import logger
@@ -13,6 +14,10 @@
1314
if TYPE_CHECKING:
1415
from collections.abc import Callable
1516

17+
from paho.mqtt.client import DisconnectFlags
18+
from paho.mqtt.properties import Properties
19+
from paho.mqtt.reasoncodes import ReasonCode
20+
1621
from ..topic_handler import TopicHandler
1722

1823

@@ -42,6 +47,7 @@ def __init__(
4247
password: str,
4348
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
4449
uid: str | None = None,
50+
v5: bool = False,
4551
) -> None:
4652
"""The default constructor.
4753
@@ -52,20 +58,22 @@ def __init__(
5258
password: password credentials for MQTT broker
5359
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
5460
uid: A string representing the unique id to identify the client.
61+
v5: if true - protocol is MQTTv5; if false - protocol is MQTTv3.1.1
5562
"""
5663
# Unique id for the MQTT broker to associate this client with
5764
self.uid = uid if uid else str(uuid.uuid4())
5865
self.host = host
5966
self.port = port
6067

6168
# Create a client to connect to RabbitMQ
62-
# TODO clean_session param is ONLY for MQTT v3 here
6369
self._connection = paho_client.Client(
64-
callback_api_version=paho_client.CallbackAPIVersion.VERSION2,
70+
callback_api_version=CallbackAPIVersion.VERSION2,
71+
protocol=paho_client.MQTTv5 if v5 else paho_client.MQTTv311,
6572
client_id=self.uid,
66-
clean_session=False,
73+
clean_session=False if not v5 else None,
6774
)
6875
self._connection.username_pw_set(username=username, password=password)
76+
self._v5 = v5
6977

7078
# Whether the connection is currently active
7179
self._connected = False
@@ -77,6 +85,10 @@ def __init__(
7785
# ConnectionManager callable state
7886
self._topics_to_handlers = topics_to_handlers
7987

88+
# MQTT v3.1.1 automatically downgrades a QOS which is too high (good), but MQTT v5 will terminate the connection (bad)
89+
# see https://github.com/rabbitmq/rabbitmq-server/discussions/11842
90+
self._max_supported_qos = 2
91+
8092
# MQTT callback functions
8193
self._connection.on_connect = self._handle_connect
8294
self._connection.on_disconnect = self._handle_disconnect
@@ -86,10 +98,14 @@ def __init__(
8698
def connect(self) -> None:
8799
"""Connect to the defined broker."""
88100
# Create a client to connect to RabbitMQ
89-
# TODO MQTT v5 implementations should set clean_start to NEVER here
90101
self._should_disconnect = False
91102
self._connected_flag.clear()
92-
self._connection.connect(self.host, self.port, 60)
103+
self._connection.connect(
104+
self.host,
105+
self.port,
106+
60,
107+
clean_start=False if self._v5 else 3,
108+
)
93109
self._connection.loop_start()
94110
while not self.is_connected() and not self._connected_flag.is_set():
95111
self._connected_flag.wait(1.0)
@@ -124,7 +140,7 @@ def publish(self, topic: str, payload: bytes, persist: bool) -> None:
124140
if it should be removed immediately (False)
125141
"""
126142
# NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1
127-
self._connection.publish(topic, payload, qos=2 if persist else 0)
143+
self._connection.publish(topic, payload, qos=self._max_supported_qos if persist else 0)
128144

129145
def subscribe(self, topic: str, persist: bool) -> None:
130146
"""Subscribe to a topic over the pre-existing connection (via connect()).
@@ -134,7 +150,7 @@ def subscribe(self, topic: str, persist: bool) -> None:
134150
persist: Determine if the associated message queue of the topic is long-lived (True) or not (False)
135151
"""
136152
# NOTE: RabbitMQ only works with QOS of 1 and 0, and seems to convert QOS2 to QOS1
137-
self._connection.subscribe(topic, qos=2 if persist else 0)
153+
self._connection.subscribe(topic, qos=2 if persist else 0, properties=None)
138154

139155
def unsubscribe(self, topic: str) -> None:
140156
"""Unsubscribe from a topic over the pre-existing connection.
@@ -165,10 +181,10 @@ def _on_message(
165181
def _handle_disconnect(
166182
self,
167183
client: paho_client.Client,
168-
userdata: Any, # noqa: ARG002
169-
flags: dict[str, Any], # noqa: ARG002
170-
reason_code: int, # noqa: ARG002
171-
properties: None, # noqa: ARG002
184+
userdata: Any,
185+
flags: DisconnectFlags,
186+
reason_code: ReasonCode,
187+
properties: Properties | None,
172188
) -> None:
173189
"""Handle a disconnection from the MQTT server.
174190
@@ -178,9 +194,17 @@ def _handle_disconnect(
178194
client: The Paho client.
179195
userdata: MQTT user data.
180196
flags: List of MQTT connection flags.
181-
reason_code: MQTT return code as an integer.
197+
reason_code: MQTT return code.
182198
properties: MQTT user properties.
183199
"""
200+
logger.debug(
201+
'mqtt disconnected log - uid=%s reason_code=%s flags=%s userdata=%s properties=%s',
202+
self.uid,
203+
reason_code,
204+
flags,
205+
userdata,
206+
properties,
207+
)
184208
self._connected = False
185209
if not self._should_disconnect:
186210
client.reconnect()
@@ -190,8 +214,8 @@ def _handle_connect(
190214
client: paho_client.Client, # noqa: ARG002
191215
userdata: Any,
192216
flags: dict[str, Any],
193-
reason_code: int,
194-
properties: None, # noqa: ARG002
217+
reason_code: ReasonCode,
218+
properties: Properties | None,
195219
) -> None:
196220
"""Set the connection status in response to the result of a Paho connection attempt.
197221
@@ -202,26 +226,40 @@ def _handle_connect(
202226
client: The Paho MQTT client.
203227
userdata: The MQTT userdata.
204228
flags: List of MQTT connection flags.
205-
reason_code: The MQTT return code as an int.
229+
reason_code: The MQTT return code.
206230
properties: MQTT user properties
207231
"""
208-
# Return code 0 means connection was successful
209-
if reason_code == 0:
232+
if str(reason_code) == 'Success':
233+
logger.debug(
234+
'MQTT connected log - reason-code=%s properties=%s userdata=%s flags=%s',
235+
reason_code,
236+
properties,
237+
userdata,
238+
flags,
239+
)
210240
self._connected = True
211241
self._connection_retries = 0
212242
self._should_disconnect = False
243+
244+
# mimic "automatic QoS downgrade" of MQTTv3 for MQTTv5
245+
if properties and hasattr(properties, 'MaximumQoS'):
246+
logger.info('MQTT: Maximum supported QoS is %s', properties.MaximumQoS)
247+
self._max_supported_qos = properties.MaximumQoS
248+
213249
self._connected_flag.set()
214250
for topic, topic_handler in self._topics_to_handlers().items():
215251
self.subscribe(topic, topic_handler.topic_persist)
216252
else:
217253
# This will generally suggest a misconfiguration
218254
self._connected = False
219255
self._connection_retries += 1
256+
logger.error('Bad connection (reason: %s)', reason_code)
220257
logger.error(
221-
f'On connect error received (probable broker config error), have tried {self._connection_retries} times'
258+
'On connect error received (probable broker config error), have tried %s times',
259+
self._connection_retries,
222260
)
223-
logger.error(f'Connection error userdata: {userdata}')
224-
logger.error(f'Connection error flags: {flags}')
261+
logger.error('Connection error userdata: %s', userdata)
262+
logger.error('Connection error flags: %s', flags)
225263
if self._connection_retries >= _MQTT_MAX_RETRIES:
226264
logger.error('Giving up MQTT reconnection attempt')
227265
self._connected_flag.set()

src/intersect_sdk/_internal/control_plane/control_plane_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def create_control_provider(
5656
username=config.username,
5757
password=config.password,
5858
topics_to_handlers=topic_handler_callback,
59+
v5=config.protocol == 'mqtt5.0',
5960
)
6061

6162

0 commit comments

Comments
 (0)