Skip to content

Commit cfbc90e

Browse files
feat(falkordb): add missing document store operations
1 parent 813db45 commit cfbc90e

2 files changed

Lines changed: 207 additions & 2 deletions

File tree

integrations/falkordb/src/haystack_integrations/document_stores/falkordb/document_store.py

Lines changed: 185 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -449,6 +449,191 @@ def delete_documents(self, document_ids: list[str]) -> None:
449449
{"ids": document_ids},
450450
)
451451

452+
def delete_all_documents(self) -> None:
    """
    Remove every document node from the graph.

    Issues a single ``DETACH DELETE`` over all nodes that carry the store's
    ``node_label``.
    """
    self._ensure_connected()
    wipe_query = f"MATCH (d:{self.node_label}) DETACH DELETE d"
    self.graph.query(wipe_query)
458+
459+
def delete_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Delete all documents that match the provided filters.

    The deletion runs as a single query and the count is taken from that query's
    own statistics, so the returned number always reflects the nodes that were
    actually removed (a separate count query could disagree with the delete if
    the graph changes in between).

    :param filters: Haystack filter dict.
    :returns: Number of documents deleted.
    """
    self._ensure_connected()
    where_clause, params = _convert_filters(filters)
    deletion = self.graph.query(
        f"MATCH (d:{self.node_label}) WHERE {where_clause} DETACH DELETE d",
        params,
    )
    # NOTE(review): relies on the FalkorDB client's QueryResult exposing the
    # ``nodes_deleted`` statistic; int() normalises a numeric (possibly float) value.
    return int(deletion.nodes_deleted)
478+
479+
def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
    """
    Set metadata fields on every document selected by the given filters.

    The supplied fields are merged onto each matching node with ``SET d += ...``.

    :param filters: Haystack filter dict selecting which documents to update.
    :param meta: Metadata fields to set. Keys may include or omit the ``meta.`` prefix.
    :returns: Number of documents updated.
    """
    self._ensure_connected()
    where_clause, params = _convert_filters(filters)

    # Strip the optional "meta." prefix so keys match the property names on the node.
    stripped_meta: dict[str, Any] = {}
    for key, value in meta.items():
        stripped_meta[key.removeprefix("meta.")] = value
    params["meta_update"] = stripped_meta

    cypher = f"MATCH (d:{self.node_label}) WHERE {where_clause} SET d += $meta_update RETURN count(d) AS n"
    updated_rows = self.graph.query(cypher, params).result_set
    if not updated_rows:
        return 0
    return int(updated_rows[0][0])
497+
498+
def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Count the document nodes selected by the given filters.

    :param filters: Haystack filter dict.
    :returns: Integer count of matching document nodes.
    """
    self._ensure_connected()
    where_clause, params = _convert_filters(filters)
    cypher = f"MATCH (d:{self.node_label}) WHERE {where_clause} RETURN count(d) AS n"
    count_rows = self.graph.query(cypher, params).result_set
    if not count_rows:
        return 0
    return int(count_rows[0][0])
513+
514+
def count_unique_metadata_by_filter(
    self, filters: dict[str, Any], metadata_fields: list[str]
) -> dict[str, int]:
    """
    Return the number of unique values for each specified metadata field among documents
    that match the provided filters.

    One ``count(DISTINCT ...)`` query is issued per requested field.

    :param filters: Haystack filter dict. Pass an empty dict to count across all documents.
    :param metadata_fields: List of metadata field names. May include or omit the ``meta.`` prefix.
    :returns: Dict mapping each field name (without ``meta.`` prefix) to its unique value count.
    """
    self._ensure_connected()
    if filters:
        where_clause, params = _convert_filters(filters)
        match = f"MATCH (d:{self.node_label}) WHERE {where_clause}"
    else:
        params = {}
        match = f"MATCH (d:{self.node_label})"

    result: dict[str, int] = {}
    for field in metadata_fields:
        actual = field.removeprefix("meta.")
        # The field name is spliced into the query text, so it is emitted as a
        # backtick-quoted identifier with embedded backticks doubled. This keeps
        # the whole name inside the quotes: it cannot inject Cypher, and names
        # that are not plain identifiers are read as a single property key.
        quoted = "`" + actual.replace("`", "``") + "`"
        res = self.graph.query(
            f"{match} RETURN count(DISTINCT d.{quoted}) AS n",
            params,
        )
        rows = res.result_set
        result[actual] = int(rows[0][0]) if rows else 0
    return result
543+
544+
def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
    """
    Return type information for each metadata field present on document nodes.

    Property keys are collected from all document nodes, the standard document
    fields are removed, and for every remaining key the first non-null value found
    is inspected to decide the reported type. Keys for which no non-null value is
    found are omitted.

    :returns: Dict mapping field names to a ``{"type": <typename>}`` dict.
        Type names are ``"str"``, ``"int"``, ``"float"``, or ``"bool"``.
    """
    self._ensure_connected()
    standard_fields = {"id", "content", "embedding", "score", "sparse_embedding"}
    result = self.graph.query(f"MATCH (d:{self.node_label}) RETURN keys(d)")
    all_keys: set[str] = set()
    for row in result.result_set:
        all_keys.update(row[0])
    all_keys -= standard_fields

    info: dict[str, dict[str, str]] = {}
    for key in sorted(all_keys):
        # Stored keys are spliced into the query text, so each one is emitted as a
        # backtick-quoted identifier with embedded backticks doubled. A key that is
        # not a plain identifier then cannot break or alter the probe query.
        quoted = "`" + key.replace("`", "``") + "`"
        res = self.graph.query(
            f"MATCH (d:{self.node_label}) WHERE d.{quoted} IS NOT NULL RETURN d.{quoted} LIMIT 1"
        )
        if not res.result_set:
            continue
        val = res.result_set[0][0]
        # bool must be tested before int: bool is a subclass of int in Python.
        if isinstance(val, bool):
            type_name = "bool"
        elif isinstance(val, int):
            type_name = "int"
        elif isinstance(val, float):
            type_name = "float"
        else:
            type_name = "str"
        info[key] = {"type": type_name}
    return info
