Skip to content

Commit 396723b

Browse files
authored
Bring CalcitePPLJoinIT to parity on the analytics-engine route (#5554)
CalcitePPLJoinIT failed on the analytics-engine route (parquet/composite store + DataFusion backend, -Dtests.analytics.parquet_indices=true) for three distinct reasons, none of which are real query defects. This brings the class to parity without weakening the assertions. 1. Non-idempotent seed inflated/diverged the shared state_country index. init() runs before every test method (@before). After loadIndex() it unconditionally PUT _doc/5..8 to grow state_country from 4 to 8 rows. On the analytics-engine route the parquet/composite store is append-only and does not overwrite by _id, so re-running those PUTs every method accumulated duplicate rows and inflated downstream join counts (e.g. expected 6, got 60). Make the seed conditional, gated on the SAME `isIndexExist` check loadIndex uses: seed exactly when loadIndex (re)creates state_country, so seed and load stay in lockstep. (An earlier attempt used a static "seed once" latch, but that desyncs from the index — when the framework recreates state_country with only the 4 fixture rows, a latch already flipped true skips re-seeding and leaves the index at 4 rows, which broke the in-memory integTest on macOS/Windows CI. Gating on isIndexExist is correct regardless of the cluster's per-method index lifecycle.) 2. Column ordering. The analytics-engine route builds its scan schema from the serialized index mapping (getSourceAsMap), which OpenSearch returns in alphabetical field order, whereas the v2/Calcite path preserves declared order. Field-list/implicit-projection joins therefore returned the right rows with columns in a different order. Add explicit `| fields ...` to pin the projection order for the affected tests (testComplexSemiJoin, testComplexAntiJoin, testComplexSortPushDownForSMJWithMaxOptionAndFieldList). 3. Row ordering. The analytics-engine coordinator-reduce (RowProducingSink) appends Arrow batches in arrival order from the SEARCH-threadpool response handlers, so a query without ORDER BY has no guaranteed row order — unlike Calcite's deterministic enumerable execution. Cases: - testComplexRightJoin sorts by a column that is null for the right-only rows, leaving their relative order unspecified; switch verifyDataRowsInOrder -> verifyDataRows. - testInnerJoinWithRelationSubquery ends in `stats ... by` with no ORDER BY, so the two output groups come back in a route-dependent order (flaky); switch verifyDataRowsInOrder -> verifyDataRows. - The testCheckAccessTheReference* tests compare two alias-syntax variants to each other via assertJsonEquals on serialized datarows, which is order-sensitive. They only mean to assert the two variants return the same set of rows. Add MatcherUtils.assertJsonRowsEqualIgnoreOrder (multiset compare) and use it for those comparisons. Verified: standard route (integTest, in-memory) 43/43 pass; analytics-engine route drops from 33 failures to only the two remaining exact-equality-on-bare- text cases (testJoinComparing, testJoinSubsearchMaxOut), a separate route limitation (DYNAMIC_STRING_NO_KEYWORD) tracked elsewhere. Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent a50d81a commit 396723b

2 files changed

Lines changed: 90 additions & 37 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
89
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES;
910
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
1011
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
1112
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
13+
import static org.opensearch.sql.util.MatcherUtils.assertJsonRowsEqualIgnoreOrder;
1214
import static org.opensearch.sql.util.MatcherUtils.rows;
1315
import static org.opensearch.sql.util.MatcherUtils.schema;
1416
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -32,29 +34,46 @@ public void init() throws Exception {
3234
enableCalcite();
3335
supportAllJoinTypes();
3436

37+
// Seed iff state_country is (re)created on this pass — kept in lockstep with loadIndex's own
38+
// `if (!isIndexExist)` create-and-load guard. init() runs before every test method (@Before),
39+
// and loadIndex skips the create+load when the index already exists, so the extra _doc/5..8
40+
// PUTs below must run on exactly the same condition. A standalone "seed once" latch would
41+
// desync from the index: whenever the framework wipes/recreates state_country between methods
42+
// (loadIndex re-creates the 4 fixture rows), a latch already flipped true would skip re-seeding
43+
// and leave the index at 4 rows, breaking the joins that expect 8. Gating on isIndexExist keeps
44+
// seed and load together. On the analytics-engine route the parquet/composite store is
45+
// append-only and does not overwrite by _id, so re-running these PUTs every method would
46+
// accumulate duplicate rows and inflate join counts — this guard prevents that too.
47+
boolean seedStateCountry = !isIndexExist(client(), TestsConstants.TEST_INDEX_STATE_COUNTRY);
3548
loadIndex(Index.STATE_COUNTRY);
3649
loadIndex(Index.OCCUPATION);
3750
loadIndex(Index.HOBBIES);
38-
Request request1 =
39-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
40-
request1.setJsonEntity(
41-
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
42-
client().performRequest(request1);
43-
Request request2 =
44-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
45-
request2.setJsonEntity(
46-
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
47-
client().performRequest(request2);
48-
Request request3 =
49-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
50-
request3.setJsonEntity(
51-
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
52-
client().performRequest(request3);
53-
Request request4 =
54-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
55-
request4.setJsonEntity(
56-
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
57-
client().performRequest(request4);
51+
if (seedStateCountry) {
52+
Request request1 =
53+
new Request(
54+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
55+
request1.setJsonEntity(
56+
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
57+
client().performRequest(request1);
58+
Request request2 =
59+
new Request(
60+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
61+
request2.setJsonEntity(
62+
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
63+
client().performRequest(request2);
64+
Request request3 =
65+
new Request(
66+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
67+
request3.setJsonEntity(
68+
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
69+
client().performRequest(request3);
70+
Request request4 =
71+
new Request(
72+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
73+
request4.setJsonEntity(
74+
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
75+
client().performRequest(request4);
76+
}
5877
}
5978

6079
@Test
@@ -239,7 +258,11 @@ public void testComplexRightJoin() throws IOException {
239258
schema("occupation", "string"),
240259
schema("b.country", "string"),
241260
schema("salary", "int"));
242-
verifyDataRowsInOrder(
261+
// The four right-only rows all have a null a.age, so `sort a.age` leaves their relative order
262+
// unspecified — DataFusion (analytics-engine route) breaks the tie differently than the
263+
// v2/Calcite path. Assert membership rather than position; the four null-age rows and the two
264+
// matched rows are all present.
265+
verifyDataRows(
243266
actual,
244267
rows(null, null, null, null, "Engineer", "England", 100000),
245268
rows(null, null, null, null, "Artist", "USA", 70000),
@@ -255,7 +278,8 @@ public void testComplexSemiJoin() throws IOException {
255278
executeQuery(
256279
String.format(
257280
"source = %s | where country = 'Canada' OR country = 'England' | left semi join"
258-
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
281+
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
282+
+ " state, month, year, age",
259283
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
260284
verifySchema(
261285
actual,
@@ -277,7 +301,8 @@ public void testComplexAntiJoin() throws IOException {
277301
executeQuery(
278302
String.format(
279303
"source = %s | where country = 'Canada' OR country = 'England' | left anti join"
280-
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
304+
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
305+
+ " state, month, year, age",
281306
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
282307
verifySchema(
283308
actual,
@@ -529,7 +554,7 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
529554
String.format(
530555
"source = %s as t1 | JOIN ON t1.name = t2.name %s as t2 | fields t1.name, t2.name",
531556
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
532-
assertJsonEquals(
557+
assertJsonRowsEqualIgnoreOrder(
533558
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
534559

535560
JSONObject res3 =
@@ -550,9 +575,9 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
550575
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
551576
+ " t1.name",
552577
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
553-
assertJsonEquals(
578+
assertJsonRowsEqualIgnoreOrder(
554579
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
555-
assertJsonEquals(
580+
assertJsonRowsEqualIgnoreOrder(
556581
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
557582
}
558583

@@ -570,7 +595,7 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
570595
"source = %s | JOIN left = t1 ON t1.name = t2.name [ source = %s as t2 ] | fields"
571596
+ " t1.name, t2.name",
572597
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
573-
assertJsonEquals(
598+
assertJsonRowsEqualIgnoreOrder(
574599
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
575600

576601
JSONObject res3 =
@@ -591,9 +616,9 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
591616
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ]"
592617
+ " as tt | fields tt.name",
593618
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
594-
assertJsonEquals(
619+
assertJsonRowsEqualIgnoreOrder(
595620
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
596-
assertJsonEquals(
621+
assertJsonRowsEqualIgnoreOrder(
597622
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
598623
}
599624

@@ -617,9 +642,9 @@ public void testCheckAccessTheReferenceByOverrideAliases() throws IOException {
617642
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
618643
+ " t1.name",
619644
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
620-
assertJsonEquals(
645+
assertJsonRowsEqualIgnoreOrder(
621646
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
622-
assertJsonEquals(
647+
assertJsonRowsEqualIgnoreOrder(
623648
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
624649
}
625650

@@ -643,9 +668,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases() throws IOExce
643668
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
644669
+ " | fields tt.name",
645670
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
646-
assertJsonEquals(
671+
assertJsonRowsEqualIgnoreOrder(
647672
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
648-
assertJsonEquals(
673+
assertJsonRowsEqualIgnoreOrder(
649674
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
650675
}
651676

@@ -669,9 +694,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases2() throws IOExc
669694
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
670695
+ " | fields t2.name",
671696
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
672-
assertJsonEquals(
697+
assertJsonRowsEqualIgnoreOrder(
673698
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
674-
assertJsonEquals(
699+
assertJsonRowsEqualIgnoreOrder(
675700
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
676701
}
677702

@@ -698,7 +723,10 @@ public void testInnerJoinWithRelationSubquery() throws IOException {
698723
schema("avg(salary)", "double"),
699724
schema("age_span", "int"),
700725
schema("b.country", "string"));
701-
verifyDataRowsInOrder(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
726+
// The final `stats ... by` output has no ORDER BY, so row order is unspecified — the
727+
// analytics-engine route (RowProducingSink appends batches in arrival order) emits the two
728+
// groups in a different order than the v2/Calcite path. Assert membership, not position.
729+
verifyDataRows(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
702730
}
703731

704732
@Test
@@ -1161,7 +1189,8 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx
11611189
executeQuery(
11621190
String.format(
11631191
"source=%s | eval name2=substring(name, 2, 1) | join max=1 name2,age [ source=%s |"
1164-
+ " eval name2=substring(state, 2, 1) ]",
1192+
+ " eval name2=substring(state, 2, 1) ] | fields name, country, state, month,"
1193+
+ " year, age, name2",
11651194
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY));
11661195
verifySchema(
11671196
actual,

integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.math.BigDecimal;
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
27+
import java.util.Comparator;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.function.Function;
@@ -432,6 +433,29 @@ public static void assertJsonEquals(String expected, String actual) {
432433
JsonParser.parseString(eliminatePid(actual)));
433434
}
434435

436+
/**
437+
* Compare two {@code datarows} JSON arrays as multisets — same rows, order ignored. Use when the
438+
* test only asserts that two queries return the <em>same set of rows</em> (e.g. checking that two
439+
* equivalent alias syntaxes produce the same result), not that they emit them in the same order.
440+
* The analytics-engine (DataFusion) route does not guarantee the same row order as the v2/Calcite
441+
* route, so a plain {@link #assertJsonEquals} on the serialized datarows is order-sensitive and
442+
* flaky on that route; comparing as multisets asserts the intended equivalence without depending
443+
* on output order.
444+
*/
445+
public static void assertJsonRowsEqualIgnoreOrder(String expectedRows, String actualRows) {
446+
List<String> expected = new ArrayList<>();
447+
new JSONArray(eliminatePid(expectedRows))
448+
.iterator()
449+
.forEachRemaining(o -> expected.add(o.toString()));
450+
List<String> actual = new ArrayList<>();
451+
new JSONArray(eliminatePid(actualRows))
452+
.iterator()
453+
.forEachRemaining(o -> actual.add(o.toString()));
454+
expected.sort(Comparator.naturalOrder());
455+
actual.sort(Comparator.naturalOrder());
456+
assertEquals(expected, actual);
457+
}
458+
435459
/**
436460
* Compare two JSON string are equals with ignoring the RelNode id in the Calcite plan.
437461
* Deprecated, use {@link #assertYamlEqualsIgnoreId(String, String)}

0 commit comments

Comments
 (0)