@@ -247,64 +247,69 @@ def timeseries_df_to_json(
247247 df = df .reindex (columns = ["date-time" , "value" , "quality-code" ])
248248 if df .isnull ().values .any ():
249249 raise ValueError ("Null/NaN data must be removed from the dataframe" )
250-
250+ if version_date :
251+ version_date_iso = version_date .isoformat ()
252+ else :
253+ version_date_iso = None
251254 ts_dict = {
252255 "name" : ts_id ,
253256 "office-id" : office_id ,
254257 "units" : units ,
255258 "values" : df .values .tolist (),
256- "version-date" : version_date ,
259+ "version-date" : version_date_iso ,
257260 }
258261
259262 return ts_dict
260263
261264
262265def store_multi_timeseries_df (
263- ts_data : pd .DataFrame , office_id : str , max_workers : Optional [int ] = 30
266+ data : pd .DataFrame , office_id : str , max_workers : Optional [int ] = 30
264267) -> None :
265-
266268 def store_ts_ids (
267269 data : pd .DataFrame ,
268270 ts_id : str ,
269271 office_id : str ,
270272 version_date : Optional [datetime ] = None ,
271273 ) -> None :
272- units = data ["units" ].iloc [0 ]
273- data_json = timeseries_df_to_json (
274- data = data ,
275- ts_id = ts_id ,
276- units = units ,
277- office_id = office_id ,
278- version_date = version_date ,
279- )
280- store_timeseries (data = data_json )
274+ try :
275+ units = data ["units" ].iloc [0 ]
276+ data_json = timeseries_df_to_json (
277+ data = data ,
278+ ts_id = ts_id ,
279+ units = units ,
280+ office_id = office_id ,
281+ version_date = version_date ,
282+ )
283+ store_timeseries (data = data_json )
284+ except Exception as e :
285+ print (f"Error processing { ts_id } : { e } " )
281286 return None
282287
288+ ts_data_all = data .copy ()
289+ if "version_date" not in ts_data_all .columns :
290+ ts_data_all = ts_data_all .assign (version_date = pd .to_datetime (pd .Series ([])))
283291 unique_tsids = (
284- ts_data ["ts_id" ].astype (str ) + ":" + ts_data ["version_date" ].astype (str )
292+ ts_data_all ["ts_id" ].astype (str ) + ":" + ts_data_all ["version_date" ].astype (str )
285293 ).unique ()
286294
287295 with concurrent .futures .ThreadPoolExecutor (max_workers = max_workers ) as executor :
288- for ts_id_all in unique_tsids :
289- try :
290- ts_id , version_date = ts_id_all .split (":" , 1 )
291- if version_date != "NaT" :
292- version_date_dt = pd .to_datetime (version_date )
293- data = ts_data [
294- (ts_data ["ts_id" ] == ts_id )
295- & (ts_data ["version_date" ] == version_date_dt )
296- ]
297- else :
298- version_date_dt = None
299- data = ts_data [
300- (ts_data ["ts_id" ] == ts_id ) & ts_data ["version_date" ].isna ()
301- ]
302- if not data .empty :
303- executor .submit (
304- store_ts_ids , data , ts_id , office_id , version_date_dt
305- )
306- except Exception as e :
307- print (f"Error processing { ts_id } : { e } " )
296+ for unique_tsid in unique_tsids :
297+ ts_id , version_date = unique_tsid .split (":" , 1 )
298+ if version_date != "NaT" :
299+ version_date_dt = pd .to_datetime (version_date )
300+ ts_data = ts_data_all [
301+ (ts_data_all ["ts_id" ] == ts_id )
302+ & (ts_data_all ["version_date" ] == version_date_dt )
303+ ]
304+ else :
305+ version_date_dt = None
306+ ts_data = ts_data_all [
307+ (ts_data_all ["ts_id" ] == ts_id ) & ts_data_all ["version_date" ].isna ()
308+ ]
309+ if not data .empty :
310+ executor .submit (
311+ store_ts_ids , ts_data , ts_id , office_id , version_date_dt
312+ )
308313
309314
310315def store_timeseries (
0 commit comments