Skip to content

Commit 21369f3

Browse files
committed
chore: fix tests & add a semaphore to async downloads, since too many files downloading at the same time was causing throttling
1 parent 511357c commit 21369f3

20 files changed

Lines changed: 421 additions & 688 deletions

.github/workflows/python-package.yml

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -71,10 +71,10 @@ jobs:
7171
- uses: actions/checkout@v4
7272

7373
- name: Build Docker image
74-
run: docker compose -f docker/docker-compose.yaml build
74+
run: docker compose build
7575

7676
- name: Start container
77-
run: docker compose -f docker/docker-compose.yaml up -d
77+
run: docker compose up -d
7878

7979
- name: Wait for Jupyter
8080
run: |
@@ -84,8 +84,8 @@ jobs:
8484
done
8585
8686
- name: Run tests inside container
87-
run: docker compose -f docker/docker-compose.yaml exec -T -w /usr/src jupyter python3 -m pytest -vv pysus/tests/ --retries 3 --retry-delay 15 -x -o cache_dir=/tmp/.pytest_cache
87+
run: docker compose exec -T -w /usr/src jupyter python3 -m pytest -vv pysus/tests/ --retries 3 --retry-delay 15 -x -o cache_dir=/tmp/.pytest_cache
8888

8989
- name: Cleanup
9090
if: always()
91-
run: docker compose -f docker/docker-compose.yaml down -v
91+
run: docker compose down -v

Makefile

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ export PRINT_HELP_PYSCRIPT
2222
help:
2323
@python -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST)
2424

25-
DOCKER = docker compose -p pysus -f docker/docker-compose.yaml
25+
DOCKER = docker compose -p pysus
2626
SERVICE :=
2727
SEMANTIC_RELEASE = npx --yes \
2828
-p semantic-release \
@@ -56,7 +56,7 @@ test-pysus: ## run tests quickly with the default Python
5656

5757
.PHONY: test-pysus-with-coverage
5858
test-pysus-with-coverage: ## run tests with coverage report
59-
poetry run pytest -vv pysus/tests/ --retries 3 --retry-delay 15 --cov=pysus --cov-report=xml:coverage.xml --cov-report=term-missing
59+
poetry run pytest -vv pysus/tests/ --cov=pysus --cov-report=xml:coverage.xml --cov-report=term-missing
6060

6161
.PHONY: lint
6262
lint:

README.md

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,15 +37,15 @@ docker run -p 8888:8888 alertadengue/pysus
3737
Or build locally and start the container:
3838

3939
```bash
40-
docker compose -f docker/docker-compose.yaml up --build
40+
docker compose up --build
4141
```
4242

