Skip to content

Commit 545f497

Browse files
committed
Updates to config tools for models running rtlinux
1 parent ae552dd commit 545f497

2 files changed

Lines changed: 114 additions & 41 deletions

File tree

src/bs1200/cfg_tools.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from ftplib import FTP
22
from dataclasses import dataclass
3+
from paramiko import SSHClient
4+
from scp import SCPClient
35

46
class FtpHelper(object):
57
"""
@@ -18,16 +20,63 @@ def getFile(self, tgt_path, dest_path) -> str:
1820
retr = f.retrbinary(f"RETR {tgt_path}", file.write)
1921
return dest_path
2022
except Exception as e:
21-
print(e)
23+
raise e
2224

2325
def uploadFile(self, src_path, tgt_path):
2426
try:
2527
with FTP(self.tgt_address, self.username, self.password) as f:
2628
with open(src_path, "rb") as file:
2729
f.storbinary(f"STOR {tgt_path}", file)
30+
except Exception as e:
31+
raise e
32+
33+
class ScpHelper(object):
34+
"""
35+
Class used to wrap SSH and SCP file transfer for sbRIO 9603 controllers. Call using 'with' statements, or call open
36+
"""
37+
def __init__(self, tgt: str, user: str, password : str):
38+
self.tgt_address = tgt
39+
self.username = user
40+
self.password = password
41+
42+
def __enter__(self):
43+
try:
44+
self.open()
2845
except Exception as e:
2946
print(e)
3047

48+
def __exit__(self):
49+
self.close()
50+
51+
def open(self):
52+
"""Open """
53+
self.ssh = SSHClient()
54+
self.ssh.load_system_host_keys()
55+
self.ssh.connect(self.tgt_address)
56+
# SCPCLient takes a paramiko transport as an argument
57+
self.scp = SCPClient(self.ssh.get_transport())
58+
59+
def close(self):
60+
"""Close SCP and SSH connections"""
61+
self.scp.close()
62+
self.ssh.close()
63+
64+
def getFile(self, tgt_path, dest_path) -> str:
65+
try:
66+
with self.scp as s:
67+
s.get(remote_path=tgt_path, local_path=dest_path)
68+
return dest_path
69+
except Exception as e:
70+
print(e)
71+
72+
def uploadFile(self, src_path, tgt_path):
73+
try:
74+
with self.scp as s:
75+
s.put(files=src_path, remote_path=tgt_path)
76+
except Exception as e:
77+
print(e)
78+
79+
3180
@dataclass
3281
class Ethernet_Settings:
3382
IP_Address : str

src/bs1200/configuration.py

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,32 @@ def __init__(self, tgt_address, username= "admin", password= ""):
2222
self.FTP = FtpHelper(tgt_address, username, password)
2323
self.ini_parser = ConfigParser()
2424

25+
def getFile(self, tgt_file: str, dest: str):
26+
#First try the FTP, if this fails to connect use SCP
27+
try:
28+
local_copy = self.FTP.getFile(tgt_file, dest)
29+
except:
30+
#if error from FTP, use SCP via SSH
31+
#fix the path to ni-rt.ini if ni-rt linux is target
32+
if("ni-rt.ini" in tgt_file):
33+
tgt_file = "etc/natinst/share/ni-rt.ini"
34+
with ScpHelper(self.ip_address, self.user, self.pwd) as s:
35+
local_copy = s.getFile(tgt_file, dest)
36+
finally:
37+
return local_copy
38+
39+
def uploadFile(self, tgt, dest_path):
40+
#First try the FTP, if this fails to connect use SCP
41+
try:
42+
self.FTP.uploadFile(tgt, dest_path)
43+
except:
44+
#if error from FTP, use SCP via SSH
45+
#fix the path to ni-rt.ini if ni-rt linux is target
46+
if("ni-rt.ini" in dest_path):
47+
dest_path = "etc/natinst/share/ni-rt.ini"
48+
with ScpHelper(self.ip_address, self.user, self.pwd) as s:
49+
s.uploadFile(tgt, dest_path)
50+
2551
def apply_config_file(self, cfg_file_path, restart: bool = True):
2652
"""
2753
Takes in a .json file with BS1200 configuration object in the following format
@@ -78,35 +104,37 @@ def get_all_settings(self, export_to_file: bool,
78104
can = self.get_can_settings()
79105
mode = self.get_protocol()
80106
interlock = self.interlock_enabled()
81-
config = {'Protocol' : mode, 'IP_Address' : ethernet.IP_Address,
82-
'Ethernet_Settings':{
83-
'TCP_Cmd_Port' : ethernet.Command_Port,
84-
'TCP_Cmd_Interval_ms' : ethernet.Command_Interval_ms,
85-
'UDP_Read_Port' : ethernet.Reporting_Port,
86-
'UDP_Read_Interval_ms' : ethernet.Reporting_Interval_ms
87-
},
88-
'CAN_Settings':
89-
{
90-
'Box_ID' : can.box_id,
91-
'Write_Period_ms' : int(can.publish_period_us/1000)
92-
},
93-
'Enable_SafetyInterlock' : interlock
94-
}
107+
config = {
108+
'Protocol' : mode,
109+
'IP_Address' : ethernet.IP_Address,
110+
'Ethernet_Settings':
111+
{
112+
'TCP_Cmd_Port' : ethernet.Command_Port,
113+
'TCP_Cmd_Interval_ms' : ethernet.Command_Interval_ms,
114+
'UDP_Read_Port' : ethernet.Reporting_Port,
115+
'UDP_Read_Interval_ms' : ethernet.Reporting_Interval_ms
116+
},
117+
'CAN_Settings':
118+
{
119+
'Box_ID' : can.box_id,
120+
'Write_Period_ms' : int(can.publish_period_us/1000)
121+
},
122+
'Enable_SafetyInterlock' : interlock
123+
}
95124
if export_to_file:
96125
with open(file_name, 'w') as f:
97126
f.write(json.dumps(config, indent=4))
98127
return json.dumps(config)
99128

100-
101-
102129
def apply_general_settings(self, protocol: Protocol, restart: bool = True):
103130
"""
104131
Sets the protocol used by the target BS1200 unit
105132
Valid values for protocol are Protocol.CAN or Protocol.Ethernet
106133
"""
107134
rt_path = "/ni-rt/startup/Configuration Files/General Settings.xml"
108135
temp_path = "General Settings.xml"
109-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
136+
137+
cfgfile_path = self.retrieveFile(rt_path, temp_path)
110138
#build an XML tree for the xml file
111139
ET.register_namespace("", "http://www.ni.com/LVData")
112140
tree = ET.parse(cfgfile_path)
@@ -120,13 +148,13 @@ def apply_general_settings(self, protocol: Protocol, restart: bool = True):
120148
tree.write(cfgfile_path, encoding="utf-8", xml_declaration=True, short_empty_elements=False)
121149
#replace the configuration file on the target
122150
self.__replace_xml_declaration(cfgfile_path)
123-
self.FTP.uploadFile(cfgfile_path, rt_path)
124-
#restart unit by default
151+
self.uploadFile(cfgfile_path, rt_path)
152+
125153
os.remove(temp_path)
154+
#restart unit by default
126155
if restart:
127156
self.__restart_unit()
128157

129-
130158
def apply_ethernet_settings(self, tcpip_cfg: Ethernet_Settings, restart: bool = True):
131159
"""
132160
Applies the provided TCP and UDP ethernet stack settings, and updates the IP address of
@@ -142,15 +170,14 @@ def apply_ethernet_settings(self, tcpip_cfg: Ethernet_Settings, restart: bool =
142170
if restart:
143171
self.__restart_unit()
144172

145-
146173
def apply_can_config(self, can_cfg: CAN_Settings, restart: bool = True):
147174
"""
148175
Applies the provided CAN Settings to the startup configuration,
149176
settings a new Box ID and Publish Period (ms) for the BS1200 unit
150177
"""
151178
rt_path = "/ni-rt/startup/Data/BS1200 Configuration.ini"
152179
temp_path = "BS1200 Configuration.ini"
153-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
180+
cfgfile_path = self.getFile(rt_path, temp_path)
154181
self.__fix_XML_tags(cfgfile_path)
155182
#build an XML tree for the .ini file 🤦‍♂️
156183
tree = ET.parse(cfgfile_path)
@@ -163,7 +190,7 @@ def apply_can_config(self, can_cfg: CAN_Settings, restart: bool = True):
163190
tree.write(cfgfile_path, encoding="utf-8", xml_declaration=True, short_empty_elements=False)
164191
#replace the configuration file on the target
165192
self.__replace_xml_declaration(cfgfile_path)
166-
self.FTP.uploadFile(cfgfile_path, rt_path)
193+
self.uploadFile(cfgfile_path, rt_path)
167194
os.remove(temp_path)
168195
#restart unit by default
169196
if restart:
@@ -175,7 +202,7 @@ def enable_safety_interlock(self, interlock, restart: bool = True):
175202
"""
176203
rt_path = "/ni-rt/startup/Data/BS1200 Configuration.ini"
177204
temp_path = "BS1200 Configuration.ini"
178-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
205+
cfgfile_path = self.getFile(rt_path, temp_path)
179206
self.__fix_XML_tags(cfgfile_path)
180207
#build an XML tree for the .ini file 🤦‍♂️
181208
tree = ET.parse(cfgfile_path)
@@ -186,7 +213,7 @@ def enable_safety_interlock(self, interlock, restart: bool = True):
186213
tree.write(cfgfile_path, encoding="utf-8", xml_declaration=True, short_empty_elements=False)
187214
#replace the configuration file on the target
188215
self.__replace_xml_declaration(cfgfile_path)
189-
self.FTP.uploadFile(cfgfile_path, rt_path)
216+
self.uploadFile(cfgfile_path, rt_path)
190217
os.remove(temp_path)
191218
#restart unit by default
192219
if restart:
@@ -201,7 +228,7 @@ def set_ip_address(self, new_ip_address: str):
201228
if("255" in new_ip_address):
202229
raise ValueError("Broadcast bytes are not allowed for the unit IP address")
203230
#get the ni-rt.ini file from the BS1200 controller root directory
204-
self.FTP.getFile('ni-rt.ini', 'ni-rt.ini')
231+
self.getFile('ni-rt.ini', 'ni-rt.ini')
205232
#open the local copy of the cfg file and set the IP Address parameter of the
206233
#TCP_STACK_CONFIG section
207234
self.ini_parser.read('ni-rt.ini')
@@ -210,19 +237,18 @@ def set_ip_address(self, new_ip_address: str):
210237
with open('ni-rt.ini', 'w') as file:
211238
self.ini_parser.write(file)
212239
#Send the file back to the target BS1200, replacing the entire file
213-
self.FTP.uploadFile('ni-rt.ini', 'ni-rt.ini')
240+
self.uploadFile('ni-rt.ini', 'ni-rt.ini')
214241
#remove local copy of cfg file and update the FTP helper's stored target address
215242
os.remove('ni-rt.ini')
216243
self.FTP.tgt_address = self.ip_address = new_ip_address
217244

218-
219245
def set_tcp_settings(self, ip_address : str, cmd_port: int, cmd_interval_ms: int):
220246
"""
221247
Update the TCP related settings in the Ethernet configuration
222248
"""
223249
rt_path = "/ni-rt/startup/Configuration Files/TCP Settings.xml"
224250
temp_path = "TCP Settings.xml"
225-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
251+
cfgfile_path = self.getFile(rt_path, temp_path)
226252
#build an XML tree for the xml file
227253
ET.register_namespace("", "http://www.ni.com/LVData")
228254
tree = ET.parse(cfgfile_path)
@@ -245,17 +271,16 @@ def set_tcp_settings(self, ip_address : str, cmd_port: int, cmd_interval_ms: int
245271
tree.write(cfgfile_path, encoding="utf-8", xml_declaration=True, short_empty_elements=False)
246272
self.__replace_xml_declaration(cfgfile_path)
247273
#replace the configuration file on the target
248-
self.FTP.uploadFile(cfgfile_path, rt_path)
274+
self.uploadFile(cfgfile_path, rt_path)
249275
os.remove(temp_path)
250276

251-
252277
def set_udp_settings(self, rep_port: int, rep_interval_ms: int):
253278
"""
254279
Update the UDP related settings in the Ethernet configuration
255280
"""
256281
rt_path = "/ni-rt/startup/Configuration Files/UDP Settings.xml"
257282
temp_path = "UDP Settings.xml"
258-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
283+
cfgfile_path = self.getFile(rt_path, temp_path)
259284
#build an XML tree for the xml file
260285
ET.register_namespace("", "http://www.ni.com/LVData")
261286
tree = ET.parse(cfgfile_path)
@@ -273,7 +298,7 @@ def set_udp_settings(self, rep_port: int, rep_interval_ms: int):
273298
tree.write(cfgfile_path, encoding="utf-8", xml_declaration=True, short_empty_elements=False)
274299
self.__replace_xml_declaration(cfgfile_path)
275300
#replace the configuration file on the target
276-
self.FTP.uploadFile(cfgfile_path, rt_path)
301+
self.uploadFile(cfgfile_path, rt_path)
277302
os.remove(temp_path)
278303

279304
def get_can_settings(self):
@@ -282,7 +307,7 @@ def get_can_settings(self):
282307
"""
283308
rt_path = "/ni-rt/startup/Data/BS1200 Configuration.ini"
284309
temp_path = "BS1200 Configuration.ini"
285-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
310+
cfgfile_path = self.getFile(rt_path, temp_path)
286311
self.__fix_XML_tags(cfgfile_path)
287312
#build an XML tree for the .ini file 🤦‍♂️
288313
tree = ET.parse(cfgfile_path)
@@ -294,7 +319,7 @@ def get_ethernet_settings(self):
294319
"""
295320
Retreive the ethernet settings and IP address from the target device
296321
"""
297-
self.FTP.getFile('ni-rt.ini', 'ni-rt.ini')
322+
self.getFile('ni-rt.ini', 'ni-rt.ini')
298323
#open the local copy of the cfg file and set the IP Address parameter of the
299324
#TCP_STACK_CONFIG section
300325
self.ini_parser.read('ni-rt.ini')
@@ -305,8 +330,8 @@ def get_ethernet_settings(self):
305330
temp_path1 = "TCP Settings.xml"
306331
temp_path2 = "UDP Settings.xml"
307332

308-
tcp_cfgfile_path = self.FTP.getFile(tcp_path, temp_path1)
309-
udp_cfgfile_path = self.FTP.getFile(udp_path, temp_path2)
333+
tcp_cfgfile_path = self.getFile(tcp_path, temp_path1)
334+
udp_cfgfile_path = self.getFile(udp_path, temp_path2)
310335
#build an XML tree for the xml file
311336
ET.register_namespace("", "http://www.ni.com/LVData")
312337
tcp_tree = ET.parse(tcp_cfgfile_path)
@@ -342,22 +367,21 @@ def interlock_enabled(self):
342367
"""
343368
rt_path = "/ni-rt/startup/Data/BS1200 Configuration.ini"
344369
temp_path = "BS1200 Configuration.ini"
345-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
370+
cfgfile_path = self.getFile(rt_path, temp_path)
346371
self.__fix_XML_tags(cfgfile_path)
347372
#build an XML tree for the .ini file 🤦‍♂️
348373
tree = ET.parse(cfgfile_path)
349374
root = tree.getroot()
350375
os.remove(temp_path)
351376
return True if root[3].text == "TRUE" else False
352377

353-
354378
def get_protocol(self):
355379
"""
356380
Get the procotol form the general settings
357381
"""
358382
rt_path = "/ni-rt/startup/Configuration Files/General Settings.xml"
359383
temp_path = "General Settings.xml"
360-
cfgfile_path = self.FTP.getFile(rt_path, temp_path)
384+
cfgfile_path = self.getFile(rt_path, temp_path)
361385
#build an XML tree for the xml file
362386
ET.register_namespace("", "http://www.ni.com/LVData")
363387
tree = ET.parse(cfgfile_path)

0 commit comments

Comments
 (0)