-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreader.py
More file actions
421 lines (363 loc) · 17.9 KB
/
reader.py
File metadata and controls
421 lines (363 loc) · 17.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# =================================================================================================================================== #
# ----------------------------------------------------------- DESCRIPTION ----------------------------------------------------------- #
# Obtains data. It connects to the MQTT broker and subscribes to the topics 'data/meas' and 'rfid/tde'. Messages received on #
# 'data/meas' (parameters such as 'pos_x', 'pos_y', 'pos_z', 'moist', etc.) are merged into the current measurement state; the script #
# itself publishes a periodic 'next' command there. RFID data received on 'rfid/tde' is combined with that state and written to a file. #
# =================================================================================================================================== #
# =================================================================================================================================== #
# --------------------------------------------------------- EXTERNAL IMPORTS -------------------------------------------------------- #
import datetime # Basic date and time types. #
import json # JSON encoder and decoder. #
import time # Time access and conversions. #
import paho.mqtt.client as mqtt # MQTT version 3.1/3.1.1 client class. #
import os # Miscellaneous operating system interfaces. #
import pytz # World Timezone Definitions. #
import re # Regular expression operations. #
# =================================================================================================================================== #
# Tag model names, indexed by the integer tag code used throughout the script.
TAGS = ['Belt_G2iL', 'Belt_9XM', 'Dogbone_Monza_4D', 'Frog_3D', 'Miniweb_G2iL',
        'Shortdipole_G2iL', 'Shortdipole_Monza_R6', 'Spine', 'Trap', 'Viper', 'Web_G2iL', 'Web_U9']
# Obstacle names, indexed by the integer obstacle code.
OBSTACLES = ['None', 'Cardboard', 'Metal', 'Wood', 'Human_Hand', 'Human_Body', 'Soil_(Dry)', 'Container_Water', 'Soil_(Wet)']
# Column order of every CSV record written by this script.
HEADERS = [
    'n_group', 'eventNum', 'peakRssi', 'phase', 'channel', 'pos_x', 'pos_y', 'pos_z', 'depth',
    'angle_rx', 'angle_ry', 'angle_rz', 'angle_tx', 'angle_ty', 'angle_tz', 'moist', 'antenna',
    'reads', 'idHex', 'format', 'PC', 'CRC', 'MAC', 'hostName', 'time_reader', 'type',
    'power', 'power2', 'tag', 'obs', 'cmd', 'cmd_issued', 'cmd_received'
]
# =================================================================================================================================== #
# --------------------------------------------------------- BASIC FUNCTIONS --------------------------------------------------------- #
def filename_generator(tag: int, freq: float, distance: float, obstacle: int, initial_filepath: str, power: float, suffix: str = '') -> str:
    """
    Generate a timestamped CSV filename from the tag, frequency, distance, obstacle and power.

    The result has the form
    ``{initial_filepath}/{YYYYmmdd_HHMMSS}{suffix}_{tag name}_{freq}MHz_{distance}m_{obstacle name}_{power}dBm.csv``
    where the timestamp is the current time in the 'CET' timezone.

    Parameters:
    - tag [int]: The coded tag, an index into TAGS:
        - 0: Belt G2iL
        - 1: Belt 9XM
        - 2: Dogbone Monza 4D
        - 3: Frog 3D
        - 4: Miniweb G2iL
        - 5: Shortdipole G2iL
        - 6: Shortdipole Monza R6
        - 7: Spine
        - 8: Trap
        - 9: Viper
        - 10: Web G2iL
        - 11: Web U9
    - freq [float]: The frequency [MHz]; must not be negative.
    - distance [float]: The distance [m]; must not be negative.
    - obstacle [int]: The coded obstacle, an index into OBSTACLES:
        - 0: None
        - 1: Cardboard
        - 2: Metal
        - 3: Wood
        - 4: Human Hand
        - 5: Human Body
        - 6: Soil (Dry)
        - 7: Container with Water
        - 8: Soil (Wet)
    - initial_filepath [str]: The directory in which the file will be saved.
    - power [float]: The transmission power of the RFID reader, in dBm; must lie in [10.0, 29.2].
    - suffix [str]: Optional text inserted right after the timestamp.

    Returns:
        str: The generated filename.

    Raises:
        ValueError: If any argument is outside its valid range.
    """
    # Bound the codes by the lookup tables themselves so that the validation can never drift
    # out of sync with them (a hard-coded upper bound previously let obstacle=9 through to an IndexError).
    if not 0 <= tag < len(TAGS):
        raise ValueError(f"Invalid tag: {tag}")
    if not 0 <= obstacle < len(OBSTACLES):
        raise ValueError(f"Invalid obstacle: {obstacle}")
    if freq < 0.0:
        raise ValueError(f"Invalid frequency: {freq}")
    if distance < 0.0:
        raise ValueError(f"Invalid distance: {distance}")
    if 10.0 > power or power > 29.2:
        raise ValueError(f"Invalid power: {power}")
    # Timestamp the file with the current date and time in the CET timezone
    tz = pytz.timezone('CET')
    current_time = datetime.datetime.now(tz).strftime("%Y%m%d_%H%M%S")
    return f"{initial_filepath}/{current_time}{suffix}_{TAGS[tag]}_{freq}MHz_{distance}m_{OBSTACLES[obstacle]}_{power}dBm.csv"
