Skip to content

Commit cf35aa6

Browse files
committed
create a PySUS orchestrator that will be a bridge between remote clients and local files
1 parent 0e5e70d commit cf35aa6

19 files changed

Lines changed: 668 additions & 134 deletions

.pre-commit-config.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ repos:
3434
'pydocstyle>=6.3.0',
3535
]
3636

37+
- repo: https://github.com/pre-commit/mirrors-mypy
38+
rev: v1.8.0
39+
hooks:
40+
- id: mypy
41+
additional_dependencies: [
42+
'types-python-dateutil',
43+
'types-requests',
44+
'types-setuptools',
45+
'pandas-stubs',
46+
'pydantic>=2.0.0',
47+
]
48+
args: [--ignore-missing-imports, --explicit-package-bases]
49+
3750
- repo: https://github.com/asottile/pyupgrade
3851
rev: v3.15.0
3952
hooks:

pysus/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from importlib import metadata as importlib_metadata
66
from typing import Final
77

8-
CACHEPATH: Final[str] = os.getenv(
9-
"PYSUS_CACHEPATH",
10-
os.path.join(str(pathlib.Path.home()), "pysus"),
8+
CACHEPATH: Final[pathlib.Path] = pathlib.Path(
9+
os.getenv(
10+
"PYSUS_CACHEPATH",
11+
os.path.join(str(pathlib.Path.home()), "pysus"),
12+
)
1113
)
1214

1315
# from pysus.api.ftp.databases import * # noqa

pysus/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import PySUS as PySUSClient # noqa

pysus/api/client.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import enum
2+
from collections.abc import Callable
3+
from datetime import datetime
4+
from pathlib import Path
5+
6+
from pysus import CACHEPATH
7+
from sqlalchemy import Column, Integer, DateTime, Enum, String, create_engine
8+
from sqlalchemy.orm import declarative_base, sessionmaker
9+
10+
from .dadosgov import DadosGovClient
11+
from .ducklake import DuckLakeClient
12+
from .ftp import FTPClient
13+
from .models import BaseLocalFile, BaseRemoteFile
14+
15+
Base = declarative_base()
16+
17+
18+
class DownloadStatus(enum.Enum):
19+
PENDING = "pending"
20+
DOWNLOADING = "downloading"
21+
COMPLETED = "completed"
22+
FAILED = "failed"
23+
MISSING = "missing"
24+
25+
26+
class LocalFileState(Base):
27+
__tablename__ = "local_file_state"
28+
path = Column(String, primary_key=True)
29+
remote_path = Column(String, nullable=False)
30+
client_name = Column(String, nullable=False)
31+
32+
year = Column(Integer, nullable=True)
33+
month = Column(Integer, nullable=True)
34+
state = Column(String, nullable=True)
35+
group = Column(String, nullable=True)
36+
37+
status = Column(Enum(DownloadStatus), default=DownloadStatus.PENDING)
38+
sha256 = Column(String, nullable=True)
39+
last_synced = Column(DateTime, default=datetime.utcnow)
40+
41+
42+
class PySUS:
43+
def __init__(self, db_path: str = CACHEPATH / "config.db"):
44+
db_path = Path(db_path)
45+
db_path.parent.mkdir(parents=True, exist_ok=True)
46+
47+
self.engine = create_engine(f"duckdb:///{db_path}")
48+
Base.metadata.create_all(self.engine)
49+
self.Session = sessionmaker(bind=self.engine)
50+
51+
self._ducklake: DuckLakeClient | None = None
52+
self._ftp: FTPClient | None = None
53+
self._dadosgov: DadosGovClient | None = None
54+
55+
async def __aenter__(self):
56+
self._ducklake = DuckLakeClient(engine=self.engine)
57+
await self._ducklake._load_catalog()
58+
self._attach_client_catalog("ducklake", self._ducklake.catalog_path)
59+
return self
60+
61+
async def get_dadosgov(self, access_token: str) -> DadosGovClient:
62+
if self._dadosgov is None:
63+
self._dadosgov = DadosGovClient()
64+
await self._dadosgov.connect(token=access_token)
65+
return self._dadosgov
66+
67+
async def get_ftp(self) -> FTPClient:
68+
if self._ftp is None:
69+
self._ftp = FTPClient()
70+
await self._ftp.connect()
71+
return self._ftp
72+
73+
async def get_local_file(
74+
self,
75+
file: BaseRemoteFile,
76+
) -> BaseLocalFile | None:
77+
from pysus.api.extensions import ExtensionFactory
78+
79+
client_name = file.client.name.lower()
80+
remote_path = file.path
81+
82+
with self.Session() as session:
83+
records = (
84+
session.query(LocalFileState)
85+
.filter_by(
86+
remote_path=remote_path,
87+
client_name=client_name,
88+
status=DownloadStatus.COMPLETED,
89+
)
90+
.all()
91+
)
92+
93+
if not records:
94+
return None
95+
96+
parquet_version = next(
97+
(r for r in records if r.path.endswith(".parquet")), None
98+
)
99+
file = parquet_version or records[0]
100+
101+
return await ExtensionFactory.instantiate(file.path)
102+
103+
def _attach_client_catalog(self, name: str, path: str):
104+
abs_path = str(Path(path).absolute())
105+
with self.engine.connect() as conn:
106+
q = "SELECT database_name FROM duckdb_databases() WHERE path = ?"
107+
existing = conn.exec_driver_sql(q, (abs_path,)).fetchone()
108+
109+
if not existing:
110+
conn.exec_driver_sql(f"ATTACH '{abs_path}' AS {
111+
name} (READ_ONLY)")
112+
113+
async def __aexit__(self, exc_type, exc_val, exc_tb):
114+
if self._ducklake:
115+
await self._ducklake.close()
116+
if self._ftp:
117+
await self._ftp.close()
118+
if self._dadosgov:
119+
await self._dadosgov.close()
120+
self.engine.dispose()
121+
122+
def _get_dest_path(self, client_name: str, remote_path: str) -> Path:
123+
return CACHEPATH / "downloads" / client_name / remote_path.lstrip("/")
124+
125+
async def _update_state(
126+
self,
127+
local_path: Path,
128+
remote_path: str,
129+
client_name: str,
130+
status: DownloadStatus,
131+
year: int = None,
132+
month: int = None,
133+
state: str = None,
134+
group: str = None,
135+
):
136+
with self.Session() as session:
137+
record = (
138+
session.query(LocalFileState).filter_by(
139+
path=str(local_path)).first()
140+
)
141+
if not record:
142+
record = LocalFileState(
143+
path=str(local_path),
144+
remote_path=remote_path,
145+
client_name=client_name,
146+
year=year,
147+
month=month,
148+
state=state,
149+
group=group,
150+
)
151+
session.add(record)
152+
153+
record.status = status
154+
record.last_synced = datetime.utcnow()
155+
session.commit()
156+
157+
async def download(
158+
self,
159+
file: BaseRemoteFile,
160+
token: str = None,
161+
callback: Callable = None,
162+
):
163+
from pysus.api.extensions import ExtensionFactory
164+
165+
existing_local = await self.get_local_file(file)
166+
if existing_local and existing_local.path.exists():
167+
return existing_local
168+
169+
client_name = file.client.name.lower()
170+
remote_path = file.path
171+
local_path = self._get_dest_path(client_name, remote_path)
172+
173+
local_path.parent.mkdir(parents=True, exist_ok=True)
174+
175+
await self._update_state(
176+
local_path, remote_path, client_name, DownloadStatus.DOWNLOADING
177+
)
178+
179+
try:
180+
if client_name == "ducklake":
181+
await self._ducklake._download_file(file, local_path, callback)
182+
elif client_name == "ftp":
183+
client = await self.get_ftp()
184+
await client._download_file(file, local_path, callback)
185+
elif client_name == "dadosgov":
186+
client = await self.get_dadosgov(token)
187+
await client._download_file(file, local_path, callback)
188+
else:
189+
raise ValueError(f"No download logic for client: {client_name}")
190+
191+
await self._update_state(
192+
local_path=local_path,
193+
remote_path=remote_path,
194+
client_name=client_name,
195+
status=DownloadStatus.DOWNLOADING,
196+
year=file.year,
197+
month=file.month,
198+
state=file.state,
199+
group=getattr(file.group, "name", None),
200+
)
201+
return await ExtensionFactory.instantiate(local_path)
202+
203+
except Exception:
204+
await self._update_state(
205+
local_path, remote_path, client_name, DownloadStatus.FAILED
206+
)
207+
raise
208+
209+
async def download_to_parquet(
210+
self,
211+
file: BaseRemoteFile,
212+
token: str = None,
213+
callback: Callable = None,
214+
):
215+
local_file = await self.download(
216+
file=file,
217+
token=token,
218+
callback=callback,
219+
)
220+
221+
if hasattr(local_file, "to_parquet"):
222+
parquet_file = await local_file.to_parquet()
223+
224+
await self._update_state(
225+
local_path=parquet_file.path,
226+
remote_path=file.path,
227+
client_name=file.client.name.lower(),
228+
status=DownloadStatus.COMPLETED,
229+
)
230+
return parquet_file
231+
return local_file

pysus/api/dadosgov/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import DadosGov as DadosGovClient # noqa

pysus/api/dadosgov/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async def connect(self, token: str | None = None) -> None:
7676
self._client = httpx.AsyncClient(
7777
base_url=self.base_url,
7878
headers=headers,
79-
timeout=60.0,
79+
timeout=30.0,
8080
follow_redirects=True,
8181
)
8282

pysus/api/dadosgov/databases.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from typing import List
2-
31
from .models import Dataset
42

53

0 commit comments

Comments
 (0)