4343
Then open [http://127.0.0.1:8888/lab](http://127.0.0.1:8888/lab) in your browser.
4444

4545
Stop the container:
4646

4747
```bash
48-
docker compose -f docker/docker-compose.yaml down
48+
docker compose down
4949
```
5050

5151
## Quick Start
@@ -268,7 +268,7 @@ pytest tests/
268268
Run tests inside the Docker container:
269269

270270
```bash
271-
docker compose -f docker/docker-compose.yaml exec -T -w /usr/src jupyter python3 -m pytest pysus/tests/
271+
docker compose exec -T -w /usr/src jupyter python3 -m pytest pysus/tests/
272272
```
273273

274274
## License

docs/source/installation.rst

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,15 +34,15 @@ Or build locally and start the container:
3434

3535
.. code-block:: bash
3636
37-
docker compose -f docker/docker-compose.yaml up --build
37+
docker compose up --build
3838
3939
Then open http://127.0.0.1:8888/lab in your browser.
4040

4141
Stop the container with:
4242

4343
.. code-block:: bash
4444
45-
docker compose -f docker/docker-compose.yaml down
45+
docker compose down
4646
4747
Development
4848
-----------

pysus/api/_impl/databases.py

Lines changed: 34 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -8,12 +8,12 @@
88
"""
99

1010
import asyncio
11-
from typing import Literal
11+
from typing import Literal, cast
1212

1313
import pandas as pd
1414
from pysus.api import types
1515
from pysus.api.client import PySUS
16-
from tqdm import tqdm
16+
from tqdm.asyncio import tqdm
1717

1818
__all__ = [
1919
"sinan",
@@ -57,10 +57,9 @@ def _fetch_data(
5757
month : int | list[int], optional
5858
Month or list of months to fetch.
5959
show_progress : bool, optional
60-
Whether to display a tqdm progress bar during download. Default is True.
60+
Whether to display a tqdm progress bar during download.
6161
as_dataframe : bool, optional
6262
Whether to concatenate and return the data as a pandas DataFrame.
63-
Default is False.
6463
**kwargs
6564
Additional arguments forwarded to :meth:`PySUS.read_parquet`.
6665
@@ -71,48 +70,41 @@ def _fetch_data(
7170
as_dataframe is True, returns a concatenated DataFrame.
7271
"""
7372

74-
async def _fetch():
75-
73+
async def _fetch() -> list[str] | pd.DataFrame:
7674
async with PySUS() as pysus:
77-
years = [year] if isinstance(year, int) else (year or [None])
78-
months = [month] if isinstance(month, int) else (month or [None])
75+
files = await pysus.query(
76+
dataset=dataset,
77+
group=group,
78+
state=state,
79+
year=year,
80+
month=month,
81+
)
7982

80-
files = []
81-
for y in years:
82-
for m in months:
83-
files.extend(
84-
await pysus.query(
85-
dataset=dataset,
86-
group=group,
87-
state=state,
88-
year=y,
89-
month=m,
90-
)
91-
)
83+
if not files:
84+
return pd.DataFrame() if as_dataframe else cast(list[str], [])
85+
86+
sem = asyncio.Semaphore(3)
87+
88+
async def _throttled_download(f):
89+
async with sem:
90+
return await pysus.download(f)
91+
92+
tasks = [_throttled_download(f) for f in files]
9293

93-
paths = []
9494
if show_progress:
95-
for file in tqdm(
96-
files,
95+
downloaded_files = await tqdm.gather(
96+
*tasks,
9797
desc=f"Downloading {dataset}",
9898
unit="file",
99-
):
100-
f = await pysus.download(file)
101-
paths.append(str(f.path))
99+
)
102100
else:
103-
for file in files:
104-
f = await pysus.download(file)
105-
paths.append(str(f.path))
101+
downloaded_files = await asyncio.gather(*tasks)
102+
103+
paths: list[str] = [str(f.path) for f in downloaded_files]
106104

107105
if as_dataframe:
108-
return (
109-
pysus.read_parquet(
110-
paths,
111-
**kwargs,
112-
).df()
113-
if paths
114-
else pd.DataFrame()
115-
)
106+
res = pysus.read_parquet(paths, **kwargs).df()
107+
return cast(pd.DataFrame, res)
116108

117109
return paths
118110

@@ -132,9 +124,11 @@ async def _fetch():
132124
"Install it with: pip install nest_asyncio"
133125
)
134126
raise RuntimeError(msg) from None
135-
return loop.run_until_complete(_fetch())
136-
else:
137-
return asyncio.run(_fetch())
127+
result = loop.run_until_complete(_fetch())
128+
return cast(list[str] | pd.DataFrame, result)
129+
130+
result = asyncio.run(_fetch())
131+
return cast(list[str] | pd.DataFrame, result)
138132

139133

