diff --git a/integrations/opensearch/docker-compose.yml b/integrations/opensearch/docker-compose.yml index 30b01d5c14..02b653e88b 100644 --- a/integrations/opensearch/docker-compose.yml +++ b/integrations/opensearch/docker-compose.yml @@ -1,6 +1,6 @@ services: opensearch: - image: "opensearchproject/opensearch:2.11.0" + image: "opensearchproject/opensearch:3.5.0" ports: - 9200:9200 - 9600:9600 @@ -8,6 +8,7 @@ services: environment: - discovery.type=single-node - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m" + - "OPENSEARCH_INITIAL_ADMIN_PASSWORD=SecureHaystack!2026" healthcheck: test: curl --fail https://localhost:9200/_cat/health -ku admin:admin || exit 1 interval: 10s diff --git a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py index 439d2335ab..986a4414cc 100644 --- a/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py +++ b/integrations/opensearch/src/haystack_integrations/document_stores/opensearch/document_store.py @@ -2003,12 +2003,9 @@ def _query_sql(self, query: str, fetch_size: int | None = None) -> dict[str, Any if fetch_size is not None: body["fetch_size"] = fetch_size - params = {"format": "json"} - response_data = self._client.transport.perform_request( method="POST", url="/_plugins/_sql", - params=params, body=body, ) @@ -2039,12 +2036,9 @@ async def _query_sql_async(self, query: str, fetch_size: int | None = None) -> d if fetch_size is not None: body["fetch_size"] = fetch_size - params = {"format": "json"} - response_data = await self._async_client.transport.perform_request( method="POST", url="/_plugins/_sql", - params=params, body=body, ) diff --git a/integrations/opensearch/tests/conftest.py b/integrations/opensearch/tests/conftest.py index 72b112b5ae..42f52de7eb 100644 --- a/integrations/opensearch/tests/conftest.py +++ b/integrations/opensearch/tests/conftest.py @@ -28,11 +28,11 @@ def document_store(): store = OpenSearchDocumentStore( hosts=hosts, index=index, - http_auth=("admin", "admin"), + http_auth=("admin", "SecureHaystack!2026"), verify_certs=False, embedding_dim=768, return_embedding=True, - method={"space_type": "cosinesimil", "engine": "nmslib", "name": "hnsw"}, + method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, ) store._ensure_initialized() yield store @@ -52,11 +52,11 @@ def document_store_2(): store = OpenSearchDocumentStore( hosts=hosts, index=index, - http_auth=("admin", "admin"), + http_auth=("admin", "SecureHaystack!2026"), verify_certs=False, embedding_dim=768, return_embedding=False, - method={"space_type": "cosinesimil", "engine": "nmslib", "name": "hnsw"}, + method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, ) yield store @@ -81,10 +81,10 @@ def document_store_readonly(): store = OpenSearchDocumentStore( hosts=hosts, index=index, - http_auth=("admin", "admin"), + http_auth=("admin", "SecureHaystack!2026"), verify_certs=False, embedding_dim=768, - method={"space_type": "cosinesimil", "engine": "nmslib", "name": "hnsw"}, + method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, create_index=False, ) store._ensure_initialized() @@ -109,11 +109,11 @@ def document_store_embedding_dim_4_no_emb_returned(): store = OpenSearchDocumentStore( hosts=hosts, index=index, - http_auth=("admin", "admin"), + http_auth=("admin", "SecureHaystack!2026"), verify_certs=False, embedding_dim=4, return_embedding=False, - method={"space_type": "cosinesimil", "engine": "nmslib", "name": "hnsw"}, + method={"space_type": "cosinesimil", "engine": "lucene", "name": "hnsw"}, ) yield store @@ -133,7 +133,7 @@ def document_store_embedding_dim_4_no_emb_returned_faiss(): store = OpenSearchDocumentStore( hosts=hosts, index=index, - http_auth=("admin", "admin"), + http_auth=("admin", "SecureHaystack!2026"), verify_certs=False, embedding_dim=4, method={"space_type": "innerproduct", "engine": "faiss", "name": "hnsw"}, diff --git a/integrations/opensearch/tests/test_document_store.py b/integrations/opensearch/tests/test_document_store.py index 9107788dce..5b55307cea 100644 --- a/integrations/opensearch/tests/test_document_store.py +++ b/integrations/opensearch/tests/test_document_store.py @@ -164,12 +164,24 @@ def document_store(self, document_store): def assert_documents_are_equal(self, received: list[Document], expected: list[Document]): """ - The OpenSearchDocumentStore.filter_documents() method returns a Documents with their score set. - We don't want to compare the score, so we set it to None before comparing the documents. + The OpenSearchDocumentStore.filter_documents() method returns documents with their score set. + + We don't want to compare the score, so we set it to None before comparing. + + Embeddings are not exactly the same when retrieved from OpenSearch (float round-trip), + so we compare them approximately and then set both to None for the final equality check. """ - for doc in received: - doc.score = None - assert received == expected + assert len(received) == len(expected) + received = sorted(received, key=lambda x: x.id) + expected = sorted(expected, key=lambda x: x.id) + for received_doc, expected_doc in zip(received, expected, strict=True): + received_doc.score = None + if received_doc.embedding is None: + assert expected_doc.embedding is None + else: + assert received_doc.embedding == pytest.approx(expected_doc.embedding) + received_doc.embedding, expected_doc.embedding = None, None + assert received_doc == expected_doc def test_write_documents(self, document_store: OpenSearchDocumentStore): docs = [Document(id="1")] @@ -983,24 +995,15 @@ def test_query_sql(self, document_store: OpenSearchDocumentStore): # Verify raw JSON response structure assert isinstance(result, dict) - assert "hits" in result - assert "hits" in result["hits"] - assert len(result["hits"]["hits"]) == 2 # Two documents with category A - - # Extract _source from each hit - hits = result["hits"]["hits"] - assert all(isinstance(hit, dict) and "_source" in hit for hit in hits) - - categories = [hit["_source"].get("category") for hit in hits] - assert all(cat == "A" for cat in categories) - - # verify all expected fields are present in _source - for hit in hits: - source = hit["_source"] - assert "content" in source - assert "category" in source - assert "status" in source - assert "priority" in source + assert "schema" in result + assert "datarows" in result + assert "size" in result + assert "status" in result + assert [entry["name"] for entry in result["schema"]] == ["content", "category", "status", "priority"] + assert len(result["datarows"]) == 2 # Two documents with category A + + categories = [row[1] for row in result["datarows"]] + assert all(category == "A" for category in categories) # error handling for invalid SQL query invalid_query = "SELECT * FROM non_existent_index" diff --git a/integrations/opensearch/tests/test_document_store_async.py b/integrations/opensearch/tests/test_document_store_async.py index e6ce4faffb..c72927d9b3 100644 --- a/integrations/opensearch/tests/test_document_store_async.py +++ b/integrations/opensearch/tests/test_document_store_async.py @@ -827,24 +827,15 @@ async def test_query_sql(self, document_store: OpenSearchDocumentStore): # Verify raw JSON response structure assert isinstance(result, dict) - assert "hits" in result - assert "hits" in result["hits"] - assert len(result["hits"]["hits"]) == 2 # Two documents with category A + assert "schema" in result + assert "datarows" in result + assert "size" in result + assert "status" in result + assert [entry["name"] for entry in result["schema"]] == ["content", "category", "status", "priority"] + assert len(result["datarows"]) == 2 # Two documents with category A - # Extract _source from each hit - hits = result["hits"]["hits"] - assert all(isinstance(hit, dict) and "_source" in hit for hit in hits) - - categories = [hit["_source"].get("category") for hit in hits] - assert all(cat == "A" for cat in categories) - - # verify all expected fields are present in _source - for hit in hits: - source = hit["_source"] - assert "content" in source - assert "category" in source - assert "status" in source - assert "priority" in source + categories = [row[1] for row in result["datarows"]] + assert all(category == "A" for category in categories) # error handling for invalid SQL query invalid_query = "SELECT * FROM non_existent_index" @@ -869,9 +860,14 @@ async def test_query_sql_async_with_fetch_size(self, document_store: OpenSearchD # Should return raw JSON response (exact count depends on OpenSearch behavior) assert isinstance(result, dict) - assert "hits" in result - assert "hits" in result["hits"] - assert all(isinstance(hit, dict) and "_source" in hit for hit in result["hits"]["hits"]) + assert "schema" in result + assert "datarows" in result + assert "size" in result + assert "status" in result + assert [entry["name"] for entry in result["schema"]] == ["content", "category", "index"] + assert len(result["datarows"]) > 0 + assert len(result["datarows"]) <= 5 + assert result.get("cursor") is not None @pytest.mark.integration @pytest.mark.asyncio @@ -889,13 +885,14 @@ async def test_query_sql_async_pagination_flow(self, document_store: OpenSearchD # Query with small fetch_size to test pagination result = await document_store._query_sql_async(sql_query, fetch_size=10) assert isinstance(result, dict) - assert "hits" in result - assert "hits" in result["hits"] - assert len(result["hits"]["hits"]) > 0 - - # Verify all results have expected fields in _source - for hit in result["hits"]["hits"]: - source = hit["_source"] - assert "content" in source - assert "category" in source - assert "index" in source + assert "schema" in result + assert "datarows" in result + assert "size" in result + assert "status" in result + assert [entry["name"] for entry in result["schema"]] == ["content", "category", "index"] + assert len(result["datarows"]) > 0 + assert len(result["datarows"]) <= 10 + + # Verify all results contain expected row columns + for row in result["datarows"]: + assert len(row) == 3 diff --git a/integrations/opensearch/tests/test_filters.py b/integrations/opensearch/tests/test_filters.py index c69f7f4044..9ac2cf95d3 100644 --- a/integrations/opensearch/tests/test_filters.py +++ b/integrations/opensearch/tests/test_filters.py @@ -229,9 +229,19 @@ def test_normalize_ranges(): class TestFilters(FilterDocumentsTest): def assert_documents_are_equal(self, received: list[Document], expected: list[Document]): """ - The OpenSearchDocumentStore.filter_documents() method returns a Documents with their score set. - We don't want to compare the score, so we set it to None before comparing the documents. + The OpenSearchDocumentStore.filter_documents() method returns documents with their score set. + We don't want to compare the score, so we set it to None before comparing. + Embeddings are not exactly the same when retrieved from OpenSearch (float round-trip), + so we compare them approximately and then set both to None for the final equality check. """ - for doc in received: - doc.score = None - assert received == expected + assert len(received) == len(expected) + received = sorted(received, key=lambda x: x.id) + expected = sorted(expected, key=lambda x: x.id) + for received_doc, expected_doc in zip(received, expected, strict=True): + received_doc.score = None + if received_doc.embedding is None: + assert expected_doc.embedding is None + else: + assert received_doc.embedding == pytest.approx(expected_doc.embedding) + received_doc.embedding, expected_doc.embedding = None, None + assert received_doc == expected_doc diff --git a/integrations/opensearch/tests/test_sql_retriever.py b/integrations/opensearch/tests/test_sql_retriever.py index d8ba60ef9a..d84760c4b7 100644 --- a/integrations/opensearch/tests/test_sql_retriever.py +++ b/integrations/opensearch/tests/test_sql_retriever.py @@ -52,7 +52,7 @@ def test_from_dict(_mock_opensearch_client): @pytest.mark.integration def test_sql_retriever_basic_query_hits_format(document_store: OpenSearchDocumentStore): - """Test regular SELECT query - verifies raw JSON response with hits structure""" + """Test regular SELECT query - verifies raw response""" docs = [ Document(content="Python programming", meta={"category": "A", "status": "active", "priority": 1}), Document(content="Java programming", meta={"category": "B", "status": "active", "priority": 2}), @@ -72,31 +72,23 @@ def test_sql_retriever_basic_query_hits_format(document_store: OpenSearchDocumen response = result["result"] assert isinstance(response, dict) - # Verify raw OpenSearch JSON response structure - assert "_shards" in response - assert "hits" in response - assert "took" in response - assert "timed_out" in response - # Verify hits structure - hits = response["hits"] - assert "total" in hits - assert "hits" in hits - assert len(hits["hits"]) == 2 - - # Verify each hit contains _source with selected fields - for hit in hits["hits"]: - assert "_source" in hit - assert "_index" in hit - source = hit["_source"] - assert "content" in source - assert "category" in source - assert source["category"] == "A" + assert "total" in response + assert "size" in response + assert "status" in response + + # Verify the schema contains all the selected fields + assert all( + field in [entry["name"] for entry in response["schema"]] + for field in ["content", "category", "status", "priority"] + ) + # Verify datarows contain the expected number of rows and columns + assert len(response["datarows"]) == 2 @pytest.mark.integration def test_sql_retriever_count_query_aggregations_format(document_store: OpenSearchDocumentStore): - """Test aggregate query (COUNT) - verifies raw JSON response with aggregations structure""" + """Test aggregate query (COUNT) - verifies tabular SQL response format.""" docs = [ Document(content="Doc 1", meta={"category": "A"}), Document(content="Doc 2", meta={"category": "B"}), @@ -112,27 +104,25 @@ def test_sql_retriever_count_query_aggregations_format(document_store: OpenSearc response = result["result"] assert isinstance(response, dict) - # Verify raw OpenSearch JSON response structure - assert "_shards" in response - assert "aggregations" in response - assert "hits" in response - assert "took" in response - assert "timed_out" in response + # Verify tabular SQL response structure + assert "schema" in response + assert "datarows" in response + assert "size" in response + assert "status" in response - # Verify aggregations structure - aggregations = response["aggregations"] - assert "total" in aggregations - assert aggregations["total"]["value"] == 3 + # Verify COUNT schema and value + assert len(response["schema"]) == 1 + assert response["schema"][0]["name"] == "COUNT(*)" + assert response["schema"][0].get("alias") == "total" + assert response["schema"][0]["type"] == "long" - # Verify hits structure (should be empty for aggregate queries) - hits = response["hits"] - assert "total" in hits - assert len(hits.get("hits", [])) == 0 + assert len(response["datarows"]) == 1 + assert response["datarows"][0] == [3] @pytest.mark.integration def test_sql_retriever_metadata_extraction(document_store: OpenSearchDocumentStore): - """Test extracting metadata fields - verifies raw JSON response structure""" + """Test extracting metadata fields - verifies tabular SQL response structure.""" docs = [ Document( content="Python tutorial", @@ -162,34 +152,36 @@ def test_sql_retriever_metadata_extraction(document_store: OpenSearchDocumentSto response = result["result"] assert isinstance(response, dict) - # Verify raw response structure - assert "hits" in response - hits = response["hits"] - assert len(hits["hits"]) == 2 - - # Verify _source contains only selected metadata fields - authors = [] - for hit in hits["hits"]: - source = hit["_source"] - assert "author" in source - assert "year" in source - assert "rating" in source - assert "content" not in source - assert source["year"] >= 2023 - authors.append(source["author"]) + # Verify tabular SQL response structure + assert "schema" in response + assert "datarows" in response + assert "size" in response + assert "status" in response + + # Verify schema contains only selected metadata fields in query order + assert [entry["name"] for entry in response["schema"]] == ["author", "year", "rating"] + + # Verify returned rows + rows = response["datarows"] + assert len(rows) == 2 + + authors = [row[0] for row in rows] + years = [row[1] for row in rows] + ratings = [row[2] for row in rows] assert "Jane Smith" in authors assert "John Doe" in authors + assert all(year >= 2023 for year in years) # Verify ordering by rating DESC - assert hits["hits"][0]["_source"]["rating"] >= hits["hits"][1]["_source"]["rating"] + assert ratings[0] >= ratings[1] @pytest.mark.integration def test_sql_retriever_runtime_document_store_switching( document_store: OpenSearchDocumentStore, document_store_2: OpenSearchDocumentStore ): - """Test switching document stores at runtime""" + """Test switching document stores at runtime with tabular SQL responses.""" docs1 = [ Document(content="Python programming", meta={"category": "A"}), Document(content="Java programming", meta={"category": "B"}), @@ -207,20 +199,21 @@ def test_sql_retriever_runtime_document_store_switching( # Query first store sql_query1 = f"SELECT content, category FROM {document_store._index} WHERE category = 'A'" # noqa: S608 result1 = retriever.run(query=sql_query1) - assert len(result1["result"]["hits"]["hits"]) == 1 - assert "Python" in result1["result"]["hits"]["hits"][0]["_source"]["content"] + response1 = result1["result"] + assert [entry["name"] for entry in response1["schema"]] == ["content", "category"] + assert len(response1["datarows"]) == 1 + assert "Python" in response1["datarows"][0][0] # Query second store at runtime sql_query2 = f"SELECT content, category FROM {document_store_2._index} WHERE category = 'C'" # noqa: S608 result2 = retriever.run(query=sql_query2, document_store=document_store_2) - assert len(result2["result"]["hits"]["hits"]) == 1 - assert "JavaScript" in result2["result"]["hits"]["hits"][0]["_source"]["content"] + response2 = result2["result"] + assert [entry["name"] for entry in response2["schema"]] == ["content", "category"] + assert len(response2["datarows"]) == 1 + assert "JavaScript" in response2["datarows"][0][0] # Verify results are different - assert ( - result1["result"]["hits"]["hits"][0]["_source"]["content"] - != result2["result"]["hits"]["hits"][0]["_source"]["content"] - ) + assert response1["datarows"][0][0] != response2["datarows"][0][0] @pytest.mark.integration @@ -240,7 +233,7 @@ def test_sql_retriever_error_handling(document_store: OpenSearchDocumentStore): @pytest.mark.integration def test_sql_retriever_with_fetch_size(document_store: OpenSearchDocumentStore): - """Test SQL retriever with fetch_size parameter""" + """Test SQL retriever with fetch_size parameter and tabular response format.""" docs = [Document(content=f"Document {i}", meta={"category": "A", "index": i}) for i in range(15)] document_store.write_documents(docs, refresh=True) @@ -254,21 +247,34 @@ def test_sql_retriever_with_fetch_size(document_store: OpenSearchDocumentStore): result = retriever.run(query=sql_query) assert "result" in result assert isinstance(result["result"], dict) - assert "hits" in result["result"] - assert len(result["result"]["hits"]["hits"]) > 0 + response = result["result"] + assert "schema" in response + assert "datarows" in response + assert "size" in response + assert "status" in response + assert [entry["name"] for entry in response["schema"]] == ["content", "category", "index"] + assert len(response["datarows"]) > 0 + assert len(response["datarows"]) <= 5 + assert response.get("cursor") is not None # Test with runtime fetch_size override result2 = retriever.run(query=sql_query, fetch_size=10) assert "result" in result2 assert isinstance(result2["result"], dict) - assert "hits" in result2["result"] - assert len(result2["result"]["hits"]["hits"]) > 0 + response2 = result2["result"] + assert "schema" in response2 + assert "datarows" in response2 + assert "size" in response2 + assert "status" in response2 + assert [entry["name"] for entry in response2["schema"]] == ["content", "category", "index"] + assert len(response2["datarows"]) > 0 + assert len(response2["datarows"]) <= 10 @pytest.mark.integration @pytest.mark.asyncio async def test_sql_retriever_async_basic_query(document_store: OpenSearchDocumentStore): - """Test basic async SQL query execution""" + """Test basic async SQL query execution with tabular SQL response.""" docs = [ Document(content="Python programming", meta={"category": "A", "status": "active", "priority": 1}), Document(content="Java programming", meta={"category": "B", "status": "active", "priority": 2}), @@ -286,11 +292,15 @@ async def test_sql_retriever_async_basic_query(document_store: OpenSearchDocumen assert "result" in result response = result["result"] assert isinstance(response, dict) - assert "hits" in response - assert len(response["hits"]["hits"]) == 2 + assert "schema" in response + assert "datarows" in response + assert "size" in response + assert "status" in response + assert [entry["name"] for entry in response["schema"]] == ["content", "category", "status"] + assert len(response["datarows"]) == 2 - categories = [hit["_source"]["category"] for hit in response["hits"]["hits"]] - assert all(cat == "A" for cat in categories) + categories = [row[1] for row in response["datarows"]] + assert all(category == "A" for category in categories) @pytest.mark.integration @@ -298,7 +308,7 @@ async def test_sql_retriever_async_basic_query(document_store: OpenSearchDocumen async def test_sql_retriever_async_runtime_document_store_switching( document_store: OpenSearchDocumentStore, document_store_2: OpenSearchDocumentStore ): - """Test async switching document stores at runtime""" + """Test async switching document stores at runtime with tabular SQL responses.""" docs1 = [ Document(content="Python programming", meta={"category": "A"}), Document(content="Java programming", meta={"category": "B"}), @@ -316,20 +326,21 @@ async def test_sql_retriever_async_runtime_document_store_switching( # Query first store sql_query1 = f"SELECT content, category FROM {document_store._index} WHERE category = 'A'" # noqa: S608 result1 = await retriever.run_async(query=sql_query1) - assert len(result1["result"]["hits"]["hits"]) == 1 - assert "Python" in result1["result"]["hits"]["hits"][0]["_source"]["content"] + response1 = result1["result"] + assert [entry["name"] for entry in response1["schema"]] == ["content", "category"] + assert len(response1["datarows"]) == 1 + assert "Python" in response1["datarows"][0][0] # Query second store at runtime sql_query2 = f"SELECT content, category FROM {document_store_2._index} WHERE category = 'C'" # noqa: S608 result2 = await retriever.run_async(query=sql_query2, document_store=document_store_2) - assert len(result2["result"]["hits"]["hits"]) == 1 - assert "JavaScript" in result2["result"]["hits"]["hits"][0]["_source"]["content"] + response2 = result2["result"] + assert [entry["name"] for entry in response2["schema"]] == ["content", "category"] + assert len(response2["datarows"]) == 1 + assert "JavaScript" in response2["datarows"][0][0] # Verify results are different - assert ( - result1["result"]["hits"]["hits"][0]["_source"]["content"] - != result2["result"]["hits"]["hits"][0]["_source"]["content"] - ) + assert response1["datarows"][0][0] != response2["datarows"][0][0] @pytest.mark.integration @@ -351,7 +362,7 @@ async def test_sql_retriever_async_error_handling(document_store: OpenSearchDocu @pytest.mark.integration @pytest.mark.asyncio async def test_sql_retriever_async_with_fetch_size(document_store: OpenSearchDocumentStore): - """Test async SQL retriever with fetch_size parameter""" + """Test async SQL retriever with fetch_size using tabular SQL responses.""" docs = [Document(content=f"Document {i}", meta={"category": "A", "index": i}) for i in range(15)] await document_store.write_documents_async(docs, refresh=True) @@ -365,12 +376,25 @@ async def test_sql_retriever_async_with_fetch_size(document_store: OpenSearchDoc result = await retriever.run_async(query=sql_query) assert "result" in result assert isinstance(result["result"], dict) - assert "hits" in result["result"] - assert len(result["result"]["hits"]["hits"]) > 0 + response = result["result"] + assert "schema" in response + assert "datarows" in response + assert "size" in response + assert "status" in response + assert [entry["name"] for entry in response["schema"]] == ["content", "category", "index"] + assert len(response["datarows"]) > 0 + assert len(response["datarows"]) <= 5 + assert response.get("cursor") is not None # Test with runtime fetch_size override result2 = await retriever.run_async(query=sql_query, fetch_size=10) assert "result" in result2 assert isinstance(result2["result"], dict) - assert "hits" in result2["result"] - assert len(result2["result"]["hits"]["hits"]) > 0 + response2 = result2["result"] + assert "schema" in response2 + assert "datarows" in response2 + assert "size" in response2 + assert "status" in response2 + assert [entry["name"] for entry in response2["schema"]] == ["content", "category", "index"] + assert len(response2["datarows"]) > 0 + assert len(response2["datarows"]) <= 10