title Astra
id integrations-astra
description Astra integration for Haystack
slug /integrations-astra

haystack_integrations.components.retrievers.astra.retriever

AstraEmbeddingRetriever

A component for retrieving documents from an AstraDocumentStore.

Usage example:

from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore
from haystack_integrations.components.retrievers.astra import AstraEmbeddingRetriever

# api_endpoint, token, and collection_name are assumed to be defined by the caller.
document_store = AstraDocumentStore(
    api_endpoint=api_endpoint,
    token=token,
    collection_name=collection_name,
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
)

retriever = AstraEmbeddingRetriever(document_store=document_store)

init

__init__(
    document_store: AstraDocumentStore,
    filters: dict[str, Any] | None = None,
    top_k: int = 10,
    filter_policy: str | FilterPolicy = FilterPolicy.REPLACE,
) -> None

Initialize the AstraEmbeddingRetriever.

Parameters:

  • document_store (AstraDocumentStore) – An instance of AstraDocumentStore.
  • filters (dict[str, Any] | None) – a dictionary with filters to narrow down the search space.
  • top_k (int) – the maximum number of documents to retrieve.
  • filter_policy (str | FilterPolicy) – Policy to determine how filters are applied.
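
The interaction between init-time filters, runtime filters, and filter_policy can be sketched in plain Python. This is a simplified illustration, not the retriever's actual code: the FilterPolicy enum below is a local stand-in, resolve_filters is a hypothetical helper, and the MERGE branch assumes that merging means requiring both filter sets to match.

```python
from enum import Enum
from typing import Any, Optional


class FilterPolicy(Enum):
    # Local stand-in for Haystack's FilterPolicy (assumed members: REPLACE, MERGE).
    REPLACE = "replace"
    MERGE = "merge"


def resolve_filters(
    policy: FilterPolicy,
    init_filters: Optional[dict[str, Any]],
    runtime_filters: Optional[dict[str, Any]],
) -> Optional[dict[str, Any]]:
    """Hypothetical helper: choose the filters used for a single run."""
    if runtime_filters is None:
        # Nothing passed at run time, so the init-time filters apply.
        return init_filters
    if policy is FilterPolicy.REPLACE or init_filters is None:
        # REPLACE: runtime filters override the init-time filters entirely.
        return runtime_filters
    # MERGE (simplified assumption): both filter sets must match.
    return {"operator": "AND", "conditions": [init_filters, runtime_filters]}


init_filters = {"field": "meta.lang", "operator": "==", "value": "en"}
runtime_filters = {"field": "meta.year", "operator": ">=", "value": 2020}

replaced = resolve_filters(FilterPolicy.REPLACE, init_filters, runtime_filters)
merged = resolve_filters(FilterPolicy.MERGE, init_filters, runtime_filters)
```

With the default REPLACE policy, filters passed to run take over completely, so init-time filters act only as a fallback.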

run

run(
    query_embedding: list[float],
    filters: dict[str, Any] | None = None,
    top_k: int | None = None,
) -> dict[str, list[Document]]

Retrieve documents from the AstraDocumentStore.

Parameters:

  • query_embedding (list[float]) – floats representing the query embedding
  • filters (dict[str, Any] | None) – Filters applied to the retrieved Documents. The way runtime filters are applied depends on the filter_policy chosen at retriever initialization. See init method docstring for more details.
  • top_k (int | None) – the maximum number of documents to retrieve.

Returns:

  • dict[str, list[Document]] – a dictionary with the following keys:
  • documents: A list of documents retrieved from the AstraDocumentStore.

run_async

run_async(
    query_embedding: list[float],
    filters: dict[str, Any] | None = None,
    top_k: int | None = None,
) -> dict[str, list[Document]]

Retrieve documents from the AstraDocumentStore asynchronously.

Runs the sync search in a thread pool to avoid blocking the event loop.

Parameters:

  • query_embedding (list[float]) – floats representing the query embedding
  • filters (dict[str, Any] | None) – Filters applied to the retrieved Documents. The way runtime filters are applied depends on the filter_policy chosen at retriever initialization. See init method docstring for more details.
  • top_k (int | None) – the maximum number of documents to retrieve.

Returns:

  • dict[str, list[Document]] – a dictionary with the following keys:
  • documents: A list of documents retrieved from the AstraDocumentStore.
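
The thread-pool approach described for run_async can be illustrated with the standard library alone. The sketch below assumes asyncio.to_thread as the offloading mechanism (the reference does not say which API the retriever uses), and blocking_search is a hypothetical stand-in for the synchronous search call.

```python
import asyncio
import time


def blocking_search(query_embedding: list[float], top_k: int) -> list[str]:
    """Hypothetical stand-in for a synchronous vector search."""
    time.sleep(0.05)  # simulate network latency
    return [f"doc-{i}" for i in range(top_k)]


async def search_async(query_embedding: list[float], top_k: int) -> dict[str, list[str]]:
    # Run the blocking call in a worker thread so the event loop stays free.
    documents = await asyncio.to_thread(blocking_search, query_embedding, top_k)
    return {"documents": documents}


async def main() -> list[dict[str, list[str]]]:
    # The two searches overlap instead of running back to back.
    return await asyncio.gather(
        search_async([0.1, 0.2], top_k=2),
        search_async([0.3, 0.4], top_k=3),
    )


results = asyncio.run(main())
```

Because the work still happens in a synchronous call, this pattern keeps the event loop responsive but does not make the underlying request itself non-blocking.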

to_dict

to_dict() -> dict[str, Any]

Serializes the component to a dictionary.

Returns:

  • dict[str, Any] – Dictionary with serialized data.

from_dict

from_dict(data: dict[str, Any]) -> AstraEmbeddingRetriever

Deserializes the component from a dictionary.

Parameters:

  • data (dict[str, Any]) – Dictionary to deserialize from.

Returns:

  • AstraEmbeddingRetriever – Deserialized component.
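
A to_dict / from_dict round trip can be sketched with a toy class. ToyRetriever is hypothetical, and the "type" plus "init_parameters" layout is an assumption modeled on the general shape of Haystack component serialization; the exact keys emitted by AstraEmbeddingRetriever are not shown in this reference.

```python
from typing import Any, Optional


class ToyRetriever:
    """Hypothetical component used only to illustrate the round trip."""

    def __init__(self, top_k: int = 10, filters: Optional[dict[str, Any]] = None) -> None:
        self.top_k = top_k
        self.filters = filters

    def to_dict(self) -> dict[str, Any]:
        # Record the class path and the arguments needed to rebuild the object.
        return {
            "type": f"{type(self).__module__}.{type(self).__name__}",
            "init_parameters": {"top_k": self.top_k, "filters": self.filters},
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ToyRetriever":
        # Rebuild the object from the stored init parameters.
        return cls(**data["init_parameters"])


original = ToyRetriever(top_k=3, filters={"field": "meta.lang", "operator": "==", "value": "en"})
restored = ToyRetriever.from_dict(original.to_dict())
```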

haystack_integrations.document_stores.astra.document_store

AstraDocumentStore

A Haystack document store backed by Astra DB.

Example Usage:

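from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.astra import AstraDocumentStore

# api_endpoint, token, and collection_name are assumed to be defined by the caller.
document_store = AstraDocumentStore(
    api_endpoint=api_endpoint,
    token=token,
    collection_name=collection_name,
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dimension=384,
)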

init

__init__(
    api_endpoint: Secret = Secret.from_env_var("ASTRA_DB_API_ENDPOINT"),
    token: Secret = Secret.from_env_var("ASTRA_DB_APPLICATION_TOKEN"),
    collection_name: str = "documents",
    embedding_dimension: int = 768,
    duplicates_policy: DuplicatePolicy = DuplicatePolicy.NONE,
    similarity: str = "cosine",
    namespace: str | None = None,
) -> None

The connection to Astra DB is established and managed through the JSON API.

The required credentials (API endpoint and application token) can be generated through the UI by clicking the Connect tab, and then selecting JSON API and Generate Configuration.

Parameters:

  • api_endpoint (Secret) – the Astra DB API endpoint.
  • token (Secret) – the Astra DB application token.
  • collection_name (str) – the current collection in the keyspace in the current Astra DB.
  • embedding_dimension (int) – dimension of embedding vector.
  • duplicates_policy (DuplicatePolicy) – how duplicate documents are handled. Options: SKIP, OVERWRITE, FAIL, NONE.
  • DuplicatePolicy.NONE: the default policy. If a Document with the same ID already exists, it is skipped and not written.
  • DuplicatePolicy.SKIP: if a Document with the same ID already exists, it is skipped and not written.
  • DuplicatePolicy.OVERWRITE: if a Document with the same ID already exists, it is overwritten.
  • DuplicatePolicy.FAIL: if a Document with the same ID already exists, an error is raised.
  • similarity (str) – the similarity function used to compare document vectors.

Raises:

  • ValueError – if the API endpoint or token is not set.

index

index: AstraClient

Return the AstraClient index, initializing it if necessary.

from_dict

from_dict(data: dict[str, Any]) -> AstraDocumentStore

Deserializes the component from a dictionary.

Parameters:

  • data (dict[str, Any]) – Dictionary to deserialize from.

Returns:

  • AstraDocumentStore – Deserialized component.

to_dict

to_dict() -> dict[str, Any]

Serializes the component to a dictionary.

Returns:

  • dict[str, Any] – Dictionary with serialized data.

write_documents

write_documents(
    documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
) -> int

Indexes documents for later queries.

Parameters:

  • documents (list[Document]) – a list of Haystack Document objects.
  • policy (DuplicatePolicy) – how duplicate documents are handled. Options: SKIP, OVERWRITE, FAIL, NONE.
  • DuplicatePolicy.NONE: the default policy. If a Document with the same ID already exists, it is skipped and not written.
  • DuplicatePolicy.SKIP: If a Document with the same ID already exists, it is skipped and not written.
  • DuplicatePolicy.OVERWRITE: If a Document with the same ID already exists, it is overwritten.
  • DuplicatePolicy.FAIL: If a Document with the same ID already exists, an error is raised.

Returns:

  • int – number of documents written.

Raises:

  • ValueError – if the documents are not of type Document or dict.
  • DuplicateDocumentError – if a document with the same ID already exists and policy is set to FAIL.
  • Exception – if the document ID is not a string or if id and _id are both present in the document.
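
The four policies listed above can be illustrated with an in-memory dictionary. This is a minimal sketch, not the store's implementation: the enum and error class are local stand-ins, the function takes a plain dict instead of Document objects, and counting an overwritten document as written is an assumption.

```python
from enum import Enum


class DuplicatePolicy(Enum):
    # Local stand-in mirroring the four options listed above.
    NONE = "none"
    SKIP = "skip"
    OVERWRITE = "overwrite"
    FAIL = "fail"


class DuplicateDocumentError(Exception):
    """Local stand-in for the error raised under FAIL."""


def write_documents(store: dict[str, str], documents: dict[str, str], policy: DuplicatePolicy) -> int:
    """Write id -> content pairs into `store`, returning how many were written."""
    written = 0
    for doc_id, content in documents.items():
        if doc_id in store:
            if policy is DuplicatePolicy.FAIL:
                raise DuplicateDocumentError(f"ID '{doc_id}' already exists")
            if policy in (DuplicatePolicy.NONE, DuplicatePolicy.SKIP):
                continue  # existing document is kept untouched
        store[doc_id] = content  # new document, or OVERWRITE of an existing one
        written += 1
    return written


store = {"a": "old"}
skipped = write_documents(store, {"a": "new", "b": "fresh"}, DuplicatePolicy.SKIP)
overwritten = write_documents(store, {"a": "new"}, DuplicatePolicy.OVERWRITE)

try:
    write_documents(store, {"a": "again"}, DuplicatePolicy.FAIL)
    fail_raised = False
except DuplicateDocumentError:
    fail_raised = True
```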

count_documents

count_documents() -> int

Counts the number of documents in the document store.

Returns:

  • int – the number of documents in the document store.

filter_documents

filter_documents(filters: dict[str, Any] | None = None) -> list[Document]

Returns at most 1000 documents that match the filter.

Parameters:

  • filters (dict[str, Any] | None) – filters to apply.

Returns:

  • list[Document] – matching documents.

Raises:

  • AstraDocumentStoreFilterError – if the filter is invalid or not supported by this class.
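
For readers unfamiliar with the filter dictionaries, the sketch below evaluates a small filter against in-memory records and applies the 1000-document cap. It assumes the common Haystack filter layout (field / operator / value for comparisons, operator / conditions for logical groups); the set of operators that AstraDocumentStore actually accepts may be narrower, and the helper names are hypothetical.

```python
import operator
from typing import Any, Optional

MAX_RESULTS = 1000  # cap taken from the description above

COMPARATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def lookup(doc: dict[str, Any], field: str) -> Any:
    """Resolve a dotted field such as 'meta.lang' against a nested dict."""
    value: Any = doc
    for key in field.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


def matches(doc: dict[str, Any], filters: dict[str, Any]) -> bool:
    if "conditions" in filters:
        # Logical group: this sketch handles AND, and treats anything else as OR.
        results = (matches(doc, cond) for cond in filters["conditions"])
        return all(results) if filters["operator"] == "AND" else any(results)
    value = lookup(doc, filters["field"])
    op = filters["operator"]
    if value is None and op not in ("==", "!="):
        return False  # missing fields never satisfy ordered comparisons
    return COMPARATORS[op](value, filters["value"])


def filter_documents(docs: list[dict[str, Any]], filters: Optional[dict[str, Any]] = None) -> list[dict[str, Any]]:
    selected = docs if filters is None else [d for d in docs if matches(d, filters)]
    return selected[:MAX_RESULTS]


docs = [
    {"id": "1", "meta": {"lang": "en", "year": 2021}},
    {"id": "2", "meta": {"lang": "de", "year": 2022}},
    {"id": "3", "meta": {"lang": "en", "year": 2018}},
]
filters = {
    "operator": "AND",
    "conditions": [
        {"field": "meta.lang", "operator": "==", "value": "en"},
        {"field": "meta.year", "operator": ">=", "value": 2020},
    ],
}
matched_ids = [d["id"] for d in filter_documents(docs, filters)]
```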

get_documents_by_id

get_documents_by_id(ids: list[str]) -> list[Document]

Gets documents by their IDs.

Parameters:

  • ids (list[str]) – the IDs of the documents to retrieve.

Returns:

  • list[Document] – the matching documents.

get_document_by_id

get_document_by_id(document_id: str) -> Document

Gets a document by its ID.

Parameters:

  • document_id (str) – the ID to filter by

Returns:

  • Document – the found document

Raises:

  • MissingDocumentError – if the document is not found

search

search(
    query_embedding: list[float],
    top_k: int,
    filters: dict[str, Any] | None = None,
) -> list[Document]

Performs a vector search for a single query embedding.

Parameters:

  • query_embedding (list[float]) – the embedding vector of the query.
  • top_k (int) – the number of results to return.
  • filters (dict[str, Any] | None) – filters to apply during search.

Returns:

  • list[Document] – matching documents.
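
Conceptually, an embedding search ranks stored vectors by similarity to the query and keeps the best top_k. The brute-force sketch below uses cosine similarity (the default similarity value in the constructor) to show the idea; Astra DB performs the search server-side, so this is an illustration of the ranking logic only, and the function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_search(query: list[float], vectors: dict[str, list[float]], top_k: int) -> list[str]:
    """Return the IDs of the top_k vectors most similar to the query."""
    ranked = sorted(
        vectors,
        key=lambda doc_id: cosine_similarity(query, vectors[doc_id]),
        reverse=True,
    )
    return ranked[:top_k]


vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [0.7, 0.7]}
hits = top_k_search([1.0, 0.1], vectors, top_k=2)
```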

delete_documents

delete_documents(document_ids: list[str]) -> None

Deletes documents from the document store.

Parameters:

  • document_ids (list[str]) – IDs of the documents to delete.

Raises:

  • MissingDocumentError – if no document was deleted but document IDs were provided.

delete_all_documents

delete_all_documents() -> None

Deletes all documents from the document store.

delete_by_filter

delete_by_filter(filters: dict[str, Any]) -> int

Deletes documents that match the provided filters.

Parameters:

  • filters (dict[str, Any]) – The filters to apply to find documents to delete.

Returns:

  • int – The number of documents deleted.

Raises:

  • AstraDocumentStoreFilterError – if the filter is invalid or not supported.

update_by_filter

update_by_filter(filters: dict[str, Any], meta: dict[str, Any]) -> int

Updates documents that match the provided filters with the given metadata.

Parameters:

  • filters (dict[str, Any]) – The filters to apply to find documents to update.
  • meta (dict[str, Any]) – The metadata fields to update. This will be merged with existing metadata.

Returns:

  • int – The number of documents updated.

Raises:

  • AstraDocumentStoreFilterError – if the filter is invalid or not supported.
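
The merge behavior described for meta can be sketched on plain dictionaries. This sketch assumes a shallow merge in which updated keys win and untouched keys are preserved; whether the store merges nested values more deeply is not stated here. A predicate function stands in for the filter dictionary, and update_matching is a hypothetical name.

```python
from typing import Any, Callable


def update_matching(
    docs: list[dict[str, Any]],
    predicate: Callable[[dict[str, Any]], bool],
    meta: dict[str, Any],
) -> int:
    """Merge `meta` into every matching record and return how many were updated."""
    updated = 0
    for doc in docs:
        if predicate(doc):
            # Shallow merge: keys in `meta` overwrite, other keys are kept.
            doc["meta"] = {**doc["meta"], **meta}
            updated += 1
    return updated


docs = [
    {"id": "1", "meta": {"lang": "en", "status": "draft"}},
    {"id": "2", "meta": {"lang": "de", "status": "draft"}},
]
count = update_matching(docs, lambda d: d["meta"]["lang"] == "en", {"status": "published"})
```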

count_documents_by_filter

count_documents_by_filter(filters: dict[str, Any]) -> int

Applies a filter and counts the documents that matched it.

Parameters:

  • filters (dict[str, Any]) – The filters to apply to the document list.

Returns:

  • int – The number of documents that match the filter.

count_unique_metadata_by_filter

count_unique_metadata_by_filter(
    filters: dict[str, Any], metadata_fields: list[str]
) -> dict[str, int]

Applies a filter selecting documents and counts the unique values for each meta field of the matched documents.

Parameters:

  • filters (dict[str, Any]) – The filters to apply to the document list.
  • metadata_fields (list[str]) – The metadata fields to count unique values for.

Returns:

  • dict[str, int] – A dictionary where the keys are the metadata field names and the values are the count of unique values.
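
The shape of the return value can be illustrated on already-filtered metadata dictionaries. The helper below is a hypothetical sketch that assumes hashable metadata values and ignores documents that lack a field.

```python
from typing import Any


def count_unique_metadata(metas: list[dict[str, Any]], metadata_fields: list[str]) -> dict[str, int]:
    """Count distinct values per requested field across the given metadata dicts."""
    return {
        field: len({meta[field] for meta in metas if field in meta})
        for field in metadata_fields
    }


metas = [
    {"lang": "en", "year": 2021},
    {"lang": "de", "year": 2021},
    {"lang": "en"},
]
counts = count_unique_metadata(metas, ["lang", "year"])
```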

get_metadata_fields_info

get_metadata_fields_info() -> dict[str, dict[str, str]]

Returns the metadata fields and the corresponding types.

Returns:

  • dict[str, dict[str, str]] – A dictionary mapping field names to dictionaries with a type key.

get_metadata_field_min_max

get_metadata_field_min_max(metadata_field: str) -> dict[str, Any]

For a given metadata field, find its max and min value.

Parameters:

  • metadata_field (str) – The metadata field to inspect.

Returns:

  • dict[str, Any] – A dictionary with min and max.
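
A minimal sketch of the min/max computation over in-memory metadata follows. The "min" and "max" key names follow the description above; returning None for both when no document has the field is an assumption, and field_min_max is a hypothetical helper.

```python
from typing import Any


def field_min_max(metas: list[dict[str, Any]], metadata_field: str) -> dict[str, Any]:
    """Return the smallest and largest value of one metadata field."""
    values = [m[metadata_field] for m in metas if m.get(metadata_field) is not None]
    if not values:
        # Assumption: no values means there is no min or max to report.
        return {"min": None, "max": None}
    return {"min": min(values), "max": max(values)}


metas = [{"year": 2021}, {"year": 2018}, {"year": 2022}, {"lang": "en"}]
year_range = field_min_max(metas, "year")
```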

get_metadata_field_unique_values

get_metadata_field_unique_values(
    metadata_field: str,
    search_term: str | None = None,
    from_: int = 0,
    size: int = 10,
) -> tuple[list[str], int]

Retrieves unique values for a field matching a search term or all possible values if no search term is given.

Parameters:

  • metadata_field (str) – The metadata field to inspect.
  • search_term (str | None) – Optional case-insensitive substring search term.
  • from_ (int) – The starting index for pagination.
  • size (int) – The number of values to return.

Returns:

  • tuple[list[str], int] – A tuple containing the paginated values and the total count.
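
The combination of search_term, from_, and size can be sketched as filter-then-paginate over the distinct values. The sorting step and the unique_values name are assumptions made for a deterministic example; the ordering used by the store is not documented here.

```python
from typing import Any, Optional


def unique_values(
    values: list[Any],
    search_term: Optional[str] = None,
    from_: int = 0,
    size: int = 10,
) -> tuple[list[str], int]:
    """Return one page of distinct values plus the total number of matches."""
    unique = sorted({str(v) for v in values})
    if search_term is not None:
        needle = search_term.lower()
        # Case-insensitive substring match, as described for search_term.
        unique = [v for v in unique if needle in v.lower()]
    return unique[from_ : from_ + size], len(unique)


values = ["News", "news-archive", "Blog", "blog", "Docs", "News"]
page, total = unique_values(values, search_term="NEWS", from_=0, size=1)
```

The total counts all matching values, not just those on the returned page, so a caller can tell whether further pages exist.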

haystack_integrations.document_stores.astra.errors

AstraDocumentStoreError

Bases: DocumentStoreError

Parent class for all AstraDocumentStore errors.

AstraDocumentStoreFilterError

Bases: FilterError

Raised when an invalid filter is passed to AstraDocumentStore.

AstraDocumentStoreConfigError

Bases: AstraDocumentStoreError

Raised when an invalid configuration is passed to AstraDocumentStore.