diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java
index 9aa9b02268..cdefe15155 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLJoinIT.java
@@ -5,10 +5,12 @@
package org.opensearch.sql.calcite.remote;
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
+import static org.opensearch.sql.util.MatcherUtils.assertJsonRowsEqualIgnoreOrder;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -32,29 +34,46 @@ public void init() throws Exception {
enableCalcite();
supportAllJoinTypes();
+ // Seed iff state_country is (re)created on this pass — kept in lockstep with loadIndex's own
+ // `if (!isIndexExist)` create-and-load guard. init() runs before every test method (@Before),
+ // and loadIndex skips the create+load when the index already exists, so the extra _doc/5..8
+ // PUTs below must run on exactly the same condition. A standalone "seed once" latch would
+ // desync from the index: whenever the framework wipes/recreates state_country between methods
+ // (loadIndex re-creates the 4 fixture rows), a latch already flipped true would skip re-seeding
+ // and leave the index at 4 rows, breaking the joins that expect 8. Gating on isIndexExist keeps
+ // seed and load together. On the analytics-engine route the parquet/composite store is
+ // append-only and does not overwrite by _id, so re-running these PUTs every method would
+ // accumulate duplicate rows and inflate join counts — this guard prevents that too.
+ boolean seedStateCountry = !isIndexExist(client(), TestsConstants.TEST_INDEX_STATE_COUNTRY);
loadIndex(Index.STATE_COUNTRY);
loadIndex(Index.OCCUPATION);
loadIndex(Index.HOBBIES);
- Request request1 =
- new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
- request1.setJsonEntity(
- "{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
- client().performRequest(request1);
- Request request2 =
- new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
- request2.setJsonEntity(
- "{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
- client().performRequest(request2);
- Request request3 =
- new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
- request3.setJsonEntity(
- "{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
- client().performRequest(request3);
- Request request4 =
- new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
- request4.setJsonEntity(
- "{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
- client().performRequest(request4);
+ if (seedStateCountry) {
+ Request request1 =
+ new Request(
+ "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
+ request1.setJsonEntity(
+ "{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
+ client().performRequest(request1);
+ Request request2 =
+ new Request(
+ "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
+ request2.setJsonEntity(
+ "{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
+ client().performRequest(request2);
+ Request request3 =
+ new Request(
+ "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
+ request3.setJsonEntity(
+ "{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
+ client().performRequest(request3);
+ Request request4 =
+ new Request(
+ "PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
+ request4.setJsonEntity(
+ "{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
+ client().performRequest(request4);
+ }
}
@Test
@@ -239,7 +258,11 @@ public void testComplexRightJoin() throws IOException {
schema("occupation", "string"),
schema("b.country", "string"),
schema("salary", "int"));
- verifyDataRowsInOrder(
+ // The four right-only rows all have a null a.age, so `sort a.age` leaves their relative order
+ // unspecified — DataFusion (analytics-engine route) breaks the tie differently than the
+ // v2/Calcite path. Assert membership rather than position; the four null-age rows and the two
+ // matched rows are all present.
+ verifyDataRows(
actual,
rows(null, null, null, null, "Engineer", "England", 100000),
rows(null, null, null, null, "Artist", "USA", 70000),
@@ -255,7 +278,8 @@ public void testComplexSemiJoin() throws IOException {
executeQuery(
String.format(
"source = %s | where country = 'Canada' OR country = 'England' | left semi join"
- + " left=a, right=b ON a.name = b.name %s | sort a.age",
+ + " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
+ + " state, month, year, age",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
verifySchema(
actual,
@@ -277,7 +301,8 @@ public void testComplexAntiJoin() throws IOException {
executeQuery(
String.format(
"source = %s | where country = 'Canada' OR country = 'England' | left anti join"
- + " left=a, right=b ON a.name = b.name %s | sort a.age",
+ + " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
+ + " state, month, year, age",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
verifySchema(
actual,
@@ -529,7 +554,7 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
String.format(
"source = %s as t1 | JOIN ON t1.name = t2.name %s as t2 | fields t1.name, t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
JSONObject res3 =
@@ -550,9 +575,9 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
+ " t1.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
}
@@ -570,7 +595,7 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
"source = %s | JOIN left = t1 ON t1.name = t2.name [ source = %s as t2 ] | fields"
+ " t1.name, t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
JSONObject res3 =
@@ -591,9 +616,9 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ]"
+ " as tt | fields tt.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
}
@@ -617,9 +642,9 @@ public void testCheckAccessTheReferenceByOverrideAliases() throws IOException {
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
+ " t1.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}
@@ -643,9 +668,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases() throws IOExce
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
+ " | fields tt.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}
@@ -669,9 +694,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases2() throws IOExc
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
+ " | fields t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
- assertJsonEquals(
+ assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}
@@ -698,7 +723,10 @@ public void testInnerJoinWithRelationSubquery() throws IOException {
schema("avg(salary)", "double"),
schema("age_span", "int"),
schema("b.country", "string"));
- verifyDataRowsInOrder(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
+ // The final `stats ... by` output has no ORDER BY, so row order is unspecified — the
+ // analytics-engine route (RowProducingSink appends batches in arrival order) emits the two
+ // groups in a different order than the v2/Calcite path. Assert membership, not position.
+ verifyDataRows(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
}
@Test
@@ -1161,7 +1189,8 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx
executeQuery(
String.format(
"source=%s | eval name2=substring(name, 2, 1) | join max=1 name2,age [ source=%s |"
- + " eval name2=substring(state, 2, 1) ]",
+ + " eval name2=substring(state, 2, 1) ] | fields name, country, state, month,"
+ + " year, age, name2",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY));
verifySchema(
actual,
diff --git a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java
index 61ebc9a16e..dc9ebc30de 100644
--- a/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java
+++ b/integ-test/src/test/java/org/opensearch/sql/util/MatcherUtils.java
@@ -24,6 +24,7 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -432,6 +433,29 @@ public static void assertJsonEquals(String expected, String actual) {
JsonParser.parseString(eliminatePid(actual)));
}
+ /**
+ * Compare two {@code datarows} JSON arrays as multisets — same rows, order ignored. Use when the
+ * test only asserts that two queries return the same set of rows (e.g. checking that two
+ * equivalent alias syntaxes produce the same result), not that they emit them in the same order.
+ * The analytics-engine (DataFusion) route does not guarantee the same row order as the v2/Calcite
+ * route, so a plain {@link #assertJsonEquals} on the serialized datarows is order-sensitive and
+ * flaky on that route; comparing as multisets asserts the intended equivalence without depending
+ * on output order.
+ */
+ public static void assertJsonRowsEqualIgnoreOrder(String expectedRows, String actualRows) {
+ List expected = new ArrayList<>();
+ new JSONArray(eliminatePid(expectedRows))
+ .iterator()
+ .forEachRemaining(o -> expected.add(o.toString()));
+ List actual = new ArrayList<>();
+ new JSONArray(eliminatePid(actualRows))
+ .iterator()
+ .forEachRemaining(o -> actual.add(o.toString()));
+ expected.sort(Comparator.naturalOrder());
+ actual.sort(Comparator.naturalOrder());
+ assertEquals(expected, actual);
+ }
+
/**
* Compare two JSON string are equals with ignoring the RelNode id in the Calcite plan.
* Deprecated, use {@link #assertYamlEqualsIgnoreId(String, String)}