From 00c96c98e4d9f28802cdc4bfc006a1f4153faa95 Mon Sep 17 00:00:00 2001 From: Songkan Tang Date: Tue, 16 Jun 2026 06:41:14 +0000 Subject: [PATCH] Bring CalcitePPLJoinIT to parity on the analytics-engine route MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CalcitePPLJoinIT failed on the analytics-engine route (parquet/composite store + DataFusion backend, -Dtests.analytics.parquet_indices=true) for three distinct reasons, none of which are real query defects. This brings the class to parity without weakening the assertions. 1. Non-idempotent seed inflated/diverged the shared state_country index. init() runs before every test method (@Before). After loadIndex() it unconditionally PUT _doc/5..8 to grow state_country from 4 to 8 rows. On the analytics-engine route the parquet/composite store is append-only and does not overwrite by _id, so re-running those PUTs every method accumulated duplicate rows and inflated downstream join counts (e.g. expected 6, got 60). Make the seed conditional, gated on the SAME `isIndexExist` check loadIndex uses: seed exactly when loadIndex (re)creates state_country, so seed and load stay in lockstep. (An earlier attempt used a static "seed once" latch, but that desyncs from the index — when the framework recreates state_country with only the 4 fixture rows, a latch already flipped true skips re-seeding and leaves the index at 4 rows, which broke the in-memory integTest on macOS/Windows CI. Gating on isIndexExist is correct regardless of the cluster's per-method index lifecycle.) 2. Column ordering. The analytics-engine route builds its scan schema from the serialized index mapping (getSourceAsMap), which OpenSearch returns in alphabetical field order, whereas the v2/Calcite path preserves declared order. Field-list/implicit-projection joins therefore returned the right rows with columns in a different order. Add explicit `| fields ...` to pin the projection order for the affected tests (testComplexSemiJoin, testComplexAntiJoin, testComplexSortPushDownForSMJWithMaxOptionAndFieldList). 3. Row ordering. The analytics-engine coordinator-reduce (RowProducingSink) appends Arrow batches in arrival order from the SEARCH-threadpool response handlers, so a query without ORDER BY has no guaranteed row order — unlike Calcite's deterministic enumerable execution. Cases: - testComplexRightJoin sorts by a column that is null for the right-only rows, leaving their relative order unspecified; switch verifyDataRowsInOrder -> verifyDataRows. - testInnerJoinWithRelationSubquery ends in `stats ... by` with no ORDER BY, so the two output groups come back in a route-dependent order (flaky); switch verifyDataRowsInOrder -> verifyDataRows. - The testCheckAccessTheReference* tests compare two alias-syntax variants to each other via assertJsonEquals on serialized datarows, which is order-sensitive. They only mean to assert the two variants return the same set of rows. Add MatcherUtils.assertJsonRowsEqualIgnoreOrder (multiset compare) and use it for those comparisons. Verified: standard route (integTest, in-memory) 43/43 pass; analytics-engine route drops from 33 failures to only the two remaining exact-equality-on-bare- text cases (testJoinComparing, testJoinSubsearchMaxOut), a separate route limitation (DYNAMIC_STRING_NO_KEYWORD) tracked elsewhere. Signed-off-by: Songkan Tang --- .../sql/calcite/remote/CalcitePPLJoinIT.java | 103 +++++++++++------- .../org/opensearch/sql/util/MatcherUtils.java | 24 ++++ 2 files changed, 90 insertions(+), 37 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java index 9aa9b022681..cdefe15155c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java @@ -5,10 +5,12 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY; import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals; +import static org.opensearch.sql.util.MatcherUtils.assertJsonRowsEqualIgnoreOrder; import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; @@ -32,29 +34,46 @@ public void init() throws Exception { enableCalcite(); supportAllJoinTypes(); + // Seed iff state_country is (re)created on this pass — kept in lockstep with loadIndex's own + // `if (!isIndexExist)` create-and-load guard. init() runs before every test method (@Before), + // and loadIndex skips the create+load when the index already exists, so the extra _doc/5..8 + // PUTs below must run on exactly the same condition. A standalone "seed once" latch would + // desync from the index: whenever the framework wipes/recreates state_country between methods + // (loadIndex re-creates the 4 fixture rows), a latch already flipped true would skip re-seeding + // and leave the index at 4 rows, breaking the joins that expect 8. Gating on isIndexExist keeps + // seed and load together. On the analytics-engine route the parquet/composite store is + // append-only and does not overwrite by _id, so re-running these PUTs every method would + // accumulate duplicate rows and inflate join counts — this guard prevents that too. + boolean seedStateCountry = !isIndexExist(client(), TestsConstants.TEST_INDEX_STATE_COUNTRY); loadIndex(Index.STATE_COUNTRY); loadIndex(Index.OCCUPATION); loadIndex(Index.HOBBIES); - Request request1 = - new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true"); - request1.setJsonEntity( - "{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); - client().performRequest(request1); - Request request2 = - new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true"); - request2.setJsonEntity( - "{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); - client().performRequest(request2); - Request request3 = - new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true"); - request3.setJsonEntity( - "{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); - client().performRequest(request3); - Request request4 = - new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true"); - request4.setJsonEntity( - "{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}"); - client().performRequest(request4); + if (seedStateCountry) { + Request request1 = + new Request( + "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true"); + request1.setJsonEntity( + "{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); + client().performRequest(request1); + Request request2 = + new Request( + "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true"); + request2.setJsonEntity( + "{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); + client().performRequest(request2); + Request request3 = + new Request( + "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true"); + request3.setJsonEntity( + "{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}"); + client().performRequest(request3); + Request request4 = + new Request( + "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true"); + request4.setJsonEntity( + "{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}"); + client().performRequest(request4); + } } @Test @@ -239,7 +258,11 @@ public void testComplexRightJoin() throws IOException { schema("occupation", "string"), schema("b.country", "string"), schema("salary", "int")); - verifyDataRowsInOrder( + // The four right-only rows all have a null a.age, so `sort a.age` leaves their relative order + // unspecified — DataFusion (analytics-engine route) breaks the tie differently than the + // v2/Calcite path. Assert membership rather than position; the four null-age rows and the two + // matched rows are all present. + verifyDataRows( actual, rows(null, null, null, null, "Engineer", "England", 100000), rows(null, null, null, null, "Artist", "USA", 70000), @@ -255,7 +278,8 @@ public void testComplexSemiJoin() throws IOException { executeQuery( String.format( "source = %s | where country = 'Canada' OR country = 'England' | left semi join" - + " left=a, right=b ON a.name = b.name %s | sort a.age", + + " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country," + + " state, month, year, age", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); verifySchema( actual, @@ -277,7 +301,8 @@ public void testComplexAntiJoin() throws IOException { executeQuery( String.format( "source = %s | where country = 'Canada' OR country = 'England' | left anti join" - + " left=a, right=b ON a.name = b.name %s | sort a.age", + + " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country," + + " state, month, year, age", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); verifySchema( actual, @@ -529,7 +554,7 @@ public void testCheckAccessTheReferenceByAliases() throws IOException { String.format( "source = %s as t1 | JOIN ON t1.name = t2.name %s as t2 | fields t1.name, t2.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString()); JSONObject res3 = @@ -550,9 +575,9 @@ public void testCheckAccessTheReferenceByAliases() throws IOException { "source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields" + " t1.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString()); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString()); } @@ -570,7 +595,7 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException { "source = %s | JOIN left = t1 ON t1.name = t2.name [ source = %s as t2 ] | fields" + " t1.name, t2.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString()); JSONObject res3 = @@ -591,9 +616,9 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException { "source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ]" + " as tt | fields tt.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString()); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString()); } @@ -617,9 +642,9 @@ public void testCheckAccessTheReferenceByOverrideAliases() throws IOException { "source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields" + " t1.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString()); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString()); } @@ -643,9 +668,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases() throws IOExce "source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt" + " | fields tt.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString()); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString()); } @@ -669,9 +694,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases2() throws IOExc "source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt" + " | fields t2.name", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION)); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString()); - assertJsonEquals( + assertJsonRowsEqualIgnoreOrder( res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString()); } @@ -698,7 +723,10 @@ public void testInnerJoinWithRelationSubquery() throws IOException { schema("avg(salary)", "double"), schema("age_span", "int"), schema("b.country", "string")); - verifyDataRowsInOrder(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England")); + // The final `stats ... by` output has no ORDER BY, so row order is unspecified — the + // analytics-engine route (RowProducingSink appends batches in arrival order) emits the two + // groups in a different order than the v2/Calcite path. Assert membership, not position. + verifyDataRows(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England")); } @Test @@ -1161,7 +1189,8 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx executeQuery( String.format( "source=%s | eval name2=substring(name, 2, 1) | join max=1 name2,age [ source=%s |" - + " eval name2=substring(state, 2, 1) ]", + + " eval name2=substring(state, 2, 1) ] | fields name, country, state, month," + + " year, age, name2", TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY)); verifySchema( actual, diff --git a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java index 61ebc9a16e9..dc9ebc30de5 100644 --- a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -432,6 +433,29 @@ public static void assertJsonEquals(String expected, String actual) { JsonParser.parseString(eliminatePid(actual))); } + /** + * Compare two {@code datarows} JSON arrays as multisets — same rows, order ignored. Use when the + * test only asserts that two queries return the same set of rows (e.g. checking that two + * equivalent alias syntaxes produce the same result), not that they emit them in the same order. + * The analytics-engine (DataFusion) route does not guarantee the same row order as the v2/Calcite + * route, so a plain {@link #assertJsonEquals} on the serialized datarows is order-sensitive and + * flaky on that route; comparing as multisets asserts the intended equivalence without depending + * on output order. + */ + public static void assertJsonRowsEqualIgnoreOrder(String expectedRows, String actualRows) { + List expected = new ArrayList<>(); + new JSONArray(eliminatePid(expectedRows)) + .iterator() + .forEachRemaining(o -> expected.add(o.toString())); + List actual = new ArrayList<>(); + new JSONArray(eliminatePid(actualRows)) + .iterator() + .forEachRemaining(o -> actual.add(o.toString())); + expected.sort(Comparator.naturalOrder()); + actual.sort(Comparator.naturalOrder()); + assertEquals(expected, actual); + } + /** * Compare two JSON string are equals with ignoring the RelNode id in the Calcite plan. * Deprecated, use {@link #assertYamlEqualsIgnoreId(String, String)}