Skip to content

Commit 5960ca0

Browse files
committed
Bring CalcitePPLJoinIT to parity on the analytics-engine route
CalcitePPLJoinIT failed on the analytics-engine route (parquet/composite store + DataFusion backend, -Dtests.analytics.parquet_indices=true) for three distinct reasons, none of which are real query defects. This brings the class to parity without weakening the assertions. 1. Shared-index pollution from a non-idempotent seed (the big one). init() runs before every test method (@before) and, with preserveClusterUponCompletion()=true, the state_country index is created once and reused across all methods. init() unconditionally PUT _doc/5..8 after loadIndex(). On the standard route those PUTs overwrite by _id and are harmless to repeat; on the analytics-engine route the parquet/composite store is append-only and does not overwrite by _id, so every method's init() appended 4 more duplicate rows. The shared index grew unboundedly and joins over it inflated (e.g. expected 6, got 60; self-joins far worse). Guard the seed with a static flag so it runs once per class load — the standard route ends at the same stable 8-row state_country it always did. 2. Column ordering. The analytics-engine route builds its scan schema from the serialized index mapping (getSourceAsMap), which OpenSearch returns in alphabetical field order, whereas the v2/Calcite path preserves declared order. Field-list/implicit-projection joins therefore returned the right rows with columns in a different order. Add explicit `| fields ...` to pin the projection order for the affected tests (testComplexSemiJoin, testComplexAntiJoin, testComplexSortPushDownForSMJWithMaxOptionAndFieldList). 3. Row ordering. The analytics-engine coordinator-reduce (RowProducingSink) appends Arrow batches in arrival order from the SEARCH-threadpool response handlers, so a query without ORDER BY has no guaranteed row order — unlike Calcite's deterministic enumerable execution. Two cases: - testComplexRightJoin sorts by a column that is null for the right-only rows, leaving their relative order unspecified; switch verifyDataRowsInOrder -> verifyDataRows. - The testCheckAccessTheReference* tests compare two alias-syntax variants to each other via assertJsonEquals on serialized datarows, which is order-sensitive. They only mean to assert the two variants return the same set of rows. Add MatcherUtils.assertJsonRowsEqualIgnoreOrder (multiset compare) and use it for those comparisons. Verified: standard route 43/43 pass (no regression); analytics-engine route drops from 33 failures to only the remaining text-equality-filter cases, which are a separate route limitation tracked elsewhere. Signed-off-by: Songkan Tang <songkant@amazon.com>
1 parent a50d81a commit 5960ca0

2 files changed

Lines changed: 88 additions & 36 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
1010
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
1111
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
12+
import static org.opensearch.sql.util.MatcherUtils.assertJsonRowsEqualIgnoreOrder;
1213
import static org.opensearch.sql.util.MatcherUtils.rows;
1314
import static org.opensearch.sql.util.MatcherUtils.schema;
1415
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -26,6 +27,19 @@
2627

