Skip to content

Commit 527dba6

Browse files
committed
feat: improve ducklake api struture
feat: global variables usage feat: include more orm models to manage .duckdb files
1 parent 714edad commit 527dba6

20 files changed

Lines changed: 565 additions & 8207 deletions

File tree

docker/Dockerfile renamed to Dockerfile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@ RUN useradd -ms /bin/bash pysus \
2525

2626
COPY pyproject.toml poetry.lock LICENSE README.md /usr/src/
2727
COPY pysus /usr/src/pysus
28-
COPY docker/scripts/entrypoint.sh /entrypoint.sh
29-
COPY docker/notebooks/ /home/pysus/Notebooks/
28+
COPY entrypoint.sh /entrypoint.sh
3029

3130
RUN pip install poetry \
3231
&& cd /usr/src && poetry config virtualenvs.create false && poetry install --with docs \
33-
&& pip install 'httpx<0.28' \
3432
&& chown -R pysus:pysus /home/pysus
3533

3634
USER pysus
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
services:
22
jupyter:
33
build:
4-
context: ".."
5-
dockerfile: docker/Dockerfile
4+
context: "."
5+
dockerfile: Dockerfile
66
hostname: pysus-jupyter
77
container_name: pysus-jupyter
88
ports:

docker/notebooks/Welcome.ipynb

Lines changed: 0 additions & 89 deletions
This file was deleted.

docker/scripts/poetry-install.sh

Lines changed: 0 additions & 6 deletions
This file was deleted.

