Skip to content

Commit c8ed526

Browse files
committed
celery task for WAL cleanup
1 parent 7ccf700 commit c8ed526

File tree

3 files changed

+203
-2
lines changed

3 files changed

+203
-2
lines changed

chart/templates/controller/beat.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ spec:
2323
- name: vela-controller-beat
2424
image: "{{ .Values.controller.image.repository }}:{{ .Values.controller.image.tag }}"
2525
imagePullPolicy: Always
26-
command: ["celery", "-A", "simplyblock.vela.worker", "beat", "--loglevel=info"]
26+
command: ["celery", "-A", "simplyblock.vela.worker", "beat", "--loglevel=info", "--schedule=/tmp/celerybeat-schedule"]
2727
envFrom:
2828
- configMapRef:
2929
name: vela-controller-config

src/deployment/wal_cleanup.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""Celery tasks for periodic WAL PVC utilization check and compaction.
2+
3+
The beat task `wal_cleanup` runs every 5 minutes, fetches all PITR-enabled
4+
branches, and dispatches an independent `wal_cleanup_branch` sub-task for
5+
each one. Sub-tasks run in parallel across the worker pool.
6+
"""
7+
8+
import contextlib
9+
import logging
10+
import time
11+
from functools import lru_cache
12+
from typing import cast
13+
14+
import asyncpg
15+
from asgiref.sync import async_to_sync
16+
from asyncpg import exceptions as asyncpg_exceptions
17+
from kubernetes_asyncio import client as k8s_client
18+
from kubernetes_asyncio.client.exceptions import ApiException
19+
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
20+
from sqlmodel import select
21+
from ulid import ULID
22+
23+
from ..exceptions import VelaDeploymentError, VelaSimplyblockAPIError
24+
from ..models.branch import Branch
25+
from ..worker import app
26+
from . import (
27+
AUTOSCALER_PVC_SUFFIX,
28+
branch_db_domain,
29+
get_autoscaler_vm_identity,
30+
resolve_autoscaler_wal_volume_identifiers,
31+
)
32+
from .kubernetes._util import _ensure_kubeconfig
33+
from .kubernetes.neonvm import resolve_autoscaler_vm_pod_name
34+
from .kubernetes.snapshot import create_snapshot_from_pvc
35+
from .simplyblock_api import create_simplyblock_api
36+
37+
logger = logging.getLogger(__name__)
38+
39+
WAL_UTILIZATION_THRESHOLD = 0.90
40+
41+
42+
@lru_cache
43+
def _session_factory() -> async_sessionmaker:
44+
from ..api.settings import get_settings
45+
46+
engine = create_async_engine(
47+
str(get_settings().postgres_url),
48+
echo=False,
49+
pool_pre_ping=True,
50+
pool_recycle=3600,
51+
)
52+
return async_sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
53+
54+
55+
async def _get_wal_utilization(branch: Branch) -> float | None:
56+
"""Return WAL PVC utilization as a ratio (0.0–1.0), or None if unavailable."""
57+
namespace, _ = get_autoscaler_vm_identity(branch.id)
58+
try:
59+
volume_uuid, _ = await resolve_autoscaler_wal_volume_identifiers(namespace)
60+
except (VelaDeploymentError, ApiException):
61+
logger.warning("Could not resolve WAL volume identifiers for branch %s", branch.id)
62+
return None
63+
64+
try:
65+
async with create_simplyblock_api() as sb_api:
66+
iostats = await sb_api.volume_iostats(volume=volume_uuid)
67+
volume = await sb_api.get_volume(volume=volume_uuid)
68+
except VelaSimplyblockAPIError:
69+
logger.warning("Could not fetch WAL volume stats for branch %s", branch.id)
70+
return None
71+
72+
size_used: int = iostats.get("size_used", 0)
73+
size_total: int = volume.size
74+
if size_total == 0:
75+
return None
76+
return size_used / size_total
77+
78+
79+
async def _compact_wal(branch: Branch) -> None:
80+
"""Compact WAL for a branch — mirrors the logic in the /compactwal API endpoint."""
81+
db_host = branch_db_domain(branch.id)
82+
connection = None
83+
try:
84+
connection = await asyncpg.connect(
85+
user="supabase_admin",
86+
password=branch.database_password,
87+
database=branch.database,
88+
host=db_host,
89+
port=5432,
90+
server_settings={"application_name": "vela-wal-compact"},
91+
command_timeout=10,
92+
)
93+
safe_wal = await connection.fetchval("SELECT pg_walfile_name(redo_lsn) FROM pg_control_checkpoint();")
94+
except (asyncpg_exceptions.PostgresError, OSError):
95+
logger.exception("Failed to connect to database for branch %s to determine safe WAL", branch.id)
96+
return
97+
finally:
98+
with contextlib.suppress(Exception):
99+
if connection is not None:
100+
await connection.close()
101+
102+
if not safe_wal:
103+
logger.warning("Safe WAL query returned null for branch %s", branch.id)
104+
return
105+
106+
namespace, vm_name = get_autoscaler_vm_identity(branch.id)
107+
pvc_name = f"{vm_name}{AUTOSCALER_PVC_SUFFIX}"
108+
snapshot_name = f"{str(branch.id).lower()}-compact-{int(time.time())}"[:63]
109+
110+
try:
111+
await create_snapshot_from_pvc(
112+
namespace=namespace,
113+
name=snapshot_name,
114+
snapshot_class="simplyblock-csi-snapshotclass",
115+
pvc_name=pvc_name,
116+
)
117+
logger.info("Created WAL compaction snapshot %s for branch %s", snapshot_name, branch.id)
118+
except ApiException:
119+
logger.exception("Failed to create snapshot for branch %s before WAL cleanup", branch.id)
120+
return
121+
122+
try:
123+
from kubernetes_asyncio.stream import WsApiClient
124+
125+
pod_name = await resolve_autoscaler_vm_pod_name(namespace, vm_name)
126+
cmd = ["ssh", "guest-vm", "pg_archivecleanup", "/var/lib/postgresql/wal/pg_wal", safe_wal]
127+
await _ensure_kubeconfig()
128+
async with WsApiClient() as ws_api:
129+
core_v1 = k8s_client.CoreV1Api(api_client=ws_api)
130+
resp = await core_v1.connect_get_namespaced_pod_exec(
131+
pod_name,
132+
namespace,
133+
command=cast("str", cmd),
134+
stderr=True,
135+
stdin=False,
136+
stdout=True,
137+
tty=False,
138+
)
139+
logger.info(
140+
"pg_archivecleanup for branch %s up to %s completed. Output: %s",
141+
branch.id,
142+
safe_wal,
143+
resp,
144+
)
145+
except Exception: # noqa: BLE001
146+
logger.warning("Failed to run pg_archivecleanup for branch %s", branch.id, exc_info=True)
147+
148+
149+
async def _wal_cleanup_branch(branch_id: ULID) -> None:
150+
async with _session_factory()() as db:
151+
branch = await db.get(Branch, branch_id)
152+
153+
if branch is None:
154+
logger.warning("Branch %s not found, skipping WAL cleanup", branch_id)
155+
return
156+
157+
utilization = await _get_wal_utilization(branch)
158+
if utilization is None:
159+
return
160+
161+
logger.info("WAL PVC utilization for branch %s: %.1f%%", branch.id, utilization * 100)
162+
163+
if utilization >= WAL_UTILIZATION_THRESHOLD:
164+
logger.warning(
165+
"WAL PVC for branch %s at %.1f%% — triggering compaction",
166+
branch.id,
167+
utilization * 100,
168+
)
169+
await _compact_wal(branch)
170+
171+
172+
async def _wal_cleanup() -> None:
173+
async with _session_factory()() as db:
174+
result = await db.execute(select(Branch).where(Branch.pitr_enabled == True)) # noqa: E712
175+
branch_ids = [b.id for b in result.scalars().all()]
176+
177+
logger.info("WAL cleanup: dispatching tasks for %d PITR-enabled branches", len(branch_ids))
178+
for branch_id in branch_ids:
179+
wal_cleanup_branch.delay(str(branch_id))
180+
181+
182+
@app.task(name="simplyblock.vela.deployment.wal_cleanup.wal_cleanup_branch")
183+
def wal_cleanup_branch(branch_id: str) -> None:
184+
"""Check WAL PVC utilization for a single branch and compact if >= 90%."""
185+
async_to_sync(_wal_cleanup_branch)(ULID.from_str(branch_id))
186+
187+
188+
@app.task(name="simplyblock.vela.deployment.wal_cleanup.wal_cleanup")
189+
def wal_cleanup() -> None:
190+
"""Periodic beat task: dispatch a wal_cleanup_branch task for every PITR-enabled branch."""
191+
async_to_sync(_wal_cleanup)()

src/worker/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,19 @@ class Settings(BaseSettings):
1919
# Chord callback always fires even when individual sub-tasks fail.
2020
app.conf.task_chord_propagates = False
2121

22+
# Write the beat schedule file to /tmp so it is always writable regardless of
23+
# the container's working directory permissions.
24+
app.conf.beat_schedule_filename = "/tmp/celerybeat-schedule"
25+
2226
# Celery Beat schedule — add periodic tasks here.
23-
app.conf.beat_schedule = {}
27+
app.conf.beat_schedule = {
28+
"wal-cleanup-every-5-minutes": {
29+
"task": "simplyblock.vela.deployment.wal_cleanup.wal_cleanup",
30+
"schedule": 300.0,
31+
},
32+
}
2433

2534
# Register tasks — must be imported after `app` is defined.
2635
from ..api.organization.project.branch import resize_tasks as _api_resize_tasks # noqa: E402, F401
2736
from ..deployment import resize as _deployment_resize # noqa: E402, F401
37+
from ..deployment import wal_cleanup as _deployment_wal_cleanup # noqa: E402, F401

0 commit comments

Comments
 (0)