Skip to content

Commit 76cb104

Browse files
committed
add prelim fix
1 parent 77c1e1b commit 76cb104

2 files changed

Lines changed: 82 additions & 1 deletion

File tree

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ async def _ensure_initialized_async(self) -> None:
327327

328328
@staticmethod
329329
def _extract_nested_fields_from_mapping(mapping_properties: dict[str, Any]) -> set[str]:
330+
if not isinstance(mapping_properties, dict):
331+
return set()
330332
return {name for name, defn in mapping_properties.items() if defn.get("type") == "nested"}
331333

332334
def _populate_nested_fields_from_mapping(self, mapping_properties: dict[str, Any]) -> None:

integrations/opensearch/tests/test_bm25_retriever.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
from unittest.mock import Mock, patch
5+
from unittest.mock import AsyncMock, MagicMock, Mock, patch
66

77
import pytest
88
from haystack.dataclasses import Document
@@ -538,3 +538,82 @@ async def test_bm25_retriever_async_runtime_document_store_switching(
538538

539539
# Should get results from opensearch-1 (the initial store)
540540
assert len(results_1_again["documents"]) == 1
541+
542+
543+
@pytest.mark.asyncio
async def test_run_async_in_async_pipeline():
    """
    Run OpenSearchBM25Retriever inside an AsyncPipeline against a mocked document store.

    The store's ``_bm25_retrieval_async`` is replaced by an ``AsyncMock`` returning a single
    document. The test checks that the pipeline output contains that document and that the
    async retrieval method was awaited exactly once with the expected keyword arguments.
    """
    from haystack import AsyncPipeline

    # Keyword arguments the retriever is expected to forward to the store.
    expected_call_kwargs = {
        "query": "test query",
        "filters": {},
        "fuzziness": 0,
        "top_k": 10,
        "scale_score": False,
        "all_terms_must_match": False,
        "custom_query": None,
    }

    mock_store = Mock(spec=OpenSearchDocumentStore)
    mock_store._bm25_retrieval_async = AsyncMock(return_value=[Document(content="Test doc")])

    pipeline = AsyncPipeline()
    pipeline.add_component("retriever", OpenSearchBM25Retriever(document_store=mock_store))

    result = await pipeline.run_async({"retriever": {"query": "test query"}})

    retrieved = result["retriever"]["documents"]
    assert len(retrieved) == 1
    assert retrieved[0].content == "Test doc"
    mock_store._bm25_retrieval_async.assert_called_once_with(**expected_call_kwargs)
569+
570+
571+
@pytest.mark.asyncio
async def test_run_async_in_async_pipeline_bytes_error():
    """
    Regression test for: 'bytes' object has no attribute 'items' in AsyncPipeline.

    In production, the error surfaced as:
        Component name: 'BM25Retriever_...' Component type: 'OpenSearchBM25Retriever'
        Error: 'bytes' object has no attribute 'items'

    When run_async is called for the first time the document store lazily initialises
    its async client via _ensure_initialized_async → _ensure_index_exists_async.
    If the index already exists, the code retrieves its mapping and calls
    _extract_nested_fields_from_mapping(properties), which iterates
    ``properties.items()``. If opensearch-py hands back bytes for that value
    (observed with certain server/client version combinations) the unguarded call raised
    AttributeError: 'bytes' object has no attribute 'items'.

    _extract_nested_fields_from_mapping returns an empty set for non-dict input, so the
    pipeline run is expected to complete and return no documents.
    """
    from haystack import AsyncPipeline

    # bytes instead of dict — this is the malformed value that triggered the bug
    malformed_mapping = {
        "test_index": {
            "mappings": {
                "properties": b'{"content": {"type": "text"}}',
            },
        },
    }

    with patch(
        "haystack_integrations.document_stores.opensearch.document_store.AsyncOpenSearch"
    ) as async_client_cls:
        fake_client = MagicMock()
        # The index "exists", so the store fetches the (malformed) mapping.
        fake_client.indices.exists = AsyncMock(return_value=True)
        fake_client.indices.get_mapping = AsyncMock(return_value=malformed_mapping)
        fake_client.search = AsyncMock(return_value={"hits": {"hits": []}})
        async_client_cls.return_value = fake_client

        document_store = OpenSearchDocumentStore(hosts="fake_host", index="test_index")

        pipeline = AsyncPipeline()
        pipeline.add_component("retriever", OpenSearchBM25Retriever(document_store=document_store))

        # Must not raise (previously surfaced as a PipelineRuntimeError wrapping the
        # AttributeError); the mocked search has no hits, so no documents come back.
        result = await pipeline.run_async({"retriever": {"query": "test query"}})
        assert result["retriever"]["documents"] == []

0 commit comments

Comments
 (0)