Skip to content

Commit 1df9666

Browse files
committed
Refactor S3 Vectors document store: use Haystack built-in filter utility, deduplicate retrieval logic
- Replace hand-rolled _apply_filters_in_memory/_document_matches/_compare with haystack.utils.filters.document_matches_filter (same utility used by InMemoryDocumentStore). Gains NOT operator, nested dotted field paths, and date comparison support for free. (-65 lines)
- Deduplicate blob/content reconstruction in _embedding_retrieval() by reusing _s3_vector_to_document() + dataclasses.replace() (-20 lines)
- Make filter_documents() warning conditional on filters actually being provided (no warning when listing all documents)
1 parent d12a8b5 commit 1df9666

File tree

1 file changed

+15
-96
lines changed
  • integrations/amazon_s3_vectors/src/haystack_integrations/document_stores/amazon_s3_vectors

1 file changed

+15
-96
lines changed

integrations/amazon_s3_vectors/src/haystack_integrations/document_stores/amazon_s3_vectors/document_store.py

Lines changed: 15 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44

55
import base64
66
import json
7+
from dataclasses import replace
78
from typing import Any, Literal
89

910
import boto3
@@ -13,6 +14,7 @@
1314
from haystack.document_stores.errors import DocumentStoreError
1415
from haystack.document_stores.types import DuplicatePolicy
1516
from haystack.utils.auth import Secret, deserialize_secrets_inplace
17+
from haystack.utils.filters import document_matches_filter
1618

1719
from .filters import _normalize_filters, _validate_filters
1820

@@ -315,12 +317,13 @@ def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Docume
315317
:param filters: Haystack-format filters to apply.
316318
:returns: A list of matching Documents.
317319
"""
318-
logger.warning(
319-
"S3 Vectors does not support standalone filtered listing. "
320-
"filter_documents() will fetch ALL vectors and apply filters client-side, "
321-
"which can be very slow for large indexes. "
322-
"Prefer using S3VectorsEmbeddingRetriever with filters for efficient filtered retrieval."
323-
)
320+
if filters:
321+
logger.warning(
322+
"S3 Vectors does not support standalone filtered listing. "
323+
"filter_documents() will fetch ALL vectors and apply filters client-side, "
324+
"which can be very slow for large indexes. "
325+
"Prefer using S3VectorsEmbeddingRetriever with filters for efficient filtered retrieval."
326+
)
324327

325328
client = self._get_client()
326329

@@ -347,7 +350,7 @@ def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Docume
347350

348351
if filters:
349352
_validate_filters(filters)
350-
documents = _apply_filters_in_memory(documents, filters)
353+
documents = [doc for doc in documents if document_matches_filter(filters=filters, document=doc)]
351354

352355
return documents
353356

@@ -425,39 +428,20 @@ def _embedding_retrieval(
425428

426429
documents = []
427430
for v in result.get("vectors", []):
428-
metadata = v.get("metadata", {})
429-
content = metadata.pop(_CONTENT_KEY, None)
430-
431-
# Remove other internal keys from user-facing metadata
432-
blob_data = metadata.pop(_BLOB_DATA_KEY, None)
433-
blob_meta = metadata.pop(_BLOB_META_KEY, None)
434-
blob_mime_type = metadata.pop(_BLOB_MIME_TYPE_KEY, None)
435-
436-
blob = None
437-
if blob_data is not None:
438-
blob = ByteStream(
439-
data=base64.b64decode(blob_data) if isinstance(blob_data, str) else blob_data,
440-
meta=blob_meta or {},
441-
mime_type=blob_mime_type,
442-
)
431+
doc = self._s3_vector_to_document(v)
443432

444-
raw_distance = v.get("distance")
433+
# Compute score from distance
445434
score = None
435+
raw_distance = v.get("distance")
446436
if raw_distance is not None:
447437
if distance_metric == "cosine":
448438
score = 1.0 - raw_distance
449439
else:
450440
# euclidean: negate so higher = more similar
451441
score = -raw_distance
452442

453-
doc = Document(
454-
id=v["key"],
455-
content=content,
456-
meta=metadata,
457-
blob=blob,
458-
score=score,
459-
)
460-
documents.append(doc)
443+
# query_vectors does not return vector data; attach score
444+
documents.append(replace(doc, embedding=None, score=score))
461445

462446
return documents
463447

@@ -538,68 +522,3 @@ def _s3_vector_to_document(vector: dict[str, Any]) -> Document:
538522
embedding=embedding,
539523
blob=blob,
540524
)
541-
542-
543-
def _apply_filters_in_memory(documents: list[Document], filters: dict[str, Any]) -> list[Document]:
544-
"""
545-
Apply Haystack filters to a list of Documents in memory.
546-
547-
This is used by ``filter_documents`` since S3 Vectors doesn't support
548-
standalone metadata queries without a vector search.
549-
"""
550-
return [doc for doc in documents if _document_matches(doc, filters)]
551-
552-
553-
def _document_matches(doc: Document, filters: dict[str, Any]) -> bool:
554-
"""Check if a single Document matches the given Haystack filter."""
555-
if "operator" in filters and "conditions" in filters:
556-
operator = filters["operator"]
557-
conditions = filters["conditions"]
558-
if operator == "AND":
559-
return all(_document_matches(doc, c) for c in conditions)
560-
if operator == "OR":
561-
return any(_document_matches(doc, c) for c in conditions)
562-
msg = f"Unknown logical operator '{operator}'"
563-
raise ValueError(msg)
564-
565-
if "field" in filters:
566-
field = filters["field"]
567-
operator = filters["operator"]
568-
value = filters["value"]
569-
570-
# Resolve the field value from the Document
571-
if field.startswith("meta."):
572-
field_name = field[5:]
573-
doc_value = doc.meta.get(field_name) if doc.meta else None
574-
elif field == "content":
575-
doc_value = doc.content
576-
elif field == "id":
577-
doc_value = doc.id
578-
else:
579-
doc_value = doc.meta.get(field) if doc.meta else None
580-
581-
return _compare(doc_value, operator, value)
582-
583-
return True
584-
585-
586-
def _compare(doc_value: Any, operator: str, filter_value: Any) -> bool:
587-
"""Perform a comparison between a document field value and a filter value."""
588-
if operator == "==":
589-
return doc_value == filter_value
590-
if operator == "!=":
591-
return doc_value != filter_value
592-
if operator == ">":
593-
return doc_value is not None and doc_value > filter_value
594-
if operator == ">=":
595-
return doc_value is not None and doc_value >= filter_value
596-
if operator == "<":
597-
return doc_value is not None and doc_value < filter_value
598-
if operator == "<=":
599-
return doc_value is not None and doc_value <= filter_value
600-
if operator == "in":
601-
return doc_value is not None and doc_value in filter_value
602-
if operator == "not in":
603-
return doc_value is not None and doc_value not in filter_value
604-
msg = f"Unknown comparison operator '{operator}'"
605-
raise ValueError(msg)

0 commit comments

Comments
 (0)