diff --git a/src/dstack/_internal/server/background/pipeline_tasks/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/__init__.py index ca12c95ad..4bf76c20c 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/__init__.py @@ -85,10 +85,13 @@ def hint_fetch(self, model_name: str): pipeline.hint_fetch() -_pipeline_manager = PipelineManager() +_pipeline_manager = None def get_pipeline_manager() -> PipelineManager: + global _pipeline_manager + if _pipeline_manager is None: + _pipeline_manager = PipelineManager() return _pipeline_manager @@ -97,5 +100,6 @@ def start_pipeline_tasks() -> PipelineManager: Start tasks processed by fetch-workers pipelines based on db + in-memory queues. Suitable for tasks that run frequently and need to lock rows for a long time. """ - _pipeline_manager.start() - return _pipeline_manager + pipeline_manager = get_pipeline_manager() + pipeline_manager.start() + return pipeline_manager diff --git a/src/dstack/_internal/server/background/pipeline_tasks/base.py b/src/dstack/_internal/server/background/pipeline_tasks/base.py index 49e9768fa..fa4f99785 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/base.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/base.py @@ -299,7 +299,10 @@ async def start(self): self._fetch_event.wait(), timeout=self._next_fetch_delay(empty_fetch_count), ) - except TimeoutError: + except ( + asyncio.TimeoutError, # < Python 3.11 + TimeoutError, # >= Python 3.11 + ): pass empty_fetch_count += 1 self._fetch_event.clear()