Skip to content

Commit 5ee2702

Browse files
Added routing functionality for bulk write and bulk delete requests (#2624)
Co-authored-by: Michele Pangrazzi <xmikex83@gmail.com>
1 parent 927d201 commit 5ee2702

3 files changed

Lines changed: 177 additions & 13 deletions

File tree

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ class OpenSearchDocumentStore:
4646
4747
Usage example:
4848
```python
49-
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
49+
from haystack_integrations.document_stores.opensearch import (
50+
OpenSearchDocumentStore,
51+
)
5052
from haystack import Document
5153
5254
document_store = OpenSearchDocumentStore(hosts="localhost:9200")
@@ -420,6 +422,10 @@ def _prepare_bulk_write_request(
420422
opensearch_actions = []
421423
for doc in documents:
422424
doc_dict = doc.to_dict()
425+
426+
# Extract routing from document metadata
427+
doc_routing = doc_dict.pop("_routing", None)
428+
423429
if "sparse_embedding" in doc_dict:
424430
sparse_embedding = doc_dict.pop("sparse_embedding", None)
425431
if sparse_embedding:
@@ -429,13 +435,17 @@ def _prepare_bulk_write_request(
429435
"The `sparse_embedding` field will be ignored.",
430436
id=doc.id,
431437
)
432-
opensearch_actions.append(
433-
{
434-
"_op_type": action,
435-
"_id": doc.id,
436-
"_source": doc_dict,
437-
}
438-
)
438+
439+
action_dict = {
440+
"_op_type": action,
441+
"_id": doc.id,
442+
"_source": doc_dict,
443+
}
444+
445+
if doc_routing is not None:
446+
action_dict["_routing"] = doc_routing
447+
448+
opensearch_actions.append(action_dict)
439449

440450
return {
441451
"client": self._client if not is_async else self._async_client,
@@ -549,18 +559,36 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
549559
return Document.from_dict(data)
550560

551561
def _prepare_bulk_delete_request(
552-
self, *, document_ids: list[str], is_async: bool, refresh: Literal["wait_for", True, False]
562+
self,
563+
*,
564+
document_ids: list[str],
565+
is_async: bool,
566+
refresh: Literal["wait_for", True, False],
567+
routing: Optional[dict[str, str]] = None,
553568
) -> dict[str, Any]:
569+
def action_generator():
570+
for id_ in document_ids:
571+
action = {"_op_type": "delete", "_id": id_}
572+
# Add routing if provided for this document ID
573+
if routing and id_ in routing and routing[id_] is not None:
574+
action["_routing"] = routing[id_]
575+
yield action
576+
554577
return {
555578
"client": self._client if not is_async else self._async_client,
556-
"actions": ({"_op_type": "delete", "_id": id_} for id_ in document_ids),
579+
"actions": action_generator(),
557580
"refresh": refresh,
558581
"index": self._index,
559582
"raise_on_error": False,
560583
"max_chunk_bytes": self._max_chunk_bytes,
561584
}
562585

563-
def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
586+
def delete_documents(
587+
self,
588+
document_ids: list[str],
589+
refresh: Literal["wait_for", True, False] = "wait_for",
590+
routing: Optional[dict[str, str]] = None,
591+
) -> None:
564592
"""
565593
Deletes documents that match the provided `document_ids` from the document store.
566594
@@ -570,16 +598,24 @@ def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for",
570598
- `False`: Do not refresh (better performance for bulk operations).
571599
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
572600
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
601+
:param routing: A dictionary mapping document IDs to their routing values.
602+
Routing values are used to determine the shard where documents are stored.
603+
If provided, the routing value for each document will be used during deletion.
573604
"""
574605

575606
self._ensure_initialized()
576607

577-
bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=False, refresh=refresh))
608+
bulk(
609+
**self._prepare_bulk_delete_request(
610+
document_ids=document_ids, is_async=False, refresh=refresh, routing=routing
611+
)
612+
)
578613

579614
async def delete_documents_async(
580615
self,
581616
document_ids: list[str],
582617
refresh: Literal["wait_for", True, False] = "wait_for",
618+
routing: Optional[dict[str, str]] = None,
583619
) -> None:
584620
"""
585621
Asynchronously deletes documents that match the provided `document_ids` from the document store.
@@ -590,11 +626,18 @@ async def delete_documents_async(
590626
- `False`: Do not refresh (better performance for bulk operations).
591627
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
592628
For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
629+
:param routing: A dictionary mapping document IDs to their routing values.
630+
Routing values are used to determine the shard where documents are stored.
631+
If provided, the routing value for each document will be used during deletion.
593632
"""
594633
await self._ensure_initialized_async()
595634
assert self._async_client is not None
596635

597-
await async_bulk(**self._prepare_bulk_delete_request(document_ids=document_ids, is_async=True, refresh=refresh))
636+
await async_bulk(
637+
**self._prepare_bulk_delete_request(
638+
document_ids=document_ids, is_async=True, refresh=refresh, routing=routing
639+
)
640+
)
598641

599642
def _prepare_delete_all_request(self, *, refresh: bool) -> dict[str, Any]:
600643
return {

integrations/opensearch/tests/test_document_store.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,46 @@ def test_get_default_mappings(_mock_opensearch_client):
110110
}
111111

112112

113+
@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
114+
def test_routing_extracted_from_metadata(mock_bulk, document_store):
115+
"""Test routing extraction from document metadata"""
116+
mock_bulk.return_value = (2, [])
117+
118+
docs = [
119+
Document(id="1", content="Doc", meta={"_routing": "user_a", "other": "data"}),
120+
Document(id="2", content="Doc"),
121+
]
122+
document_store.write_documents(docs)
123+
124+
actions = list(mock_bulk.call_args.kwargs["actions"])
125+
126+
# Routing should be at action level, not in _source
127+
assert actions[0]["_routing"] == "user_a"
128+
assert "_routing" not in actions[0]["_source"].get("meta", {})
129+
130+
# Other metadata should be preserved
131+
assert actions[0]["_source"]["other"] == "data"
132+
133+
# Second doc has no routing
134+
assert "_routing" not in actions[1]
135+
assert "_routing" not in actions[1]["_source"].get("meta", {})
136+
137+
138+
@patch("haystack_integrations.document_stores.opensearch.document_store.bulk")
139+
def test_routing_in_delete(mock_bulk, document_store):
140+
"""Test routing parameter in delete operations"""
141+
mock_bulk.return_value = (2, [])
142+
143+
routing_map = {"1": "user_a", "2": "user_b"}
144+
document_store.delete_documents(["1", "2", "3"], routing=routing_map)
145+
146+
actions = list(mock_bulk.call_args.kwargs["actions"])
147+
148+
assert actions[0]["_routing"] == "user_a"
149+
assert actions[1]["_routing"] == "user_b"
150+
assert "_routing" not in actions[2]
151+
152+
113153
@pytest.mark.integration
114154
class TestDocumentStore(CountDocumentsTest, WriteDocumentsTest, DeleteDocumentsTest):
115155
"""
@@ -574,3 +614,45 @@ def test_update_by_filter(self, document_store: OpenSearchDocumentStore):
574614
)
575615
assert len(draft_docs) == 1
576616
assert draft_docs[0].meta["category"] == "B"
617+
618+
@pytest.mark.integration
619+
def test_write_with_routing(self, document_store: OpenSearchDocumentStore):
620+
"""Test writing documents with routing metadata"""
621+
docs = [
622+
Document(id="1", content="User A doc", meta={"_routing": "user_a", "category": "test"}),
623+
Document(id="2", content="User B doc", meta={"_routing": "user_b"}),
624+
Document(id="3", content="No routing"),
625+
]
626+
627+
written = document_store.write_documents(docs)
628+
assert written == 3
629+
assert document_store.count_documents() == 3
630+
631+
# Verify _routing not stored in metadata
632+
retrieved = document_store.filter_documents()
633+
retrieved_by_id = {doc.id: doc for doc in retrieved}
634+
635+
# Check _routing is not stored for any document
636+
for doc in retrieved:
637+
assert "_routing" not in doc.meta
638+
639+
assert retrieved_by_id["1"].meta["category"] == "test"
640+
641+
assert retrieved_by_id["2"].meta == {}
642+
643+
assert retrieved_by_id["3"].meta == {}
644+
645+
@pytest.mark.integration
646+
def test_delete_with_routing(self, document_store: OpenSearchDocumentStore):
647+
"""Test deleting documents with routing"""
648+
docs = [
649+
Document(id="1", content="Doc 1", meta={"_routing": "user_a"}),
650+
Document(id="2", content="Doc 2", meta={"_routing": "user_b"}),
651+
Document(id="3", content="Doc 3"),
652+
]
653+
document_store.write_documents(docs)
654+
655+
routing_map = {"1": "user_a", "2": "user_b"}
656+
document_store.delete_documents(["1", "2"], routing=routing_map)
657+
658+
assert document_store.count_documents() == 1

integrations/opensearch/tests/test_document_store_async.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,45 @@ async def test_delete_all_documents_no_index_recreation(self, document_store: Op
299299
assert len(results) == 1
300300
assert results[0].content == "New document after delete all"
301301

302+
@pytest.mark.asyncio
303+
async def test_write_with_routing(self, document_store: OpenSearchDocumentStore):
304+
"""Test async writing documents with routing metadata"""
305+
docs = [
306+
Document(id="1", content="User A doc", meta={"_routing": "user_a", "category": "test"}),
307+
Document(id="2", content="User B doc", meta={"_routing": "user_b"}),
308+
Document(id="3", content="No routing"),
309+
]
310+
311+
written = await document_store.write_documents_async(docs)
312+
assert written == 3
313+
assert await document_store.count_documents_async() == 3
314+
315+
# Verify _routing not stored in metadata
316+
retrieved = await document_store.filter_documents_async()
317+
retrieved_by_id = {doc.id: doc for doc in retrieved}
318+
319+
# Check _routing is not stored for any document
320+
for doc in retrieved:
321+
assert "_routing" not in doc.meta
322+
323+
assert retrieved_by_id["1"].meta["category"] == "test"
324+
assert retrieved_by_id["2"].meta == {}
325+
assert retrieved_by_id["3"].meta == {}
326+
327+
@pytest.mark.asyncio
328+
async def test_delete_with_routing(self, document_store: OpenSearchDocumentStore):
329+
"""Test async deleting documents with routing"""
330+
docs = [
331+
Document(id="1", content="Doc 1", meta={"_routing": "user_a"}),
332+
Document(id="2", content="Doc 2", meta={"_routing": "user_b"}),
333+
Document(id="3", content="Doc 3"),
334+
]
335+
await document_store.write_documents_async(docs)
336+
337+
routing_map = {"1": "user_a", "2": "user_b"}
338+
await document_store.delete_documents_async(["1", "2"], routing=routing_map)
339+
assert await document_store.count_documents_async() == 1
340+
302341
async def test_delete_by_filter_async(self, document_store: OpenSearchDocumentStore):
303342
docs = [
304343
Document(content="Doc 1", meta={"category": "A"}),

0 commit comments

Comments
 (0)