Skip to content

Commit ece11f0

Browse files
committed
chore: implement context managers to every duckdb interaction
1 parent 527dba6 commit ece11f0

5 files changed

Lines changed: 174 additions & 60 deletions

File tree

pysus/api/ducklake/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# DuckLake Catalog Component
2+
3+
This module provides the application-level models and data adapters required to interface with remote DuckLake resources over S3. It wraps low-level database operations into unified interfaces (`File`, `DuckDataset`, and `DuckGroup`) and implements resilient database connection management via async context managers.
4+
5+
## Features
6+
7+
* **Deterministic Resource Management**: Implements asynchronous context managers (`async with`) across clients and database adapters to prevent DuckDB file locks and connection leaks.
8+
* **Fail-Safe Cleanups**: Features fallback `__del__` destructors to safely terminate remaining active engines during garbage collection or interpreter shutdown.
9+
* **Intelligent Syncing**: Employs an `update_on_close` mechanism to optionally push state changes back to S3 automatically upon exiting a context.
10+
* **SQLAlchemy Eager Loading Fixes**: Optimizes attribute mappings using isolated path strategies (`joinedload` vs. `contains_eager`) based on query parameters.
11+
* **Resilient S3 Verifications**: Returns early when the initial S3 `HEAD` check answers 404 (or another HTTP error status), so a missing remote file no longer feeds failing connection retry loops.
12+
13+
---
14+
15+
## Architecture Overview
16+
17+
The system separates the raw database management layer (Adapters) from the client wrapper layer (Client Models).
18+
19+
1. **Adapters (`BaseAdapter`)**: Track local and remote `.duckdb` target states, manage connections, handle S3 transfers, and expose scoped SQLAlchemy transaction sessions.
20+
2. **Client Components (`DuckLake`)**: Coordinate high-level actions, parse credential models, route queries, and handle collection loops.
21+
22+
---
23+
24+
## Lifecycle & Connection Handling
25+
26+
### Using Context Managers (Recommended)
27+
28+
Using `async with` blocks guarantees deterministic resource teardown. As soon as execution leaves the `async with` block—even because an exception was raised—all engines are disposed of cleanly.
29+
30+
```python
31+
from pysus.api.ducklake.client import DuckLake
32+
33+
async with DuckLake() as dl:
34+
datasets = await dl.datasets()
35+
36+
sia = datasets[4] # e.g., SIA
37+
files = await sia.query(state="SP", year=2026)

pysus/api/ducklake/__init__.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +0,0 @@
1-
"""DuckLake subpackage for interacting with the PySUS S3 catalog.
2-
3-
Provides a DuckDB-based client for querying and downloading
4-
public health datasets stored in object storage.
5-
"""
6-
7-
from .client import DuckLake as DuckLakeClient # noqa

pysus/api/ducklake/catalog/adapters.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import ABC
22
from pathlib import Path
3+
import asyncio
34

45
import httpx
56
from anyio import to_thread
@@ -12,6 +13,7 @@
1213
from pysus import CACHEPATH
1314
from pysus.api import types
1415
from pysus.api.ducklake.functional import download_s3, upload_s3
16+
from pysus.api.ducklake.catalog.orm.dataset import DatasetBase
1517

1618

1719
class DuckLakeCredentials(BaseModel):
@@ -25,12 +27,17 @@ class BaseAdapter(ABC):
2527
db_remote: Path
2628

2729
def __init__(
28-
self, engine=None, credentials: DuckLakeCredentials | None = None, **data
30+
self,
31+
engine=None,
32+
credentials: DuckLakeCredentials | None = None,
33+
update_on_close: bool = False,
34+
**data,
2935
) -> None:
3036
self._engine = engine
3137
self._session_factory = None
3238
self.cache_dir.mkdir(parents=True, exist_ok=True)
3339
self.credentials = credentials
40+
self.update_on_close = update_on_close
3441

3542
@property
3643
def remote_url(self) -> str:
@@ -64,6 +71,7 @@ def setup_engine(
6471

6572
with engine.connect() as conn:
6673
conn.exec_driver_sql("INSTALL ducklake; LOAD ducklake;")
74+
conn.exec_driver_sql("CREATE SCHEMA IF NOT EXISTS pysus;")
6775

6876
has_pysus = conn.exec_driver_sql(
6977
"SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pysus'"
@@ -90,6 +98,7 @@ def setup_engine(
9098

9199
conn.commit()
92100

101+
DatasetBase.metadata.create_all(bind=engine)
93102
return engine
94103

95104
async def _download_catalog(self, local_path: Path, remote_path: str) -> None:
@@ -106,8 +115,14 @@ async def _download_catalog(self, local_path: Path, remote_path: str) -> None:
106115
async with httpx.AsyncClient(follow_redirects=True) as client:
107116
try:
108117
head = await client.head(url)
118+
119+
if head.status_code == 404:
120+
return
121+
109122
head.raise_for_status()
110123
remote_size = int(head.headers.get("content-length", 0))
124+
except httpx.HTTPStatusError:
125+
return
111126
except Exception:
112127
remote_size = 0
113128

@@ -153,6 +168,28 @@ async def close(self, update: bool = False) -> None:
153168
self._engine = None
154169
self._session_factory = None
155170

171+
def __del__(self) -> None:
    """Best-effort finalizer that closes an engine left open.

    Returns immediately when ``_engine`` is missing or falsy. Otherwise
    ``close(update=False)`` is scheduled as a task on the event loop running
    in the current thread; if a ``RuntimeError`` is raised while doing so,
    ``close(update=False)`` is driven with ``asyncio.run`` instead. Any
    ``Exception`` raised along the way is suppressed.
    """
    if not getattr(self, "_engine", None):
        return
    try:
        running_loop = asyncio.get_running_loop()
        if running_loop.is_running():
            running_loop.create_task(self.close(update=False))
    except RuntimeError:
        # Fallback path: run the coroutine to completion on a fresh loop.
        try:
            asyncio.run(self.close(update=False))
        except Exception:  # noqa
            pass
    except Exception:  # noqa
        pass
185+
186+
async def __aenter__(self):
    """Enter the ``async with`` block: await ``connect()`` and return this adapter."""
    await self.connect()
    return self
189+
190+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave the ``async with`` block by closing the adapter.

    The exception triple is not inspected; ``close`` receives the adapter's
    ``update_on_close`` flag as its ``update`` argument.
    """
    push_back = self.update_on_close
    await self.close(update=push_back)
192+
156193

157194
class CatalogAdapter(BaseAdapter):
158195
def __init__(self, engine=None, **data) -> None:
@@ -174,3 +211,4 @@ def __init__(self, engine=None, **data) -> None:
174211
super().__init__(engine=engine, **data)
175212
self.db_local: Path = self.cache_dir / "columns.duckdb"
176213
self.db_remote: str = "public/columns.duckdb"
214+

pysus/api/ducklake/client.py

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
capabilities backed by per-dataset DuckDB engines.
55
"""
66

7+
import asyncio
78
from collections.abc import Callable
89
from pathlib import Path
910

1011
from anyio import to_thread
11-
from pydantic import SecretStr, PrivateAttr
12+
from pydantic import SecretStr, PrivateAttr, Field
1213
from pysus.api.models import BaseRemoteClient
1314
from pysus.api.types import DUCKLAKE
1415

@@ -21,13 +22,17 @@
2122

2223
class DuckLake(BaseRemoteClient):
2324
credentials: DuckLakeCredentials | None = None
25+
update_on_close: bool = Field(default=False, exclude=True)
2426
_datasets: list[DuckDataset] = PrivateAttr(default_factory=list)
27+
_catalog_adap: CatalogAdapter = PrivateAttr()
2528

26-
def __init__(self, engine=None, **data) -> None:
29+
def __init__(self, engine=None, update_on_close: bool = False, **data) -> None:
2730
super().__init__(**data)
28-
self.catalog_adap = CatalogAdapter(
31+
self.update_on_close = update_on_close
32+
self._catalog_adap = CatalogAdapter(
2933
engine=engine,
3034
credentials=self.credentials,
35+
update_on_close=self.update_on_close,
3136
)
3237

3338
@property
@@ -43,24 +48,31 @@ def description(self) -> str:
4348
return ""
4449

4550
async def datasets(self, **kwargs) -> list[DuckDataset]:
46-
await self.catalog_adap.connect()
47-
4851
def _fetch():
49-
with self.catalog_adap.get_session() as session:
52+
with self._catalog_adap.get_session() as session:
5053
results = session.query(Dataset).all()
5154
session.expunge_all()
5255
return results
5356

54-
records = await to_thread.run_sync(_fetch)
55-
5657
duck_datasets: list[DuckDataset] = []
57-
for rec in records:
58-
dataset_adapter = DatasetAdapter(
59-
name=str(rec.name), credentials=self.credentials
60-
)
61-
duck_datasets.append(
62-
DuckDataset(record=rec, client=self, adapter=dataset_adapter)
63-
)
58+
59+
async with self._catalog_adap:
60+
records = await to_thread.run_sync(_fetch)
61+
62+
for rec in records:
63+
dataset_adapter = DatasetAdapter(
64+
name=str(rec.name),
65+
credentials=self.credentials,
66+
update_on_close=self.update_on_close,
67+
)
68+
duck_datasets.append(
69+
DuckDataset(
70+
record=rec,
71+
client=self,
72+
adapter=dataset_adapter,
73+
update_on_close=self.update_on_close,
74+
)
75+
)
6476

6577
self._datasets = duck_datasets
6678
return duck_datasets
@@ -75,18 +87,21 @@ async def login(
7587
access_key=SecretStr(access_key),
7688
secret_key=SecretStr(secret_key),
7789
)
78-
self.catalog_adap.credentials = self.credentials
79-
await self.catalog_adap.connect(force=True)
90+
self._catalog_adap.credentials = self.credentials
91+
await self._catalog_adap.connect(force=True)
8092

8193
async def connect(self, force: bool = False) -> None:
82-
await self.catalog_adap.connect(force=force)
94+
await self._catalog_adap.connect(force=force)
95+
96+
async def close(self, update_catalog: bool | None = None) -> None:
97+
should_update = (
98+
self.update_on_close if update_catalog is None else update_catalog
99+
)
83100

84-
async def close(self, update_catalog: bool = False) -> None:
85101
for ds in self._datasets:
86-
await ds.close(update_catalog=update_catalog)
102+
await ds.close(update_catalog=should_update)
87103

88-
await self.catalog_adap.close(update=update_catalog)
89-
self._datasets.clear()
104+
await self._catalog_adap.close(update=should_update)
90105

91106
async def download(
92107
self,
@@ -95,7 +110,7 @@ async def download(
95110
callback: Callable[[int, int], None] | None = None,
96111
) -> Path:
97112
if not isinstance(file, File):
98-
raise ValueError("FTP File was not properly instantiated")
113+
raise ValueError("DuckLake File was not properly instantiated")
99114

100115
access_key = (
101116
self.credentials.access_key.get_secret_value() if self.credentials else None
@@ -113,5 +128,27 @@ async def download(
113128
)
114129
return output
115130

131+
async def __aenter__(self):
    """Enter the ``async with`` block: await ``connect()`` and return this client."""
    await self.connect()
    return self
134+
135+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave the ``async with`` block by closing the client.

    The exception triple is not inspected. ``close`` is called with
    ``update_catalog=None``.
    """
    await self.close(update_catalog=None)
137+
138+
def __del__(self) -> None:
    """Best-effort finalizer that closes the client.

    Returns immediately when ``_catalog_adap`` was never set. Otherwise
    ``close(update_catalog=False)`` is scheduled as a task on the event loop
    running in the current thread; if a ``RuntimeError`` is raised while
    doing so, the same call is driven with ``asyncio.run`` instead. Any
    ``Exception`` raised along the way is suppressed.
    """
    initialised = hasattr(self, "_catalog_adap")
    if not initialised:
        return
    try:
        running_loop = asyncio.get_running_loop()
        if running_loop.is_running():
            running_loop.create_task(self.close(update_catalog=False))
    except RuntimeError:
        # Fallback path: run the coroutine to completion on a fresh loop.
        try:
            asyncio.run(self.close(update_catalog=False))
        except Exception:
            pass
    except Exception:
        pass
152+
116153

117154
DuckDataset.model_rebuild(_types_namespace={"DuckLake": DuckLake})

0 commit comments

Comments
 (0)