Skip to content

Commit 67b6a8c

Browse files
committed
fix: avoid sqlalchemy session concurrency
1 parent ca4340f commit 67b6a8c

5 files changed

Lines changed: 29 additions & 15 deletions

File tree

src/pypsa_app/backend/api/routes/runs.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@
3434
RunSummary,
3535
)
3636
from pypsa_app.backend.services.backend_registry import backend_registry
37+
from pypsa_app.backend.services.callback import (
38+
_build_payload,
39+
post_callback_sync,
40+
)
3741
from pypsa_app.backend.services.run import SnakedispatchClient, SnakedispatchError
38-
from pypsa_app.backend.services.callback import fire_callback_sync
3942
from pypsa_app.backend.services.sync import (
4043
SYNCED_STATUSES,
4144
sync_run_from_job,
@@ -292,9 +295,11 @@ def get_run(
292295
if needs_callback:
293296
# TODO: replace with proper async callback or
294297
# FastAPI BackgroundTasks.
298+
url = str(run.callback_url)
299+
payload = _build_payload(run)
295300
threading.Thread(
296-
target=fire_callback_sync,
297-
args=(run,),
301+
target=post_callback_sync,
302+
args=(url, payload),
298303
daemon=True,
299304
).start()
300305
except SnakedispatchError:
@@ -397,8 +402,10 @@ def cancel_run(
397402
if needs_callback:
398403
# TODO: replace with proper async callback or
399404
# FastAPI BackgroundTasks.
405+
url = str(run.callback_url)
406+
payload = _build_payload(run)
400407
threading.Thread(
401-
target=fire_callback_sync, args=(run,), daemon=True
408+
target=post_callback_sync, args=(url, payload), daemon=True
402409
).start()
403410
except SnakedispatchError as e:
404411
if e.status_code in (404, 409):
@@ -407,8 +414,10 @@ def cancel_run(
407414
db.commit()
408415
# TODO: replace with proper async callback or
409416
# FastAPI BackgroundTasks.
417+
url = str(run.callback_url)
418+
payload = _build_payload(run)
410419
threading.Thread(
411-
target=fire_callback_sync, args=(run,), daemon=True
420+
target=post_callback_sync, args=(url, payload), daemon=True
412421
).start()
413422
else:
414423
raise

src/pypsa_app/backend/schemas/run.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import uuid
44
from datetime import datetime
5-
65
from urllib.parse import urlparse
76

87
from pydantic import BaseModel, ConfigDict, Field, HttpUrl, field_validator

src/pypsa_app/backend/services/callback.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,26 @@ def _build_payload(run: Run) -> dict:
1313
return {"run_id": str(run.job_id), "status": run.status.value}
1414

1515

16-
def fire_callback_sync(run: Run) -> None:
17-
"""POST to the run's callback URL (blocking)."""
18-
if not run.callback_url:
19-
return
20-
url = str(run.callback_url)
21-
payload = _build_payload(run)
16+
def post_callback_sync(url: str, payload: dict) -> None:
17+
"""POST to a callback URL (blocking)."""
2218
try:
2319
httpx.post(url, json=payload, timeout=5.0, follow_redirects=False)
2420
except Exception:
2521
logger.warning(
2622
"Callback failed for run %s to %s",
27-
payload["run_id"],
23+
payload.get("run_id"),
2824
url,
2925
exc_info=True,
3026
)
3127

3228

29+
def fire_callback_sync(run: Run) -> None:
30+
"""POST to the run's callback URL (blocking)."""
31+
if not run.callback_url:
32+
return
33+
post_callback_sync(str(run.callback_url), _build_payload(run))
34+
35+
3336
async def fire_callback_async(url: str, payload: dict) -> None:
3437
"""POST to a callback URL (async). Used by the background sync loop."""
3538
try:

src/pypsa_app/backend/services/sync.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212
from pypsa_app.backend.database import SessionLocal
1313
from pypsa_app.backend.models import Run, RunStatus
14-
from pypsa_app.backend.services.callback import fire_callback_async
1514
from pypsa_app.backend.services.backend_registry import backend_registry
15+
from pypsa_app.backend.services.callback import fire_callback_async
1616
from pypsa_app.backend.tasks import import_run_outputs_task
1717

1818
logger = logging.getLogger(__name__)
@@ -143,7 +143,9 @@ async def run_sync_loop(interval: float = 10.0) -> None:
143143
try:
144144
callbacks = await asyncio.to_thread(sync_non_terminal_runs)
145145
for cb in callbacks:
146-
task = asyncio.create_task(fire_callback_async(cb["url"], cb["payload"]))
146+
task = asyncio.create_task(
147+
fire_callback_async(cb["url"], cb["payload"])
148+
)
147149
_background_tasks.add(task)
148150
task.add_done_callback(_background_tasks.discard)
149151
except Exception:

src/pypsa_app/backend/tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from pypsa_app.backend.database import SessionLocal
1313
from pypsa_app.backend.models import Run, RunStatus, SnakedispatchBackend
1414
from pypsa_app.backend.schemas.task import TaskResultResponse
15+
from pypsa_app.backend.services.callback import fire_callback_sync
1516
from pypsa_app.backend.services.network import import_network_file
1617
from pypsa_app.backend.services.callback import fire_callback_sync
1718
from pypsa_app.backend.services.run import SnakedispatchClient

0 commit comments

Comments
 (0)