Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions veadk/database/database_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import time
from typing import BinaryIO, TextIO
Expand Down
52 changes: 50 additions & 2 deletions veadk/database/vector/opensearch_vector_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def query(self, query: str, **kwargs: Any) -> list[str]:
assert collection_name is not None, "Collection name is required."
if not self._opensearch_client.indices.exists(index=collection_name):
logger.warning(
f"querying {query}, but collection {collection_name} does not exist. retun a empty list."
f"querying {query}, but collection {collection_name} does not exist. return a empty list."
)
return []
query_vector = self._embedding_client.embed_query(query)
Expand All @@ -196,10 +196,58 @@ def query(self, query: str, **kwargs: Any) -> list[str]:

@override
def delete(self, collection_name: str, **kwargs: Any):
    """Drop the OpenSearch index named ``collection_name``.

    Args:
        collection_name: Name of the index to remove.
        **kwargs: Accepted but not used by this method.

    Raises:
        ValueError: If no index with that name exists.
    """
    index_api = self._opensearch_client.indices
    if index_api.exists(index=collection_name):
        index_api.delete(index=collection_name)
        return
    raise ValueError(f"Collection {collection_name} does not exist.")

def is_empty(self, collection_name: str) -> bool:
    """Report whether the index ``collection_name`` contains no documents.

    Args:
        collection_name: Name of the index to inspect.

    Returns:
        True when the client's ``count`` response reports zero documents,
        False otherwise.
    """
    response = self._opensearch_client.count(index=collection_name)
    return response["count"] == 0

def collection_exists(self, collection_name: str) -> bool:
    """Return the client's answer to whether index ``collection_name`` exists."""
    index_api = self._opensearch_client.indices
    return index_api.exists(index=collection_name)

def list_all_collection(self) -> list:
    """Return the names of all indices reported by the client's ``get_alias`` call."""
    aliases_by_index = self._opensearch_client.indices.get_alias()
    # The response is keyed by index name; only the keys are needed.
    return [*aliases_by_index.keys()]

def get_all_docs(self, collection_name: str, size: int = 10000) -> list[dict]:
    """Fetch documents from one index using a ``match_all`` search.

    Args:
        collection_name: Name of the index to read.
        size: Value sent as the ``size`` field of the search body.

    Returns:
        One ``{"id", "page_content"}`` dict per returned hit, or an empty
        list (after logging a warning) when the index does not exist.
    """
    if self.collection_exists(collection_name):
        search_body = {"size": size, "query": {"match_all": {}}}
        result = self._opensearch_client.search(
            index=collection_name, body=search_body
        )
        hits = result["hits"]["hits"]
        return [
            {"id": doc["_id"], "page_content": doc["_source"]["page_content"]}
            for doc in hits
        ]

    logger.warning(
        f"Get all docs, but collection {collection_name} does not exist. return a empty list."
    )
    return []

def delete_by_query(self, collection_name: str, query: str):
    """Delete documents whose ``page_content`` matches ``query``.

    Args:
        collection_name: Name of the index to delete from.
        query: Text placed in a ``match`` clause on the ``page_content`` field.

    Returns:
        The response returned by the client's ``delete_by_query`` call.

    Raises:
        ValueError: If the index does not exist.
    """
    if not self.collection_exists(collection_name):
        raise ValueError(f"Collection {collection_name} does not exist.")

    client = self._opensearch_client
    match_body = {"query": {"match": {"page_content": query}}}
    result = client.delete_by_query(index=collection_name, body=match_body)
    # Refresh after deleting -- presumably so the removal is visible to
    # subsequent searches right away; confirm against OpenSearch semantics.
    client.indices.refresh(index=collection_name)
    return result

def delete_by_id(self, collection_name: str, id: str):
    """Delete the document with the given ``id`` from one index.

    Args:
        collection_name: Name of the index to delete from.
        id: Identifier of the document to remove.

    Returns:
        The response returned by the client's ``delete`` call.

    Raises:
        ValueError: If the index does not exist.
    """
    if not self.collection_exists(collection_name):
        raise ValueError(f"Collection {collection_name} does not exist.")

    client = self._opensearch_client
    result = client.delete(index=collection_name, id=id)
    # Refresh after deleting -- presumably so the removal is visible to
    # subsequent searches right away; confirm against OpenSearch semantics.
    client.indices.refresh(index=collection_name)
    return result
2 changes: 1 addition & 1 deletion veadk/memory/long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _filter_and_convert_events(self, events: list[Event]) -> list[str]:
# convert: to string-format for storage
message = event.content.model_dump(exclude_none=True, mode="json")

final_events.append(json.dumps(message))
final_events.append(json.dumps(message, ensure_ascii=False))
return final_events

@override
Expand Down