| title | Valkey |
|---|---|
| id | integrations-valkey |
| description | Valkey integration for Haystack |
| slug | /integrations-valkey |
A component for retrieving documents from a ValkeyDocumentStore using vector similarity search.
This retriever uses dense embeddings to find semantically similar documents. It supports filtering by metadata fields and configurable similarity thresholds.
Key features:
- Vector similarity search using HNSW algorithm
- Metadata filtering with tag and numeric field support
- Configurable top-k results
- Filter policy management for runtime filter application
Usage example:
from haystack.document_stores.types import DuplicatePolicy
from haystack import Document
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack_integrations.components.retrievers.valkey import ValkeyEmbeddingRetriever
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore
document_store = ValkeyDocumentStore(index_name="my_index", embedding_dim=768)
documents = [Document(content="There are over 7,000 languages spoken around the world today."),
Document(content="Elephants have been observed to behave in a way that indicates..."),
Document(content="In certain places, you can witness the phenomenon of bioluminescent waves.")]
document_embedder = SentenceTransformersDocumentEmbedder()
document_embedder.warm_up()
documents_with_embeddings = document_embedder.run(documents)
document_store.write_documents(documents_with_embeddings.get("documents"), policy=DuplicatePolicy.OVERWRITE)
query_pipeline = Pipeline()
query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder())
query_pipeline.add_component("retriever", ValkeyEmbeddingRetriever(document_store=document_store))
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
query = "How many languages are there?"
res = query_pipeline.run({"text_embedder": {"text": query}})
assert res['retriever']['documents'][0].content == "There are over 7,000 languages spoken around the world today."
__init__(
*,
document_store: ValkeyDocumentStore,
filters: dict[str, Any] | None = None,
top_k: int = 10,
filter_policy: str | FilterPolicy = FilterPolicy.REPLACE
)
Parameters:
- document_store (ValkeyDocumentStore) – The Valkey Document Store.
- filters (dict[str, Any] | None) – Filters applied to the retrieved Documents.
- top_k (int) – Maximum number of Documents to return.
- filter_policy (str | FilterPolicy) – Policy to determine how filters are applied.
Raises:
- ValueError – If document_store is not an instance of ValkeyDocumentStore.
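The interaction between init-time filters and runtime filters can be sketched in plain Python. This is a simplified illustration, not the library's implementation: `SketchFilterPolicy` and `apply_filter_policy` are hypothetical names, the MERGE member is assumed to exist alongside REPLACE, and Haystack's real merge logic may treat overlapping conditions differently.

```python
from enum import Enum
from typing import Any, Optional


class SketchFilterPolicy(Enum):
    """Hypothetical stand-in for Haystack's FilterPolicy."""

    REPLACE = "replace"
    MERGE = "merge"  # assumption: not shown in the reference above


def apply_filter_policy(
    policy: SketchFilterPolicy,
    init_filters: Optional[dict[str, Any]],
    runtime_filters: Optional[dict[str, Any]],
) -> Optional[dict[str, Any]]:
    """Pick the filters a retriever would send to the document store."""
    # No runtime filters: fall back to whatever was set at initialization.
    if runtime_filters is None:
        return init_filters
    # REPLACE: runtime filters win outright.
    if policy is SketchFilterPolicy.REPLACE or init_filters is None:
        return runtime_filters
    # MERGE (simplified): require both filter sets to match.
    return {"operator": "AND", "conditions": [init_filters, runtime_filters]}
```

With REPLACE, passing `filters` to `run()` discards the filters given to `__init__`; omitting them keeps the init-time filters in effect.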
to_dict() -> dict[str, Any]
Serializes the component to a dictionary.
Returns:
- dict[str, Any] – Dictionary with serialized data.
from_dict(data: dict[str, Any]) -> ValkeyEmbeddingRetriever
Deserializes the component from a dictionary.
Parameters:
- data (dict[str, Any]) – Dictionary to deserialize from.
Returns:
- ValkeyEmbeddingRetriever – Deserialized component.
run(
query_embedding: list[float],
filters: dict[str, Any] | None = None,
top_k: int | None = None,
) -> dict[str, list[Document]]
Retrieve documents from the ValkeyDocumentStore, based on their dense embeddings.
Parameters:
- query_embedding (list[float]) – Embedding of the query.
- filters (dict[str, Any] | None) – Filters applied to the retrieved Documents. How runtime filters are applied depends on the filter_policy chosen at retriever initialization. See the __init__ method docstring for more details.
- top_k (int | None) – Maximum number of Documents to return.
Returns:
- dict[str, list[Document]] – Dictionary whose "documents" key holds the Documents most similar to query_embedding.
run_async(
query_embedding: list[float],
filters: dict[str, Any] | None = None,
top_k: int | None = None,
) -> dict[str, list[Document]]
Asynchronously retrieve documents from the ValkeyDocumentStore, based on their dense embeddings.
Parameters:
- query_embedding (list[float]) – Embedding of the query.
- filters (dict[str, Any] | None) – Filters applied to the retrieved Documents. How runtime filters are applied depends on the filter_policy chosen at retriever initialization. See the __init__ method docstring for more details.
- top_k (int | None) – Maximum number of Documents to return.
Returns:
- dict[str, list[Document]] – Dictionary whose "documents" key holds the Documents most similar to query_embedding.
Bases: DocumentStore
A document store implementation using Valkey with vector search capabilities.
This document store provides persistent storage for documents with embeddings and supports vector similarity search using the Valkey Search module. It's designed for high-performance retrieval applications requiring both semantic search and metadata filtering.
Key features:
- Vector similarity search with HNSW algorithm
- Metadata filtering on tag and numeric fields
- Configurable distance metrics (L2, cosine, inner product)
- Batch operations for efficient document management
- Both synchronous and asynchronous operations
- Cluster and standalone mode support
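For readers unfamiliar with the three distance metrics named in the feature list, the underlying formulas can be sketched in plain Python. The function names are illustrative only; how Valkey Search turns these quantities into a document score is not covered here.

```python
import math


def l2_distance(a: list[float], b: list[float]) -> float:
    """Euclidean distance: smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def inner_product(a: list[float], b: list[float]) -> float:
    """Dot product: larger means more similar (sensitive to vector length)."""
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Dot product of the normalized vectors: depends only on direction."""
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return inner_product(a, b) / (norm_a * norm_b)
```

Cosine similarity ignores vector magnitude, which is why it is a common choice for text embeddings whose norms carry no meaning.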
Supported filterable Document metadata fields:
- meta_category (TagField): exact string matches
- meta_status (TagField): status filtering
- meta_priority (NumericField): numeric comparisons
- meta_score (NumericField): score filtering
- meta_timestamp (NumericField): date/time filtering
Usage example:
from haystack import Document
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore
# Initialize document store
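document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="my_documents",
    embedding_dim=768,
    distance_metric="cosine",
    # Index the metadata fields that the filter below relies on
    metadata_fields={"category": str, "priority": int},
)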
# Store documents with embeddings
documents = [
Document(
content="Valkey is a Redis-compatible database",
embedding=[0.1, 0.2, ...], # 768-dim vector
meta={"category": "database", "priority": 1}
)
]
document_store.write_documents(documents)
# Search with filters
results = document_store._embedding_retrieval(
    embedding=[0.1, 0.15, ...],
    filters={"field": "meta.category", "operator": "==", "value": "database"},
    limit=10
)
__init__(
nodes_list: list[tuple[str, int]] | None = None,
*,
cluster_mode: bool = False,
use_tls: bool = False,
username: Secret | None = Secret.from_env_var(
"VALKEY_USERNAME", strict=False
),
password: Secret | None = Secret.from_env_var(
"VALKEY_PASSWORD", strict=False
),
request_timeout: int = 500,
retry_attempts: int = 3,
retry_base_delay_ms: int = 1000,
retry_exponent_base: int = 2,
batch_size: int = 100,
index_name: str = "default",
distance_metric: Literal["l2", "cosine", "ip"] = "cosine",
embedding_dim: int = 768,
metadata_fields: dict[str, type[str] | type[int]] | None = None
)
Creates a new ValkeyDocumentStore instance.
Parameters:
- nodes_list (list[tuple[str, int]] | None) – List of (host, port) tuples for Valkey nodes. Defaults to [("localhost", 6379)].
- cluster_mode (bool) – Whether to connect in cluster mode. Defaults to False.
- use_tls (bool) – Whether to use TLS for connections. Defaults to False.
- username (Secret | None) – Username for authentication. By default, it is read from the VALKEY_USERNAME environment variable if that variable is set.
- password (Secret | None) – Password for authentication. By default, it is read from the VALKEY_PASSWORD environment variable if that variable is set.
- request_timeout (int) – Request timeout in milliseconds. Defaults to 500.
- retry_attempts (int) – Number of retry attempts for failed operations. Defaults to 3.
- retry_base_delay_ms (int) – Base delay in milliseconds for exponential backoff. Defaults to 1000.
- retry_exponent_base (int) – Exponent base for exponential backoff calculation. Defaults to 2.
- batch_size (int) – Number of documents to process in a single batch for async operations. Defaults to 100.
- index_name (str) – Name of the search index. Defaults to "default".
- distance_metric (Literal['l2', 'cosine', 'ip']) – Distance metric for vector similarity. Options: "l2", "cosine", "ip" (inner product). Defaults to "cosine".
- embedding_dim (int) – Dimension of document embeddings. Defaults to 768.
- metadata_fields (dict[str, type[str] | type[int]] | None) – Dictionary mapping metadata field names to Python types for filtering. Supported types: str (for exact matching), int (for numeric comparisons). Example: {"category": str, "priority": int}. If not provided, no metadata fields will be indexed for filtering.
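To get a feel for how the three retry parameters interact, the delay schedule can be sketched as follows. The formula (base delay multiplied by the exponent base raised to the attempt index) is an assumption made for illustration; the underlying client may add jitter, cap the delay, or compute it differently. `backoff_delays_ms` is a hypothetical helper, not part of the integration.

```python
def backoff_delays_ms(
    retry_attempts: int = 3,
    retry_base_delay_ms: int = 1000,
    retry_exponent_base: int = 2,
) -> list[int]:
    """Return one nominal delay per retry, growing exponentially.

    Assumed formula: retry_base_delay_ms * retry_exponent_base ** attempt,
    with attempt counted from 0.
    """
    return [
        retry_base_delay_ms * retry_exponent_base**attempt
        for attempt in range(retry_attempts)
    ]


# With the documented defaults the nominal schedule is 1 s, 2 s, 4 s.
print(backoff_delays_ms())
```

Under this assumption, the defaults add up to a nominal worst-case wait of seven seconds across the three retries, not counting the 500 ms request timeout of each attempt.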
to_dict() -> dict[str, Any]
Serializes this store to a dictionary.
from_dict(data: dict[str, Any]) -> ValkeyDocumentStore
Deserializes the store from a dictionary.
count_documents() -> int
Return the number of documents stored in the document store.
This method queries the Valkey Search index to get the total count of indexed documents. If the index doesn't exist, it returns 0.
Returns:
- int – The number of documents in the document store.
Raises:
- ValkeyDocumentStoreError – If there's an error accessing the index or counting documents.
Example:
document_store = ValkeyDocumentStore()
count = document_store.count_documents()
print(f"Total documents: {count}")
count_documents_async() -> int
Asynchronously return the number of documents stored in the document store.
This method queries the Valkey Search index to get the total count of indexed documents. If the index doesn't exist, it returns 0. This is the async version of count_documents().
Returns:
- int – The number of documents in the document store.
Raises:
- ValkeyDocumentStoreError – If there's an error accessing the index or counting documents.
Example:
document_store = ValkeyDocumentStore()
count = await document_store.count_documents_async()
print(f"Total documents: {count}")
filter_documents(filters: dict[str, Any] | None = None) -> list[Document]
Filter documents by metadata without vector search.
This method retrieves documents based on metadata filters without performing vector similarity search. Since Valkey Search requires vector queries, this method uses a dummy vector internally and removes the similarity scores from results.
Parameters:
- filters (dict[str, Any] | None) – Optional metadata filters in Haystack format. Supports filtering on:
  - meta.category (string equality)
  - meta.status (string equality)
  - meta.priority (numeric comparisons)
  - meta.score (numeric comparisons)
  - meta.timestamp (numeric comparisons)
Returns:
- list[Document] – List of documents matching the filters, with score set to None.
Raises:
- ValkeyDocumentStoreError – If there's an error filtering documents.
Example:
# Filter by category
docs = document_store.filter_documents(
filters={"field": "meta.category", "operator": "==", "value": "news"}
)
# Filter by numeric range
docs = document_store.filter_documents(
filters={"field": "meta.priority", "operator": ">=", "value": 5}
)
filter_documents_async(filters: dict[str, Any] | None = None) -> list[Document]
Asynchronously filter documents by metadata without vector search.
This is the async version of filter_documents(). It retrieves documents based on metadata filters without performing vector similarity search. Since Valkey Search requires vector queries, this method uses a dummy vector internally and removes the similarity scores from results.
Parameters:
- filters (dict[str, Any] | None) – Optional metadata filters in Haystack format. Supports filtering on:
  - meta.category (string equality)
  - meta.status (string equality)
  - meta.priority (numeric comparisons)
  - meta.score (numeric comparisons)
  - meta.timestamp (numeric comparisons)
Returns:
- list[Document] – List of documents matching the filters, with score set to None.
Raises:
- ValkeyDocumentStoreError – If there's an error filtering documents.
Example:
# Filter by category
docs = await document_store.filter_documents_async(
filters={"field": "meta.category", "operator": "==", "value": "news"}
)
# Filter by numeric range
docs = await document_store.filter_documents_async(
filters={"field": "meta.priority", "operator": ">=", "value": 5}
)
write_documents(
documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
) -> int
Write documents to the document store.
This method stores documents with their embeddings and metadata in Valkey. The search index is automatically created if it doesn't exist. Documents without embeddings will be assigned a dummy vector for indexing purposes.
Parameters:
- documents (list[Document]) – List of Document objects to store. Each document should have:
  - content: The document text
  - embedding: Vector representation (optional, dummy vector used if missing)
  - meta: Optional metadata dict with supported fields (category, status, priority, score, timestamp)
- policy (DuplicatePolicy) – How to handle duplicate documents. Only NONE and OVERWRITE are supported. Defaults to DuplicatePolicy.NONE.
Returns:
- int – Number of documents successfully written.
Raises:
- ValkeyDocumentStoreError – If there's an error writing documents.
- ValueError – If the documents list contains invalid objects.
Example:
documents = [
Document(
content="First document",
embedding=[0.1, 0.2, 0.3],
meta={"category": "news", "priority": 1}
),
Document(
content="Second document",
embedding=[0.4, 0.5, 0.6],
meta={"category": "blog", "priority": 2}
)
]
count = document_store.write_documents(documents)
print(f"Wrote {count} documents")
write_documents_async(
documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
) -> int
Asynchronously write documents to the document store.
This is the async version of write_documents(). It stores documents with their embeddings and metadata in Valkey using batch processing for improved performance. The search index is automatically created if it doesn't exist.
Parameters:
- documents (list[Document]) – List of Document objects to store. Each document should have:
  - content: The document text
  - embedding: Vector representation (optional, dummy vector used if missing)
  - meta: Optional metadata dict with supported fields (category, status, priority, score, timestamp)
- policy (DuplicatePolicy) – How to handle duplicate documents. Only NONE and OVERWRITE are supported. Defaults to DuplicatePolicy.NONE.
Returns:
- int – Number of documents successfully written.
Raises:
- ValkeyDocumentStoreError – If there's an error writing documents.
- ValueError – If the documents list contains invalid objects.
Example:
documents = [
Document(
content="First document",
embedding=[0.1, 0.2, 0.3],
meta={"category": "news", "priority": 1}
),
Document(
content="Second document",
embedding=[0.4, 0.5, 0.6],
meta={"category": "blog", "priority": 2}
)
]
count = await document_store.write_documents_async(documents)
print(f"Wrote {count} documents")
delete_documents(document_ids: list[str]) -> None
Delete documents from the document store by their IDs.
This method removes documents from both the Valkey database and the search index. If some documents are not found, a warning is logged but the operation continues.
Parameters:
- document_ids (list[str]) – List of document IDs to delete. These should be the same IDs used when the documents were originally stored.
Raises:
- ValkeyDocumentStoreError – If there's an error deleting documents.
Example:
# Delete specific documents
document_store.delete_documents(["doc1", "doc2", "doc3"])
# Delete a single document
document_store.delete_documents(["single_doc_id"])
delete_documents_async(document_ids: list[str]) -> None
Asynchronously delete documents from the document store by their IDs.
This is the async version of delete_documents(). It removes documents from both the Valkey database and the search index. If some documents are not found, a warning is logged but the operation continues.
Parameters:
- document_ids (list[str]) – List of document IDs to delete. These should be the same IDs used when the documents were originally stored.
Raises:
- ValkeyDocumentStoreError – If there's an error deleting documents.
Example:
# Delete specific documents
await document_store.delete_documents_async(["doc1", "doc2", "doc3"])
# Delete a single document
await document_store.delete_documents_async(["single_doc_id"])
delete_by_filter(filters: dict[str, Any]) -> int
Delete all documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents to delete.
Returns:
- int – The number of documents deleted.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If deletion fails.
delete_by_filter_async(filters: dict[str, Any]) -> int
Asynchronously delete all documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents to delete.
Returns:
- int – The number of documents deleted.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If deletion fails.
update_by_filter(filters: dict[str, Any], meta: dict[str, Any]) -> int
Update metadata of all documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents to update.
- meta (dict[str, Any]) – Metadata key-value pairs to set on matching documents (merged with existing meta).
Returns:
- int – The number of documents updated.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If update or write fails.
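The phrase "merged with existing meta" can be illustrated with a plain dictionary merge. This sketch assumes a shallow merge in which the new values override existing keys and untouched keys are preserved; `merge_meta` is a hypothetical helper, and the store's handling of nested values is not shown in this reference.

```python
from typing import Any


def merge_meta(existing: dict[str, Any], updates: dict[str, Any]) -> dict[str, Any]:
    """Return a new metadata dict: existing keys kept, updated keys overridden."""
    merged = dict(existing)  # copy so the original document meta is not mutated
    merged.update(updates)
    return merged


before = {"category": "news", "priority": 1}
after = merge_meta(before, {"priority": 5, "status": "reviewed"})
print(after)
```

Under this assumption, updating `priority` leaves `category` in place, so a call to update_by_filter does not need to resend the full metadata.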
update_by_filter_async(filters: dict[str, Any], meta: dict[str, Any]) -> int
Asynchronously update metadata of all documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents to update.
- meta (dict[str, Any]) – Metadata key-value pairs to set on matching documents (merged with existing meta).
Returns:
- int – The number of documents updated.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If update or write fails.
count_documents_by_filter(filters: dict[str, Any]) -> int
Return the number of documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to apply.
Returns:
- int – The number of matching documents.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If counting fails.
count_documents_by_filter_async(filters: dict[str, Any]) -> int
Asynchronously return the number of documents that match the provided filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to apply.
Returns:
- int – The number of matching documents.
Raises:
- FilterError – If the filter structure is invalid.
- ValkeyDocumentStoreError – If counting fails.
count_unique_metadata_by_filter(
filters: dict[str, Any], metadata_fields: list[str]
) -> dict[str, int]
Count unique values for each specified metadata field in documents matching the filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents.
- metadata_fields (list[str]) – List of metadata field names (e.g. "category" or "meta.category").
Returns:
- dict[str, int] – Dictionary mapping each field name to the count of its unique values.
Raises:
- FilterError – If the filter structure is invalid.
- ValueError – If a field in metadata_fields is not configured for filtering.
- ValkeyDocumentStoreError – If the operation fails.
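The shape of the returned dictionary can be illustrated on in-memory metadata. This is only a sketch of the counting semantics: `count_unique_values` is a hypothetical helper that operates on already-filtered metadata dicts, and keying the result by the field name without the "meta." prefix is an assumption.

```python
from typing import Any


def count_unique_values(
    docs_meta: list[dict[str, Any]], metadata_fields: list[str]
) -> dict[str, int]:
    """Count distinct values per field across the given metadata dicts."""
    counts: dict[str, int] = {}
    for field in metadata_fields:
        # Accept both "category" and "meta.category", as the reference describes.
        key = field.removeprefix("meta.")
        distinct = {meta[key] for meta in docs_meta if key in meta}
        counts[key] = len(distinct)
    return counts


matching = [
    {"category": "news", "priority": 1},
    {"category": "blog", "priority": 1},
    {"category": "news", "priority": 2},
]
print(count_unique_values(matching, ["category", "meta.priority"]))
```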
count_unique_metadata_by_filter_async(
filters: dict[str, Any], metadata_fields: list[str]
) -> dict[str, int]
Asynchronously count unique values for each specified metadata field in documents matching the filters.
Parameters:
- filters (dict[str, Any]) – Haystack filter dictionary to select documents.
- metadata_fields (list[str]) – List of metadata field names (e.g. "category" or "meta.category").
Returns:
- dict[str, int] – Dictionary mapping each field name to the count of its unique values.
Raises:
- FilterError – If the filter structure is invalid.
- ValueError – If a field in metadata_fields is not configured for filtering.
- ValkeyDocumentStoreError – If the operation fails.
get_metadata_fields_info() -> dict[str, dict[str, str]]
Return information about metadata fields configured for filtering.
Returns the store's configured metadata field names and their types (as used in the index). Field names are returned without the "meta." prefix (e.g. "category", "priority").
Returns:
- dict[str, dict[str, str]] – Dictionary mapping field name to a dict with "type" key ("keyword" for tag, "long" for numeric).
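Combining this return format with the metadata_fields argument of __init__ (str for tag fields, int for numeric fields), the expected output can be sketched as a simple mapping. `metadata_fields_info` is a hypothetical stand-in written for illustration, not the store's code.

```python
def metadata_fields_info(
    metadata_fields: dict[str, type],
) -> dict[str, dict[str, str]]:
    """Map configured Python types to the type names described above."""
    # str fields are indexed as tags ("keyword"), int fields as numerics ("long").
    type_names = {str: "keyword", int: "long"}
    return {
        name: {"type": type_names[py_type]}
        for name, py_type in metadata_fields.items()
    }


info = metadata_fields_info({"category": str, "priority": int})
print(info)
```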
get_metadata_field_min_max(metadata_field: str) -> dict[str, Any]
Return the minimum and maximum values for a numeric metadata field.
Parameters:
- metadata_field (str) – Metadata field name (e.g. "priority" or "meta.priority"). Must be a configured numeric field.
Returns:
- dict[str, Any] – Dictionary with "min" and "max" keys (values are int/float or None if no values).
Raises:
- ValueError – If the field is not configured or is not numeric.
- ValkeyDocumentStoreError – If the operation fails.
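The return contract, including the None case for an empty field, can be shown on in-memory metadata. `field_min_max` is a hypothetical helper for illustration; it skips the configuration checks that make the real method raise ValueError.

```python
from typing import Any


def field_min_max(docs_meta: list[dict[str, Any]], metadata_field: str) -> dict[str, Any]:
    """Return {"min": ..., "max": ...} over the numeric values of one field."""
    key = metadata_field.removeprefix("meta.")
    values = [m[key] for m in docs_meta if isinstance(m.get(key), (int, float))]
    if not values:
        # No document carries a value for this field.
        return {"min": None, "max": None}
    return {"min": min(values), "max": max(values)}


sample = [{"priority": 3}, {"priority": 7}, {"category": "news"}]
print(field_min_max(sample, "meta.priority"))
```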
get_metadata_field_min_max_async(metadata_field: str) -> dict[str, Any]
Asynchronously return the minimum and maximum values for a numeric metadata field.
Parameters:
- metadata_field (str) – Metadata field name (e.g. "priority" or "meta.priority"). Must be a configured numeric field.
Returns:
- dict[str, Any] – Dictionary with "min" and "max" keys (values are int/float or None if no values).
Raises:
- ValueError – If the field is not configured or is not numeric.
- ValkeyDocumentStoreError – If the operation fails.
get_metadata_field_unique_values(
metadata_field: str,
search_term: str | None = None,
from_: int = 0,
size: int = 10,
) -> tuple[list[str], int]
Return unique values for a metadata field with optional search and pagination.
Values are stringified. For tag fields the distinct values are returned; for numeric fields the string representation of each distinct value is returned.
Parameters:
- metadata_field (str) – Metadata field name (e.g. "category" or "meta.category").
- search_term (str | None) – Optional case-insensitive substring filter on the value.
- from_ (int) – Start index for pagination (default 0).
- size (int) – Number of values to return (default 10).
Returns:
- tuple[list[str], int] – Tuple of (list of unique values for the requested page, total count of unique values).
Raises:
- ValueError – If the field is not configured for filtering.
- ValkeyDocumentStoreError – If the operation fails.
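How search_term, from_, and size combine can be sketched on a plain list of values. `unique_values_page` is a hypothetical helper; the sorted ordering and the choice to report the total after applying the search term are assumptions, since the reference does not specify either.

```python
from typing import Any, Optional


def unique_values_page(
    values: list[Any],
    search_term: Optional[str] = None,
    from_: int = 0,
    size: int = 10,
) -> tuple[list[str], int]:
    """Return one page of distinct stringified values plus the total count."""
    unique = sorted({str(v) for v in values})  # ordering is an assumption
    if search_term is not None:
        needle = search_term.lower()
        # Case-insensitive substring match, as described for search_term.
        unique = [v for v in unique if needle in v.lower()]
    return unique[from_ : from_ + size], len(unique)


categories = ["news", "blog", "News", "newsletter", "blog"]
first_page = unique_values_page(categories, search_term="NEWS", from_=0, size=2)
second_page = unique_values_page(categories, search_term="NEWS", from_=2, size=2)
```

A caller can keep requesting pages, advancing from_ by size, until from_ reaches the returned total.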
get_metadata_field_unique_values_async(
metadata_field: str,
search_term: str | None = None,
from_: int = 0,
size: int = 10,
) -> tuple[list[str], int]
Asynchronously return unique values for a metadata field with optional search and pagination.
Parameters:
- metadata_field (str) – Metadata field name (e.g. "category" or "meta.category").
- search_term (str | None) – Optional case-insensitive substring filter on the value.
- from_ (int) – Start index for pagination (default 0).
- size (int) – Number of values to return (default 10).
Returns:
- tuple[list[str], int] – Tuple of (list of unique values for the requested page, total count of unique values).
Raises:
- ValueError – If the field is not configured for filtering.
- ValkeyDocumentStoreError – If the operation fails.
delete_all_documents() -> None
Delete all documents from the document store.
This method removes all documents by dropping the entire search index. This is an efficient way to clear all data but requires recreating the index for future operations. If the index doesn't exist, the operation completes without error.
Raises:
- ValkeyDocumentStoreError – If there's an error dropping the index.
Warning: This operation is irreversible and will permanently delete all documents and the search index.
Example:
# Clear all documents from the store
document_store.delete_all_documents()
# The index will be automatically recreated on next write operation
document_store.write_documents(new_documents)
delete_all_documents_async() -> None
Asynchronously delete all documents from the document store.
This is the async version of delete_all_documents(). It removes all documents by dropping the entire search index. This is an efficient way to clear all data but requires recreating the index for future operations. If the index doesn't exist, the operation completes without error.
Raises:
- ValkeyDocumentStoreError – If there's an error dropping the index.
Warning: This operation is irreversible and will permanently delete all documents and the search index.
Example:
# Clear all documents from the store
await document_store.delete_all_documents_async()
# The index will be automatically recreated on next write operation
await document_store.write_documents_async(new_documents)
Valkey document store filtering utilities.
This module provides filter conversion from Haystack's filter format to Valkey Search query syntax. It supports both tag-based exact matching and numeric range filtering with logical operators.
Supported filter operations:
- TagField filters: ==, !=, in, not in (exact string matches)
- NumericField filters: ==, !=, >, >=, <, <=, in, not in (numeric comparisons)
- Logical operators: AND, OR for combining conditions
Filter syntax examples:
# Simple equality filter
filters = {"field": "meta.category", "operator": "==", "value": "tech"}
# Numeric range filter
filters = {"field": "meta.priority", "operator": ">=", "value": 5}
# List membership filter
filters = {"field": "meta.status", "operator": "in", "value": ["active", "pending"]}
# Complex logical filter
filters = {
"operator": "AND",
"conditions": [
{"field": "meta.category", "operator": "==", "value": "tech"},
{"field": "meta.priority", "operator": ">=", "value": 3}
]
}
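To make the conversion idea concrete, here is a minimal sketch that turns the filter dictionaries above into a query string. It is not the integration's converter: the function names and the `FIELD_TYPES` table are hypothetical, the `@meta_<name>` field naming and the tag/numeric clause syntax are assumptions based on the conventions common to Redis-compatible search modules, and numeric `in` / `not in` are left out for brevity.

```python
from typing import Any

# Hypothetical field typing, as if the store were created with
# metadata_fields={"category": str, "status": str, "priority": int}.
FIELD_TYPES = {"category": "tag", "status": "tag", "priority": "numeric"}


def tag_clause(field: str, op: str, value: Any) -> str:
    """Render an exact-match clause, e.g. @meta_status:{active | pending}."""
    values = value if isinstance(value, list) else [value]
    clause = field + ":{" + " | ".join(str(v) for v in values) + "}"
    if op in ("==", "in"):
        return clause
    if op in ("!=", "not in"):
        return "-" + clause  # leading minus negates the clause
    raise ValueError(f"Unsupported tag operator in this sketch: {op}")


def numeric_clause(field: str, op: str, value: Any) -> str:
    """Render a numeric comparison as a range; '(' marks an exclusive bound."""
    ranges = {
        "==": f"[{value} {value}]",
        ">": f"[({value} +inf]",
        ">=": f"[{value} +inf]",
        "<": f"[-inf ({value}]",
        "<=": f"[-inf {value}]",
    }
    if op == "!=":
        return f"-{field}:[{value} {value}]"
    if op not in ranges:
        raise ValueError(f"Unsupported numeric operator in this sketch: {op}")
    return f"{field}:{ranges[op]}"


def to_query(filters: dict[str, Any]) -> str:
    """Recursively convert a Haystack-style filter dict into a query string."""
    op = filters["operator"]
    if op in ("AND", "OR"):
        parts = [to_query(condition) for condition in filters["conditions"]]
        joiner = " " if op == "AND" else " | "
        return "(" + joiner.join(parts) + ")"
    name = filters["field"].removeprefix("meta.")
    field = "@meta_" + name
    if FIELD_TYPES[name] == "tag":
        return tag_clause(field, op, filters["value"])
    return numeric_clause(field, op, filters["value"])
```

Keeping the tag and numeric renderers separate mirrors the split between TagField and NumericField operations listed above, so an unsupported operator fails loudly for the field type it was applied to.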