2728
public class CalcitePPLJoinIT extends PPLIntegTestCase {
2829

30+
/**
31+
* Guards the one-time seeding of extra docs (_id 5..8) into the shared {@code state_country}
32+
* index below. {@link #init()} runs before every test method (@Before), and {@code
33+
* preserveClusterUponCompletion()} keeps indices alive across methods, so the index is created
34+
* once and reused. On the standard route these PUTs overwrite by {@code _id} and are harmless to
35+
* repeat, but on the analytics-engine route the parquet/composite store is append-only and does
36+
* not overwrite by {@code _id} — re-running them every method accumulates duplicate rows, which
37+
* inflates downstream join row counts. Seeding once keeps both routes at a stable 8-row {@code
38+
* state_country}. Static so it is set once per class load and reset implicitly when the class
39+
* (and its preserved indices, wiped @AfterClass) goes away.
40+
*/
41+
private static boolean stateCountrySeeded = false;
42+
2943
@Override
3044
public void init() throws Exception {
3145
super.init();
@@ -35,26 +49,33 @@ public void init() throws Exception {
3549
loadIndex(Index.STATE_COUNTRY);
3650
loadIndex(Index.OCCUPATION);
3751
loadIndex(Index.HOBBIES);
38-
Request request1 =
39-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
40-
request1.setJsonEntity(
41-
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
42-
client().performRequest(request1);
43-
Request request2 =
44-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
45-
request2.setJsonEntity(
46-
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
47-
client().performRequest(request2);
48-
Request request3 =
49-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
50-
request3.setJsonEntity(
51-
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
52-
client().performRequest(request3);
53-
Request request4 =
54-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
55-
request4.setJsonEntity(
56-
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
57-
client().performRequest(request4);
52+
if (!stateCountrySeeded) {
53+
Request request1 =
54+
new Request(
55+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
56+
request1.setJsonEntity(
57+
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
58+
client().performRequest(request1);
59+
Request request2 =
60+
new Request(
61+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
62+
request2.setJsonEntity(
63+
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
64+
client().performRequest(request2);
65+
Request request3 =
66+
new Request(
67+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
68+
request3.setJsonEntity(
69+
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
70+
client().performRequest(request3);
71+
Request request4 =
72+
new Request(
73+
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
74+
request4.setJsonEntity(
75+
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
76+
client().performRequest(request4);
77+
stateCountrySeeded = true;
78+
}
5879
}
5980

6081
@Test
@@ -239,7 +260,11 @@ public void testComplexRightJoin() throws IOException {
239260
schema("occupation", "string"),
240261
schema("b.country", "string"),
241262
schema("salary", "int"));
242-
verifyDataRowsInOrder(
263+
// The four right-only rows all have a null a.age, so `sort a.age` leaves their relative order
264+
// unspecified — DataFusion (analytics-engine route) breaks the tie differently than the
265+
// v2/Calcite path. Assert membership rather than position; the four null-age rows and the two
266+
// matched rows are all present.
267+
verifyDataRows(
243268
actual,
244269
rows(null, null, null, null, "Engineer", "England", 100000),
245270
rows(null, null, null, null, "Artist", "USA", 70000),
@@ -255,7 +280,8 @@ public void testComplexSemiJoin() throws IOException {
255280
executeQuery(
256281
String.format(
257282
"source = %s | where country = 'Canada' OR country = 'England' | left semi join"
258-
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
283+
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
284+
+ " state, month, year, age",
259285
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
260286
verifySchema(
261287
actual,
@@ -277,7 +303,8 @@ public void testComplexAntiJoin() throws IOException {
277303
executeQuery(
278304
String.format(
279305
"source = %s | where country = 'Canada' OR country = 'England' | left anti join"
280-
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
306+
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
307+
+ " state, month, year, age",
281308
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
282309
verifySchema(
283310
actual,
@@ -529,7 +556,7 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
529556
String.format(
530557
"source = %s as t1 | JOIN ON t1.name = t2.name %s as t2 | fields t1.name, t2.name",
531558
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
532-
assertJsonEquals(
559+
assertJsonRowsEqualIgnoreOrder(
533560
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
534561

535562
JSONObject res3 =
@@ -550,9 +577,9 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
550577
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
551578
+ " t1.name",
552579
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
553-
assertJsonEquals(
580+
assertJsonRowsEqualIgnoreOrder(
554581
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
555-
assertJsonEquals(
582+
assertJsonRowsEqualIgnoreOrder(
556583
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
557584
}
558585

@@ -570,7 +597,7 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
570597
"source = %s | JOIN left = t1 ON t1.name = t2.name [ source = %s as t2 ] | fields"
571598
+ " t1.name, t2.name",
572599
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
573-
assertJsonEquals(
600+
assertJsonRowsEqualIgnoreOrder(
574601
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
575602

576603
JSONObject res3 =
@@ -591,9 +618,9 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
591618
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ]"
592619
+ " as tt | fields tt.name",
593620
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
594-
assertJsonEquals(
621+
assertJsonRowsEqualIgnoreOrder(
595622
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
596-
assertJsonEquals(
623+
assertJsonRowsEqualIgnoreOrder(
597624
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
598625
}
599626

@@ -617,9 +644,9 @@ public void testCheckAccessTheReferenceByOverrideAliases() throws IOException {
617644
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
618645
+ " t1.name",
619646
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
620-
assertJsonEquals(
647+
assertJsonRowsEqualIgnoreOrder(
621648
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
622-
assertJsonEquals(
649+
assertJsonRowsEqualIgnoreOrder(
623650
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
624651
}
625652

@@ -643,9 +670,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases() throws IOExce
643670
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
644671
+ " | fields tt.name",
645672
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
646-
assertJsonEquals(
673+
assertJsonRowsEqualIgnoreOrder(
647674
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
648-
assertJsonEquals(
675+
assertJsonRowsEqualIgnoreOrder(
649676
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
650677
}
651678

@@ -669,9 +696,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases2() throws IOExc
669696
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
670697
+ " | fields t2.name",
671698
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
672-
assertJsonEquals(
699+
assertJsonRowsEqualIgnoreOrder(
673700
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
674-
assertJsonEquals(
701+
assertJsonRowsEqualIgnoreOrder(
675702
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
676703
}
677704

@@ -1161,7 +1188,8 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx
11611188
executeQuery(
11621189
String.format(
11631190
"source=%s | eval name2=substring(name, 2, 1) | join max=1 name2,age [ source=%s |"
1164-
+ " eval name2=substring(state, 2, 1) ]",
1191+
+ " eval name2=substring(state, 2, 1) ] | fields name, country, state, month,"
1192+
+ " year, age, name2",
11651193
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY));
11661194
verifySchema(
11671195
actual,

integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.math.BigDecimal;
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
27+
import java.util.Comparator;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.function.Function;
@@ -432,6 +433,29 @@ public static void assertJsonEquals(String expected, String actual) {
432433
JsonParser.parseString(eliminatePid(actual)));
433434
}
434435

436+
/**
437+
* Compare two {@code datarows} JSON arrays as multisets — same rows, order ignored. Use when the
438+
* test only asserts that two queries return the <em>same set of rows</em> (e.g. checking that two
439+
* equivalent alias syntaxes produce the same result), not that they emit them in the same order.
440+
* The analytics-engine (DataFusion) route does not guarantee the same row order as the v2/Calcite
441+
* route, so a plain {@link #assertJsonEquals} on the serialized datarows is order-sensitive and
442+
* flaky on that route; comparing as multisets asserts the intended equivalence without depending
443+
* on output order.
444+
*/
445+
public static void assertJsonRowsEqualIgnoreOrder(String expectedRows, String actualRows) {
446+
List<String> expected = new ArrayList<>();
447+
new JSONArray(eliminatePid(expectedRows))
448+
.iterator()
449+
.forEachRemaining(o -> expected.add(o.toString()));
450+
List<String> actual = new ArrayList<>();
451+
new JSONArray(eliminatePid(actualRows))
452+
.iterator()
453+
.forEachRemaining(o -> actual.add(o.toString()));
454+
expected.sort(Comparator.naturalOrder());
455+
actual.sort(Comparator.naturalOrder());
456+
assertEquals(expected, actual);
457+
}
458+
435459
/**
436460
* Compare two JSON string are equals with ignoring the RelNode id in the Calcite plan.
437461
* Deprecated, use {@link #assertYamlEqualsIgnoreId(String, String)}

0 commit comments

Comments
 (0)