-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpacket_receiver.py
More file actions
78 lines (61 loc) · 2.55 KB
/
Copy pathpacket_receiver.py
File metadata and controls
78 lines (61 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import serial
import re
from loctek_structs import Packet
from packet_processor import AbstractPacketProcessor
import time
from abc import ABC, abstractmethod
class AbstractPacketReceiver:
@abstractmethod
def start_receiving(self, packet_processor: AbstractPacketProcessor)-> None:
pass
@abstractmethod
def end_receiving(self)-> None:
pass
class SerialPacketReceiver(AbstractPacketReceiver):
def __init__(self, port_name: str, baud_rate: int=9600):
super().__init__()
self.baud_rate = baud_rate
self.port_name = port_name
self.stop_listening = False
def start_receiving(self, packet_processor: AbstractPacketProcessor)-> None:
with serial.Serial(self.port_name, self.baud_rate) as conn:
print("listening on port ", conn.name)
buff = []
while not self.stop_listening:
byte = conn.read().hex().upper()
buff.append(byte)
if buff and buff[0] != Packet.BYTE.START:
buff = []
elif Packet.is_valid_packet(buff):
packet_processor.on_packet_recv(Packet(0, 0, buff))
buff.clear()
self.stop_listening = False
def stop_receiving(self)-> None:
self.stop_listening = True
class PulseViewFilePacketReceiver(AbstractPacketReceiver):
def __init__(self, file_path: str):
super().__init__()
self.file_path = file_path
def start_receiving(self, packet_processor: AbstractPacketProcessor)-> None:
pattern = re.compile(r"(\d+)-(\d+).+[RX|TX]:\s+(\w+)")
buff = []
t_pckg_start = -1
t_pckg_end = -1
with open(self.file_path, "r") as f:
for line in f.readlines():
for match in re.finditer(pattern, line):
t_start, t_end, byte = match.group(1), match.group(2), match.group(3)
if byte in "Start Stop".split():
continue
if byte == Packet.BYTE.START:
t_pckg_start = t_start
elif byte == Packet.BYTE.STOP:
t_pckg_end = t_end
buff.append(byte)
if buff and buff[0] != Packet.BYTE.START:
buff = []
elif Packet.is_valid_packet(buff):
packet_processor.on_packet_recv(Packet(t_pckg_start, t_pckg_end, buff))
buff = []
def stop_receiving(self)-> None:
pass