diff --git a/src/_util/__init__.py b/src/_util/__init__.py index e82c80d8..6d703b60 100644 --- a/src/_util/__init__.py +++ b/src/_util/__init__.py @@ -243,6 +243,16 @@ def quantity_to_milli_cpu(value: str | Decimal | None) -> int | None: return int(quantity * Decimal(1000)) +def quantity_to_bytes(value: str | Decimal | None) -> int | None: + """Convert a Kubernetes quantity (e.g. '1Gi', '500Mi') to bytes.""" + + quantity = _normalize_quantity(value) + if quantity is None: + return None + + return int(quantity) + + def permissive_numeric_timedelta(value: Any) -> Any: """Parses the given value into timedelta diff --git a/src/api/backup.py b/src/api/backup.py index 9f30da70..0856b512 100644 --- a/src/api/backup.py +++ b/src/api/backup.py @@ -13,6 +13,7 @@ from ulid import ULID from ..database import SessionDep +from ..deployment.storage_backends import get_storage_backend from ..models._util import Identifier from ..models.backups import ( BackupCreatePublic, @@ -46,7 +47,6 @@ # --------------------------- # Constants # --------------------------- -VOLUME_SNAPSHOT_CLASS = os.environ.get("VOLUME_SNAPSHOT_CLASS", "simplyblock-csi-snapshotclass") MANUAL_BACKUP_TIMEOUT_SEC = int(os.environ.get("MANUAL_BACKUP_TIMEOUT_SEC", "10")) UNIT_MULTIPLIER = { @@ -189,6 +189,10 @@ class BackupScheduleUpdate(SchedulePayload): logger = logging.getLogger(__name__) +def _resolve_snapshot_class() -> str: + return get_storage_backend().resolve_snapshot_class() + + # --------------------------- # Create/Update Schedule # --------------------------- @@ -542,7 +546,7 @@ async def manual_backup(session: SessionDep, branch_id: Identifier) -> BackupCre data_snapshot, wal_snapshot = await create_branch_snapshot_bundle( branch.id, backup_id=backup_id, - snapshot_class=VOLUME_SNAPSHOT_CLASS, + snapshot_class=_resolve_snapshot_class(), poll_interval=SNAPSHOT_POLL_INTERVAL_SEC, label="manual", time_limit=MANUAL_BACKUP_TIMEOUT_SEC, diff --git a/src/api/backup_snapshots.py b/src/api/backup_snapshots.py index 04c7c8f2..6ec9f2aa 100644 --- a/src/api/backup_snapshots.py +++ b/src/api/backup_snapshots.py @@ -20,8 +20,8 @@ read_snapshot_content, wait_snapshot_ready, ) -from ..deployment.simplyblock_api import create_simplyblock_api -from ..exceptions import VelaKubernetesError, VelaSimplyblockAPIError, VelaSnapshotTimeoutError +from ..deployment.storage_backends import get_storage_backend +from ..exceptions import VelaDeploymentError, VelaKubernetesError, VelaSimplyblockAPIError, VelaSnapshotTimeoutError if TYPE_CHECKING: from ulid import ULID @@ -285,19 +285,7 @@ async def branch_snapshots_used_size( raise VelaSimplyblockAPIError("Invalid snapshot UUID in backup entries") from exc try: - async with create_simplyblock_api() as sb_api: - snapshots = await sb_api.list_snapshots() - except httpx.HTTPError as exc: - logger.exception( - "Failed to list Simplyblock snapshots for backup entries", - ) - raise VelaSimplyblockAPIError("Failed to list Simplyblock snapshots") from exc - - used_size_by_id: dict[UUID, int] = {snapshot.id: snapshot.used_size for snapshot in snapshots} - - missing_ids = set(snapshot_ids) - set(used_size_by_id) - if missing_ids: - missing = ", ".join(str(snapshot_id) for snapshot_id in sorted(missing_ids, key=str)) - raise VelaSimplyblockAPIError(f"Missing snapshots in Simplyblock response: {missing}") - - return sum(used_size_by_id[snapshot_id] for snapshot_id in snapshot_ids) + used_size = await get_storage_backend().get_snapshot_used_size(snapshot_ids) + except VelaDeploymentError as exc: + raise VelaSimplyblockAPIError("Failed to collect snapshot usage from storage backend") from exc + return int(used_size or 0) diff --git a/src/api/backupmonitor.py b/src/api/backupmonitor.py index 287116ba..c611ee1f 100644 --- a/src/api/backupmonitor.py +++ b/src/api/backupmonitor.py @@ -10,6 +10,7 @@ from sqlmodel import SQLModel, asc, delete, select from ulid import ULID +from ..deployment.storage_backends import get_storage_backend from ..models.backups import ( BackupEntry, BackupLog, @@ -32,11 +33,15 @@ # --------------------------- # Config # --------------------------- -VOLUME_SNAPSHOT_CLASS = os.environ.get("VOLUME_SNAPSHOT_CLASS", "simplyblock-csi-snapshotclass") POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "60")) logger = logging.getLogger(__name__) + +def _resolve_snapshot_class() -> str: + return get_storage_backend().resolve_snapshot_class() + + UNIT_MULTIPLIER = { "min": 60, "minute": 60, @@ -311,7 +316,7 @@ async def execute_backup(self, db: AsyncSession, branch: Branch, row: BackupSche data_snapshot, wal_snapshot = await create_branch_snapshot_bundle( branch.id, backup_id=backup_id, - snapshot_class=VOLUME_SNAPSHOT_CLASS, + snapshot_class=_resolve_snapshot_class(), poll_interval=SNAPSHOT_POLL_INTERVAL_SEC, label=f"row-{row.row_index}", time_limit=SNAPSHOT_TIMEOUT_SEC, diff --git a/src/api/organization/project/branch/__init__.py b/src/api/organization/project/branch/__init__.py index 73753284..daddfd47 100644 --- a/src/api/organization/project/branch/__init__.py +++ b/src/api/organization/project/branch/__init__.py @@ -7,6 +7,7 @@ import secrets from collections.abc import Sequence from datetime import UTC, datetime, timedelta +from decimal import Decimal from typing import Annotated, Any, Literal, TypedDict, cast from urllib.parse import urlsplit, urlunsplit @@ -21,23 +22,27 @@ from sqlalchemy.exc import IntegrityError from sqlmodel import select -from ....._util import DEFAULT_DB_NAME, DEFAULT_DB_USER, Identifier, storage_backend_bytes_to_db_bytes +from ....._util import DEFAULT_DB_NAME, DEFAULT_DB_USER, IOPS_MIN, Identifier, storage_backend_bytes_to_db_bytes from ....._util.crypto import encrypt_with_passphrase, generate_keys from .....database import AsyncSessionLocal, SessionDep from .....deployment import ( + AUTOSCALER_PVC_SUFFIX, + STORAGE_PVC_SUFFIX, DeploymentParameters, ResizeParameters, branch_api_domain, branch_db_domain, branch_rest_endpoint, branch_service_name, + clone_branch_database_volume_with_backend, delete_deployment, deploy_branch_environment, - ensure_branch_storage_class, get_autoscaler_vm_identity, kube_service, resolve_branch_database_volume_size, + restore_branch_database_volume_from_snapshot_with_backend, update_branch_database_password, + update_branch_volume_iops, ) from .....deployment._util import deployment_namespace from .....deployment.health import ( @@ -47,11 +52,8 @@ from .....deployment.kubernetes._util import core_v1_client from .....deployment.kubernetes.neonvm import PowerState as NeonVMPowerState from .....deployment.kubernetes.neonvm import set_virtualmachine_power_state -from .....deployment.kubernetes.volume_clone import ( - clone_branch_database_volume, - restore_branch_database_volume_from_snapshot, -) from .....deployment.settings import get_settings as get_deployment_settings +from .....deployment.storage_backends import get_storage_backend from .....exceptions import VelaDeploymentError, VelaError, VelaKubernetesError, VelaSimplyblockAPIError from .....models.backups import BackupEntry from .....models.branch import ( @@ -83,12 +85,6 @@ ) from ...._util.role import clone_user_role_assignment from ....auth import security -from ....backup_snapshots import ( - SNAPSHOT_POLL_INTERVAL_SEC as _SNAPSHOT_POLL_INTERVAL_SECONDS, -) -from ....backup_snapshots import ( - SNAPSHOT_TIMEOUT_SEC as _SNAPSHOT_TIMEOUT_SECONDS, -) from ....backup_snapshots import branch_snapshots_used_size from ....dependencies import BranchDep, OrganizationDep, ProjectDep, backup_lookup, branch_lookup from ....keycloak import realm_admin @@ -294,12 +290,6 @@ async def _refresh_branch_status(branch: Branch) -> BranchServiceStatus: ) -_PVC_TIMEOUT_SECONDS = float(600) -_PVC_CLONE_TIMEOUT_SECONDS = float(120) -_PVC_POLL_INTERVAL_SECONDS = float(2) -_VOLUME_SNAPSHOT_CLASS = "simplyblock-csi-snapshotclass" - - _PGBOUNCER_ADMIN_USER = "pgbouncer_admin" _PGBOUNCER_ADMIN_DATABASE = "pgbouncer" _PGBOUNCER_SERVICE_PORT = 6432 @@ -427,6 +417,19 @@ class _DeploymentResourceValues(TypedDict): iops: int | None +def _backend_requires_iops() -> bool: + capabilities = get_storage_backend().get_capabilities().capabilities + return capabilities.supports_volume_iops + + +def _effective_iops(iops: int | None) -> int: + if iops is not None: + return iops + if _backend_requires_iops(): + raise HTTPException(status_code=422, detail="iops is required for the configured storage backend") + return IOPS_MIN + + def _normalize_size_from_source(value: int | None) -> int | None: if value is None: return None @@ -516,7 +519,7 @@ def _validate_deployment_requirements( status_code=422, detail="memory_bytes is required when cloning from a source branch", ) - if resources["iops"] is None: + if _backend_requires_iops() and resources["iops"] is None: raise HTTPException( status_code=422, detail="iops is required when cloning from a source branch", @@ -547,7 +550,7 @@ def _deployment_parameters_from_source( database_size = cast("int", resource_values["database_size"]) milli_vcpu = cast("int", resource_values["milli_vcpu"]) memory_bytes = cast("int", resource_values["memory_bytes"]) - iops = cast("int", resource_values["iops"]) + iops = _effective_iops(resource_values["iops"]) storage_size = resource_values["storage_size"] if enable_file_storage else None _ensure_clone_storage_capacity( @@ -595,10 +598,13 @@ def _resolve_database_size_against_source( def _resource_limits_from_deployment(parameters: DeploymentParameters) -> ResourceLimitsPublic: + iops_for_limits = ( + parameters.iops if parameters.iops is not None else (IOPS_MIN if _backend_requires_iops() else None) + ) return ResourceLimitsPublic( milli_vcpu=parameters.milli_vcpu, ram=parameters.memory_bytes, - iops=parameters.iops, + iops=iops_for_limits, database_size=parameters.database_size, storage_size=parameters.storage_size, ) @@ -651,7 +657,7 @@ async def _build_branch_entity( storage_size=clone_parameters.storage_size if clone_parameters.enable_file_storage else None, milli_vcpu=clone_parameters.milli_vcpu, memory=clone_parameters.memory_bytes, - iops=clone_parameters.iops, + iops=_effective_iops(clone_parameters.iops), database_image_tag=clone_parameters.database_image_tag, env_type=env_type, enable_file_storage=clone_parameters.enable_file_storage, @@ -676,7 +682,7 @@ async def _build_branch_entity( storage_size=deployment_params.storage_size if deployment_params.enable_file_storage else None, milli_vcpu=deployment_params.milli_vcpu, memory=deployment_params.memory_bytes, - iops=deployment_params.iops, + iops=_effective_iops(deployment_params.iops), database_image_tag=deployment_params.database_image_tag, env_type=env_type, enable_file_storage=deployment_params.enable_file_storage, @@ -689,6 +695,118 @@ async def _build_branch_entity( return entity +type BranchResizeService = Literal[ + "database_disk_resize", + "storage_api_disk_resize", + "database_cpu_resize", + "database_memory_resize", + "database_iops_resize", +] + + +_PARAMETER_TO_SERVICE: dict[CapaResizeKey, BranchResizeService] = { + "database_size": "database_disk_resize", + "storage_size": "storage_api_disk_resize", + "milli_vcpu": "database_cpu_resize", + "memory_bytes": "database_memory_resize", + "iops": "database_iops_resize", +} + + +def _track_resize_change( + *, + parameter_key: CapaResizeKey, + new_value: int | None, + current_value: int | None, + statuses: dict[str, dict[str, Any]], + effective: dict[CapaResizeKey, int], + timestamp: str, +) -> None: + service_key = _PARAMETER_TO_SERVICE[parameter_key] + if new_value is None: + return + if new_value != current_value: + effective[parameter_key] = new_value + entry: dict[str, Any] = {"status": "PENDING", "timestamp": timestamp, "requested_at": timestamp} + statuses[service_key] = entry + elif statuses.get(service_key, {}).get("status") == "PENDING": + statuses.pop(service_key, None) + + +async def _apply_resize_operations( + session: SessionDep, + branch: Branch, + effective_parameters: dict[CapaResizeKey, int], +) -> None: + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(branch.id) + if "database_size" in effective_parameters: + new_database_size = effective_parameters["database_size"] + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + storage_size_bytes = str(new_database_size) + await kube_service.resize_pvc_storage(namespace, pvc_name, storage_size_bytes) + + if "storage_size" in effective_parameters: + new_storage_size = effective_parameters["storage_size"] + pvc_name = f"{autoscaler_vm_name}{STORAGE_PVC_SUFFIX}" + storage_size_bytes = str(new_storage_size) + await kube_service.resize_pvc_storage(namespace, pvc_name, storage_size_bytes) + + if "iops" in effective_parameters: + new_iops = effective_parameters["iops"] + backend_capabilities = get_storage_backend().get_capabilities() + supports_runtime_iops_update = backend_capabilities.capabilities.supports_volume_iops_update + if supports_runtime_iops_update: + await update_branch_volume_iops(branch.id, new_iops) + elif backend_capabilities.qos_policy == "strict": + raise VelaDeploymentError( + f"Storage backend {backend_capabilities.backend!r} does not support runtime IOPS updates" + ) + else: + logger.info( + "Skipping runtime IOPS update for branch %s because backend %s does not support it (best_effort)", + branch.id, + backend_capabilities.backend, + ) + branch.iops = new_iops + await create_or_update_branch_provisioning( + session, + branch, + ResourceLimitsPublic(iops=new_iops), + commit=False, + ) + + milli_vcpu = effective_parameters.get("milli_vcpu", branch.milli_vcpu) + memory = effective_parameters.get("memory_bytes", branch.memory) + + cpu_changed = "milli_vcpu" in effective_parameters + memory_changed = "memory_bytes" in effective_parameters + + if cpu_changed or memory_changed: + await kube_service.resize_autoscaler_vm( + namespace, + autoscaler_vm_name, + cpu=Decimal(milli_vcpu) / 1000, + memory_bytes=memory, + ) + if cpu_changed: + branch.milli_vcpu = milli_vcpu + await create_or_update_branch_provisioning( + session, + branch, + ResourceLimitsPublic(milli_vcpu=milli_vcpu), + commit=False, + ) + + if memory_changed: + branch.memory = memory + await create_or_update_branch_provisioning( + session, + branch, + ResourceLimitsPublic(ram=memory), + commit=False, + ) + + async def _deploy_branch_environment_task( *, organization_id: Identifier, @@ -748,19 +866,11 @@ async def _clone_branch_environment_task( pitr_enabled: bool, ) -> None: await _persist_branch_status(branch_id, BranchServiceStatus.CREATING) - storage_class_name: str | None = None if copy_data: try: - storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops) - await clone_branch_database_volume( + await clone_branch_database_volume_with_backend( source_branch_id=source_branch_id, target_branch_id=branch_id, - snapshot_class=_VOLUME_SNAPSHOT_CLASS, - storage_class_name=storage_class_name, - snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, - snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, - pvc_timeout_seconds=_PVC_CLONE_TIMEOUT_SECONDS, - pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, database_size=parameters.database_size, ) except VelaError: @@ -822,21 +932,13 @@ async def _restore_branch_environment_task( pitr_enabled: bool, ) -> None: await _persist_branch_status(branch_id, BranchServiceStatus.CREATING) - storage_class_name: str | None = None try: - storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops) - await restore_branch_database_volume_from_snapshot( + await restore_branch_database_volume_from_snapshot_with_backend( source_branch_id=source_branch_id, target_branch_id=branch_id, snapshot_namespace=snapshot_namespace, snapshot_name=snapshot_name, snapshot_content_name=snapshot_content_name, - snapshot_class=_VOLUME_SNAPSHOT_CLASS, - storage_class_name=storage_class_name, - snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, - snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, - pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, - pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, database_size=restore_database_size, ) except VelaError: @@ -904,19 +1006,12 @@ async def _restore_branch_environment_in_place_task( return try: - storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops) - await restore_branch_database_volume_from_snapshot( + await restore_branch_database_volume_from_snapshot_with_backend( source_branch_id=source_branch_id, target_branch_id=branch_id, snapshot_namespace=snapshot_namespace, snapshot_name=snapshot_name, snapshot_content_name=snapshot_content_name, - snapshot_class=_VOLUME_SNAPSHOT_CLASS, - storage_class_name=storage_class_name, - snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, - snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, - pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, - pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, database_size=parameters.database_size, ) except VelaError: diff --git a/src/api/resources.py b/src/api/resources.py index 5ac88ad5..236f61f6 100644 --- a/src/api/resources.py +++ b/src/api/resources.py @@ -1,9 +1,7 @@ import asyncio import logging -from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import TYPE_CHECKING, Any, cast -from uuid import UUID from fastapi import APIRouter, Depends, HTTPException from kubernetes.utils import parse_quantity @@ -16,14 +14,11 @@ from ..database import SessionDep from ..deployment import ( get_autoscaler_vm_identity, - resolve_autoscaler_volume_identifiers, - resolve_autoscaler_wal_volume_identifiers, - resolve_storage_volume_identifiers, ) from ..deployment.kubernetes._util import custom_api_client from ..deployment.kubernetes.neonvm import resolve_autoscaler_vm_pod_name -from ..deployment.simplyblock_api import create_simplyblock_api -from ..exceptions import VelaSimplyblockAPIError +from ..deployment.storage_backends import get_storage_backend +from ..exceptions import VelaDeploymentError from ..models._util import Identifier from ..models.branch import Branch, BranchServiceStatus, ResourceUsageDefinition from ..models.project import Project @@ -337,48 +332,34 @@ async def _collect_compute_usage(namespace: str, vm_name: str) -> tuple[int, int return _parse_compute_usage(metrics) -async def _resolve_volume_stats( - *, - volume_identifier_resolver: Callable[[str], Awaitable[tuple[UUID, UUID | None]]], - namespace: str, -) -> dict[str, int]: - volume, _ = await volume_identifier_resolver(namespace) +async def _collect_database_volume_usage(branch: Branch) -> tuple[int, int]: + usage = await get_storage_backend().get_branch_volume_usage(branch.id, volume_type="database") + if usage is None: + raise VelaDeploymentError(f"Database volume usage unavailable for branch {branch.id}") + nvme_bytes = int(usage.used_bytes or 0) + iops = int(usage.read_iops or 0) + int(usage.write_iops or 0) + return nvme_bytes, iops - async with create_simplyblock_api() as sb_api: - return await sb_api.volume_iostats(volume=volume) - -async def _collect_database_volume_usage(namespace: str) -> tuple[int, int]: - stats = await _resolve_volume_stats( - volume_identifier_resolver=resolve_autoscaler_volume_identifiers, - namespace=namespace, - ) - nvme_bytes = stats["size_used"] - read_iops = stats["read_io_ps"] - write_iops = stats["write_io_ps"] - return nvme_bytes, read_iops + write_iops - - -async def _collect_storage_volume_usage(namespace: str) -> int: - stats = await _resolve_volume_stats( - volume_identifier_resolver=resolve_storage_volume_identifiers, - namespace=namespace, - ) - return stats["size_used"] +async def _collect_storage_volume_usage(branch: Branch) -> int: + usage = await get_storage_backend().get_branch_volume_usage(branch.id, volume_type="storage") + if usage is None: + raise VelaDeploymentError(f"Storage volume usage unavailable for branch {branch.id}") + return int(usage.used_bytes or 0) -async def _collect_wal_volume_usage(namespace: str) -> int: - stats = await _resolve_volume_stats( - volume_identifier_resolver=resolve_autoscaler_wal_volume_identifiers, - namespace=namespace, - ) - return stats["size_used"] +async def _collect_wal_volume_usage(branch: Branch) -> int: + usage = await get_storage_backend().get_branch_volume_usage(branch.id, volume_type="wal") + if usage is None: + raise VelaDeploymentError(f"WAL volume usage unavailable for branch {branch.id}") + return int(usage.used_bytes or 0) async def _collect_branch_volume_usage(branch: Branch, namespace: str) -> tuple[int, int, int | None, int | None]: - db_task = _collect_database_volume_usage(namespace) + _ = namespace + db_task = _collect_database_volume_usage(branch) if branch.enable_file_storage: - storage_task = _collect_storage_volume_usage(namespace) + storage_task = _collect_storage_volume_usage(branch) (nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task) else: nvme_bytes, iops = await db_task @@ -386,7 +367,7 @@ async def _collect_branch_volume_usage(branch: Branch, namespace: str) -> tuple[ wal_bytes = None if branch.pitr_enabled: - wal_bytes = await _collect_wal_volume_usage(namespace) + wal_bytes = await _collect_wal_volume_usage(branch) return nvme_bytes, iops, storage_bytes, wal_bytes @@ -407,16 +388,23 @@ async def _collect_branch_resource_usage(branch: Branch) -> ResourceUsageDefinit raise milli_vcpu, ram_bytes = compute_usage + capabilities = get_storage_backend().get_capabilities().capabilities + supports_storage_metrics = capabilities.supports_volume_usage_storage_metrics + supports_qos_metrics = capabilities.supports_volume_usage_qos_metrics + should_collect_volume_usage = supports_storage_metrics or supports_qos_metrics nvme_bytes, iops, storage_bytes, wal_bytes = 0, 0, None, None - try: - nvme_bytes, iops, storage_bytes, wal_bytes = await _collect_branch_volume_usage(branch, namespace) - except VelaSimplyblockAPIError as exc: - logger.error( - "Failed to collect volume stats for branch %s (namespace %s): %s", - branch.id, - namespace, - exc, - ) + volume_metrics_available = should_collect_volume_usage + if should_collect_volume_usage: + try: + nvme_bytes, iops, storage_bytes, wal_bytes = await _collect_branch_volume_usage(branch, namespace) + except VelaDeploymentError as exc: + volume_metrics_available = False + logger.error( + "Failed to collect volume stats for branch %s (namespace %s): %s", + branch.id, + namespace, + exc, + ) return ResourceUsageDefinition( milli_vcpu=milli_vcpu, @@ -425,6 +413,7 @@ async def _collect_branch_resource_usage(branch: Branch) -> ResourceUsageDefinit iops=iops, storage_bytes=storage_bytes, wal_bytes=wal_bytes, + volume_metrics_available=volume_metrics_available, ) diff --git a/src/api/system.py b/src/api/system.py index fa7fd47e..f8fede5e 100644 --- a/src/api/system.py +++ b/src/api/system.py @@ -22,6 +22,7 @@ VCPU_MILLIS_STEP, ) from ..database import SessionDep +from ..deployment.storage_backends import StorageCapabilitiesPublic, get_storage_backend from ..models.resources import OrganizationLimitDefault, ResourceLimitDefinitionPublic, ResourceType from ..models.role import AccessRight from .auth import authenticated_user @@ -40,6 +41,11 @@ class SystemVersion(BaseModel): timestamp: str = "" +@api.get("/storage-capabilities", dependencies=[Depends(authenticated_user)], response_model=StorageCapabilitiesPublic) +async def get_storage_capabilities() -> StorageCapabilitiesPublic: + return get_storage_backend().get_capabilities() + + @api.get("/version", response_model=SystemVersion) async def get_system_version() -> SystemVersion: """ @@ -69,6 +75,7 @@ async def list_resource_limit_definitions( ) -> list[ResourceLimitDefinitionPublic]: defaults_result = await session.execute(select(OrganizationLimitDefault)) defaults = {d.resource: d for d in defaults_result.scalars().all()} + storage_capabilities = get_storage_backend().get_capabilities().capabilities def _get_limit(resource_type: ResourceType, default: int) -> int: d = defaults.get(resource_type) @@ -80,14 +87,13 @@ def _get_limit(resource_type: ResourceType, default: int) -> int: max_database_size_bytes = _get_limit(ResourceType.database_size, DB_SIZE_MAX) max_storage_size_bytes = _get_limit(ResourceType.storage_size, STORAGE_SIZE_MAX) - return [ + definitions = [ ResourceLimitDefinitionPublic( resource_type="milli_vcpu", min=VCPU_MILLIS_MIN, max=max_vcpu_millis, step=VCPU_MILLIS_STEP, unit="Millis" ), ResourceLimitDefinitionPublic( resource_type="ram", min=MEMORY_MIN, max=max_ram_bytes, step=MEMORY_STEP, unit="MiB" ), - ResourceLimitDefinitionPublic(resource_type="iops", min=IOPS_MIN, max=max_iops, step=IOPS_STEP, unit="IOPS"), ResourceLimitDefinitionPublic( resource_type="database_size", min=DB_SIZE_MIN, max=max_database_size_bytes, step=DB_SIZE_STEP, unit="GB" ), @@ -99,6 +105,18 @@ def _get_limit(resource_type: ResourceType, default: int) -> int: unit="GB", ), ] + if storage_capabilities.supports_volume_iops: + definitions.insert( + 2, + ResourceLimitDefinitionPublic( + resource_type="iops", + min=IOPS_MIN, + max=max_iops, + step=IOPS_STEP, + unit="IOPS", + ), + ) + return definitions @api.get("/available-postgresql-versions", dependencies=[Depends(authenticated_user)]) diff --git a/src/deployment/__init__.py b/src/deployment/__init__.py index f3011423..ba479118 100644 --- a/src/deployment/__init__.py +++ b/src/deployment/__init__.py @@ -1,5 +1,4 @@ import asyncio -import base64 import json import logging import math @@ -10,14 +9,13 @@ from datetime import datetime from importlib import resources from typing import TYPE_CHECKING, Annotated, Any, Literal, cast -from uuid import UUID import asyncpg import yaml from cloudflare import AsyncCloudflare, CloudflareError from kubernetes_asyncio import client as kubernetes_client from kubernetes_asyncio.client.exceptions import ApiException -from pydantic import BaseModel, Field, ValidationError, model_validator +from pydantic import BaseModel, Field, model_validator from ulid import ULID from .._util import ( @@ -29,6 +27,7 @@ DEFAULT_DB_NAME, DEFAULT_DB_USER, IOPS_CONSTRAINTS, + IOPS_MIN, MEMORY_CONSTRAINTS, STORAGE_SIZE_CONSTRAINTS, VCPU_MILLIS_MAX, @@ -44,7 +43,6 @@ VelaDeploymentError, VelaGrafanaError, VelaKubernetesError, - VelaSimplyblockAPIError, ) from ._util import deployment_namespace from .deployment import DeploymentParameters, database_image_tag_to_database_images @@ -53,7 +51,8 @@ from .kubernetes._util import custom_api_client from .monitors.health import vm_monitor from .settings import CloudflareSettings, get_settings -from .simplyblock_api import create_simplyblock_api +from .storage_backends import SnapshotRef, VolumeQosProfile, get_storage_backend +from .storage_backends.simplyblock import ensure_branch_storage_class as ensure_simplyblock_storage_class if TYPE_CHECKING: from cloudflare.types.dns.record_list_params import Name as CloudflareRecordName @@ -71,9 +70,6 @@ CHECK_ENCRYPTED_HEADER_PLUGIN_NAME = "check-x-connection-encrypted" APIKEY_JWT_PLUGIN_NAME = "apikey-jwt" CPU_REQUEST_FRACTION = 0.25 # request = 25% of limit -SIMPLYBLOCK_CSI_CONFIGMAP = "simplyblock-csi-cm" -SIMPLYBLOCK_CSI_SECRET = "simplyblock-csi-secret" -SIMPLYBLOCK_CSI_STORAGE_CLASS = "simplyblock-csi-sc" STORAGE_PVC_SUFFIX = "-storage-pvc" DATABASE_PVC_SUFFIX = "-db-pvc" AUTOSCALER_PVC_SUFFIX = "-block-data" @@ -238,150 +234,71 @@ async def _initialize_autoscaler_overlay_endpoints(namespace: str) -> None: await _ensure_autoscaler_overlay_endpoint_slices(namespace, overlay_ip) -def _build_storage_class_manifest(*, storage_class_name: str, iops: int, base_storage_class: Any) -> dict[str, Any]: - provisioner = getattr(base_storage_class, "provisioner", None) - if not provisioner: - raise VelaKubernetesError("Base storage class missing provisioner") - - base_parameters = dict(getattr(base_storage_class, "parameters", {}) or {}) - cluster_id = base_parameters.get("cluster_id") - if not cluster_id: - raise VelaKubernetesError("Base storage class missing required parameter 'cluster_id'") - - parameters = {key: str(value) for key, value in base_parameters.items()} - parameters.update( - { - "qos_rw_iops": str(iops), - "qos_rw_mbytes": "0", - "qos_r_mbytes": "0", - "qos_w_mbytes": "0", - } - ) - - allow_volume_expansion = getattr(base_storage_class, "allow_volume_expansion", None) - volume_binding_mode = getattr(base_storage_class, "volume_binding_mode", None) - reclaim_policy = getattr(base_storage_class, "reclaim_policy", None) - mount_options = getattr(base_storage_class, "mount_options", None) - - manifest: dict[str, Any] = { - "apiVersion": "storage.k8s.io/v1", - "kind": "StorageClass", - "metadata": { - "name": storage_class_name, - }, - "provisioner": provisioner, - "parameters": parameters, - } - if reclaim_policy is not None: - manifest["reclaimPolicy"] = reclaim_policy - if volume_binding_mode is not None: - manifest["volumeBindingMode"] = volume_binding_mode - if allow_volume_expansion is not None: - manifest["allowVolumeExpansion"] = bool(allow_volume_expansion) - if mount_options: - manifest["mountOptions"] = list(mount_options) - - return manifest - - -async def load_simplyblock_credentials() -> tuple[str, UUID, str, str]: - simplyblock_namespace = get_settings().simplyblock_csi_namespace - try: - config_map = await kube_service.get_config_map(simplyblock_namespace, SIMPLYBLOCK_CSI_CONFIGMAP) - config = json.loads(config_map.data["config.json"]) - cluster_endpoint = config["simplybk"]["ip"].rstrip("/") - cluster_id = UUID(config["simplybk"]["uuid"]) - - encoded_secret = await kube_service.get_secret(simplyblock_namespace, SIMPLYBLOCK_CSI_SECRET) - secret = json.loads(base64.b64decode(encoded_secret.data["secret.json"]).decode()) - cluster_secret = secret["simplybk"]["secret"] - - storage_class = await kube_service.get_storage_class(SIMPLYBLOCK_CSI_STORAGE_CLASS) - pool_name = storage_class.parameters["pool_name"] - - return cluster_endpoint, cluster_id, cluster_secret, pool_name - except (KeyError, TypeError, ValueError, json.JSONDecodeError, VelaKubernetesError) as e: - raise VelaDeploymentError("Failed to load simplyblock credentials") from e - - -async def _resolve_volume_identifiers(namespace: str, pvc_name: str) -> tuple[UUID, UUID | None]: - pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) - pvc_spec = getattr(pvc, "spec", None) - volume_name = getattr(pvc_spec, "volume_name", None) if pvc_spec else None - if not volume_name: - raise VelaDeploymentError(f"PersistentVolumeClaim {namespace}/{pvc_name} is not bound to a PersistentVolume") - - pv = await kube_service.get_persistent_volume(volume_name) - pv_spec = getattr(pv, "spec", None) - csi_spec = getattr(pv_spec, "csi", None) if pv_spec else None - volume_attributes = getattr(csi_spec, "volume_attributes", None) if csi_spec else None - if not isinstance(volume_attributes, dict): - raise VelaDeploymentError( - f"PersistentVolume {volume_name} missing CSI volume attributes; cannot resolve Simplyblock volume UUID" - ) - volume_uuid = volume_attributes.get("uuid") - volume_cluster_id = volume_attributes.get("cluster_id") - if not volume_uuid: - raise VelaDeploymentError(f"PersistentVolume {volume_name} missing 'uuid' attribute in CSI volume attributes") - return UUID(volume_uuid), UUID(volume_cluster_id) if volume_cluster_id is not None else None - - -async def resolve_storage_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: - pvc_name = f"{_autoscaler_vm_name()}{STORAGE_PVC_SUFFIX}" - return await _resolve_volume_identifiers(namespace, pvc_name) - - -async def resolve_autoscaler_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: - pvc_name = f"{_autoscaler_vm_name()}{AUTOSCALER_PVC_SUFFIX}" - return await _resolve_volume_identifiers(namespace, pvc_name) - - -async def resolve_autoscaler_wal_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: - pvc_name = f"{_autoscaler_vm_name()}{AUTOSCALER_WAL_PVC_SUFFIX}" - return await _resolve_volume_identifiers(namespace, pvc_name) - - async def resolve_branch_database_volume_size(branch_id: Identifier) -> int: - namespace = deployment_namespace(branch_id) - volume, _ = await resolve_autoscaler_volume_identifiers(namespace) try: - async with create_simplyblock_api() as sb_api: - volume_payload = await sb_api.get_volume(volume=volume) - return volume_payload.size - except (VelaSimplyblockAPIError, ValidationError) as exc: + volume = await get_storage_backend().lookup_volume(branch_id) + except VelaDeploymentError as exc: raise VelaDeploymentError(f"Failed to resolve database volume size for branch {branch_id}") from exc + if volume is None: + raise VelaDeploymentError(f"Failed to resolve database volume size for branch {branch_id}: volume not found") + return volume.size_bytes async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None: - namespace = deployment_namespace(branch_id) - - volume, _ = await resolve_autoscaler_volume_identifiers(namespace) try: - async with create_simplyblock_api() as sb_api: - await sb_api.update_volume(volume=volume, payload={"max_rw_iops": iops}) - except VelaSimplyblockAPIError as exc: + volume = await get_storage_backend().lookup_volume(branch_id) + if volume is None: + raise VelaDeploymentError(f"No branch volume found for {branch_id}") + await volume.update_performance(VolumeQosProfile(max_read_write_iops=iops)) + except VelaDeploymentError as exc: raise VelaDeploymentError("Failed to update volume") from exc - logger.info("Updated Simplyblock volume %s IOPS to %s", volume, iops) + logger.info("Updated branch %s IOPS to %s", branch_id, iops) async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str: - storage_class_name = branch_storage_class_name(branch_id) - try: - await kube_service.get_storage_class(storage_class_name) - logger.info("StorageClass %s already exists; reusing", storage_class_name) - return storage_class_name - except VelaKubernetesError: - pass - - base_storage_class = await kube_service.get_storage_class(SIMPLYBLOCK_CSI_STORAGE_CLASS) - storage_class_manifest = _build_storage_class_manifest( - storage_class_name=storage_class_name, - iops=iops, - base_storage_class=base_storage_class, + backend = get_storage_backend() + if backend.name == "simplyblock": + return await ensure_simplyblock_storage_class(branch_id, iops=iops) + return backend.resolve_storage_class() + + +async def clone_branch_database_volume_with_backend( + *, + source_branch_id: Identifier, + target_branch_id: Identifier, + database_size: int, + pitr_enabled: bool = False, +) -> None: + backend = get_storage_backend() + await backend.clone_branch_database_volume( + source_identifier=source_branch_id, + target_identifier=target_branch_id, + database_size=database_size, + pitr_enabled=pitr_enabled, + ) + + +async def restore_branch_database_volume_from_snapshot_with_backend( + *, + source_branch_id: Identifier, + target_branch_id: Identifier, + snapshot_namespace: str, + snapshot_name: str, + snapshot_content_name: str | None, + database_size: int, +) -> None: + backend = get_storage_backend() + await backend.restore_branch_database_volume_from_snapshot( + source_identifier=source_branch_id, + target_identifier=target_branch_id, + snapshot_ref=SnapshotRef( + name=snapshot_name, + namespace=snapshot_namespace, + content_name=snapshot_content_name, + ), + database_size=database_size, ) - await kube_service.apply_storage_class(storage_class_manifest) - return storage_class_name def _load_chart_values(chart_root: Any) -> dict[str, Any]: @@ -521,7 +438,7 @@ async def create_vela_config( postgresql_resource = resources.files(__package__).joinpath("postgresql.conf") values_content = _load_chart_values(chart) - storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops) + storage_class_name = await ensure_branch_storage_class(branch_id, iops=parameters.iops or IOPS_MIN) values_content = _configure_vela_values( values_content, parameters=parameters, diff --git a/src/deployment/charts/vela/values.yaml b/src/deployment/charts/vela/values.yaml index 26518f65..768be390 100644 --- a/src/deployment/charts/vela/values.yaml +++ b/src/deployment/charts/vela/values.yaml @@ -61,7 +61,7 @@ autoscalerVm: serverName: "" persistence: create: true - storageClassName: "simplyblock-csi-sc" + storageClassName: "" size: 20Gi claimName: '' accessModes: @@ -72,7 +72,7 @@ pg_wal: mountPath: '/var/lib/postgresql/wal' persistence: create: true - storageClassName: "simplyblock-csi-sc" + storageClassName: "" size: 10Gi claimName: '' accessModes: @@ -354,7 +354,7 @@ storage: # memory: 128Mi persistence: enabled: true - storageClassName: "simplyblock-csi-sc" + storageClassName: "" annotations: {} size: 10Gi accessModes: diff --git a/src/deployment/deployment.py b/src/deployment/deployment.py index 5dbb146e..e540a61f 100644 --- a/src/deployment/deployment.py +++ b/src/deployment/deployment.py @@ -33,7 +33,7 @@ class DeploymentParameters(BaseModel): storage_size: Annotated[int | None, Field(**STORAGE_SIZE_CONSTRAINTS)] = None milli_vcpu: Annotated[int, Field(**CPU_CONSTRAINTS)] # units of milli vCPU memory_bytes: Annotated[int, Field(**MEMORY_CONSTRAINTS)] - iops: Annotated[int, Field(**IOPS_CONSTRAINTS)] + iops: Annotated[int | None, Field(default=None, **IOPS_CONSTRAINTS)] = None database_image_tag: Literal["15.1.0.147", "18.1-velaos"] enable_file_storage: bool = True diff --git a/src/deployment/kubernetes/__init__.py b/src/deployment/kubernetes/__init__.py index ca96b6c2..8f525840 100644 --- a/src/deployment/kubernetes/__init__.py +++ b/src/deployment/kubernetes/__init__.py @@ -357,6 +357,18 @@ async def get_persistent_volume(self, name: str) -> Any: raise VelaKubernetesError(f"PersistentVolume {name!r} not found") from exc raise + async def delete_persistent_volume(self, name: str) -> None: + async with core_v1_client() as core_v1: + try: + await core_v1.delete_persistent_volume(name=name) + logger.info("Deleted PersistentVolume %s", name) + except client.exceptions.ApiException as exc: + if exc.status == 404: + logger.info("PersistentVolume %s not found; skipping delete", name) + return + detail = exc.body or exc.reason or exc + raise VelaKubernetesError(f"Failed to delete PersistentVolume {name!r}: {detail!r}") from exc + async def resize_autoscaler_vm( self, namespace: str, diff --git a/src/deployment/settings.py b/src/deployment/settings.py index 8c846fc3..46c55460 100644 --- a/src/deployment/settings.py +++ b/src/deployment/settings.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Annotated +from typing import Annotated, Literal from pydantic import BaseModel, StringConstraints from pydantic_settings import BaseSettings, SettingsConfigDict @@ -40,6 +40,10 @@ class Settings(BaseSettings): grafana_security_admin_user: str = "admin" grafana_security_admin_password: str = "password" simplyblock_csi_namespace: str = "simplyblock" + storage_backend: Literal["simplyblock", "lvm"] = "simplyblock" + storage_default_class: str = "simplyblock-csi-sc" + storage_snapshot_class: str = "simplyblock-csi-snapshotclass" + storage_qos_policy: Literal["strict", "best_effort"] = "best_effort" @lru_cache diff --git a/src/deployment/simplyblock_api.py b/src/deployment/simplyblock_api.py index 1d847d56..57951cc9 100644 --- a/src/deployment/simplyblock_api.py +++ b/src/deployment/simplyblock_api.py @@ -1,5 +1,7 @@ from __future__ import annotations +import base64 +import json import logging from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, Self @@ -8,7 +10,9 @@ import httpx from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, ValidationError -from ..exceptions import VelaSimplyblockAPIError +from ..exceptions import VelaDeploymentError, VelaKubernetesError, VelaSimplyblockAPIError +from .kubernetes import KubernetesService +from .settings import get_settings if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -16,6 +20,31 @@ logger = logging.getLogger(__name__) +_SIMPLYBLOCK_CSI_CONFIGMAP = "simplyblock-csi-cm" +_SIMPLYBLOCK_CSI_SECRET = "simplyblock-csi-secret" +_SIMPLYBLOCK_CSI_STORAGE_CLASS = "simplyblock-csi-sc" +_kube_service = KubernetesService() + + +async def load_simplyblock_credentials() -> tuple[str, UUID, str, str]: + simplyblock_namespace = get_settings().simplyblock_csi_namespace + try: + config_map = await _kube_service.get_config_map(simplyblock_namespace, _SIMPLYBLOCK_CSI_CONFIGMAP) + config = json.loads(config_map.data["config.json"]) + cluster_endpoint = config["simplybk"]["ip"].rstrip("/") + cluster_id = UUID(config["simplybk"]["uuid"]) + + encoded_secret = await _kube_service.get_secret(simplyblock_namespace, _SIMPLYBLOCK_CSI_SECRET) + secret = json.loads(base64.b64decode(encoded_secret.data["secret.json"]).decode()) + cluster_secret = secret["simplybk"]["secret"] + + storage_class = await _kube_service.get_storage_class(_SIMPLYBLOCK_CSI_STORAGE_CLASS) + pool_name = storage_class.parameters["pool_name"] + + return cluster_endpoint, cluster_id, cluster_secret, pool_name + except (KeyError, TypeError, ValueError, json.JSONDecodeError, VelaKubernetesError) as e: + raise VelaDeploymentError("Failed to load simplyblock credentials") from e + class SimplyblockVolume(BaseModel): model_config = ConfigDict(extra="ignore") @@ -147,8 +176,6 @@ async def list_snapshots(self) -> list[SnapshotInfo]: @asynccontextmanager async def create_simplyblock_api() -> AsyncIterator[SimplyblockPoolApi]: - from . import load_simplyblock_credentials - api = SimplyblockPoolApi(*(await load_simplyblock_credentials())) async with api: yield api diff --git a/src/deployment/storage_backends/__init__.py b/src/deployment/storage_backends/__init__.py new file mode 100644 index 00000000..92e2ce70 --- /dev/null +++ b/src/deployment/storage_backends/__init__.py @@ -0,0 +1,215 @@ +from typing import Literal +from uuid import UUID + +from ...exceptions import VelaDeploymentError +from ..settings import get_settings +from .base import ( + Identifier, + Snapshot, + SnapshotRef, + StorageBackend, + StorageBackendName, + StorageCapabilitiesPublic, + StorageQosPolicy, + Volume, + VolumeCapabilities, + VolumeGroup, + VolumeQosProfile, + VolumeSpec, + VolumeUsage, +) +from .lvm import LvmBackend +from .simplyblock import SimplyblockBackend + + +class PlaceholderBackend(StorageBackend): + def __init__(self, name: StorageBackendName): + self.name = name + + def get_capabilities(self) -> StorageCapabilitiesPublic: + settings = get_settings() + warning = ( + f"Storage backend '{self.name}' abstraction is not implemented yet; reporting conservative capabilities." + ) + return StorageCapabilitiesPublic( + backend=self.name, + capabilities=VolumeCapabilities( + supports_dynamic_provisioning=False, + supports_storage_class_per_branch=False, + supports_storage_class_shared=False, + supports_topology_awareness=False, + supports_encrypted_volumes=False, + supports_vm_live_migration=False, + supports_usage_qos_metrics=False, + supports_qos_read_write_split=False, + supports_usage_storage_metrics=False, + supports_file_storage_volume=False, + supports_pitr_wal_volume=False, + supports_snapshot_content_rebind=False, + supports_clone_without_snapshot=False, + supports_fast_clone=False, + supports_backup_snapshot_labels=False, + supports_restore_size_discovery=False, + supports_volume_groups=False, + supports_consistency_group_snapshots=False, + supports_volume_group_iops=False, + supports_volume_group_iops_update=False, + supports_volume_group_throughput=False, + supports_volume_group_throughput_update=False, + supports_volume_group_usage_qos_metrics=False, + supports_volume_group_usage_storage_metrics=False, + supports_volume_iops=False, + supports_volume_iops_update=False, + supports_volume_throughput=False, + supports_volume_throughput_update=False, + supports_volume_usage_qos_metrics=False, + supports_volume_usage_storage_metrics=False, + supports_volume_clone_cross_namespace=False, + supports_volume_expansion=False, + supports_volume_expansion_online=False, + supports_volume_relocation=False, + ), + storage_class=settings.storage_default_class, + snapshot_class=settings.storage_snapshot_class, + qos_policy=settings.storage_qos_policy, + warnings=[warning], + ) + + def resolve_storage_class(self) -> str: + return get_settings().storage_default_class + + def resolve_snapshot_class(self) -> str: + return get_settings().storage_snapshot_class + + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + _ = (name, size_bytes, qos) + self.validate_capabilities_for_operation("volume_provision") + raise self._unsupported("volume provisioning") + + async def provision_volume_group( + self, + group_id: Identifier, + group_name: str, + qos: VolumeQosProfile | None = None, + ) -> VolumeGroup: + _ = (group_id, group_name, qos) + self.validate_capabilities_for_operation("volume_group_provision") + raise self._unsupported("volume group provisioning") + + async def lookup_volume(self, volume_id: Identifier) -> Volume | None: + _ = volume_id + self.validate_capabilities_for_operation("volume_lookup") + raise self._unsupported("volume lookup") + + async def lookup_volume_group(self, group_id: Identifier) -> VolumeGroup | None: + _ = group_id + self.validate_capabilities_for_operation("volume_group_lookup") + raise self._unsupported("volume group lookup") + + async def lookup_snapshot(self, snapshot_ref: SnapshotRef) -> Snapshot | None: + _ = snapshot_ref + self.validate_capabilities_for_operation("snapshot_lookup") + raise self._unsupported("snapshot lookup") + + async def get_branch_volume_usage( + self, + identifier: Identifier, + *, + volume_type: Literal["database", "storage", "wal"] = "database", + ) -> VolumeUsage | None: + _ = (identifier, volume_type) + self.validate_capabilities_for_operation("volume_usage_storage_metrics") + return None + + async def get_snapshot_used_size(self, snapshot_ids: list[UUID]) -> int | None: + _ = snapshot_ids + self.validate_capabilities_for_operation("usage_storage_metrics") + return None + + async def clone_branch_database_volume( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + database_size: int, + pitr_enabled: bool = False, + ) -> None: + _ = (source_identifier, target_identifier, database_size, pitr_enabled) + raise self._unsupported("branch database volume clone") + + async def restore_branch_database_volume_from_snapshot( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: + _ = (source_identifier, target_identifier, snapshot_ref, database_size) + raise self._unsupported("branch database volume restore") + + def validate_qos_profile(self, qos: VolumeQosProfile) -> None: + _ = qos + return None + + def validate_capabilities_for_operation(self, operation: str, params: dict[str, object] | None = None) -> None: + _ = params + capabilities = self.get_capabilities().capabilities + checks = { + "volume_provision": capabilities.supports_dynamic_provisioning, + "volume_group_provision": capabilities.supports_volume_groups, + "volume_lookup": capabilities.supports_dynamic_provisioning, + "volume_group_lookup": capabilities.supports_volume_groups, + "volume_expansion": capabilities.supports_volume_expansion, + "volume_relocation": capabilities.supports_volume_relocation, + "volume_iops_update": capabilities.supports_volume_iops_update, + "volume_group_iops_update": capabilities.supports_volume_group_iops_update, + "volume_group_throughput_update": capabilities.supports_volume_group_throughput_update, + "volume_usage_storage_metrics": capabilities.supports_volume_usage_storage_metrics, + "usage_storage_metrics": capabilities.supports_usage_storage_metrics, + } + supported = checks.get(operation) + if supported is False: + raise VelaDeploymentError( + f"Operation {operation!r} is not supported by backend {self.name!r} " + "(capabilities endpoint reports it as unavailable)" + ) + + def _unsupported(self, capability: str) -> VelaDeploymentError: + return VelaDeploymentError( + f"Storage backend '{self.name}' does not support {capability}; " + "select a backend with the required capability or adjust storage settings." + ) + + +def get_storage_backend() -> StorageBackend: + settings = get_settings() + backend = settings.storage_backend + if backend == "simplyblock": + return SimplyblockBackend(settings) + if backend == "lvm": + return LvmBackend(settings) + raise VelaDeploymentError(f"Unsupported storage backend {backend!r}. Supported backends are: simplyblock, lvm.") + + +__all__ = [ + "Identifier", + "Snapshot", + "SnapshotRef", + "StorageBackend", + "StorageBackendName", + "StorageCapabilitiesPublic", + "StorageQosPolicy", + "Volume", + "VolumeCapabilities", + "VolumeGroup", + "VolumeQosProfile", + "VolumeSpec", + "VolumeUsage", + "get_storage_backend", +] diff --git a/src/deployment/storage_backends/base.py b/src/deployment/storage_backends/base.py new file mode 100644 index 00000000..9b3016ec --- /dev/null +++ b/src/deployment/storage_backends/base.py @@ -0,0 +1,268 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Literal +from uuid import UUID + +from pydantic import BaseModel, Field +from ulid import ULID + +StorageBackendName = Literal["simplyblock", "lvm"] +StorageQosPolicy = Literal["strict", "best_effort"] +Identifier = ULID + + +class VolumeCapabilities(BaseModel): + # Backend supports StorageClass-based dynamic provisioning. + supports_dynamic_provisioning: bool + # Backend supports per-branch StorageClass strategy. + supports_storage_class_per_branch: bool + # Backend supports shared StorageClass strategy across branches. + supports_storage_class_shared: bool + # Backend supports topology/placement-aware provisioning. + supports_topology_awareness: bool + # Backend supports encrypted volumes natively. + supports_encrypted_volumes: bool + # Backend supports live migration characteristics without storage relocation interruption. + supports_vm_live_migration: bool + # Backend can report system-level observed IOPS and throughput usage metrics. + supports_usage_qos_metrics: bool + # Backend supports distinct read/write QoS limits. + supports_qos_read_write_split: bool + # Backend can report system-level observed used-bytes/storage usage metrics. + supports_usage_storage_metrics: bool + # Backend supports provisioning the optional file storage volume. + supports_file_storage_volume: bool + # Backend supports dedicated WAL volume for PITR paths. + supports_pitr_wal_volume: bool + # Backend supports snapshot content rebind/import workflows. + supports_snapshot_content_rebind: bool + # Backend supports direct clone without explicit snapshot object. + supports_clone_without_snapshot: bool + # Backend supports fast/native clone semantics (e.g., COW metadata clone). + supports_fast_clone: bool + # Backend supports custom labels/metadata on backup snapshots. + supports_backup_snapshot_labels: bool + # Backend can discover required restore size from snapshot metadata. + supports_restore_size_discovery: bool + + # Backend exposes volume groups as first-class entities. + supports_volume_groups: bool + # Backend supports consistency-group snapshots across volumes in the volume group. + supports_consistency_group_snapshots: bool + # Backend supports IOPS controls per-volume-group and provision time. + supports_volume_group_iops: bool + # Backend supports changing per-volume-group IOPS after creation. + supports_volume_group_iops_update: bool + # Backend supports throughput controls per-volume-group and provision time. + supports_volume_group_throughput: bool + # Backend supports changing per-volume-group throughput after creation. + supports_volume_group_throughput_update: bool + # Backend can report observed per-volume-group IOPS and throughput usage metrics. + supports_volume_group_usage_qos_metrics: bool + # Backend can report observed per-volume-group used-bytes/storage usage metrics. + supports_volume_group_usage_storage_metrics: bool + + # Backend supports setting per-volume IOPS at provision time. + supports_volume_iops: bool + # Backend supports changing per-volume IOPS after creation. + supports_volume_iops_update: bool + # Backend supports setting throughput limits at provision time. + supports_volume_throughput: bool + # Backend supports changing throughput limits after creation. + supports_volume_throughput_update: bool + # Backend can report observed per-volume IOPS and throughput usage metrics. + supports_volume_usage_qos_metrics: bool + # Backend can report observed per-volume used-bytes/storage usage metrics. + supports_volume_usage_storage_metrics: bool + # Backend can clone/restore volumes across Kubernetes namespaces. + supports_volume_clone_cross_namespace: bool + # Backend supports increasing volume size. + supports_volume_expansion: bool + # Backend supports online resize while workload is active. + supports_volume_expansion_online: bool + # Backend supports relocating volumes between nodes. + supports_volume_relocation: bool + + +class StorageCapabilitiesPublic(BaseModel): + backend: StorageBackendName + capabilities: VolumeCapabilities + storage_class: str + snapshot_class: str + qos_policy: StorageQosPolicy + warnings: list[str] = Field(default_factory=list) + + +class VolumeQosProfile(BaseModel): + max_read_iops: int | None = None + max_write_iops: int | None = None + max_read_write_iops: int | None = None + max_read_mibps: int | None = None + max_write_mibps: int | None = None + max_read_write_mibps: int | None = None + + +class SnapshotDetails(BaseModel): + name: str + namespace: str + content_name: str | None = None + size_bytes: int | None = None + + +class SnapshotRef(BaseModel): + name: str + namespace: str + content_name: str | None = None + + +class VolumeSpec(BaseModel): + size_bytes: int + + +class VolumeUsage(BaseModel): + used_bytes: int | None = None + read_iops: int | None = None + write_iops: int | None = None + read_mibps: float | None = None + write_mibps: float | None = None + + +@dataclass +class Volume(ABC): + namespace: str + pvc_name: str + storage_class: str + size_bytes: int + + @abstractmethod + async def resize(self, new_size_bytes: int) -> None: ... + + @abstractmethod + async def delete(self) -> None: ... + + @abstractmethod + async def snapshot(self, label: str, backup_id: Identifier) -> "Snapshot": ... + + @abstractmethod + async def update_performance(self, qos: "VolumeQosProfile") -> None: ... + + @abstractmethod + async def usage(self) -> "VolumeUsage | None": ... + + @abstractmethod + async def relocate(self, target_node: str | None = None) -> None: ... + + +@dataclass +class VolumeGroup(ABC): + name: str + + @abstractmethod + async def delete(self) -> None: ... + + @abstractmethod + async def update_performance(self, qos: "VolumeQosProfile") -> None: ... + + @abstractmethod + async def volumes(self) -> list[Volume]: ... + + @abstractmethod + async def snapshot(self, label: str, backup_id: Identifier) -> "Snapshot": ... + + @abstractmethod + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: ... + + +@dataclass +class Snapshot(ABC): + details: SnapshotDetails + snapshot_ref: SnapshotRef + + @abstractmethod + async def delete(self) -> None: ... + + @abstractmethod + async def restore_to(self, target_volume: Volume, database_size: int) -> None: ... + + @abstractmethod + async def clone_to(self, target_volume: Volume, new_size: int) -> None: ... + + +class StorageBackend(ABC): + name: StorageBackendName + + @abstractmethod + def resolve_storage_class(self) -> str: ... + + @abstractmethod + def resolve_snapshot_class(self) -> str: ... + + @abstractmethod + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: ... + + @abstractmethod + async def provision_volume_group( + self, + group_id: Identifier, + group_name: str, + qos: VolumeQosProfile | None = None, + ) -> VolumeGroup: ... + + @abstractmethod + async def lookup_volume(self, volume_id: Identifier) -> Volume | None: ... + + @abstractmethod + async def lookup_volume_group(self, group_id: Identifier) -> VolumeGroup | None: ... + + @abstractmethod + async def lookup_snapshot(self, snapshot_ref: SnapshotRef) -> Snapshot | None: ... + + @abstractmethod + async def get_branch_volume_usage( + self, + identifier: Identifier, + *, + volume_type: Literal["database", "storage", "wal"] = "database", + ) -> VolumeUsage | None: ... + + @abstractmethod + async def get_snapshot_used_size(self, snapshot_ids: list[UUID]) -> int | None: ... + + @abstractmethod + async def clone_branch_database_volume( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + database_size: int, + pitr_enabled: bool = False, + ) -> None: ... + + @abstractmethod + async def restore_branch_database_volume_from_snapshot( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: ... + + @abstractmethod + def validate_qos_profile(self, qos: VolumeQosProfile) -> None: ... + + @abstractmethod + def validate_capabilities_for_operation(self, operation: str, params: dict[str, Any] | None = None) -> None: ... + + @abstractmethod + def get_capabilities(self) -> StorageCapabilitiesPublic: ... diff --git a/src/deployment/storage_backends/lvm.py b/src/deployment/storage_backends/lvm.py new file mode 100644 index 00000000..57256f90 --- /dev/null +++ b/src/deployment/storage_backends/lvm.py @@ -0,0 +1,487 @@ +from __future__ import annotations + +import os +import re +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Literal + +from ulid import ULID + +from ..._util import quantity_to_bytes +from ...exceptions import VelaDeploymentError, VelaKubernetesError +from .. import AUTOSCALER_PVC_SUFFIX, get_autoscaler_vm_identity, kube_service +from ..kubernetes.pvc import delete_pvc, wait_for_pvc_absent +from ..kubernetes.snapshot import create_snapshot_from_pvc, delete_snapshot, read_snapshot, wait_snapshot_ready +from ..kubernetes.volume_clone import clone_branch_database_volume, restore_branch_database_volume_from_snapshot +from .base import ( + Identifier, + Snapshot, + SnapshotDetails, + SnapshotRef, + StorageBackend, + StorageCapabilitiesPublic, + Volume, + VolumeCapabilities, + VolumeGroup, + VolumeQosProfile, + VolumeUsage, +) + +if TYPE_CHECKING: + from uuid import UUID + + from ..settings import Settings + +_SNAPSHOT_TIMEOUT_SECONDS = float(os.environ.get("SNAPSHOT_TIMEOUT_SEC", "120")) +_SNAPSHOT_POLL_INTERVAL_SECONDS = float(os.environ.get("SNAPSHOT_POLL_INTERVAL_SEC", "5")) +_PVC_TIMEOUT_SECONDS = float(600) +_PVC_POLL_INTERVAL_SECONDS = float(2) +_K8S_NAME_MAX_LENGTH = 63 +_DEFAULT_TOPO_LVM_STORAGE_CLASS = "topolvm-provisioner" +_DEFAULT_TOPO_LVM_SNAPSHOT_CLASS = "topolvm-snapshotclass" +_TOPO_LVM_PROVISIONER_MARKERS = ("topolvm",) + +_CAPABILITIES = VolumeCapabilities( + supports_dynamic_provisioning=True, + supports_storage_class_per_branch=False, + supports_storage_class_shared=True, + supports_topology_awareness=True, + supports_encrypted_volumes=False, + supports_vm_live_migration=False, + supports_usage_qos_metrics=False, + supports_qos_read_write_split=False, + supports_usage_storage_metrics=False, + supports_file_storage_volume=True, + supports_pitr_wal_volume=True, + supports_snapshot_content_rebind=True, + supports_clone_without_snapshot=False, + supports_fast_clone=False, + supports_backup_snapshot_labels=True, + supports_restore_size_discovery=True, + supports_volume_groups=True, + supports_consistency_group_snapshots=False, + supports_volume_group_iops=False, + supports_volume_group_iops_update=False, + supports_volume_group_throughput=False, + supports_volume_group_throughput_update=False, + supports_volume_group_usage_qos_metrics=False, + supports_volume_group_usage_storage_metrics=False, + supports_volume_iops=False, + supports_volume_iops_update=False, + supports_volume_throughput=False, + supports_volume_throughput_update=False, + supports_volume_usage_qos_metrics=False, + supports_volume_usage_storage_metrics=False, + supports_volume_clone_cross_namespace=True, + supports_volume_expansion=True, + supports_volume_expansion_online=True, + supports_volume_relocation=False, +) + + +@dataclass +class LvmVolume(Volume): + identifier: Identifier + _backend: LvmBackend + + async def resize(self, new_size_bytes: int) -> None: + # Resize execution is handled by PVC workflows outside this backend currently. + self.size_bytes = new_size_bytes + + async def delete(self) -> None: + await self._backend._delete_volume(self.identifier) + + async def snapshot(self, label: str, backup_id: Identifier) -> LvmSnapshot: + details = await self._backend._snapshot_volume(self.namespace, self.pvc_name, label=label, backup_id=backup_id) + snapshot_ref = SnapshotRef(name=details.name, namespace=details.namespace, content_name=details.content_name) + return LvmSnapshot( + details=details, + snapshot_ref=snapshot_ref, + source_identifier=self.identifier, + _backend=self._backend, + ) + + async def update_performance(self, qos: VolumeQosProfile) -> None: + self._backend.validate_qos_profile(qos) + + async def usage(self) -> VolumeUsage | None: + return None + + async def relocate(self, target_node: str | None = None) -> None: + _ = target_node + raise VelaDeploymentError("lvm backend does not support volume relocation") + + +@dataclass +class LvmVolumeGroup(VolumeGroup): + identifier: Identifier + _backend: LvmBackend + + async def delete(self) -> None: + volumes = await self.volumes() + for volume in volumes: + await volume.delete() + + async def update_performance(self, qos: VolumeQosProfile) -> None: + self._backend.validate_qos_profile(qos) + + async def volumes(self) -> list[Volume]: + volume = await self._backend.lookup_volume(self.identifier) + return [] if volume is None else [volume] + + async def snapshot(self, label: str, backup_id: Identifier) -> Snapshot: + volume = await self._backend.lookup_volume(self.identifier) + if volume is None: + raise VelaDeploymentError(f"No database volume found for identifier={self.identifier}") + return await volume.snapshot(label, backup_id) + + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + return await self._backend.provision_volume(name=name, size_bytes=size_bytes, qos=qos) + + +@dataclass +class LvmSnapshot(Snapshot): + source_identifier: Identifier | None + _backend: LvmBackend + + async def delete(self) -> None: + await delete_snapshot(self.details.namespace, self.details.name) + + async def restore_to(self, target_volume: Volume, database_size: int) -> None: + if not isinstance(target_volume, LvmVolume): + raise VelaDeploymentError("Snapshot restore requires an LvmVolume target") + if self.source_identifier is None: + raise VelaDeploymentError("Cannot restore snapshot without source identifier context") + await self._backend._restore_volume_from_snapshot( + source_identifier=self.source_identifier, + target_identifier=target_volume.identifier, + snapshot_ref=self.snapshot_ref, + database_size=database_size, + ) + + async def clone_to(self, target_volume: Volume, new_size: int) -> None: + if not isinstance(target_volume, LvmVolume): + raise VelaDeploymentError("Snapshot clone requires an LvmVolume target") + if self.source_identifier is None: + raise VelaDeploymentError("Cannot clone snapshot without source identifier context") + await self._backend._clone_volume_from_snapshot( + source_identifier=self.source_identifier, + target_identifier=target_volume.identifier, + new_size=new_size, + ) + + +class LvmBackend(StorageBackend): + name = "lvm" + + def __init__(self, settings: Settings): + self.settings = settings + self._storage_class_validated = False + + def get_capabilities(self) -> StorageCapabilitiesPublic: + warnings: list[str] = [] + if self.settings.storage_default_class == "simplyblock-csi-sc": + warnings.append( + "Using TopoLVM default StorageClass fallback 'topolvm-provisioner' because " + "vela_storage_default_class is still set to simplyblock default." + ) + if self.settings.storage_snapshot_class == "simplyblock-csi-snapshotclass": + warnings.append( + "Using TopoLVM default VolumeSnapshotClass fallback 'topolvm-snapshotclass' because " + "vela_storage_snapshot_class is still set to simplyblock default." + ) + warnings.append("LVM backend currently reports no runtime QoS controls and no usage telemetry.") + return StorageCapabilitiesPublic( + backend="lvm", + capabilities=_CAPABILITIES, + storage_class=self._effective_storage_class(), + snapshot_class=self._effective_snapshot_class(), + qos_policy=self.settings.storage_qos_policy, + warnings=warnings, + ) + + def resolve_storage_class(self) -> str: + return self._effective_storage_class() + + def resolve_snapshot_class(self) -> str: + return self._effective_snapshot_class() + + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + _ = name + await self._ensure_topolvm_storage_class() + self.validate_qos_profile(qos or VolumeQosProfile()) + identifier = ULID() + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(identifier) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + return LvmVolume( + namespace=namespace, + pvc_name=pvc_name, + storage_class=self.resolve_storage_class(), + size_bytes=size_bytes, + identifier=identifier, + _backend=self, + ) + + async def provision_volume_group( + self, + group_id: Identifier, + group_name: str, + qos: VolumeQosProfile | None = None, + ) -> VolumeGroup: + await self._ensure_topolvm_storage_class() + self.validate_qos_profile(qos or VolumeQosProfile()) + return LvmVolumeGroup(name=group_name, identifier=group_id, _backend=self) + + async def lookup_volume(self, volume_id: Identifier) -> Volume | None: + await self._ensure_topolvm_storage_class() + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(volume_id) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + try: + pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) + except VelaKubernetesError as exc: + if self._is_not_found_error(exc): + return None + raise + + requests = getattr(getattr(getattr(pvc, "spec", None), "resources", None), "requests", None) or {} + size_bytes = quantity_to_bytes(requests.get("storage")) or 0 + return LvmVolume( + namespace=namespace, + pvc_name=pvc_name, + storage_class=self.resolve_storage_class(), + size_bytes=size_bytes, + identifier=volume_id, + _backend=self, + ) + + async def lookup_volume_group(self, group_id: Identifier) -> VolumeGroup | None: + await self._ensure_topolvm_storage_class() + return LvmVolumeGroup(name=str(group_id), identifier=group_id, _backend=self) + + async def lookup_snapshot(self, snapshot_ref: SnapshotRef) -> Snapshot | None: + snapshot = await read_snapshot(snapshot_ref.namespace, snapshot_ref.name) + if snapshot is None: + return None + status = snapshot.get("status") or {} + content_name_payload = status.get("boundVolumeSnapshotContentName") + details = SnapshotDetails( + name=snapshot_ref.name, + namespace=snapshot_ref.namespace, + content_name=content_name_payload if isinstance(content_name_payload, str) else snapshot_ref.content_name, + size_bytes=quantity_to_bytes(status.get("restoreSize")), + ) + resolved_ref = SnapshotRef(name=details.name, namespace=details.namespace, content_name=details.content_name) + return LvmSnapshot(details=details, snapshot_ref=resolved_ref, source_identifier=None, _backend=self) + + async def get_branch_volume_usage( + self, + identifier: Identifier, + *, + volume_type: Literal["database", "storage", "wal"] = "database", + ) -> VolumeUsage | None: + _ = (identifier, volume_type) + return None + + async def clone_branch_database_volume( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + database_size: int, + pitr_enabled: bool = False, + ) -> None: + await clone_branch_database_volume( + source_branch_id=source_identifier, + target_branch_id=target_identifier, + snapshot_class=self.resolve_snapshot_class(), + storage_class_name=self.resolve_storage_class(), + snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, + snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, + pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, + pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, + database_size=database_size, + pitr_enabled=pitr_enabled, + ) + + async def restore_branch_database_volume_from_snapshot( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: + await restore_branch_database_volume_from_snapshot( + source_branch_id=source_identifier, + target_branch_id=target_identifier, + snapshot_namespace=snapshot_ref.namespace, + snapshot_name=snapshot_ref.name, + snapshot_content_name=snapshot_ref.content_name, + snapshot_class=self.resolve_snapshot_class(), + storage_class_name=self.resolve_storage_class(), + database_size=database_size, + snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, + snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, + pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, + pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, + ) + + async def get_snapshot_used_size(self, snapshot_ids: list[UUID]) -> int | None: + _ = snapshot_ids + return None + + def validate_qos_profile(self, qos: VolumeQosProfile) -> None: + requested_fields = [ + field_name + for field_name in ( + "max_read_iops", + "max_write_iops", + "max_read_write_iops", + "max_read_mibps", + "max_write_mibps", + "max_read_write_mibps", + ) + if getattr(qos, field_name) is not None + ] + if requested_fields and self.settings.storage_qos_policy == "strict": + raise VelaDeploymentError( + f"lvm backend does not support QoS controls; unsupported fields: {', '.join(requested_fields)}" + ) + + def validate_capabilities_for_operation(self, operation: str, params: dict[str, object] | None = None) -> None: + _ = params + capabilities = self.get_capabilities().capabilities + checks = { + "volume_expansion": capabilities.supports_volume_expansion, + "runtime_iops_update": capabilities.supports_volume_iops_update, + "volume_group_provision": capabilities.supports_volume_groups, + "volume_relocation": capabilities.supports_volume_relocation, + } + supported = checks.get(operation) + if supported is False: + raise VelaDeploymentError(f"Operation {operation!r} is not supported by backend {self.name!r}") + + async def _delete_volume(self, identifier: Identifier) -> None: + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(identifier) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + pv_name: str | None + try: + pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) + pvc_spec = getattr(pvc, "spec", None) + pv_name = getattr(pvc_spec, "volume_name", None) if pvc_spec else None + except VelaKubernetesError as exc: + if self._is_not_found_error(exc): + return + raise + + await delete_pvc(namespace, pvc_name) + await wait_for_pvc_absent( + namespace, + pvc_name, + timeout=_PVC_TIMEOUT_SECONDS, + poll_interval=_PVC_POLL_INTERVAL_SECONDS, + ) + if pv_name: + await kube_service.delete_persistent_volume(pv_name) + + async def _snapshot_volume( + self, + namespace: str, + pvc_name: str, + label: str | None = None, + backup_id: Identifier | None = None, + ) -> SnapshotDetails: + snapshot_name = self._build_snapshot_name(label=label or "backup", backup_id=backup_id or ULID()) + await create_snapshot_from_pvc( + namespace=namespace, + name=snapshot_name, + snapshot_class=self.resolve_snapshot_class(), + pvc_name=pvc_name, + ) + snapshot = await wait_snapshot_ready( + namespace, + snapshot_name, + timeout=_SNAPSHOT_TIMEOUT_SECONDS, + poll_interval=_SNAPSHOT_POLL_INTERVAL_SECONDS, + ) + status = snapshot.get("status") or {} + content_name_payload = status.get("boundVolumeSnapshotContentName") + content_name = content_name_payload if isinstance(content_name_payload, str) else None + return SnapshotDetails( + name=snapshot_name, + namespace=namespace, + content_name=content_name, + size_bytes=quantity_to_bytes(status.get("restoreSize")), + ) + + async def _clone_volume_from_snapshot( + self, + source_identifier: Identifier, + target_identifier: Identifier, + new_size: int, + ) -> None: + await self.clone_branch_database_volume( + source_identifier=source_identifier, + target_identifier=target_identifier, + database_size=new_size, + pitr_enabled=False, + ) + + async def _restore_volume_from_snapshot( + self, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: + await self.restore_branch_database_volume_from_snapshot( + source_identifier=source_identifier, + target_identifier=target_identifier, + snapshot_ref=snapshot_ref, + database_size=database_size, + ) + + def _build_snapshot_name(self, *, label: str, backup_id: ULID) -> str: + clean_label = re.sub(r"[^a-z0-9-]", "-", label.lower()) + clean_label = re.sub(r"-+", "-", clean_label).strip("-") or "backup" + clean_backup = re.sub(r"[^a-z0-9-]", "-", str(backup_id).lower()) + clean_backup = re.sub(r"-+", "-", clean_backup).strip("-") or datetime.now(UTC).strftime("%Y%m%d%H%M%S") + return f"{clean_label}-{clean_backup}"[:_K8S_NAME_MAX_LENGTH].strip("-") + + def _is_not_found_error(self, exc: VelaKubernetesError) -> bool: + return "not found" in str(exc).lower() + + def _effective_storage_class(self) -> str: + if self.settings.storage_default_class == "simplyblock-csi-sc": + return _DEFAULT_TOPO_LVM_STORAGE_CLASS + return self.settings.storage_default_class + + def _effective_snapshot_class(self) -> str: + if self.settings.storage_snapshot_class == "simplyblock-csi-snapshotclass": + return _DEFAULT_TOPO_LVM_SNAPSHOT_CLASS + return self.settings.storage_snapshot_class + + async def _ensure_topolvm_storage_class(self) -> None: + if self._storage_class_validated: + return + storage_class_name = self._effective_storage_class() + storage_class = await kube_service.get_storage_class(storage_class_name) + provisioner = str(getattr(storage_class, "provisioner", "") or "").lower() + if not any(marker in provisioner for marker in _TOPO_LVM_PROVISIONER_MARKERS): + raise VelaDeploymentError( + f"LVM backend expects a TopoLVM StorageClass. " + f"Configured class {storage_class_name!r} uses provisioner {provisioner!r}. " + "Set vela_storage_default_class to a TopoLVM class (for example 'topolvm-provisioner')." + ) + self._storage_class_validated = True diff --git a/src/deployment/storage_backends/simplyblock.py b/src/deployment/storage_backends/simplyblock.py new file mode 100644 index 00000000..bd51a8cf --- /dev/null +++ b/src/deployment/storage_backends/simplyblock.py @@ -0,0 +1,691 @@ +from __future__ import annotations + +import os +import re +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any, Literal +from uuid import UUID + +from ulid import ULID + +from ..._util import IOPS_MIN, quantity_to_bytes +from ...exceptions import VelaDeploymentError, VelaKubernetesError, VelaSimplyblockAPIError +from .. import ( + AUTOSCALER_PVC_SUFFIX, + AUTOSCALER_WAL_PVC_SUFFIX, + STORAGE_PVC_SUFFIX, + get_autoscaler_vm_identity, + kube_service, +) +from ..kubernetes.pvc import delete_pvc, wait_for_pvc_absent +from ..kubernetes.snapshot import create_snapshot_from_pvc, delete_snapshot, read_snapshot, wait_snapshot_ready +from ..kubernetes.volume_clone import clone_branch_database_volume, restore_branch_database_volume_from_snapshot +from ..settings import Settings, get_settings +from ..simplyblock_api import create_simplyblock_api +from .base import ( + Identifier, + Snapshot, + SnapshotDetails, + SnapshotRef, + StorageBackend, + StorageCapabilitiesPublic, + Volume, + VolumeCapabilities, + VolumeGroup, + VolumeQosProfile, + VolumeUsage, +) + +_SNAPSHOT_TIMEOUT_SECONDS = float(os.environ.get("SNAPSHOT_TIMEOUT_SEC", "120")) +_SNAPSHOT_POLL_INTERVAL_SECONDS = float(os.environ.get("SNAPSHOT_POLL_INTERVAL_SEC", "5")) +_PVC_TIMEOUT_SECONDS = float(600) +_PVC_POLL_INTERVAL_SECONDS = float(2) +_K8S_NAME_MAX_LENGTH = 63 +_SIMPLYBLOCK_CSI_STORAGE_CLASS = "simplyblock-csi-sc" + +_CAPABILITIES = VolumeCapabilities( + supports_dynamic_provisioning=True, + supports_storage_class_per_branch=True, + supports_storage_class_shared=True, + supports_topology_awareness=False, # possible but not yet implemented + supports_encrypted_volumes=False, # possible but not yet implemented + supports_vm_live_migration=True, + supports_usage_qos_metrics=True, + supports_qos_read_write_split=False, # possible but not yet implemented + supports_usage_storage_metrics=True, + supports_file_storage_volume=True, + supports_pitr_wal_volume=True, + supports_snapshot_content_rebind=True, + supports_clone_without_snapshot=True, + supports_fast_clone=True, + supports_backup_snapshot_labels=True, + supports_restore_size_discovery=True, + supports_volume_groups=True, + supports_consistency_group_snapshots=False, # possible but not yet implemented + supports_volume_group_iops=False, # possible but not yet implemented + supports_volume_group_iops_update=False, # possible but not yet implemented + supports_volume_group_throughput=False, # possible but not yet implemented + supports_volume_group_throughput_update=False, # possible but not yet implemented + supports_volume_group_usage_qos_metrics=False, # possible but not yet implemented + supports_volume_group_usage_storage_metrics=False, # possible but not yet implemented + supports_volume_iops=True, + supports_volume_iops_update=True, + supports_volume_throughput=False, # possible but not yet implemented + supports_volume_throughput_update=False, # possible but not yet implemented + supports_volume_usage_qos_metrics=True, + supports_volume_usage_storage_metrics=True, + supports_volume_clone_cross_namespace=True, + supports_volume_expansion=True, + supports_volume_expansion_online=True, + supports_volume_relocation=False, # not required +) + + +def _build_storage_class_manifest(*, storage_class_name: str, iops: int, base_storage_class: Any) -> dict[str, Any]: + provisioner = getattr(base_storage_class, "provisioner", None) + if not provisioner: + raise VelaKubernetesError("Base storage class missing provisioner") + + base_parameters = dict(getattr(base_storage_class, "parameters", {}) or {}) + cluster_id = base_parameters.get("cluster_id") + if not cluster_id: + raise VelaKubernetesError("Base storage class missing required parameter 'cluster_id'") + + parameters = {key: str(value) for key, value in base_parameters.items()} + parameters.update( + { + "qos_rw_iops": str(iops), + "qos_rw_mbytes": "0", + "qos_r_mbytes": "0", + "qos_w_mbytes": "0", + } + ) + + allow_volume_expansion = getattr(base_storage_class, "allow_volume_expansion", None) + volume_binding_mode = getattr(base_storage_class, "volume_binding_mode", None) + reclaim_policy = getattr(base_storage_class, "reclaim_policy", None) + mount_options = getattr(base_storage_class, "mount_options", None) + + manifest: dict[str, Any] = { + "apiVersion": "storage.k8s.io/v1", + "kind": "StorageClass", + "metadata": { + "name": storage_class_name, + }, + "provisioner": provisioner, + "parameters": parameters, + } + if reclaim_policy is not None: + manifest["reclaimPolicy"] = reclaim_policy + if volume_binding_mode is not None: + manifest["volumeBindingMode"] = volume_binding_mode + if allow_volume_expansion is not None: + manifest["allowVolumeExpansion"] = bool(allow_volume_expansion) + if mount_options: + manifest["mountOptions"] = list(mount_options) + + return manifest + + +def _branch_storage_class_name(identifier: Identifier) -> str: + return f"sc-{str(identifier).lower()}" + + +async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str: + storage_class_name = _branch_storage_class_name(branch_id) + try: + await kube_service.get_storage_class(storage_class_name) + return storage_class_name + except VelaKubernetesError: + pass + + base_storage_class = await kube_service.get_storage_class(_SIMPLYBLOCK_CSI_STORAGE_CLASS) + storage_class_manifest = _build_storage_class_manifest( + storage_class_name=storage_class_name, + iops=iops, + base_storage_class=base_storage_class, + ) + await kube_service.apply_storage_class(storage_class_manifest) + return storage_class_name + + +def _release_name() -> str: + return get_settings().deployment_release_name + + +def _release_fullname() -> str: + release = _release_name() + return release if "vela" in release else f"{release}-vela" + + +def _autoscaler_vm_name() -> str: + name = f"{_release_fullname()}-autoscaler-vm" + return name[:63].rstrip("-") + + +async def _resolve_volume_identifiers(namespace: str, pvc_name: str) -> tuple[UUID, UUID | None]: + pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) + pvc_spec = getattr(pvc, "spec", None) + volume_name = getattr(pvc_spec, "volume_name", None) if pvc_spec else None + if not volume_name: + raise VelaDeploymentError(f"PersistentVolumeClaim {namespace}/{pvc_name} is not bound to a PersistentVolume") + + pv = await kube_service.get_persistent_volume(volume_name) + pv_spec = getattr(pv, "spec", None) + csi_spec = getattr(pv_spec, "csi", None) if pv_spec else None + volume_attributes = getattr(csi_spec, "volume_attributes", None) if csi_spec else None + if not isinstance(volume_attributes, dict): + raise VelaDeploymentError( + f"PersistentVolume {volume_name} missing CSI volume attributes; cannot resolve Simplyblock volume UUID" + ) + volume_uuid = volume_attributes.get("uuid") + volume_cluster_id = volume_attributes.get("cluster_id") + if not volume_uuid: + raise VelaDeploymentError(f"PersistentVolume {volume_name} missing 'uuid' attribute in CSI volume attributes") + return UUID(volume_uuid), UUID(volume_cluster_id) if volume_cluster_id is not None else None + + +async def resolve_storage_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: + pvc_name = f"{_autoscaler_vm_name()}{STORAGE_PVC_SUFFIX}" + return await _resolve_volume_identifiers(namespace, pvc_name) + + +async def resolve_autoscaler_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: + pvc_name = f"{_autoscaler_vm_name()}{AUTOSCALER_PVC_SUFFIX}" + return await _resolve_volume_identifiers(namespace, pvc_name) + + +async def resolve_autoscaler_wal_volume_identifiers(namespace: str) -> tuple[UUID, UUID | None]: + pvc_name = f"{_autoscaler_vm_name()}{AUTOSCALER_WAL_PVC_SUFFIX}" + return await _resolve_volume_identifiers(namespace, pvc_name) + + +@dataclass +class SimplyblockVolume(Volume): + identifier: Identifier + _backend: SimplyblockBackend + + async def resize(self, new_size_bytes: int) -> None: + await self._backend._resize_volume(self.identifier, new_size_bytes) + self.size_bytes = new_size_bytes + + async def delete(self) -> None: + await self._backend._delete_volume(self.identifier) + + async def snapshot(self, label: str, backup_id: Identifier) -> SimplyblockSnapshot: + details = await self._backend._snapshot_volume( + namespace=self.namespace, + pvc_name=self.pvc_name, + label=label, + backup_id=backup_id, + ) + snapshot_ref = SnapshotRef(name=details.name, namespace=details.namespace, content_name=details.content_name) + return SimplyblockSnapshot( + details=details, + snapshot_ref=snapshot_ref, + source_identifier=self.identifier, + _backend=self._backend, + ) + + async def update_performance(self, qos: VolumeQosProfile) -> None: + await self._backend._update_volume_performance(self.identifier, qos) + + async def usage(self) -> VolumeUsage | None: + return await self._backend._get_volume_usage(self.identifier) + + async def relocate(self, target_node: str | None = None) -> None: + _ = target_node + raise VelaDeploymentError("simplyblock backend does not require volume relocation") + + +@dataclass +class SimplyblockStoragePoolVolumeGroup(VolumeGroup): + _backend: SimplyblockBackend + + async def delete(self) -> None: + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + async def update_performance(self, qos: VolumeQosProfile) -> None: + _ = qos + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + async def volumes(self) -> list[Volume]: + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + async def snapshot(self, label: str, backup_id: Identifier) -> Snapshot: + _ = (label, backup_id) + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + +@dataclass +class SimplyblockSimpleVolumeGroup(VolumeGroup): + _backend: SimplyblockBackend + identifier: Identifier + + async def delete(self) -> None: + volumes = await self._backend._list_volume_group_volumes(self.identifier) + for volume in volumes: + await volume.delete() + + async def update_performance(self, qos: VolumeQosProfile) -> None: + _ = qos + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + async def volumes(self) -> list[Volume]: + return await self._backend._list_volume_group_volumes(self.identifier) + + async def snapshot(self, label: str, backup_id: Identifier) -> Snapshot: + _ = (label, backup_id) + raise VelaDeploymentError("simplyblock backend does not expose volume groups") + + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + return await self._backend._provision_volume(self.identifier, name, size_bytes, qos) + + +@dataclass +class SimplyblockSnapshot(Snapshot): + source_identifier: Identifier | None + _backend: SimplyblockBackend + + async def delete(self) -> None: + await self._backend._delete_snapshot(self.details) + + async def restore_to(self, target_volume: Volume, database_size: int) -> None: + target_identifier = self._backend._volume_identifier(target_volume) + if self.source_identifier is None: + raise VelaDeploymentError("Cannot restore snapshot without source identifier context") + await self._backend._restore_volume_from_snapshot( + source_identifier=self.source_identifier, + target_identifier=target_identifier, + snapshot_ref=self.snapshot_ref, + database_size=database_size, + ) + + async def clone_to(self, target_volume: Volume, new_size: int) -> None: + target_identifier = self._backend._volume_identifier(target_volume) + if self.source_identifier is None: + raise VelaDeploymentError("Cannot clone snapshot without source identifier context") + await self._backend._clone_volume_from_snapshot( + source_identifier=self.source_identifier, + target_identifier=target_identifier, + new_size=new_size, + ) + + +class SimplyblockBackend(StorageBackend): + name = "simplyblock" + + def __init__(self, settings: Settings): + self.settings = settings + + def get_capabilities(self) -> StorageCapabilitiesPublic: + return StorageCapabilitiesPublic( + backend="simplyblock", + capabilities=_CAPABILITIES, + storage_class=self.settings.storage_default_class, + snapshot_class=self.settings.storage_snapshot_class, + qos_policy=self.settings.storage_qos_policy, + ) + + def resolve_storage_class(self) -> str: + return self.settings.storage_default_class + + def resolve_snapshot_class(self) -> str: + return self.settings.storage_snapshot_class + + async def provision_volume( + self, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + return await self._provision_volume(ULID(), name, size_bytes, qos) + + async def provision_volume_group( + self, + group_id: Identifier, + group_name: str, + qos: VolumeQosProfile | None = None, + ) -> VolumeGroup: + _ = qos + return SimplyblockSimpleVolumeGroup(identifier=group_id, _backend=self, name=group_name) + + async def lookup_volume(self, volume_id: Identifier) -> Volume | None: + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(volume_id) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + try: + pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) + except VelaKubernetesError as exc: + if self._is_not_found_error(exc): + return None + raise + + requests = getattr(getattr(getattr(pvc, "spec", None), "resources", None), "requests", None) or {} + size_bytes = quantity_to_bytes(requests.get("storage")) or 0 + return SimplyblockVolume( + namespace=namespace, + pvc_name=pvc_name, + storage_class=self._branch_storage_class_name(volume_id), + size_bytes=size_bytes, + identifier=volume_id, + _backend=self, + ) + + async def lookup_volume_group(self, group_id: Identifier) -> VolumeGroup | None: + return SimplyblockSimpleVolumeGroup(identifier=group_id, _backend=self, name=str(group_id)) + + async def lookup_snapshot(self, snapshot_ref: SnapshotRef) -> Snapshot | None: + snapshot = await read_snapshot(snapshot_ref.namespace, snapshot_ref.name) + if snapshot is None: + return None + status = snapshot.get("status") or {} + content_name_payload = status.get("boundVolumeSnapshotContentName") + content_name = content_name_payload if isinstance(content_name_payload, str) else snapshot_ref.content_name + details = SnapshotDetails( + name=snapshot_ref.name, + namespace=snapshot_ref.namespace, + content_name=content_name, + size_bytes=quantity_to_bytes(status.get("restoreSize")), + ) + resolved_ref = SnapshotRef(name=details.name, namespace=details.namespace, content_name=details.content_name) + return SimplyblockSnapshot( + details=details, + snapshot_ref=resolved_ref, + source_identifier=None, + _backend=self, + ) + + async def get_branch_volume_usage( + self, + identifier: Identifier, + *, + volume_type: Literal["database", "storage", "wal"] = "database", + ) -> VolumeUsage | None: + if volume_type == "database": + return await self._get_volume_usage(identifier) + + namespace = f"{self.settings.deployment_namespace_prefix}-{str(identifier).lower()}" + resolver = ( + resolve_storage_volume_identifiers + if volume_type == "storage" + else resolve_autoscaler_wal_volume_identifiers + ) + try: + volume, _ = await resolver(namespace) + async with create_simplyblock_api() as sb_api: + stats = await sb_api.volume_iostats(volume=volume) + except (VelaDeploymentError, VelaSimplyblockAPIError, VelaKubernetesError): + return None + + return VolumeUsage( + used_bytes=int(stats.get("size_used") or 0), + read_iops=int(stats.get("read_io_ps") or 0), + write_iops=int(stats.get("write_io_ps") or 0), + ) + + async def clone_branch_database_volume( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + database_size: int, + pitr_enabled: bool = False, + ) -> None: + storage_class = await ensure_branch_storage_class(target_identifier, iops=IOPS_MIN) + await clone_branch_database_volume( + source_branch_id=source_identifier, + target_branch_id=target_identifier, + snapshot_class=self.resolve_snapshot_class(), + storage_class_name=storage_class, + snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, + snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, + pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, + pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, + database_size=database_size, + pitr_enabled=pitr_enabled, + ) + + async def restore_branch_database_volume_from_snapshot( + self, + *, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: + storage_class = await ensure_branch_storage_class(target_identifier, iops=IOPS_MIN) + await restore_branch_database_volume_from_snapshot( + source_branch_id=source_identifier, + target_branch_id=target_identifier, + snapshot_namespace=snapshot_ref.namespace, + snapshot_name=snapshot_ref.name, + snapshot_content_name=snapshot_ref.content_name, + snapshot_class=self.resolve_snapshot_class(), + storage_class_name=storage_class, + database_size=database_size, + snapshot_timeout_seconds=_SNAPSHOT_TIMEOUT_SECONDS, + snapshot_poll_interval_seconds=_SNAPSHOT_POLL_INTERVAL_SECONDS, + pvc_timeout_seconds=_PVC_TIMEOUT_SECONDS, + pvc_poll_interval_seconds=_PVC_POLL_INTERVAL_SECONDS, + ) + + async def get_snapshot_used_size(self, snapshot_ids: list[UUID]) -> int | None: + if not snapshot_ids: + return 0 + async with create_simplyblock_api() as sb_api: + snapshots = await sb_api.list_snapshots() + used_size_by_id: dict[UUID, int] = {snapshot.id: snapshot.used_size for snapshot in snapshots} + missing_ids = set(snapshot_ids) - set(used_size_by_id) + if missing_ids: + missing = ", ".join(str(snapshot_id) for snapshot_id in sorted(missing_ids, key=str)) + raise VelaSimplyblockAPIError(f"Missing snapshots in Simplyblock response: {missing}") + return sum(used_size_by_id[snapshot_id] for snapshot_id in snapshot_ids) + + def validate_qos_profile(self, qos: VolumeQosProfile) -> None: + unsupported_fields = [ + field_name + for field_name in ( + "max_read_iops", + "max_write_iops", + "max_read_mibps", + "max_write_mibps", + "max_read_write_mibps", + ) + if getattr(qos, field_name) is not None + ] + if unsupported_fields and self.settings.storage_qos_policy == "strict": + raise VelaDeploymentError( + "simplyblock backend supports only max_read_write_iops; " + f"unsupported fields: {', '.join(unsupported_fields)}" + ) + + def validate_capabilities_for_operation(self, operation: str, params: dict[str, Any] | None = None) -> None: + _ = params + capabilities = self.get_capabilities().capabilities + checks = { + "volume_expansion": capabilities.supports_volume_expansion, + "runtime_iops_update": capabilities.supports_volume_iops_update, + "volume_group_provision": capabilities.supports_volume_groups, + "volume_relocation": capabilities.supports_volume_relocation, + } + supported = checks.get(operation) + if supported is False: + raise VelaDeploymentError(f"Operation {operation!r} is not supported by backend {self.name!r}") + + async def _provision_volume( + self, + identifier: Identifier, + name: str, + size_bytes: int, + qos: VolumeQosProfile | None = None, + ) -> Volume: + _ = name + self.validate_qos_profile(qos or VolumeQosProfile()) + iops = self._effective_iops(qos) + storage_class = await ensure_branch_storage_class(identifier, iops=iops) + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(identifier) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + return SimplyblockVolume( + namespace=namespace, + pvc_name=pvc_name, + storage_class=storage_class, + size_bytes=size_bytes, + identifier=identifier, + _backend=self, + ) + + async def _resize_volume(self, identifier: Identifier, new_size_bytes: int) -> None: + # Resize is currently handled by existing PVC workflows outside this adapter. + # TODO: Move update of the PVC here + _ = (identifier, new_size_bytes) + return None + + async def _delete_volume(self, identifier: Identifier) -> None: + namespace, autoscaler_vm_name = get_autoscaler_vm_identity(identifier) + pvc_name = f"{autoscaler_vm_name}{AUTOSCALER_PVC_SUFFIX}" + + pv_name: str | None + try: + pvc = await kube_service.get_persistent_volume_claim(namespace, pvc_name) + pvc_spec = getattr(pvc, "spec", None) + pv_name = getattr(pvc_spec, "volume_name", None) if pvc_spec else None + except VelaKubernetesError as exc: + if self._is_not_found_error(exc): + return + raise + + await delete_pvc(namespace, pvc_name) + await wait_for_pvc_absent( + namespace, + pvc_name, + timeout=_PVC_TIMEOUT_SECONDS, + poll_interval=_PVC_POLL_INTERVAL_SECONDS, + ) + if pv_name: + await kube_service.delete_persistent_volume(pv_name) + + async def _snapshot_volume( + self, + namespace: str, + pvc_name: str, + label: str | None = None, + backup_id: Identifier | None = None, + ) -> SnapshotDetails: + snapshot_name = self._build_snapshot_name( + label=label or "backup", + backup_id=backup_id or str(ULID()), + ) + await create_snapshot_from_pvc( + namespace=namespace, + name=snapshot_name, + snapshot_class=self.resolve_snapshot_class(), + pvc_name=pvc_name, + ) + snapshot = await wait_snapshot_ready( + namespace, + snapshot_name, + timeout=_SNAPSHOT_TIMEOUT_SECONDS, + poll_interval=_SNAPSHOT_POLL_INTERVAL_SECONDS, + ) + status = snapshot.get("status") or {} + content_name_payload = status.get("boundVolumeSnapshotContentName") + content_name = content_name_payload if isinstance(content_name_payload, str) else None + return SnapshotDetails( + name=snapshot_name, + namespace=namespace, + content_name=content_name, + size_bytes=quantity_to_bytes(status.get("restoreSize")), + ) + + async def _list_volume_group_volumes(self, group_id: Identifier) -> list[Volume]: + volume = await self.lookup_volume(group_id) + return [] if volume is None else [volume] + + async def _clone_volume_from_snapshot( + self, + source_identifier: Identifier, + target_identifier: Identifier, + new_size: int, + ) -> None: + await self.clone_branch_database_volume( + source_identifier=source_identifier, + target_identifier=target_identifier, + database_size=new_size, + pitr_enabled=False, + ) + + async def _restore_volume_from_snapshot( + self, + source_identifier: Identifier, + target_identifier: Identifier, + snapshot_ref: SnapshotRef, + database_size: int, + ) -> None: + await self.restore_branch_database_volume_from_snapshot( + source_identifier=source_identifier, + target_identifier=target_identifier, + snapshot_ref=snapshot_ref, + database_size=database_size, + ) + + async def _update_volume_performance(self, identifier: Identifier, qos: VolumeQosProfile) -> None: + self.validate_qos_profile(qos) + iops = self._effective_iops(qos) + storage_class_name = self._branch_storage_class_name(identifier) + try: + base_storage_class = await kube_service.get_storage_class(storage_class_name) + except VelaKubernetesError: + base_storage_class = await kube_service.get_storage_class(_SIMPLYBLOCK_CSI_STORAGE_CLASS) + + storage_class_manifest = _build_storage_class_manifest( + storage_class_name=storage_class_name, + iops=iops, + base_storage_class=base_storage_class, + ) + await kube_service.apply_storage_class(storage_class_manifest) + + async def _get_volume_usage(self, identifier: Identifier) -> VolumeUsage | None: + namespace = f"{self.settings.deployment_namespace_prefix}-{str(identifier).lower()}" + try: + volume, _ = await resolve_autoscaler_volume_identifiers(namespace) + async with create_simplyblock_api() as sb_api: + stats = await sb_api.volume_iostats(volume=volume) + except (VelaDeploymentError, VelaSimplyblockAPIError, VelaKubernetesError): + return None + + return VolumeUsage( + used_bytes=int(stats.get("size_used") or 0), + read_iops=int(stats.get("read_io_ps") or 0), + write_iops=int(stats.get("write_io_ps") or 0), + ) + + async def _delete_snapshot(self, details: SnapshotDetails) -> None: + await delete_snapshot(details.namespace, details.name) + + def _volume_identifier(self, volume: Volume) -> Identifier: + if isinstance(volume, SimplyblockVolume): + return volume.identifier + raise VelaDeploymentError("Snapshot operation requires a SimplyblockVolume target") + + def _effective_iops(self, qos: VolumeQosProfile | None) -> int: + iops = (qos.max_read_write_iops if qos is not None else None) or IOPS_MIN + return max(iops, IOPS_MIN) + + def _build_snapshot_name(self, *, label: str, backup_id: Identifier | str) -> str: + clean_label = re.sub(r"[^a-z0-9-]", "-", label.lower()) + clean_label = re.sub(r"-+", "-", clean_label).strip("-") or "backup" + clean_backup = re.sub(r"[^a-z0-9-]", "-", str(backup_id).lower()) + clean_backup = re.sub(r"-+", "-", clean_backup).strip("-") or datetime.now(UTC).strftime("%Y%m%d%H%M%S") + return f"{clean_label}-{clean_backup}"[:_K8S_NAME_MAX_LENGTH].strip("-") + + def _branch_storage_class_name(self, identifier: Identifier) -> str: + return _branch_storage_class_name(identifier) + + def _is_not_found_error(self, exc: VelaKubernetesError) -> bool: + return "not found" in str(exc).lower() diff --git a/src/models/branch.py b/src/models/branch.py index d049981d..6377a099 100644 --- a/src/models/branch.py +++ b/src/models/branch.py @@ -14,6 +14,7 @@ CPU_CONSTRAINTS, DATABASE_SIZE_CONSTRAINTS, IOPS_CONSTRAINTS, + IOPS_MIN, MEMORY_CONSTRAINTS, PGBOUNCER_DEFAULT_MAX_CLIENT_CONN, PGBOUNCER_DEFAULT_POOL_SIZE, @@ -48,6 +49,7 @@ def _default_resource_usage_payload() -> dict[str, Any]: "iops": 0, "storage_bytes": None, "wal_bytes": None, + "volume_metrics_available": None, } @@ -88,7 +90,7 @@ class Branch(AsyncAttrs, Model, table=True): database_size: Annotated[int, Field(**DATABASE_SIZE_CONSTRAINTS, sa_column=Column(BigInteger))] milli_vcpu: Annotated[int, Field(**CPU_CONSTRAINTS, sa_column=Column(BigInteger))] # units of milli vCPU memory: Annotated[int, Field(**MEMORY_CONSTRAINTS, sa_column=Column(BigInteger))] - iops: Annotated[int, Field(**IOPS_CONSTRAINTS, sa_column=Column(BigInteger))] + iops: Annotated[int, Field(default=IOPS_MIN, **IOPS_CONSTRAINTS, sa_column=Column(BigInteger))] storage_size: Annotated[ int | None, Field(**STORAGE_SIZE_CONSTRAINTS, sa_column=Column(BigInteger, nullable=True)) ] = None @@ -188,6 +190,7 @@ def get_resource_usage(self) -> "ResourceUsageDefinition": iops=int(payload.get("iops") or 0), storage_bytes=None if storage_value is None else int(storage_value), wal_bytes=None if wal_value is None else int(wal_value), + volume_metrics_available=payload.get("volume_metrics_available"), ) @@ -451,6 +454,12 @@ class ResourceUsageDefinition(BaseModel): description="Measured total snapshot used size in bytes, if available.", ), ] = None + volume_metrics_available: Annotated[ + bool | None, + PydanticField( + description="Whether volume/storage usage metrics were available from the active storage backend.", + ), + ] = None class BranchApiKeys(BaseModel): diff --git a/storage-backend-tasks.md b/storage-backend-tasks.md new file mode 100644 index 00000000..5eb80301 --- /dev/null +++ b/storage-backend-tasks.md @@ -0,0 +1,24 @@ +- Move + - [x] `vela-controller/src/deployment/__init__.py:241` _build_storage_class_manifest (simplyblock QoS keys) -> move behind SimplyblockBackend provisioning. + - [x] `vela-controller/src/deployment/__init__.py:287` load_simplyblock_credentials -> move to simplyblock adapter/API client boundary. + - [x] `vela-controller/src/deployment/__init__.py:307` _resolve_volume_identifiers + helpers -> move to simplyblock backend usage/lookup implementation. + - [x] `vela-controller/src/deployment/__init__.py:344` resolve_branch_database_volume_size -> backend lookup_volume()/usage path. + - [x] `vela-controller/src/deployment/__init__.py:355` update_branch_volume_iops -> backend Volume.update_performance. + - [x] `vela-controller/src/deployment/__init__.py:368` ensure_branch_storage_class -> backend-resolved storage class strategy. + - [x] `vela-controller/src/api/organization/project/branch/__init__.py:844`, `vela-controller/src/api/organization/project/branch/__init__.py:916`, `vela-controller/src/api/organization/project/branch/__init__.py:994` clone/restore orchestration currently composes storage class + snapshot class directly; should be moved to backend Volume/Snapshot operations. + - [x] `vela-controller/src/api/resources.py:340–vela-controller/src/api/resources.py:359` direct simplyblock iostats collection/parsing -> backend usage provider. + - [x] `vela-controller/src/api/backup_snapshots.py:236` branch_snapshots_used_size (calls simplyblock snapshot API) -> backend-specific snapshot metric provider. + +- Adjust + - [x] `vela-controller/src/api/organization/project/branch/__init__.py:310` hardcoded _VOLUME_SNAPSHOT_CLASS = "simplyblock-csi-snapshotclass" -> backend resolver. + - [x] `vela-controller/src/api/backup.py:49` and `vela-controller/src/api/backupmonitor.py:35` simplyblock snapshot default -> backend resolver. + - [x] `vela-controller/src/deployment/charts/vela/values.yaml:65`, `vela-controller/src/deployment/charts/vela/values.yaml:76`, `vela-controller/src/deployment/charts/vela/values.yaml:358` hardcoded simplyblock-csi-sc -> resolved storage class. + - [x] `vela-controller/src/deployment/deployment.py:36`, `vela-controller/src/api/organization/project/branch/__init__.py:529`, `vela-controller/src/models/branch.py:91` required iops contract -> capability-aware handling for non-simplyblock backends. + - [x] `vela-controller/src/api/organization/project/branch/__init__.py:742` resize path directly calling update_branch_volume_iops -> backend capability-gated update/no-op policy. + - [x] `vela-controller/src/api/resources.py:410` zero-fallback semantics on simplyblock metric failure -> explicit “metric unavailable” behavior. + - [x] `vela-controller/src/deployment/settings.py:16` missing storage_default_class / storage_snapshot_class even though backends use them. + +- Extend + - [x] `vela-controller/src/deployment/storage_backends/base.py:12` extend interface for snapshot-used-size/telemetry availability surface (needed to remove simplyblock-only metric path). + - [x] `vela-controller/src/deployment/storage_backends/__init__.py:62`, `vela-controller/src/deployment/storage_backends/lvm.py:302`, `vela-controller/src/deployment/storage_backends/simplyblock.py:299` capability checks reference supports_snapshots / supports_snapshot_restore not present in base.py model; extend/fix capability schema alignment. + - [x] `vela-controller/src/api/system.py:73` resource-limit definitions should be extended to be backend-capability aware (especially iops visibility/validation paths).