Skip to content

Commit 363c1ad

Browse files
committed
adding delete_by_filter and update_by_filter
1 parent 618de89 commit 363c1ad

5 files changed

Lines changed: 149 additions & 2 deletions

File tree

integrations/astra/src/haystack_integrations/document_stores/astra/astra_client.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
import json
26
from typing import Any, Optional, Union
37
from warnings import warn
@@ -356,3 +360,23 @@ def count_documents(self, upper_bound: int = 10000) -> int:
356360
:returns: the number of documents in the index
357361
"""
358362
return self._astra_db_collection.count_documents({}, upper_bound=upper_bound)
363+
364+
def update_many(
    self,
    *,
    filter_condition: dict[str, Union[str, float, int, bool, list, dict]],
    update: dict[str, Any],
) -> int:
    """
    Apply an update operation to every document in the Astra index that matches a filter.

    :param filter_condition: the filter selecting the documents to update
    :param update: the update operations to apply (e.g., {"$set": {...}})

    :returns:
        The number of documents modified, read from the `nModified` entry of the update result
    """
    # upsert is explicitly disabled for this call
    outcome = self._astra_db_collection.update_many(
        filter=filter_condition,
        update=update,
        upsert=False,
    )
    modified_count = outcome.update_info["nModified"]
    return modified_count
382+

integrations/astra/src/haystack_integrations/document_stores/astra/document_store.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from typing import Any, Optional, Union
56

67
from haystack import default_from_dict, default_to_dict, logging
@@ -400,7 +401,6 @@ def delete_documents(self, document_ids: list[str]) -> None:
400401
Deletes documents from the document store.
401402
402403
:param document_ids: IDs of the documents to delete.
403-
:param delete_all: if `True`, delete all documents.
404404
:raises MissingDocumentError: if no document was deleted but document IDs were provided.
405405
"""
406406
if self.index.find_one_document({"filter": {}}) is not None:
@@ -420,7 +420,6 @@ def delete_all_documents(self) -> None:
420420
"""
421421
Deletes all documents from the document store.
422422
"""
423-
deletion_counter = 0
424423

425424
try:
426425
deletion_counter = self.index.delete_all_documents()
@@ -432,3 +431,58 @@ def delete_all_documents(self) -> None:
432431
logger.info("All documents deleted")
433432
else:
434433
logger.error("Could not delete all documents")
434+
435+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Deletes documents that match the provided filters.

    The dictionary passed as `filters` is not modified by this method.

    :param filters: The filters to apply to find documents to delete. A top-level `id` key is
        renamed to `_id` before the filters are converted.
    :returns: The number of documents deleted.
    :raises AstraDocumentStoreFilterError: if `filters` is not a dictionary. Errors raised while
        converting the filters propagate unchanged.
    """
    if not isinstance(filters, dict):
        msg = "Filters must be a dictionary"
        raise AstraDocumentStoreFilterError(msg)

    if "id" in filters:
        # Rename on a shallow copy: popping from the caller's dictionary would silently
        # change the object they passed in (and break re-use of the same filter).
        filters = dict(filters)
        filters["_id"] = filters.pop("id")

    converted_filters = _convert_filters(filters)
    deletion_count = self.index.delete(filters=converted_filters)

    logger.info(f"{deletion_count} documents deleted by filter")
    return deletion_count
455+
456+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Updates documents that match the provided filters with the given metadata.

    Neither `filters` nor `meta` is modified by this method.

    :param filters: The filters to apply to find documents to update. A top-level `id` key is
        renamed to `_id` before the filters are converted.
    :param meta: The metadata fields to update. Each key is written with a `$set` on
        `meta.<key>`, so it is merged with the existing metadata.
    :returns: The number of documents updated.
    :raises AstraDocumentStoreFilterError: if `filters` or `meta` is not a dictionary. Errors
        raised while converting the filters propagate unchanged.
    """
    if not isinstance(filters, dict):
        msg = "Filters must be a dictionary"
        raise AstraDocumentStoreFilterError(msg)

    if not isinstance(meta, dict):
        msg = "Meta must be a dictionary"
        raise AstraDocumentStoreFilterError(msg)

    if "id" in filters:
        # Rename on a shallow copy: popping from the caller's dictionary would silently
        # change the object they passed in (and break re-use of the same filter).
        filters = dict(filters)
        filters["_id"] = filters.pop("id")

    converted_filters = _convert_filters(filters)

    # use dot notation to update nested fields in the meta-object - ensures fields are created if they don't exist
    update_fields = {f"meta.{key}": value for key, value in meta.items()}
    update_operation = {"$set": update_fields}
    update_count = self.index.update_many(filter_condition=converted_filters, update=update_operation)

    logger.info(f"{update_count} documents updated by filter")
    return update_count

