55import base64
66import datetime
77import json
8- from asyncio import Semaphore , gather
98from dataclasses import asdict
109from typing import Any , NoReturn
11- from uuid import UUID
1210
1311from haystack import logging
1412from haystack .core .serialization import default_from_dict , default_to_dict
1513from haystack .dataclasses .document import Document
1614from haystack .document_stores .errors import DocumentStoreError , DuplicateDocumentError
1715from haystack .document_stores .types .policy import DuplicatePolicy
18- from more_itertools import batched
1916
2017import weaviate
2118from weaviate .collections .classes .aggregate import (
6966# See WeaviateDocumentStore._query_with_filters() for more information.
7067DEFAULT_QUERY_LIMIT = 9999
7168
69+ # See weaviate.collections.queries.hybrid.query.sync.pyi for the default value of alpha
70+ DEFAULT_ALPHA = 0.7
71+
7272
7373class WeaviateDocumentStore :
7474 """
@@ -112,7 +112,6 @@ def __init__(
112112 additional_config : AdditionalConfig | None = None ,
113113 grpc_port : int = 50051 ,
114114 grpc_secure : bool = False ,
115- concurrency_limit : int = 5 ,
116115 ) -> None :
117116 """
118117 Create a new instance of WeaviateDocumentStore and connects to the Weaviate instance.
@@ -156,8 +155,6 @@ def __init__(
156155 The port to use for the gRPC connection.
157156 :param grpc_secure:
158157 Whether to use a secure channel for the underlying gRPC API.
159- :param concurrency_limit:
160- Number of parallel requests to make. Default is 5.
161158 """
162159 self ._url = url
163160 self ._auth_client_secret = auth_client_secret
@@ -166,7 +163,6 @@ def __init__(
166163 self ._additional_config = additional_config
167164 self ._grpc_port = grpc_port
168165 self ._grpc_secure = grpc_secure
169- self .concurrency_limit = concurrency_limit
170166 self ._client : weaviate .WeaviateClient | None = None
171167 self ._async_client : weaviate .WeaviateAsyncClient | None = None
172168 self ._collection : weaviate .Collection | None = None
@@ -1225,7 +1221,6 @@ async def delete_all_documents_async(self, *, recreate_index: bool = False, batc
12251221 Reference: https://docs.weaviate.io/weaviate/manage-objects/delete#delete-all-objects
12261222 """
12271223 client = await self .async_client
1228- sem = Semaphore (max (1 , self .concurrency_limit ))
12291224
12301225 if recreate_index :
12311226 # get current up-to-date config from server, so we can recreate the collection faithfully
@@ -1245,18 +1240,23 @@ async def delete_all_documents_async(self, *, recreate_index: bool = False, batc
12451240 collection = await self .async_collection
12461241 async for obj in collection .iterator (return_properties = [], include_vector = False ):
12471242 uuids .append (obj .uuid )
1248-
1249- async def _runner (uuids : list [UUID ]) -> None :
1250- async with sem :
1243+ if len (uuids ) >= batch_size :
12511244 res = await collection .data .delete_many (where = weaviate .classes .query .Filter .by_id ().contains_any (uuids ))
1245+ if res .successful < len (uuids ):
1246+ logger .warning (
1247+ "Not all documents in the batch have been deleted. "
1248+ "Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`." ,
1249+ )
1250+ uuids .clear ()
1251+
1252+ if uuids :
1253+ res = await collection .data .delete_many (where = weaviate .classes .query .Filter .by_id ().contains_any (uuids ))
12521254 if res .successful < len (uuids ):
12531255 logger .warning (
1254- "Not all documents in the batch have been deleted. "
1256+ "Not all documents have been deleted. "
12551257 "Make sure to specify a deletion `batch_size` which is less than `QUERY_MAXIMUM_RESULTS`." ,
12561258 )
12571259
1258- await gather (* [_runner (list (batch )) for batch in batched (uuids , batch_size )])
1259-
12601260 def delete_by_filter (self , filters : dict [str , Any ]) -> int :
12611261 """
12621262 Deletes all documents that match the provided filters.
@@ -1572,12 +1572,10 @@ def _hybrid_retrieval(
15721572 query_embedding : list [float ],
15731573 filters : dict [str , Any ] | None = None ,
15741574 top_k : int | None = None ,
1575- alpha : float | None = None ,
1575+ alpha : float = DEFAULT_ALPHA ,
15761576 max_vector_distance : float | None = None ,
15771577 ) -> list [Document ]:
15781578 properties = [p .name for p in self .collection .config .get ().properties ]
1579- if alpha is None :
1580- alpha = 0.7
15811579 result = self .collection .query .hybrid (
15821580 query = query ,
15831581 vector = query_embedding ,
@@ -1599,14 +1597,12 @@ async def _hybrid_retrieval_async(
15991597 query_embedding : list [float ],
16001598 filters : dict [str , Any ] | None = None ,
16011599 top_k : int | None = None ,
1602- alpha : float | None = None ,
1600+ alpha : float = DEFAULT_ALPHA ,
16031601 max_vector_distance : float | None = None ,
16041602 ) -> list [Document ]:
16051603 collection = await self .async_collection
16061604 config = await collection .config .get ()
16071605 properties = [p .name for p in config .properties ]
1608- if alpha is None :
1609- alpha = 0.7
16101606 result = await collection .query .hybrid (
16111607 query = query ,
16121608 vector = query_embedding ,
0 commit comments