Skip to content

Commit c8fb585

Browse files
committed
adding new operations + tests
1 parent 684390a commit c8fb585

3 files changed

Lines changed: 625 additions & 0 deletions

File tree

integrations/pgvector/src/haystack_integrations/document_stores/pgvector/document_store.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from psycopg.rows import DictRow, dict_row
1414
from psycopg.sql import SQL, Composed, Identifier
1515
from psycopg.sql import Literal as SQLLiteral
16+
from psycopg.types.json import Jsonb
1617

1718
from pgvector.psycopg import register_vector, register_vector_async
1819

@@ -1036,6 +1037,186 @@ async def delete_all_documents_async(self) -> None:
10361037
error_msg="Could not delete all documents from PgvectorDocumentStore",
10371038
)
10381039

1040+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
1041+
"""
1042+
Deletes all documents that match the provided filters.
1043+
1044+
:param filters: The filters to apply to select documents for deletion.
1045+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1046+
:returns: The number of documents deleted.
1047+
"""
1048+
_validate_filters(filters)
1049+
1050+
delete_sql = SQL("DELETE FROM {schema_name}.{table_name}").format(
1051+
schema_name=Identifier(self.schema_name),
1052+
table_name=Identifier(self.table_name),
1053+
)
1054+
1055+
params = ()
1056+
if filters:
1057+
sql_where_clause, params = _convert_filters_to_where_clause_and_params(filters)
1058+
delete_sql += sql_where_clause
1059+
1060+
self._ensure_db_setup()
1061+
assert self._cursor is not None
1062+
1063+
try:
1064+
self._execute_sql(
1065+
cursor=self._cursor,
1066+
sql_query=delete_sql,
1067+
params=params,
1068+
error_msg="Could not delete documents by filter from PgvectorDocumentStore",
1069+
)
1070+
deleted_count = self._cursor.rowcount
1071+
logger.info(
1072+
"Deleted {n_docs} documents from table '{schema}.{table}' using filters.",
1073+
n_docs=deleted_count,
1074+
schema=self.schema_name,
1075+
table=self.table_name,
1076+
)
1077+
return deleted_count
1078+
except Error as e:
1079+
msg = f"Failed to delete documents by filter from PgvectorDocumentStore: {e!s}"
1080+
raise DocumentStoreError(msg) from e
1081+
1082+
async def delete_by_filter_async(self, filters: dict[str, Any]) -> int:
1083+
"""
1084+
Asynchronously deletes all documents that match the provided filters.
1085+
1086+
:param filters: The filters to apply to select documents for deletion.
1087+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1088+
:returns: The number of documents deleted.
1089+
"""
1090+
_validate_filters(filters)
1091+
1092+
delete_sql = SQL("DELETE FROM {schema_name}.{table_name}").format(
1093+
schema_name=Identifier(self.schema_name),
1094+
table_name=Identifier(self.table_name),
1095+
)
1096+
1097+
params = ()
1098+
if filters:
1099+
sql_where_clause, params = _convert_filters_to_where_clause_and_params(filters)
1100+
delete_sql += sql_where_clause
1101+
1102+
await self._ensure_db_setup_async()
1103+
assert self._async_cursor is not None
1104+
1105+
try:
1106+
await self._execute_sql_async(
1107+
cursor=self._async_cursor,
1108+
sql_query=delete_sql,
1109+
params=params,
1110+
error_msg="Could not delete documents by filter from PgvectorDocumentStore",
1111+
)
1112+
deleted_count = self._async_cursor.rowcount
1113+
logger.info(
1114+
"Deleted {n_docs} documents from table '{schema}.{table}' using filters.",
1115+
n_docs=deleted_count,
1116+
schema=self.schema_name,
1117+
table=self.table_name,
1118+
)
1119+
return deleted_count
1120+
except Error as e:
1121+
msg = f"Failed to delete documents by filter from PgvectorDocumentStore: {e!s}"
1122+
raise DocumentStoreError(msg) from e
1123+
1124+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
1125+
"""
1126+
Updates the metadata of all documents that match the provided filters.
1127+
1128+
:param filters: The filters to apply to select documents for updating.
1129+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1130+
:param meta: The metadata fields to update.
1131+
:returns: The number of documents updated.
1132+
"""
1133+
_validate_filters(filters)
1134+
1135+
if not meta:
1136+
msg = "meta must be a non-empty dictionary"
1137+
raise ValueError(msg)
1138+
1139+
update_sql = SQL("UPDATE {schema_name}.{table_name} SET meta = COALESCE(meta, '{{}}'::jsonb) || %s::jsonb").format(
1140+
schema_name=Identifier(self.schema_name),
1141+
table_name=Identifier(self.table_name),
1142+
)
1143+
1144+
params: tuple[Any, ...] = (Jsonb(meta),)
1145+
if filters:
1146+
sql_where_clause, where_params = _convert_filters_to_where_clause_and_params(filters)
1147+
update_sql += sql_where_clause
1148+
params = params + where_params
1149+
1150+
self._ensure_db_setup()
1151+
assert self._cursor is not None
1152+
1153+
try:
1154+
self._execute_sql(
1155+
cursor=self._cursor,
1156+
sql_query=update_sql,
1157+
params=params,
1158+
error_msg="Could not update documents by filter from PgvectorDocumentStore",
1159+
)
1160+
updated_count = self._cursor.rowcount
1161+
logger.info(
1162+
"Updated {n_docs} documents in table '{schema}.{table}' using filters.",
1163+
n_docs=updated_count,
1164+
schema=self.schema_name,
1165+
table=self.table_name,
1166+
)
1167+
return updated_count
1168+
except Error as e:
1169+
msg = f"Failed to update documents by filter in PgvectorDocumentStore: {e!s}"
1170+
raise DocumentStoreError(msg) from e
1171+
1172+
async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
1173+
"""
1174+
Asynchronously updates the metadata of all documents that match the provided filters.
1175+
1176+
:param filters: The filters to apply to select documents for updating.
1177+
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
1178+
:param meta: The metadata fields to update.
1179+
:returns: The number of documents updated.
1180+
"""
1181+
_validate_filters(filters)
1182+
1183+
if not meta:
1184+
msg = "meta must be a non-empty dictionary"
1185+
raise ValueError(msg)
1186+
1187+
update_sql = SQL("UPDATE {schema_name}.{table_name} SET meta = COALESCE(meta, '{{}}'::jsonb) || %s::jsonb").format(
1188+
schema_name=Identifier(self.schema_name),
1189+
table_name=Identifier(self.table_name),
1190+
)
1191+
1192+
params: tuple[Any, ...] = (Jsonb(meta),)
1193+
if filters:
1194+
sql_where_clause, where_params = _convert_filters_to_where_clause_and_params(filters)
1195+
update_sql += sql_where_clause
1196+
params = params + where_params
1197+
1198+
await self._ensure_db_setup_async()
1199+
assert self._async_cursor is not None
1200+
1201+
try:
1202+
await self._execute_sql_async(
1203+
cursor=self._async_cursor,
1204+
sql_query=update_sql,
1205+
params=params,
1206+
error_msg="Could not update documents by filter from PgvectorDocumentStore",
1207+
)
1208+
updated_count = self._async_cursor.rowcount
1209+
logger.info(
1210+
"Updated {n_docs} documents in table '{schema}.{table}' using filters.",
1211+
n_docs=updated_count,
1212+
schema=self.schema_name,
1213+
table=self.table_name,
1214+
)
1215+
return updated_count
1216+
except Error as e:
1217+
msg = f"Failed to update documents by filter in PgvectorDocumentStore: {e!s}"
1218+
raise DocumentStoreError(msg) from e
1219+
10391220
def _build_keyword_retrieval_query(
10401221
self, query: str, top_k: int, filters: Optional[dict[str, Any]] = None
10411222
) -> tuple[Composed, tuple]:

0 commit comments

Comments
 (0)