Skip to content

Commit 63e164c

Browse files
committed
feat: improve ducklake api structure
feat: global variables usage feat: include more orm models to manage .duckdb files
1 parent 714edad commit 63e164c

15 files changed

Lines changed: 412 additions & 7813 deletions

File tree

docker/Dockerfile renamed to Dockerfile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@ RUN useradd -ms /bin/bash pysus \
2525

2626
COPY pyproject.toml poetry.lock LICENSE README.md /usr/src/
2727
COPY pysus /usr/src/pysus
28-
COPY docker/scripts/entrypoint.sh /entrypoint.sh
29-
COPY docker/notebooks/ /home/pysus/Notebooks/
28+
COPY entrypoint.sh /entrypoint.sh
3029

3130
RUN pip install poetry \
3231
&& cd /usr/src && poetry config virtualenvs.create false && poetry install --with docs \
33-
&& pip install 'httpx<0.28' \
3432
&& chown -R pysus:pysus /home/pysus
3533

3634
USER pysus
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
services:
22
jupyter:
33
build:
4-
context: ".."
5-
dockerfile: docker/Dockerfile
4+
context: "."
5+
dockerfile: Dockerfile
66
hostname: pysus-jupyter
77
container_name: pysus-jupyter
88
ports:

docker/notebooks/Welcome.ipynb

Lines changed: 0 additions & 89 deletions
This file was deleted.

docker/scripts/poetry-install.sh

Lines changed: 0 additions & 6 deletions
This file was deleted.

pysus/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ def get_version() -> str:
2424
__version__: str = version
2525

2626
from pysus.api._impl.databases import * # noqa
27+
from pysus.api import types # noqa
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
from abc import ABC
2+
from pathlib import Path
3+
4+
import httpx
5+
from anyio import to_thread
6+
from pydantic import BaseModel, SecretStr
7+
from sqlalchemy.engine import Engine
8+
from sqlalchemy import create_engine
9+
from sqlalchemy.pool import StaticPool
10+
from sqlalchemy.orm import sessionmaker, Session
11+
12+
from pysus import CACHEPATH, types
13+
14+
15+
class DuckLakeCredentials(BaseModel):
    """Key pair used to authenticate against the S3-compatible object storage.

    Both values are held as ``SecretStr`` so they are masked when the model
    is printed or logged.

    Attributes
    ----------
    access_key : SecretStr
        S3 access key ID.
    secret_key : SecretStr
        S3 secret access key.

    :meta private:
    """

    access_key: SecretStr
    secret_key: SecretStr
