Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions src/api/organization/project/branch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
)
from .....models.resources import ResourceLimitsPublic, ResourceType
from ...._util import Conflict, Forbidden, NotFound, Unauthenticated, url_path_for
from ...._util.backups import copy_branch_backup_schedules, delete_branch_backups, ensure_branch_pitr_schedule
from ...._util.backups import copy_branch_backup_schedules, ensure_branch_pitr_schedule
from ...._util.resourcelimit import (
apply_branch_resource_allocation,
check_available_resources_limits,
Expand All @@ -94,13 +94,14 @@
from ....settings import get_settings as get_api_settings
from .api_keys import api as api_key_api
from .auth import api as auth_api
from .control_tasks import (
from .tasks import (
_CONTROL_TO_POWER_STATE,
dispatch_control,
dispatch_delete,
dispatch_resize,
get_control_in_progress_status,
)
from .resize_tasks import dispatch_resize
from .tasks import task_api
from .tasks import api as task_api

api = APIRouter(tags=["branch"])

Expand Down Expand Up @@ -1682,31 +1683,34 @@ async def update(
@instance_api.delete(
"/",
name="organizations:projects:branch:delete",
status_code=204,
responses={401: Unauthenticated, 403: Forbidden, 404: NotFound},
status_code=202,
responses={401: Unauthenticated, 403: Forbidden, 404: NotFound, 400: {"description": "Delete already in progress"}},
)
async def delete(
session: SessionDep,
_organization: OrganizationDep,
_project: ProjectDep,
request: Request,
organization: OrganizationDep,
project: ProjectDep,
branch: BranchDep,
):
branch_id = branch.id
await _set_branch_status(session, branch, BranchServiceStatus.DELETING)
await delete_deployment(branch_id)
try:
await realm_admin("master").a_delete_realm(str(branch_id))
except KeycloakError as exc:
if getattr(exc, "response_code", None) == 404:
logger.error("Keycloak realm not found for branch %s during delete; continuing.", branch_id, exc_info=True)
else:
raise
await delete_branch_provisioning(session, branch_id)
await delete_branch_backups(session, branch_id)
await session.delete(branch)
if branch.delete_task_id is not None:
raise HTTPException(status_code=400, detail="A delete operation is already in progress")

task_id = dispatch_delete(str(branch.id))

branch.set_status(BranchServiceStatus.DELETING)
branch.delete_task_id = task_id
await session.commit()

return Response(status_code=204)
task_url = url_path_for(
request,
"organizations:projects:branch:tasks:detail",
organization_id=await organization.awaitable_attrs.id,
project_id=await project.awaitable_attrs.id,
branch_id=await branch.awaitable_attrs.id,
task_id=task_id,
)
return Response(status_code=202, headers={"Location": task_url})


@instance_api.post(
Expand Down
61 changes: 61 additions & 0 deletions src/api/organization/project/branch/delete_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Celery chord tasks for async branch deletion.

The chord dispatches three sub-tasks in parallel (K8s, Keycloak, backups) and
calls ``finalize_delete`` when all sub-tasks have settled. With
``task_chord_propagates = False`` the callback always fires, so the branch DB
record is removed regardless of individual sub-task outcomes (errors are logged
but do not abort the delete).
"""

import logging
from uuid import UUID

from asgiref.sync import async_to_sync
from celery import chord
from ulid import ULID

from .....database import AsyncSessionLocal
from .....deployment.delete import delete_backup_snapshots, delete_k8s_deployment, delete_keycloak_realm
from .....models.branch import Branch
from .....worker import app
from ...._util.resourcelimit import delete_branch_provisioning

logger = logging.getLogger(__name__)


async def _async_finalize_delete(job_results: list, branch_id: str) -> dict:
    """Log failed sub-task results, then drop the branch's provisioning and DB row.

    Args:
        job_results: Results handed to the chord callback; every entry that is an
            ``Exception`` instance is logged and recorded as an error.
        branch_id: String form of the branch ULID.

    Returns:
        A dict ``{"errors": [...]}`` containing ``str()`` of each failed result.
        The same shape is returned whether or not the branch row still existed.
    """
    errors: list[str] = []
    for outcome in job_results:
        if not isinstance(outcome, Exception):
            continue
        errors.append(str(outcome))
        logger.error("Delete sub-task error for branch %s: %s", branch_id, outcome)

    summary = {"errors": errors}
    branch_key = ULID.from_str(branch_id)

    async with AsyncSessionLocal() as db:
        record = await db.get(Branch, branch_key)
        if record is None:
            # Nothing left to remove; report the collected errors and stop.
            logger.warning("Branch %s not found in finalize_delete; already deleted.", branch_id)
        else:
            # commit=False defers to the single commit below, so provisioning
            # cleanup and the branch row deletion are committed together.
            await delete_branch_provisioning(db, branch_key, commit=False)
            await db.delete(record)
            await db.commit()

    return summary


@app.task(name="simplyblock.vela.branch.delete.finalize")
def finalize_delete(job_results: list, branch_id: str) -> dict:
    """Chord callback: synchronous entry point around ``_async_finalize_delete``.

    Bridges the async implementation into a synchronous Celery task via
    ``async_to_sync`` and returns its ``{"errors": [...]}`` dict unchanged.
    """
    run_sync = async_to_sync(_async_finalize_delete)
    return run_sync(job_results, branch_id)


def dispatch_delete(branch_id: str) -> UUID:
    """Start the branch-delete chord and return the id of its result as a UUID.

    The chord header runs the K8s, Keycloak and backup teardown tasks for
    ``branch_id``; ``finalize_delete`` is attached as the chord callback.

    NOTE(review): recent Celery versions normally do not invoke the chord
    callback when a header task fails -- confirm the worker configuration
    really delivers sub-task failures to ``finalize_delete``.
    """
    teardown_tasks = (delete_k8s_deployment, delete_keycloak_realm, delete_backup_snapshots)
    header = [task.s(branch_id) for task in teardown_tasks]
    callback = finalize_delete.s(branch_id=branch_id)
    async_result = chord(header)(callback)
    return UUID(async_result.id)
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
"""Branch task list/detail endpoints.

Exposes Celery task state (control, delete, and resize) under:
GET .../branches/{branch_id}/tasks
GET .../branches/{branch_id}/tasks/{task_id}
"""

from datetime import datetime
from typing import Any
from typing import Any, Literal
from uuid import UUID

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from ...._util import Forbidden, NotFound, Unauthenticated
from ....dependencies import BranchDep, OrganizationDep, ProjectDep
from .control_tasks import perform_control
from .resize_tasks import finalize_resize
from ....._util import Forbidden, NotFound, Unauthenticated
from .....dependencies import BranchDep, OrganizationDep, ProjectDep
from ._control import _CONTROL_TO_POWER_STATE as _CONTROL_TO_POWER_STATE
from ._control import dispatch_control as dispatch_control
from ._control import get_control_in_progress_status as get_control_in_progress_status
from ._control import perform_control
from ._delete import finalize_delete
from ._resize import dispatch_resize as dispatch_resize
from ._resize import finalize_resize

task_api = APIRouter(tags=["branch"])
api = APIRouter(tags=["branch"])

TaskType = Literal["control", "delete", "resize"]

_CELERY_STATE_TO_STATUS: dict[str, str] = {
"PENDING": "PENDING",
Expand All @@ -38,40 +38,31 @@ class BranchTaskPublic(BaseModel):
date_done: datetime | None


def _build_resize_task_public(task_id: UUID) -> BranchTaskPublic:
result = finalize_resize.AsyncResult(str(task_id))
state = result.state
status = _CELERY_STATE_TO_STATUS.get(state, state)
kwargs: dict = result.kwargs or {}
return BranchTaskPublic(
id=task_id,
task_type="resize",
status=status,
parameters=kwargs.get("effective_parameters", {}),
result=result.result if state == "SUCCESS" else None,
error=str(result.traceback) if state == "FAILURE" and result.traceback else None,
date_done=result.date_done,
)

def _build_task_public(task_id: UUID, task_type: TaskType) -> BranchTaskPublic:
tasks = {
"control": perform_control,
"delete": finalize_delete,
"resize": finalize_resize,
}
result = tasks[task_type].AsyncResult(str(task_id))

def _build_control_task_public(task_id: UUID) -> BranchTaskPublic:
result = perform_control.AsyncResult(str(task_id))
state = result.state
status = _CELERY_STATE_TO_STATUS.get(state, state)
kwargs: dict = result.kwargs or {}
action = kwargs.get("action", "control")
task_type = task_type if task_type != "control" else kwargs["action"]
parameters = {k: v for k, v in kwargs.items() if k not in {"branch_id", "action"}}
return BranchTaskPublic(
id=task_id,
task_type=action,
task_type=task_type,
status=status,
parameters={"action": action},
parameters=parameters,
result=result.result if state == "SUCCESS" else None,
error=str(result.traceback) if state == "FAILURE" and result.traceback else None,
date_done=result.date_done,
)


@task_api.get(
@api.get(
"/",
name="organizations:projects:branch:tasks:list",
response_model=list[BranchTaskPublic],
Expand All @@ -82,15 +73,15 @@ async def list_tasks(
_project: ProjectDep,
branch: BranchDep,
) -> list[BranchTaskPublic]:
tasks = []
if branch.resize_task_id is not None:
tasks.append(_build_resize_task_public(branch.resize_task_id))
if branch.control_task_id is not None:
tasks.append(_build_control_task_public(branch.control_task_id))
return tasks
tasks: list[tuple[UUID | None, TaskType]] = [
(branch.control_task_id, "control"),
(branch.delete_task_id, "delete"),
(branch.resize_task_id, "resize"),
]
return [_build_task_public(task_id, task_type) for task_id, task_type in tasks if task_id is not None]


@task_api.get(
@api.get(
"/{task_id}",
name="organizations:projects:branch:tasks:detail",
response_model=BranchTaskPublic,
Expand All @@ -103,7 +94,9 @@ async def get_task(
task_id: UUID,
) -> BranchTaskPublic:
if branch.resize_task_id == task_id:
return _build_resize_task_public(task_id)
return _build_task_public(task_id, "resize")
if branch.control_task_id == task_id:
return _build_control_task_public(task_id)
return _build_task_public(task_id, "control")
if branch.delete_task_id == task_id:
return _build_task_public(task_id, "delete")
raise HTTPException(status_code=404, detail="Task not found")
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
from sqlalchemy.exc import NoResultFound
from ulid import ULID

from .....database import AsyncSessionLocal
from .....deployment import get_autoscaler_vm_identity
from .....deployment.health import query_deployment_status
from .....deployment.kubernetes.neonvm import Phase, PowerState, get_neon_vm, set_virtualmachine_power_state
from .....models.branch import BranchServiceStatus
from .....models.branch import lookup as branch_lookup
from .....worker import app
from ......database import AsyncSessionLocal
from ......deployment import get_autoscaler_vm_identity
from ......deployment.health import query_deployment_status
from ......deployment.kubernetes.neonvm import Phase, PowerState, get_neon_vm, set_virtualmachine_power_state
from ......models.branch import BranchServiceStatus
from ......models.branch import lookup as branch_lookup
from ......worker import app

logger = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from celery import chord
from ulid import ULID

from .....database import AsyncSessionLocal
from .....deployment.health import collect_branch_service_health, derive_branch_status_from_services
from .....deployment.resize import resize_cpu_memory, resize_database_pvc, resize_iops, resize_storage_pvc
from .....models.branch import Branch
from .....models.resources import ResourceLimitsPublic
from .....worker import app
from ...._util.resourcelimit import apply_branch_resource_allocation
from ......database import AsyncSessionLocal
from ......deployment.health import collect_branch_service_health, derive_branch_status_from_services
from ......deployment.resize import resize_cpu_memory, resize_database_pvc, resize_iops, resize_storage_pvc
from ......models.branch import Branch
from ......models.resources import ResourceLimitsPublic
from ......worker import app
from ....._util.resourcelimit import apply_branch_resource_allocation

logger = logging.getLogger(__name__)

Expand Down
54 changes: 54 additions & 0 deletions src/deployment/delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Celery sub-tasks for async branch deletion (deployment layer).

These tasks handle the infrastructure-level teardown steps that can run in
parallel: K8s namespace/VM deletion, Keycloak realm removal, and backup
snapshot cleanup. The API layer's ``delete_tasks.finalize_delete`` chord
callback then removes the DB record after all sub-tasks have settled.
"""

import logging

from asgiref.sync import async_to_sync
from keycloak.exceptions import KeycloakError
from ulid import ULID

from ..api._util.backups import delete_branch_backups
from ..api.keycloak import realm_admin
from ..database import AsyncSessionLocal
from ..worker import app
from . import delete_deployment

logger = logging.getLogger(__name__)


async def _delete_keycloak(branch_id: ULID) -> None:
    """Delete the Keycloak realm whose name is ``str(branch_id)``.

    A ``KeycloakError`` carrying ``response_code == 404`` is logged as a warning
    and swallowed; every other ``KeycloakError`` is re-raised to the caller.
    """
    realm_name = str(branch_id)
    try:
        await realm_admin("master").a_delete_realm(realm_name)
    except KeycloakError as exc:
        status = getattr(exc, "response_code", None)
        if status != 404:
            raise
        # Realm already absent: treat as done.
        logger.warning("Keycloak realm not found for branch %s; skipping.", branch_id)


async def _delete_snapshots(branch_id: ULID) -> None:
    """Run ``delete_branch_backups`` for ``branch_id`` inside a fresh DB session."""
    # NOTE(review): nothing is committed in this function -- confirm that
    # delete_branch_backups commits on its own if it must persist DB changes.
    async with AsyncSessionLocal() as db_session:
        await delete_branch_backups(db_session, branch_id)


@app.task(name="simplyblock.vela.deployment.delete.k8s")
def delete_k8s_deployment(branch_id: str) -> None:
    """Celery task: run ``delete_deployment`` synchronously for the given branch.

    ``branch_id`` is the string form of the branch ULID and is parsed before
    being passed on.
    """
    run_blocking = async_to_sync(delete_deployment)
    run_blocking(ULID.from_str(branch_id))


@app.task(name="simplyblock.vela.deployment.delete.keycloak")
def delete_keycloak_realm(branch_id: str) -> None:
    """Celery task: remove the branch's Keycloak realm; a 404 counts as success.

    ``branch_id`` is the string form of the branch ULID and is parsed before
    being handed to ``_delete_keycloak``.
    """
    run_blocking = async_to_sync(_delete_keycloak)
    run_blocking(ULID.from_str(branch_id))


@app.task(name="simplyblock.vela.deployment.delete.backups")
def delete_backup_snapshots(branch_id: str) -> None:
    """Celery task: run the backup cleanup (``_delete_snapshots``) for a branch.

    ``branch_id`` is the string form of the branch ULID and is parsed before
    being handed to ``_delete_snapshots``.
    """
    run_blocking = async_to_sync(_delete_snapshots)
    run_blocking(ULID.from_str(branch_id))
3 changes: 2 additions & 1 deletion src/models/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ class Branch(AsyncAttrs, Model, table=True):
sa_column=Column(JSONB, nullable=False, server_default=text("'{}'::jsonb")),
)
pitr_enabled: bool = Field(default=False, sa_column=Column(Boolean, nullable=False, server_default=text("false")))
control_task_id: uuid.UUID | None = None
delete_task_id: uuid.UUID | None = None
resize_task_id: uuid.UUID | None = None
db_port: int | None = None
control_task_id: uuid.UUID | None = None

__table_args__ = (UniqueConstraint("project_id", "name", name="unique_branch_name_per_project"),)

Expand Down
Loading
Loading