Skip to content

Commit fe7c12c

Browse files
committed
umqtt.async: Initial implementation based on umqtt.simple
1 parent 6ae440a commit fe7c12c

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
metadata(description="Lightweight async MQTT client for MicroPython.", version="1.7.0")
2+
3+
# Originally written by Paul Sokolovsky, adapted by Dmitry Ketov
4+
5+
require("ssl")
6+
7+
package("umqtt")
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
import socket
2+
import struct
3+
from binascii import hexlify
4+
import uasyncio as aio
5+
6+
7+
class MQTTException(Exception):
8+
pass
9+
10+
11+
class MQTTClient:
12+
def __init__(
13+
self,
14+
client_id,
15+
server,
16+
port=0,
17+
user=None,
18+
password=None,
19+
keepalive=0,
20+
ssl=None,
21+
ssl_params={},
22+
):
23+
if port == 0:
24+
port = 8883 if ssl else 1883
25+
self.client_id = client_id
26+
self.r, self.w = None, None
27+
self.server = server
28+
self.port = port
29+
self.ssl = ssl
30+
self.ssl_params = ssl_params
31+
self.pid = 0
32+
self.cb = None
33+
self.user = user
34+
self.pswd = password
35+
self.keepalive = keepalive
36+
self.lw_topic = None
37+
self.lw_msg = None
38+
self.lw_qos = 0
39+
self.lw_retain = False
40+
41+
async def _send_str(self, s):
42+
await self.w.awrite(struct.pack("!H", len(s)))
43+
await self.w.awrite(s)
44+
45+
async def _recv_len(self):
46+
n = 0
47+
sh = 0
48+
while 1:
49+
b = (await self.r.read(1))[0]
50+
n |= (b & 0x7F) << sh
51+
if not b & 0x80:
52+
return n
53+
sh += 7
54+
55+
def set_callback(self, f):
56+
self.cb = f
57+
58+
def set_last_will(self, topic, msg, retain=False, qos=0):
59+
assert 0 <= qos <= 2
60+
assert topic
61+
self.lw_topic = topic
62+
self.lw_msg = msg
63+
self.lw_qos = qos
64+
self.lw_retain = retain
65+
66+
async def connect(self, clean_session=True, timeout=None):
67+
self.r, self.w = await aio.open_connection(self.server, self.port)
68+
premsg = bytearray(b"\x10\0\0\0\0\0")
69+
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
70+
71+
sz = 10 + 2 + len(self.client_id)
72+
msg[6] = clean_session << 1
73+
if self.user:
74+
sz += 2 + len(self.user) + 2 + len(self.pswd)
75+
msg[6] |= 0xC0
76+
if self.keepalive:
77+
assert self.keepalive < 65536
78+
msg[7] |= self.keepalive >> 8
79+
msg[8] |= self.keepalive & 0x00FF
80+
if self.lw_topic:
81+
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
82+
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
83+
msg[6] |= self.lw_retain << 5
84+
85+
i = 1
86+
while sz > 0x7F:
87+
premsg[i] = (sz & 0x7F) | 0x80
88+
sz >>= 7
89+
i += 1
90+
premsg[i] = sz
91+
92+
await self.w.awrite(premsg[:i + 2])
93+
await self.w.awrite(msg)
94+
# print(hex(len(msg)), hexlify(msg, ":"))
95+
await self._send_str(self.client_id)
96+
if self.lw_topic:
97+
await self._send_str(self.lw_topic)
98+
await self._send_str(self.lw_msg)
99+
if self.user:
100+
await self._send_str(self.user)
101+
await self._send_str(self.pswd)
102+
resp = await self.r.read(4)
103+
assert resp[0] == 0x20 and resp[1] == 0x02
104+
if resp[3] != 0:
105+
raise MQTTException(resp[3])
106+
return resp[2] & 1
107+
108+
async def disconnect(self):
109+
await self.w.awrite(b"\xe0\0")
110+
await self.w.close()
111+
await self.r.close()
112+
113+
async def ping(self):
114+
await self.w.awrite(b"\xc0\0")
115+
116+
async def publish(self, topic, msg, retain=False, qos=0):
117+
pkt = bytearray(b"\x30\0\0\0")
118+
pkt[0] |= qos << 1 | retain
119+
sz = 2 + len(topic) + len(msg)
120+
if qos > 0:
121+
sz += 2
122+
assert sz < 2097152
123+
i = 1
124+
while sz > 0x7F:
125+
pkt[i] = (sz & 0x7F) | 0x80
126+
sz >>= 7
127+
i += 1
128+
pkt[i] = sz
129+
# print(hex(len(pkt)), hexlify(pkt, ":"))
130+
await self.w.awrite(pkt[:i + 1])
131+
await self._send_str(topic)
132+
if qos > 0:
133+
self.pid += 1
134+
pid = self.pid
135+
struct.pack_into("!H", pkt, 0, pid)
136+
await self.w.awrite(pkt, 2)
137+
await self.w.awrite(msg)
138+
if qos == 1:
139+
while 1:
140+
op = await self.wait_msg()
141+
if op == 0x40:
142+
sz = await self.r.read(1)
143+
assert sz == b"\x02"
144+
rcv_pid = await self.r.read(2)
145+
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
146+
if pid == rcv_pid:
147+
return
148+
elif qos == 2:
149+
assert 0
150+
151+
async def subscribe(self, topic, qos=0):
152+
assert self.cb is not None, "Subscribe callback is not set"
153+
pkt = bytearray(b"\x82\0\0\0")
154+
self.pid += 1
155+
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
156+
# print(hex(len(pkt)), hexlify(pkt, ":"))
157+
await self.w.awrite(pkt)
158+
await self._send_str(topic)
159+
await self.w.awrite(qos.to_bytes(1, "little"))
160+
while 1:
161+
op = await self.wait_msg()
162+
if op == 0x90:
163+
resp = await self.r.read(4)
164+
# print(resp)
165+
assert resp[1] == pkt[2] and resp[2] == pkt[3]
166+
if resp[3] == 0x80:
167+
raise MQTTException(resp[3])
168+
return
169+
170+
async def unsubscribe(self, topic):
171+
pkt = bytearray(b"\xa2\0\0\0")
172+
self.pid += 1
173+
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), self.pid)
174+
await self.w.awrite(pkt)
175+
await self._send_str(topic)
176+
while 1:
177+
op = await self.wait_msg()
178+
if op == 0xB0:
179+
resp = await self.r.read(3)
180+
assert resp[1] == pkt[2] and resp[2] == pkt[3]
181+
return
182+
183+
# Wait for a single incoming MQTT message and process it.
184+
# Subscribed messages are delivered to a callback previously
185+
# set by .set_callback() method. Other (internal) MQTT
186+
# messages processed internally.
187+
async def wait_msg(self):
188+
res = await self.r.read(1)
189+
if res is None:
190+
return None
191+
if res == b"":
192+
raise OSError(-1)
193+
if res == b"\xd0": # PINGRESP
194+
sz = await self.r.read(1)[0]
195+
assert sz == 0
196+
return None
197+
op = res[0]
198+
if op & 0xF0 != 0x30:
199+
return op
200+
sz = await self._recv_len()
201+
topic_len = await self.r.read(2)
202+
topic_len = (topic_len[0] << 8) | topic_len[1]
203+
topic = await self.r.read(topic_len)
204+
sz -= topic_len + 2
205+
if op & 6:
206+
pid = await self.r.read(2)
207+
pid = pid[0] << 8 | pid[1]
208+
sz -= 2
209+
msg = await self.r.read(sz)
210+
self.cb(topic, msg)
211+
if op & 6 == 2:
212+
pkt = bytearray(b"\x40\x02\0\0")
213+
struct.pack_into("!H", pkt, 2, pid)
214+
await self.w.awrite(pkt)
215+
elif op & 6 == 4:
216+
assert 0
217+
return op
218+
219+
if __name__ == "__main__":
220+
import uasyncio as aio
221+
222+
async def test(name):
223+
c = MQTTClient(name, "localhost")
224+
await c.connect()
225+
c.set_callback(lambda t, m: print(t, m))
226+
await c.subscribe("#")
227+
await c.publish(name, name)
228+
for i in range(100):
229+
print("!", await c.wait_msg())
230+
c.disconnect()
231+
232+
async def main():
233+
await aio.gather(
234+
aio.create_task(test("c1")),
235+
aio.create_task(test("c2"))
236+
)
237+
238+
aio.run(main())

0 commit comments

Comments
 (0)