|
17 | 17 | from fastapi.security import HTTPAuthorizationCredentials |
18 | 18 | from keycloak.exceptions import KeycloakError |
19 | 19 | from kubernetes_asyncio.client.exceptions import ApiException |
20 | | -from pydantic import AfterValidator |
| 20 | +from pydantic import AfterValidator, BaseModel |
21 | 21 | from sqlalchemy.exc import IntegrityError |
22 | 22 | from sqlmodel import select |
23 | 23 |
|
@@ -2114,3 +2114,108 @@ async def _apply_pgbouncer_settings(*, host: str, password: str, update_commands |
def _persist_pgbouncer_settings(config: PgbouncerConfig, updates: dict[str, int]) -> None:
    """Copy every entry of *updates* onto the same-named attribute of *config*.

    The object is mutated in place; nothing is returned. Whether and when the
    change is flushed to storage is up to the caller.
    """
    for attr_name in updates:
        setattr(config, attr_name, updates[attr_name])
| 2117 | + |
| 2118 | + |
class CompactWalResponse(BaseModel):
    """Response body of the ``compactwal`` endpoint."""

    # WAL segment file name used as the cleanup cut-off.
    safe_wal: str
    # Name of the volume snapshot requested before the cleanup ran.
    snapshot_name: str
    # Cleanup result as reported by the endpoint; ``None`` when no result is available.
    removed_files: int | None = None
| 2123 | + |
| 2124 | + |
_WAL_SEGMENT_NAME_CHARS = frozenset("0123456789ABCDEF")


def _is_wal_segment_name(name: str) -> bool:
    """Return True if *name* has the shape of a WAL segment file name (24 upper-case hex digits)."""
    return len(name) == 24 and _WAL_SEGMENT_NAME_CHARS.issuperset(name)


def _count_removed_wal_files(output: object) -> int:
    """Count the ``removing file`` lines in ``pg_archivecleanup -d`` output.

    Best effort: returns 0 when *output* is not a string or contains no such line.
    """
    if not isinstance(output, str):
        return 0
    return sum(1 for line in output.splitlines() if "removing file" in line)


@api.post("/{branch_id}/compactwal", response_model=CompactWalResponse)
async def compact_branch_wal(
    branch: BranchDep,
) -> CompactWalResponse:
    """Snapshot the branch volume, then prune WAL segments older than the last checkpoint.

    Steps:
      1. Ask the branch database for the WAL file holding the redo LSN of the
         latest checkpoint (the "safe WAL").
      2. Request a volume snapshot of the branch's PVC.
      3. Best effort: run ``pg_archivecleanup`` inside the guest VM (via the
         VM pod) to remove WAL segments preceding the safe WAL.

    Returns:
        The safe WAL name, the snapshot name, and ``removed_files``: the number
        of removals reported by ``pg_archivecleanup -d``, or ``None`` when the
        cleanup step failed (the failure is only logged).

    Raises:
        HTTPException: 400 if PITR is disabled for the branch; 502 if the
            database cannot be reached or queried; 500 if the safe WAL is
            missing or malformed, or if snapshot creation fails.
    """
    if not branch.pitr_enabled:
        raise HTTPException(status_code=400, detail="PITR is not enabled for this branch.")

    # Imports are kept function-local, as in the original implementation.
    import asyncio
    import contextlib
    import time

    import asyncpg
    from asyncpg import exceptions as asyncpg_exceptions
    from kubernetes_asyncio import client as k8s_client
    from kubernetes_asyncio import stream as k8s_stream

    from .....deployment import AUTOSCALER_PVC_SUFFIX, branch_db_domain, get_autoscaler_vm_identity
    from .....deployment.kubernetes._util import _ensure_kubeconfig
    from .....deployment.kubernetes.neonvm import resolve_autoscaler_vm_pod_name
    from .....deployment.kubernetes.snapshot import create_snapshot_from_pvc

    db_host = branch_db_domain(branch.id)
    connection = None
    try:
        connection = await asyncpg.connect(
            user="supabase_admin",
            password=branch.database_password,
            database=branch.database,
            host=db_host,
            port=5432,
            server_settings={"application_name": "vela-wal-compact"},
            command_timeout=10,
        )
        safe_wal = await connection.fetchval("SELECT pg_walfile_name(redo_lsn) FROM pg_control_checkpoint();")
    except (asyncpg_exceptions.PostgresError, OSError, asyncio.TimeoutError) as exc:
        # asyncio.TimeoutError is listed explicitly: before Python 3.11 it is not
        # an OSError subclass, so connect/command timeouts would escape as a 500.
        logger.exception("Failed to connect to database %s to determine safe WAL", branch.id)
        raise HTTPException(status_code=502, detail="Failed to connect to database to determine safe WAL.") from exc
    finally:
        # Closing is best effort; a failure here must not mask the real outcome.
        with contextlib.suppress(Exception):
            if connection is not None:
                await connection.close()

    if not safe_wal:
        raise HTTPException(status_code=500, detail="Safe WAL query returned null.")

    # The value is later handed to `ssh`, which passes it through a remote
    # shell. Reject anything that is not a plain WAL segment name before any
    # side effect (snapshot, cleanup) happens.
    if not isinstance(safe_wal, str) or not _is_wal_segment_name(safe_wal):
        logger.error("Unexpected safe WAL value %r for branch %s", safe_wal, branch.id)
        raise HTTPException(status_code=500, detail="Safe WAL query returned an unexpected value.")

    namespace, vm_name = get_autoscaler_vm_identity(branch.id)
    pvc_name = f"{vm_name}{AUTOSCALER_PVC_SUFFIX}"
    # Truncated to 63 characters, the usual Kubernetes name-length limit.
    snapshot_name = f"{str(branch.id).lower()}-compact-{int(time.time())}"[:63]
    snapshot_class = "simplyblock-csi-snapshotclass"

    try:
        await create_snapshot_from_pvc(
            namespace=namespace,
            name=snapshot_name,
            snapshot_class=snapshot_class,
            pvc_name=pvc_name,
        )
        logger.info("Created simplyblock compaction snapshot %s for branch %s", snapshot_name, branch.id)
    except ApiException as exc:
        logger.exception("Failed to create snapshot for branch %s", branch.id)
        raise HTTPException(status_code=500, detail="Failed to create WAL compaction snapshot.") from exc

    removed_files = None
    try:
        pod_name = await resolve_autoscaler_vm_pod_name(namespace, vm_name)
        # `-d` makes pg_archivecleanup report each removed file so they can be counted.
        cmd = ["ssh", "guest-vm", "pg_archivecleanup", "-d", "/var/lib/postgresql/wal/pg_wal", safe_wal]
        await _ensure_kubeconfig()
        # Pod exec needs a websocket connection; kubernetes_asyncio provides it
        # through WsApiClient (there is no `stream.stream()` helper as in the
        # synchronous `kubernetes` client).
        async with k8s_stream.WsApiClient() as ws_api:
            core_v1 = k8s_client.CoreV1Api(api_client=ws_api)
            exec_output = await core_v1.connect_get_namespaced_pod_exec(
                pod_name,
                namespace,
                command=cmd,
                stderr=True,
                stdin=False,
                stdout=True,
                tty=False,
            )
        logger.info(
            "pg_archivecleanup for branch %s up to %s completed. Output: %s",
            branch.id,
            safe_wal,
            exec_output,
        )
        removed_files = _count_removed_wal_files(exec_output)
    except Exception as exc:
        # Deliberately best effort: the snapshot already exists, so a failed
        # cleanup is reported through `removed_files=None` rather than an error.
        logger.warning(
            "Failed to run pg_archivecleanup for branch %s, old WALs may not be deleted: %s",
            branch.id,
            exc,
            exc_info=True,
        )

    return CompactWalResponse(
        safe_wal=safe_wal,
        snapshot_name=snapshot_name,
        removed_files=removed_files,
    )
0 commit comments