Skip to content

Commit f5e580c

Browse files
feat: Add operations to PineConeDocumentStore (#2772)
* Add operations to PineConeDocumentStore
* Fix lint in document store.py
* Type lint issues fixed in document_store.py
* Refactor methods to use helper functions to avoid duplication in sync/async methods
* Add support for boolean and keywords in get_metadata_field_min_max and fix documentation
* Add lint fixes for get_metadata_min_max_impl

Co-authored-by: David S. Batista <dsbatista@gmail.com>
1 parent b1768be commit f5e580c

3 files changed

Lines changed: 651 additions & 0 deletions

File tree

integrations/pinecone/src/haystack_integrations/document_stores/pinecone/document_store.py

Lines changed: 328 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -740,3 +740,331 @@ def _prepare_documents_for_writing(
740740
)
741741

742742
return self._convert_documents_to_pinecone_format(documents)
743+
744+
@staticmethod
def _count_documents_impl(documents: list[Document]) -> int:
    """
    Return how many documents were fetched, warning when the result may be truncated.

    A warning is logged when the count equals TOP_K_LIMIT, because the real number
    of matching documents may then be higher than what was returned.

    :param documents: Documents returned by a filter query.
    :returns: The length of ``documents``.
    """
    total = len(documents)
    if total != TOP_K_LIMIT:
        return total

    logger.warning(
        f"Count reached Pinecone's limit of {TOP_K_LIMIT} documents. "
        f"The actual number of matching documents may be higher."
    )
    return total
754+
755+
@staticmethod
def _count_unique_metadata_impl(documents: list[Document], metadata_fields: list[str]) -> dict[str, int]:
    """
    Count the distinct values seen for each requested metadata field.

    List-valued metadata contributes each of its elements individually; any other
    value is counted as a single item. Documents without metadata, or without the
    field, are ignored for that field.

    :param documents: Documents to inspect.
    :param metadata_fields: Names of the metadata fields to analyse.
    :returns: Mapping of field name to the number of distinct values found.
    """
    counts = {}
    for name in metadata_fields:
        seen = set()
        for document in documents:
            if not document.meta or name not in document.meta:
                continue
            raw = document.meta[name]
            if isinstance(raw, list):
                # Each list element counts as its own value.
                for item in raw:
                    seen.add(item)
            else:
                seen.add(raw)
        counts[name] = len(seen)

    # Hitting the limit exactly suggests the document list was truncated.
    if len(documents) == TOP_K_LIMIT:
        logger.warning(
            f"Analysis limited to {TOP_K_LIMIT} documents due to Pinecone's limits. "
            f"Unique value counts may be incomplete."
        )
    return counts
777+
778+
@staticmethod
def _get_metadata_fields_info_impl(documents: list[Document]) -> dict[str, dict[str, str]]:
    """
    Infer a type label for every metadata field found in the given documents.

    Labels: ``"boolean"`` for bool, ``"long"`` for int/float, ``"keyword"`` for str.
    A list value is classified by its first element only; an empty list counts as
    ``"keyword"``. A ``"content"`` entry of type ``"text"`` is added when at least
    one document has non-None content.

    Fields whose values disagree on type fall back to ``"keyword"`` with a warning.
    Fields for which no supported type was observed also fall back to ``"keyword"``.

    :param documents: Documents to sample.
    :returns: Mapping of field name to ``{"type": <label>}``; empty when there are
        no documents.
    """
    if not documents:
        return {}

    def _scalar_label(value: object) -> str | None:
        # bool MUST be tested before int/float: bool is a subclass of int in Python.
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, (int, float)):
            return "long"
        if isinstance(value, str):
            return "keyword"
        return None

    field_types: dict[str, dict[str, str]] = {}

    if any(doc.content is not None for doc in documents):
        field_types["content"] = {"type": "text"}

    # Collect every type label seen per field so inconsistencies can be detected.
    field_samples: dict[str, set[str]] = {}
    for doc in documents:
        if not doc.meta:
            continue
        for field, value in doc.meta.items():
            labels = field_samples.setdefault(field, set())
            if isinstance(value, list):
                # Only the first element is sampled; an empty list defaults to keyword.
                label = _scalar_label(value[0]) if value else "keyword"
            else:
                label = _scalar_label(value)
            if label is not None:
                labels.add(label)

    for field, types_seen in field_samples.items():
        if len(types_seen) == 1:
            # Consistent type across all sampled documents.
            field_types[field] = {"type": next(iter(types_seen))}
        elif not types_seen:
            # Only unsupported value types were seen: this is not a "mixed types"
            # situation, so default to keyword without the misleading warning.
            field_types[field] = {"type": "keyword"}
        else:
            logger.warning(
                f"Field '{field}' has mixed types {types_seen} across documents. "
                f"Defaulting to 'keyword' type. Consider using consistent types for better query performance."
            )
            field_types[field] = {"type": "keyword"}

    # Hitting the limit exactly suggests the sample does not cover the whole index.
    if len(documents) == TOP_K_LIMIT:
        logger.info(
            f"Schema inference based on {TOP_K_LIMIT} documents (Pinecone's query limit). "
            f"If you have more documents with different metadata fields, they won't be reflected here."
        )

    return field_types
