diff --git a/integrations/faiss/README.md b/integrations/faiss/README.md new file mode 100644 index 0000000000..345ff270d9 --- /dev/null +++ b/integrations/faiss/README.md @@ -0,0 +1,17 @@ +# faiss-haystack + +This package provides a [FAISS](https://github.com/facebookresearch/faiss) document store for [Haystack](https://github.com/deepset-ai/haystack). + +## Installation + +```bash +pip install faiss-haystack +``` + +## Usage + +```python +from haystack_integrations.document_stores.faiss import FAISSDocumentStore + +document_store = FAISSDocumentStore(index_path="my_index") +``` diff --git a/integrations/faiss/pydoc/config_docusaurus.yml b/integrations/faiss/pydoc/config_docusaurus.yml new file mode 100644 index 0000000000..72b64e3dd9 --- /dev/null +++ b/integrations/faiss/pydoc/config_docusaurus.yml @@ -0,0 +1,14 @@ +loaders: + - modules: + - haystack_integrations.components.retrievers.faiss.embedding_retriever + - haystack_integrations.document_stores.faiss.document_store + search_path: [../src] +processors: + - type: filter + documented_only: true + skip_empty_modules: true +renderer: + description: FAISS integration for Haystack + id: integrations-faiss + filename: faiss.md + title: FAISS diff --git a/integrations/faiss/pyproject.toml b/integrations/faiss/pyproject.toml new file mode 100644 index 0000000000..5646508a95 --- /dev/null +++ b/integrations/faiss/pyproject.toml @@ -0,0 +1,159 @@ +[build-system] +requires = ["hatchling", "hatch-vcs"] +build-backend = "hatchling.build" + +[project] +name = "faiss-haystack" +dynamic = ["version"] +description = '' +readme = "README.md" +requires-python = ">=3.10" +license = "Apache-2.0" +keywords = [] +authors = [{ name = "Deepset", email = "info@deepset.ai" }] +classifiers = [ + "License :: OSI Approved :: Apache Software License", + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: 
Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +dependencies = [ + "haystack-ai>=2.24.0", + "faiss-cpu>=1.8.0", + "numpy", +] + +[project.urls] +Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/faiss#readme" +Issues = "https://github.com/deepset-ai/haystack-core-integrations/issues" +Source = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/faiss" + +[tool.hatch.build.targets.wheel] +packages = ["src/haystack_integrations"] + +[tool.hatch.version] +source = "vcs" +tag-pattern = 'integrations\/faiss-v(?P.*)' + +[tool.hatch.version.raw-options] +root = "../.." +git_describe_command = 'git describe --tags --match="integrations/faiss-v[0-9]*"' + +[tool.hatch.envs.default] +installer = "uv" +dependencies = ["haystack-pydoc-tools", "ruff"] + +[tool.hatch.envs.default.scripts] +docs = ["pydoc-markdown pydoc/config_docusaurus.yml"] +fmt = "ruff check --fix {args}; ruff format {args}" +fmt-check = "ruff check {args} && ruff format --check {args}" + +[tool.hatch.envs.test] +dependencies = [ + "pytest", + "pytest-cov", + "pytest-rerunfailures", + "mypy", + "pandas", +] + +[tool.hatch.envs.test.scripts] +unit = 'pytest -m "not integration" {args:tests}' +integration = 'pytest -m "integration" {args:tests}' +all = 'pytest {args:tests}' +cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x {args:tests}' + +types = "mypy -p haystack_integrations.document_stores.faiss -p haystack_integrations.components.retrievers.faiss {args}" + +[tool.mypy] +install_types = true +non_interactive = true +check_untyped_defs = true +disallow_incomplete_defs = true + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = [ + "A", + "ARG", + "B", + "C", + "DTZ", + "E", + "EM", + "F", + "FBT", + "I", + "ICN", + "ISC", + "N", + "PLC", + 
"PLE", + "PLR", + "PLW", + "Q", + "RUF", + "S", + "T", + "TID", + "UP", + "W", + "YTT", +] +ignore = [ + # Allow non-abstract empty methods in abstract base classes + "B027", + # Allow boolean positional values in function calls, like `dict.get(... True)` + "FBT003", + # Ignore checks for possible passwords + "S105", + "S106", + "S107", + # Ignore complexity + "C901", + "PLR0911", + "PLR0912", + "PLR0913", + "PLR0915", + # Ignore unused params + "ARG002", + # Allow assertions + "S101", +] +exclude = ["example"] + +[tool.ruff.lint.isort] +known-first-party = ["haystack_integrations"] + +[tool.ruff.lint.flake8-tidy-imports] +ban-relative-imports = "parents" + +[tool.ruff.lint.per-file-ignores] +# Tests can use magic values, assertions, and relative imports +"tests/**/*" = ["PLR2004", "S101", "TID252"] +"example/**/*" = ["T201"] + +[tool.coverage.run] +source = ["haystack_integrations"] +branch = true +parallel = false + + +[tool.coverage.report] +omit = ["*/tests/*", "*/__init__.py"] +show_missing = true +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] + + +[tool.pytest.ini_options] +minversion = "6.0" +markers = ["integration: integration tests"] diff --git a/integrations/faiss/src/haystack_integrations/__init__.py b/integrations/faiss/src/haystack_integrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/faiss/src/haystack_integrations/components/__init__.py b/integrations/faiss/src/haystack_integrations/components/__init__.py new file mode 100644 index 0000000000..e873bc3327 --- /dev/null +++ b/integrations/faiss/src/haystack_integrations/components/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/integrations/faiss/src/haystack_integrations/components/retrievers/__init__.py b/integrations/faiss/src/haystack_integrations/components/retrievers/__init__.py new file mode 100644 index 0000000000..e873bc3327 --- 
/dev/null +++ b/integrations/faiss/src/haystack_integrations/components/retrievers/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/__init__.py b/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/__init__.py new file mode 100644 index 0000000000..7929f78b7f --- /dev/null +++ b/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/__init__.py @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 +from .embedding_retriever import FAISSEmbeddingRetriever + +__all__ = ["FAISSEmbeddingRetriever"] diff --git a/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/embedding_retriever.py b/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/embedding_retriever.py new file mode 100644 index 0000000000..fb918cf383 --- /dev/null +++ b/integrations/faiss/src/haystack_integrations/components/retrievers/faiss/embedding_retriever.py @@ -0,0 +1,153 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import component, default_from_dict, default_to_dict +from haystack.dataclasses import Document +from haystack.document_stores.types import FilterPolicy +from haystack.document_stores.types.filter_policy import apply_filter_policy + +from haystack_integrations.document_stores.faiss import FAISSDocumentStore + + +@component +class FAISSEmbeddingRetriever: + """ + Retrieves documents from the `FAISSDocumentStore`, based on their dense embeddings. 
+ + Example usage: + ```python + from haystack import Document, Pipeline + from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder + from haystack.document_stores.types import DuplicatePolicy + + from haystack_integrations.document_stores.faiss import FAISSDocumentStore + from haystack_integrations.components.retrievers.faiss import FAISSEmbeddingRetriever + + document_store = FAISSDocumentStore(embedding_dim=768) + + documents = [ + Document(content="There are over 7,000 languages spoken around the world today."), + Document(content="Elephants have been observed to behave in a way that indicates a high level of intelligence."), + Document(content="In certain places, you can witness the phenomenon of bioluminescent waves."), + ] + + document_embedder = SentenceTransformersDocumentEmbedder() + document_embedder.warm_up() + documents_with_embeddings = document_embedder.run(documents)["documents"] + + document_store.write_documents(documents_with_embeddings, policy=DuplicatePolicy.OVERWRITE) + + query_pipeline = Pipeline() + query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder()) + query_pipeline.add_component("retriever", FAISSEmbeddingRetriever(document_store=document_store)) + query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding") + + query = "How many languages are there?" + res = query_pipeline.run({"text_embedder": {"text": query}}) + + assert res["retriever"]["documents"][0].content == "There are over 7,000 languages spoken around the world today." + ``` + """ # noqa: E501 + + def __init__( + self, + *, + document_store: FAISSDocumentStore, + filters: dict[str, Any] | None = None, + top_k: int = 10, + filter_policy: str | FilterPolicy = FilterPolicy.REPLACE, + ): + """ + :param document_store: An instance of `FAISSDocumentStore`. + :param filters: Filters applied to the retrieved Documents at initialisation time. 
At runtime, these are merged + with any runtime filters according to the `filter_policy`. + :param top_k: Maximum number of Documents to return. + :param filter_policy: Policy to determine how init-time and runtime filters are combined. + See `FilterPolicy` for details. Defaults to `FilterPolicy.REPLACE`. + :raises ValueError: If `document_store` is not an instance of `FAISSDocumentStore`. + """ + if not isinstance(document_store, FAISSDocumentStore): + msg = "document_store must be an instance of FAISSDocumentStore" + raise ValueError(msg) + + self.document_store = document_store + self.filters = filters or {} + self.top_k = top_k + self.filter_policy = ( + filter_policy if isinstance(filter_policy, FilterPolicy) else FilterPolicy.from_str(filter_policy) + ) + + def to_dict(self) -> dict[str, Any]: + """ + Serializes the component to a dictionary. + + :returns: Dictionary with serialized data. + """ + return default_to_dict( + self, + filters=self.filters, + top_k=self.top_k, + filter_policy=self.filter_policy.value, + document_store=self.document_store.to_dict(), + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "FAISSEmbeddingRetriever": + """ + Deserializes the component from a dictionary. + + :param data: Dictionary to deserialize from. + :returns: Deserialized component. + """ + doc_store_params = data["init_parameters"]["document_store"] + data["init_parameters"]["document_store"] = FAISSDocumentStore.from_dict(doc_store_params) + return default_from_dict(cls, data) + + @component.output_types(documents=list[Document]) + def run( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + ) -> dict[str, list[Document]]: + """ + Retrieve documents from the `FAISSDocumentStore`, based on their embeddings. + + :param query_embedding: Embedding of the query. + :param filters: Filters applied to the retrieved Documents. 
The way runtime filters are applied depends on + the `filter_policy` chosen at retriever initialization. See init method docstring for more + details. + :param top_k: Maximum number of Documents to return. Overrides the value set at initialization. + :returns: A dictionary with the following keys: + - `documents`: List of `Document`s that are similar to `query_embedding`. + """ + filters = apply_filter_policy(self.filter_policy, self.filters, filters) + top_k = top_k or self.top_k + docs = self.document_store.search(query_embedding=query_embedding, top_k=top_k, filters=filters) + return {"documents": docs} + + @component.output_types(documents=list[Document]) + async def run_async( + self, + query_embedding: list[float], + filters: dict[str, Any] | None = None, + top_k: int | None = None, + ) -> dict[str, list[Document]]: + """ + Asynchronously retrieve documents from the `FAISSDocumentStore`, based on their embeddings. + + Since FAISS search is CPU-bound and fully in-memory, this delegates directly to the synchronous + `run()` method. No I/O or network calls are involved. + + :param query_embedding: Embedding of the query. + :param filters: Filters applied to the retrieved Documents. The way runtime filters are applied depends on + the `filter_policy` chosen at retriever initialization. See init method docstring for more + details. + :param top_k: Maximum number of Documents to return. Overrides the value set at initialization. + :returns: A dictionary with the following keys: + - `documents`: List of `Document`s that are similar to `query_embedding`. 
+ """ + return self.run(query_embedding=query_embedding, filters=filters, top_k=top_k) diff --git a/integrations/faiss/src/haystack_integrations/document_stores/__init__.py b/integrations/faiss/src/haystack_integrations/document_stores/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/faiss/src/haystack_integrations/document_stores/faiss/__init__.py b/integrations/faiss/src/haystack_integrations/document_stores/faiss/__init__.py new file mode 100644 index 0000000000..9e23aaa22d --- /dev/null +++ b/integrations/faiss/src/haystack_integrations/document_stores/faiss/__init__.py @@ -0,0 +1,3 @@ +from .document_store import FAISSDocumentStore + +__all__ = ["FAISSDocumentStore"] diff --git a/integrations/faiss/src/haystack_integrations/document_stores/faiss/document_store.py b/integrations/faiss/src/haystack_integrations/document_stores/faiss/document_store.py new file mode 100644 index 0000000000..fc78f19cd6 --- /dev/null +++ b/integrations/faiss/src/haystack_integrations/document_stores/faiss/document_store.py @@ -0,0 +1,549 @@ +# SPDX-FileCopyrightText: 2023-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import json +import logging +from collections.abc import Iterable +from dataclasses import replace +from pathlib import Path +from typing import Any + +import faiss +import numpy as np +from haystack import default_from_dict, default_to_dict +from haystack.dataclasses import Document +from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError +from haystack.document_stores.types import DuplicatePolicy +from haystack.errors import FilterError + +logger = logging.getLogger(__name__) + + +class FAISSDocumentStore: + """ + A Document Store using FAISS for vector search and a simple JSON file for metadata storage. + + This Document Store is suitable for small to medium-sized datasets where simplicity is preferred over scalability. 
+ It supports basic persistence by saving the FAISS index to a `.faiss` file and documents to a `.json` file. + """ + + def __init__( + self, + index_path: str | None = None, + index_string: str = "Flat", + embedding_dim: int = 768, + ): + """ + Initializes the FAISSDocumentStore. + + :param index_path: Path to save/load the index and documents. If None, the store is in-memory only. + :param index_string: The FAISS index factory string. Default is "Flat". + :param embedding_dim: The dimension of the embeddings. Default is 768. + """ + self.index_path = index_path + self.embedding_dim = embedding_dim + self.index_string = index_string + + # Initialize in-memory storage + self.documents: dict[str, Document] = {} + self.id_map: dict[int, str] = {} # Map integer IDs (FAISS) to string IDs (Documents) + self.inverse_id_map: dict[str, int] = {} # Map string IDs to integer IDs + self._next_id = 0 + + # Initialize FAISS index + self.index: faiss.Index | None = None + if self.index_path and Path(self.index_path).with_suffix(".faiss").exists(): + self.load(self.index_path) + else: + self._create_new_index() + + def _create_new_index(self): + """Creates a new FAISS index.""" + try: + # We use IndexIDMap to support add_with_ids + base_index = faiss.index_factory(self.embedding_dim, self.index_string) + self.index = faiss.IndexIDMap(base_index) + except RuntimeError as e: + msg = f"Could not create FAISS index with factory string '{self.index_string}': {e}" + raise DocumentStoreError(msg) from e + + def count_documents(self) -> int: + """ + Returns the number of documents in the store. + """ + return len(self.documents) + + def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Document]: + """ + Returns documents that match the provided filters. + + :param filters: A dictionary of filters to apply. + :return: A list of matching Documents. 
+ """ + if not filters: + return list(self.documents.values()) + + filtered_docs = [] + for doc in self.documents.values(): + if self._matches_filters(doc, filters): + filtered_docs.append(doc) + return filtered_docs + + def _matches_filters(self, doc: Document, filters: dict[str, Any]) -> bool: + """ + Checks if a document matches the given filters. + + Currently, supports simple equality and comparison checks. + """ + return self._check_condition(doc, filters) + + @staticmethod + def _get_doc_value(doc: Document, field: str) -> Any: + """Helper to get value from doc, handling 'meta.' prefix.""" + if field == "content": + return doc.content + if field == "id": + return doc.id + if field.startswith("meta."): + key = field[5:] + return doc.meta.get(key) + # Fallback: check top level attributes then meta + if hasattr(doc, field): + return getattr(doc, field) + return doc.meta.get(field) + + def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> int: + """ + Writes documents to the store. + + :param documents: The list of documents to write. + :param policy: The policy to handle duplicate documents. + :return: The number of documents written. + """ + if not isinstance(documents, Iterable) or isinstance(documents, (str, bytes)): + msg = "param 'documents' must contain an iterable of objects of type Document." + raise ValueError(msg) + + if any(not isinstance(doc, Document) for doc in documents): + msg = "param 'documents' must contain an iterable of objects of type Document." + raise ValueError(msg) + + if not documents: + return 0 + + # Check for duplicates first if policy is FAIL + if policy == DuplicatePolicy.FAIL: + for doc in documents: + if doc.id in self.documents: + msg = f"Document with id '{doc.id}' already exists." 
+ raise DuplicateDocumentError(msg) + + # Process documents + ids_to_add_to_index = [] + vectors_to_add = [] + + docs_written = 0 + + for doc in documents: + if policy == DuplicatePolicy.SKIP and doc.id in self.documents: + continue + + # Handle overwrite or new + if doc.id in self.documents: + # Overwriting: FAISS offers no in-place vector update, so the + # existing entry (document and vector) is removed first and the + # replacement is added below, keeping index and metadata in sync. + self.delete_documents([doc.id]) + + self.documents[doc.id] = doc + + if doc.embedding is not None: + # Assign a new integer ID + int_id = self._next_id + self._next_id += 1 + + self.id_map[int_id] = doc.id + self.inverse_id_map[doc.id] = int_id + + ids_to_add_to_index.append(int_id) + vectors_to_add.append(doc.embedding) + + docs_written += 1 + + # Add to FAISS + if vectors_to_add: + vectors = np.array(vectors_to_add, dtype="float32") + ids = np.array(ids_to_add_to_index, dtype="int64") + self.index.add_with_ids(vectors, ids) + + return docs_written + + def delete_documents(self, document_ids: list[str]) -> None: + """ + Deletes documents from the store. + """ + if not document_ids: + return + + ids_to_remove_from_index = [] + + for doc_id in document_ids: + if doc_id in self.documents: + del self.documents[doc_id] + + if doc_id in self.inverse_id_map: + int_id = self.inverse_id_map.pop(doc_id) + del self.id_map[int_id] + ids_to_remove_from_index.append(int_id) + + if ids_to_remove_from_index and self.index.ntotal > 0: + ids_array = np.array(ids_to_remove_from_index, dtype="int64") + self.index.remove_ids(ids_array) + + def delete_all_documents(self) -> None: + """ + Deletes all documents from the store. 
+ """ + self.documents = {} + self.id_map = {} + self.inverse_id_map = {} + self._next_id = 0 + self._create_new_index() + + def search( + self, query_embedding: list[float], top_k: int = 10, filters: dict[str, Any] | None = None + ) -> list[Document]: + """ + Performs a vector search. + + :param query_embedding: The query embedding. + :param top_k: The number of results to return. + :param filters: Filters to apply. + :return: A list of matching Documents. + """ + if not self.index or self.index.ntotal == 0: + return [] + + # Ensure embedding format + query_vec = np.array([query_embedding], dtype="float32") + + # Search in FAISS + # Valid strategy for pre-filtering vs post-filtering: + # Since FAISS `IndexIDMap` doesn't support pre-filtering natively comfortably + # without `RangeSearch` or specialized impls, we fetch more and filter post-retrieval. + + fetch_k = top_k + if filters: + fetch_k = min(self.index.ntotal, top_k * 10) # Simple heuristic + + distances, indices = self.index.search(query_vec, fetch_k) + + results = [] + for dist, int_id in zip(distances[0], indices[0], strict=False): + if int_id == -1: + continue + + doc_id = self.id_map.get(int_id) + if not doc_id or doc_id not in self.documents: + continue + + doc = self.documents[doc_id] + + if filters and not self._matches_filters(doc, filters): + continue + + # Build a new instance instead of mutating score in place. + score = float(1 / (1 + dist)) if self.index_string == "Flat" else float(dist) + result_doc = replace(doc, score=score) + + results.append(result_doc) + + if len(results) >= top_k: + break + + return results + + def _check_condition(self, doc: Document, condition: dict[str, Any]) -> bool: + if "operator" not in condition and "conditions" not in condition: + # This might be a legacy or malformed filter from tests like test_missing_top_level_operator_key + # The standard Haystack filter structure enforces keys. 
+ # Raising FilterError here matches the shared Haystack document-store test + # suite (e.g. test_missing_top_level_operator_key), which expects a + # FilterError when 'operator' is missing from a condition block. + msg = "Filter condition missing 'operator'" + raise FilterError(msg) + + operator = condition.get("operator", "==") + + if operator == "AND": + if "conditions" not in condition: + msg = "Missing 'conditions' for AND operator" + raise FilterError(msg) + return all(self._check_condition(doc, cond) for cond in condition.get("conditions", [])) + elif operator == "OR": + if "conditions" not in condition: + msg = "Missing 'conditions' for OR operator" + raise FilterError(msg) + return any(self._check_condition(doc, cond) for cond in condition.get("conditions", [])) + elif operator == "NOT": + if "conditions" not in condition: + msg = "Missing 'conditions' for NOT operator" + raise FilterError(msg) + conditions = condition.get("conditions") + if not isinstance(conditions, list) or not conditions: + msg = "NOT operator expects at least one condition" + raise FilterError(msg) + return not all(self._check_condition(doc, cond) for cond in conditions) + + # Leaf condition + if "field" not in condition: + msg = "Missing 'field' in filter condition" + raise FilterError(msg) + field = condition.get("field") + if "value" not in condition: + msg = "Missing 'value' in filter condition" + raise FilterError(msg) + value = condition.get("value") + + doc_val = FAISSDocumentStore._get_doc_value(doc, field) + + # Type check for comparison operators + if operator in [">", ">=", "<", "<="]: + if doc_val is None: + # Haystack specific: if field is missing/None, and we compare, it generally shouldn't match. 
+ return False + + if value is None: + # Comparing anything with None using inequalities is invalid, + # but tests expect efficient handling (no match) + return False + + # Check for compatibility + # We allow int/float comparison + is_number_doc = isinstance(doc_val, (int, float)) + is_number_val = isinstance(value, (int, float)) + + if is_number_doc and is_number_val: + # Compatible + pass + elif type(doc_val) is not type(value): + # Incompatible types for inequality implementation (like str vs int, or list vs int) + msg = f"Type mismatch: cannot compare {type(doc_val)} with {type(value)}" + raise FilterError(msg) + + try: + if operator == ">": + return doc_val > value + if operator == ">=": + return doc_val >= value + if operator == "<": + return doc_val < value + if operator == "<=": + return doc_val <= value + except TypeError as e: + msg = f"Type mismatch in filter: {e}" + raise FilterError(msg) from e + + if operator == "==": + return doc_val == value + elif operator == "!=": + return doc_val != value + elif operator == "in": + if not isinstance(value, list): + msg = "Value for 'in' must be a list" + raise FilterError(msg) + return doc_val in value + elif operator == "not in": + if not isinstance(value, list): + msg = "Value for 'not in' must be a list" + raise FilterError(msg) + return doc_val not in value + + return False + + # Mixin implementations + + def delete_by_filter(self, filters: dict[str, Any]) -> int: + """ + Deletes documents that match the provided filters from the store. + + :param filters: A dictionary of filters to apply to find documents to delete. + :returns: The number of documents deleted. + """ + docs_to_delete = self.filter_documents(filters) + ids = [doc.id for doc in docs_to_delete] + self.delete_documents(ids) + return len(ids) + + def count_documents_by_filter(self, filters: dict[str, Any]) -> int: + """ + Returns the number of documents that match the provided filters. + + :param filters: A dictionary of filters to apply. 
+ :returns: The number of matching documents. + """ + return len(self.filter_documents(filters)) + + def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Updates documents that match the provided filters with the new metadata. + + Note: Updates are performed in-memory only. To persist these changes, + you must explicitly call `save()` after updating. + + :param filters: A dictionary of filters to apply to find documents to update. + :param meta: A dictionary of metadata key-value pairs to update in the matching documents. + :returns: The number of documents updated. + """ + docs_to_update = self.filter_documents(filters) + for doc in docs_to_update: + doc.meta.update(meta) + # In this dict implementation, it's updated in place in memory. + return len(docs_to_update) + + def get_metadata_fields_info(self) -> dict[str, dict[str, Any]]: + """ + Infers and returns the types of all metadata fields from the stored documents. + + :returns: A dictionary mapping field names to dictionaries with a "type" key + (e.g. `{"field": {"type": "long"}}`). + """ + fields_idx = {} + for doc in self.documents.values(): + for key, value in doc.meta.items(): + if key not in fields_idx: + type_name = type(value).__name__ + if type_name == "str": + type_name = "keyword" + elif type_name == "int": + type_name = "long" + elif type_name == "bool": + type_name = "boolean" + fields_idx[key] = {"type": type_name} + return fields_idx + + def get_metadata_field_min_max(self, field_name: str) -> dict[str, Any]: + """ + Returns the minimum and maximum values for a specific metadata field. + + :param field_name: The name of the metadata field. + :returns: A dictionary with keys "min" and "max" containing the respective min and max values. 
+ """ + values = [] + for doc in self.documents.values(): + val = ( + FAISSDocumentStore._get_doc_value(doc, field_name) + if not field_name.startswith("meta.") + else doc.meta.get(field_name[5:]) + ) + if val is not None: + values.append(val) + + if not values: + return {"min": None, "max": None} + + return {"min": min(values), "max": max(values)} + + def get_metadata_field_unique_values(self, field_name: str) -> list[Any]: + """ + Returns all unique values for a specific metadata field. + + :param field_name: The name of the metadata field. + :returns: A list of unique values for the specified field. + """ + values = set() + for doc in self.documents.values(): + val = ( + FAISSDocumentStore._get_doc_value(doc, field_name) + if not field_name.startswith("meta.") + else doc.meta.get(field_name[5:]) + ) + if val is not None: + values.add(val) + return list(values) + + def count_unique_metadata_by_filter(self, filters: dict[str, Any], fields: list[str]) -> dict[str, int]: + """ + Returns a count of unique values for multiple metadata fields, optionally scoped by a filter. + + :param filters: A dictionary of filters to apply. + :param fields: A list of metadata field names to count unique values for. + :returns: A dictionary mapping each field name to the count of its unique values. + """ + filtered_docs = self.filter_documents(filters) + counts = {} + + for field in fields: + unique_vals = set() + for doc in filtered_docs: + val = FAISSDocumentStore._get_doc_value(doc, field) + if val is not None: + unique_vals.add(val) + counts[field] = len(unique_vals) + + return dict(counts) + + def to_dict(self) -> dict[str, Any]: + """ + Serializes the store to a dictionary. + """ + return default_to_dict( + self, + index_path=self.index_path, + index_string=self.index_string, + embedding_dim=self.embedding_dim, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "FAISSDocumentStore": + """ + Deserializes the store from a dictionary. 
+ """ + return default_from_dict(cls, data) + + def save(self, index_path: str | Path) -> None: + """ + Saves the index and documents to disk. + """ + path = Path(index_path) + faiss.write_index(self.index, str(path.with_suffix(".faiss"))) + + # Save documents and ID mapping + data = { + "documents": [doc.to_dict() for doc in self.documents.values()], + "id_map": self.id_map, + "inverse_id_map": self.inverse_id_map, + "next_id": self._next_id, + } + + with open(path.with_suffix(".json"), "w", encoding="utf-8") as f: + json.dump(data, f) + + def load(self, index_path: str | Path) -> None: + """ + Loads the index and documents from disk. + """ + path = Path(index_path) + if not path.with_suffix(".faiss").exists(): + msg = f"File not found: {path.with_suffix('.faiss')}" + raise ValueError(msg) + + self.index = faiss.read_index(str(path.with_suffix(".faiss"))) + + with open(path.with_suffix(".json"), encoding="utf-8") as f: + data = json.load(f) + + self.documents = {doc_dict["id"]: Document.from_dict(doc_dict) for doc_dict in data["documents"]} + self.id_map = {int(k): v for k, v in data["id_map"].items()} + # inverse_id_map keys are strings, values are ints. JSON keys are strings. + self.inverse_id_map = data["inverse_id_map"] + self._next_id = data["next_id"] + + # Verify sync + if len(self.documents) != len(self.id_map): + logger.warning( + "Loaded %d documents but %d ID mappings. 
# SPDX-FileCopyrightText: 2023-present deepset GmbH
#
# SPDX-License-Identifier: Apache-2.0

import pytest
from haystack.dataclasses import Document
from haystack.errors import FilterError
from haystack.testing.document_store import (
    CountDocumentsTest,
    DeleteAllTest,
    DeleteByFilterTest,
    DeleteDocumentsTest,
    FilterDocumentsTest,
    UpdateByFilterTest,
)

from haystack_integrations.document_stores.faiss import FAISSDocumentStore


class TestFAISSDocumentStore(
    CountDocumentsTest,
    DeleteDocumentsTest,
    FilterDocumentsTest,
    UpdateByFilterTest,
    DeleteAllTest,
    DeleteByFilterTest,
):
    """Tests for FAISSDocumentStore, including the shared Haystack document-store suites."""

    @pytest.fixture
    def document_store(self, tmp_path):
        # One fresh store per test, backed by a temporary directory.
        return FAISSDocumentStore(index_path=str(tmp_path / "test_index"))

    def test_write_documents(self, document_store):
        document = Document(content="test")
        document_store.write_documents([document])

        assert document_store.count_documents() == 1
        assert document_store.filter_documents()[0].id == document.id

    def test_persistence(self, tmp_path):
        index_file = tmp_path / "persistent_index"
        store = FAISSDocumentStore(index_path=str(index_file), embedding_dim=3)
        store.write_documents([Document(content="test persistence", embedding=[0.1, 0.2, 0.3])])
        store.save(index_file)

        # Load in new instance
        reloaded = FAISSDocumentStore(index_path=str(index_file), embedding_dim=3)
        assert reloaded.count_documents() == 1
        restored_doc = reloaded.filter_documents()[0]
        assert restored_doc.content == "test persistence"
        assert restored_doc.embedding == [0.1, 0.2, 0.3]

    def test_persistence_no_embeddings(self, tmp_path):
        index_file = tmp_path / "persistent_index_no_embed"
        store = FAISSDocumentStore(index_path=str(index_file), embedding_dim=3)
        store.write_documents([Document(content="test no embedding")])
        store.save(index_file)

        # Load in new instance
        reloaded = FAISSDocumentStore(index_path=str(index_file), embedding_dim=3)
        assert reloaded.count_documents() == 1
        restored_doc = reloaded.filter_documents()[0]
        assert restored_doc.content == "test no embedding"
        assert restored_doc.embedding is None

    def test_load_missing_files(self, tmp_path):
        index_file = tmp_path / "missing_index"
        store = FAISSDocumentStore(index_path=str(index_file), embedding_dim=3)
        with pytest.raises(ValueError, match="File not found"):
            store.load(index_file)

    def test_search_with_and_without_filters(self, document_store):
        # Setup documents with missing/varied embeddings to test edge cases
        documents = [
            Document(content="test1", embedding=[0.1, 0.2, 0.3], meta={"category": "A"}),
            Document(content="test2", embedding=[0.4, 0.5, 0.6], meta={"category": "B"}),
            Document(content="test3", meta={"category": "A"}),  # No embedding
        ]

        # document_store from fixture uses default embedding_dim=768, so we must recreate
        store = FAISSDocumentStore(index_path=document_store.index_path, embedding_dim=3)
        store.write_documents(documents)

        # Test search based on query embedding
        results = store.search(query_embedding=[0.1, 0.2, 0.3], top_k=2)
        assert len(results) == 2
        assert results[0].content == "test1"  # Closest match

        # Test search with filter
        category_filter = {"field": "meta.category", "operator": "==", "value": "B"}
        filtered = store.search(query_embedding=[0.1, 0.2, 0.3], top_k=2, filters=category_filter)
        assert len(filtered) == 1
        assert filtered[0].content == "test2"

    def test_to_dict_from_dict(self):
        store = FAISSDocumentStore(index_path="test_index", index_string="Flat", embedding_dim=128)

        data = store.to_dict()
        assert data["type"] == "haystack_integrations.document_stores.faiss.document_store.FAISSDocumentStore"
        assert data["init_parameters"]["index_path"] == "test_index"
        assert data["init_parameters"]["index_string"] == "Flat"
        assert data["init_parameters"]["embedding_dim"] == 128

        restored = FAISSDocumentStore.from_dict(data)
        assert restored.index_path == "test_index"
        assert restored.index_string == "Flat"
        assert restored.embedding_dim == 128

    def test_count_documents_by_filter(self, document_store):
        document_store.write_documents(
            [
                Document(content="test1", meta={"category": "A"}),
                Document(content="test2", meta={"category": "B"}),
                Document(content="test3", meta={"category": "A"}),
            ]
        )

        count = document_store.count_documents_by_filter(
            filters={"field": "meta.category", "operator": "==", "value": "A"}
        )
        assert count == 2

    def test_get_metadata_fields_info(self, document_store):
        document_store.write_documents(
            [Document(content="test1", meta={"category": "A", "count": 1, "is_active": True})]
        )

        info = document_store.get_metadata_fields_info()
        assert "category" in info
        assert info["category"]["type"] == "keyword"
        assert "count" in info
        assert info["count"]["type"] == "long"
        assert "is_active" in info
        assert info["is_active"]["type"] == "boolean"

    def test_count_unique_metadata_by_filter(self, document_store):
        document_store.write_documents(
            [
                Document(content="test1", meta={"category": "A", "status": "active"}),
                Document(content="test2", meta={"category": "B", "status": "inactive"}),
                Document(content="test3", meta={"category": "A", "status": "active"}),
            ]
        )

        counts = document_store.count_unique_metadata_by_filter(
            filters={"field": "meta.category", "operator": "==", "value": "A"}, fields=["meta.status"]
        )
        assert "meta.status" in counts
        assert counts["meta.status"] == 1  # Only "active" status for category A

    def test_not_filter_with_empty_conditions_raises_filter_error(self, document_store):
        document_store.write_documents([Document(content="test", meta={"category": "A"})])

        with pytest.raises(FilterError, match="NOT operator expects at least one condition"):
            document_store.filter_documents(filters={"operator": "NOT", "conditions": []})
FAISSEmbeddingRetriever(document_store=populated_store, top_k=2) + result = retriever.run(query_embedding=[1.0, 0.0, 0.0]) + + assert "documents" in result + assert isinstance(result["documents"], list) + assert len(result["documents"]) == 2 + # All returned items must be Document instances + assert all(isinstance(d, Document) for d in result["documents"]) + + def test_run_with_filters(self, populated_store): + retriever = FAISSEmbeddingRetriever(document_store=populated_store, top_k=3) + filters = {"field": "meta.category", "operator": "==", "value": "A"} + result = retriever.run(query_embedding=[1.0, 0.0, 0.0], filters=filters) + + assert "documents" in result + contents = [d.content for d in result["documents"]] + # Only category-A docs should be returned + assert all(d.meta["category"] == "A" for d in result["documents"]) + assert "beta" not in contents + + def test_run_with_top_k_override(self, populated_store): + retriever = FAISSEmbeddingRetriever(document_store=populated_store, top_k=3) + result = retriever.run(query_embedding=[1.0, 0.0, 0.0], top_k=1) + + assert len(result["documents"]) == 1 + + def test_to_dict_from_dict_roundtrip(self, document_store): + retriever = FAISSEmbeddingRetriever( + document_store=document_store, + filters={"field": "meta.category", "operator": "==", "value": "A"}, + top_k=5, + filter_policy=FilterPolicy.MERGE, + ) + + serialized = retriever.to_dict() + assert serialized["type"] == ( + "haystack_integrations.components.retrievers.faiss.embedding_retriever.FAISSEmbeddingRetriever" + ) + assert serialized["init_parameters"]["top_k"] == 5 + assert serialized["init_parameters"]["filter_policy"] == FilterPolicy.MERGE.value + assert "document_store" in serialized["init_parameters"] + + restored = FAISSEmbeddingRetriever.from_dict(serialized) + assert restored.top_k == 5 + assert restored.filter_policy == FilterPolicy.MERGE + assert isinstance(restored.document_store, FAISSDocumentStore) + + def test_filter_policy_replace(self, 
populated_store): + """REPLACE: runtime filters fully replace init-time filters.""" + init_filters = {"field": "meta.category", "operator": "==", "value": "A"} + runtime_filters = {"field": "meta.category", "operator": "==", "value": "B"} + + retriever = FAISSEmbeddingRetriever( + document_store=populated_store, + filters=init_filters, + top_k=3, + filter_policy=FilterPolicy.REPLACE, + ) + result = retriever.run(query_embedding=[0.0, 1.0, 0.0], filters=runtime_filters) + + # Only category B docs should appear — the init filter was replaced + assert all(d.meta["category"] == "B" for d in result["documents"]) + + def test_filter_policy_merge(self, populated_store): + """MERGE: runtime filters are merged with init-time filters.""" + init_filters = {"field": "meta.category", "operator": "==", "value": "A"} + + retriever = FAISSEmbeddingRetriever( + document_store=populated_store, + filters=init_filters, + top_k=3, + filter_policy=FilterPolicy.MERGE, + ) + # Run without any runtime filter — init filter alone should apply + result = retriever.run(query_embedding=[1.0, 0.0, 0.0]) + + assert len(result["documents"]) >= 1 + assert all(d.meta["category"] == "A" for d in result["documents"]) + + def test_invalid_document_store_type(self): + with pytest.raises(ValueError, match="document_store must be an instance of FAISSDocumentStore"): + FAISSEmbeddingRetriever(document_store="not_a_store") # type: ignore[arg-type] + + def test_run_in_pipeline(self, populated_store): + """End-to-end: FAISSEmbeddingRetriever wired into a Haystack Pipeline.""" + retriever = FAISSEmbeddingRetriever(document_store=populated_store, top_k=2) + + pipeline = Pipeline() + pipeline.add_component("retriever", retriever) + + result = pipeline.run({"retriever": {"query_embedding": [1.0, 0.0, 0.0]}}) + + assert "retriever" in result + assert "documents" in result["retriever"] + assert isinstance(result["retriever"]["documents"], list) + assert len(result["retriever"]["documents"]) == 2 + assert 
all(isinstance(d, Document) for d in result["retriever"]["documents"])