def validate_filename(filename: str) -> bool:
    """
    Validate the filename, creating its directory and the CSV header if needed.

    The parent directory is created when it does not exist. The file is then opened in
    append mode to prove it is writable; if it is empty, the HEADERS row is written to it.

    Args:
        filename (str): The file to validate. A bare name (no directory part) refers to
            the current working directory.
    Returns:
        bool: True if the file can be written to, False otherwise.
    """
    try:
        directory = os.path.dirname(filename)
        # An empty dirname means "current working directory": nothing to create, and
        # os.makedirs('') would raise and wrongly mark a perfectly usable filename as invalid.
        if directory:
            if not os.path.exists(directory):
                os.makedirs(directory, exist_ok=True)
                print(f"Directory created: {directory}")
            else:
                print(f"Directory already exists: {directory}")
        # Open for appending to ensure the file can be written to without truncating existing data
        with open(filename, 'a+') as f:
            if os.path.getsize(filename) == 0:
                f.write(','.join(HEADERS) + '\n')
        print(f"Filename is valid: {filename}")
        return True
    except (OSError, TypeError, ValueError) as e:
        # OSError: filesystem problems; TypeError/ValueError: malformed path argument
        print(f"Error validating filename: {e}")
        return False
# Distance field of a measurement filename, e.g. "1.5m" (decimal form, tried first) or "2m" (integer form).
_DECIMAL_DISTANCE_RE = re.compile(r'(\d+\.\d+)m')
_INTEGER_DISTANCE_RE = re.compile(r'(\d+)m')


def _distance_sort_key(name: str) -> tuple:
    """Return a (distance, name) sort key; names without a parseable distance sort last."""
    match = _DECIMAL_DISTANCE_RE.search(name) or _INTEGER_DISTANCE_RE.search(name)
    distance = float(match.group(1)) if match else float('inf')
    # The name breaks ties so the result does not depend on os.listdir() ordering
    return (distance, name)


def get_measurments(tag: str, filepath: str, obstacle: str) -> list:
    """
    Obtain a list of all files of a certain tag and obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Remember, the filename format is: {date}_{timestamp}_{tag}_{freq}MHz_{distance}m_{obstacle}.csv
    Args:
        tag (str): The tag name to search for (substring match on the filename).
        filepath (str): The directory to search in.
        obstacle (str): The obstacle name to search for (substring match on the filename).
    Returns:
        list: Paths (filepath joined with the filename) of all files whose name contains both
            the tag and the obstacle. Files whose name carries no distance are placed last.
    """
    names = [name for name in os.listdir(filepath) if tag in name and obstacle in name]
    names.sort(key=_distance_sort_key)
    return [os.path.join(filepath, name) for name in names]


