Skip to content

Commit 84737b3

Browse files
committed
feat: include the 'client' field in the querying and prioritize FTP client
1 parent 6aca1bd commit 84737b3

6 files changed

Lines changed: 70 additions & 51 deletions

File tree

pysus/api/_impl/databases.py

Lines changed: 21 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,8 @@
2424
from typing import Literal
2525

2626
import pandas as pd
27-
from anyio import to_thread
2827
from pysus.api.client import PySUS
29-
from pysus.api.ducklake.catalog import CatalogDataset, CatalogFile, DatasetGroup
3028
from pysus.api.types import State
31-
from sqlalchemy.orm import joinedload
3229
from tqdm import tqdm
3330

3431

@@ -302,66 +299,44 @@ def list_files(
302299
"CNES",
303300
"CIHA",
304301
],
302+
client: Literal["FTP", "DadosGov"] | None = None,
305303
group: str | None = None,
306304
state: str | None = None,
307305
year: int | list[int] | None = None,
308306
month: int | list[int] | None = None,
309307
**kwargs,
310308
) -> pd.DataFrame:
311-
"""List catalog files for a dataset, filtered by group/state/year/month."""
309+
"""List catalog files filtered by client, group, state, year, and month."""
312310

313311
async def _list():
314312
async with PySUS() as pysus:
315-
ducklake = await pysus.get_ducklake()
316-
if ducklake._Session is None:
317-
await ducklake.connect()
318-
319-
def _query():
320-
with ducklake._Session() as session:
321-
q = session.query(CatalogFile).options(
322-
joinedload(CatalogFile.dataset),
323-
joinedload(CatalogFile.group),
324-
)
325-
326-
if dataset:
327-
q = q.join(CatalogDataset).filter(
328-
CatalogDataset.name == dataset.lower()
329-
)
313+
years = [year] if isinstance(year, int) else (year or [None])
314+
months = [month] if isinstance(month, int) else (month or [None])
330315

