Commit e99aff0

Stabilize CalciteStreamstatsCommandIT on the analytics-engine route (#5582)
Gate the streamstats ITs that diverge on the analytics-engine route (parquet-backed composite store, DataFusion backend) so the route runs green, while keeping every test active on the v2/Calcite path.

Mechanism follows the established capability-gating pattern (#5560): an in-test @RequiresCapability(...) annotation plus a matching integTestRemote excludeTestsMatching entry; both are no-ops on the v2 route.

Triaged single-shard and multi-shard (num_shards=3) analytics runs against the v2 baseline. Three groups:

- DOC_MUTATION (4 tests): testStreamstatsGlobalWithNull, testStreamstatsGlobalWithNullBucket, testStreamstatsResetWithNull, testStreamstatsResetWithNullBucket seed state via PUT+DELETE. Doc-level DELETE is unsupported on the parquet store, and same-_id PUT is append-only, so the leaked doc inflated the row counts of every sibling test that reads the shared index. Gated with the same DOC_MUTATION capability the three existing mutation tests already carry.
- CHAINED_STREAMSTATS_BY (4 tests): chaining two streamstats where an upstream stage partitions `by` a group emits a ROW_NUMBER() sequence column from each stage; the Substrait converter names both physical columns identically, so the stacked schema has a duplicate/ambiguous field name (500) or, for chained window streamstats, non-deterministic values. The Calcite logical plan is correct; the alias is lost in Substrait conversion. Fails single- and multi-shard.
- STREAMSTATS_SORT_NOT_HONORED (1 test): testStreamstatsAndSort. The window is computed over the backend scan order, ignoring a preceding `| sort` (the OVER clause carries no explicit ORDER BY), so the per-row aggregates diverge from the v2/Calcite path.

Pass rate on the single-shard analytics route, CalciteStreamstatsCommandIT:

| metric    | before | after |
|-----------|--------|-------|
| tests run | 47     | 42    |
| failures  | 12     | 0     |
| skipped   | 3      | 7     |

(The before-failures count is inflated by the DOC_MUTATION leak described above; the four leaking tests plus their downstream row-count victims all clear once gated.)

v2 route (:integTest): 47 run, 0 failed, 0 skipped; the gates are no-ops off the analytics route.

Twelve further tests fail only on the multi-shard route (they pass single-shard) due to cross-shard fragment-order non-determinism in the streamstats window gather; that is an engine-side gap and is left unchanged here rather than gated, to avoid skipping passing tests on the single-shard route.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 0b96cfc commit e99aff0
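To make the STREAMSTATS_SORT_NOT_HONORED divergence concrete, the following is a minimal sketch, not code from the plugin. It assumes a trailing window that includes the current row (the `window=2` case) and uses made-up ages; the class and method names are hypothetical. The same window function gives different per-row averages depending on whether it sees rows in scan (insertion) order or in sorted order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; not part of the OpenSearch SQL plugin. */
final class WindowOrderSketch {

  /**
   * Trailing-window average in encounter order: row i averages rows
   * max(0, i - window + 1) .. i. Assumes the current row is part of the window.
   */
  static List<Double> windowAvg(List<Integer> rows, int window) {
    List<Double> out = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      int from = Math.max(0, i - window + 1);
      double sum = 0;
      for (int j = from; j <= i; j++) {
        sum += rows.get(j);
      }
      out.add(sum / (i - from + 1));
    }
    return out;
  }

  public static void main(String[] args) {
    // Made-up ages in an assumed insertion (scan) order.
    List<Integer> scanOrder = List.of(30, 20, 40);

    // What a preceding `| sort age` is expected to hand to the window.
    List<Integer> sortedOrder = new ArrayList<>(scanOrder);
    sortedOrder.sort(Comparator.naturalOrder());

    System.out.println("window over scan order:   " + windowAvg(scanOrder, 2));
    System.out.println("window over sorted order: " + windowAvg(sortedOrder, 2));
  }
}
```

Because the window has no ordering of its own, whichever order the rows arrive in determines the result, which is why a dropped sort changes every per-row aggregate rather than just the output order.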

3 files changed

Lines changed: 56 additions & 1 deletion

integ-test/build.gradle

Lines changed: 14 additions & 0 deletions
@@ -1328,6 +1328,20 @@ task integTestRemote(type: RestIntegTestTask) {

         // === Excludes: asserts a Lucene pushdown fragment absent on the AE route ===
         excludeTestsMatching '*CalciteSortCommandIT.testPushdownSortCastToDoubleExpression'
+
+        // === Excludes: CalciteStreamstatsCommandIT route divergences ===
+        // Each test also carries an in-test @RequiresCapability(...) recording the reason.
+        // - CHAINED_STREAMSTATS_BY: chaining two streamstats where an upstream stage has `by`
+        // emits two ROW_NUMBER() sequence columns the Substrait converter names identically,
+        // so the stacked schema has a duplicate/ambiguous field name (500) or, for chained
+        // window streamstats, non-deterministic values. Fails single- and multi-shard.
+        excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstats'
+        excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithWindow'
+        excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithNull1'
+        excludeTestsMatching '*CalciteStreamstatsCommandIT.testMultipleStreamstatsWithEval'
+        // - STREAMSTATS_SORT_NOT_HONORED: streamstats computes its window over the backend scan
+        // order, ignoring a preceding `| sort` (the OVER clause has no explicit ORDER BY).
+        excludeTestsMatching '*CalciteStreamstatsCommandIT.testStreamstatsAndSort'
     }
 }

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java

Lines changed: 11 additions & 0 deletions
@@ -6,7 +6,9 @@
 package org.opensearch.sql.calcite.remote;

 import static org.opensearch.sql.legacy.TestsConstants.*;
+import static org.opensearch.sql.util.Capability.CHAINED_STREAMSTATS_BY;
 import static org.opensearch.sql.util.Capability.DOC_MUTATION;
+import static org.opensearch.sql.util.Capability.STREAMSTATS_SORT_NOT_HONORED;
 import static org.opensearch.sql.util.MatcherUtils.*;

 import java.io.IOException;
@@ -558,6 +560,7 @@ public void testStreamstatsGlobal() throws IOException {
   }

   @Test
+  @RequiresCapability(DOC_MUTATION)
   public void testStreamstatsGlobalWithNull() throws IOException {
     final int docId = 7;
     Request insertRequest =
@@ -613,6 +616,7 @@ public void testStreamstatsGlobalWithNull() throws IOException {
   }

   @Test
+  @RequiresCapability(DOC_MUTATION)
   public void testStreamstatsGlobalWithNullBucket() throws IOException {
     final int docId = 7;
     Request insertRequest =
@@ -718,6 +722,7 @@ public void testStreamstatsReset() throws IOException {
   }

   @Test
+  @RequiresCapability(DOC_MUTATION)
   public void testStreamstatsResetWithNull() throws IOException {
     final int docId = 7;
     Request insertRequest =
@@ -773,6 +778,7 @@ public void testStreamstatsResetWithNull() throws IOException {
   }

   @Test
+  @RequiresCapability(DOC_MUTATION)
   public void testStreamstatsResetWithNullBucket() throws IOException {
     final int docId = 7;
     Request insertRequest =
@@ -845,6 +851,7 @@ public void testUnsupportedWindowFunctions() {
   }

   @Test
+  @RequiresCapability(CHAINED_STREAMSTATS_BY)
   public void testMultipleStreamstats() throws IOException {
     JSONObject actual =
         executeQuery(
@@ -863,6 +870,7 @@ public void testMultipleStreamstats() throws IOException {
   }

   @Test
+  @RequiresCapability(CHAINED_STREAMSTATS_BY)
   public void testMultipleStreamstatsWithWindow() throws IOException {
     // Test case from GitHub issue #4800: chained streamstats with window=2
     JSONObject actual =
@@ -899,6 +907,7 @@ public void testMultipleStreamstatsWithWindow() throws IOException {
   // causing Calcite's RelDecorrelator to fail on duplicate correlate references.

   @Test
+  @RequiresCapability(CHAINED_STREAMSTATS_BY)
   public void testMultipleStreamstatsWithNull1() throws IOException {
     JSONObject actual =
         executeQuery(
@@ -1008,6 +1017,7 @@ public void testStreamstatsAndEventstats() throws IOException {
   }

   @Test
+  @RequiresCapability(STREAMSTATS_SORT_NOT_HONORED)
   public void testStreamstatsAndSort() throws IOException {
     JSONObject actual =
         executeQuery(
@@ -1074,6 +1084,7 @@ public void testWhereInWithStreamstatsSubquery() throws IOException {
   }

   @Test
+  @RequiresCapability(CHAINED_STREAMSTATS_BY)
   public void testMultipleStreamstatsWithEval() throws IOException {
     JSONObject actual =
         executeQuery(
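For readers unfamiliar with the gating pattern these annotations rely on, here is a rough, self-contained sketch of how an annotation-driven gate could decide whether a test runs. It is an assumption-laden illustration: the real `RequiresCapability` and `Capability` types live in `org.opensearch.sql.util` and their actual fields and enforcement mechanism are not shown in this commit. The nested types, `SampleIT`, and `shouldRun` below are hypothetical stand-ins.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of capability gating; not the plugin's implementation. */
final class CapabilityGateSketch {

  /** Stand-in for the real enum; only the constant names come from the commit. */
  enum Capability {
    DOC_MUTATION,
    CHAINED_STREAMSTATS_BY,
    STREAMSTATS_SORT_NOT_HONORED
  }

  /** Stand-in annotation: marks a test that needs the named capability. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresCapability {
    Capability value();
  }

  /** Tiny fake test class: one gated test, one ungated test. */
  static final class SampleIT {
    @RequiresCapability(Capability.STREAMSTATS_SORT_NOT_HONORED)
    public void testStreamstatsAndSort() {}

    public void testStreamstatsGlobal() {}
  }

  /** A test runs if it is ungated or the route supports the capability it requires. */
  static boolean shouldRun(Class<?> testClass, String methodName, Set<Capability> supported) {
    Method method;
    try {
      method = testClass.getDeclaredMethod(methodName);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such test method: " + methodName, e);
    }
    RequiresCapability gate = method.getAnnotation(RequiresCapability.class);
    return gate == null || supported.contains(gate.value());
  }

  public static void main(String[] args) {
    // Assumption: the v2 route supports everything, so every gate is a no-op there.
    Set<Capability> v2Route = EnumSet.allOf(Capability.class);
    // Assumption: the analytics-engine route supports none of the gated capabilities.
    Set<Capability> analyticsRoute = EnumSet.noneOf(Capability.class);

    System.out.println(shouldRun(SampleIT.class, "testStreamstatsAndSort", v2Route));
    System.out.println(shouldRun(SampleIT.class, "testStreamstatsAndSort", analyticsRoute));
    System.out.println(shouldRun(SampleIT.class, "testStreamstatsGlobal", analyticsRoute));
  }
}
```

The sketch models the "no-op on the v2 route" property as a route that supports every capability; how the real harness detects the route is outside what this diff shows.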

integ-test/src/test/java/org/opensearch/sql/util/Capability.java

Lines changed: 31 additions & 1 deletion
@@ -469,7 +469,37 @@ public enum Capability {
   LUCENE_PUSHDOWN_EXPLAIN(
       "A test asserting a Lucene-specific pushdown fragment in the explain plan (e.g. SORT->[...])"
           + " can't pass on the analytics-engine route: the DataFusion scan produces a different"
-          + " plan with no Lucene pushdown fragment.");
+          + " plan with no Lucene pushdown fragment."),
+
+  /**
+   * Chaining two {@code streamstats} commands where an upstream stage partitions {@code by} a group
+   * fails on the analytics-engine route. Each {@code streamstats ... by} stage projects a {@code
+   * ROW_NUMBER() OVER ()} sequence column to order its window; the Calcite plan aliases these
+   * distinctly ({@code __stream_seq__}), but the Substrait converter names both physical columns
+   * after the operator ({@code "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"}),
+   * so the stacked schema has a duplicate/ambiguous field name. Verified: it surfaces as a 500
+   * ({@code Schema contains duplicate unqualified field name ...} / streaming-fragment failure) or,
+   * for chained {@code window} streamstats, non-deterministic window values. A single {@code
+   * streamstats by} (or a chain where only the final stage has {@code by}) works.
+   */
+  CHAINED_STREAMSTATS_BY(
+      "Chaining two streamstats where an upstream stage partitions by a group fails on the"
+          + " analytics-engine route: both stages emit a ROW_NUMBER() sequence column the Substrait"
+          + " converter names identically, producing a duplicate/ambiguous field name (500) or"
+          + " non-deterministic window values."),
+
+  /**
+   * {@code streamstats} computes its running/window aggregate over the backend scan order on the
+   * analytics-engine route, ignoring a preceding {@code | sort}. The {@code OVER} clause carries no
+   * explicit {@code ORDER BY} (streamstats orders by encounter order by design), so DataFusion
+   * evaluates the window in scan order rather than the sorted order the v2/Calcite path honors.
+   * Verified: {@code sort age | streamstats window=2 avg(age)} yields window values computed in
+   * insertion order, not age order, so the per-row aggregates diverge.
+   */
+  STREAMSTATS_SORT_NOT_HONORED(
+      "streamstats computes its window over the backend scan order on the analytics-engine route,"
+          + " ignoring a preceding | sort (the OVER clause has no explicit ORDER BY), so the window"
+          + " values diverge from the v2/Calcite path which honors the sort.");

   private final String reason;
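The CHAINED_STREAMSTATS_BY failure mode can be reproduced in miniature. The sketch below is a hypothetical illustration rather than Substrait or DataFusion code: it only shows why a schema check rejects two columns that were both named after the operator, and why distinct aliases pass. The alias names `seq_stage_1` and `seq_stage_2` are invented for the example; the operator-derived name is the one quoted in the Javadoc.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration only; not Substrait, DataFusion, or plugin code. */
final class DuplicateFieldSketch {

  /** Returns the first unqualified field name that appears more than once, if any. */
  static Optional<String> firstDuplicate(List<String> fieldNames) {
    Set<String> seen = new HashSet<>();
    for (String name : fieldNames) {
      if (!seen.add(name)) {
        return Optional.of(name);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Name quoted in the Javadoc: both stages end up with this operator-derived name.
    String operatorName = "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW";

    // Two chained stages, each contributing one sequence column named after the operator.
    List<String> aliasLost = List.of("age", operatorName, operatorName);

    // Same shape with distinct (invented) aliases preserved per stage.
    List<String> aliasKept = List.of("age", "seq_stage_1", "seq_stage_2");

    System.out.println("alias lost -> duplicate: " + firstDuplicate(aliasLost));
    System.out.println("alias kept -> duplicate: " + firstDuplicate(aliasKept));
  }
}
```

A single `streamstats by` contributes only one such column, which is consistent with the Javadoc's note that the unchained case works.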