140134
def sinan(

pysus/api/client.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -314,7 +314,11 @@ async def download(
314314
)
315315
return await ExtensionFactory.instantiate(local_path)
316316

317-
except Exception as e: # noqa: B902
317+
except Exception as e: # noqa
318+
import traceback
319+
320+
traceback.print_exc()
321+
318322
await self._update_state(
319323
local_path,
320324
str(remote_path),

pysus/api/ducklake/catalog/adapters.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -239,7 +239,7 @@ def __init__(self, name: str, dataset_id: int, engine=None, **data) -> None:
239239
super().__init__(engine=engine, **data)
240240
self.dataset_name: str = name
241241
self.db_local: Path = self.cache_dir / f"catalog_{name}.duckdb"
242-
self.db_remote: Path = Path(f"datasets/catalog_{name}.duckdb")
242+
self.db_remote: Path = Path(f"public/catalog_{name}.duckdb")
243243
self.dataset_id = dataset_id
244244

245245

pysus/api/ducklake/functional.py

Lines changed: 15 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,28 +17,37 @@ async def download_http(
1717
url = f"https://{types.S3_ENDPOINT}/{types.S3_BUCKET}/{remote_path}"
1818
max_retries = 5
1919

20+
timeout = httpx.Timeout(15.0, read=60.0, write=20.0, connect=15.0)
21+
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
22+
2023
for attempt in range(max_retries):
2124
try:
2225
async with httpx.AsyncClient(
23-
follow_redirects=True, verify=False
26+
follow_redirects=True,
27+
verify=False,
28+
limits=limits,
29+
timeout=timeout,
2430
) as client:
2531
async with client.stream("GET", url) as r:
2632
r.raise_for_status()
2733
total = int(r.headers.get("Content-Length", 0))
2834
downloaded = 0
2935

3036
with open(local_path, "wb") as f:
31-
async for chunk in r.aiter_bytes(
32-
chunk_size=1024 * 1024
33-
):
37+
async for chunk in r.aiter_bytes(chunk_size=64 * 1024):
3438
await to_thread.run_sync(f.write, chunk)
3539
downloaded += len(chunk)
3640
if callback:
3741
callback(downloaded, total)
3842
return
39-
except (OSError, httpx.HTTPStatusError) as e:
43+
except (
44+
OSError,
45+
httpx.HTTPStatusError,
46+
httpx.ConnectError,
47+
httpx.ReadError,
48+
) as e:
4049
if attempt < max_retries - 1:
41-
await sleep(1)
50+
await sleep(2 * (attempt + 1))
4251
else:
4352
raise e
4453

pysus/api/ducklake/models.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ def _calculate():
9797
class DuckDataset(BaseRemoteDataset):
9898
record: "Dataset" = Field(exclude=True)
9999
client: "DuckLake" = Field(exclude=True)
100-
border: "DatasetAdapter" = Field(exclude=True)
100+
border: Any = Field(exclude=True)
101101
update_on_close: bool = Field(default=False, exclude=True)
102102

103103
def __init__(self, **data) -> None:
@@ -143,12 +143,14 @@ async def query(
143143
self,
144144
group: str | list[str] | None = None,
145145
state: str | list[str] | None = None,
146-
year: int | list[int] | None = None,
147-
month: int | list[int] | None = None,
146+
year: int | list[int] | range | None = None,
147+
month: int | list[int] | range | None = None,
148148
) -> list[File]:
149149
def _to_list(val: Any) -> list[Any] | None:
150150
if val is None:
151151
return None
152+
if isinstance(val, range):
153+
return list(val)
152154
return val if isinstance(val, list) else [val]
153155

154156
groups = _to_list(group)

pysus/api/metadata/models.py

Lines changed: 5 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -3,30 +3,6 @@
33
from pysus.api.types import ColumnType, Origin
44

55

6-
def lookup_column_meta(name: str) -> dict[str, str] | None:
7-
"""Look up column metadata from the global columns.py constants.
8-
9-
Returns the {dataset: description} dict if the column name exists
10-
as a constant in columns.py, or None if not found.
11-
"""
12-
try:
13-
from pysus.api.ducklake.catalog import columns as _cols
14-
15-
return getattr(_cols, name.upper(), None)
16-
except ImportError:
17-
return None
18-
19-
20-
def pick_description(meta: dict[str, str] | None) -> str:
21-
"""Pick the best description from a column metadata dict."""
22-
if meta is None:
23-
return ""
24-
for desc in meta.values():
25-
if desc:
26-
return desc
27-
return ""
28-
29-
306
@dataclass
317
class Dataset:
328
name: str
@@ -70,11 +46,12 @@ class Column:
7046
dtype: ColumnType
7147

7248
@classmethod
73-
def from_schema(cls, name: str, dtype: ColumnType) -> "Column":
74-
"""Create a Column from a file schema, looking up description from
75-
columns.py."""
49+
def from_schema(
50+
cls, name: str, dtype: ColumnType, description: str = ""
51+
) -> "Column":
52+
"""Create a Column with a description provided from the database."""
7653
return cls(
7754
name=name,
78-
description=pick_description(lookup_column_meta(name)),
55+
description=description,
7956
dtype=dtype,
8057
)

0 commit comments

Comments
 (0)