Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,105 @@
from haystack.errors import FilterError


def normalize_filters(filters: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
    """
    Converts Haystack filters in OpenSearch compatible filters.

    A dictionary containing a `field` key is treated as a single comparison; anything
    else is handed to the logical-condition parser.

    :param filters: Haystack filter dictionary.
    :param nested_fields: Set of metadata field paths that are mapped as `nested` type in OpenSearch.
        When provided, conditions targeting sub-fields of these paths are wrapped in `nested` queries.
    :returns: The OpenSearch query dictionary.
    :raises FilterError: If `filters` is not a dictionary.
    """
    if not isinstance(filters, dict):
        msg = "Filters must be a dictionary"
        raise FilterError(msg)

    if "field" not in filters:
        return _parse_logical_condition(filters, nested_fields)

    query = _parse_comparison_condition(filters, nested_fields)
    # A lone comparison on a sub-field of a nested path must itself be wrapped in a
    # `nested` query; with no nested fields configured there is nothing to look up.
    nested_path = _get_nested_path(filters, nested_fields) if nested_fields else None
    if nested_path:
        query = {"nested": {"path": nested_path, "query": query}}
    return {"bool": {"must": query}}


def _get_nested_path(condition: dict[str, Any], nested_fields: set[str]) -> str | None:
"""Returns the nested path for a comparison condition, or None."""
if not (field := condition.get("field")):
return None
if field.startswith("meta."):
field = field[5:]
parts = field.split(".")
for i in range(1, len(parts)):
prefix = ".".join(parts[:i])
if prefix in nested_fields:
return prefix
return None


def _get_logical_condition_nested_path(condition: dict[str, Any], nested_fields: set[str]) -> str | None:
"""Returns the common nested path if all leaf comparisons in a logical sub-group share one, else None."""
paths: set[str | None] = set()
for c in condition.get("conditions", []):
if "field" in c:
paths.add(_get_nested_path(c, nested_fields))
elif "operator" in c and "conditions" in c:
paths.add(_get_logical_condition_nested_path(c, nested_fields))
else:
return None
if len(paths) == 1:
return next(iter(paths))
return None


def _group_nested_conditions(
raw_conditions: list[dict[str, Any]],
nested_fields: set[str],
operator: str,
) -> list[dict[str, Any]]:
"""
Group conditions by nested path and wrap each group in a `nested` query.

Both direct comparisons and logical sub-groups whose leaves all target the
same nested path are absorbed into the group so that they match within the
same array element.
"""
nested_groups: dict[str, list[dict[str, Any]]] = {}
flat_raw: list[dict[str, Any]] = []

for c in raw_conditions:
nested_path = _get_nested_path(c, nested_fields)
if nested_path is None and "operator" in c and "conditions" in c:
nested_path = _get_logical_condition_nested_path(c, nested_fields)
if nested_path:
nested_groups.setdefault(nested_path, []).append(c)
else:
flat_raw.append(c)

conditions = [_parse_comparison_condition(c, nested_fields) for c in flat_raw]

for path, group in nested_groups.items():
inner = []
for c in group:
if "operator" in c and "conditions" in c:
# Logical sub-group: parse without nested awareness to avoid
# redundant nested wrapping — the outer code handles that.
inner.append(_parse_logical_condition(c, nested_fields=None))
else:
inner.append(_parse_comparison_condition(c, nested_fields))
if len(inner) > 1:
inner = _normalize_ranges(inner)
if len(inner) == 1:
conditions.append({"nested": {"path": path, "query": inner[0]}})
elif operator == "OR":
conditions.append({"nested": {"path": path, "query": {"bool": {"should": inner}}}})
else:
conditions.append({"nested": {"path": path, "query": {"bool": {"must": inner}}}})

def _parse_logical_condition(condition: dict[str, Any]) -> dict[str, Any]:
return conditions


def _parse_logical_condition(condition: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
if "operator" not in condition:
msg = f"'operator' key missing in {condition}"
raise FilterError(msg)
Expand All @@ -29,7 +114,12 @@ def _parse_logical_condition(condition: dict[str, Any]) -> dict[str, Any]:
raise FilterError(msg)

operator = condition["operator"]
conditions = [_parse_comparison_condition(c) for c in condition["conditions"]]

if nested_fields:
conditions = _group_nested_conditions(condition["conditions"], nested_fields, operator)
else:
conditions = [_parse_comparison_condition(c, nested_fields) for c in condition["conditions"]]

if len(conditions) > 1:
conditions = _normalize_ranges(conditions)
if operator == "AND":
Expand Down Expand Up @@ -189,11 +279,11 @@ def _not_in(field: str, value: Any) -> dict[str, Any]:
}


def _parse_comparison_condition(condition: dict[str, Any]) -> dict[str, Any]:
def _parse_comparison_condition(condition: dict[str, Any], nested_fields: set[str] | None = None) -> dict[str, Any]:
if "field" not in condition:
# 'field' key is only found in comparison dictionaries.
# We assume this is a logic dictionary since it's not present.
return _parse_logical_condition(condition)
return _parse_logical_condition(condition, nested_fields)
field: str = condition["field"]

if field.startswith("meta."):
Expand Down
48 changes: 48 additions & 0 deletions integrations/opensearch/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,54 @@ def document_store_embedding_dim_4_no_emb_returned_faiss():
store._client.indices.delete(index=index, params={"ignore": [400, 404]})


@pytest.fixture
def document_store_nested():
    """
    Yield an OpenSearch document store created with explicit nested fields (`refs`, `tags`).

    The store uses a uniquely named index, which is deleted once the test is done.
    """
    index_name = _get_unique_index_name()
    document_store = OpenSearchDocumentStore(
        hosts=["https://localhost:9200"],
        index=index_name,
        http_auth=("admin", "SecureHaystack!2026"),
        verify_certs=False,
        embedding_dim=768,
        return_embedding=False,
        nested_fields=["refs", "tags"],
    )
    document_store._ensure_initialized()
    yield document_store

    # Teardown: drop the index, ignoring 400/404 responses.
    client = document_store._client
    assert client
    client.indices.delete(index=index_name, params={"ignore": [400, 404]})


@pytest.fixture
def document_store_wildcard_nested():
    """
    Yield an OpenSearch document store created with `nested_fields="*"` (wildcard nested field auto-detection).

    The store uses a uniquely named index, which is deleted once the test is done.
    """
    index_name = _get_unique_index_name()
    document_store = OpenSearchDocumentStore(
        hosts=["https://localhost:9200"],
        index=index_name,
        http_auth=("admin", "SecureHaystack!2026"),
        verify_certs=False,
        embedding_dim=768,
        return_embedding=False,
        nested_fields="*",
    )
    document_store._ensure_initialized()
    yield document_store

    # Teardown: drop the index, ignoring 400/404 responses.
    client = document_store._client
    assert client
    client.indices.delete(index=index_name, params={"ignore": [400, 404]})


@pytest.fixture
def test_documents():
return [
Expand Down
2 changes: 2 additions & 0 deletions integrations/opensearch/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def test_ds_to_dict_basic_auth(self, _mock_opensearch_client):
"use_ssl": None,
"verify_certs": None,
"timeout": None,
"nested_fields": None,
},
}

Expand Down Expand Up @@ -354,6 +355,7 @@ def test_ds_to_dict_aws_auth(self, _mock_opensearch_client, monkeypatch: pytest.
"use_ssl": None,
"verify_certs": None,
"timeout": None,
"nested_fields": None,
},
}

Expand Down
1 change: 1 addition & 0 deletions integrations/opensearch/tests/test_bm25_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def test_to_dict(_mock_opensearch_client):
"use_ssl": None,
"verify_certs": None,
"timeout": None,
"nested_fields": None,
},
"type": "haystack_integrations.document_stores.opensearch.document_store.OpenSearchDocumentStore",
},
Expand Down
Loading
Loading