@@ -262,29 +262,22 @@ def upload_dataset(self, public=True) -> DatasetFile or None:
262262 return None
263263
264264 @with_db_session
265- def process_from_bucket_latest (
266- self , db_session , public = True
267- ) -> DatasetFile or None :
265+ def process_from_bucket (self , db_session , public = True ) -> Optional [DatasetFile ]:
268266 """
269- Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
270- <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
271- if the dataset hash is different from the latest dataset stored
272- :return: the file hash and the hosted url as a tuple or None if no upload is required
267+ Process an existing dataset from the GCP bucket updates the related database entities
268+ :return: The DatasetFile object created
273269 """
274270 temp_file_path = None
275271 try :
276- self .logger .info ("Accessing URL %s" , self .producer_url )
277272 temp_file_path = self .generate_temp_filename ()
278- blob_file_path = f"{ self .feed_stable_id } /latest.zip"
273+ blob_file_path = f"{ self .feed_stable_id } /{ self .dataset_stable_id } /{ self .dataset_stable_id } .zip"
274+ self .logger .info (f"Processing dataset from bucket: { blob_file_path } " )
279275 download_from_gcs (
280276 os .getenv ("DATASETS_BUCKET_NAME" ), blob_file_path , temp_file_path
281277 )
282278
283279 extracted_files_path = self .unzip_files (temp_file_path )
284- dataset_full_path = f"{ self .feed_stable_id } /{ self .dataset_stable_id } /{ self .dataset_stable_id } .zip"
285- self .logger .info (
286- f"Creating file { dataset_full_path } in bucket { self .bucket_name } "
287- )
280+
288281 _ , extracted_files = self .upload_files_to_storage (
289282 temp_file_path ,
290283 self .dataset_stable_id ,
@@ -296,7 +289,7 @@ def process_from_bucket_latest(
296289 dataset_file = DatasetFile (
297290 stable_id = self .dataset_stable_id ,
298291 file_sha256_hash = self .latest_hash ,
299- hosted_url = f"{ self .public_hosted_datasets_url } /{ dataset_full_path } " ,
292+ hosted_url = f"{ self .public_hosted_datasets_url } /{ blob_file_path } " ,
300293 extracted_files = extracted_files ,
301294 zipped_size = (
302295 os .path .getsize (temp_file_path )
@@ -307,11 +300,21 @@ def process_from_bucket_latest(
307300 dataset = self .create_dataset_entities (
308301 dataset_file , skip_dataset_creation = True , db_session = db_session
309302 )
310- create_pipeline_tasks (dataset )
303+ if dataset and dataset .latest :
304+ self .logger .info (
305+ f"Creating pipeline tasks for latest dataset { dataset .stable_id } "
306+ )
307+ create_pipeline_tasks (dataset )
308+ elif dataset :
309+ self .logger .info (
310+ f"Dataset { dataset .stable_id } is not the latest, skipping pipeline tasks creation."
311+ )
312+ else :
313+ raise ValueError ("Dataset update failed, dataset is None." )
314+ return dataset_file
311315 finally :
312316 if temp_file_path and os .path .exists (temp_file_path ):
313317 os .remove (temp_file_path )
314- return None
315318
316319 def unzip_files (self , temp_file_path ):
317320 extracted_files_path = os .path .join (temp_file_path .split ("." )[0 ], "extracted" )
@@ -356,11 +359,11 @@ def create_dataset_entities(
356359 f"[{ self .feed_stable_id } ] No latest dataset found for feed."
357360 )
358361
359- self .logger .info (
360- f"[{ self .feed_stable_id } ] Creating new dataset for feed with stable id { dataset_file .stable_id } ."
361- )
362362 dataset = None
363363 if not skip_dataset_creation :
364+ self .logger .info (
365+ f"[{ self .feed_stable_id } ] Creating new dataset for feed with stable id { dataset_file .stable_id } ."
366+ )
364367 dataset = Gtfsdataset (
365368 id = str (uuid .uuid4 ()),
366369 feed_id = self .feed_id ,
@@ -377,22 +380,20 @@ def create_dataset_entities(
377380 else []
378381 ),
379382 zipped_size_bytes = dataset_file .zipped_size ,
380- unzipped_size_bytes = (
381- sum ([ex .file_size_bytes for ex in dataset_file .extracted_files ])
382- if dataset_file .extracted_files
383- else None
384- ),
383+ unzipped_size_bytes = self ._get_unzipped_size (dataset_file ),
385384 )
386385 db_session .add (dataset )
387386 elif skip_dataset_creation and latest_dataset :
387+ self .logger .info (
388+ f"[{ self .feed_stable_id } ] Updating latest dataset for feed with stable id "
389+ f"{ latest_dataset .stable_id } ."
390+ )
388391 latest_dataset .gtfsfiles = (
389392 dataset_file .extracted_files if dataset_file .extracted_files else []
390393 )
391394 latest_dataset .zipped_size_bytes = dataset_file .zipped_size
392- latest_dataset .unzipped_size_bytes = (
393- sum ([ex .file_size_bytes for ex in dataset_file .extracted_files ])
394- if dataset_file .extracted_files
395- else None
395+ latest_dataset .unzipped_size_bytes = self ._get_unzipped_size (
396+ dataset_file
396397 )
397398
398399 if latest_dataset and not skip_dataset_creation :
@@ -406,11 +407,19 @@ def create_dataset_entities(
406407 except Exception as e :
407408 raise Exception (f"Error creating dataset: { e } " )
408409
410+ @staticmethod
411+ def _get_unzipped_size (dataset_file ):
412+ return (
413+ sum ([ex .file_size_bytes for ex in dataset_file .extracted_files ])
414+ if dataset_file .extracted_files
415+ else None
416+ )
417+
409418 @with_db_session
410- def process_from_producer_url (self , db_session ) -> DatasetFile or None :
419+ def process_from_producer_url (self , db_session ) -> Optional [ DatasetFile ] :
411420 """
412421 Process the dataset and store new version in GCP bucket if any changes are detected
413- :return: the file hash and the hosted url as a tuple or None if no upload is required
422+ :return: the DatasetFile object created
414423 """
415424 dataset_file = self .upload_dataset ()
416425
@@ -531,7 +540,7 @@ def process_dataset(cloud_event: CloudEvent):
531540 json_payload .get ("dataset_stable_id" ),
532541 )
533542 if json_payload .get ("use_bucket_latest" , False ):
534- dataset_file = processor .process_from_bucket_latest ()
543+ dataset_file = processor .process_from_bucket ()
535544 else :
536545 dataset_file = processor .process_from_producer_url ()
537546 except Exception as e :
0 commit comments