55import aiohttp
66import asyncio
77import config
8+ import serial
89
910import signal
1011import sys
1314from multiprocessing .managers import BaseManager
1415from . import db
1516from file_sync .file_sync_down .main import *
17+ import re
1618
1719format_string = '<' # little-endian
1820byte_length = 0
1921properties = []
2022frontend_data = {}
21- solar_car_connection = {'lte' : False , 'udp' : False }
23+ solar_car_connection = {'lte' : False , 'udp' : False , 'serial' : False }
2224# Convert dataformat to format string for struct conversion
2325# Docs: https://docs.python.org/3/library/struct.html
2426types = {'bool' : '?' , 'float' : 'f' , 'char' : 'c' , 'uint8' : 'B' , 'uint16' : 'H' , 'uint64' : 'Q' }
27+ serial_port = {"device" : "" , 'baud' : 115200 } # shared object with core_api for setting serial device from frontend
2528
2629def set_format (file_path : str ):
2730 global format_string , byte_length , properties
@@ -46,7 +49,7 @@ def unpack_data(data):
4649
4750
4851class Telemetry :
49- __tmp_data = {'tcp' : b'' , 'lte' : b'' , 'udp' : b'' , 'file_sync' : b'' }
52+ __tmp_data = {'tcp' : b'' , 'lte' : b'' , 'udp' : b'' , 'file_sync' : b'' , 'serial' : b'' }
5053 latest_tstamp = 0
5154
5255 def listen_udp (self , port : int ):
@@ -141,6 +144,51 @@ def listen_tcp(self, server_addr: str, port: int):
141144 solar_car_connection ['tcp' ] = False
142145 break
143146
147+ def serial_read (self ):
148+ global frontend_data , serial_port
149+ latest_tstamp = 0
150+ while True :
151+ curr_device = serial_port ['device' ]
152+ curr_baud = serial_port ['baud' ]
153+ if (curr_device ):
154+ # Establish a serial connection)
155+ ser = serial .Serial (curr_device , curr_baud )
156+ # if device has been updated then exit loop and connect to new device
157+ while curr_device == serial_port ['device' ] and curr_baud == serial_port ['baud' ]:
158+ if time .time () - latest_tstamp > 5 :
159+ solar_car_connection ['serial' ] = False
160+ # Read data from serial port
161+ try :
162+ data = b''
163+ if (ser .in_waiting > 0 ):
164+ data = ser .read (ser .in_waiting )
165+ else :
166+ time .sleep (0.1 )
167+ if not data :
168+ # No data received, continue listening
169+ continue
170+ packets = self .parse_packets (data , 'serial' )
171+ for packet in packets :
172+ if len (packet ) == byte_length :
173+ d = unpack_data (packet )
174+ latest_tstamp = time .time ()
175+ try :
176+ frontend_data = d .copy ()
177+ db .insert_data (d )
178+ except Exception as e :
179+ print (traceback .format_exc ())
180+ continue
181+ solar_car_connection ['serial' ] = True
182+ except Exception :
183+ print (traceback .format_exc ())
184+ solar_car_connection ['serial' ] = False
185+ serial_port ['device' ] = ""
186+ break
187+ else :
188+ solar_car_connection ['serial' ] = False
189+ # wait before retry
190+ time .sleep (1 )
191+
144192 async def fetch (self , session , url ):
145193 try :
146194 async with session .get (url , timeout = 2 ) as response :
@@ -195,35 +243,40 @@ async def remote_db_fetch(self, server_url: str):
195243
196244 def parse_packets (self , new_data : bytes , tmp_source : str ):
197245 """
198- Parse and check the length of each packet
199- :param new_data: Newly received bytes from the comm channel
200- :param tmp_source: Name of tmp data source, put comm channel name here e.g. tcp, lte
246+ Parse and check the length of each packet.
247+
248+ :param new_data: Newly received bytes from the comm channel.
249+ :param tmp_source: Name of tmp data source, put comm channel name here e.g. tcp, lte.
201250 """
202- header = b'<bsr>'
203- footer = b'</bsr>'
251+ header = b"<bsr>"
252+ footer = b"<bsr"
253+ if tmp_source not in self .__tmp_data :
254+ self .__tmp_data [tmp_source ] = b''
255+
256+ # Append new data to the temporary buffer
204257 self .__tmp_data [tmp_source ] += new_data
258+
259+ # Regex pattern to match packets with <bsr> and </bsr> tags
260+ pattern = re .compile (b'<bsr>(.*?)</bsr>' , re .DOTALL )
261+
205262 packets = []
206263 while True :
207- # Search for the next complete data packet
208- try :
209- start_index = self .__tmp_data [tmp_source ].index (header )
210- end_index = self .__tmp_data [tmp_source ].index (footer )
211- except ValueError :
264+ match = pattern .search (self .__tmp_data [tmp_source ])
265+ if not match :
212266 break
267+ # Extract the packet data
268+ packet = match .group (1 )
269+ #remove headers and footers
270+ packets .append (packet )
213271
214- # Extract a complete data packet
215- packets .append (self .__tmp_data [tmp_source ][start_index + len (header ):end_index ])
216- # Update the remaining data to exclude the processed packet
217- self .__tmp_data [tmp_source ] = self .__tmp_data [tmp_source ][end_index + len (footer ):]
218-
219- # If the remaining data is longer than the expected packet length,
220- # there might be an incomplete packet, so log a warning.
221- if len (self .__tmp_data [tmp_source ]) >= byte_length :
222- print ("Warning: Incomplete or malformed packet ------------------------------------" )
223- self .__tmp_data [tmp_source ] = b''
272+ if match .start (0 ) != 0 :
273+ print (f"skipping { match .start (0 )} bytes" )
274+ # Remove the processed packet from the temporary buffer
275+ self .__tmp_data [tmp_source ] = self .__tmp_data [tmp_source ][match .end ():]
224276
225277 return packets
226278
279+
227280 def fs_down_callback (self , data ):
228281 # copied from listen_upd()
229282 if not data :
@@ -255,12 +308,13 @@ def sigint_handler(signal, frame):
255308
256309def start_comms ():
257310 # start file sync
258- p .start ()
259-
260-
311+ # p.start()
312+
261313 # Start two live comm channels
262- vps_thread = threading .Thread (target = lambda : asyncio .run (telemetry .remote_db_fetch (config .VPS_URL )))
263- vps_thread .start ()
264- socket_thread = threading .Thread (target = lambda : telemetry .listen_udp (config .UDP_PORT ))
314+ #vps_thread = threading.Thread(target=lambda : asyncio.run(telemetry.remote_db_fetch(config.VPS_URL)))
315+ #vps_thread.start()
316+ #socket_thread = threading.Thread(target=lambda: telemetry.listen_udp(config.UDP_PORT))
317+ #socket_thread.start()
318+ socket_thread = threading .Thread (target = lambda : telemetry .serial_read ())
265319 socket_thread .start ()
266320
0 commit comments