77 Response ,
88)
99import uuid , time , threading , json , logging , requests , os , asyncio , io , zipfile
10+ from datetime import datetime , timezone
11+ from zoneinfo import ZoneInfo
1012from typing import Optional , Tuple , List
1113from urllib .parse import quote
1214from helper import CANTimescaleStreamer
2022
2123error_logger = logging .getLogger (__name__ )
2224
23- ALLOWED_EXTENSIONS = {"csv" , "zip" }
25+ ALLOWED_EXTENSIONS = {"csv" , "zip" , "pecan" }
2426UPLOAD_ZIP_MAX_ARCHIVE_BYTES = int (os .getenv ("UPLOAD_ZIP_MAX_ARCHIVE_BYTES" , str (2 * 1024 ** 3 )))
2527UPLOAD_ZIP_MAX_MEMBER_BYTES = int (os .getenv ("UPLOAD_ZIP_MAX_MEMBER_BYTES" , str (4 * 1024 ** 3 )))
2628UPLOAD_ZIP_MAX_TOTAL_UNCOMPRESSED_BYTES = int (
@@ -155,6 +157,16 @@ def _zip_entry_path_safe(arcname: str) -> bool:
155157 return ".." not in n .split ("/" )
156158
157159
160+ class _InMemoryFile :
161+ """Minimal file-like object for passing in-memory bytes through expand_upload_files_to_csv_payloads."""
162+ def __init__ (self , filename : str , data : bytes ):
163+ self .filename = filename
164+ self ._data = data
165+
166+ def read (self ) -> bytes :
167+ return self ._data
168+
169+
158170def expand_upload_files_to_csv_payloads (files ) -> Tuple [List [Tuple [str , bytes ]], Optional [str ]]:
159171 out : List [Tuple [str , bytes ]] = []
160172 zip_idx = 0
@@ -178,36 +190,76 @@ def expand_upload_files_to_csv_payloads(files) -> Tuple[List[Tuple[str, bytes]],
178190 infos = [
179191 i for i in z .infolist ()
180192 if not i .is_dir ()
181- and i .filename .lower ().endswith (".csv" )
193+ and ( i .filename .lower ().endswith (".csv" ) or i . filename . lower (). endswith ( ".pecan" ) )
182194 and _zip_entry_path_safe (i .filename )
183195 # exclude macOS resource forks (__MACOSX/ and ._filename)
184196 and not i .filename .startswith ("__MACOSX/" )
185197 and not os .path .basename (i .filename ).startswith ("._" )
186198 ]
187199 if not infos :
188- return [], f"No CSV files found in zip: { name } "
200+ return [], f"No CSV or .pecan files found in zip: { name } "
189201 if len (infos ) > UPLOAD_ZIP_MAX_CSV_IN_ZIP :
190202 return [], f"Too many CSV entries in { name } (max { UPLOAD_ZIP_MAX_CSV_IN_ZIP } )"
191203 total_uc = sum (i .file_size for i in infos )
192204 if total_uc > UPLOAD_ZIP_MAX_TOTAL_UNCOMPRESSED_BYTES :
193205 return [], f"Zip { name } uncompressed total too large"
194206 for i in infos :
195207 if i .file_size > UPLOAD_ZIP_MAX_MEMBER_BYTES :
196- return [], f"CSV inside zip too large: { i .filename } in { name } "
208+ return [], f"File inside zip too large: { i .filename } in { name } "
197209 leaf = os .path .basename (i .filename ) or "data.csv"
198210 key = (zlabel , leaf .lower ())
199211 if key in seen_in_zip :
200- return [], f'Duplicate CSV filename "{ leaf } " inside zip { name } '
212+ return [], f'Duplicate filename "{ leaf } " inside zip { name } '
201213 seen_in_zip .add (key )
202214 with z .open (i , "r" ) as fp :
203215 body = fp .read ()
204- out .append ((f"_z{ zlabel } /{ leaf } " , body ))
216+ if leaf .lower ().endswith (".pecan" ):
217+ # Convert .pecan to CSV in-place so the pipeline is uniform
218+ sub_out , err = expand_upload_files_to_csv_payloads (
219+ [_InMemoryFile (leaf , body )]
220+ )
221+ if err :
222+ return [], f"{ err } (inside zip { name } )"
223+ out .extend (sub_out )
224+ else :
225+ out .append ((f"_z{ zlabel } /{ leaf } " , body ))
205226 except zipfile .BadZipFile :
206227 return [], f"Invalid or corrupt zip: { name } "
207228 except RuntimeError as e :
208229 return [], f"Could not read zip { name } : { e } "
230+ elif ext == "pecan" :
231+ try :
232+ payload = json .loads (data .decode ("utf-8" ))
233+ except Exception :
234+ return [], f"Invalid .pecan file (bad JSON): { name } "
235+ if payload .get ("format" ) != "pecan-session" or payload .get ("version" ) != 2 :
236+ return [], f".pecan file must be pecan-session v2 format: { name } "
237+ frames = payload .get ("frames" ) or []
238+ if not frames :
239+ return [], f"No frames in .pecan file: { name } "
240+ epoch_base_ms = payload .get ("epochBaseMs" )
241+ if epoch_base_ms is None :
242+ return [], f".pecan file missing epochBaseMs — cannot determine timestamps: { name } "
243+ tz_toronto = ZoneInfo ("America/Toronto" )
244+ start_dt = datetime .fromtimestamp (epoch_base_ms / 1000 , tz = tz_toronto )
245+ csv_filename = start_dt .strftime ("%Y-%m-%d-%H-%M-%S" ) + ".csv"
246+ lines = []
247+ for frame in frames :
248+ if not isinstance (frame , list ) or len (frame ) < 4 :
249+ continue
250+ try :
251+ t_rel_ms = int (frame [0 ])
252+ can_id = int (frame [1 ])
253+ data_bytes = bytes .fromhex (str (frame [3 ]))
254+ padded = (data_bytes + b"\x00 " * 8 )[:8 ]
255+ except Exception :
256+ continue
257+ lines .append (f"{ t_rel_ms } ,CAN,{ can_id } ," + "," .join (str (b ) for b in padded ))
258+ if not lines :
259+ return [], f"No parseable frames in .pecan file: { name } "
260+ out .append ((csv_filename , "\n " .join (lines ).encode ("utf-8" )))
209261 else :
210- return [], f"Invalid file type (only .csv and .zip ): { name } "
262+ return [], f"Invalid file type (only .csv, .zip, and .pecan ): { name } "
211263 if not out :
212264 return [], "No CSV data to process"
213265 return out , None
@@ -501,12 +553,13 @@ def on_progress(sent: int, total: int) -> None:
501553 pass
502554
503555 def worker ():
504- streamer = CANTimescaleStreamer (
505- postgres_dsn = POSTGRES_DSN ,
506- table = season .lower (),
507- dbc_path = dbc_temp_path ,
508- )
556+ streamer = None
509557 try :
558+ streamer = CANTimescaleStreamer (
559+ postgres_dsn = POSTGRES_DSN ,
560+ table = season .lower (),
561+ dbc_path = dbc_temp_path ,
562+ )
510563 asyncio .run (
511564 streamer .stream_multiple_csvs (
512565 file_data = file_data ,
@@ -516,14 +569,16 @@ def worker():
516569 )
517570 except Exception as e :
518571 error_logger .error (traceback .format_exc ())
519- PROGRESS [task_id ]["msg" ] = f"Error: { e } "
520- PROGRESS [task_id ]["done" ] = True
572+ PROGRESS [task_id ]["msg" ] = f"Error: { e } "
573+ PROGRESS [task_id ]["error" ] = str (e )
574+ PROGRESS [task_id ]["done" ] = True
521575 slack .fail (str (e ))
522576 finally :
523- try :
524- streamer .close ()
525- except Exception as e :
526- print ("error closing streamer" , e )
577+ if streamer :
578+ try :
579+ streamer .close ()
580+ except Exception as e :
581+ print ("error closing streamer" , e )
527582 if dbc_temp_path and os .path .exists (dbc_temp_path ):
528583 try :
529584 os .unlink (dbc_temp_path )
0 commit comments