Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_HOBBIES;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_OCCUPATION;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STATE_COUNTRY;
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
import static org.opensearch.sql.util.MatcherUtils.assertJsonRowsEqualIgnoreOrder;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
Expand All @@ -32,29 +34,46 @@ public void init() throws Exception {
enableCalcite();
supportAllJoinTypes();

// Seed iff state_country is (re)created on this pass — kept in lockstep with loadIndex's own
// `if (!isIndexExist)` create-and-load guard. init() runs before every test method (@Before),
// and loadIndex skips the create+load when the index already exists, so the extra _doc/5..8
// PUTs below must run on exactly the same condition. A standalone "seed once" latch would
// desync from the index: whenever the framework wipes/recreates state_country between methods
// (loadIndex re-creates the 4 fixture rows), a latch already flipped true would skip re-seeding
// and leave the index at 4 rows, breaking the joins that expect 8. Gating on isIndexExist keeps
// seed and load together. On the analytics-engine route the parquet/composite store is
// append-only and does not overwrite by _id, so re-running these PUTs every method would
// accumulate duplicate rows and inflate join counts — this guard prevents that too.
boolean seedStateCountry = !isIndexExist(client(), TestsConstants.TEST_INDEX_STATE_COUNTRY);
loadIndex(Index.STATE_COUNTRY);
loadIndex(Index.OCCUPATION);
loadIndex(Index.HOBBIES);
Request request1 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
request1.setJsonEntity(
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request1);
Request request2 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
request2.setJsonEntity(
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request2);
Request request3 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
request3.setJsonEntity(
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request3);
Request request4 =
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
request4.setJsonEntity(
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
client().performRequest(request4);
if (seedStateCountry) {
Request request1 =
new Request(
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
request1.setJsonEntity(
"{\"name\":\"Jim\",\"age\":27,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request1);
Request request2 =
new Request(
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/6?refresh=true");
request2.setJsonEntity(
"{\"name\":\"Peter\",\"age\":57,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request2);
Request request3 =
new Request(
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/7?refresh=true");
request3.setJsonEntity(
"{\"name\":\"Rick\",\"age\":70,\"state\":\"B.C\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
client().performRequest(request3);
Request request4 =
new Request(
"PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/8?refresh=true");
request4.setJsonEntity(
"{\"name\":\"David\",\"age\":40,\"state\":\"Washington\",\"country\":\"USA\",\"year\":2023,\"month\":4}");
client().performRequest(request4);
}
}

@Test
Expand Down Expand Up @@ -239,7 +258,11 @@ public void testComplexRightJoin() throws IOException {
schema("occupation", "string"),
schema("b.country", "string"),
schema("salary", "int"));
verifyDataRowsInOrder(
// The four right-only rows all have a null a.age, so `sort a.age` leaves their relative order
// unspecified — DataFusion (analytics-engine route) breaks the tie differently than the
// v2/Calcite path. Assert membership rather than position; the four null-age rows and the two
// matched rows are all present.
verifyDataRows(
actual,
rows(null, null, null, null, "Engineer", "England", 100000),
rows(null, null, null, null, "Artist", "USA", 70000),
Expand All @@ -255,7 +278,8 @@ public void testComplexSemiJoin() throws IOException {
executeQuery(
String.format(
"source = %s | where country = 'Canada' OR country = 'England' | left semi join"
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
+ " state, month, year, age",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
verifySchema(
actual,
Expand All @@ -277,7 +301,8 @@ public void testComplexAntiJoin() throws IOException {
executeQuery(
String.format(
"source = %s | where country = 'Canada' OR country = 'England' | left anti join"
+ " left=a, right=b ON a.name = b.name %s | sort a.age",
+ " left=a, right=b ON a.name = b.name %s | sort a.age | fields name, country,"
+ " state, month, year, age",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
verifySchema(
actual,
Expand Down Expand Up @@ -529,7 +554,7 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
String.format(
"source = %s as t1 | JOIN ON t1.name = t2.name %s as t2 | fields t1.name, t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());

JSONObject res3 =
Expand All @@ -550,9 +575,9 @@ public void testCheckAccessTheReferenceByAliases() throws IOException {
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
+ " t1.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
}

Expand All @@ -570,7 +595,7 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
"source = %s | JOIN left = t1 ON t1.name = t2.name [ source = %s as t2 ] | fields"
+ " t1.name, t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());

JSONObject res3 =
Expand All @@ -591,9 +616,9 @@ public void testCheckAccessTheReferenceBySubqueryAliases() throws IOException {
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ]"
+ " as tt | fields tt.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res3.getJSONArray("datarows").toString(), res4.getJSONArray("datarows").toString());
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res4.getJSONArray("datarows").toString(), res5.getJSONArray("datarows").toString());
}

Expand All @@ -617,9 +642,9 @@ public void testCheckAccessTheReferenceByOverrideAliases() throws IOException {
"source = %s as tt | JOIN left = t1 ON t1.name = t2.name %s as t2 | fields"
+ " t1.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}

Expand All @@ -643,9 +668,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases() throws IOExce
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
+ " | fields tt.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}

Expand All @@ -669,9 +694,9 @@ public void testCheckAccessTheReferenceByOverrideSubqueryAliases2() throws IOExc
"source = %s | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = %s ] as tt"
+ " | fields t2.name",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_OCCUPATION));
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res2.getJSONArray("datarows").toString());
assertJsonEquals(
assertJsonRowsEqualIgnoreOrder(
res1.getJSONArray("datarows").toString(), res3.getJSONArray("datarows").toString());
}

Expand All @@ -698,7 +723,10 @@ public void testInnerJoinWithRelationSubquery() throws IOException {
schema("avg(salary)", "double"),
schema("age_span", "int"),
schema("b.country", "string"));
verifyDataRowsInOrder(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
// The final `stats ... by` output has no ORDER BY, so row order is unspecified — the
// analytics-engine route (RowProducingSink appends batches in arrival order) emits the two
// groups in a different order than the v2/Calcite path. Assert membership, not position.
verifyDataRows(actual, rows(70000.0, 30, "USA"), rows(100000, 70, "England"));
}

@Test
Expand Down Expand Up @@ -1161,7 +1189,8 @@ public void testComplexSortPushDownForSMJWithMaxOptionAndFieldList() throws IOEx
executeQuery(
String.format(
"source=%s | eval name2=substring(name, 2, 1) | join max=1 name2,age [ source=%s |"
+ " eval name2=substring(state, 2, 1) ]",
+ " eval name2=substring(state, 2, 1) ] | fields name, country, state, month,"
+ " year, age, name2",
TEST_INDEX_STATE_COUNTRY, TEST_INDEX_STATE_COUNTRY));
verifySchema(
actual,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -432,6 +433,29 @@ public static void assertJsonEquals(String expected, String actual) {
JsonParser.parseString(eliminatePid(actual)));
}

/**
 * Asserts that two serialized {@code datarows} JSON arrays contain the same rows, regardless of
 * the order in which the rows appear.
 *
 * <p>Each argument is passed through {@code eliminatePid}, parsed as a {@code JSONArray}, and
 * every element is rendered with {@code toString()}. The two lists of rendered rows are sorted
 * and then compared, so duplicates still count (multiset equality) while position does not. Rows
 * are matched by their serialized text.
 *
 * <p>Intended for tests that only check that two equivalent queries return the same rows. The
 * analytics-engine (DataFusion) route does not guarantee the same row order as the v2/Calcite
 * route, which makes the order-sensitive {@link #assertJsonEquals} flaky for that purpose.
 *
 * @param expectedRows JSON array text holding the reference rows
 * @param actualRows JSON array text holding the rows under test
 */
public static void assertJsonRowsEqualIgnoreOrder(String expectedRows, String actualRows) {
  assertEquals(sortedRowStrings(expectedRows), sortedRowStrings(actualRows));
}

/** Parses a {@code datarows} JSON array and returns its elements as sorted strings. */
private static List<String> sortedRowStrings(String datarows) {
  List<String> rendered = new ArrayList<>();
  JSONArray parsed = new JSONArray(eliminatePid(datarows));
  parsed.iterator().forEachRemaining(row -> rendered.add(row.toString()));
  rendered.sort(Comparator.naturalOrder());
  return rendered;
}

/**
* Compares two JSON strings for equality, ignoring the RelNode id in the Calcite plan.
* Deprecated, use {@link #assertYamlEqualsIgnoreId(String, String)}
Expand Down
Loading