|
1 | | -class DigitalOut(object): |
| 1 | +import numpy as np |
2 | 2 |
|
| 3 | +class DigitalOut(object): |
| 4 | + ## DigitalOut |
3 | 5 | def __init__(self, parent): |
4 | 6 | self._parent = parent |
5 | 7 | self._logger = parent.logger |
6 | | - |
| 8 | + self.nDigitalOuts = 3 |
| 9 | + self.digitalOutValues = np.zeros((self.nDigitalOuts), dtype=int) |
| 10 | + |
| 11 | + # register a callback function for the digitalout status on the serial loop |
| 12 | + if hasattr(self._parent, "serial"): |
| 13 | + self._parent.serial.register_callback(self._callback_digitalout_status, pattern="digitalout") |
| 14 | + |
| 15 | + # announce a function that is called when we receive a digitalout update through the callback |
| 16 | + self._callbackPerKey = {} |
| 17 | + self.nCallbacks = 10 |
| 18 | + self._callbackPerKey = self.init_callback_functions(nCallbacks=self.nCallbacks) |
| 19 | + print(self._callbackPerKey) |
| 20 | + |
| 21 | + |
| 22 | + def init_callback_functions(self, nCallbacks=10): |
| 23 | + ''' initialize the callback functions - each key holds a list of callbacks ''' |
| 24 | + _callbackPerKey = {} |
| 25 | + self.nCallbacks = nCallbacks |
| 26 | + for i in range(nCallbacks): |
| 27 | + _callbackPerKey[i] = [] # Initialize as list to support multiple callbacks |
| 28 | + return _callbackPerKey |
| 29 | + |
| 30 | + def _callback_digitalout_status(self, data): |
| 31 | + ''' cast the json in the form: |
| 32 | + ++ |
| 33 | + {"digitalout":{"digitaloutid":1,"digitaloutval":1,"digitaloutpin":4,"isDone":1},"qid":2} |
| 34 | + -- |
| 35 | + into the digitalout values array ''' |
| 36 | + try: |
| 37 | + digitalout_data = data["digitalout"] |
| 38 | + # Handle both single digitalout and multiple digitalouts |
| 39 | + if isinstance(digitalout_data, dict): |
| 40 | + # Single digitalout format: {"digitaloutid":1,"digitaloutval":1,"digitaloutpin":4} |
| 41 | + digitalout_id = digitalout_data.get("digitaloutid", 0) |
| 42 | + digitalout_val = digitalout_data.get("digitaloutval", 0) |
| 43 | + if 0 < digitalout_id <= self.nDigitalOuts: |
| 44 | + self.digitalOutValues[digitalout_id - 1] = digitalout_val |
| 45 | + elif isinstance(digitalout_data, list): |
| 46 | + # Multiple digitalouts format: [{"digitaloutid":1,"digitaloutval":1}, ...] |
| 47 | + for digitalout in digitalout_data: |
| 48 | + digitalout_id = digitalout.get("digitaloutid", 0) |
| 49 | + digitalout_val = digitalout.get("digitaloutval", 0) |
| 50 | + if 0 < digitalout_id <= self.nDigitalOuts: |
| 51 | + self.digitalOutValues[digitalout_id - 1] = digitalout_val |
| 52 | + |
| 53 | + # Call all registered callbacks for key 0 |
| 54 | + for callback in self._callbackPerKey[0]: |
| 55 | + if callable(callback): |
| 56 | + try: |
| 57 | + callback(self.digitalOutValues) |
| 58 | + except Exception as callback_error: |
| 59 | + print(f"Error in callback execution: {callback_error}") |
| 60 | + except Exception as e: |
| 61 | + print("Error in _callback_digitalout_status: ", e) |
| 62 | + |
| 63 | + def register_callback(self, key, callbackfct): |
| 64 | + ''' register a callback function for a specific key - supports multiple callbacks per key ''' |
| 65 | + if key not in self._callbackPerKey: |
| 66 | + self._callbackPerKey[key] = [] |
| 67 | + if callbackfct not in self._callbackPerKey[key]: # Avoid duplicate registrations |
| 68 | + self._callbackPerKey[key].append(callbackfct) |
| 69 | + print(f"Registered callback for key {key}. Total callbacks for this key: {len(self._callbackPerKey[key])}") |
7 | 70 |
|
8 | 71 | def setup_digitaloutpin(self, id=1, pin=4): |
9 | 72 | path = '/digitalout_set' |
@@ -63,3 +126,62 @@ def sendTrigger(self, triggerId=0): |
63 | 126 |
|
64 | 127 | r = self._parent.post_json(path, payload) |
65 | 128 | return r |
| 129 | + |
| 130 | + def get_digitalout(self, digitaloutid=1, timeout=1, is_blocking=True): |
| 131 | + """ |
| 132 | + Get the current value and pin of a digital output. |
| 133 | + |
| 134 | + Parameters: |
| 135 | + ----------- |
| 136 | + digitaloutid : int |
| 137 | + ID of the digital output (1, 2, or 3) |
| 138 | + timeout : float |
| 139 | + Timeout for the request in seconds |
| 140 | + is_blocking : bool |
| 141 | + Whether to wait for a response |
| 142 | + |
| 143 | + Returns: |
| 144 | + -------- |
| 145 | + dict or None |
| 146 | + Response from the device containing digitalout status (id, val, pin) |
| 147 | + """ |
| 148 | + path = '/digitalout_get' |
| 149 | + |
| 150 | + payload = { |
| 151 | + "task": path, |
| 152 | + "digitaloutid": digitaloutid |
| 153 | + } |
| 154 | + |
| 155 | + r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout) |
| 156 | + return r |
| 157 | + |
| 158 | + def set_digitalout(self, digitaloutid=1, digitaloutval=0, timeout=1, is_blocking=False): |
| 159 | + """ |
| 160 | + Set the value of a digital output. Use digitaloutval=-1 to trigger a pulse (HIGH->LOW). |
| 161 | + |
| 162 | + Parameters: |
| 163 | + ----------- |
| 164 | + digitaloutid : int |
| 165 | + ID of the digital output (1, 2, or 3) |
| 166 | + digitaloutval : int |
| 167 | + Value to set (0=LOW, 1=HIGH, -1=pulse/trigger) |
| 168 | + timeout : float |
| 169 | + Timeout for the request in seconds |
| 170 | + is_blocking : bool |
| 171 | + Whether to wait for a response |
| 172 | + |
| 173 | + Returns: |
| 174 | + -------- |
| 175 | + dict or None |
| 176 | + Response from the device |
| 177 | + """ |
| 178 | + path = '/digitalout_act' |
| 179 | + |
| 180 | + payload = { |
| 181 | + "task": path, |
| 182 | + "digitaloutid": digitaloutid, |
| 183 | + "digitaloutval": digitaloutval |
| 184 | + } |
| 185 | + |
| 186 | + r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout) |
| 187 | + return r |
0 commit comments