Skip to content

Commit 20f6b7e

Browse files
committed
Convert branch deletion to celery job
1 parent 3a6e4c2 commit 20f6b7e

9 files changed

Lines changed: 294 additions & 33 deletions

File tree

src/api/organization/project/branch/__init__.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
)
7474
from .....models.resources import ResourceLimitsPublic, ResourceType
7575
from ...._util import Conflict, Forbidden, NotFound, Unauthenticated, url_path_for
76-
from ...._util.backups import copy_branch_backup_schedules, delete_branch_backups, ensure_branch_pitr_schedule
76+
from ...._util.backups import copy_branch_backup_schedules, ensure_branch_pitr_schedule
7777
from ...._util.resourcelimit import (
7878
apply_branch_resource_allocation,
7979
check_available_resources_limits,
@@ -94,6 +94,7 @@
9494
from ....settings import get_settings as get_api_settings
9595
from .api_keys import api as api_key_api
9696
from .auth import api as auth_api
97+
from .delete_tasks import dispatch_delete
9798
from .resize_tasks import dispatch_resize
9899
from .tasks import task_api
99100

@@ -1653,31 +1654,34 @@ async def update(
16531654
@instance_api.delete(
16541655
"/",
16551656
name="organizations:projects:branch:delete",
1656-
status_code=204,
1657-
responses={401: Unauthenticated, 403: Forbidden, 404: NotFound},
1657+
status_code=202,
1658+
responses={401: Unauthenticated, 403: Forbidden, 404: NotFound, 400: {"description": "Delete already in progress"}},
16581659
)
16591660
async def delete(
16601661
session: SessionDep,
1661-
_organization: OrganizationDep,
1662-
_project: ProjectDep,
1662+
request: Request,
1663+
organization: OrganizationDep,
1664+
project: ProjectDep,
16631665
branch: BranchDep,
16641666
):
1665-
branch_id = branch.id
1666-
await _set_branch_status(session, branch, BranchServiceStatus.DELETING)
1667-
await delete_deployment(branch_id)
1668-
try:
1669-
await realm_admin("master").a_delete_realm(str(branch_id))
1670-
except KeycloakError as exc:
1671-
if getattr(exc, "response_code", None) == 404:
1672-
logger.error("Keycloak realm not found for branch %s during delete; continuing.", branch_id, exc_info=True)
1673-
else:
1674-
raise
1675-
await delete_branch_provisioning(session, branch_id)
1676-
await delete_branch_backups(session, branch_id)
1677-
await session.delete(branch)
1667+
if branch.delete_task_id is not None:
1668+
raise HTTPException(status_code=400, detail="A delete operation is already in progress")
1669+
1670+
task_id = dispatch_delete(str(branch.id))
1671+
1672+
branch.set_status(BranchServiceStatus.DELETING)
1673+
branch.delete_task_id = task_id
16781674
await session.commit()
16791675

1680-
return Response(status_code=204)
1676+
task_url = url_path_for(
1677+
request,
1678+
"organizations:projects:branch:tasks:detail",
1679+
organization_id=await organization.awaitable_attrs.id,
1680+
project_id=await project.awaitable_attrs.id,
1681+
branch_id=await branch.awaitable_attrs.id,
1682+
task_id=task_id,
1683+
)
1684+
return Response(status_code=202, headers={"Location": task_url})
16811685

16821686

16831687
@instance_api.post(
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""Celery chord tasks for async branch deletion.
2+
3+
The chord dispatches three sub-tasks in parallel (K8s, Keycloak, backups) and
4+
calls ``finalize_delete`` when all sub-tasks have settled. With
5+
``task_chord_propagates = False`` the callback always fires, so the branch DB
6+
record is removed regardless of individual sub-task outcomes (errors are logged
7+
but do not abort the delete).
8+
"""
9+
10+
import logging
11+
from uuid import UUID
12+
13+
from asgiref.sync import async_to_sync
14+
from celery import chord
15+
from ulid import ULID
16+
17+
from .....database import AsyncSessionLocal
18+
from .....deployment.delete import delete_backup_snapshots, delete_k8s_deployment, delete_keycloak_realm
19+
from .....models.branch import Branch
20+
from .....worker import app
21+
from ...._util.resourcelimit import delete_branch_provisioning
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
async def _async_finalize_delete(job_results: list, branch_id: str) -> dict:
    """Log sub-task failures, then remove the branch's provisioning rows and DB record.

    Args:
        job_results: Results collected from the chord header tasks. Entries that
            are ``Exception`` instances are logged and reported; all other
            entries are ignored.
        branch_id: String form of the branch ULID.

    Returns:
        ``{"errors": [...]}`` holding ``str()`` of every exception found in
        ``job_results``. Sub-task errors never abort the delete.
    """
    # NOTE(review): failures are only detected when they arrive as Exception
    # instances; confirm the configured result serializer delivers them so.
    messages: list[str] = []
    for outcome in job_results:
        if not isinstance(outcome, Exception):
            continue
        messages.append(str(outcome))
        logger.error("Delete sub-task error for branch %s: %s", branch_id, outcome)
    summary = {"errors": messages}

    parsed_id = ULID.from_str(branch_id)
    async with AsyncSessionLocal() as db:
        row = await db.get(Branch, parsed_id)
        if row is not None:
            # Provisioning cleanup is staged without committing so that it and
            # the branch row removal land in one commit.
            await delete_branch_provisioning(db, parsed_id, commit=False)
            await db.delete(row)
            await db.commit()
        else:
            logger.warning("Branch %s not found in finalize_delete; already deleted.", branch_id)

    return summary
45+
46+
47+
@app.task(name="simplyblock.vela.branch.delete.finalize")
def finalize_delete(job_results: list, branch_id: str) -> dict:
    """Chord callback: synchronously run the async finalizer for a branch delete.

    Bridges the synchronous Celery task body to ``_async_finalize_delete`` and
    returns its ``{"errors": [...]}`` summary unchanged.
    """
    run_blocking = async_to_sync(_async_finalize_delete)
    return run_blocking(job_results, branch_id)
51+
52+
53+
def dispatch_delete(branch_id: str) -> UUID:
    """Start the branch-delete chord and return the id of its result as a UUID.

    The chord header holds one signature per teardown task (K8s deployment,
    Keycloak realm, backup snapshots), each given ``branch_id``;
    ``finalize_delete`` is attached as the chord callback.
    """
    # NOTE(review): the design assumes the callback runs even when a header
    # task fails; confirm the installed Celery version behaves that way.
    teardown_tasks = (delete_k8s_deployment, delete_keycloak_realm, delete_backup_snapshots)
    header = [task.s(branch_id) for task in teardown_tasks]
    pending_chord = chord(header)
    async_result = pending_chord(finalize_delete.s(branch_id=branch_id))
    return UUID(async_result.id)

src/api/organization/project/branch/tasks.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Branch task list/detail endpoints.
22
3-
Exposes Celery task state (currently resize only) under:
3+
Exposes Celery task state (resize and delete) under:
44
GET .../branches/{branch_id}/tasks
55
GET .../branches/{branch_id}/tasks/{task_id}
66
"""
@@ -14,6 +14,7 @@
1414

1515
from ...._util import Forbidden, NotFound, Unauthenticated
1616
from ....dependencies import BranchDep, OrganizationDep, ProjectDep
17+
from .delete_tasks import finalize_delete
1718
from .resize_tasks import finalize_resize
1819

1920
task_api = APIRouter(tags=["branch"])
@@ -37,16 +38,20 @@ class BranchTaskPublic(BaseModel):
3738
date_done: datetime | None
3839

3940

40-
def _build_task_public(task_id: UUID) -> BranchTaskPublic:
41-
result = finalize_resize.AsyncResult(str(task_id))
41+
def _build_task_public(task_id: UUID, task_type: str) -> BranchTaskPublic:
42+
if task_type == "delete":
43+
result = finalize_delete.AsyncResult(str(task_id))
44+
else:
45+
result = finalize_resize.AsyncResult(str(task_id))
4246
state = result.state
4347
status = _CELERY_STATE_TO_STATUS.get(state, state)
4448
kwargs: dict = result.kwargs or {}
49+
parameters = kwargs.get("effective_parameters", {}) if task_type == "resize" else {}
4550
return BranchTaskPublic(
4651
id=task_id,
47-
task_type="resize",
52+
task_type=task_type,
4853
status=status,
49-
parameters=kwargs.get("effective_parameters", {}),
54+
parameters=parameters,
5055
result=result.result if state == "SUCCESS" else None,
5156
error=str(result.traceback) if state == "FAILURE" and result.traceback else None,
5257
date_done=result.date_done,
@@ -64,9 +69,12 @@ async def list_tasks(
6469
_project: ProjectDep,
6570
branch: BranchDep,
6671
) -> list[BranchTaskPublic]:
67-
if branch.resize_task_id is None:
68-
return []
69-
return [_build_task_public(branch.resize_task_id)]
72+
tasks = []
73+
if branch.resize_task_id is not None:
74+
tasks.append(_build_task_public(branch.resize_task_id, "resize"))
75+
if branch.delete_task_id is not None:
76+
tasks.append(_build_task_public(branch.delete_task_id, "delete"))
77+
return tasks
7078

7179

7280
@task_api.get(
@@ -81,6 +89,8 @@ async def get_task(
8189
branch: BranchDep,
8290
task_id: UUID,
8391
) -> BranchTaskPublic:
84-
if branch.resize_task_id != task_id:
85-
raise HTTPException(status_code=404, detail="Task not found")
86-
return _build_task_public(task_id)
92+
if branch.resize_task_id == task_id:
93+
return _build_task_public(task_id, "resize")
94+
if branch.delete_task_id == task_id:
95+
return _build_task_public(task_id, "delete")
96+
raise HTTPException(status_code=404, detail="Task not found")

src/deployment/delete.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""Celery sub-tasks for async branch deletion (deployment layer).
2+
3+
These tasks handle the infrastructure-level teardown steps that can run in
4+
parallel: K8s namespace/VM deletion, Keycloak realm removal, and backup
5+
snapshot cleanup. The API layer's ``delete_tasks.finalize_delete`` chord
6+
callback then removes the DB record after all sub-tasks have settled.
7+
"""
8+
9+
import logging
10+
11+
from asgiref.sync import async_to_sync
12+
from keycloak.exceptions import KeycloakError
13+
from ulid import ULID
14+
15+
from ..api._util.backups import delete_branch_backups
16+
from ..api.keycloak import realm_admin
17+
from ..database import AsyncSessionLocal
18+
from ..worker import app
19+
from . import delete_deployment
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
async def _delete_keycloak(branch_id: ULID) -> None:
    """Delete the realm named after ``branch_id`` through the ``master`` realm admin.

    A ``KeycloakError`` whose ``response_code`` is 404 is logged as a warning
    and swallowed; any other ``KeycloakError`` is re-raised.
    """
    try:
        admin = realm_admin("master")
        await admin.a_delete_realm(str(branch_id))
    except KeycloakError as exc:
        # Only a missing realm is tolerated; everything else must surface.
        if getattr(exc, "response_code", None) != 404:
            raise
        logger.warning("Keycloak realm not found for branch %s; skipping.", branch_id)
32+
33+
34+
async def _delete_snapshots(branch_id: ULID) -> None:
    """Run ``delete_branch_backups`` for ``branch_id`` inside a fresh DB session."""
    # NOTE(review): no explicit commit happens here; confirm that
    # delete_branch_backups commits its own changes before the session closes.
    session_scope = AsyncSessionLocal()
    async with session_scope as db:
        await delete_branch_backups(db, branch_id)
37+
38+
39+
@app.task(name="simplyblock.vela.deployment.delete.k8s")
def delete_k8s_deployment(branch_id: str) -> None:
    """Tear down the branch's K8s deployment (namespace and VM) via ``delete_deployment``.

    ``branch_id`` is the string form of the branch ULID; it is parsed before
    being handed to the async helper, which is run to completion synchronously.
    """
    teardown = async_to_sync(delete_deployment)
    teardown(ULID.from_str(branch_id))
43+
44+
45+
@app.task(name="simplyblock.vela.deployment.delete.keycloak")
def delete_keycloak_realm(branch_id: str) -> None:
    """Remove the branch's Keycloak realm; a missing realm (404) counts as success.

    ``branch_id`` is the string form of the branch ULID; the async helper
    ``_delete_keycloak`` is run to completion synchronously.
    """
    remove_realm = async_to_sync(_delete_keycloak)
    remove_realm(ULID.from_str(branch_id))
49+
50+
51+
@app.task(name="simplyblock.vela.deployment.delete.backups")
def delete_backup_snapshots(branch_id: str) -> None:
    """Delete K8s volume snapshots for every backup that belongs to a branch.

    ``branch_id`` is the string form of the branch ULID; the async helper
    ``_delete_snapshots`` is run to completion synchronously.
    """
    purge_snapshots = async_to_sync(_delete_snapshots)
    purge_snapshots(ULID.from_str(branch_id))

src/models/branch.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from enum import StrEnum
44
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, Optional
55

6-
import sqlalchemy as sa
76
from pydantic import BaseModel, model_validator
87
from pydantic import Field as PydanticField
98
from sqlalchemy import BigInteger, Boolean, Column, String, Text, UniqueConstraint, text
@@ -109,7 +108,8 @@ class Branch(AsyncAttrs, Model, table=True):
109108
)
110109
pitr_enabled: bool = Field(default=False, sa_column=Column(Boolean, nullable=False, server_default=text("false")))
111110
resize_task_id: uuid.UUID | None = Field(default=None, nullable=True)
112-
db_port: int | None = Field(default=None, sa_column=Column(sa.Integer, nullable=True))
111+
delete_task_id: uuid.UUID | None = None
112+
db_port: int | None = None
113113

114114
__table_args__ = (UniqueConstraint("project_id", "name", name="unique_branch_name_per_project"),)
115115

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Add branch delete_task_id
2+
3+
Revision ID: b8e4c9f12d35
4+
Revises: a1b2c3d4e5f6
5+
Create Date: 2026-04-01 00:00:00.000000
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = 'b8e4c9f12d35'
16+
down_revision: Union[str, Sequence[str], None] = 'a1b2c3d4e5f6'
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
    """Upgrade schema: add the nullable ``delete_task_id`` UUID column to ``branch``."""
    delete_task_column = sa.Column('delete_task_id', sa.Uuid(), nullable=True)
    op.add_column('branch', delete_task_column)
24+
25+
26+
def downgrade() -> None:
    """Downgrade schema: drop the ``delete_task_id`` column from ``branch``."""
    op.drop_column(table_name='branch', column_name='delete_task_id')

src/worker/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ class Settings(BaseSettings):
2020
app.conf.task_chord_propagates = False
2121

2222
# Register tasks — must be imported after `app` is defined.
23+
from ..api.organization.project.branch import delete_tasks as _api_delete_tasks # noqa: E402, F401
2324
from ..api.organization.project.branch import resize_tasks as _api_resize_tasks # noqa: E402, F401
25+
from ..deployment import delete as _deployment_delete # noqa: E402, F401
2426
from ..deployment import resize as _deployment_resize # noqa: E402, F401

tests/branches/test_delete.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
"""Integration tests for async branch deletion.
2+
3+
These tests verify:
4+
- DELETE /branches/{id}/ returns 202 Accepted with a Location header
5+
- A concurrent DELETE returns 400 while the first delete is in progress
6+
- The branch is eventually gone (GET returns 404) after the task completes
7+
"""
8+
9+
import time
10+
11+
import pytest
12+
from conftest import BRANCH_TIMEOUT_SEC, wait_for_deletion
13+
14+
pytestmark = pytest.mark.branch
15+
16+
_POLL_INTERVAL = 10
17+
18+
_state: dict = {}
19+
20+
21+
@pytest.fixture(scope="module")
def org(make_org):
    """Organization shared by every test in this module."""
    org_name = "test-org-delete"
    return make_org(org_name)
24+
25+
26+
@pytest.fixture(scope="module")
def project(make_project, org):
    """Project created inside the module's organization."""
    project_name = "test-project-delete"
    return make_project(org, project_name)
29+
30+
31+
@pytest.fixture(scope="module")
def branch_id(make_branch, org, project):
    """Branch that the tests in this module delete."""
    branch_name = "test-branch-delete"
    return make_branch(org, project, branch_name)
34+
35+
36+
def test_delete_returns_202_with_location(client, org, project, branch_id):
    """DELETE on the branch answers 202 and carries a Location header."""
    branch_url = f"organizations/{org}/projects/{project}/branches/{branch_id}/"
    response = client.delete(branch_url)
    assert response.status_code == 202
    assert "Location" in response.headers
    # Stash the task URL for the later task-detail test in this module.
    _state["task_url"] = response.headers["Location"]
41+
42+
43+
def test_task_listed_while_deleting(client, org, project, branch_id):
    """The task list of a branch being deleted includes a ``delete`` entry."""
    tasks_url = f"organizations/{org}/projects/{project}/branches/{branch_id}/tasks/"
    response = client.get(tasks_url)
    if response.status_code == 404:
        pytest.skip("Branch already deleted before task listing test could run")
    assert response.status_code == 200
    listed = response.json()
    # Stop at the first delete task; later entries are not inspected.
    has_delete_task = False
    for entry in listed:
        if entry["task_type"] == "delete":
            has_delete_task = True
            break
    assert has_delete_task
50+
51+
52+
def test_concurrent_delete_returns_400(client, org, project, branch_id):
    """A second DELETE is rejected while the first one is still in progress."""
    # 400: the branch still has delete_task_id set, so the endpoint refuses.
    # 404: the branch is already gone, which can happen in fast CI environments.
    acceptable_codes = (400, 404)
    branch_url = f"organizations/{org}/projects/{project}/branches/{branch_id}/"
    response = client.delete(branch_url)
    assert response.status_code in acceptable_codes
57+
58+
59+
def test_branch_gone_after_delete(client, org, project, branch_id):
    """Once the Celery task has finished, the branch endpoint answers 404."""
    branch_url = f"organizations/{org}/projects/{project}/branches/{branch_id}/"
    # Poll (bounded by BRANCH_TIMEOUT_SEC) until the delete task has removed the branch.
    wait_for_deletion(client, branch_url, BRANCH_TIMEOUT_SEC)
    response = client.get(f"organizations/{org}/projects/{project}/branches/{branch_id}/")
    assert response.status_code == 404
68+
69+
70+
def test_task_detail_unavailable_after_deletion(client):
    """Poll the stored task URL until it 404s or reports a terminal status.

    The test is skipped when no task URL was stored by the delete test, and it
    raises ``TimeoutError`` if neither condition is met within
    ``BRANCH_TIMEOUT_SEC``.
    """
    task_url = _state.get("task_url")
    if not task_url:
        pytest.skip("No task URL stored from delete test")

    terminal_statuses = ("COMPLETED", "FAILED")
    deadline = time.monotonic() + BRANCH_TIMEOUT_SEC
    while True:
        response = client.get(task_url, timeout=30)
        code = response.status_code
        # Once the branch is gone the task detail endpoint returns 404 (BranchDep lookup fails).
        if code == 404:
            return
        if code == 200:
            status = response.json().get("status")
        else:
            status = None
        if status in terminal_statuses:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Timed out waiting for task to complete; last status={status!r}")
        time.sleep(_POLL_INTERVAL)

0 commit comments

Comments
 (0)