-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcontrol.py
More file actions
64 lines (50 loc) · 2.08 KB
/
Copy pathcontrol.py
File metadata and controls
64 lines (50 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# ==============================================================================
# Copyright (c) 2024 Botts Innovative Research, Inc.
# Date: 2024/7/1
# Author: Ian Patterson
# Contact Email: ian@botts-inc.com
# ==============================================================================
import websockets
from csapi4py.mqtt import MQTTCommClient
from schema_datamodels import ControlStreamJSONSchema, CommandJSON
from src.oshconnect import System
class ControlSchema:
    """Holder for a control schema kept as a plain dictionary."""

    # Class-level default; stays None until a schema dict is assigned.
    schema: dict = None
class ControlStream:
    """Handle to a control stream resource that belongs to a parent system.

    Stores the resource endpoint and the transport strategy, and forwards
    ``subscribe`` / ``publish`` / ``unsubscribe`` to an MQTT client when the
    strategy is ``"mqtt"``.  The ``"websocket"`` strategy and the
    ``connect`` / ``disconnect`` hooks are placeholders that do nothing yet.
    """

    # Annotations are quoted so that the ``X | None`` unions and the
    # project / third-party type names are not evaluated when the class body
    # runs; the runtime defaults are unchanged.
    name: "str | None" = None
    _parent_systems: "System | None" = None
    _strategy: str = "mqtt"
    _resource_endpoint: "str | None" = None
    # _auth: str = None
    _websocket: "websockets.WebSocketServerProtocol | None" = None
    _schema: "ControlStreamJSONSchema | None" = None
    # Nothing in this class assigns the client; it has to be attached from
    # outside before subscribe/publish/unsubscribe are used with "mqtt".
    _mqtt_client: "MQTTCommClient | None" = None

    def __init__(self, parent_system: "System", resource_endpoint: str,
                 name: "str | None" = None, strategy: str = "mqtt"):
        """Create a control stream bound to ``parent_system``.

        :param parent_system: system this control stream belongs to.
        :param resource_endpoint: base topic/path; ``/commands`` and
            ``/status`` are appended to it by the messaging methods.
        :param name: optional display name.
        :param strategy: transport selector; ``"mqtt"`` is the only strategy
            with behavior, ``"websocket"`` is accepted but does nothing.
        """
        self._parent_systems = parent_system
        self.name = name
        self._strategy = strategy
        self._resource_endpoint = resource_endpoint

    def set_schema(self, schema: "ControlStreamJSONSchema"):
        """Store ``schema`` as this control stream's schema."""
        self._schema = schema

    def connect(self):
        """Placeholder; no connection logic is implemented."""
        pass

    def _require_mqtt_client(self):
        """Return the attached MQTT client.

        :raises ValueError: if no MQTT client has been attached.
        """
        if self._mqtt_client is None:
            raise ValueError("No MQTT Client found.")
        return self._mqtt_client

    def subscribe(self):
        """Subscribe to ``<resource_endpoint>/commands`` over MQTT.

        Does nothing for any strategy other than ``"mqtt"``.

        :raises ValueError: if the strategy is ``"mqtt"`` and no client is set.
        """
        if self._strategy == "mqtt":
            self._require_mqtt_client().subscribe(f'{self._resource_endpoint}/commands')

    def publish(self, payload: "CommandJSON"):
        """Publish ``payload`` to ``<resource_endpoint>/status`` with QoS 1.

        Does nothing for any strategy other than ``"mqtt"``.

        :param payload: message handed to the MQTT client unchanged.
        :raises ValueError: if the strategy is ``"mqtt"`` and no client is set.
        """
        # NOTE(review): this publishes on '/status' while subscribe() listens
        # on '/commands' -- confirm that the topic is the intended one.
        if self._strategy == "mqtt":
            self._require_mqtt_client().publish(
                f'{self._resource_endpoint}/status', payload=payload, qos=1)

    def disconnect(self):
        """Placeholder; no disconnection logic is implemented."""
        pass

    def unsubscribe(self):
        """Unsubscribe from ``<resource_endpoint>/commands`` over MQTT.

        Mirrors :meth:`subscribe`: a missing client is reported with the same
        ``ValueError`` instead of failing on an attribute lookup against
        ``None``, and other strategies are a no-op.

        :raises ValueError: if the strategy is ``"mqtt"`` and no client is set.
        """
        if self._strategy == "mqtt":
            self._require_mqtt_client().unsubscribe(f'{self._resource_endpoint}/commands')
class Command:
    """Empty placeholder type; it defines no attributes or behavior yet."""