pysus/api/client.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,9 @@ async def download(
322322

323323
if timeout is not None:
324324
with anyio.fail_after(timeout):
325-
await client._download_file(file, local_path, callback)
325+
await client.download(file, local_path, callback)
326326
else:
327-
await client._download_file(file, local_path, callback)
327+
await client.download(file, local_path, callback)
328328

329329
await self._update_state(
330330
local_path=local_path,
@@ -517,9 +517,7 @@ async def query(
517517
all_datasets = await self._ducklake.datasets()
518518

519519
if dataset:
520-
matching = [
521-
d for d in all_datasets if d.name.lower() == dataset.lower()
522-
]
520+
matching = [d for d in all_datasets if d.name.lower() == dataset.lower()]
523521
if not matching:
524522
return []
525523
target = matching[0]
@@ -618,9 +616,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
618616

619617
else:
620618
paths_str = ", ".join(f"'{p}'" for p in paths)
621-
query = (
622-
f"SELECT * FROM read_parquet([{paths_str}], union_by_name=True)"
623-
)
619+
query = f"SELECT * FROM read_parquet([{paths_str}], union_by_name=True)"
624620

625621
if sql:
626622
if sql.upper().startswith("SELECT"):
@@ -633,9 +629,7 @@ def get_columns(path: Path) -> set[tuple[str, str]]:
633629
if not add_dv:
634630
return base
635631

636-
geocode_cols = [
637-
col[0] for col in base.description if is_geocode_column(col[0])
638-
]
632+
geocode_cols = [col[0] for col in base.description if is_geocode_column(col[0])]
639633
if not geocode_cols:
640634
return base
641635

pysus/api/dadosgov/models.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,9 @@ def _dedup_entries(
3030
if m:
3131
stem = filename[: m.start()]
3232
fmt = m.group(1).lower()
33-
grouped.setdefault(stem, []).append(
34-
(fmt, filename, recurso, metadata)
35-
)
33+
grouped.setdefault(stem, []).append((fmt, filename, recurso, metadata))
3634
else:
37-
grouped.setdefault(filename, []).append(
38-
("", filename, recurso, metadata)
39-
)
35+
grouped.setdefault(filename, []).append(("", filename, recurso, metadata))
4036

4137
result: list[tuple[str, Any, dict]] = []
4238
for _, items in grouped.items():
@@ -210,7 +206,7 @@ async def _download(
210206
"""Download the file to a local path."""
211207
if not output:
212208
output = CACHEPATH / self.name
213-
return await self.client._download_file(self, output, callback=callback)
209+
return await self.client.download(self, output, callback=callback)
214210

215211
async def fetch_size(self) -> int:
216212
"""Fetch the remote file size and update the local record.
@@ -249,9 +245,7 @@ class Group(BaseRemoteGroup):
249245
"""A group of files within a dataset."""
250246

251247
record: ConjuntoDados
252-
_formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr(
253-
default=None
254-
)
248+
_formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr(default=None)
255249

256250
def __init__(
257251
self,
@@ -319,9 +313,7 @@ async def _fetch_files(self) -> list[BaseRemoteFile]:
319313
"""Build File objects from the underlying resources."""
320314
entries: list[tuple[str, Any, dict]] = []
321315
for recurso in self.record.resources:
322-
filename = (
323-
recurso.file_name or recurso.url.split("/")[-1].split("?")[0]
324-
)
316+
filename = recurso.file_name or recurso.url.split("/")[-1].split("?")[0]
325317
if filename.lower().endswith(".pdf") or filename.startswith("get_"):
326318
continue
327319
metadata = {}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
from abc import ABC
2+
from pathlib import Path
3+
4+
import httpx
5+
from anyio import to_thread
6+
from pydantic import BaseModel, SecretStr
7+
from sqlalchemy.engine import Engine
8+
from sqlalchemy import create_engine
9+
from sqlalchemy.pool import StaticPool
10+
from sqlalchemy.orm import sessionmaker, Session
11+
12+
from pysus import CACHEPATH
13+
from pysus.api import types
14+
from pysus.api.ducklake.functional import download_s3, upload_s3
15+
16+
17+
class DuckLakeCredentials(BaseModel):
18+
access_key: SecretStr
19+
secret_key: SecretStr
20+
21+
22+
class BaseAdapter(ABC):
23+
cache_dir: Path = Path(CACHEPATH) / "ducklake"
24+
db_local: Path
25+
db_remote: Path
26+
27+
def __init__(
28+
self, engine=None, credentials: DuckLakeCredentials | None = None, **data
29+
) -> None:
30+
self._engine = engine
31+
self._session_factory = None
32+
self.cache_dir.mkdir(parents=True, exist_ok=True)
33+
self.credentials = credentials
34+
35+
@property
36+
def remote_url(self) -> str:
37+
return f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{self.db_remote}"
38+
39+
def get_session(self) -> Session:
40+
if not self._session_factory:
41+
raise RuntimeError("Database engine not initialized. Call connect() first.")
42+
return self._session_factory()
43+
44+
async def connect(self, force: bool = False) -> None:
45+
if self._engine and not force:
46+
if not self._session_factory:
47+
self._session_factory = sessionmaker(bind=self._engine)
48+
return
49+
50+
await self._download_catalog(
51+
self.db_local,
52+
str(self.db_remote),
53+
)
54+
self._engine = await to_thread.run_sync(self.setup_engine)
55+
self._session_factory = sessionmaker(bind=self._engine)
56+
57+
def setup_engine(
58+
self, access_key: str | None = None, secret_key: str | None = None
59+
) -> Engine:
60+
engine: Engine = create_engine(
61+
f"duckdb:///{self.db_local}",
62+
poolclass=StaticPool,
63+
)
64+
65+
with engine.connect() as conn:
66+
conn.exec_driver_sql("INSTALL ducklake; LOAD ducklake;")
67+
68+
has_pysus = conn.exec_driver_sql(
69+
"SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pysus'"
70+
).fetchone()
71+
72+
if has_pysus:
73+
conn.exec_driver_sql("SET search_path='pysus,main';")
74+
else:
75+
conn.exec_driver_sql("SET search_path='main';")
76+
77+
s3_cfg = {
78+
"s3_endpoint": types.S3_ENDPOINT,
79+
"s3_region": types.S3_REGION,
80+
"s3_url_style": "path",
81+
"s3_use_ssl": "true",
82+
}
83+
84+
if access_key and secret_key:
85+
s3_cfg["s3_access_key_id"] = access_key
86+
s3_cfg["s3_secret_access_key"] = secret_key
87+
88+
for key, value in s3_cfg.items():
89+
conn.exec_driver_sql(f"SET {key}='{value}'")
90+
91+
conn.commit()
92+
93+
return engine
94+
95+
async def _download_catalog(self, local_path: Path, remote_path: str) -> None:
96+
url = f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{remote_path}"
97+
98+
if local_path.exists():
99+
try:
100+
local_size = local_path.stat().st_size
101+
except OSError:
102+
local_size = -1
103+
else:
104+
local_size = -1
105+
106+
async with httpx.AsyncClient(follow_redirects=True) as client:
107+
try:
108+
head = await client.head(url)
109+
head.raise_for_status()
110+
remote_size = int(head.headers.get("content-length", 0))
111+
except Exception:
112+
remote_size = 0
113+
114+
if remote_size == local_size:
115+
return
116+
117+
access_key = (
118+
self.credentials.access_key.get_secret_value() if self.credentials else None
119+
)
120+
secret_key = (
121+
self.credentials.secret_key.get_secret_value() if self.credentials else None
122+
)
123+
124+
await download_s3(
125+
remote_path=remote_path,
126+
local_path=local_path,
127+
access_key=access_key,
128+
secret_key=secret_key,
129+
)
130+
131+
async def _upload_catalog(self) -> None:
132+
if not self.credentials:
133+
raise PermissionError(
134+
"Admin credentials required to upload catalog.",
135+
)
136+
137+
if not self.db_local.exists():
138+
raise FileNotFoundError("catalog file not found")
139+
140+
await upload_s3(
141+
local_path=self.db_local,
142+
remote_path=str(self.db_remote),
143+
access_key=self.credentials.access_key.get_secret_value(),
144+
secret_key=self.credentials.secret_key.get_secret_value(),
145+
)
146+
147+
async def close(self, update: bool = False) -> None:
148+
if update:
149+
await self._upload_catalog()
150+
151+
if self._engine:
152+
await to_thread.run_sync(self._engine.dispose)
153+
self._engine = None
154+
self._session_factory = None
155+
156+
157+
class CatalogAdapter(BaseAdapter):
158+
def __init__(self, engine=None, **data) -> None:
159+
super().__init__(engine=engine, **data)
160+
self.db_local: Path = self.cache_dir / "catalog.duckdb"
161+
self.db_remote: str = "public/catalog.duckdb"
162+
163+
164+
class DatasetAdapter(BaseAdapter):
165+
def __init__(self, name: str, engine=None, **data) -> None:
166+
super().__init__(engine=engine, **data)
167+
self.dataset_name: str = name
168+
self.db_local: Path = self.cache_dir / f"catalog_{name}.duckdb"
169+
self.db_remote: str = f"datasets/catalog_{name}.duckdb"
170+
171+
172+
class ColumnsAdapter(BaseAdapter):
173+
def __init__(self, engine=None, **data) -> None:
174+
super().__init__(engine=engine, **data)
175+
self.db_local: Path = self.cache_dir / "columns.duckdb"
176+
self.db_remote: str = "public/columns.duckdb"

0 commit comments

Comments
 (0)