Skip to content

Commit 0abece6

Browse files
Add dataset seeding with Supabase storage and worker automation
- Seed script now creates datasets from ensure_datasets and uploads them to Supabase storage
- Worker automatically scans for pending simulations every 60 seconds
- Downloads datasets from storage, runs simulations, and marks them complete
- Comprehensive logfire instrumentation in worker for step tracking
- Storage policies migration for datasets bucket access
1 parent aeb4aae commit 0abece6

7 files changed

Lines changed: 489 additions & 905 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "policyengine-api-v2"
33
version = "0.1.0"
44
description = "FastAPI service for PolicyEngine microsimulations"
55
readme = "README.md"
6-
requires-python = ">=3.11"
6+
requires-python = ">=3.13"
77
dependencies = [
88
"fastapi>=0.115.0",
99
"uvicorn[standard]>=0.32.0",
@@ -13,7 +13,7 @@ dependencies = [
1313
"storage3>=0.8.1",
1414
"celery[redis]>=5.4.0",
1515
"redis>=5.1.1",
16-
"policyengine>=0.1.0",
16+
"policyengine>=3.1.5",
1717
"policyengine-uk>=2.0.0",
1818
"policyengine-us>=1.0.0",
1919
"pydantic>=2.9.2",
@@ -22,6 +22,7 @@ dependencies = [
2222
"tables>=3.10.2",
2323
"logfire[fastapi,httpx,sqlalchemy]>=0.60.0",
2424
"fastapi-cache2>=0.2.1",
25+
"boto3>=1.41.1",
2526
]
2627

2728
[project.optional-dependencies]

scripts/seed.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@
1414
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
1515

1616
from policyengine.tax_benefit_models.uk import uk_latest
17+
from policyengine.tax_benefit_models.uk.datasets import ensure_datasets as ensure_uk_datasets
1718
from policyengine.tax_benefit_models.us import us_latest
19+
from policyengine.tax_benefit_models.us.datasets import ensure_datasets as ensure_us_datasets
1820
from policyengine_api.models import (
1921
TaxBenefitModel,
2022
TaxBenefitModelVersion,
2123
Variable,
2224
Parameter,
2325
ParameterValue,
26+
Dataset,
2427
)
28+
from policyengine_api.services.storage import upload_dataset
2529
from sqlmodel import Session, create_engine, select
2630
from policyengine_api.config.settings import settings
2731
from rich.console import Console
@@ -140,6 +144,73 @@ def seed_model(model_version, session) -> TaxBenefitModelVersion:
140144
return db_version
141145

142146

147+
def _seed_dataset_group(session, pe_datasets, model_name, description):
    """Upload each dataset in one group and record it in the database.

    Args:
        session: Open database session used for the existence check and
            for committing each new ``Dataset`` row.
        pe_datasets: Mapping returned by an ``ensure_datasets`` call; only
            its values (the dataset objects) are used.
        model_name: Value stored in ``Dataset.tax_benefit_model`` for every
            dataset in this group (e.g. ``"uk_latest"``).
        description: Label shown on the progress bar.
    """
    for pe_dataset in track(pe_datasets.values(), description=description):
        # Seeding is idempotent by name: a dataset that already has a row
        # is neither re-uploaded nor re-inserted.
        already_present = session.exec(
            select(Dataset).where(Dataset.name == pe_dataset.name)
        ).first()
        if already_present:
            console.print(f" Dataset {pe_dataset.name} already exists, skipping")
            continue

        # Upload first so the row can reference the stored object's key.
        object_name = upload_dataset(pe_dataset.filepath)
        console.print(f" Uploaded {pe_dataset.filepath} to S3 as {object_name}")

        db_dataset = Dataset(
            name=pe_dataset.name,
            description=pe_dataset.description,
            filepath=object_name,  # Storage key, not the local path
            year=pe_dataset.year,
            tax_benefit_model=model_name,
        )
        session.add(db_dataset)
        # Commit per dataset so earlier rows persist if a later upload fails.
        session.commit()
        console.print(f" [green]✓[/green] Created dataset: {db_dataset.name}")


def seed_datasets(session):
    """Seed UK and US datasets: upload each file and create its database row.

    For each country the datasets are obtained from the corresponding
    ``ensure_datasets`` function, uploaded with ``upload_dataset``, and
    recorded as ``Dataset`` rows. Datasets whose name already exists in the
    database are skipped.

    Args:
        session: Open database session used for queries and commits.

    Note:
        The final summary prints the total number of datasets returned by
        the two ``ensure_datasets`` calls, which includes any that were
        skipped because they already existed.
    """
    console.print("[bold blue]Seeding datasets...")

    # UK datasets
    console.print(" Creating UK datasets...")
    uk_datasets = ensure_uk_datasets()
    _seed_dataset_group(session, uk_datasets, "uk_latest", "UK datasets")

    # US datasets
    console.print(" Creating US datasets...")
    us_datasets = ensure_us_datasets()
    _seed_dataset_group(session, us_datasets, "us_latest", "US datasets")

    console.print(f"[green]✓[/green] Seeded {len(uk_datasets) + len(us_datasets)} datasets\n")
212+
213+
143214
def main():
144215
"""Main seed function."""
145216
console.print("[bold green]PolicyEngine database seeding[/bold green]\n")
@@ -153,8 +224,10 @@ def main():
153224
us_version = seed_model(us_latest, session)
154225
console.print(f"[green]✓[/green] US model seeded: {us_version.id}\n")
155226

227+
# Seed datasets
228+
seed_datasets(session)
229+
156230
console.print("\n[bold green]✓ Database seeding complete![/bold green]")
157-
console.print("\n[yellow]Note:[/yellow] Dataset creation skipped. To add datasets, upload H5 files via the API.")
158231

159232

160233
if __name__ == "__main__":

src/policyengine_api/services/storage.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,22 @@ def upload_dataset(file_path: str, object_name: str | None = None) -> str:
2020
object_name: Name to store in bucket (defaults to filename)
2121
2222
Returns:
23-
Public URL of uploaded file
23+
Object name (key) in storage
2424
"""
2525
supabase = get_supabase_client()
2626

2727
if object_name is None:
2828
object_name = Path(file_path).name
2929

30-
# Upload file
30+
# Upload file using Supabase storage client
3131
with open(file_path, "rb") as f:
3232
supabase.storage.from_(settings.storage_bucket).upload(
33-
object_name, f, {"content-type": "application/octet-stream"}
33+
object_name,
34+
f,
35+
{"content-type": "application/octet-stream", "upsert": "true"}
3436
)
3537

36-
# Get public URL
37-
url = supabase.storage.from_(settings.storage_bucket).get_public_url(object_name)
38-
return url
38+
return object_name
3939

4040

4141
def download_dataset(object_name: str, local_path: str) -> str:
@@ -50,7 +50,10 @@ def download_dataset(object_name: str, local_path: str) -> str:
5050
"""
5151
supabase = get_supabase_client()
5252

53-
# Download file
53+
# Ensure parent directory exists
54+
Path(local_path).parent.mkdir(parents=True, exist_ok=True)
55+
56+
# Download file using Supabase storage client
5457
data = supabase.storage.from_(settings.storage_bucket).download(object_name)
5558

5659
# Save locally

src/policyengine_api/tasks/celery_app.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from celery import Celery
2+
from celery.schedules import crontab
23
import logfire
34

45
from policyengine_api.config.settings import settings
@@ -23,4 +24,10 @@
2324
result_serializer="json",
2425
timezone="UTC",
2526
enable_utc=True,
27+
beat_schedule={
28+
"scan-pending-simulations": {
29+
"task": "scan_pending_simulations",
30+
"schedule": 60.0, # Run every 60 seconds
31+
},
32+
},
2633
)

0 commit comments

Comments
 (0)