Skip to content

Commit 989b751

Browse files
committed
celery task for WAL cleanup
1 parent 5ed89b4 commit 989b751

3 files changed

Lines changed: 213 additions & 0 deletions

File tree

src/deployment/wal_cleanup.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
"""Celery tasks that periodically check WAL PVC utilization and perform
2+
cleanup of WAL.
3+
4+
The task fetches all PITR-enabled branches and dispatches an independent
5+
`wal_cleanup_branch` sub-task for each one.
6+
"""
7+
8+
import contextlib
9+
import logging
10+
import time
11+
from functools import lru_cache
12+
from typing import cast
13+
14+
import asyncpg
15+
from asgiref.sync import async_to_sync
16+
from asyncpg import exceptions as asyncpg_exceptions
17+
from kubernetes_asyncio import client as k8s_client
18+
from kubernetes_asyncio.client.exceptions import ApiException
19+
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
20+
from sqlmodel import select
21+
from ulid import ULID
22+
23+
from ..exceptions import (
24+
VelaDeploymentError,
25+
VelaKubernetesError,
26+
VelaSimplyblockAPIError,
27+
VelaWALPVCError,
28+
)
29+
from ..models.branch import Branch
30+
from ..worker import app
31+
from . import (
32+
AUTOSCALER_PVC_SUFFIX,
33+
branch_db_domain,
34+
get_autoscaler_vm_identity,
35+
resolve_autoscaler_wal_volume_identifiers,
36+
)
37+
from .kubernetes._util import _ensure_kubeconfig
38+
from .kubernetes.neonvm import resolve_autoscaler_vm_pod_name
39+
from .kubernetes.snapshot import create_snapshot_from_pvc
40+
from .simplyblock_api import create_simplyblock_api
41+
42+
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Utilization ratio (used size / volume size) at or above which WAL cleanup
# is triggered for a branch.
WAL_UTILIZATION_THRESHOLD = 0.9
45+
46+
47+
@lru_cache
def _session_factory() -> async_sessionmaker:
    """Return the cached async session factory, building it on first call.

    The engine is created once per process (via ``lru_cache``) from the
    ``postgres_url`` setting. Settings are imported lazily inside the function.

    Returns:
        An ``async_sessionmaker`` producing ``AsyncSession`` objects whose
        instances are not expired on commit.
    """
    from sqlalchemy.pool import NullPool

    from ..api.settings import get_settings

    engine = create_async_engine(
        str(get_settings().postgres_url),
        echo=False,
        # The tasks in this module enter async code through `async_to_sync`,
        # which may run each invocation on a different event loop. asyncpg
        # connections are tied to the loop that opened them, so connections
        # must not be pooled and reused across invocations.
        poolclass=NullPool,
    )
    return async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
58+
59+
60+
async def _get_wal_utilization(branch: Branch) -> float:
    """Compute how full the WAL volume of ``branch`` is.

    The WAL volume is resolved from the branch's autoscaler namespace, then
    its I/O stats and volume record are fetched from the Simplyblock API.

    Returns:
        ``size_used / size`` of the WAL volume; ``size_used`` counts as 0 when
        the stats do not contain it.

    Raises:
        VelaWALPVCError: if the volume identifiers cannot be resolved, the
            stats cannot be fetched, or the volume reports a size of 0.
    """
    namespace, _vm_name = get_autoscaler_vm_identity(branch.id)

    try:
        volume_uuid, _unused = await resolve_autoscaler_wal_volume_identifiers(namespace)
    except (VelaDeploymentError, ApiException) as exc:
        raise VelaWALPVCError(f"Failed to resolve WAL volume identifiers for branch {branch.id}") from exc

    try:
        async with create_simplyblock_api() as sb_api:
            iostats = await sb_api.volume_iostats(volume=volume_uuid)
            volume = await sb_api.get_volume(volume=volume_uuid)
    except VelaSimplyblockAPIError as exc:
        raise VelaWALPVCError(f"Failed to fetch WAL volume stats for branch {branch.id}") from exc

    used_bytes: int = iostats.get("size_used", 0)
    capacity: int = volume.size
    if capacity == 0:
        # Guard the division below.
        raise VelaWALPVCError(f"WAL volume for branch {branch.id} reported size 0")

    ratio = used_bytes / capacity
    return ratio