840+
841+
@staticmethod
def _get_metadata_field_min_max_impl(documents: list[Document], metadata_field: str) -> dict[str, Any]:
    """
    Compute the minimum and maximum value of a metadata field.

    Only bool, int, float and str values are considered; values of any other type
    (including lists) are skipped. Ordering follows Python's built-in comparison.

    :param documents: Documents to inspect.
    :param metadata_field: Name of the metadata field to analyse.
    :returns: Dictionary with ``"min"`` and ``"max"`` keys.
    :raises ValueError: If no document carries a usable value for the field.
    :raises TypeError: If the field mixes string and numeric values, which cannot
        be ordered against each other.
    """
    # bool is a subclass of int, so it is already covered by the int check.
    values: list[bool | int | float | str] = [
        doc.meta[metadata_field]
        for doc in documents
        if doc.meta
        and metadata_field in doc.meta
        and isinstance(doc.meta[metadata_field], (int, float, str))
    ]

    if not values:
        msg = f"No values found for metadata field '{metadata_field}'"
        raise ValueError(msg)

    try:
        result = {"min": min(values), "max": max(values)}
    except TypeError as err:
        # Among the accepted types only str vs. numeric comparisons can fail.
        msg = (
            f"Metadata field '{metadata_field}' mixes string and numeric values, "
            f"so its min/max cannot be computed"
        )
        raise TypeError(msg) from err

    # Hitting the limit exactly suggests the document list was truncated.
    if len(documents) == TOP_K_LIMIT:
        logger.warning(
            f"Min/max calculation limited to {TOP_K_LIMIT} documents. "
            f"Results may not reflect the true min/max across all documents."
        )

    return result
869+
870+
@staticmethod
def _get_metadata_field_unique_values_impl(
    documents: list[Document], metadata_field: str, search_term: str | None, from_: int, size: int
) -> tuple[list[str], int]:
    """
    Collect the distinct values of a metadata field, with optional search and pagination.

    Every value is converted with ``str()``; list values contribute each element.
    The distinct values are sorted, optionally narrowed to those containing
    ``search_term`` (case-insensitive substring match), and then sliced.

    :param documents: Documents to inspect.
    :param metadata_field: Name of the metadata field to analyse.
    :param search_term: Optional substring that returned values must contain.
    :param from_: Offset of the first value to return; negative values are treated as 0.
    :param size: Maximum number of values to return; negative values are treated as 0.
    :returns: Tuple of (page of values, total number of values matching before pagination).
    """
    unique_values: set[str] = set()
    for doc in documents:
        if not doc.meta or metadata_field not in doc.meta:
            continue
        value = doc.meta[metadata_field]
        if isinstance(value, list):
            unique_values.update(str(item) for item in value)
        else:
            unique_values.add(str(value))

    matching = sorted(unique_values)
    if search_term:
        needle = search_term.lower()
        matching = [candidate for candidate in matching if needle in candidate.lower()]

    total_count = len(matching)

    # Clamp pagination bounds: a negative offset or size would otherwise be read as
    # a Python negative index and return values counted from the end of the list.
    start = max(from_, 0)
    page_size = max(size, 0)
    paginated_values = matching[start : start + page_size]

    # Hitting the limit exactly suggests the document list was truncated.
    if len(documents) == TOP_K_LIMIT:
        logger.warning(f"Unique values extraction limited to {TOP_K_LIMIT} documents. Results may be incomplete.")

    return paginated_values, total_count
902+
903+
def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
    """
    Count the documents that match the given filters.

    Note: Due to Pinecone's limitations, the matching documents are fetched and
    counted in Python, so the result is capped by Pinecone's TOP_K_LIMIT of
    1000 documents.

    :param filters: Filters selecting the documents to count.
        For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
    :returns: Number of documents matching the filters.
    """
    return self._count_documents_impl(self.filter_documents(filters=filters))
916+
917+
async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int:
    """
    Asynchronously count the documents that match the given filters.

    Note: Due to Pinecone's limitations, the matching documents are fetched and
    counted in Python, so the result is capped by Pinecone's TOP_K_LIMIT of
    1000 documents.

    :param filters: Filters selecting the documents to count.
    :returns: Number of documents matching the filters.
    """
    matching = await self.filter_documents_async(filters=filters)
    total = self._count_documents_impl(matching)
    return total
929+
930+
def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fields: list[str]) -> dict[str, int]:
    """
    Count the distinct values of each given metadata field among documents matching the filters.

    Note: Due to Pinecone's limitations, the documents are fetched and aggregated
    in Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param filters: Filters selecting the documents to analyse.
    :param metadata_fields: Names of the metadata fields to count distinct values for.
    :returns: Mapping of field name to its number of distinct values.
    """
    return self._count_unique_metadata_impl(self.filter_documents(filters=filters), metadata_fields)