30+
31+
32+
class BaseAdapter(ABC):
    """Base adapter around a locally cached copy of a remote ``.duckdb`` file.

    Subclasses are expected to set ``db_local`` (where the file is cached on
    disk) and ``db_remote`` (the object key inside ``types.S3_BUCKET``).

    Parameters
    ----------
    engine : Engine, optional
        A pre-built SQLAlchemy engine. When given, ``connect()`` reuses it
        instead of downloading the database and building a new engine.
    credentials : DuckLakeCredentials, optional
        S3 key pair. Required by ``_upload_catalog`` and, when present,
        applied to the DuckDB S3 settings by ``setup_engine``.
    **data
        Accepted for forward compatibility; currently unused.
    """

    cache_dir: Path = Path(CACHEPATH) / "ducklake"
    db_local: Path
    # Object key inside the bucket; subclasses assign a plain string.
    db_remote: str

    def __init__(
        self, engine=None, credentials: DuckLakeCredentials | None = None, **data
    ) -> None:
        self._engine = engine
        self._session_factory = None
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.credentials = credentials

    @staticmethod
    def _object_url(remote_path) -> str:
        """Build the HTTPS URL of an object key inside the configured bucket."""
        return f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{remote_path}"

    @property
    def remote_url(self) -> str:
        """HTTPS URL of this adapter's remote database file."""
        return self._object_url(self.db_remote)

    def get_session(self) -> Session:
        """Return a new ORM session bound to the adapter's engine.

        Raises
        ------
        RuntimeError
            If ``connect()`` has not been called yet.
        """
        if not self._session_factory:
            raise RuntimeError("Database engine not initialized. Call connect() first.")
        return self._session_factory()

    async def connect(self, force: bool = False) -> None:
        """Make sure an engine and a session factory are available.

        Parameters
        ----------
        force : bool
            When True, refresh the local file and rebuild the engine even if
            one already exists.
        """
        if self._engine and not force:
            # An engine was injected or built earlier: only the session
            # factory may still be missing.
            if not self._session_factory:
                self._session_factory = sessionmaker(bind=self._engine)
            return

        await self._download_catalog(self.db_local, self.db_remote)
        # Engine setup performs blocking DuckDB calls; keep it off the loop.
        self._engine = await to_thread.run_sync(self.setup_engine)
        self._session_factory = sessionmaker(bind=self._engine)

    def setup_engine(
        self, access_key: str | None = None, secret_key: str | None = None
    ) -> Engine:
        """Create the DuckDB engine for ``db_local`` and configure S3 access.

        Parameters
        ----------
        access_key, secret_key : str, optional
            Explicit S3 key pair. When omitted, the values stored in
            ``self.credentials`` (if any) are used instead.

        Returns
        -------
        Engine
            Engine backed by a ``StaticPool``.
        """
        # connect() calls this without arguments, so fall back to the
        # credentials handed to the constructor.
        stored = getattr(self, "credentials", None)
        if stored is not None:
            access_key = access_key or stored.access_key.get_secret_value()
            secret_key = secret_key or stored.secret_key.get_secret_value()

        engine: Engine = create_engine(
            f"duckdb:///{self.db_local}",
            poolclass=StaticPool,
        )

        with engine.connect() as conn:
            conn.exec_driver_sql("INSTALL ducklake; LOAD ducklake;")

            has_pysus = conn.exec_driver_sql(
                "SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pysus'"
            ).fetchone()

            if has_pysus:
                conn.exec_driver_sql("SET search_path='pysus,main';")
            else:
                conn.exec_driver_sql("SET search_path='main';")

            s3_cfg = {
                "s3_endpoint": types.S3_ENDPOINT,
                "s3_region": types.S3_REGION,
                "s3_url_style": "path",
                "s3_use_ssl": "true",
            }

            if access_key and secret_key:
                s3_cfg["s3_access_key_id"] = access_key
                s3_cfg["s3_secret_access_key"] = secret_key

            for key, value in s3_cfg.items():
                # Double any single quote so the value cannot terminate the
                # SQL string literal early.
                escaped = str(value).replace("'", "''")
                conn.exec_driver_sql(f"SET {key}='{escaped}'")

            conn.commit()

        return engine

    async def _download_catalog(self, local_path: Path, remote_path: str) -> None:
        """Download ``remote_path`` unless the local copy has the same size.

        The remote size is taken from the ``content-length`` header of a HEAD
        request. If that request fails, the size is treated as 0, so a
        download is attempted for any non-empty or missing local file.
        """
        url = self._object_url(remote_path)

        try:
            local_size = local_path.stat().st_size
        except OSError:
            # Missing or unreadable file: force a download.
            local_size = -1

        async with httpx.AsyncClient(follow_redirects=True) as client:
            try:
                head = await client.head(url)
                head.raise_for_status()
                remote_size = int(head.headers.get("content-length", 0))
            except Exception:  # noqa: B902
                remote_size = 0

        if remote_size == local_size:
            return

        await self._download(remote_path, local_path)

    async def _download(self, remote_path: str, local_path: Path) -> None:
        """Stream ``remote_path`` from the bucket into ``local_path``.

        Data is written to a sibling ``.part`` file and moved into place only
        after the transfer completes, so an interrupted download never
        leaves a truncated database behind.

        Raises
        ------
        httpx.HTTPStatusError
            If the server answers with an error status.
        """
        url = self._object_url(remote_path)
        partial = local_path.with_name(local_path.name + ".part")
        local_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                async with client.stream("GET", url) as response:
                    response.raise_for_status()
                    with partial.open("wb") as handle:
                        async for chunk in response.aiter_bytes():
                            handle.write(chunk)
            partial.replace(local_path)
        finally:
            # No-op after a successful replace; cleans up after a failure.
            partial.unlink(missing_ok=True)

    async def _upload_catalog(self) -> None:
        """Upload this adapter's local database file to remote storage.

        Requires authenticated credentials.

        Raises
        ------
        PermissionError
            If the adapter was created without credentials.
        FileNotFoundError
            If ``db_local`` does not exist.

        :meta private:
        """
        if not self.credentials:
            raise PermissionError(
                "Admin credentials required to upload catalog.",
            )

        if not self.db_local.exists():
            raise FileNotFoundError("catalog file not found")

        def _upload():
            # NOTE(review): `_s3_client` is not defined anywhere in this
            # class; presumably a subclass or mixin provides it - confirm.
            self._s3_client.upload_file(
                self.db_local.absolute(),
                types.S3_BUCKET,
                self.db_remote,
            )

        await to_thread.run_sync(_upload)
152+
153+
154+
class CatalogAdapter(BaseAdapter):
    """Adapter for the ``catalog.duckdb`` database.

    The remote object key is ``public/catalog.duckdb``; the local copy is
    stored in ``cache_dir``.
    """

    def __init__(self, engine=None, **data) -> None:
        super().__init__(engine=engine, **data)
        remote_key: str = "public/catalog.duckdb"
        local_file: Path = self.cache_dir / "catalog.duckdb"
        self.db_local: Path = local_file
        self.db_remote: str = remote_key
159+
160+
161+
class DatasetAdapter(BaseAdapter):
    """Adapter for a single dataset's ``catalog_<name>.duckdb`` database.

    Parameters
    ----------
    name : str
        Dataset name; it is interpolated into both the local file name and
        the remote object key under ``datasets/``.
    engine : optional
        Forwarded to ``BaseAdapter``.
    """

    def __init__(self, name: str, engine=None, **data) -> None:
        super().__init__(engine=engine, **data)
        remote_key: str = f"datasets/catalog_{name}.duckdb"
        local_file: Path = self.cache_dir / f"catalog_{name}.duckdb"
        self.dataset_name: str = name
        self.db_local: Path = local_file
        self.db_remote: str = remote_key
167+
168+
169+
class ColumnsAdapter(BaseAdapter):
    """Adapter for the ``columns.duckdb`` database.

    The remote object key is ``public/columns.duckdb``; the local copy is
    stored in ``cache_dir``.
    """

    def __init__(self, engine=None, **data) -> None:
        super().__init__(engine=engine, **data)
        remote_key: str = "public/columns.duckdb"
        local_file: Path = self.cache_dir / "columns.duckdb"
        self.db_local: Path = local_file
        self.db_remote: str = remote_key

0 commit comments

Comments
 (0)