80+
81+
82+
async def _cleanup_wal(branch: Branch) -> None:
    """Snapshot the branch's data PVC, then prune WAL files via ``pg_archivecleanup``.

    Steps:
        1. Query PostgreSQL for the WAL file that holds the latest
           checkpoint's redo location (the "safe" WAL file).
        2. Take a snapshot of the data PVC.
        3. Run ``pg_archivecleanup`` with the safe WAL file through a pod
           exec that sshes into the guest VM.

    A failure in any step is logged and stops the cleanup; the database,
    snapshot and exec errors handled below are not re-raised.
    """
    import asyncio  # local import: only the timeout exception type is needed

    db_host = branch_db_domain(branch.id)
    connection = None
    try:
        connection = await asyncpg.connect(
            user="supabase_admin",
            password=branch.database_password,
            database=branch.database,
            host=db_host,
            port=5432,
            server_settings={"application_name": "vela-wal-compact"},
            command_timeout=10,
        )
        safe_wal = await connection.fetchval("SELECT pg_walfile_name(redo_lsn) FROM pg_control_checkpoint();")
    except (
        asyncpg_exceptions.PostgresError,
        # Client-side asyncpg failures are not PostgresError subclasses.
        asyncpg_exceptions.InterfaceError,
        # Timeouts are not OSError subclasses before Python 3.11.
        asyncio.TimeoutError,
        OSError,
    ):
        logger.exception("Failed to connect to database for branch %s to determine safe WAL", branch.id)
        return
    finally:
        # Best-effort close; an error here must not mask the outcome above.
        with contextlib.suppress(Exception):
            if connection is not None:
                await connection.close()

    if not safe_wal:
        logger.warning("Safe WAL query returned null for branch %s", branch.id)
        return

    # The name is handed to ssh as part of a remote command line below, so
    # accept only the canonical 24-character upper-case hex WAL file name.
    if not (isinstance(safe_wal, str) and len(safe_wal) == 24 and set(safe_wal) <= set("0123456789ABCDEF")):
        logger.warning("Unexpected safe WAL file name %r for branch %s, skipping cleanup", safe_wal, branch.id)
        return

    namespace, vm_name = get_autoscaler_vm_identity(branch.id)
    pvc_name = f"{vm_name}{AUTOSCALER_PVC_SUFFIX}"
    # Truncated to 63 characters, the usual Kubernetes name length limit.
    snapshot_name = f"{str(branch.id).lower()}-compact-{int(time.time())}"[:63]

    try:
        await create_snapshot_from_pvc(
            namespace=namespace,
            name=snapshot_name,
            snapshot_class="simplyblock-csi-snapshotclass",
            pvc_name=pvc_name,
        )
        logger.info("Created WAL compaction snapshot %s for branch %s", snapshot_name, branch.id)
    except (ApiException, VelaDeploymentError, VelaKubernetesError):
        # Never prune WAL without a snapshot; project-level deployment errors
        # (e.g. a snapshot timeout) are treated like API errors.
        logger.exception("Failed to create snapshot for branch %s before WAL cleanup", branch.id)
        return

    try:
        from kubernetes_asyncio.stream import WsApiClient

        pod_name = await resolve_autoscaler_vm_pod_name(namespace, vm_name)
        cmd = ["ssh", "guest-vm", "pg_archivecleanup", "/var/lib/postgresql/wal/pg_wal", safe_wal]
        await _ensure_kubeconfig()
        async with WsApiClient() as ws_api:
            core_v1 = k8s_client.CoreV1Api(api_client=ws_api)
            # The client annotates `command` as str, hence the cast of the argv list.
            resp = await core_v1.connect_get_namespaced_pod_exec(
                pod_name,
                namespace,
                command=cast("str", cmd),
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
            )
        # NOTE(review): the exec response is only logged; the command's exit
        # status is not inspected here.
        logger.info(
            "pg_archivecleanup for branch %s up to %s completed. Output: %s",
            branch.id,
            safe_wal,
            resp,
        )
    except (ApiException, RuntimeError, VelaKubernetesError):
        logger.warning("Failed to run pg_archivecleanup for branch %s", branch.id, exc_info=True)
