Skip to content

Commit 14eeb7a

Browse files
committed
create a PySUS orchestrator that will be a bridge between remote clients and local files
1 parent 0e5e70d commit 14eeb7a

17 files changed

Lines changed: 347 additions & 98 deletions

.pre-commit-config.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ repos:
3434
'pydocstyle>=6.3.0',
3535
]
3636

37+
- repo: https://github.com/pre-commit/mirrors-mypy
38+
rev: v1.8.0
39+
hooks:
40+
- id: mypy
41+
additional_dependencies: [
42+
'types-python-dateutil',
43+
'types-requests',
44+
'types-setuptools',
45+
'pandas-stubs',
46+
'pydantic>=2.0.0',
47+
]
48+
args: [--ignore-missing-imports, --explicit-package-bases]
49+
3750
- repo: https://github.com/asottile/pyupgrade
3851
rev: v3.15.0
3952
hooks:

pysus/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from importlib import metadata as importlib_metadata
66
from typing import Final
77

8-
CACHEPATH: Final[str] = os.getenv(
9-
"PYSUS_CACHEPATH",
10-
os.path.join(str(pathlib.Path.home()), "pysus"),
8+
CACHEPATH: Final[pathlib.Path] = pathlib.Path(
9+
os.getenv(
10+
"PYSUS_CACHEPATH",
11+
os.path.join(str(pathlib.Path.home()), "pysus"),
12+
)
1113
)
1214

1315
# from pysus.api.ftp.databases import * # noqa

pysus/api/client.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
import enum
2+
from collections.abc import Callable
3+
from datetime import datetime
4+
from pathlib import Path
5+
6+
from pysus import CACHEPATH
7+
from sqlalchemy import Column, DateTime, Enum, String, create_engine
8+
from sqlalchemy.orm import declarative_base, sessionmaker
9+
10+
from .dadosgov import DadosGovClient
11+
from .ducklake import DuckLakeClient
12+
from .ftp import FTPClient
13+
from .models import BaseLocalFile, BaseRemoteFile
14+
15+
Base = declarative_base()
16+
17+
18+
class DownloadStatus(enum.Enum):
19+
PENDING = "pending"
20+
DOWNLOADING = "downloading"
21+
COMPLETED = "completed"
22+
FAILED = "failed"
23+
MISSING = "missing"
24+
25+
26+
class LocalFileState(Base):
27+
__tablename__ = "local_file_state"
28+
path = Column(String, primary_key=True)
29+
remote_path = Column(String, nullable=False)
30+
client_name = Column(String, nullable=False)
31+
status = Column(Enum(DownloadStatus), default=DownloadStatus.PENDING)
32+
sha256 = Column(String, nullable=True)
33+
last_synced = Column(DateTime, default=datetime.utcnow)
34+
35+
36+
class PySUS:
37+
def __init__(self, db_path: str | Path = CACHEPATH / "config.db"):
    """Open the local state database, creating it when missing.

    Parameters
    ----------
    db_path:
        Location of the DuckDB file that stores ``LocalFileState`` rows.
        Accepts a string or a ``Path``; the parent directory is created
        if it does not exist.
    """
    # Use a separate, consistently typed name instead of rebinding the
    # parameter (the default is a ``Path``, not a ``str``).
    db_file = Path(db_path)
    db_file.parent.mkdir(parents=True, exist_ok=True)

    self.engine = create_engine(f"duckdb:///{db_file}")
    Base.metadata.create_all(self.engine)
    self.Session = sessionmaker(bind=self.engine)

    # Remote clients start unset; they are assigned by ``__aenter__``,
    # ``get_ftp`` and ``get_dadosgov``.
    self._ducklake: DuckLakeClient | None = None
    self._ftp: FTPClient | None = None
    self._dadosgov: DadosGovClient | None = None
48+
49+
async def __aenter__(self):
50+
self._ducklake = DuckLakeClient(engine=self.engine)
51+
await self._ducklake._load_catalog()
52+
self._attach_client_catalog("ducklake", self._ducklake.catalog_path)
53+
return self
54+
55+
async def get_dadosgov(self, access_token: str) -> DadosGovClient:
    """Return the shared DadosGov client, connecting it on first use.

    ``access_token`` is only used when the client is first connected;
    later calls return the cached client unchanged.

    The client is cached only after ``connect`` succeeds, so a failed
    connection attempt is retried on the next call instead of handing
    back an unconnected client.
    """
    if self._dadosgov is None:
        client = DadosGovClient()
        await client.connect(token=access_token)
        self._dadosgov = client
    return self._dadosgov
60+
61+
async def get_ftp(self) -> FTPClient:
    """Return the shared FTP client, connecting it on first use.

    The client is cached only after ``connect`` succeeds, so a failed
    connection attempt is retried on the next call instead of handing
    back an unconnected client.
    """
    if self._ftp is None:
        client = FTPClient()
        await client.connect()
        self._ftp = client
    return self._ftp
66+
67+
async def get_local_file(
    self,
    file: BaseRemoteFile,
) -> BaseLocalFile | None:
    """Return the already-downloaded local counterpart of ``file``.

    Looks up ``LocalFileState`` rows with status ``COMPLETED`` for the
    remote path and lower-cased client name of ``file``. When several
    rows match, one whose path ends in ``.parquet`` is preferred;
    otherwise the first row is used.

    Returns ``None`` when no completed record exists.
    """
    from pysus.api.extensions import ExtensionFactory

    client_name = file.client.name.lower()

    with self.Session() as session:
        records = (
            session.query(LocalFileState)
            .filter_by(
                remote_path=file.path,
                client_name=client_name,
                status=DownloadStatus.COMPLETED,
            )
            .all()
        )
        # Copy the plain path strings out while the session is open so
        # nothing below depends on detached ORM instances.
        local_paths = [record.path for record in records]

    if not local_paths:
        return None

    # Keep the ``file`` parameter untouched; pick the path separately.
    chosen_path = next(
        (path for path in local_paths if path.endswith(".parquet")),
        local_paths[0],
    )
    return await ExtensionFactory.instantiate(chosen_path)
96+
97+
def _attach_client_catalog(self, name: str, path: str):
    """Attach the database file at ``path`` read-only under alias ``name``.

    The path is made absolute first. Nothing is attached when
    ``duckdb_databases()`` already lists a database with that path.
    """
    abs_path = str(Path(path).absolute())
    with self.engine.connect() as conn:
        q = "SELECT database_name FROM duckdb_databases() WHERE path = ?"
        existing = conn.exec_driver_sql(q, (abs_path,)).fetchone()
        if existing:
            return

        # The ATTACH statement is built as text, so escape the string
        # literal and quote the alias; a path containing an apostrophe
        # would otherwise break (or alter) the statement.
        path_literal = abs_path.replace("'", "''")
        alias = name.replace('"', '""')
        conn.exec_driver_sql(
            f"ATTACH '{path_literal}' AS \"{alias}\" (READ_ONLY)"
        )
106+
107+
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close every client that was opened and dispose of the engine.

    Each client is closed even if an earlier ``close`` fails, and the
    engine is always disposed. The first error raised by a ``close``
    call is re-raised once cleanup has finished.
    """
    first_error = None
    try:
        for client in (self._ducklake, self._ftp, self._dadosgov):
            if not client:
                continue
            try:
                await client.close()
            except Exception as err:
                # Remember the failure but keep releasing the rest.
                if first_error is None:
                    first_error = err
    finally:
        self.engine.dispose()

    if first_error is not None:
        raise first_error
115+
116+
def _get_dest_path(self, client_name: str, remote_path: str) -> Path:
    """Build the cache location for a remote file.

    The result is ``CACHEPATH/downloads/<client_name>/<remote_path>``
    with any leading slashes removed from ``remote_path``.
    """
    relative = remote_path.lstrip("/")
    downloads_root = CACHEPATH / "downloads"
    return downloads_root / client_name / relative
118+
119+
async def _update_state(
    self,
    local_path: Path,
    remote_path: str,
    client_name: str,
    status: DownloadStatus,
):
    """Insert or update the ``LocalFileState`` row for ``local_path``.

    A row keyed by ``str(local_path)`` is created when none exists; its
    ``status`` and ``last_synced`` are then set and committed.
    """
    from datetime import timezone

    # Naive UTC timestamp, the same value ``datetime.utcnow()`` yields,
    # obtained without the call deprecated since Python 3.12.
    synced_at = datetime.now(timezone.utc).replace(tzinfo=None)

    with self.Session() as session:
        state = (
            session.query(LocalFileState)
            .filter_by(path=str(local_path))
            .first()
        )
        if not state:
            state = LocalFileState(
                path=str(local_path),
                remote_path=remote_path,
                client_name=client_name,
            )
            session.add(state)

        state.status = status
        state.last_synced = synced_at
        session.commit()
142+
143+
async def download(
    self,
    file: BaseRemoteFile,
    token: str | None = None,
    callback: Callable | None = None,
):
    """Download ``file`` into the local cache and return the local file.

    If a completed local copy is already recorded and still exists on
    disk, it is returned without downloading again. Otherwise the state
    row is marked ``DOWNLOADING``, the matching client fetches the file,
    and the row ends as ``COMPLETED`` or ``FAILED``.

    Parameters
    ----------
    file:
        Remote file to fetch; its ``client.name`` selects the client.
    token:
        Passed to ``get_dadosgov`` for "dadosgov" files.
    callback:
        Forwarded unchanged to the client's ``_download_file``.

    Raises
    ------
    RuntimeError
        For a "ducklake" file when the DuckLake client has not been
        created (the orchestrator was not entered with ``async with``).
    ValueError
        When no download logic exists for the file's client.
    """
    from pysus.api.extensions import ExtensionFactory

    existing_local = await self.get_local_file(file)
    if existing_local and existing_local.path.exists():
        return existing_local

    client_name = file.client.name.lower()
    remote_path = file.path
    local_path = self._get_dest_path(client_name, remote_path)

    local_path.parent.mkdir(parents=True, exist_ok=True)

    await self._update_state(
        local_path, remote_path, client_name, DownloadStatus.DOWNLOADING
    )

    try:
        if client_name == "ducklake":
            if self._ducklake is None:
                # Fail with a clear message instead of an
                # AttributeError on ``None``.
                raise RuntimeError(
                    "DuckLake client is not initialised; "
                    "use PySUS as an async context manager"
                )
            client = self._ducklake
        elif client_name == "ftp":
            client = await self.get_ftp()
        elif client_name == "dadosgov":
            client = await self.get_dadosgov(token)
        else:
            raise ValueError(f"No download logic for client: {client_name}")

        await client._download_file(file, local_path, callback)

        await self._update_state(
            local_path, remote_path, client_name, DownloadStatus.COMPLETED
        )
        return await ExtensionFactory.instantiate(local_path)

    except Exception:
        # Record the failure, then let the caller see the original error.
        await self._update_state(
            local_path, remote_path, client_name, DownloadStatus.FAILED
        )
        raise
187+
188+
async def download_to_parquet(
    self,
    file: BaseRemoteFile,
    token: str | None = None,
    callback: Callable | None = None,
):
    """Download ``file`` and convert it to Parquet when possible.

    The file is fetched through ``download``. If the resulting local
    file has a ``to_parquet`` method, it is awaited, the Parquet path is
    recorded as ``COMPLETED`` for the same remote file, and the Parquet
    file is returned. Otherwise the downloaded file is returned as is.

    ``token`` and ``callback`` are forwarded unchanged to ``download``.
    """
    local_file = await self.download(
        file=file,
        token=token,
        callback=callback,
    )

    if not hasattr(local_file, "to_parquet"):
        return local_file

    parquet_file = await local_file.to_parquet()

    await self._update_state(
        local_path=parquet_file.path,
        remote_path=file.path,
        client_name=file.client.name.lower(),
        status=DownloadStatus.COMPLETED,
    )
    return parquet_file

pysus/api/dadosgov/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import DadosGov as DadosGovClient # noqa

pysus/api/dadosgov/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def connect(self, token: str | None = None) -> None:
7676
self._client = httpx.AsyncClient(
7777
base_url=self.base_url,
7878
headers=headers,
79-
timeout=60.0,
79+
timeout=30.0,
8080
follow_redirects=True,
8181
)
8282

pysus/api/dadosgov/databases.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from typing import List
2-
31
from .models import Dataset
42

53

pysus/api/dadosgov/models.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import asyncio
22
import pathlib
3-
from abc import ABC
3+
from abc import ABC, abstractmethod
44
from collections.abc import Callable
55
from datetime import datetime as dt
6-
from typing import Any, List, Optional, Union
6+
from typing import Any
77

88
import httpx
9+
from pydantic import PrivateAttr
910
from pysus.api.models import (
1011
BaseRemoteClient,
1112
BaseRemoteDataset,
@@ -19,6 +20,12 @@
1920
class File(BaseRemoteFile):
2021
record: Recurso
2122
type: str | None = "remote"
23+
_metadata: dict[str, Any] = PrivateAttr(default_factory=dict)
24+
25+
def __init__(self, **data):
26+
metadata = data.pop("_metadata", {})
27+
super().__init__(**data)
28+
self._metadata = metadata
2229

2330
def __repr__(self):
2431
return self.basename
@@ -51,6 +58,18 @@ def size(self) -> int:
5158
def modify(self) -> dt:
5259
return self.record.last_modified
5360

61+
@property
62+
def year(self) -> int | None:
63+
return self._metadata.get("year")
64+
65+
@property
66+
def month(self) -> int | None:
67+
return self._metadata.get("month")
68+
69+
@property
70+
def state(self) -> str | None:
71+
return self._metadata.get("state")
72+
5473
async def _download(
5574
self,
5675
output: pathlib.Path | None = None,
@@ -64,7 +83,7 @@ async def fetch_size(self) -> int:
6483
try:
6584
async with httpx.AsyncClient(
6685
follow_redirects=True,
67-
timeout=1,
86+
timeout=3,
6887
) as client:
6988
response = await client.head(self.path)
7089

@@ -85,6 +104,19 @@ async def fetch_size(self) -> int:
85104

86105
class Group(BaseRemoteGroup):
87106
record: ConjuntoDados
107+
_formatter: Callable[[Recurso, "Group"], dict[str, Any]] | None = (
108+
PrivateAttr(default=None)
109+
)
110+
111+
def __init__(
112+
self,
113+
record: ConjuntoDados,
114+
dataset: BaseRemoteDataset,
115+
formatter: Callable | None = None,
116+
):
117+
super().__init__(dataset=dataset)
118+
self.record = record
119+
self._formatter = formatter
88120

89121
def __repr__(self):
90122
return self.name
@@ -99,16 +131,15 @@ def long_name(self) -> str:
99131

100132
@property
101133
def description(self) -> str:
102-
return "" # TODO:
134+
return ""
103135

104136
async def _fetch_files(self) -> list[File]:
105137
files = []
106138
for recurso in self.record.resources:
107-
file = File(
108-
record=recurso,
109-
parent=self,
139+
metadata = (
140+
self._formatter(recurso, self) if self._formatter else {}
110141
)
111-
await file.fetch_size()
142+
file = File(record=recurso, parent=self, _metadata=metadata)
112143
files.append(file)
113144
return files
114145

@@ -119,13 +150,20 @@ class Dataset(BaseRemoteDataset, ABC):
119150
def __repr__(self):
120151
return self.name
121152

122-
async def _fetch_content(
123-
self,
124-
) -> list[Group]:
153+
@property
154+
@abstractmethod
155+
def formatter(self) -> Callable[[Recurso, Group], dict[str, Any]]:
156+
pass
157+
158+
async def _fetch_content(self) -> list[Group]:
125159
items: list[Group] = []
126160
client: BaseRemoteClient = self.client
127161
if self.ids:
128162
for group_id in self.ids:
129163
record = await client.get_dataset(group_id)
130-
items.append(Group(record=record, dataset=self))
164+
items.append(
165+
Group(
166+
record=record, dataset=self, formatter=self.formatter
167+
)
168+
)
131169
return items

pysus/api/ducklake/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import DuckLake as DuckLakeClient # noqa

0 commit comments

Comments
 (0)