-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
344 lines (255 loc) · 10.2 KB
/
main.py
File metadata and controls
344 lines (255 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/local/bin/python3
# Simple way to query connected USB devices info in Python?
# https://newbedev.com/simple-way-to-query-connected-usb-devices-info-in-python
# usb.product
# uThingVOC - uThingVOC
# ESP32 - CP2104 USB to UART Bridge Controller
# ARDUINO ELEGOO - IOUSBHostDevice
# ARDUINO - USB2.0-Serial
# ARDUINO Original - Arduino Uno
# IMPORTS
import os
import re
from typing import Union, Tuple
import serial
import time
import json
import csv
import yaml
from datetime import datetime
from json import JSONDecodeError
from serial import Serial
from signal import signal, SIGINT
import serial.tools.list_ports as list_ports
import pandas as pd
def __board_setup(port, baud=9600, timeout=1, set_config='') -> Serial:
if not port:
raise Exception('ERROR(board_setup): Indicate a communication port.')
uart = serial.Serial(port, baud, timeout=timeout)
if set_config:
uart.write(set_config.encode())
return uart
def board_setup(port_info) -> Union[Serial, Serial, Serial, Serial]:
if port_info.product in config.keys():
setup = config[port_info.product]
return __board_setup(port_info.device, setup['baud'], setup['timeout'], setup['set-config'])
def u_thing_voc_read_values(uart) -> Tuple[any, any, dict]:
uart.flushInput()
while True:
try:
__json = uart.readline().decode()
__data = dict(json.loads(__json))
__keys = __data.keys()
__values = __data.values()
return __keys, __values, __data
except (JSONDecodeError, AttributeError):
pass
except Exception:
raise Exception('ERROR(u_thing_voc_read_values).')
def esp32_voc_read_values(uart) -> Tuple[dict, list, dict]:
uart.flushInput()
while True:
try:
__json = uart.readline().decode()
__data = json.loads(__json)
__keys = __data['sensors'][0].keys()
__values = []
count = 0
for __row in __data['sensors']:
count += 1
__values.append(__row.values())
if count == 8:
return __keys, __values, __data
except (JSONDecodeError, AttributeError):
pass
except Exception:
raise Exception('ERROR(esp32_voc_read_values).')
def create_compact_excel_file() -> None:
__csv_files = os.listdir(folder_path_csv)
__writer = pd.ExcelWriter(os.path.join(folder_path_excel, 'resume.xlsx'), engine='xlsxwriter')
for __file_name in __csv_files:
__csv_data = pd.read_csv(os.path.join(folder_path_csv, __file_name),
encoding="utf-8",
low_memory=False)
__name = re.split(r'\\|/|\.', __file_name)[-2]
if len(__name) > 30:
__words = __name.split();
__name = __words[0] + __words[-1]
__csv_data.to_excel(os.path.join(folder_path_excel, f"{__name}.xlsx"), encoding="utf-8")
__csv_data.to_excel(__writer, sheet_name=__name, index=False)
__writer.save()
def handler(event, context) -> None:
program_header()
res = input("\nCtrl-c was pressed. Do you really want to exit? y/n ").lower()
if res == 'y':
create_file = input("Want to create a compact excel file? y/n ").lower()
if create_file == 'y':
program_header()
print("In process...")
create_compact_excel_file()
exit(1)
def program_header() -> None:
os.system('clear')
print("\033[0;0H")
print("." * 110)
print("MIEEC - Sensor project\n")
print("Ports in use....")
for _device in devices:
print(_device["product"], " : ", _device["serial_number"])
print("." * 120)
def exists(device) -> bool:
for __device in devices:
if __device["product"] == device.product and __device["serial_number"] == device.serial_number:
return True
return False
def update_devices(list_devices):
if not devices:
for __device in __list_devices:
__uart = board_setup(__device)
devices.append(
{
"product": __device.product,
"serial_number": __device.serial_number,
"uart": __uart
}
)
else:
for __device in list_devices:
if not exists(__device):
__uart = board_setup(__device)
devices.append(
{
"product": __device.product,
"serial_number": __device.serial_number,
"uart": __uart
}
)
if not len(devices) == len(list_devices):
for __device in devices:
exist = False
for __dev in list_devices:
if __device["product"] == __dev.product and __device["serial_number"] == __dev.serial_number:
exist = True
break
if not exist:
devices.remove(__device)
def show(keys, rows) -> None:
for key in keys:
print("{:^15s}".format(key), end='')
print()
for __row in rows:
for col in __row:
print("{:^15}".format(col), end='')
print()
print()
def error_log(error) -> None:
pass
def create_directory(__name) -> Tuple[str, str]:
__folders = list(filter(lambda x: os.path.isdir(x), os.listdir(path_root)))
__folder_path = os.path.join(path_root, "data_files")
if "data_files" not in __folders:
os.mkdir(__folder_path)
__name = "default" if __name == "-d" or __name == "default" else f"{__name}_{str(datetime.now().date())}"
__folders = os.listdir(__folder_path)
__folder_path = os.path.join(__folder_path, __name)
if __name not in __folders:
os.mkdir(__folder_path)
__folder_path_csv = os.path.join(__folder_path, "csv")
__folder_path_excel = os.path.join(__folder_path, "excel")
__folders = os.listdir(__folder_path)
if "csv" not in __folders:
os.mkdir(__folder_path_csv)
if "excel" not in __folders:
os.mkdir(__folder_path_excel)
return __folder_path_csv, __folder_path_excel
def read_values(__devices) -> list:
_data = []
for __device in __devices:
__keys = []
__values = []
__data = []
if __device['product'] == "uThingVOC":
__keys, __values, __data = u_thing_voc_read_values(__device["uart"])
elif __device['product'] == "CP2104 USB to UART Bridge Controller":
__keys, __values, __data = esp32_voc_read_values(__device["uart"])
_data.append({
"product": __device['product'],
"serial_number": __device['serial_number'],
"keys": __keys,
"values": __values,
"data": __data
})
return _data
def show_values(__data) -> None:
program_header()
print("Time: ", str(datetime.now()), "\n")
for __d in __data:
if __d['product'] in "uThingVOC":
print(__d["product"], "-", __d["serial_number"])
show(__d["keys"], [__d["values"]])
elif __d['product'] in "CP2104 USB to UART Bridge Controller":
print(__d["product"], "-", __d["serial_number"])
show(__d["keys"], __d["values"])
def save_in_csv(__data, __folder_path_csv) -> None:
__files = os.listdir(__folder_path_csv)
for __d in data:
__name = "{}_{}.csv".format(__d['product'], __d['serial_number'])
if len(__name) > 30:
__words = __name.split();
__name = __words[0] + __words[-1]
__path = os.path.join(__folder_path_csv, __name)
if __name not in __files:
with open(__path, 'w', encoding='UTF8', newline='\n') as f:
writer = csv.writer(f)
writer.writerow(["time", *__d["keys"]])
with open(__path, 'a', encoding='UTF8', newline='\n') as f:
writer = csv.writer(f)
if __d['product'] in "uThingVOC":
writer.writerow([str(datetime.now()), *__d["values"]])
elif __d['product'] in "CP2104 USB to UART Bridge Controller":
for row in __d["values"]:
writer.writerow([str(datetime.now()), *row])
if __name__ == '__main__':
# This function is used to prevent "ctrl-c" or other forced interruption
signal(SIGINT, handler)
# Variables
devices = [] # all important devices
# Save the path to the root directory
path_root = os.path.abspath(os.getcwd())
# Read all configurations from config.yamlconfig.yaml
# If config.yaml does not exist, create a "config.yaml"
# file in the root directory. Use the "__reference__config.yaml"
# to help you
with open('config.yaml', 'r') as f:
__yaml = yaml.load(f, Loader=yaml.FullLoader)
config = __yaml['devices']
while True:
try:
# Create a folder to save the tests session
name = input('What is the name you want to give this test session?\n Write ("default" or "-d") to '
'save in default file. #> ').lower()
if not name:
program_header()
print("\n🚨 I have a little problem to understand your choice.\nSorry 😞, write again... \n")
else:
break
except Exception as e:
print(e)
exit(1)
# crate directory to save data
folder_path_csv, folder_path_excel = create_directory(name)
while True:
# List all devices connected to a serial port
__list_devices = list_ports.comports()
# Filter all pre-referenced devices (config.yaml)
__list_devices = list(filter(lambda __device: __device.product in config.keys(), __list_devices))
# Update device list and configure devices
update_devices(__list_devices)
# Read values from all sensors
data = read_values(devices)
# Show read values
show_values(data)
# Save information in csv file
if data:
save_in_csv(data, folder_path_csv)
time.sleep(5)