Skip to content

Commit b65cdaf

Browse files
feat: adding update_by_filter and delete_by_filter to PgVector document store (#2647)
* adding new operations + tests * fixing linting issues * formatting * cleaning up
1 parent 684390a commit b65cdaf

3 files changed

Lines changed: 622 additions & 0 deletions

File tree

integrations/pgvector/src/haystack_integrations/document_stores/pgvector/document_store.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-present deepset GmbH <info@deepset.ai>
22
#
33
# SPDX-License-Identifier: Apache-2.0
4+
45
from typing import Any, Literal, Optional, Union, overload
56

67
from haystack import default_from_dict, default_to_dict, logging
@@ -13,6 +14,7 @@
1314
from psycopg.rows import DictRow, dict_row
1415
from psycopg.sql import SQL, Composed, Identifier
1516
from psycopg.sql import Literal as SQLLiteral
17+
from psycopg.types.json import Jsonb
1618

1719
from pgvector.psycopg import register_vector, register_vector_async
1820

@@ -1036,6 +1038,190 @@ async def delete_all_documents_async(self) -> None:
10361038
error_msg="Could not delete all documents from PgvectorDocumentStore",
10371039
)
10381040

1041+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
1042+
"""
1043+
Deletes all documents that match the provided filters.
1044+
1045+
:param filters: The filters to apply to select documents for deletion.
1046+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1047+
:returns: The number of documents deleted.
1048+
"""
1049+
_validate_filters(filters)
1050+
1051+
delete_sql = SQL("DELETE FROM {schema_name}.{table_name}").format(
1052+
schema_name=Identifier(self.schema_name),
1053+
table_name=Identifier(self.table_name),
1054+
)
1055+
1056+
params = ()
1057+
if filters:
1058+
sql_where_clause, params = _convert_filters_to_where_clause_and_params(filters)
1059+
delete_sql += sql_where_clause
1060+
1061+
self._ensure_db_setup()
1062+
assert self._cursor is not None
1063+
1064+
try:
1065+
self._execute_sql(
1066+
cursor=self._cursor,
1067+
sql_query=delete_sql,
1068+
params=params,
1069+
error_msg="Could not delete documents by filter from PgvectorDocumentStore",
1070+
)
1071+
deleted_count = self._cursor.rowcount
1072+
logger.info(
1073+
"Deleted {n_docs} documents from table '{schema}.{table}' using filters.",
1074+
n_docs=deleted_count,
1075+
schema=self.schema_name,
1076+
table=self.table_name,
1077+
)
1078+
return deleted_count
1079+
except Error as e:
1080+
msg = f"Failed to delete documents by filter from PgvectorDocumentStore: {e!s}"
1081+
raise DocumentStoreError(msg) from e
1082+
1083+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
1084+
"""
1085+
Asynchronously deletes all documents that match the provided filters.
1086+
1087+
:param filters: The filters to apply to select documents for deletion.
1088+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1089+
:returns: The number of documents deleted.
1090+
"""
1091+
_validate_filters(filters)
1092+
1093+
delete_sql = SQL("DELETE FROM {schema_name}.{table_name}").format(
1094+
schema_name=Identifier(self.schema_name),
1095+
table_name=Identifier(self.table_name),
1096+
)
1097+
1098+
params = ()
1099+
if filters:
1100+
sql_where_clause, params = _convert_filters_to_where_clause_and_params(filters)
1101+
delete_sql += sql_where_clause
1102+
1103+
await self._ensure_db_setup_async()
1104+
assert self._async_cursor is not None
1105+
1106+
try:
1107+
await self._execute_sql_async(
1108+
cursor=self._async_cursor,
1109+
sql_query=delete_sql,
1110+
params=params,
1111+
error_msg="Could not delete documents by filter from PgvectorDocumentStore",
1112+
)
1113+
deleted_count = self._async_cursor.rowcount
1114+
logger.info(
1115+
"Deleted {n_docs} documents from table '{schema}.{table}' using filters.",
1116+
n_docs=deleted_count,
1117+
schema=self.schema_name,
1118+
table=self.table_name,
1119+
)
1120+
return deleted_count
1121+
except Error as e:
1122+
msg = f"Failed to delete documents by filter from PgvectorDocumentStore: {e!s}"
1123+
raise DocumentStoreError(msg) from e
1124+
1125+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
1126+
"""
1127+
Updates the metadata of all documents that match the provided filters.
1128+
1129+
:param filters: The filters to apply to select documents for updating.
1130+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1131+
:param meta: The metadata fields to update.
1132+
:returns: The number of documents updated.
1133+
"""
1134+
_validate_filters(filters)
1135+
1136+
if not meta:
1137+
msg = "meta must be a non-empty dictionary"
1138+
raise ValueError(msg)
1139+
1140+
update_sql = SQL(
1141+
"UPDATE {schema_name}.{table_name} SET meta = COALESCE(meta, '{{}}'::jsonb) || %s::jsonb"
1142+
).format(
1143+
schema_name=Identifier(self.schema_name),
1144+
table_name=Identifier(self.table_name),
1145+
)
1146+
1147+
params: tuple[Any, ...] = (Jsonb(meta),)
1148+
if filters:
1149+
sql_where_clause, where_params = _convert_filters_to_where_clause_and_params(filters)
1150+
update_sql += sql_where_clause
1151+
params = params + where_params
1152+
1153+
self._ensure_db_setup()
1154+
assert self._cursor is not None
1155+
1156+
try:
1157+
self._execute_sql(
1158+
cursor=self._cursor,
1159+
sql_query=update_sql,
1160+
params=params,
1161+
error_msg="Could not update documents by filter from PgvectorDocumentStore",
1162+
)
1163+
updated_count = self._cursor.rowcount
1164+
logger.info(
1165+
"Updated {n_docs} documents in table '{schema}.{table}' using filters.",
1166+
n_docs=updated_count,
1167+
schema=self.schema_name,
1168+
table=self.table_name,
1169+
)
1170+
return updated_count
1171+
except Error as e:
1172+
msg = f"Failed to update documents by filter in PgvectorDocumentStore: {e!s}"
1173+
raise DocumentStoreError(msg) from e
1174+
1175+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
1176+
"""
1177+
Asynchronously updates the metadata of all documents that match the provided filters.
1178+
1179+
:param filters: The filters to apply to select documents for updating.
1180+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1181+
:param meta: The metadata fields to update.
1182+
:returns: The number of documents updated.
1183+
"""
1184+
_validate_filters(filters)
1185+
1186+
if not meta:
1187+
msg = "meta must be a non-empty dictionary"
1188+
raise ValueError(msg)
1189+
1190+
update_sql = SQL(
1191+
"UPDATE {schema_name}.{table_name} SET meta = COALESCE(meta, '{{}}'::jsonb) || %s::jsonb"
1192+
).format(
1193+
schema_name=Identifier(self.schema_name),
1194+
table_name=Identifier(self.table_name),
1195+
)
1196+
1197+
params: tuple[Any, ...] = (Jsonb(meta),)
1198+
if filters:
1199+
sql_where_clause, where_params = _convert_filters_to_where_clause_and_params(filters)
1200+
update_sql += sql_where_clause
1201+
params = params + where_params
1202+
1203+
await self._ensure_db_setup_async()
1204+
assert self._async_cursor is not None
1205+
1206+
try:
1207+
await self._execute_sql_async(
1208+
cursor=self._async_cursor,
1209+
sql_query=update_sql,
1210+
params=params,
1211+
error_msg="Could not update documents by filter from PgvectorDocumentStore",
1212+
)
1213+
updated_count = self._async_cursor.rowcount
1214+
logger.info(
1215+
"Updated {n_docs} documents in table '{schema}.{table}' using filters.",
1216+
n_docs=updated_count,
1217+
schema=self.schema_name,
1218+
table=self.table_name,
1219+
)
1220+
return updated_count
1221+
except Error as e:
1222+
msg = f"Failed to update documents by filter in PgvectorDocumentStore: {e!s}"
1223+
raise DocumentStoreError(msg) from e
1224+
10391225
def _build_keyword_retrieval_query(
10401226
self, query: str, top_k: int, filters: Optional[dict[str, Any]] = None
10411227
) -> tuple[Composed, tuple]:

0 commit comments

Comments
 (0)