Skip to content

Commit 5e9e322

Browse files
committed
Hint gateway pipeline
1 parent e6c6f85 commit 5e9e322

6 files changed

Lines changed: 24 additions & 4 deletions

File tree

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
from dstack._internal.server.background.pipeline_tasks.base import Pipeline
44
from dstack._internal.server.background.pipeline_tasks.compute_groups import ComputeGroupPipeline
5+
from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
56
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
67
PlacementGroupPipeline,
78
)
@@ -17,6 +18,7 @@ def __init__(self) -> None:
1718
if FeatureFlags.PIPELINE_PROCESSING_ENABLED:
1819
self._pipelines += [
1920
ComputeGroupPipeline(),
21+
GatewayPipeline(),
2022
PlacementGroupPipeline(),
2123
]
2224
self._hinter = PipelineHinter(self._pipelines)

src/dstack/_internal/server/background/scheduled_tasks/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -99,21 +99,23 @@ def start_scheduled_tasks() -> AsyncIOScheduler:
9999
)
100100
_scheduler.add_job(delete_prometheus_metrics, IntervalTrigger(minutes=5), max_instances=1)
101101
_scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15))
102-
_scheduler.add_job(process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5)
103102
_scheduler.add_job(
104103
process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
105104
)
106105
_scheduler.add_job(
107106
process_idle_volumes, IntervalTrigger(seconds=60, jitter=10), max_instances=1
108107
)
109-
if not FeatureFlags.PIPELINE_PROCESSING_ENABLED:
110-
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
111108
_scheduler.add_job(
112109
process_fleets,
113110
IntervalTrigger(seconds=10, jitter=2),
114111
max_instances=1,
115112
)
116113
_scheduler.add_job(delete_instance_health_checks, IntervalTrigger(minutes=5), max_instances=1)
114+
if not FeatureFlags.PIPELINE_PROCESSING_ENABLED:
115+
_scheduler.add_job(
116+
process_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5
117+
)
118+
_scheduler.add_job(process_placement_groups, IntervalTrigger(seconds=30, jitter=5))
117119
for replica in range(settings.SERVER_BACKGROUND_PROCESSING_FACTOR):
118120
# Add multiple copies of tasks if requested.
119121
# max_instances=1 for additional copies to avoid running too many tasks.

src/dstack/_internal/server/models.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -524,6 +524,8 @@ class GatewayModel(PipelineModelMixin, BaseModel):
524524

525525
__table_args__ = (UniqueConstraint("project_id", "name", name="uq_gateways_project_id_name"),)
526526

527+
# TODO: Add pipeline index ("ix_gateways_pipeline_fetch_q") if gateways become soft-deleted.
528+
527529

528530
class GatewayComputeModel(BaseModel):
529531
__tablename__ = "gateway_computes"

src/dstack/_internal/server/routers/gateways.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
ProjectAdmin,
1414
ProjectMemberOrPublicAccess,
1515
)
16+
from dstack._internal.server.services.pipelines import PipelineHinterProtocol, get_pipeline_hinter
1617
from dstack._internal.server.utils.routers import (
1718
CustomORJSONResponse,
1819
get_base_api_additional_responses,
@@ -54,6 +55,7 @@ async def create_gateway(
5455
body: schemas.CreateGatewayRequest,
5556
session: AsyncSession = Depends(get_session),
5657
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectAdmin()),
58+
pipeline_hinter: PipelineHinterProtocol = Depends(get_pipeline_hinter),
5759
):
5860
user, project = user_project
5961
return CustomORJSONResponse(
@@ -62,6 +64,7 @@ async def create_gateway(
6264
user=user,
6365
project=project,
6466
configuration=body.configuration,
67+
pipeline_hinter=pipeline_hinter,
6568
)
6669
)
6770

src/dstack/_internal/server/services/gateways/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@
5858
get_locker,
5959
string_to_lock_id,
6060
)
61+
from dstack._internal.server.services.pipelines import PipelineHinterProtocol
6162
from dstack._internal.server.services.plugins import apply_plugin_policies
6263
from dstack._internal.server.utils.common import gather_map_async
6364
from dstack._internal.utils.common import get_current_datetime, run_async
@@ -184,6 +185,7 @@ async def create_gateway(
184185
user: UserModel,
185186
project: ProjectModel,
186187
configuration: GatewayConfiguration,
188+
pipeline_hinter: PipelineHinterProtocol,
187189
) -> Gateway:
188190
spec = await apply_plugin_policies(
189191
user=user.name,
@@ -238,6 +240,7 @@ async def create_gateway(
238240
await set_default_gateway(
239241
session=session, project=project, name=configuration.name, user=user
240242
)
243+
pipeline_hinter.hint_fetch(GatewayModel.__name__)
241244
return gateway_model_to_gateway(gateway)
242245

243246

@@ -303,7 +306,8 @@ async def delete_gateways(
303306
)
304307
gateway_models = res.scalars().all()
305308
if len(gateway_models) != len(gateways_ids):
306-
# TODO: Make the delete endpoint fully async without lock – put the request in queue and process in background.
309+
# TODO: Make the delete endpoint fully async so we don't need to lock and error:
310+
# put the request in queue and process in the background.
307311
raise ServerClientError(
308312
"Failed to delete gateways: gateways are being processed currently. Try again later."
309313
)

src/dstack/_internal/server/services/pipelines.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,15 @@
55

66
class PipelineHinterProtocol(Protocol):
77
def hint_fetch(self, model_name: str) -> None:
8+
"""
9+
Pass `Model.__name__` to hint replica's pipelines to fetch the model's items ASAP.
10+
"""
811
pass
912

1013

1114
def get_pipeline_hinter(request: Request) -> PipelineHinterProtocol:
15+
"""
16+
Returns pipeline hinter that allows hinting replica's pipelines that there are new items for processing.
17+
This can reduce processing latency if the processing happens rarely.
18+
"""
1219
return request.app.state.pipeline_manager.hinter

0 commit comments

Comments
 (0)