22#
33# SPDX-License-Identifier: Apache-2.0
44
5- import json
65from collections .abc import Mapping
76from math import exp
87from typing import Any , Literal , Optional , Union
98
9+ import httpx
1010import requests
1111from haystack import default_from_dict , default_to_dict , logging
1212from haystack .dataclasses import Document
@@ -1418,6 +1418,39 @@ async def get_field_unique_values_async(
14181418
14191419 return unique_values , total_count
14201420
def _prepare_sql_http_request_params(
    self, base_url: str, response_format: ResponseFormat
) -> tuple[str, dict[str, str], Any]:
    """
    Prepares HTTP request parameters for SQL query execution.

    :param base_url: Base URL of the OpenSearch node, without the SQL plugin path.
    :param response_format: Value sent as the `format` query parameter of the SQL endpoint.
    :returns: A tuple `(url, headers, auth)`. `auth` is the configured credentials when they
        are a non-empty tuple, otherwise `None`.
    """
    # No whitespace may surround the interpolated parts: the result is used verbatim as the request URL.
    url = f"{base_url}/_plugins/_sql?format={response_format}"
    headers = {"Content-Type": "application/json"}

    # Only tuple credentials are forwarded to the plain HTTP request. Any other auth object
    # (e.g. an AWS signer) yields None here; handling of that case is left to the caller.
    credentials = self._http_auth
    auth = credentials if credentials and isinstance(credentials, tuple) else None
    return url, headers, auth
1438+
@staticmethod
def _process_sql_response(response_data: Any, response_format: ResponseFormat) -> Any:
    """
    Processes the SQL query response data.

    :param response_data: Raw payload returned by the SQL endpoint.
    :param response_format: Format that was requested for the response.
    :returns: For `"json"`, the list of `_source` entries when the payload is a dict with a
        `hits` key, otherwise the payload unchanged. For any other format, the payload as a string.
    """
    if response_format != "json":
        # Non-JSON formats are returned as text; anything that is not already a string is stringified.
        if isinstance(response_data, str):
            return response_data
        return str(response_data)

    # A JSON payload without a top-level "hits" key is passed through untouched.
    if not isinstance(response_data, dict) or "hits" not in response_data:
        return response_data

    # Keep only the query results: the `_source` of each hit holds the document data.
    hit_entries = response_data.get("hits", {}).get("hits", [])
    return [entry.get("_source", {}) for entry in hit_entries]
1453+
14211454 def query_sql (self , query : str , response_format : ResponseFormat = "json" ) -> Any :
14221455 """
14231456 Execute a raw OpenSearch SQL query against the index.
@@ -1436,18 +1469,8 @@ def query_sql(self, query: str, response_format: ResponseFormat = "json") -> Any
14361469 # Get connection info from the transport
14371470 connection = self ._client .transport .get_connection ()
14381471 base_url = connection .host
1439- url = f"{ base_url } /_plugins/_sql?format={ response_format } "
1440-
1441- headers = {"Content-Type" : "application/json" }
1442- auth = None
1443- if self ._http_auth :
1444- if isinstance (self ._http_auth , tuple ):
1445- auth = self ._http_auth
1446- elif isinstance (self ._http_auth , AWSAuth ):
1447- # For AWS auth, we need to use the opensearchpy client
1448- # Fall through to the try/except below
1449- pass
1450-
1472+ url , headers , auth = self ._prepare_sql_http_request_params (base_url , response_format )
1473+
14511474 verify = self ._verify_certs if self ._verify_certs is not None else True
14521475 timeout = self ._timeout if self ._timeout is not None else 30.0
14531476 response = requests .post (
@@ -1463,33 +1486,28 @@ def query_sql(self, query: str, response_format: ResponseFormat = "json") -> Any
14631486 except Exception as e :
14641487 # If requests fails (e.g., AWS auth), fall back to opensearchpy
14651488 # which will raise SerializationError that we can handle
1466- pass
1467-
1489+ logger . error ( f"Failed to execute SQL query in OpenSearch: { e !s } " )
1490+
14681491 try :
14691492 body = {"query" : query }
14701493 params = {"format" : response_format }
1471-
1494+
14721495 response_data = self ._client .transport .perform_request (
14731496 method = "POST" ,
14741497 url = "/_plugins/_sql" ,
14751498 params = params ,
14761499 body = body ,
14771500 )
14781501
1479- if response_format == "json" :
1480- # extract only the query results
1481- if isinstance (response_data , dict ) and "hits" in response_data :
1482- hits = response_data .get ("hits" , {}).get ("hits" , [])
1483- # extract _source from each hit, which contains the actual document data
1484- return [hit .get ("_source" , {}) for hit in hits ]
1485- return response_data
1486- else :
1487- return response_data if isinstance (response_data , str ) else str (response_data )
1502+ return self ._process_sql_response (response_data , response_format )
14881503 except SerializationError :
14891504 # If we get here, it means requests failed above (likely AWS auth)
14901505 # and opensearchpy can't deserialize the response
14911506 # Re-raise as DocumentStoreError with a helpful message
1492- msg = f"Failed to execute SQL query in OpenSearch: Unable to deserialize { response_format } response. This format may not be supported with the current authentication method."
1507+ msg = (
1508+ f"Failed to execute SQL query in OpenSearch: Unable to deserialize { response_format } response. "
1509+ f"This format may not be supported with the current authentication method."
1510+ )
14931511 raise DocumentStoreError (msg ) from None
14941512 except Exception as e :
14951513 msg = f"Failed to execute SQL query in OpenSearch: { e !s} "
@@ -1510,26 +1528,14 @@ async def query_sql_async(self, query: str, response_format: ResponseFormat = "j
15101528 # For non-JSON formats, use httpx directly to avoid deserialization issues
15111529 if response_format != "json" :
15121530 try :
1513- import httpx
1514-
15151531 # Get connection info from the transport
15161532 connection = self ._async_client .transport .get_connection ()
15171533 base_url = connection .host
1518- url = f"{ base_url } /_plugins/_sql?format={ response_format } "
1519-
1520- headers = {"Content-Type" : "application/json" }
1521- auth = None
1522- if self ._http_auth :
1523- if isinstance (self ._http_auth , tuple ):
1524- auth = self ._http_auth
1525- elif isinstance (self ._http_auth , AWSAuth ):
1526- # For AWS auth, we need to use the opensearchpy client
1527- # Fall through to the try/except below
1528- pass
1529-
1534+ url , headers , auth = self ._prepare_sql_http_request_params (base_url , response_format )
1535+
15301536 verify = self ._verify_certs if self ._verify_certs is not None else True
15311537 timeout = httpx .Timeout (self ._timeout if self ._timeout else 30.0 )
1532-
1538+
15331539 async with httpx .AsyncClient (verify = verify , timeout = timeout ) as client :
15341540 response = await client .post (
15351541 url ,
@@ -1545,33 +1551,29 @@ async def query_sql_async(self, query: str, response_format: ResponseFormat = "j
15451551 except Exception as e :
15461552 # If httpx fails (e.g., AWS auth), fall back to opensearchpy
15471553 # which will raise SerializationError that we can handle
1548- pass
1554+ logger . error ( f"Failed to execute SQL query in OpenSearch: { e !s } " )
15491555
15501556 try :
15511557 body = {"query" : query }
15521558 params = {"format" : response_format }
1553-
1559+
15541560 response_data = await self ._async_client .transport .perform_request (
15551561 method = "POST" ,
15561562 url = "/_plugins/_sql" ,
15571563 params = params ,
15581564 body = body ,
15591565 )
15601566
1561- if response_format == "json" :
1562- # extract only the query results
1563- if isinstance (response_data , dict ) and "hits" in response_data :
1564- hits = response_data .get ("hits" , {}).get ("hits" , [])
1565- # extract _source from each hit, which contains the actual document data
1566- return [hit .get ("_source" , {}) for hit in hits ]
1567- return response_data
1568- else :
1569- return response_data if isinstance (response_data , str ) else str (response_data )
1567+ return self ._process_sql_response (response_data , response_format )
15701568 except SerializationError :
15711569 # If we get here, it means httpx failed above (likely AWS auth or not installed)
15721570 # and opensearchpy can't deserialize the response
15731571 # Re-raise as DocumentStoreError with a helpful message
1574- msg = f"Failed to execute SQL query in OpenSearch: Unable to deserialize { response_format } response. This format may not be supported with the current authentication method. Consider installing httpx for better support."
1572+ msg = (
1573+ f"Failed to execute SQL query in OpenSearch: Unable to deserialize { response_format } response. "
1574+ f"This format may not be supported with the current authentication method. "
1575+ f"Consider installing httpx for better support."
1576+ )
15751577 raise DocumentStoreError (msg ) from None
15761578 except Exception as e :
15771579 msg = f"Failed to execute SQL query in OpenSearch: { e !s} "
0 commit comments