Skip to content

Commit 6cef733

Browse files
committed
feat(opensearch): Add support for nested fields in OpenSearchDocumentStore
1 parent 86485b5 commit 6cef733

9 files changed

Lines changed: 948 additions & 27 deletions

File tree

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 126 additions & 19 deletions
Large diffs are not rendered by default.

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/filters.py

Lines changed: 96 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,104 @@
77
from haystack.errors import FilterError
88

99

10-
def normalize_filters(filters: dict[str, Any]) -> dict[str, Any]:
10+
def normalize_filters(filters: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
    """
    Converts Haystack filters into OpenSearch-compatible filters.

    A dictionary containing a ``"field"`` key is handled as a single comparison and returned
    inside ``{"bool": {"must": ...}}``; any other dictionary is handled as a logical condition.

    :param filters: Haystack filter dictionary.
    :param nested_fields: Set of metadata field paths that are mapped as ``nested`` type in OpenSearch.
        When provided, conditions targeting sub-fields of these paths are wrapped in ``nested`` queries.
    :returns: The OpenSearch query dictionary built from ``filters``.
    :raises FilterError: If ``filters`` is not a dictionary.
    """
    if not isinstance(filters, dict):
        msg = "Filters must be a dictionary"
        raise FilterError(msg)

    # Logical conditions (no "field" key) are delegated as a whole.
    if "field" not in filters:
        return _parse_logical_condition(filters, nested_fields)

    query = _parse_comparison_condition(filters, nested_fields)
    # The nested path is only looked up when nested fields were supplied.
    path = _get_nested_path(filters, nested_fields) if nested_fields else None
    if path:
        query = {"nested": {"path": path, "query": query}}
    return {"bool": {"must": query}}
30+
31+
32+
def _get_nested_path(condition: dict[str, Any], nested_fields: set[str]) -> str | None:
33+
"""Returns the nested path for a comparison condition, or None."""
34+
if not (field := condition.get("field")):
35+
return None
36+
if field.startswith("meta."):
37+
field = field[5:]
38+
parts = field.split(".")
39+
for i in range(1, len(parts)):
40+
prefix = ".".join(parts[:i])
41+
if prefix in nested_fields:
42+
return prefix
43+
return None
44+
45+
46+
def _get_logical_condition_nested_path(condition: dict[str, Any], nested_fields: set[str]) -> str | None:
    """Return the one nested path shared by every child of a logical condition, else None.

    Children with a ``"field"`` key are resolved via ``_get_nested_path``; children with both
    ``"operator"`` and ``"conditions"`` are resolved recursively. A child of any other shape
    makes the result ``None`` immediately.
    """
    resolved_paths: set[str | None] = set()
    for child in condition.get("conditions", []):
        if "field" in child:
            resolved = _get_nested_path(child, nested_fields)
        elif "operator" in child and "conditions" in child:
            resolved = _get_logical_condition_nested_path(child, nested_fields)
        else:
            return None
        resolved_paths.add(resolved)

    # Exactly one distinct result means all children agree; that result may itself be None.
    if len(resolved_paths) != 1:
        return None
    (only,) = resolved_paths
    return only
59+
60+
61+
def _group_nested_conditions(
    raw_conditions: list[dict[str, Any]],
    nested_fields: set[str],
    operator: str,
) -> list[dict[str, Any]]:
    """Bucket conditions by nested path and emit one ``nested`` query per bucket.

    A condition joins a bucket when it is a comparison under a nested path, or a
    logical sub-group whose leaves all resolve to the same nested path, so that
    the bucket's clauses are evaluated inside a single ``nested`` query.
    Conditions without a nested path are parsed individually and come first in
    the returned list.

    :param raw_conditions: Unparsed child conditions of a logical condition.
    :param nested_fields: Field paths to treat as nested.
    :param operator: Operator of the enclosing logical condition; ``"OR"`` combines a bucket
        with ``should``, anything else with ``must``.
    :returns: Parsed conditions, with each nested bucket wrapped in a ``nested`` query.
    """

    def _is_logical(cond: dict[str, Any]) -> bool:
        return "operator" in cond and "conditions" in cond

    by_path: dict[str, list[dict[str, Any]]] = {}
    plain: list[dict[str, Any]] = []

    for cond in raw_conditions:
        path = _get_nested_path(cond, nested_fields)
        if path is None and _is_logical(cond):
            path = _get_logical_condition_nested_path(cond, nested_fields)
        if not path:
            plain.append(cond)
            continue
        if path not in by_path:
            by_path[path] = []
        by_path[path].append(cond)

    parsed = [_parse_comparison_condition(cond, nested_fields) for cond in plain]

    bool_clause = "should" if operator == "OR" else "must"
    for path, members in by_path.items():
        # Logical sub-groups are parsed without nested awareness to avoid
        # redundant nested wrapping; the wrapping is applied once below.
        queries = [
            _parse_logical_condition(member, nested_fields=None)
            if _is_logical(member)
            else _parse_comparison_condition(member, nested_fields)
            for member in members
        ]
        if len(queries) > 1:
            queries = _normalize_ranges(queries)
        query = queries[0] if len(queries) == 1 else {"bool": {bool_clause: queries}}
        parsed.append({"nested": {"path": path, "query": query}})

    return parsed
22105

23-
def _parse_logical_condition(condition: dict[str, Any]) -> dict[str, Any]:
106+
107+
def _parse_logical_condition(condition: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
24108
if "operator" not in condition:
25109
msg = f"'operator' key missing in {condition}"
26110
raise FilterError(msg)
@@ -29,7 +113,12 @@ def _parse_logical_condition(condition: dict[str, Any]) -> dict[str, Any]:
29113
raise FilterError(msg)
30114

31115
operator = condition["operator"]
32-
conditions = [_parse_comparison_condition(c) for c in condition["conditions"]]
116+
117+
if nested_fields:
118+
conditions = _group_nested_conditions(condition["conditions"], nested_fields, operator)
119+
else:
120+
conditions = [_parse_comparison_condition(c, nested_fields) for c in condition["conditions"]]
121+
33122
if len(conditions) > 1:
34123
conditions = _normalize_ranges(conditions)
35124
if operator == "AND":
@@ -189,11 +278,11 @@ def _not_in(field: str, value: Any) -> dict[str, Any]:
189278
}
190279

191280

192-
def _parse_comparison_condition(condition: dict[str, Any]) -> dict[str, Any]:
281+
def _parse_comparison_condition(condition: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
193282
if "field" not in condition:
194283
# 'field' key is only found in comparison dictionaries.
195284
# We assume this is a logic dictionary since it's not present.
196-
return _parse_logical_condition(condition)
285+
return _parse_logical_condition(condition, nested_fields)
197286
field: str = condition["field"]
198287

199288
if field.startswith("meta."):

integrations/opensearch/tests/conftest.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,54 @@ def document_store_embedding_dim_4_no_emb_returned_faiss():
143143
store._client.indices.delete(index=index, params={"ignore": [400, 404]})
144144

145145

146+
@pytest.fixture
def document_store_nested():
    """
    Yield an OpenSearch document store created with explicit nested fields (``refs`` and ``tags``).

    The store uses a unique index name, which is deleted again during teardown.
    """
    index_name = _get_unique_index_name()
    nested_store = OpenSearchDocumentStore(
        hosts=["https://localhost:9200"],
        index=index_name,
        http_auth=("admin", "SecureHaystack!2026"),
        verify_certs=False,
        embedding_dim=768,
        return_embedding=False,
        nested_fields=["refs", "tags"],
    )
    nested_store._ensure_initialized()
    yield nested_store

    # Teardown: drop the per-test index; 400/404 responses are ignored.
    client = nested_store._client
    assert client
    client.indices.delete(index=index_name, params={"ignore": [400, 404]})
168+
169+
170+
@pytest.fixture
def document_store_wildcard_nested():
    """
    Yield an OpenSearch document store created with the wildcard ``nested_fields=["*"]`` setting
    (nested field auto-detection).

    The store uses a unique index name, which is deleted again during teardown.
    """
    index_name = _get_unique_index_name()
    wildcard_store = OpenSearchDocumentStore(
        hosts=["https://localhost:9200"],
        index=index_name,
        http_auth=("admin", "SecureHaystack!2026"),
        verify_certs=False,
        embedding_dim=768,
        return_embedding=False,
        nested_fields=["*"],
    )
    wildcard_store._ensure_initialized()
    yield wildcard_store

    # Teardown: drop the per-test index; 400/404 responses are ignored.
    client = wildcard_store._client
    assert client
    client.indices.delete(index=index_name, params={"ignore": [400, 404]})
192+
193+
146194
@pytest.fixture
147195
def test_documents():
148196
return [

integrations/opensearch/tests/test_auth.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ def test_ds_to_dict_basic_auth(self, _mock_opensearch_client):
308308
"use_ssl": None,
309309
"verify_certs": None,
310310
"timeout": None,
311+
"nested_fields": None,
311312
},
312313
}
313314

@@ -354,6 +355,7 @@ def test_ds_to_dict_aws_auth(self, _mock_opensearch_client, monkeypatch: pytest.
354355
"use_ssl": None,
355356
"verify_certs": None,
356357
"timeout": None,
358+
"nested_fields": None,
357359
},
358360
}
359361

integrations/opensearch/tests/test_bm25_retriever.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def test_to_dict(_mock_opensearch_client):
6363
"use_ssl": None,
6464
"verify_certs": None,
6565
"timeout": None,
66+
"nested_fields": None,
6667
},
6768
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
6869
},

0 commit comments

Comments
 (0)