Skip to content

Commit a369219

Browse files
committed
adding digital i/o interface
1 parent 6e5c476 commit a369219

3 files changed

Lines changed: 250 additions & 2 deletions

File tree

uc2rest/UC2Client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from .analog import Analog
2222
from .modules import Modules
2323
from .digitalout import DigitalOut
24+
from .digitalin import DigitalIn
2425
from .rotator import Rotator
2526
from .logger import Logger
2627
from .cmdrecorder import cmdRecorder
@@ -150,6 +151,9 @@ def __init__(self, host=None, port=31950, serialport=None, identity="UC2_Feather
150151
# initialize digital out
151152
self.digitalout = DigitalOut(self)
152153

154+
# initialize digital in
155+
self.digitalin = DigitalIn(self)
156+
153157
# initialize messaging
154158
self.message = Message(self)
155159

uc2rest/digitalin.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import numpy as np
2+
3+
class DigitalIn(object):
4+
## DigitalIn
5+
def __init__(self, parent):
6+
self._parent = parent
7+
self.nDigitalIns = 3
8+
self.digitalInValues = np.zeros((self.nDigitalIns), dtype=int)
9+
10+
# register a callback function for the digitalin status on the serial loop
11+
if hasattr(self._parent, "serial"):
12+
self._parent.serial.register_callback(self._callback_digitalin_status, pattern="digitalin")
13+
14+
# announce a function that is called when we receive a digitalin update through the callback
15+
self._callbackPerKey = {}
16+
self.nCallbacks = 10
17+
self._callbackPerKey = self.init_callback_functions(nCallbacks=self.nCallbacks)
18+
print(self._callbackPerKey)
19+
20+
21+
def init_callback_functions(self, nCallbacks=10):
22+
''' initialize the callback functions - each key holds a list of callbacks '''
23+
_callbackPerKey = {}
24+
self.nCallbacks = nCallbacks
25+
for i in range(nCallbacks):
26+
_callbackPerKey[i] = [] # Initialize as list to support multiple callbacks
27+
return _callbackPerKey
28+
29+
def _callback_digitalin_status(self, data):
30+
''' cast the json in the form:
31+
++
32+
{"digitalin":{"digitalinid":1,"digitalinval":1,"isDone":1},"qid":2}
33+
--
34+
into the digitalin values array '''
35+
try:
36+
digitalin_data = data["digitalin"]
37+
# Handle both single digitalin and multiple digitalins
38+
if isinstance(digitalin_data, dict):
39+
# Single digitalin format: {"digitalinid":1,"digitalinval":1}
40+
digitalin_id = digitalin_data.get("digitalinid", 0)
41+
digitalin_val = digitalin_data.get("digitalinval", 0)
42+
if 0 < digitalin_id <= self.nDigitalIns:
43+
self.digitalInValues[digitalin_id - 1] = digitalin_val
44+
elif isinstance(digitalin_data, list):
45+
# Multiple digitalins format: [{"digitalinid":1,"digitalinval":1}, ...]
46+
for digitalin in digitalin_data:
47+
digitalin_id = digitalin.get("digitalinid", 0)
48+
digitalin_val = digitalin.get("digitalinval", 0)
49+
if 0 < digitalin_id <= self.nDigitalIns:
50+
self.digitalInValues[digitalin_id - 1] = digitalin_val
51+
52+
# Call all registered callbacks for key 0
53+
for callback in self._callbackPerKey[0]:
54+
if callable(callback):
55+
try:
56+
callback(self.digitalInValues)
57+
except Exception as callback_error:
58+
print(f"Error in callback execution: {callback_error}")
59+
except Exception as e:
60+
print("Error in _callback_digitalin_status: ", e)
61+
62+
def register_callback(self, key, callbackfct):
63+
''' register a callback function for a specific key - supports multiple callbacks per key '''
64+
if key not in self._callbackPerKey:
65+
self._callbackPerKey[key] = []
66+
if callbackfct not in self._callbackPerKey[key]: # Avoid duplicate registrations
67+
self._callbackPerKey[key].append(callbackfct)
68+
print(f"Registered callback for key {key}. Total callbacks for this key: {len(self._callbackPerKey[key])}")
69+
70+
71+
def get_digitalin(self, digitalinid=1, timeout=1, is_blocking=True):
72+
"""
73+
Get the current value of a digital input.
74+
75+
Parameters:
76+
-----------
77+
digitalinid : int
78+
ID of the digital input (1, 2, or 3)
79+
timeout : float
80+
Timeout for the request in seconds
81+
is_blocking : bool
82+
Whether to wait for a response
83+
84+
Returns:
85+
--------
86+
dict or None
87+
Response from the device containing digitalin status
88+
"""
89+
path = '/digitalin_get'
90+
91+
payload = {
92+
"task": path,
93+
"digitalinid": digitalinid
94+
}
95+
96+
r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout)
97+
return r
98+
99+
def act_digitalin(self, timeout=1, is_blocking=False):
100+
"""
101+
Trigger the digitalin act function.
102+
103+
Parameters:
104+
-----------
105+
timeout : float
106+
Timeout for the request in seconds
107+
is_blocking : bool
108+
Whether to wait for a response
109+
110+
Returns:
111+
--------
112+
dict or None
113+
Response from the device
114+
"""
115+
path = '/digitalin_act'
116+
117+
payload = {
118+
"task": path
119+
}
120+
121+
r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout)
122+
return r

uc2rest/digitalout.py

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,72 @@
1-
class DigitalOut(object):
1+
import numpy as np
22