154+
155+
156+
async def _wal_cleanup_branch(branch_id: ULID) -> None:
    """Check one branch's WAL PVC utilization and clean up WAL when it is too full.

    Loads the branch, measures WAL utilization and, when it is at or above
    ``WAL_UTILIZATION_THRESHOLD``, runs ``_cleanup_wal``. A missing branch or a
    failed utilization check is logged and the function returns without raising.

    Args:
        branch_id: Identifier of the branch to check.
    """
    # The session is only needed for the lookup; it is closed before the
    # slower storage and Kubernetes calls start.
    async with _session_factory()() as db:
        branch = await db.get(Branch, branch_id)

    if branch is None:
        logger.warning("Branch %s not found, skipping WAL cleanup", branch_id)
        return

    try:
        utilization = await _get_wal_utilization(branch)
    except VelaWALPVCError as exc:
        # exc_info keeps the chained root cause (`raise ... from exc`) in the log;
        # the message alone does not say why the lookup failed.
        logger.error("%s", exc, exc_info=True)
        return

    logger.info("WAL PVC utilization for branch %s: %.1f%%", branch.id, utilization * 100)

    if utilization >= WAL_UTILIZATION_THRESHOLD:
        logger.warning(
            "WAL PVC for branch %s at %.1f%% — triggering compaction",
            branch.id,
            utilization * 100,
        )
        await _cleanup_wal(branch)
179+
180+
181+
async def _wal_cleanup() -> None:
    """Dispatch one ``wal_cleanup_branch`` task per PITR-enabled branch.

    Only branch ids are read from the database; each id is sent as a string
    to its own Celery sub-task.
    """
    async with _session_factory()() as db:
        # Select just the id column: the dispatcher needs nothing else, so there
        # is no reason to load full Branch rows (credentials included).
        result = await db.execute(select(Branch.id).where(Branch.pitr_enabled == True))  # noqa: E712
        branch_ids = list(result.scalars().all())

    logger.info("WAL cleanup: dispatching tasks for %d PITR-enabled branches", len(branch_ids))
    for branch_id in branch_ids:
        # sub-tasks run in parallel across the worker pool.
        wal_cleanup_branch.delay(str(branch_id))
190+
191+
192+
@app.task(name="simplyblock.vela.deployment.wal_cleanup.wal_cleanup_branch")
def wal_cleanup_branch(branch_id: str) -> None:
    """Celery task: run the WAL utilization check and cleanup for one branch.

    Args:
        branch_id: String form of the branch's ULID.
    """
    run_blocking = async_to_sync(_wal_cleanup_branch)
    run_blocking(ULID.from_str(branch_id))
196+
197+
198+
@app.task(name="simplyblock.vela.deployment.wal_cleanup.wal_cleanup")
def wal_cleanup() -> None:
    """Celery beat task: fan out a ``wal_cleanup_branch`` task to each PITR-enabled branch."""
    run_blocking = async_to_sync(_wal_cleanup)
    run_blocking()

src/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,9 @@ class VelaSnapshotTimeoutError(VelaDeploymentError, TimeoutError):
3030
"""Timed out waiting for a snapshot operation to complete"""
3131

3232

33+
class VelaWALPVCError(VelaDeploymentError):
    """The WAL PVC expected for a branch is absent or cannot be used."""
35+
36+
3337
class VelaSimplyblockAPIError(VelaError):
3438
"""Error occurred while interacting with Simplyblock API"""

src/worker/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ class Settings(BaseSettings):
2121

2222
app.conf.beat_schedule_filename = "/tmp/celerybeat-schedule"

# Periodic schedule for Celery beat: one entry that runs the WAL cleanup
# dispatcher task with a schedule value of 300.0 (seconds, i.e. 5 minutes).
_wal_cleanup_entry = {
    "task": "simplyblock.vela.deployment.wal_cleanup.wal_cleanup",
    "schedule": 300.0,
}
app.conf.beat_schedule = {"wal-cleanup-every-5-minutes": _wal_cleanup_entry}
30+
2431
# Register tasks — must be imported after `app` is defined.
2532
from ..api.organization.project.branch import resize_tasks as _api_resize_tasks # noqa: E402, F401
2633
from ..deployment import resize as _deployment_resize # noqa: E402, F401
34+
from ..deployment import wal_cleanup as _deployment_wal_cleanup # noqa: E402, F401

0 commit comments

Comments
 (0)