943+
944+
async def count_unique_metadata_by_filter_async(
    self, filters: dict[str, Any], metadata_fields: list[str]
) -> dict[str, int]:
    """
    Asynchronously count the distinct values of each given metadata field among matching documents.

    Note: Due to Pinecone's limitations, the documents are fetched and aggregated
    in Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param filters: Filters selecting the documents to analyse.
    :param metadata_fields: Names of the metadata fields to count distinct values for.
    :returns: Mapping of field name to its number of distinct values.
    """
    matching = await self.filter_documents_async(filters=filters)
    unique_counts = self._count_unique_metadata_impl(matching, metadata_fields)
    return unique_counts
959+
960+
def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
    """
    Describe the metadata fields present in the index and their inferred types.

    Note: Pinecone doesn't provide a schema introspection API, so field types are
    inferred from the metadata of documents stored in the index (up to 1000 documents).

    Type mappings:
    - 'text': Document content field
    - 'keyword': String metadata values
    - 'long': Numeric metadata values (int or float)
    - 'boolean': Boolean metadata values

    :returns: Mapping of field name to type information.
        Example: {'content': {'type': 'text'}, 'category': {'type': 'keyword'}, 'priority': {'type': 'long'}}
    """
    return self._get_metadata_fields_info_impl(self.filter_documents(filters=None))
978+
979+
async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
    """
    Asynchronously describe the metadata fields present in the index and their inferred types.

    Note: Pinecone doesn't provide a schema introspection API, so field types are
    inferred from the metadata of documents stored in the index (up to 1000 documents).

    Type mappings:
    - 'text': Document content field
    - 'keyword': String metadata values
    - 'long': Numeric metadata values (int or float)
    - 'boolean': Boolean metadata values

    :returns: Mapping of field name to type information.
        Example: {'content': {'type': 'text'}, 'category': {'type': 'keyword'}, 'priority': {'type': 'long'}}
    """
    sampled = await self.filter_documents_async(filters=None)
    schema = self._get_metadata_fields_info_impl(sampled)
    return schema
997+
998+
def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
    """
    Return the smallest and largest value stored under a metadata field.

    Supports numeric (int, float), boolean, and string (keyword) types:
    - Numeric: min/max by numeric value
    - Boolean: False is the min, True is the max
    - String: min/max by alphabetical ordering

    Note: The documents are fetched without filters and min/max is computed in
    Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param metadata_field: Name of the metadata field to analyse.
    :returns: Dictionary with 'min' and 'max' keys.
    :raises ValueError: If the field doesn't exist or has no values.
    :raises TypeError: If the field mixes string and numeric values.
    """
    return self._get_metadata_field_min_max_impl(self.filter_documents(filters=None), metadata_field)
1016+
1017+
async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[str, Any]:
    """
    Asynchronously return the smallest and largest value stored under a metadata field.

    Supports numeric (int, float), boolean, and string (keyword) types:
    - Numeric: min/max by numeric value
    - Boolean: False is the min, True is the max
    - String: min/max by alphabetical ordering

    Note: The documents are fetched without filters and min/max is computed in
    Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param metadata_field: Name of the metadata field to analyse.
    :returns: Dictionary with 'min' and 'max' keys.
    :raises ValueError: If the field doesn't exist or has no values.
    :raises TypeError: If the field mixes string and numeric values.
    """
    fetched = await self.filter_documents_async(filters=None)
    bounds = self._get_metadata_field_min_max_impl(fetched, metadata_field)
    return bounds
1035+
1036+
def get_metadata_field_unique_values(
    self, metadata_field: str, search_term: str | None = None, from_: int = 0, size: int = 10
) -> tuple[list[str], int]:
    """
    Return the distinct values of a metadata field, with optional search and pagination.

    Note: The documents are fetched without filters and the distinct values are
    extracted in Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param metadata_field: Name of the metadata field to collect values for.
    :param search_term: Optional term that values must contain (case-insensitive substring match).
    :param from_: Starting offset for pagination (default: 0).
    :param size: Number of values to return (default: 10).
    :returns: Tuple of (list of unique values, total count of matching values).
    """
    return self._get_metadata_field_unique_values_impl(
        self.filter_documents(filters=None), metadata_field, search_term, from_, size
    )
1053+
1054+
async def get_metadata_field_unique_values_async(
    self, metadata_field: str, search_term: str | None = None, from_: int = 0, size: int = 10
) -> tuple[list[str], int]:
    """
    Asynchronously return the distinct values of a metadata field, with optional search and pagination.

    Note: The documents are fetched without filters and the distinct values are
    extracted in Python. Subject to Pinecone's TOP_K_LIMIT of 1000 documents.

    :param metadata_field: Name of the metadata field to collect values for.
    :param search_term: Optional term that values must contain (case-insensitive substring match).
    :param from_: Starting offset for pagination (default: 0).
    :param size: Number of values to return (default: 10).
    :returns: Tuple of (list of unique values, total count of matching values).
    """
    fetched = await self.filter_documents_async(filters=None)
    page_and_total = self._get_metadata_field_unique_values_impl(fetched, metadata_field, search_term, from_, size)
    return page_and_total

0 commit comments

Comments
 (0)