Skip to content

Commit c7f3ef1

Browse files
committed
merge main: add async init support
2 parents 6a80680 + ecb6203 commit c7f3ef1

18 files changed

Lines changed: 860 additions & 149 deletions

File tree

integrations/astra/CHANGELOG.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
11
# Changelog
22

3-
## [integrations/astra-v3.0.1]
3+
## [integrations/astra-v3.1.0] - 2025-12-30
4+
5+
### 🚀 Features
6+
7+
- Adding `delete_by_filter` and `update_by_filter` (#2631)
8+
9+
10+
### ⚙️ CI
11+
12+
- Change pytest command (#2475)
413

514
### 🧹 Chores
615

7-
- Adopt PEP 585 type hinting
16+
- Remove Readme API CI workflow and configs (#2573)
17+
18+
### 🌀 Miscellaneous
819

20+
- Adopt PEP 585 type hinting (part 1) (#2509)
921

1022
## [integrations/astra-v3.0.0] - 2025-10-21
1123

integrations/astra/src/haystack_integrations/document_stores/astra/astra_client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
import json
26
from typing import Any, Optional, Union
37
from warnings import warn
@@ -356,3 +360,22 @@ def count_documents(self, upper_bound: int = 10000) -> int:
356360
:returns: the number of documents in the index
357361
"""
358362
return self._astra_db_collection.count_documents({}, upper_bound=upper_bound)
363+
364+
def update(
365+
self,
366+
*,
367+
filters: dict[str, Union[str, float, int, bool, list, dict]],
368+
update: dict[str, Any],
369+
) -> int:
370+
"""
371+
Update multiple documents in the Astra index that match the filter.
372+
373+
:param filters: the filter to match documents to update
374+
:param update: the update operations to apply (e.g., {"$set": {...}})
375+
376+
:returns:
377+
The number of documents updated
378+
"""
379+
update_result = self._astra_db_collection.update_many(filter=filters, update=update, upsert=False)
380+
381+
return update_result.update_info["nModified"]

integrations/astra/src/haystack_integrations/document_stores/astra/document_store.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from typing import Any, Optional, Union
56

67
from haystack import default_from_dict, default_to_dict, logging
@@ -400,7 +401,6 @@ def delete_documents(self, document_ids: list[str]) -> None:
400401
Deletes documents from the document store.
401402
402403
:param document_ids: IDs of the documents to delete.
403-
:param delete_all: if `True`, delete all documents.
404404
:raises MissingDocumentError: if no document was deleted but document IDs were provided.
405405
"""
406406
if self.index.find_one_document({"filter": {}}) is not None:
@@ -420,7 +420,6 @@ def delete_all_documents(self) -> None:
420420
"""
421421
Deletes all documents from the document store.
422422
"""
423-
deletion_counter = 0
424423

425424
try:
426425
deletion_counter = self.index.delete_all_documents()
@@ -432,3 +431,59 @@ def delete_all_documents(self) -> None:
432431
logger.info("All documents deleted")
433432
else:
434433
logger.error("Could not delete all documents")
434+
435+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
436+
"""
437+
Deletes documents that match the provided filters.
438+
439+
:param filters: The filters to apply to find documents to delete.
440+
:returns: The number of documents deleted.
441+
:raises AstraDocumentStoreFilterError: if the filter is invalid or not supported.
442+
"""
443+
if not isinstance(filters, dict):
444+
msg = "Filters must be a dictionary"
445+
raise AstraDocumentStoreFilterError(msg)
446+
447+
if "id" in filters:
448+
filters["_id"] = filters.pop("id")
449+
450+
converted_filters = _convert_filters(filters)
451+
deletion_count = self.index.delete(filters=converted_filters)
452+
453+
logger.info(f"{deletion_count} documents deleted by filter")
454+
return deletion_count
455+
456+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
457+
"""
458+
Updates documents that match the provided filters with the given metadata.
459+
460+
:param filters: The filters to apply to find documents to update.
461+
:param meta: The metadata fields to update. This will be merged with existing metadata.
462+
463+
:returns:
464+
The number of documents updated.
465+
466+
:raises:
467+
AstraDocumentStoreFilterError: if the filter is invalid or not supported.
468+
"""
469+
if not isinstance(filters, dict):
470+
msg = "Filters must be a dictionary"
471+
raise AstraDocumentStoreFilterError(msg)
472+
473+
if not isinstance(meta, dict):
474+
msg = "Meta must be a dictionary"
475+
raise AstraDocumentStoreFilterError(msg)
476+
477+
if "id" in filters:
478+
filters["_id"] = filters.pop("id")
479+
480+
converted_filters = _convert_filters(filters)
481+
482+
# use dot notation to update nested fields in the meta-object - ensures fields are created if they don't exist
483+
update_fields = {f"meta.{key}": value for key, value in meta.items()}
484+
update_operation = {"$set": update_fields}
485+
update_count = self.index.update(filters=converted_filters, update=update_operation) # type: ignore
486+
487+
logger.info(f"{update_count} documents updated by filter")
488+
489+
return update_count

integrations/astra/src/haystack_integrations/document_stores/astra/errors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from haystack.document_stores.errors import DocumentStoreError
56
from haystack.errors import FilterError
67

integrations/astra/src/haystack_integrations/document_stores/astra/filters.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
15
from typing import Any, Optional
26

37
from haystack.errors import FilterError

integrations/astra/tests/test_document_store.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present Anant Corporation <support@anant.us>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
import operator
56
import os
67
from unittest import mock
@@ -210,6 +211,67 @@ def test_delete_all_documents(self, document_store: AstraDocumentStore):
210211
document_store.delete_all_documents()
211212
assert document_store.count_documents() == 0
212213

214+
def test_delete_by_filter(self, document_store: AstraDocumentStore, filterable_docs):
215+
document_store.write_documents(filterable_docs)
216+
initial_count = document_store.count_documents()
217+
assert initial_count > 0
218+
219+
# count documents that match the filter before deletion
220+
matching_docs = [d for d in filterable_docs if d.meta.get("chapter") == "intro"]
221+
expected_deleted_count = len(matching_docs)
222+
223+
# delete all documents with chapter="intro"
224+
deleted_count = document_store.delete_by_filter(
225+
filters={"field": "meta.chapter", "operator": "==", "value": "intro"}
226+
)
227+
228+
assert deleted_count == expected_deleted_count
229+
assert document_store.count_documents() == initial_count - deleted_count
230+
231+
# remaining documents don't have chapter="intro"
232+
remaining_docs = document_store.filter_documents()
233+
for doc in remaining_docs:
234+
assert doc.meta.get("chapter") != "intro"
235+
236+
# all documents with chapter="intro" were deleted
237+
intro_docs = document_store.filter_documents(
238+
filters={"field": "meta.chapter", "operator": "==", "value": "intro"}
239+
)
240+
assert len(intro_docs) == 0
241+
242+
def test_update_by_filter(self, document_store: AstraDocumentStore, filterable_docs):
243+
document_store.write_documents(filterable_docs)
244+
initial_count = document_store.count_documents()
245+
assert initial_count > 0
246+
247+
# count documents that match the filter before update
248+
matching_docs = [d for d in filterable_docs if d.meta.get("chapter") == "intro"]
249+
expected_updated_count = len(matching_docs)
250+
251+
# update all documents with chapter="intro" to have status="updated"
252+
updated_count = document_store.update_by_filter(
253+
filters={"field": "meta.chapter", "operator": "==", "value": "intro"},
254+
meta={"status": "updated"},
255+
)
256+
257+
assert updated_count == expected_updated_count
258+
assert document_store.count_documents() == initial_count
259+
260+
# verify the updated documents have the new metadata
261+
updated_docs = document_store.filter_documents(
262+
filters={"field": "meta.status", "operator": "==", "value": "updated"}
263+
)
264+
assert len(updated_docs) == expected_updated_count
265+
for doc in updated_docs:
266+
assert doc.meta.get("chapter") == "intro"
267+
assert doc.meta.get("status") == "updated"
268+
269+
# verify other documents weren't affected
270+
all_docs = document_store.filter_documents()
271+
for doc in all_docs:
272+
if doc.meta.get("chapter") != "intro":
273+
assert doc.meta.get("status") != "updated"
274+
213275
@pytest.mark.skip(reason="Unsupported filter operator not.")
214276
def test_not_operator(self, document_store, filterable_docs):
215277
pass

integrations/google_genai/tests/test_chat_generator.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,17 +1300,14 @@ def test_aggregate_streaming_chunks_with_reasoning(monkeypatch):
13001300
chunk1.tool_calls = []
13011301
chunk1.meta = {"usage": {"prompt_tokens": 10, "completion_tokens": 5}}
13021302
chunk1.component_info = component_info
1303+
chunk1.reasoning = None
13031304

13041305
chunk2 = Mock()
13051306
chunk2.content = " world"
13061307
chunk2.tool_calls = []
13071308
chunk2.meta = {"usage": {"prompt_tokens": 10, "completion_tokens": 8}}
13081309
chunk2.component_info = component_info
1309-
1310-
# Mock the reasoning content that would be extracted
1311-
mock_reasoning = Mock()
1312-
mock_reasoning.reasoning_text = "I should greet the user politely"
1313-
mock_reasoning.reasoning_type = "REASONING_TYPE_UNSPECIFIED"
1310+
chunk2.reasoning = None
13141311

13151312
# Mock the final chunk with reasoning
13161313
final_chunk = Mock()
@@ -1321,6 +1318,7 @@ def test_aggregate_streaming_chunks_with_reasoning(monkeypatch):
13211318
"model": "gemini-2.5-pro",
13221319
}
13231320
final_chunk.component_info = component_info
1321+
final_chunk.reasoning = ReasoningContent(reasoning_text="I should greet the user politely")
13241322

13251323
# Add reasoning deltas to the final chunk meta (this is how the real method works)
13261324
final_chunk.meta["reasoning_deltas"] = [{"type": "reasoning", "content": "I should greet the user politely"}]

integrations/meta_llama/tests/test_llama_chat_generator.py

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from haystack import Pipeline
1111
from haystack.components.generators.utils import print_streaming_chunk
1212
from haystack.components.tools import ToolInvoker
13-
from haystack.dataclasses import ChatMessage, ChatRole, StreamingChunk, ToolCall
13+
from haystack.dataclasses import ChatMessage, ChatRole, StreamingChunk
1414
from haystack.tools import Tool, Toolset
1515
from haystack.utils.auth import Secret
1616
from openai import OpenAIError
@@ -387,61 +387,6 @@ class NobelPrizeInfo(BaseModel):
387387
assert "achievement_description" in msg
388388
assert msg["nationality"] == "American"
389389

390-
@pytest.mark.skipif(
391-
not os.environ.get("LLAMA_API_KEY", None),
392-
reason="Export an env var called LLAMA_API_KEY containing the Llama API key to run this test.",
393-
)
394-
@pytest.mark.integration
395-
def test_live_run_with_response_format_json_schema(self):
396-
response_schema = {
397-
"type": "json_schema",
398-
"json_schema": {
399-
"name": "CapitalCity",
400-
"strict": True,
401-
"schema": {
402-
"title": "CapitalCity",
403-
"type": "object",
404-
"properties": {
405-
"city": {"title": "City", "type": "string"},
406-
"country": {"title": "Country", "type": "string"},
407-
},
408-
"required": ["city", "country"],
409-
"additionalProperties": False,
410-
},
411-
},
412-
}
413-
414-
chat_messages = [ChatMessage.from_user("What's the capital of France?")]
415-
comp = MetaLlamaChatGenerator(generation_kwargs={"response_format": response_schema})
416-
results = comp.run(chat_messages)
417-
assert len(results["replies"]) == 1
418-
message: ChatMessage = results["replies"][0]
419-
msg = json.loads(message.text)
420-
assert "Paris" in msg["city"]
421-
assert isinstance(msg["country"], str)
422-
assert "France" in msg["country"]
423-
assert message.meta["finish_reason"] == "stop"
424-
425-
@pytest.mark.skipif(
426-
not os.environ.get("LLAMA_API_KEY", None),
427-
reason="Export an env var called LLAMA_API_KEY containing the OpenAI API key to run this test.",
428-
)
429-
@pytest.mark.integration
430-
def test_live_run_with_tools(self, tools):
431-
chat_messages = [ChatMessage.from_user("What's the weather like in Paris?")]
432-
component = MetaLlamaChatGenerator(tools=tools)
433-
results = component.run(chat_messages)
434-
assert len(results["replies"]) == 1
435-
message = results["replies"][0]
436-
assert message.text is None
437-
438-
assert message.tool_calls
439-
tool_call = message.tool_call
440-
assert isinstance(tool_call, ToolCall)
441-
assert tool_call.tool_name == "weather"
442-
assert tool_call.arguments == {"city": "Paris"}
443-
assert message.meta["finish_reason"] == "tool_calls"
444-
445390
@pytest.mark.skipif(
446391
not os.environ.get("LLAMA_API_KEY", None),
447392
reason="Export an env var called LLAMA_API_KEY containing the OpenAI API key to run this test.",

0 commit comments

Comments
 (0)