Skip to content

Commit 1a1134b

Browse files
Initial commit
0 parents  commit 1a1134b

97 files changed

Lines changed: 7061 additions & 0 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# =========================
2+
# ROS2
3+
# =========================
4+
build/
5+
install/
6+
log/
7+
8+
# =========================
9+
# Python
10+
# =========================
11+
__pycache__/
12+
*.pyc
13+
*.pyo
14+
*.pyd
15+
*.so
16+
venv/
17+
.env
18+
19+
# =========================
20+
# VSCode / IDE
21+
# =========================
22+
.vscode/
23+
.idea/
24+
25+
# =========================
26+
# Linux / misc
27+
# =========================
28+
.DS_Store
29+
*.swp
30+
31+
# =========================
32+
# CSV outputs
33+
# =========================
34+
delsys-bridge/csv_test/
35+
36+
# =========================
37+
# Godot exported binaries
38+
# =========================
39+
delsys-bridge/Godot_v4.3-stable_linux.x86_64
40+
delsys-bridge/Godot_v4.3-stable_linux.x86_64.zip
41+
42+
# =========================
43+
# ROS2 generated files
44+
# =========================
45+
**/build/
46+
**/install/
47+
**/log/
48+
49+
delsys-bridge/Godot_v4.3-stable_linux.x86_64
50+
51+
# Python
52+
__pycache__/
53+
*.pyc
54+
55+
# ROS2 build files
56+
ros2_ws/build/
57+
ros2_ws/install/
58+
ros2_ws/log/
59+
build/
60+
install/
61+
log/
62+
63+
# Generated/test data
64+
delsys-bridge/csv_test/*.csv
65+
66+
# Large local binaries
67+
delsys-bridge/Godot_v4.3-stable_linux.x86_64

DELSYS.png

26.4 KB
Loading

arscontrol.jpeg

56.3 KB
Loading

arscontrol_symbol.png

27.1 KB
Loading
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""
2+
This is the class that handles the data that is output from the Delsys Trigno Base.
3+
Create an instance of this and pass it a reference to the Trigno base for initialization.
4+
See CollectDataController.py for a usage example.
5+
"""
6+
import numpy as np
7+
8+
9+
class DataKernel():
10+
def __init__(self, trigno_base):
11+
self.trigno_base = trigno_base
12+
self.TrigBase = trigno_base.TrigBase
13+
self.packetCount = 0
14+
self.sampleCount = 0
15+
self.allcollectiondata = []
16+
self.channel1time = []
17+
self.channel_guids = []
18+
19+
def processData(self, data_queue):
20+
"""Processes the data from the DelsysAPI and place it in the data_queue argument"""
21+
outArr = self.GetData()
22+
if outArr is not None:
23+
for i in range(len(outArr)):
24+
self.allcollectiondata[i].extend(outArr[i][0].tolist())
25+
try:
26+
for i in range(len(outArr[0])):
27+
if np.asarray(outArr[0]).ndim == 1:
28+
data_queue.append(list(np.asarray(outArr, dtype='object')[0]))
29+
else:
30+
data_queue.append(list(np.asarray(outArr, dtype='object')[:, i]))
31+
try:
32+
self.packetCount += len(outArr[0])
33+
self.sampleCount += len(outArr[0][0])
34+
except:
35+
pass
36+
except IndexError:
37+
pass
38+
39+
def processYTData(self, data_queue):
40+
"""Processes the data from the DelsysAPI and place it in the data_queue argument"""
41+
outArr = self.GetYTData()
42+
if outArr is not None:
43+
for i in range(len(outArr)):
44+
self.allcollectiondata[i].extend(outArr[i][0].tolist())
45+
try:
46+
yt_outArr = []
47+
for i in range(len(outArr)):
48+
chan_yt = outArr[i]
49+
chan_ydata = np.asarray([k.Item2 for k in chan_yt[0]], dtype='object')
50+
yt_outArr.append(chan_ydata)
51+
52+
data_queue.append(list(yt_outArr))
53+
54+
try:
55+
self.packetCount += len(outArr[0])
56+
self.sampleCount += len(outArr[0][0])
57+
except:
58+
pass
59+
except IndexError:
60+
pass
61+
62+
def GetData(self):
63+
""" Check if data ready from DelsysAPI via Aero CheckDataQueue() - Return True if data is ready
64+
Get data (PollData)
65+
Organize output channels by their GUID keys
66+
67+
Return array of all channel data
68+
"""
69+
70+
dataReady = self.TrigBase.CheckDataQueue() # Check if DelsysAPI real-time data queue is ready to retrieve
71+
if dataReady:
72+
try:
73+
DataOut = self.TrigBase.PollData() # Dictionary<Guid, List<double>> (key = Guid (Unique channel ID), value = List(Y) (Y = sample value)
74+
if len(list(DataOut.Keys)) > 0:
75+
outArr = [[] for i in range(len(self.trigno_base.channel_guids))] # Set output array size to the amount of channels set during ConfigureCollectionOutput() in TrignoBase.py
76+
77+
for j in range(len(self.trigno_base.channel_guids)): #Loop all channels set during configuration (default behavior is all channels unless updated)
78+
chan_data = DataOut[self.trigno_base.channel_guids[j]] # Index a single channels data from the dictionary based on unique channel GUID (key)
79+
outArr[j].append(np.asarray(chan_data, dtype='object')) # Create a NumPy array of the channel data and add to the output array
80+
81+
return outArr
82+
except Exception as e:
83+
print("Exception occured in GetData() - " + str(e))
84+
else:
85+
return None
86+
87+
def GetYTData(self):
88+
""" YT Data stream only available when passing 'True' to Aero Start() command i.e. TrigBase.Start(True)
89+
Check if data ready from DelsysAPI via Aero CheckYTDataQueue() - Return True if data is ready
90+
Get data (PollYTData)
91+
Organize output channels by their GUID keys
92+
93+
Return array of all channel data
94+
"""
95+
96+
dataReady = self.TrigBase.CheckYTDataQueue() # Check if DelsysAPI real-time data queue is ready to retrieve
97+
if dataReady:
98+
try:
99+
DataOut = self.TrigBase.PollYTData() # Dictionary<Guid, List<(double, double)>> (key = Guid (Unique channel ID), value = List<(T, Y)> (T = time stamp in seconds Y = sample value)
100+
if len(list(DataOut.Keys)) > 0:
101+
outArr = [[] for i in range(len(self.trigno_base.channel_guids))] # Set output array size to the amount of channels set during ConfigureCollectionOutput() in TrignoBase.py
102+
103+
for j in range(len(self.trigno_base.channel_guids)): #Loop all channels set during configuration (default behavior is all channels unless updated)
104+
chan_yt_data = DataOut[self.trigno_base.channel_guids[j]] # Index a single channels data from the dictionary based on unique channel GUID (key)
105+
outArr[j].append(np.asarray(chan_yt_data, dtype='object')) # Create a NumPy array of the channel data and add to the output array
106+
107+
return outArr
108+
109+
except Exception as e:
110+
print("Exception occured in GetYTData() - " + str(e))
111+
else:
112+
return None

0 commit comments

Comments
 (0)