Add clean_session parameter and command support to MQTTDynamicSubscriber#110
Add clean_session parameter and command support to MQTTDynamicSubscriber#110cergab wants to merge 2 commits into
Conversation
clean_session flag is set by default. Add support for following commands: 'subscribe' (topic subscribe) and 'unsubscribe' (topic unsubscribe) Other small updates
| message["client_id"] = client_id | ||
| message = "" | ||
| for (client_id, mqtt) in self.mqtt_dict.iteritems(): | ||
| if (calvinsys.can_read(mqtt)): |
There was a problem hiding this comment.
Please remove superfluous parentheses (in all if-statements, not only here)
| def update_topic(self, client_id, uri, topic, qos): | ||
| @condition(action_input=['client_id', 'uri', 'cmd', 'topic', 'qos']) | ||
| def update_topic(self, client_id, uri, cmd, topic, qos): | ||
| if (topic is None): |
There was a problem hiding this comment.
Topic must contain at least one character.
| "type": "string" | ||
| }, | ||
| }, | ||
| "required": ["topic"] |
There was a problem hiding this comment.
Command would be better as an enum. Also, describe defaults (when qos and cmd missing.)
| .format(topic, status[0])) | ||
|
|
||
| if (not done and retry > 0): | ||
| time.sleep(0.2) |
There was a problem hiding this comment.
If you need delay or wait, either use thread or delayed call (see other calvinsys for usage). This usage will freeze the entire runtime during the sleep.
There was a problem hiding this comment.
When I said that we should have retries, I meant for the connection, but forgot that when using the loop_start reconnect is handled automatically. But what needs to be added is to resubscribe to all topics in the on_connect callback.
| self.topics = [(topic.encode("ascii"), qos) for topic in topics] | ||
| self.topics = {(topic.encode("ascii")) : qos for topic in list(set(topics))} | ||
| self.data = [] | ||
| clean_session = kwargs.get('clean_session', 'false').lower() == "true" |
There was a problem hiding this comment.
Clean session that comes in should already be a boolean.
| _log.warning("TLS configuration is missing!") | ||
|
|
||
| self.client.connect_async(host=hostname, port=port) | ||
| self.client.connect(host=hostname, port=port) |
There was a problem hiding this comment.
Why change this from async. This would block the runtime, please change back.
Add support for MQTTDynamicSubscriber actor migration Add migration test using public MQTT broker http://test.mosquitto.org/ Other minor updates
| _log.warning("MQTT broker {}:{} disconnected".format(hostname, port)) | ||
|
|
||
| def on_message(client, userdata, message): | ||
| _log.info("New message {}".format(message)) |
There was a problem hiding this comment.
will be removed on the next commit
clean_session flag is set by default.
Add support for following commands: 'subscribe' (topic subscribe) and
'unsubscribe' (topic unsubscribe)
Other small updates