def get_cardboard_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a cardboard obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a cardboard obstacle.
    """
    return get_measurments(tag, filepath, 'Cardboard')


def get_metal_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a metal obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a metal obstacle.
    """
    return get_measurments(tag, filepath, 'Metal')


def get_wood_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a wood obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a wood obstacle.
    """
    return get_measurments(tag, filepath, 'Wood')


def get_human_hand_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a human hand obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a human hand obstacle.
    """
    return get_measurments(tag, filepath, 'Human_Hand')


def get_human_body_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a human body obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a human body obstacle.
    """
    return get_measurments(tag, filepath, 'Human_Body')


def get_soil_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a soil (dry) obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a soil (dry) obstacle.
    """
    return get_measurments(tag, filepath, 'Soil_(Dry)')


def get_container_water_measurments(tag: str, filepath: str) -> list:
    """
    Obtain a list of all files of a certain tag with a container with water obstacle in the given directory.
    Files will be ordered by the distance value, in ascending order.
    Args:
        tag (str): The tag to search for.
        filepath (str): The directory to search in.
    Returns:
        list: A list of all files with the given tag and a container with water obstacle.
    """
    return get_measurments(tag, filepath, 'Container_Water')
def on_connect(client: mqtt.Client, userdata: dict, flags: dict, reason_code: int, properties: dict) -> None:
    """
    Callback invoked once the client has connected to the MQTT broker.

    Reports the broker's result code and subscribes to the two topics this
    script listens on: 'data/meas' and 'rfid/tde'.

    Args:
        client (mqtt.Client): MQTT client instance; used to issue the subscriptions.
        userdata (dict): User data (unused here).
        flags (dict): Response flags sent by the broker (unused here).
        reason_code (int): Response code sent by the broker; only printed.
        properties (dict): Response properties sent by the broker (unused here).
    Returns:
        None.
    """
    print(f"Connected with result code {reason_code}")
    # Subscribe inside the connect callback, one call per topic, in a fixed order
    for topic in ("data/meas", "rfid/tde"):
        client.subscribe(topic)
def _format_field(value) -> str:
    """Render one record field: strings verbatim, numbers with up to 14 significant digits."""
    if isinstance(value, str):
        return value
    try:
        return f'{value:.14g}'
    except (TypeError, ValueError):
        # Non-numeric payload values (None, lists, dicts, ...) do not accept a float format spec
        return str(value)


