@@ -46,7 +46,9 @@ class OpenSearchDocumentStore:
4646
4747 Usage example:
4848 ```python
49- from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
49+ from haystack_integrations.document_stores.opensearch import (
50+ OpenSearchDocumentStore,
51+ )
5052 from haystack import Document
5153
5254 document_store = OpenSearchDocumentStore(hosts="localhost:9200")
@@ -420,6 +422,10 @@ def _prepare_bulk_write_request(
420422 opensearch_actions = []
421423 for doc in documents :
422424 doc_dict = doc .to_dict ()
425+
426+ # Extract routing from document metadata
427+ doc_routing = doc_dict .pop ("_routing" , None )
428+
423429 if "sparse_embedding" in doc_dict :
424430 sparse_embedding = doc_dict .pop ("sparse_embedding" , None )
425431 if sparse_embedding :
@@ -429,13 +435,17 @@ def _prepare_bulk_write_request(
429435 "The `sparse_embedding` field will be ignored." ,
430436 id = doc .id ,
431437 )
432- opensearch_actions .append (
433- {
434- "_op_type" : action ,
435- "_id" : doc .id ,
436- "_source" : doc_dict ,
437- }
438- )
438+
439+ action_dict = {
440+ "_op_type" : action ,
441+ "_id" : doc .id ,
442+ "_source" : doc_dict ,
443+ }
444+
445+ if doc_routing is not None :
446+ action_dict ["_routing" ] = doc_routing
447+
448+ opensearch_actions .append (action_dict )
439449
440450 return {
441451 "client" : self ._client if not is_async else self ._async_client ,
@@ -549,18 +559,36 @@ def _deserialize_document(hit: dict[str, Any]) -> Document:
549559 return Document .from_dict (data )
550560
551561 def _prepare_bulk_delete_request (
552- self , * , document_ids : list [str ], is_async : bool , refresh : Literal ["wait_for" , True , False ]
562+ self ,
563+ * ,
564+ document_ids : list [str ],
565+ is_async : bool ,
566+ refresh : Literal ["wait_for" , True , False ],
567+ routing : Optional [dict [str , str ]] = None ,
553568 ) -> dict [str , Any ]:
569+ def action_generator ():
570+ for id_ in document_ids :
571+ action = {"_op_type" : "delete" , "_id" : id_ }
572+ # Add routing if provided for this document ID
573+ if routing and id_ in routing and routing [id_ ] is not None :
574+ action ["_routing" ] = routing [id_ ]
575+ yield action
576+
554577 return {
555578 "client" : self ._client if not is_async else self ._async_client ,
556- "actions" : ({ "_op_type" : "delete" , "_id" : id_ } for id_ in document_ids ),
579+ "actions" : action_generator ( ),
557580 "refresh" : refresh ,
558581 "index" : self ._index ,
559582 "raise_on_error" : False ,
560583 "max_chunk_bytes" : self ._max_chunk_bytes ,
561584 }
562585
563- def delete_documents (self , document_ids : list [str ], refresh : Literal ["wait_for" , True , False ] = "wait_for" ) -> None :
586+ def delete_documents (
587+ self ,
588+ document_ids : list [str ],
589+ refresh : Literal ["wait_for" , True , False ] = "wait_for" ,
590+ routing : Optional [dict [str , str ]] = None ,
591+ ) -> None :
564592 """
565593 Deletes documents that match the provided `document_ids` from the document store.
566594
@@ -570,16 +598,24 @@ def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for",
570598 - `False`: Do not refresh (better performance for bulk operations).
571599 - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
572600 For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
601+ :param routing: A dictionary mapping document IDs to their routing values.
602+ Routing values are used to determine the shard where documents are stored.
603+ If provided, the routing value for each document will be used during deletion.
573604 """
574605
575606 self ._ensure_initialized ()
576607
577- bulk (** self ._prepare_bulk_delete_request (document_ids = document_ids , is_async = False , refresh = refresh ))
608+ bulk (
609+ ** self ._prepare_bulk_delete_request (
610+ document_ids = document_ids , is_async = False , refresh = refresh , routing = routing
611+ )
612+ )
578613
579614 async def delete_documents_async (
580615 self ,
581616 document_ids : list [str ],
582617 refresh : Literal ["wait_for" , True , False ] = "wait_for" ,
618+ routing : Optional [dict [str , str ]] = None ,
583619 ) -> None :
584620 """
585621 Asynchronously deletes documents that match the provided `document_ids` from the document store.
@@ -590,11 +626,18 @@ async def delete_documents_async(
590626 - `False`: Do not refresh (better performance for bulk operations).
591627 - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
592628 For more details, see the [OpenSearch refresh documentation](https://opensearch.org/docs/latest/api-reference/document-apis/index-document/).
629+ :param routing: A dictionary mapping document IDs to their routing values.
630+ Routing values are used to determine the shard where documents are stored.
631+ If provided, the routing value for each document will be used during deletion.
593632 """
594633 await self ._ensure_initialized_async ()
595634 assert self ._async_client is not None
596635
597- await async_bulk (** self ._prepare_bulk_delete_request (document_ids = document_ids , is_async = True , refresh = refresh ))
636+ await async_bulk (
637+ ** self ._prepare_bulk_delete_request (
638+ document_ids = document_ids , is_async = True , refresh = refresh , routing = routing
639+ )
640+ )
598641
599642 def _prepare_delete_all_request (self , * , refresh : bool ) -> dict [str , Any ]:
600643 return {
0 commit comments