1313 ColumnDefinition ,
1414 DatasetGroup ,
1515 Origin ,
16- file_columns ,
1716)
1817from pysus .api .extensions import Parquet
1918from pysus .api .ftp .models import File as FTPFile
2019from pysus .api .models import BaseRemoteFile
21- from sqlalchemy import delete
2220
2321
2422class CatalogManager :
@@ -62,26 +60,59 @@ async def upload(
6260 if not self .pysus ._ducklake :
6361 raise ConnectionError ("DuckLake is not connected" )
6462
63+ s3_key = (
64+ f"public/data/{ file .client .name .lower ()} "
65+ f"/{ file .dataset .name .lower ()} /{ file .basename } "
66+ )
67+
6568 with self .pysus ._ducklake ._Session () as session :
6669 dataset = self ._get_or_create_dataset (session , file )
67- group = self ._get_or_create_group (session , file , dataset )
68- cat_file = self ._get_or_create_file (session , file , dataset , group )
6970
70- if not self ._should_upload (file , cat_file ):
71+ existing = (
72+ session .query (CatalogFile )
73+ .filter (
74+ CatalogFile .dataset_id == dataset .id ,
75+ CatalogFile .origin_path == str (file .path ),
76+ )
77+ .first ()
78+ )
79+
80+ if not existing :
81+ existing = (
82+ session .query (CatalogFile )
83+ .filter (
84+ CatalogFile .path == s3_key ,
85+ CatalogFile .dataset_id == dataset .id ,
86+ )
87+ .first ()
88+ )
89+
90+ if not existing :
91+ existing = (
92+ session .query (CatalogFile )
93+ .filter (
94+ CatalogFile .path == str (
95+ Path (s3_key ).with_suffix (".parquet" )),
96+ CatalogFile .dataset_id == dataset .id ,
97+ )
98+ .first ()
99+ )
100+
101+ if existing and self ._should_upload (file , existing ):
71102 return
72103
73- session .flush ()
104+ group = self ._get_or_create_group (session , file , dataset )
105+ cat_file = self ._get_or_create_file (session , file , dataset , group )
74106
75107 parquet_ext = await self .pysus .download_to_parquet (
76108 file = file , token = self .dadosgov_token , callback = callback
77109 )
78110
79- s3_key = (
80- f"public/data/{ file .client .name .lower ()} "
81- f"/{ file .dataset .name .lower ()} /{ parquet_ext .path .name } "
82- )
83-
84111 with self .pysus ._ducklake ._Session () as session :
112+ dataset = self ._get_or_create_dataset (session , file )
113+ group = self ._get_or_create_group (session , file , dataset )
114+ cat_file = self ._get_or_create_file (session , file , dataset , group )
115+
85116 existing_conflict = (
86117 session .query (CatalogFile )
87118 .filter (
@@ -91,17 +122,10 @@ async def upload(
91122 .first ()
92123 )
93124
94- if existing_conflict and existing_conflict .id != cat_file .id :
95- session .execute (
96- delete (file_columns ).where (
97- file_columns .c .file_id == existing_conflict .id
98- )
99- )
100- session .flush ()
101- session .delete (existing_conflict )
102- session .flush ()
103-
104- cat_file = session .merge (cat_file )
125+ if existing_conflict :
126+ cat_file = existing_conflict
127+ else :
128+ cat_file = session .merge (cat_file )
105129
106130 await self ._upload_to_s3 (parquet_ext .path , s3_key )
107131
@@ -141,11 +165,37 @@ def _should_upload(
141165 self ,
142166 file : BaseRemoteFile ,
143167 catalog_file : CatalogFile | None = None ,
168+ force : bool = False ,
144169 ) -> bool :
145- if catalog_file is None or catalog_file .origin_modified is None :
170+ if force :
171+ print (f"force=True, uploading { file .basename } " )
172+ return True
173+
174+ if catalog_file is None :
175+ print (f"no catalog record, uploading { file .basename } " )
176+ return True
177+
178+ if catalog_file .origin_modified is None :
179+ print (f"no origin_modified, uploading { file .basename } " )
180+ return True
181+
182+ file_mod = getattr (file , "modify" , None )
183+ if file_mod is None :
184+ print (f"no file modify date, uploading { file .basename } " )
185+ return True
186+
187+ if file_mod > catalog_file .origin_modified :
188+ print (f"{ catalog_file .origin_modified } newer than ({ file_mod } )" )
189+ return True
190+
191+ file_size = getattr (file , "size" , None )
192+ if file_size and file_size != catalog_file .size :
193+ print (f"size differs: file={
194+ file_size } , catalog={ catalog_file .size } " )
146195 return True
147196
148- return file .modify > catalog_file .origin_modified
197+ print (f"skipping { file .basename } - already up to date" )
198+ return False
149199
150200 def _get_or_create_dataset (
151201 self ,
0 commit comments