-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtui.py
More file actions
618 lines (543 loc) · 30.5 KB
/
tui.py
File metadata and controls
618 lines (543 loc) · 30.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# =================================================================================================================================== #
# ----------------------------------------------------------- DESCRIPTION ----------------------------------------------------------- #
# TUI.py - Text-based User Interface for controlling a Zebra FX7500 RFID reader. #
# This application provides a terminal-based interface to configure reader settings, run measurements, and collect RFID tag data #
# with various environmental parameters. The interface allows control of transmission power, tag type, obstacles, positioning, #
# and other experimental parameters for systematic RFID performance measurements. #
# =================================================================================================================================== #
# =================================================================================================================================== #
# --------------------------------------------------------- EXTERNAL IMPORTS -------------------------------------------------------- #
import os # Miscellaneous operating system interfaces. #
import re # Regular expression operations. #
import time # Time access and conversions. #
import asyncio # Asynchronous I/O, event loop, and coroutines. #
import datetime # Basic date and time types. #
import subprocess # Subprocess management. #
import paho.mqtt.client as mqtt # MQTT client for communicating with the RFID reader. #
#
from threading import Thread # Thread-based parallelism. #
from textual.app import App, ComposeResult # Textual library for creating TUI applications. #
from textual.containers import Horizontal, VerticalGroup, HorizontalGroup, Right # Layout containers for UI elements. #
from textual.widgets import Button, Input, Log, Label, Collapsible, ListItem, Footer # UI widgets for user interaction. #
#
import src.reader as reader # Custom module for RFID reader functionality. #
from arduino_serial.main import main as arduino_main # Module for communicating with Arduino (moisture sensor). #
# =================================================================================================================================== #
# =================================================================================================================================== #
# --------------------------------------------------------- READER COMMANDS --------------------------------------------------------- #
# JSON template for configuring the RFID reader in standard operation mode #
# Reference: https://zebradevs.github.io/rfid-ziotc-docs/schemas/raw_mqtt_payloads/commands/index.html #
OPERATION_MODE = '''
{{
"command": "set_mode", # Command to set the reader mode
"command_id": "333", # Unique identifier for this command
"payload": {{
"antennaStopCondition": {{"type": "SINGLE_INVENTORY_LIMITED_DURATION", "value": 1000}}, # Stop after 1000ms
"antennas": [1, 2], # Use antennas 1 and 2
"delayAfterSelects": 0,
"delayBetweenAntennaCycles": {{"duration": 75, "type": "NO_TAGS"}}, # Wait 75ms between cycles if no tags
"environment": "LOW_INTERFERENCE", # Environment setting for noise handling
"filter": {{"match": "prefix", "operation": "include", "value": ""}}, # No filtering
"radioStartConditions": {{}},
"radioStopConditions": {{}},
"reportFilter": {{"duration": 0, "type": "RADIO_WIDE"}},
"tagMetaData": ["ANTENNA", "RSSI", "CHANNEL", "PC", "XPC", "CRC", "MAC", "HOSTNAME", "SEEN_COUNT", "PHASE"], # Data to collect
"transmitPower": [{power:2.1f}, {power2:2.1f}], # Power levels for each antenna (dynamic values)
"type": "SIMPLE" # Simple operation mode
}}
}}
''' # 300833b2ddd9014000000000 (DogBone) - Example tag ID
# CUSTOM_MODE defines an advanced configuration with read/write operations #
CUSTOM_MODE = '''
{
"command": "set_mode",
"command_id": "222",
"payload": {
"accesses": [[ # Array of tag access operations
{
"config": {
"membank": "EPC", # Electronic Product Code memory bank
"wordCount": 1,
"wordPointer": 2
},
"type": "READ" # Read operation
}, {
"type": "WRITE", # Write operation
"config": {
"membank":"EPC",
"wordPointer": 2,
"data": "bbbb" # Data to write
}
}
]],
"antennaStopCondition": {"type": "SINGLE_INVENTORY_LIMITED_DURATION", "value": 500},
"antennas": [1], # Only use antenna 1
"delayAfterSelects": 0,
"delayBetweenAntennaCycles": {"duration": 75, "type": "NO_TAGS"},
"environment": "HIGH_INTERFERENCE", # Higher noise environment setting
"radioStartConditions": {},
"radioStopConditions": {},
"reportFilter": {},
"tagMetaData": [],
"transmitPower": 27.0, # Power in dBm
"type": "CUSTOM" # Custom operation mode
}
}
'''
# =================================================================================================================================== #
# =================================================================================================================================== #
# ---------------------------------------------------------- UTILITY CLASSES -------------------------------------------------------- #
class GrayCounter:
"""
Implements a multi-dimensional Gray code counter for systematically traversing parameter spaces.
Gray codes change only one bit at a time, useful for parameter sweeps with minimal transitions.
Attributes:
shape (list[int]): Dimensions of the parameter space to traverse.
state (list[int]): Current state/position within the parameter space.
direction (list[int]): Direction of movement for each dimension (1, 0, or -1).
"""
def __init__(self, shape: list[int]):
"""
Initialize counter with dimensions specified in shape.
Args:
shape (list[int]): List of sizes for each dimension in the parameter space.
"""
self.shape, self.state, self.direction = shape, [0]*len(shape), [1]*len(shape)
def increment(self):
"""
Advance the counter to the next state in the Gray code sequence.
Traverses the parameter space changing only one parameter at a time.
"""
for i in range(1, len(self.shape) + 1):
if self.shape[-i] <= 1:
continue # Skip dimensions with only one value
elif self.direction[-i] == 0:
# Determine new direction based on current state
self.direction[-i] = 1 if self.state[-i] == 0 else -1
else:
# Move in the current direction
self.state[-i] += self.direction[-i]
if (self.state[-i] == 0) or (self.state[-i] == self.shape[-i] - 1):
# At boundary, stop this dimension on next increment
self.direction[-i] = 0
break
def reset(self):
"""Reset counter to initial state."""
self.state, self.direction = [0]*len(self.shape), [1]*len(self.shape)
# =================================================================================================================================== #
# =================================================================================================================================== #
# ---------------------------------------------------------- UI COMPONENTS ---------------------------------------------------------- #
class NumberInput(VerticalGroup):
"""
Custom input widget for numeric values with increment/decrement buttons.
Supports bounded values, step sizes, and option selection from lists.
Can also function as a countdown timer.
Attributes:
title (str): Label text for the input.
value_def (float|int): Default value.
countdown (bool): Whether this input functions as a countdown timer.
auto (bool): Whether this input can be auto-swept in batch measurements.
value (float|int): Current value.
min, max (float|int): Minimum and maximum allowed values.
step (float|int): Step size for increments/decrements.
remaining (float): Remaining time for countdown timer.
options (list|None): List of options if this is a selection widget.
timer (Timer|None): Timer object for countdown functionality.
input (Input|None): Textual Input widget.
format (str): Format string for displaying numeric values.
"""
def __init__(self, title: str = 'Test', default: float | int = 0, inf: float | int = -180, sup: float | int = 180,
step: float | int = .1, countdown: bool = False, options: list | None = None, auto: bool = False,
**kwargs):
"""
Initialize the number input with range constraints and behavior options.
Args:
title (str): Label text for the input.
default (float|int): Default value.
inf (float|int): Minimum allowed value.
sup (float|int): Maximum allowed value.
step (float|int): Step size for increments/decrements.
countdown (bool): Whether this input functions as a countdown timer.
options (list|None): List of options if this is a selection widget.
auto (bool): Whether this input can be auto-swept in batch measurements.
**kwargs: Additional arguments for the parent VerticalGroup.
"""
self.title, self.value_def, self.countdown, self.auto = title, default, countdown, auto
self.value, self.min, self.max, self.step = default, inf, sup, step
self.remaining, self.options = -1e-6, options
self.timer, self.input = None, None
self.format = '{num: .5g}' # Format for displaying numeric values
# If options list is provided, treat as a selection widget
if options:
self.step = max(1, int(self.step))
self.min, self.max = 0, len(options) - 1
super().__init__(**kwargs)
def compose(self) -> ComposeResult:
"""
Create the widget's UI components.
Returns:
ComposeResult: The composed UI elements.
"""
yield Label(self.title)
with HorizontalGroup():
self.input = Input(value=self.value_str, id=self.id + '_inp',
tooltip=self.options[int(self.value)].__repr__() if self.options else None)
yield self.input
with VerticalGroup():
yield Button('▲', classes='small_button', id='inc') # Increment button
yield Button('⊙', classes='small_button', id='rst') # Reset button
yield Button('▼', classes='small_button', id='dec') # Decrement button
def on_mount(self) -> None:
"""Set up the timer when the widget is mounted if countdown mode is enabled."""
if self.countdown:
self.timer = self.set_interval(1 / 10., self.update_countdown, pause=True)
def start_countdown(self) -> None:
"""Start the countdown timer."""
self.value_def = self.sanitized_input()
if self.countdown:
self.remaining = self.value
self.timer.resume()
def stop_countdown(self, reason='stop') -> None:
"""
Stop the countdown timer and update the application state.
Args:
reason (str): Reason for stopping ('stop', 'interrupt', etc.).
"""
if self.countdown:
self.timer.pause()
self.value_str = self.value_def
# Update state with measurement completion information
self.app.state.update({'file': f'{self.app.state['path']}/dump_{datetime.date.today().__str__()}.csv',
'cmd': reason, 'cmd_issued': time.monotonic()})
# Publish measurement completion message via MQTT
self.app.mqttc.publish('data/meas', ('{' +
', '.join(f'"{x}": {self.app.state[x].__repr__()}' for x in ('cmd', 'cmd_issued', 'file')) +
'}').replace("'", '"').encode())
# Update button states
self.app.stop.add_class('disabled')
self.app.start.remove_class('disabled')
self.app.next.remove_class('disabled')
def update_countdown(self) -> None:
"""Update the countdown timer display every 0.1 seconds."""
if self.remaining <= 0:
self.stop_countdown()
return
self.remaining -= 0.1
self.value_str = self.remaining
def sanitized_input(self):
"""
Clean and validate user input to ensure it's a valid number within range.
Returns:
float|int: The sanitized input value.
"""
val = re.sub(r'[^0-9\.]', '', self.input.value) # Remove non-numeric characters
val = val if val != '' else self.value_def
val = int(val) if self.options else float(val)
return max(self.min, min(self.max, val)) # Clamp to allowed range
def on_input_changed(self, event: Input.Changed) -> None:
"""
Handle user input changes.
Args:
event (Input.Changed): The input change event.
"""
self.value_str = self.sanitized_input()
def action_increase(self):
"""Increase the value by one step."""
self.value_str = min(self.value + self.step, self.max)
def action_reset(self):
"""Reset to default value."""
self.value_str = self.value_def
def action_decrease(self):
"""Decrease the value by one step."""
self.value_str = max(self.value - self.step, self.min)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""
Handle button press events.
Args:
event (Button.Pressed): The button press event.
"""
if event.button.id == 'inc':
self.action_increase()
elif event.button.id == 'rst':
self.action_reset()
elif event.button.id == 'dec':
self.action_decrease()
def _get_number(self):
"""
Format the current value for display.
Returns:
str: The formatted value.
"""
return self.format.format(num=self.value)
def _set_number(self, num):
"""
Set the current value and update the display.
Args:
num (float|int|str): The new value.
"""
if self.options is None:
self.value = float(num)
self.input.value = self.value_str
else:
self.value = num if type(num) is int else self.options.index(num)
self.input.tooltip = self.options[self.value].__repr__()
self.input.value = f'[{self.value_str}]'
# Property for accessing/setting the value as a formatted string
value_str = property(
fget=_get_number,
fset=_set_number,
)
class CollapsibleItem(ListItem):
"""
List item that can be expanded to show additional content.
Attributes:
label (str): The text label for the item.
"""
def __init__(self, label: str) -> None:
"""
Initialize with a label.
Args:
label (str): The text label for the item.
"""
super().__init__()
self.label = label
def compose(self) -> ComposeResult:
"""
Create the collapsible item UI.
Returns:
ComposeResult: The composed UI elements.
"""
yield Collapsible(Label(self.label), Label(self.label + '2'))
# =================================================================================================================================== #
# =================================================================================================================================== #
# ---------------------------------------------------------- MAIN APPLICATION ------------------------------------------------------ #
class ReaderApp(App):
"""
Main application class for the RFID reader control interface.
Provides UI for configuring parameters, starting/stopping measurements,
and automating measurement sequences.
Attributes:
BINDINGS (list): Keyboard shortcuts for the application.
CSS_PATH (str): Path to CSS styling file.
app_log (Log): Log widget for displaying application messages.
start, stop, skip, next (Button): Control buttons for measurements.
timer (NumberInput): Timer widget for measurement duration.
mqttc (mqtt.Client): MQTT client for communicating with RFID reader.
state (dict): Application state containing measurement parameters.
batch (dict): Configuration for batch measurements.
inputs (list): List of input parameter configurations.
annoying (list): Parameters to be automatically swept in batch mode.
batch_counter (GrayCounter): Counter for traversing parameter combinations.
"""
BINDINGS = [
("a", "auto_measurement", "Auto") # Keyboard shortcut for auto measurement
]
CSS_PATH = 'main_tui.css' # CSS styling for the TUI
def __init__(self, mqttc: mqtt.Client, state: dict, batch: dict[str, list] = None):
"""
Initialize the application with MQTT client and state information.
Args:
mqttc (mqtt.Client): MQTT client for communicating with RFID reader.
state (dict): Initial application state.
batch (dict[str, list], optional): Batch measurement configuration.
"""
super().__init__()
self.app_log, self.start, self.stop, self.skip, self.next, self.timer = [None]*6
self.mqttc, self.state = mqttc, state
self.batch, self.inputs = {}, [
# Define all input parameters with default values and constraints
{'title': 'Depth (m)', 'id': 'depth', 'default': self.state['depth'], 'step': .01},
{'title': 'Offset X', 'id': 'pos_x', 'default': self.state['pos_x'], 'step': .01},
{'title': 'Height (m)', 'id': 'pos_y', 'default': self.state['pos_y'], 'step': .01},
# Commented inputs can be uncommented to add more parameters
# {'title': 'Offset Z', 'id': 'pos_z', 'default': self.state['pos_z'], 'step': .01},
# {'title': 'Angle rX', 'id': 'angle_rx', 'default': self.state['angle_rx'], 'inf': -180, 'sup': 180, 'step': 1},
# {'title': 'Angle tX', 'id': 'angle_tx', 'default': self.state['angle_tx'], 'inf': -180, 'sup': 180, 'step': 1},
# {'title': 'Angle rY', 'id': 'angle_ry', 'default': self.state['angle_ry'], 'inf': -180, 'sup': 180, 'step': 1},
# {'title': 'Angle tY', 'id': 'angle_ty', 'default': self.state['angle_ty'], 'inf': -180, 'sup': 180, 'step': 1},
# {'title': 'Angle rZ', 'id': 'angle_rz', 'default': self.state['angle_rz'], 'inf': -180, 'sup': 180, 'step': 1},
# {'title': 'Angle tZ', 'id': 'angle_tz', 'default': self.state['angle_tz'], 'inf': -180, 'sup': 180, 'step': 1},
{'title': 'Moisture', 'id': 'moist', 'default': self.state['moist'], 'inf': 0., 'sup': 1., 'step': .01},
{'title': 'Tag', 'id': 'tag', 'default': reader.TAGS.index(self.state['tag']), 'options': reader.TAGS},
{'title': 'Power (dBm)', 'id': 'power', 'default': self.state['power'], 'inf': 10., 'sup': 29.2, 'step': .1, 'auto': True},
{'title': 'Obstacle', 'id': 'obs', 'default': reader.OBSTACLES.index(self.state['obs']), 'options': reader.OBSTACLES},
{'title': 'Pow_2 (dBm)', 'id': 'power2', 'default': self.state['power2'], 'inf': 10., 'sup': 29.2, 'step': .1},
]
# Configure batch measurement parameters if provided
if batch is not None:
self.batch.update(batch)
for inp in self.inputs:
if inp['id'] in self.batch:
inp['options'] = self.batch[inp['id']]
inp['default'] = 0
# Parameters that will be automatically swept in batch mode
self.annoying = ['depth', 'moist', 'pos_y', 'obs', 'tag', 'power']
self.annoying = [x for x in self.annoying if x in self.batch]
self.batch_counter = GrayCounter([len(batch[x]) for x in self.annoying])
else:
self.annoying = []
async def action_auto_measurement(self):
"""
Run an automated sequence of measurements by sweeping the power parameter.
Automatically steps through all power levels, performing a measurement at each.
"""
sel_power: NumberInput = self.query_one(f'#power')
if sel_power.auto and sel_power.options is not None:
for i, op in enumerate(sel_power.options):
sel_power.value_str = i
self.timer.value = self.timer.value_def
self.timer.remaining = self.timer.value_def
self.action_start_measurement()
await asyncio.sleep(self.timer.value_def + .5) # Wait for measurement to complete
def action_stop_measurement(self, reason='interrupt'):
"""
Stop the current measurement.
Args:
reason (str): Reason for stopping ('interrupt', etc.).
"""
self.query_one('#D').stop_countdown(reason)
def action_next_measurement(self):
"""
Move to the next parameter combination in batch mode.
Uses the GrayCounter to determine the next set of parameter values.
"""
for i, inp in enumerate(self.annoying):
sel: NumberInput = self.query_one(f'#{inp}')
res = self.batch_counter.state[i]
sel.value_def = res
sel.value_str = res
self.batch_counter.increment()
self.next.add_class('disabled')
self.skip.remove_class('disabled')
def action_start_measurement(self):
"""
Start a measurement with the current parameter settings.
Updates state, generates appropriate filename, and sends commands to the RFID reader.
"""
# Update state with current UI values
for inp in self.inputs:
sel: NumberInput = self.query_one(f'#{inp["id"]}')
if sel.options is not None:
self.state.update({inp["id"]: sel.options[sel.value]})
else:
self.state.update({inp["id"]: sel.value})
# Generate output filename based on parameters
tag, obs = reader.TAGS.index(self.state['tag']), reader.OBSTACLES.index(self.state['obs'])
file = reader.filename_generator(tag=tag, freq=865.7, obstacle=obs, distance=float(self.state['pos_y']),
initial_filepath=self.state['path'], power=float(self.state['power']), suffix='gui')
reader.validate_filename(file) # Create file/dir if not exists
# Update state and send commands to the RFID reader
self.state.update({'cmd': 'start', 'cmd_issued': time.monotonic(), 'file': file, })
self.mqttc.publish('rfid/c', OPERATION_MODE.format(**self.state).encode())
# Publish measurement metadata
inps = [x['id'] for x in self.inputs] + ['cmd', 'cmd_issued', 'file']
self.mqttc.publish('data/meas', ('{' +
', '.join(f'"{x}": {self.state[x].__repr__()}' for x in inps) +
'}').replace("'", '"').encode())
self.state.update({'cmd': '', 'cmd_issued': '', 'file': file, })
# Update UI button states
self.start.add_class('disabled')
self.stop.remove_class('disabled')
self.skip.add_class('disabled')
self.app_log.write(self.state.__repr__() + '\n') # Log current state
self.timer.start_countdown() # Start the measurement timer
def compose(self) -> ComposeResult:
"""
Create the application UI layout.
Returns:
ComposeResult: The composed UI elements.
"""
self.app_log = Log(id='log')
yield self.app_log
with Horizontal(id='main'):
with VerticalGroup():
# Split inputs into two columns
inp_a, inp_b = self.inputs[::2], self.inputs[1::2]
with HorizontalGroup():
for inp in inp_a:
yield NumberInput(**inp)
with HorizontalGroup():
for inp in inp_b:
yield NumberInput(**inp)
self.timer = NumberInput('Duration', id='D', default=5.0, countdown=True)
yield self.timer
with Right(id='controls'):
# Control buttons
self.start = Button('Start', classes='', id='start', variant='success', action='app.start_measurement()')
yield self.start
self.stop = Button('Stop', classes='disabled', id='stop', variant='error', action='app.stop_measurement()')
yield self.stop
if self.state is not None:
self.skip = Button('Skip', classes='', id='skip', variant='error', action='app.next_measurement()')
yield self.skip
self.next = Button('Next >', classes='disabled', id='next', action='app.next_measurement()')
yield self.next
yield Footer()
# =================================================================================================================================== #
# =================================================================================================================================== #
# ---------------------------------------------------------- MAIN FUNCTION ---------------------------------------------------------- #
def main(server_port = 18300, server_ip = '10.0.0.1', moist_thread: bool = True):
"""
Main function to run the RFID reader control application.
Args:
server_port (int): MQTT broker port.
server_ip (str): MQTT broker IP address.
moist_thread (bool): Whether to start the moisture sensor communication thread.
Returns:
None
"""
# Create output directory for data
filepath = f'./data/{datetime.date.today().__str__()}'
os.makedirs(filepath, exist_ok=True)
# Initialize application state with default values
state = {'n_reads': 0, 'n_group': 0, 'pos_x': .0, 'pos_y': 1.538, 'pos_z': .0, 'depth': .096, 'moist': .12,
'angle_rx': 0, 'angle_ry': 0, 'angle_rz': 0, 'angle_tx': 180, 'angle_ty': 180, 'angle_tz': 180,
'power': 27.0, 'power2': 27.0, 'tag': reader.TAGS[2], 'obs': reader.OBSTACLES[8], 'cmd': 'init', 'cmd_issued': 0,
'path': filepath, 'file': f'{filepath}/dump_{datetime.date.today().__str__()}.csv'}
reader.validate_filename(state['file'])
# Set up Arduino serial communication for moisture sensor if enabled
if moist_thread:
try:
# Ensure Arduino port is readable
acc = os.stat('/dev/ttyACM0')
if (acc.st_mode & 0o666) != 0o666: # Change permission if needed
subprocess.call(['sudo', 'chmod', 'a+rw', '/dev/ttyACM0'])
arduino = Thread(target=arduino_main, args=('/dev/ttyACM0', filepath))
arduino.daemon = True
except (FileNotFoundError, PermissionError) as e:
moist_thread = False
# Set up MQTT client for communication with RFID reader
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = reader.on_connect
mqttc.on_message = reader.on_message
mqttc.user_data_set(state)
# Connect to MQTT broker and initialize the reader
mqttc.connect(server_ip, server_port, 60)
mqttc.publish('rfid/c', b'{"command": "start", "command_id": "123", "payload": {"doNotPersistState": true}}')
try:
# Start the moisture sensor thread if enabled
if moist_thread:
arduino.start()
# Start the MQTT client loop
mqttc.loop_start()
# Create and run the application with batch measurement configuration
app = ReaderApp(mqttc, state, batch={
'pos_y': [1.70 - x/100 for x in range(0, 151, 5)], # Height values to sweep from 1.70m to 0.20m
# Uncomment to enable other parameter sweeps
# 'power': [x for x in range(10, 30)],
# 'obs': [reader.OBSTACLES[x] for x in [0, 3, 7]], 'tag': [reader.TAGS[x] for x in [2]],
})
reply = app.run()
print(reply)
except FileNotFoundError as e:
print(e)
finally:
# Clean up: stop the reader and MQTT client
mqttc.publish('rfid/c', b'{"command": "stop", "command_id": "321", "payload": {}}')
mqttc.loop_stop()
# =================================================================================================================================== #
# =================================================================================================================================== #
# ---------------------------------------------------------- ENTRY POINT ------------------------------------------------------------ #
if __name__ == '__main__':
main(server_port = 18300, server_ip = 'localhost') # Run with local MQTT broker
# =================================================================================================================================== #