3+
class DigitalOut(object):
4+
## DigitalOut
35
def __init__(self, parent):
46
self._parent = parent
57
self._logger = parent.logger
6-
8+
self.nDigitalOuts = 3
9+
self.digitalOutValues = np.zeros((self.nDigitalOuts), dtype=int)
10+
11+
# register a callback function for the digitalout status on the serial loop
12+
if hasattr(self._parent, "serial"):
13+
self._parent.serial.register_callback(self._callback_digitalout_status, pattern="digitalout")
14+
15+
# announce a function that is called when we receive a digitalout update through the callback
16+
self._callbackPerKey = {}
17+
self.nCallbacks = 10
18+
self._callbackPerKey = self.init_callback_functions(nCallbacks=self.nCallbacks)
19+
print(self._callbackPerKey)
20+
21+
22+
def init_callback_functions(self, nCallbacks=10):
23+
''' initialize the callback functions - each key holds a list of callbacks '''
24+
_callbackPerKey = {}
25+
self.nCallbacks = nCallbacks
26+
for i in range(nCallbacks):
27+
_callbackPerKey[i] = [] # Initialize as list to support multiple callbacks
28+
return _callbackPerKey
29+
30+
def _callback_digitalout_status(self, data):
31+
''' cast the json in the form:
32+
++
33+
{"digitalout":{"digitaloutid":1,"digitaloutval":1,"digitaloutpin":4,"isDone":1},"qid":2}
34+
--
35+
into the digitalout values array '''
36+
try:
37+
digitalout_data = data["digitalout"]
38+
# Handle both single digitalout and multiple digitalouts
39+
if isinstance(digitalout_data, dict):
40+
# Single digitalout format: {"digitaloutid":1,"digitaloutval":1,"digitaloutpin":4}
41+
digitalout_id = digitalout_data.get("digitaloutid", 0)
42+
digitalout_val = digitalout_data.get("digitaloutval", 0)
43+
if 0 < digitalout_id <= self.nDigitalOuts:
44+
self.digitalOutValues[digitalout_id - 1] = digitalout_val
45+
elif isinstance(digitalout_data, list):
46+
# Multiple digitalouts format: [{"digitaloutid":1,"digitaloutval":1}, ...]
47+
for digitalout in digitalout_data:
48+
digitalout_id = digitalout.get("digitaloutid", 0)
49+
digitalout_val = digitalout.get("digitaloutval", 0)
50+
if 0 < digitalout_id <= self.nDigitalOuts:
51+
self.digitalOutValues[digitalout_id - 1] = digitalout_val
52+
53+
# Call all registered callbacks for key 0
54+
for callback in self._callbackPerKey[0]:
55+
if callable(callback):
56+
try:
57+
callback(self.digitalOutValues)
58+
except Exception as callback_error:
59+
print(f"Error in callback execution: {callback_error}")
60+
except Exception as e:
61+
print("Error in _callback_digitalout_status: ", e)
62+
63+
def register_callback(self, key, callbackfct):
64+
''' register a callback function for a specific key - supports multiple callbacks per key '''
65+
if key not in self._callbackPerKey:
66+
self._callbackPerKey[key] = []
67+
if callbackfct not in self._callbackPerKey[key]: # Avoid duplicate registrations
68+
self._callbackPerKey[key].append(callbackfct)
69+
print(f"Registered callback for key {key}. Total callbacks for this key: {len(self._callbackPerKey[key])}")
770

871
def setup_digitaloutpin(self, id=1, pin=4):
972
path = '/digitalout_set'
@@ -63,3 +126,62 @@ def sendTrigger(self, triggerId=0):
63126

64127
r = self._parent.post_json(path, payload)
65128
return r
129+
130+
def get_digitalout(self, digitaloutid=1, timeout=1, is_blocking=True):
131+
"""
132+
Get the current value and pin of a digital output.
133+
134+
Parameters:
135+
-----------
136+
digitaloutid : int
137+
ID of the digital output (1, 2, or 3)
138+
timeout : float
139+
Timeout for the request in seconds
140+
is_blocking : bool
141+
Whether to wait for a response
142+
143+
Returns:
144+
--------
145+
dict or None
146+
Response from the device containing digitalout status (id, val, pin)
147+
"""
148+
path = '/digitalout_get'
149+
150+
payload = {
151+
"task": path,
152+
"digitaloutid": digitaloutid
153+
}
154+
155+
r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout)
156+
return r
157+
158+
def set_digitalout(self, digitaloutid=1, digitaloutval=0, timeout=1, is_blocking=False):
159+
"""
160+
Set the value of a digital output. Use digitaloutval=-1 to trigger a pulse (HIGH->LOW).
161+
162+
Parameters:
163+
-----------
164+
digitaloutid : int
165+
ID of the digital output (1, 2, or 3)
166+
digitaloutval : int
167+
Value to set (0=LOW, 1=HIGH, -1=pulse/trigger)
168+
timeout : float
169+
Timeout for the request in seconds
170+
is_blocking : bool
171+
Whether to wait for a response
172+
173+
Returns:
174+
--------
175+
dict or None
176+
Response from the device
177+
"""
178+
path = '/digitalout_act'
179+
180+
payload = {
181+
"task": path,
182+
"digitaloutid": digitaloutid,
183+
"digitaloutval": digitaloutval
184+
}
185+
186+
r = self._parent.post_json(path, payload, getReturn=is_blocking, timeout=timeout)
187+
return r

0 commit comments

Comments
 (0)