integrations/astra/src/haystack_integrations/document_stores/astra/errors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from haystack.document_stores.errors import DocumentStoreError
56
from haystack.errors import FilterError
67

integrations/astra/src/haystack_integrations/document_stores/astra/filters.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
from typing import Any, Optional
26

37
from haystack.errors import FilterError

integrations/astra/tests/test_document_store.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
import operator
56
import os
67
from unittest import mock
@@ -210,6 +211,69 @@ def test_delete_all_documents(self, document_store: AstraDocumentStore):
210211
document_store.delete_all_documents()
211212
assert document_store.count_documents() == 0
212213

214+
def test_delete_by_filter(self, document_store: AstraDocumentStore, filterable_docs):
    """Deleting with a `meta.chapter == "intro"` filter removes exactly the matching documents."""
    document_store.write_documents(filterable_docs)
    count_before = document_store.count_documents()
    assert count_before > 0

    # how many of the written documents the filter is expected to hit
    expected_deleted_count = sum(1 for candidate in filterable_docs if candidate.meta.get("chapter") == "intro")

    # remove every document whose chapter is "intro"
    deleted_count = document_store.delete_by_filter(
        filters={"field": "meta.chapter", "operator": "==", "value": "intro"}
    )

    assert deleted_count == expected_deleted_count
    assert document_store.count_documents() == count_before - deleted_count

    # none of the surviving documents may belong to the "intro" chapter
    survivors = document_store.filter_documents()
    assert all(survivor.meta.get("chapter") != "intro" for survivor in survivors)

    # querying for the deleted chapter must now come back empty
    leftover_intro = document_store.filter_documents(
        filters={"field": "meta.chapter", "operator": "==", "value": "intro"}
    )
    assert len(leftover_intro) == 0
243+
244+
def test_update_by_filter(self, document_store: AstraDocumentStore, filterable_docs):
    """Updating with a `meta.chapter == "intro"` filter tags only the matching documents."""
    document_store.write_documents(filterable_docs)
    count_before = document_store.count_documents()
    assert count_before > 0

    # how many of the written documents the filter is expected to hit
    expected_updated_count = sum(1 for candidate in filterable_docs if candidate.meta.get("chapter") == "intro")

    # set status="updated" on every document whose chapter is "intro"
    updated_count = document_store.update_by_filter(
        filters={"field": "meta.chapter", "operator": "==", "value": "intro"},
        meta={"status": "updated"},
    )

    assert updated_count == expected_updated_count
    # an update must neither add nor remove documents
    assert document_store.count_documents() == count_before

    # the tagged documents are exactly the "intro" ones and carry the new field
    tagged_docs = document_store.filter_documents(
        filters={"field": "meta.status", "operator": "==", "value": "updated"}
    )
    assert len(tagged_docs) == expected_updated_count
    for tagged in tagged_docs:
        assert tagged.meta.get("chapter") == "intro"
        assert tagged.meta.get("status") == "updated"

    # documents outside the "intro" chapter must not have been tagged
    everything = document_store.filter_documents()
    assert all(
        other.meta.get("status") != "updated" for other in everything if other.meta.get("chapter") != "intro"
    )
276+
213277
@pytest.mark.skip(reason="Unsupported filter operator not.")
214278
def test_not_operator(self, document_store, filterable_docs):
215279
pass

0 commit comments

Comments
 (0)