Skip to content

Commit be22069

Browse files
add filter methods to MongoDB DocumentStore (#2474)
1 parent 4251d7f commit be22069

3 files changed

Lines changed: 212 additions & 0 deletions

File tree

integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,114 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
423423
return
424424
await self._collection_async.delete_many(filter={"id": {"$in": document_ids}})
425425

426+
def delete_by_filter(self, filters: Dict[str, Any]) -> int:
427+
"""
428+
Deletes all documents that match the provided filters.
429+
430+
:param filters: The filters to apply to select documents for deletion.
431+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
432+
:returns: The number of documents deleted.
433+
"""
434+
self._ensure_connection_setup()
435+
assert self._collection is not None
436+
437+
try:
438+
normalized_filters = _normalize_filters(filters)
439+
result = self._collection.delete_many(filter=normalized_filters)
440+
deleted_count = result.deleted_count
441+
logger.info(
442+
"Deleted {n_docs} documents from collection '{collection}' using filters.",
443+
n_docs=deleted_count,
444+
collection=self.collection_name,
445+
)
446+
return deleted_count
447+
except Exception as e:
448+
msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}"
449+
raise DocumentStoreError(msg) from e
450+
451+
async def delete_by_filter_async(self, filters: Dict[str, Any]) -> int:
452+
"""
453+
Asynchronously deletes all documents that match the provided filters.
454+
455+
:param filters: The filters to apply to select documents for deletion.
456+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
457+
:returns: The number of documents deleted.
458+
"""
459+
await self._ensure_connection_setup_async()
460+
assert self._collection_async is not None
461+
462+
try:
463+
normalized_filters = _normalize_filters(filters)
464+
result = await self._collection_async.delete_many(filter=normalized_filters)
465+
deleted_count = result.deleted_count
466+
logger.info(
467+
"Deleted {n_docs} documents from collection '{collection}' using filters.",
468+
n_docs=deleted_count,
469+
collection=self.collection_name,
470+
)
471+
return deleted_count
472+
except Exception as e:
473+
msg = f"Failed to delete documents by filter from MongoDB Atlas: {e!s}"
474+
raise DocumentStoreError(msg) from e
475+
476+
def update_by_filter(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
477+
"""
478+
Updates the metadata of all documents that match the provided filters.
479+
480+
:param filters: The filters to apply to select documents for updating.
481+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
482+
:param meta: The metadata fields to update.
483+
:returns: The number of documents updated.
484+
"""
485+
self._ensure_connection_setup()
486+
assert self._collection is not None
487+
488+
try:
489+
normalized_filters = _normalize_filters(filters)
490+
# Build update operation to set metadata fields
491+
# MongoDB stores documents with flatten=False, so metadata is in the "meta" field
492+
update_fields = {f"meta.{key}": value for key, value in meta.items()}
493+
result = self._collection.update_many(filter=normalized_filters, update={"$set": update_fields})
494+
updated_count = result.modified_count
495+
logger.info(
496+
"Updated {n_docs} documents in collection '{collection}' using filters.",
497+
n_docs=updated_count,
498+
collection=self.collection_name,
499+
)
500+
return updated_count
501+
except Exception as e:
502+
msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}"
503+
raise DocumentStoreError(msg) from e
504+
505+
async def update_by_filter_async(self, filters: Dict[str, Any], meta: Dict[str, Any]) -> int:
506+
"""
507+
Asynchronously updates the metadata of all documents that match the provided filters.
508+
509+
:param filters: The filters to apply to select documents for updating.
510+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
511+
:param meta: The metadata fields to update.
512+
:returns: The number of documents updated.
513+
"""
514+
await self._ensure_connection_setup_async()
515+
assert self._collection_async is not None
516+
517+
try:
518+
normalized_filters = _normalize_filters(filters)
519+
# Build update operation to set metadata fields
520+
# MongoDB stores documents with flatten=False, so metadata is in the "meta" field
521+
update_fields = {f"meta.{key}": value for key, value in meta.items()}
522+
result = await self._collection_async.update_many(filter=normalized_filters, update={"$set": update_fields})
523+
updated_count = result.modified_count
524+
logger.info(
525+
"Updated {n_docs} documents in collection '{collection}' using filters.",
526+
n_docs=updated_count,
527+
collection=self.collection_name,
528+
)
529+
return updated_count
530+
except Exception as e:
531+
msg = f"Failed to update documents by filter in MongoDB Atlas: {e!s}"
532+
raise DocumentStoreError(msg) from e
533+
426534
def delete_all_documents(self, *, recreate_collection: bool = False) -> None:
427535
"""
428536
Deletes all documents in the document store.

integrations/mongodb_atlas/tests/test_document_store.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,58 @@ def test_custom_content_field(self):
334334
finally:
335335
database[collection_name].drop()
336336

337+
def test_delete_by_filter(self, document_store: MongoDBAtlasDocumentStore):
338+
docs = [
339+
Document(content="Doc 1", meta={"category": "A"}),
340+
Document(content="Doc 2", meta={"category": "B"}),
341+
Document(content="Doc 3", meta={"category": "A"}),
342+
]
343+
document_store.write_documents(docs)
344+
assert document_store.count_documents() == 3
345+
346+
# Delete documents with category="A"
347+
deleted_count = document_store.delete_by_filter(
348+
filters={"field": "meta.category", "operator": "==", "value": "A"}
349+
)
350+
assert deleted_count == 2
351+
assert document_store.count_documents() == 1
352+
353+
# Verify the remaining document is the one with category="B"
354+
remaining_docs = document_store.filter_documents()
355+
assert len(remaining_docs) == 1
356+
assert remaining_docs[0].meta["category"] == "B"
357+
358+
def test_update_by_filter(self, document_store: MongoDBAtlasDocumentStore):
359+
docs = [
360+
Document(content="Doc 1", meta={"category": "A"}),
361+
Document(content="Doc 2", meta={"category": "B"}),
362+
Document(content="Doc 3", meta={"category": "A"}),
363+
]
364+
document_store.write_documents(docs)
365+
assert document_store.count_documents() == 3
366+
367+
# Update documents with category="A" to have status="published"
368+
updated_count = document_store.update_by_filter(
369+
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
370+
)
371+
assert updated_count == 2
372+
373+
# Verify the updated documents have the new metadata
374+
published_docs = document_store.filter_documents(
375+
filters={"field": "meta.status", "operator": "==", "value": "published"}
376+
)
377+
assert len(published_docs) == 2
378+
for doc in published_docs:
379+
assert doc.meta["status"] == "published"
380+
assert doc.meta["category"] == "A"
381+
382+
# Verify documents with category="B" were not updated
383+
unpublished_docs = document_store.filter_documents(
384+
filters={"field": "meta.category", "operator": "==", "value": "B"}
385+
)
386+
assert len(unpublished_docs) == 1
387+
assert "status" not in unpublished_docs[0].meta
388+
337389
def test_delete_all_documents(self, document_store: MongoDBAtlasDocumentStore):
338390
docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")]
339391
document_store.write_documents(docs)

integrations/mongodb_atlas/tests/test_document_store_async.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,58 @@ async def test_delete_documents_async(self, document_store: MongoDBAtlasDocument
128128
await document_store.delete_documents_async(document_ids=["1"])
129129
assert await document_store.count_documents_async() == 0
130130

131+
async def test_delete_by_filter_async(self, document_store: MongoDBAtlasDocumentStore):
132+
docs = [
133+
Document(content="Doc 1", meta={"category": "A"}),
134+
Document(content="Doc 2", meta={"category": "B"}),
135+
Document(content="Doc 3", meta={"category": "A"}),
136+
]
137+
await document_store.write_documents_async(docs)
138+
assert await document_store.count_documents_async() == 3
139+
140+
# Delete documents with category="A"
141+
deleted_count = await document_store.delete_by_filter_async(
142+
filters={"field": "meta.category", "operator": "==", "value": "A"}
143+
)
144+
assert deleted_count == 2
145+
assert await document_store.count_documents_async() == 1
146+
147+
# Verify the remaining document is the one with category="B"
148+
remaining_docs = await document_store.filter_documents_async()
149+
assert len(remaining_docs) == 1
150+
assert remaining_docs[0].meta["category"] == "B"
151+
152+
async def test_update_by_filter_async(self, document_store: MongoDBAtlasDocumentStore):
153+
docs = [
154+
Document(content="Doc 1", meta={"category": "A"}),
155+
Document(content="Doc 2", meta={"category": "B"}),
156+
Document(content="Doc 3", meta={"category": "A"}),
157+
]
158+
await document_store.write_documents_async(docs)
159+
assert await document_store.count_documents_async() == 3
160+
161+
# Update documents with category="A" to have status="published"
162+
updated_count = await document_store.update_by_filter_async(
163+
filters={"field": "meta.category", "operator": "==", "value": "A"}, meta={"status": "published"}
164+
)
165+
assert updated_count == 2
166+
167+
# Verify the updated documents have the new metadata
168+
published_docs = await document_store.filter_documents_async(
169+
filters={"field": "meta.status", "operator": "==", "value": "published"}
170+
)
171+
assert len(published_docs) == 2
172+
for doc in published_docs:
173+
assert doc.meta["status"] == "published"
174+
assert doc.meta["category"] == "A"
175+
176+
# Verify documents with category="B" were not updated
177+
unpublished_docs = await document_store.filter_documents_async(
178+
filters={"field": "meta.category", "operator": "==", "value": "B"}
179+
)
180+
assert len(unpublished_docs) == 1
181+
assert "status" not in unpublished_docs[0].meta
182+
131183
async def test_delete_all_documents_async(self, document_store: MongoDBAtlasDocumentStore):
132184
docs = [Document(id="1", content="first doc"), Document(id="2", content="second doc")]
133185
await document_store.write_documents_async(docs)

0 commit comments

Comments
 (0)