From 1ed50428e855496cbd44e64817f0c062d95ce8ce Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 2 Apr 2026 11:05:28 +0500 Subject: [PATCH 1/2] Handle asyncio.TimeoutError --- .../_internal/server/background/pipeline_tasks/base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/base.py b/src/dstack/_internal/server/background/pipeline_tasks/base.py index 49e9768fa..fa4f99785 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/base.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/base.py @@ -299,7 +299,10 @@ async def start(self): self._fetch_event.wait(), timeout=self._next_fetch_delay(empty_fetch_count), ) - except TimeoutError: + except ( + asyncio.TimeoutError, # < Python 3.11 + TimeoutError, # >= Python 3.11 + ): pass empty_fetch_count += 1 self._fetch_event.clear() From 15cbf7678ee5f7a906890992ca0342ce86b77726 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 2 Apr 2026 11:06:24 +0500 Subject: [PATCH 2/2] Lazy init PipelineManager --- .../server/background/pipeline_tasks/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/__init__.py index ca12c95ad..4bf76c20c 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/__init__.py @@ -85,10 +85,13 @@ def hint_fetch(self, model_name: str): pipeline.hint_fetch() -_pipeline_manager = PipelineManager() +_pipeline_manager = None def get_pipeline_manager() -> PipelineManager: + global _pipeline_manager + if _pipeline_manager is None: + _pipeline_manager = PipelineManager() return _pipeline_manager @@ -97,5 +100,6 @@ def start_pipeline_tasks() -> PipelineManager: Start tasks processed by fetch-workers pipelines based on db + in-memory queues. Suitable for tasks that run frequently and need to lock rows for a long time. """ - _pipeline_manager.start() - return _pipeline_manager + pipeline_manager = get_pipeline_manager() + pipeline_manager.start() + return pipeline_manager