From 76cb1049d7a23d44d4aff783b5fb3f1295592c55 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Tue, 12 May 2026 16:20:04 +0200 Subject: [PATCH] add prelim fix --- .../opensearch/document_store.py | 2 + .../opensearch/tests/test_bm25_retriever.py | 81 ++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index df9be9a2a9..4495a96506 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -327,6 +327,8 @@ async def _ensure_initialized_async(self) -> None: @staticmethod def _extract_nested_fields_from_mapping(mapping_properties: dict[str, Any]) -> set[str]: + if not isinstance(mapping_properties, dict): + return set() return {name for name, defn in mapping_properties.items() if defn.get("type") == "nested"} def _populate_nested_fields_from_mapping(self, mapping_properties: dict[str, Any]) -> None: diff --git a/integrations/opensearch/tests/test_bm25_retriever.py b/integrations/opensearch/tests/test_bm25_retriever.py index c1a77675cf..ca367477fa 100644 --- a/integrations/opensearch/tests/test_bm25_retriever.py +++ b/integrations/opensearch/tests/test_bm25_retriever.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 -from unittest.mock import Mock, patch +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest from haystack.dataclasses import Document @@ -538,3 +538,82 @@ async def test_bm25_retriever_async_runtime_document_store_switching( # Should get results from opensearch-1 (the initial store) assert len(results_1_again["documents"]) == 1 + + +@pytest.mark.asyncio +async def test_run_async_in_async_pipeline(): + """Test that OpenSearchBM25Retriever.run_async is called when used inside AsyncPipeline.""" + from haystack import AsyncPipeline + + mock_store = Mock(spec=OpenSearchDocumentStore) + mock_store._bm25_retrieval_async = AsyncMock(return_value=[Document(content="Test doc")]) + + retriever = OpenSearchBM25Retriever(document_store=mock_store) + + pipeline = AsyncPipeline() + pipeline.add_component("retriever", retriever) + + result = await pipeline.run_async({"retriever": {"query": "test query"}}) + + assert len(result["retriever"]["documents"]) == 1 + assert result["retriever"]["documents"][0].content == "Test doc" + mock_store._bm25_retrieval_async.assert_called_once_with( + query="test query", + filters={}, + fuzziness=0, + top_k=10, + scale_score=False, + all_terms_must_match=False, + custom_query=None, + ) + + +@pytest.mark.asyncio +async def test_run_async_in_async_pipeline_bytes_error(): + """ + Reproduces: 'bytes' object has no attribute 'items' in AsyncPipeline. + + In production, the error surfaces as: + Component name: 'BM25Retriever_...' Component type: 'OpenSearchBM25Retriever' + Error: 'bytes' object has no attribute 'items' + + When run_async is called for the first time the document store lazily initialises + its async client via _ensure_initialized_async → _ensure_index_exists_async. + If the index already exists, the code retrieves its mapping and calls + _extract_nested_fields_from_mapping(properties). That method does + ``properties.items()``. If opensearch-py hands back bytes for that value + (observed with certain server/client version combinations) the call raises + AttributeError: 'bytes' object has no attribute 'items'. + """ + from haystack import AsyncPipeline + + with patch( + "haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch" + ) as MockAsyncOS: + mock_client = MagicMock() + MockAsyncOS.return_value = mock_client + + mock_client.indices.exists = AsyncMock(return_value=True) + mock_client.indices.get_mapping = AsyncMock( + return_value={ + "test_index": { + "mappings": { + # bytes instead of dict — this is what triggers the bug + "properties": b'{"content": {"type": "text"}}' + } + } + } + ) + mock_client.search = AsyncMock(return_value={"hits": {"hits": []}}) + + document_store = OpenSearchDocumentStore(hosts="fake_host", index="test_index") + retriever = OpenSearchBM25Retriever(document_store=document_store) + + pipeline = AsyncPipeline() + pipeline.add_component("retriever", retriever) + + # Currently raises (wrapped in PipelineRuntimeError): + # AttributeError: 'bytes' object has no attribute 'items' + # Once the bug is fixed this should return empty documents without raising. + result = await pipeline.run_async({"retriever": {"query": "test query"}}) + assert result["retriever"]["documents"] == []