1010from pathlib import Path
1111from typing import TYPE_CHECKING , Literal
1212
13+ import anyio
1314import duckdb
15+ import pandas as pd
1416from pysus import CACHEPATH
1517from sqlalchemy import DateTime , Enum , Integer , String , create_engine
1618from sqlalchemy .orm import DeclarativeBase , Mapped , mapped_column , sessionmaker
@@ -235,14 +237,26 @@ async def download(
235237 file : BaseRemoteFile ,
236238 token : str | None = None ,
237239 callback : Callable | None = None ,
240+ timeout : float | None = None ,
238241 ) -> BaseLocalFile :
239- """Download a remote file and return a local file handle."""
242+ """Download a remote file and return a local file handle.
243+
244+ Parameters
245+ ----------
246+ timeout : float | None
247+ Maximum seconds to wait for the download. ``None`` (default) means
248+ no timeout – use this when the socket-level timeout on the
249+ underlying client is sufficient.
250+ """
240251
241252 from pysus .api .extensions import ExtensionFactory
242253
243254 existing_local = await self .get_local_file (file )
244255 if existing_local and existing_local .path .exists ():
245- return existing_local
256+ if existing_local .size == file .size :
257+ return existing_local
258+ await self ._delete_record (str (existing_local .path ))
259+ existing_local .path .unlink (missing_ok = True )
246260
247261 client_name = file .client .name .lower ()
248262 remote_path = file .path
@@ -271,7 +285,11 @@ async def download(
271285 f"No download logic for client: { client_name } " ,
272286 )
273287
274- await client ._download_file (file , local_path , callback )
288+ if timeout is not None :
289+ with anyio .fail_after (timeout ):
290+ await client ._download_file (file , local_path , callback )
291+ else :
292+ await client ._download_file (file , local_path , callback )
275293
276294 await self ._update_state (
277295 local_path = local_path ,
@@ -311,18 +329,22 @@ async def download_to_parquet(
311329 file : BaseRemoteFile ,
312330 token : str | None = None ,
313331 callback : Callable [[int , int ], None ] | None = None ,
332+ timeout : float | None = None ,
333+ add_dv : bool = True ,
314334 ) -> Parquet :
315335 """Download a file and convert it to Parquet format."""
316336
317337 local_file = await self .download (
318338 file = file ,
319339 token = token ,
320340 callback = callback ,
341+ timeout = timeout ,
321342 )
322343
323344 if hasattr (local_file , "to_parquet" ):
324345 original_path = local_file .path
325346 parquet_file = await local_file .to_parquet (callback = callback )
347+ parquet_file .add_dv = add_dv
326348
327349 await self ._update_state (
328350 local_path = parquet_file .path ,
@@ -346,7 +368,9 @@ async def download_to_parquet(
346368 )
347369
348370 def get_local_hierarchy (self ):
349- """Build a nested dict of cached files grouped by client and dataset."""
371+ """
372+ Build a nested dict of cached files grouped by client and dataset.
373+ """
350374
351375 with self .Session () as session :
352376 records = session .query (LocalFileState ).all ()
@@ -414,8 +438,20 @@ def read_parquet(
414438 paths : list [Path ],
415439 sql : str | None = None ,
416440 mode : Literal ["union" , "intersection" , "strict" ] = "union" ,
417- ) -> "DuckDBPyConnection" :
418- """Read Parquet files with optional schema handling and SQL filter."""
441+ add_dv : bool = True ,
442+ ) -> "DuckDBPyConnection | pd.DataFrame" :
443+ """Read Parquet files with optional schema handling and SQL filter.
444+
445+ Parameters
446+ ----------
447+ add_dv : bool
448+ When True, automatically applies the IBGE verification digit to
449+ municipality code columns. If there are matching columns, a
450+ DataFrame is returned instead of a DuckDBPyConnection.
451+ """
452+
453+ from pysus .api .utils import add_dv as _add_dv_fn
454+ from pysus .api .utils import is_geocode_column
419455
420456 if not paths :
421457 raise ValueError ("No paths provided" )
@@ -452,8 +488,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
452488 else :
453489 paths_str = ", " .join (f"'{ p } '" for p in paths )
454490 query = (
455- f"SELECT * FROM read_parquet([{ paths_str } ], "
456- "union_by_name=True)"
491+ f"SELECT * FROM read_parquet([{ paths_str } ], union_by_name=True)"
457492 )
458493
459494 if sql :
@@ -462,4 +497,29 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
462497 else :
463498 query = f"SELECT { sql } FROM ({ query } ) AS t"
464499
500+ base = duckdb .execute (query )
501+
502+ if not add_dv :
503+ return base
504+
505+ geocode_cols = [
506+ col [0 ] for col in base .description if is_geocode_column (col [0 ])
507+ ]
508+ if not geocode_cols :
509+ return base
510+
511+ duckdb .create_function (
512+ "__pysus_add_dv" ,
513+ _add_dv_fn ,
514+ null_handling = "special" ,
515+ )
516+ selects = [
517+ (
518+ f'__pysus_add_dv("{ c [0 ]} ") AS "{ c [0 ]} "'
519+ if c [0 ] in geocode_cols
520+ else f'"{ c [0 ]} "'
521+ )
522+ for c in base .description
523+ ]
524+ query = f"SELECT { ', ' .join (selects )} FROM ({ query } ) AS _t"
465525 return duckdb .execute (query )
0 commit comments