def on_message(client: mqtt.Client, userdata: dict, msg: mqtt.MQTTMessage) -> None:
    """
    Callback invoked when a message is received from the MQTT broker.

    - 'rfid/tde': counts the read, parses the JSON payload, converts its "timestamp" to epoch
      seconds, merges the payload's "data" object into ``userdata`` and appends one record
      (fields in HEADERS order) to the file named by ``userdata['file']``. If a command was
      pending, its reception time is recorded in that row and the command fields are then cleared.
    - 'data/meas': merges the JSON payload into ``userdata`` and increments ``n_group``.
    - Any other topic is ignored.

    Args:
        client (mqtt.Client): MQTT client instance (unused here).
        userdata (dict): Mutable measurement state shared between callbacks.
        msg (mqtt.MQTTMessage): Message received from the MQTT broker.
    Returns:
        None.
    """
    filename = userdata['file']
    if msg.topic == 'rfid/tde':
        userdata['n_reads'] += 1
        data = json.loads(msg.payload)
        # Parse with datetime so the UTC offset (%z) and the fractional seconds (%f) are both honoured;
        # time.mktime() would interpret the fields as local time and drop the fraction.
        time_utc = datetime.datetime.strptime(data["timestamp"], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
        print(msg.topic + f' ({datetime.datetime.fromtimestamp(time_utc)}) ' + str(msg.payload))
        # Stamp the reception time only while a command is pending
        userdata['cmd_received'] = time.monotonic() if userdata.get('cmd') else ''
        userdata['time_reader'] = time_utc
        userdata['type'] = data['type']
        # NOTE(review): payload keys are merged unfiltered, so a message could overwrite
        # bookkeeping keys such as 'file' or 'n_reads' — confirm the broker is trusted.
        userdata.update(data['data'])
        rec = [_format_field(userdata.get(h, '')) for h in HEADERS]
        # NOTE(review): records are joined with ',\t' whereas validate_filename writes the header
        # row joined with ',' — kept as-is so existing files and parsers are unaffected.
        with open(filename, 'a+') as f:
            f.write(',\t'.join(rec) + '\n')
        # The pending command has now been logged once; clear it for subsequent rows
        if userdata['cmd_received']:
            userdata.update({'cmd': '', 'cmd_issued': '', 'cmd_received': ''})
    elif msg.topic == 'data/meas':
        # mosquitto_pub -h 158.109.74.35 -p 1883 -t 'data/meas' -m '{"pos_x": 0, "pos_y": -.5, "pos_z": 0, "moist": 0}'
        userdata.update(json.loads(msg.payload))
        userdata['n_group'] += 1
# =================================================================================================================================== #
# =================================================================================================================================== #
# --------------------------------------------------------- MAIN FUNCTION ----------------------------------------------------------- #
def read(file_opt: dict, server_ip: str, server_port: int, duration: float) -> None:
    """
    Entry point of the script. Connects to the MQTT broker, subscribes to topics, and handles incoming messages.

    A "start" command is published on 'rfid/c'; then, for ``duration`` seconds, a 'next' command
    is published on 'data/meas' once per second while the network loop runs in the background.
    On exit a "stop" command is published on 'rfid/c', and the client is disconnected.

    Args:
        file_opt (dict): The options for the file to write to; passed as keyword arguments to
            filename_generator (tag, freq, distance, obstacle, initial_filepath, power and
            optionally suffix).
        server_ip (str): The IP address of the MQTT broker (server).
        server_port (int): The port of the MQTT broker (server).
        duration (float): The duration of the reading process, in seconds.
    Returns:
        None
    Raises:
        ValueError: If the output file cannot be created or written to, or if ``file_opt``
            holds values rejected by filename_generator.
    """
    filename = filename_generator(**file_opt)
    if not validate_filename(filename):
        raise ValueError("Invalid filename or directory cannot be created.")
    # Start from an empty record and fill in what is known before the first message arrives
    state = {key: '' for key in HEADERS}
    state.update({
        'tag': file_opt.get('tag', 2), 'obs': file_opt.get('obstacle', 2), 'power': file_opt.get('power', 27.),
        'n_reads': 0, 'n_group': 0, 'pos_x': .0, 'pos_y': file_opt.get('distance', 2), 'pos_z': .0,
        'depth': .0, 'moist': .0, 'file': filename, 'cmd': 'init', 'cmd_issued': time.monotonic()
    })
    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.user_data_set(state)
    mqttc.connect(server_ip, server_port, 60)
    mqttc.publish('rfid/c', b'{"command": "start", "command_id": "123", "payload": {"doNotPersistState": true}}')
    try:
        mqttc.loop_start()
        start_time = time.monotonic()
        while time.monotonic() - start_time < duration:
            mqttc.publish('data/meas', f'{{"cmd": "next", "cmd_issued": {time.monotonic()}}}'.encode())
            time.sleep(1.)
    except FileNotFoundError as e:
        print(e)
    finally:
        stop_info = mqttc.publish('rfid/c', b'{"command": "stop", "command_id": "321", "payload": {}}')
        try:
            # Give the network thread a chance to actually send the stop command before it is shut down
            stop_info.wait_for_publish(timeout=5.0)
        except (RuntimeError, ValueError) as e:
            # Raised when the message could not be queued or sent (e.g. the connection is already gone)
            print(f"Could not deliver stop command: {e}")
        # Close the connection so the socket is not leaked, then stop the background loop
        mqttc.disconnect()
        mqttc.loop_stop()
        print('FINISH')
# =================================================================================================================================== #