577+
578+
def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
    """
    Return the minimum and maximum values for the given metadata field.

    :param metadata_field: Metadata field name. May include or omit the ``meta.`` prefix.
    :returns: Dict with keys ``"min"`` and ``"max"``. Values are ``None`` when no documents
        have a non-null value for the field.
    """
    self._ensure_connected()
    field = metadata_field.removeprefix("meta.")
    # The field name is spliced into the query text, so it is emitted as a
    # backtick-quoted identifier with embedded backticks doubled. The name then
    # stays a single property key and cannot inject additional Cypher.
    quoted = "`" + field.replace("`", "``") + "`"
    result = self.graph.query(
        f"MATCH (d:{self.node_label}) WHERE d.{quoted} IS NOT NULL "
        f"RETURN min(d.{quoted}), max(d.{quoted})"
    )
    if not result.result_set:
        return {"min": None, "max": None}
    row = result.result_set[0]
    return {"min": row[0], "max": row[1]}
596+
597+
def get_metadata_field_unique_values(
598+
self,
599+
metadata_field: str,
600+
search_term: str | None = None,
601+
size: int | None = 10000,
602+
after: dict[str, Any] | None = None,
603+
) -> tuple[list[Any], dict[str, Any] | None]:
604+
"""
605+
Return distinct values for the given metadata field with optional filtering and pagination.
606+
607+
:param metadata_field: Metadata field name. May include or omit the ``meta.`` prefix.
608+
:param search_term: Optional substring filter applied to string field values.
609+
:param size: Maximum number of values to return per page. Defaults to 10 000.
610+
:param after: Pagination cursor returned by a previous call. Pass ``None`` for the first page.
611+
:returns: Tuple of ``(values, next_cursor)``. ``next_cursor`` is ``None`` on the last page.
612+
"""
613+
self._ensure_connected()
614+
field = metadata_field[5:] if metadata_field.startswith("meta.") else metadata_field
615+
offset = after.get("offset", 0) if after else 0
616+
limit = size if size is not None else 10000
617+
618+
query_params: dict[str, Any] = {}
619+
where_parts = [f"d.{field} IS NOT NULL"]
620+
if search_term:
621+
where_parts.append(f"toString(d.{field}) CONTAINS $search_term")
622+
query_params["search_term"] = search_term
623+
624+
where = " AND ".join(where_parts)
625+
cypher = (
626+
f"MATCH (d:{self.node_label}) WHERE {where} "
627+
f"RETURN DISTINCT d.{field} AS val "
628+
f"ORDER BY val "
629+
f"SKIP {offset} LIMIT {limit + 1}"
630+
)
631+
result = self.graph.query(cypher, query_params)
632+
rows = result.result_set
633+
values = [row[0] for row in rows[:limit]]
634+
next_cursor: dict[str, Any] | None = {"offset": offset + limit} if len(rows) > limit else None
635+
return values, next_cursor
636+
452637
# ------------------------------------------------------------------
453638
# Internal retrieval helpers (called by retriever components)
454639
# ------------------------------------------------------------------

integrations/falkordb/tests/test_document_store.py

Lines changed: 22 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,17 @@
1313
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
1414
from haystack.document_stores.types import DuplicatePolicy
1515
from haystack.errors import FilterError
16-
from haystack.testing.document_store import DocumentStoreBaseTests
16+
from haystack.testing.document_store import (
17+
CountDocumentsByFilterTest,
18+
CountUniqueMetadataByFilterTest,
19+
DeleteAllTest,
20+
DeleteByFilterTest,
21+
DocumentStoreBaseTests,
22+
GetMetadataFieldMinMaxTest,
23+
GetMetadataFieldsInfoTest,
24+
GetMetadataFieldUniqueValuesTest,
25+
UpdateByFilterTest,
26+
)
1727

1828
from haystack_integrations.components.retrievers.falkordb import (
1929
FalkorDBCypherRetriever,
@@ -269,7 +279,17 @@ def test_write_documents_wraps_errors(self, mock_falkordb):
269279

270280

271281
@pytest.mark.integration
272-
class TestDocumentStore(DocumentStoreBaseTests):
282+
class TestDocumentStore(
283+
DocumentStoreBaseTests,
284+
DeleteAllTest,
285+
DeleteByFilterTest,
286+
UpdateByFilterTest,
287+
CountDocumentsByFilterTest,
288+
CountUniqueMetadataByFilterTest,
289+
GetMetadataFieldsInfoTest,
290+
GetMetadataFieldMinMaxTest,
291+
GetMetadataFieldUniqueValuesTest,
292+
):
273293
"""
274294
Test FalkorDBDocumentStore against the standard Haystack DocumentStore tests.
275295
"""

0 commit comments

Comments
 (0)