Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ def hint_fetch(self, model_name: str):
pipeline.hint_fetch()


_pipeline_manager = PipelineManager()
_pipeline_manager = None


def get_pipeline_manager() -> PipelineManager:
global _pipeline_manager
if _pipeline_manager is None:
_pipeline_manager = PipelineManager()
return _pipeline_manager


Expand All @@ -97,5 +100,6 @@ def start_pipeline_tasks() -> PipelineManager:
Start tasks processed by fetch-workers pipelines based on db + in-memory queues.
Suitable for tasks that run frequently and need to lock rows for a long time.
"""
_pipeline_manager.start()
return _pipeline_manager
pipeline_manager = get_pipeline_manager()
pipeline_manager.start()
return pipeline_manager
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ async def start(self):
self._fetch_event.wait(),
timeout=self._next_fetch_delay(empty_fetch_count),
)
except TimeoutError:
except (
asyncio.TimeoutError, # < Python 3.11
TimeoutError, # >= Python 3.11
):
pass
empty_fetch_count += 1
self._fetch_event.clear()
Expand Down
Loading