331-
if group:
332-
q = q.join(DatasetGroup).filter(
333-
DatasetGroup.name == group
316+
records = []
317+
for y in years:
318+
for m in months:
319+
records.extend(
320+
await pysus.query(
321+
client=client,
322+
dataset=dataset,
323+
group=group,
324+
state=state,
325+
year=y,
326+
month=m,
334327
)
335-
336-
if state:
337-
q = q.filter(CatalogFile.state == state.upper())
338-
339-
years = [year] if isinstance(year, int) else (year or [])
340-
months = (
341-
[month] if isinstance(month, int) else (month or [])
342328
)
343329

344-
if years:
345-
q = q.filter(CatalogFile.year.in_(years))
346-
if months:
347-
q = q.filter(CatalogFile.month.in_(months))
348-
349-
results = q.all()
350-
session.expunge_all()
351-
return results
352-
353-
records = await to_thread.run_sync(_query)
354-
355330
return [
356331
{
357-
"name": r.path.split("/")[-1],
358-
"path": r.path,
332+
"name": str(r.path).split("/")[-1],
333+
"path": str(r.path),
359334
"dataset": r.dataset.name if r.dataset else None,
360335
"group": r.group.name if r.group else None,
361-
"year": r.year,
362-
"month": r.month,
363-
"state": r.state,
364-
"modify": r.origin_modified,
336+
"year": r.record.year,
337+
"month": r.record.month,
338+
"state": r.record.state,
339+
"modify": r.record.origin_modified,
365340
}
366341
for r in records
367342
]

pysus/api/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ def get_completed_remote_paths(self) -> set[str]:
414414

415415
async def query(
416416
self,
417+
client: Literal["DadosGov", "FTP"] | None = None,
417418
dataset: str | None = None,
418419
group: str | None = None,
419420
state: str | None = None,
@@ -426,6 +427,7 @@ async def query(
426427
await self.get_ducklake()
427428
if self._ducklake is not None:
428429
return await self._ducklake.query(
430+
client=client,
429431
dataset=dataset,
430432
group=group,
431433
state=state,

pysus/api/ducklake/client.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from collections.abc import Callable
88
from pathlib import Path
9-
from typing import Any
9+
from typing import Any, Literal
1010

1111
import boto3
1212
import httpx
@@ -334,13 +334,14 @@ def _upload():
334334

335335
async def query(
336336
self,
337+
client: Literal["FTP", "DadosGov"] | None = None,
337338
dataset: str | None = None,
338339
group: str | None = None,
339340
state: str | None = None,
340341
year: int | None = None,
341342
month: int | None = None,
342343
) -> list[File]:
343-
"""Query catalog files by dataset, group, state, year, and/or month."""
344+
"""Filter catalog files by client, dataset, group, state, year."""
344345
if not self._Session:
345346
await self.connect()
346347

@@ -380,6 +381,30 @@ def _query():
380381
return results
381382

382383
records = await to_thread.run_sync(_query)
384+
385+
if client:
386+
prefix = f"public/data/{client.lower()}/"
387+
records = [r for r in records if r.path.startswith(prefix)]
388+
else:
389+
ftp = [r for r in records if r.path.startswith("public/data/ftp/")]
390+
dadosgov = [
391+
r for r in records if r.path.startswith("public/data/dadosgov/")
392+
]
393+
ftp_keys = set()
394+
for r in ftp:
395+
stem = Path(r.path).stem
396+
key = (r.dataset_id, r.year, r.month, stem)
397+
ftp_keys.add(key)
398+
399+
def has_ftp_match(r):
400+
stem = Path(r.path).stem
401+
if stem.endswith(".csv"):
402+
stem = stem[:-4]
403+
key = (r.dataset_id, r.year, r.month, stem)
404+
return key in ftp_keys
405+
406+
records = ftp + [r for r in dadosgov if not has_ftp_match(r)]
407+
383408
return [
384409
File(
385410
path=r.path,

pysus/management/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ async def upload(
7373

7474
if row:
7575
dataset_id = row[0]
76+
origin_val = "'FTP'" if is_ftp else "'API'"
77+
cursor.execute(
78+
f"UPDATE pysus.datasets SET origin = {origin_val} "
79+
f"WHERE id = {dataset_id}"
80+
)
7681
else:
7782
cursor.execute("SELECT MAX(id) FROM pysus.datasets")
7883
max_id = cursor.fetchone()[0]

pysus/tests/api/ducklake/test_client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,29 @@ async def test_is_authenticated_false_no_credentials(self):
4242

4343
@pytest.mark.asyncio
4444
async def test_is_authenticated_with_credentials(self):
45+
from unittest.mock import patch
46+
4547
client = DuckLake()
46-
await client.login(access_key="key", secret_key="secret")
48+
with patch.object(client, "_load_catalog"):
49+
await client.login(access_key="key", secret_key="secret")
4750
assert client._is_authenticated is True
4851

4952
@pytest.mark.asyncio
5053
async def test_login_sets_credentials(self):
54+
from unittest.mock import patch
55+
5156
client = DuckLake()
52-
await client.login(access_key="key", secret_key="secret")
57+
with patch.object(client, "_load_catalog"):
58+
await client.login(access_key="key", secret_key="secret")
5359
assert client.credentials is not None
5460

5561
@pytest.mark.asyncio
5662
async def test_login_creates_s3_client(self):
63+
from unittest.mock import patch
64+
5765
client = DuckLake()
58-
await client.login(access_key="key", secret_key="secret")
66+
with patch.object(client, "_load_catalog"):
67+
await client.login(access_key="key", secret_key="secret")
5968
assert client._s3_client is not None
6069
client._s3_client = None
6170

pysus/tests/api/test_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ async def test_query_with_dataset(self, test_db_path, tmp_path):
201201
result = await client.query(dataset="sinan")
202202

203203
mock_ducklake.query.assert_called_once_with(
204+
client=None,
204205
dataset="sinan",
205206
group=None,
206207
state=None,
@@ -227,6 +228,7 @@ async def test_query_with_group(self, test_db_path):
227228
await client.query(dataset="sinan", group="DENGUE")
228229

229230
mock_ducklake.query.assert_called_once_with(
231+
client=None,
230232
dataset="sinan",
231233
group="DENGUE",
232234
state=None,
@@ -258,6 +260,7 @@ async def test_query_with_all_params(self, test_db_path):
258260
)
259261

260262
mock_ducklake.query.assert_called_once_with(
263+
client=None,
261264
dataset="sinasc",
262265
group="DC",
263266
state="SP",

0 commit comments

Comments
 (0)