From 65a62569dab72c789064ac733d450a580b1ebd32 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 27 May 2025 15:04:20 +0500 Subject: [PATCH] Run plugins in executor --- src/dstack/_internal/server/services/fleets.py | 4 ++-- src/dstack/_internal/server/services/gateways/__init__.py | 2 +- src/dstack/_internal/server/services/plugins.py | 5 +++-- src/dstack/_internal/server/services/runs.py | 4 ++-- src/dstack/_internal/server/services/volumes.py | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index cca73e25c..842ba60c5 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -237,7 +237,7 @@ async def get_plan( ) -> FleetPlan: # Spec must be copied by parsing to calculate merged_profile effective_spec = FleetSpec.parse_obj(spec.dict()) - effective_spec = apply_plugin_policies( + effective_spec = await apply_plugin_policies( user=user.name, project=project.name, spec=effective_spec, @@ -342,7 +342,7 @@ async def create_fleet( spec: FleetSpec, ) -> Fleet: # Spec must be copied by parsing to calculate merged_profile - spec = apply_plugin_policies( + spec = await apply_plugin_policies( user=user.name, project=project.name, spec=spec, diff --git a/src/dstack/_internal/server/services/gateways/__init__.py b/src/dstack/_internal/server/services/gateways/__init__.py index d271d9fd7..9cd9dcde0 100644 --- a/src/dstack/_internal/server/services/gateways/__init__.py +++ b/src/dstack/_internal/server/services/gateways/__init__.py @@ -140,7 +140,7 @@ async def create_gateway( project: ProjectModel, configuration: GatewayConfiguration, ) -> Gateway: - spec = apply_plugin_policies( + spec = await apply_plugin_policies( user=user.name, project=project.name, # Create pseudo spec until the gateway API is updated to accept spec diff --git a/src/dstack/_internal/server/services/plugins.py b/src/dstack/_internal/server/services/plugins.py index 99699ef73..8acd101f9 100644 --- a/src/dstack/_internal/server/services/plugins.py +++ b/src/dstack/_internal/server/services/plugins.py @@ -5,6 +5,7 @@ from backports.entry_points_selectable import entry_points # backport for Python 3.9 from dstack._internal.core.errors import ServerClientError +from dstack._internal.utils.common import run_async from dstack._internal.utils.logging import get_logger from dstack.plugins import ApplyPolicy, ApplySpec, Plugin @@ -91,11 +92,11 @@ def load_plugins(enabled_plugins: list[str]): logger.warning("Enabled plugins not found: %s", plugins_to_load) -def apply_plugin_policies(user: str, project: str, spec: ApplySpec) -> ApplySpec: +async def apply_plugin_policies(user: str, project: str, spec: ApplySpec) -> ApplySpec: policies = _get_apply_policies() for policy in policies: try: - spec = policy.on_apply(user=user, project=project, spec=spec) + spec = await run_async(policy.on_apply, user=user, project=project, spec=spec) except ValueError as e: msg = None if len(e.args) > 0: diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index d0fd3045e..a1ec23e46 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -283,7 +283,7 @@ async def get_plan( ) -> RunPlan: # Spec must be copied by parsing to calculate merged_profile effective_run_spec = RunSpec.parse_obj(run_spec.dict()) - effective_run_spec = apply_plugin_policies( + effective_run_spec = await apply_plugin_policies( user=user.name, project=project.name, spec=effective_run_spec, @@ -382,7 +382,7 @@ async def apply_plan( force: bool, ) -> Run: run_spec = plan.run_spec - run_spec = apply_plugin_policies( + run_spec = await apply_plugin_policies( user=user.name, project=project.name, spec=run_spec, diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index 689520620..8913be3b0 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -205,7 +205,7 @@ async def create_volume( user: UserModel, configuration: VolumeConfiguration, ) -> Volume: - spec = apply_plugin_policies( + spec = await apply_plugin_policies( user=user.name, project=project.name, # Create pseudo spec until the volume API is updated to accept spec