Skip to content

Commit 873a4dc

Browse files
committed
cleaning up
1 parent 2010261 commit 873a4dc

1 file changed

Lines changed: 10 additions & 22 deletions

File tree

  • integrations/opensearch/src/haystack_integrations/document_stores/opensearch

integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,9 +1093,6 @@ async def count_documents_by_filter_async(self, filters: dict) -> int:
10931093
def _build_cardinality_aggregations(index_mapping: dict[str, Any]) -> dict[str, Any]:
10941094
"""
10951095
Builds cardinality aggregations for all metadata fields in the index mapping.
1096-
1097-
:param index_mapping: The properties mapping from the index.
1098-
:returns: Dictionary of aggregations keyed by field name.
10991096
"""
11001097
special_fields = {"content", "embedding", "id", "score", "blob", "sparse_embedding"}
11011098
aggs = {}
@@ -1108,10 +1105,6 @@ def _build_cardinality_aggregations(index_mapping: dict[str, Any]) -> dict[str,
11081105
def _build_distinct_values_query_body(filters: dict, aggs: dict[str, Any]) -> dict[str, Any]:
11091106
"""
11101107
Builds the query body for distinct values counting with filters and aggregations.
1111-
1112-
:param filters: The filters to apply, or empty dict for no filters.
1113-
:param aggs: The aggregations to include in the query.
1114-
:returns: The query body dictionary.
11151108
"""
11161109
if filters:
11171110
normalized_filters = normalize_filters(filters)
@@ -1134,10 +1127,6 @@ def _extract_distinct_counts_from_aggregations(
11341127
) -> dict[str, int]:
11351128
"""
11361129
Extracts distinct value counts from search result aggregations.
1137-
1138-
:param aggregations: The aggregations from the search result.
1139-
:param index_mapping: The properties mapping from the index.
1140-
:returns: Dictionary mapping field names to their distinct value counts.
11411130
"""
11421131
special_fields = {"content", "embedding", "id", "score", "blob", "sparse_embedding"}
11431132
distinct_counts = {}
@@ -1459,6 +1448,9 @@ def query_sql(self, query: str, response_format: ResponseFormat = "json") -> Any
14591448
:param response_format: The format of the response. See https://docs.opensearch.org/latest/search-plugins/sql/response-formats/
14601449
:returns: The query results in the specified format. For JSON format, returns a list of dictionaries
14611450
(the _source from each hit). For other formats (csv, jdbc, raw), returns the response as text.
1451+
1452+
NOTE: For non-JSON formats (csv, jdbc, raw), use requests to make a raw HTTP request and get the text response
1453+
This avoids deserialization issues with the opensearchpy client.
14621454
"""
14631455
self._ensure_initialized()
14641456
assert self._client is not None
@@ -1501,9 +1493,8 @@ def query_sql(self, query: str, response_format: ResponseFormat = "json") -> Any
15011493

15021494
return self._process_sql_response(response_data, response_format)
15031495
except SerializationError:
1504-
# If we get here, it means requests failed above (likely AWS auth)
1505-
# and opensearchpy can't deserialize the response
1506-
# Re-raise as DocumentStoreError with a helpful message
1496+
# If we get here, it means requests failed above (likely AWS auth) and opensearchpy can't deserialize the
1497+
# response. Re-raise as DocumentStoreError with a helpful message
15071498
msg = (
15081499
f"Failed to execute SQL query in OpenSearch: Unable to deserialize {response_format} response. "
15091500
f"This format may not be supported with the current authentication method."
@@ -1521,6 +1512,9 @@ async def query_sql_async(self, query: str, response_format: ResponseFormat = "j
15211512
:param response_format: The format of the response. See https://docs.opensearch.org/latest/search-plugins/sql/response-formats/
15221513
:returns: The query results in the specified format. For JSON format, returns a list of dictionaries
15231514
(the _source from each hit). For other formats (csv, jdbc, raw), returns the response as text.
1515+
1516+
NOTE: For non-JSON formats (csv, jdbc, raw), use httpx AsyncClient to make a raw HTTP request and get the text
1517+
response. This avoids deserialization issues with the opensearchpy client.
15241518
"""
15251519
await self._ensure_initialized_async()
15261520
assert self._async_client is not None
@@ -1545,12 +1539,7 @@ async def query_sql_async(self, query: str, response_format: ResponseFormat = "j
15451539
)
15461540
response.raise_for_status()
15471541
return response.text
1548-
except ImportError:
1549-
# httpx not available, fall through to opensearchpy
1550-
pass
15511542
except Exception as e:
1552-
# If httpx fails (e.g., AWS auth), fall back to opensearchpy
1553-
# which will raise SerializationError that we can handle
15541543
logger.error(f"Failed to execute SQL query in OpenSearch: {e!s}")
15551544

15561545
try:
@@ -1566,9 +1555,8 @@ async def query_sql_async(self, query: str, response_format: ResponseFormat = "j
15661555

15671556
return self._process_sql_response(response_data, response_format)
15681557
except SerializationError:
1569-
# If we get here, it means httpx failed above (likely AWS auth or not installed)
1570-
# and opensearchpy can't deserialize the response
1571-
# Re-raise as DocumentStoreError with a helpful message
1558+
# If we get here, it means httpx failed above (likely AWS auth or not installed) and opensearchpy can't
1559+
# deserialize the response. Re-raise as DocumentStoreError with a helpful message
15721560
msg = (
15731561
f"Failed to execute SQL query in OpenSearch: Unable to deserialize {response_format} response. "
15741562
f"This format may not be supported with the current authentication method. "

0 commit comments

Comments
 (0)