feat: add run_async to TextEmbeddingRetriever, MultiQueryEmbeddingRetriever, and MultiQueryTextRetriever#11367
Conversation
…riever, and MultiQueryTextRetriever
|
@sachinn854 is attempting to deploy a commit to the deepset Team on Vercel. A member of the Team first needs to authorize it. |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds native coroutine support (run_async) to several retriever components so they can execute efficiently inside AsyncPipeline, with a thread-based fallback for sync-only wrapped components.
Changes:
- Implemented
run_asyncinTextEmbeddingRetriever,MultiQueryTextRetriever, andMultiQueryEmbeddingRetriever. - Added async test coverage (including
AsyncPipelineintegration tests) for the new async behavior and sync fallbacks. - Added release notes describing the new coroutine execution and fallback behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
haystack/components/retrievers/text_embedding_retriever.py |
Adds run_async with async-first execution and thread fallback for sync embedders/retrievers. |
haystack/components/retrievers/multi_query_text_retriever.py |
Adds concurrent run_async for multi-query text retrieval with per-query async execution and dedup/sort. |
haystack/components/retrievers/multi_query_embedding_retriever.py |
Adds concurrent run_async for multi-query embedding retrieval, including async embedder + retriever execution. |
test/components/retrievers/test_text_embedding_retriever_async.py |
New tests for TextEmbeddingRetriever.run_async + pipeline integration + sync fallback. |
test/components/retrievers/test_multi_query_text_retriever_async.py |
New tests for MultiQueryTextRetriever.run_async, dedup, ordering, fallback, and pipeline integration. |
test/components/retrievers/test_multi_query_embedding_retriever_async.py |
New tests for MultiQueryEmbeddingRetriever.run_async, dedup, ordering, fallback, and pipeline integration. |
releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml |
Documents the new run_async support and fallback behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @component.output_types(documents=list[Document]) | ||
| async def run_async( | ||
| self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None | ||
| ) -> dict[str, list[Document]]: |
| retriever_kwargs = retriever_kwargs or {} | ||
|
|
||
| if not self._is_warmed_up: | ||
| self.warm_up() | ||
|
|
||
| results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) | ||
| docs: list[Document] = [doc for result in results if result for doc in result] | ||
| docs = _deduplicate_documents(docs) | ||
| docs.sort(key=lambda x: x.score or 0.0, reverse=True) | ||
| return {"documents": docs} |
| @component.output_types(documents=list[Document]) | ||
| async def run_async( | ||
| self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None | ||
| ) -> dict[str, list[Document]]: |
| retriever_kwargs = retriever_kwargs or {} | ||
|
|
||
| if not self._is_warmed_up: | ||
| self.warm_up() | ||
|
|
||
| results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) | ||
| docs: list[Document] = [doc for result in results if result for doc in result] | ||
| docs = _deduplicate_documents(docs) | ||
| docs.sort(key=lambda x: x.score or 0.0, reverse=True) | ||
| return {"documents": docs} |
| loop = asyncio.get_running_loop() | ||
|
|
||
| if hasattr(self.text_embedder, "run_async") and callable(self.text_embedder.run_async): | ||
| embedding_result = await self.text_embedder.run_async(text=query) | ||
| else: | ||
| embedding_result = await loop.run_in_executor(None, lambda: self.text_embedder.run(text=query)) | ||
|
|
||
| if hasattr(self.retriever, "run_async") and callable(self.retriever.run_async): | ||
| result = await self.retriever.run_async( | ||
| query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k | ||
| ) | ||
| else: | ||
| result = await loop.run_in_executor( | ||
| None, | ||
| lambda: self.retriever.run(query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k), | ||
| ) |
| @pytest.mark.asyncio | ||
| async def test_run_async_deduplication(self): | ||
| doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) | ||
| # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries |
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Coverage reportClick to see where and how coverage changed
This report was generated by python-coverage-comment-action |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
davidsbatista
left a comment
There was a problem hiding this comment.
Looks Good, I did a few adjustments, increased test coverage.
Related Issues
run_asynctoMultiQueryEmbeddingRetriever,MultiQueryTextRetriever, andTextEmbeddingRetriever#11358Proposed Changes:
TextEmbeddingRetriever,MultiQueryEmbeddingRetriever, andMultiQueryTextRetrieverdid notimplement
run_async, soAsyncPipelinefell back to running them in a thread executor even whentheir wrapped components supported native async execution.
TextEmbeddingRetriever.run_async: chainstext_embedderandretrievercalls sequentially,using each component's
run_asyncif available, otherwise falls back toloop.run_in_executorMultiQueryEmbeddingRetriever.run_async: replacesThreadPoolExecutorwithasyncio.gathervia a new
_run_one_asynchelper; usesrun_asyncon both the embedder and retriever when availableMultiQueryTextRetriever.run_async: same pattern asMultiQueryEmbeddingRetrieverAll three follow the pattern already established in
MultiRetriever.run_async.How did you test it?
test/components/retrievers/test_text_embedding_retriever_async.pytest/components/retrievers/test_multi_query_embedding_retriever_async.pytest/components/retrievers/test_multi_query_text_retriever_async.pyrun_asyncis absentNotes for the reviewer
_run_on_thread(used by syncrun()) is unchanged — only new async methods were added.The
run_asyncsignature matchesrunexactly in all three components.Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:and added!in case the PR includes breaking changes.