File tree Expand file tree Collapse file tree 2 files changed +11
-4
lines changed
src/dstack/_internal/server/background/pipeline_tasks Expand file tree Collapse file tree 2 files changed +11
-4
lines changed Original file line number Diff line number Diff line change @@ -85,10 +85,13 @@ def hint_fetch(self, model_name: str):
8585 pipeline .hint_fetch ()
8686
8787
88- _pipeline_manager = PipelineManager ()
88+ _pipeline_manager = None
8989
9090
9191def get_pipeline_manager () -> PipelineManager :
92+ global _pipeline_manager
93+ if _pipeline_manager is None :
94+ _pipeline_manager = PipelineManager ()
9295 return _pipeline_manager
9396
9497
@@ -97,5 +100,6 @@ def start_pipeline_tasks() -> PipelineManager:
97100 Start tasks processed by fetch-workers pipelines based on db + in-memory queues.
98101 Suitable for tasks that run frequently and need to lock rows for a long time.
99102 """
100- _pipeline_manager .start ()
101- return _pipeline_manager
103+ pipeline_manager = get_pipeline_manager ()
104+ pipeline_manager .start ()
105+ return pipeline_manager
Original file line number Diff line number Diff line change @@ -299,7 +299,10 @@ async def start(self):
299299 self ._fetch_event .wait (),
300300 timeout = self ._next_fetch_delay (empty_fetch_count ),
301301 )
302- except TimeoutError :
302+ except (
303+ asyncio .TimeoutError , # < Python 3.11
304+ TimeoutError , # >= Python 3.11
305+ ):
303306 pass
304307 empty_fetch_count += 1
305308 self ._fetch_event .clear ()
You can’t perform that action at this time.
0 commit comments