Commit c4cac2a

Fix singleton stack-corruption NPE in DatetimeUdtNormalizeRule (opensearch-project#5458)
* fix: instantiate DatetimeUdt normalize/output-cast rules per plan() call

  DatetimeUdtNormalizeRule and DatetimeOutputCastRule extend RelHomogeneousShuttle, which inherits a stateful Deque<RelNode> stack from RelShuttleImpl. DatetimeExtension.postAnalysisRules() returned the static INSTANCE of each rule, sharing the same shuttle (and the same stack) across every UnifiedQueryPlanner.plan() invocation.

  If any traversal ever ends with an unbalanced stack, residual entries persist to the next query. The next query's visitChild() then pops a stale or empty stack and throws NoSuchElementException at RelShuttleImpl.visitChild line 67 (the stack.pop() in the finally block). This surfaces as the cluster-side stack trace reported on analytics-engine-routed parquet indices for queries that combine aggregations over datetime UDT columns (e.g. "stats count() as field_count, distinct_count(field)").

  Return fresh instances per plan() instead. Drop the INSTANCE constants and the Lombok @NoArgsConstructor on both rules; document the singleton-unsafety on each class JavaDoc.

  Add a regression test that runs several plan() calls in sequence against the same context, covering stats+distinct_count over both schema-declared and eval-derived datetime columns.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test: add analytics-engine regression IT for singleton stack-corruption

  CalciteDatetimeUdtNormalizeRegressionIT exercises the failure pattern that triggered the cluster-side NoSuchElementException: stats + distinct_count over datetime columns, repeated 20 times to amplify any plan() carry-over.

  The IT is harness-aware:
  - Without `-Dtests.analytics.force_routing=true`: queries go through the V2 / Calcite engine path. The DatetimeUdtNormalizeRule path is not exercised, so the IT passes as a baseline correctness check.
  - With `-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`: every query routes through the analytics-engine path and hits the DatetimeUdtNormalizeRule shuttle that this PR fixes. The 20-iteration pattern surfaces any remaining singleton-stack carry-over.

  CI's :integTest task (in-process testCluster without analytics-engine) runs the IT through the V2 path, which is safe and fast. The analytics-engine verification path is via :integTestRemote against an externally-managed cluster built per `docs/dev/ppl-analytics-engine-routing.md`.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test: switch regression IT to concurrent query pattern

  The sequential iteration variant passed even with the singleton bug in place: the local cluster doesn't carry over enough state between calls in one thread. The actual production trigger is parallel queries from a dashboard "field statistics" panel: multiple cluster threads call plan() simultaneously, all using the shared singleton's non-thread-safe ArrayDeque. Their push/pop operations interleave and corrupt the stack.

  Verified locally against analytics-engine path with parquet indices:
  - Unfixed cluster: 2-3 / 80 queries fail with NoSuchElementException (HTTP 500), matching the production stack trace exactly.
  - Fixed cluster: 0 / 80 failures.

  Uses CompletableFuture + 8-thread pool to fire 80 queries per test across:
  - testConcurrentStatsDistinctCountOverDatetime: same shape, varied datetime fields.
  - testConcurrentMixedDatetimePlans: three different plan shapes interleaved; mixed visitChild call counts amplify the race.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* test: rename to CalcitePlannerConcurrencyIT (review nit)

  @dai-chen flagged that the IT name was over-scoped to a single rule and the file would read better as a general bucket for planner-level concurrency / state-isolation regressions. The actual surface under test is UnifiedQueryPlanner's post-analysis pipeline: any RelShuttle extension that doesn't isolate per-call state is unsafe under concurrent load, not just the datetime rules.

  Renames the file and class, updates the JavaDoc to describe the planner-level invariant rather than the specific Datetime* rules, and notes the current cases as the regression that motivated the suite. Test method bodies and assertions are unchanged.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
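
The state leak described in the commit message can be illustrated without Calcite. The sketch below is a JDK-only model, not the project's code: `StatefulShuttle`, `sharedRules()`, `freshRules()` and `depthSeenBySecondCaller()` are hypothetical names, and `visitChild` only mimics the push / try / finally-pop shape the message attributes to `RelShuttleImpl.visitChild`. It shows deterministically that a second caller starting while a first traversal is in flight sees the first caller's frame when the instance is shared, and an empty stack when each call gets a fresh instance.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ShuttleIsolationSketch {

  /** Hypothetical stand-in for a RelShuttleImpl subclass: each instance owns one traversal stack. */
  static final class StatefulShuttle {
    final Deque<String> stack = new ArrayDeque<>();

    /** Same push / try / finally-pop shape the commit message describes for visitChild. */
    void visitChild(String parent, Runnable body) {
      stack.push(parent);
      try {
        body.run();
      } finally {
        stack.pop();
      }
    }
  }

  /** Old pattern: one instance handed to every caller. */
  static final StatefulShuttle SHARED = new StatefulShuttle();

  static List<StatefulShuttle> sharedRules() {
    return List.of(SHARED);
  }

  /** New pattern: a fresh instance (and therefore a fresh stack) per call. */
  static List<StatefulShuttle> freshRules() {
    return List.of(new StatefulShuttle());
  }

  /** Stack depth a second caller starts from while a first traversal is still in flight. */
  static int depthSeenBySecondCaller(boolean shared) {
    StatefulShuttle first = shared ? sharedRules().get(0) : freshRules().get(0);
    int[] seen = new int[1];
    first.visitChild(
        "Aggregate",
        () -> {
          // Simulates another plan() call asking for its rules mid-traversal.
          StatefulShuttle second = shared ? sharedRules().get(0) : freshRules().get(0);
          seen[0] = second.stack.size();
        });
    return seen[0];
  }

  public static void main(String[] args) {
    System.out.println("shared: " + depthSeenBySecondCaller(true)); // prints "shared: 1"
    System.out.println("fresh:  " + depthSeenBySecondCaller(false)); // prints "fresh:  0"
  }
}
```

The model is single-threaded on purpose: it demonstrates only the visibility of foreign frames. The `NoSuchElementException` reported in production additionally depends on real thread interleaving over a non-thread-safe `ArrayDeque`, which this sketch does not attempt to reproduce.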
1 parent dc05942 commit c4cac2a

5 files changed

Lines changed: 187 additions & 12 deletions

api/src/main/java/org/opensearch/sql/api/spec/datetime/DatetimeExtension.java

Lines changed: 2 additions & 1 deletion
@@ -22,7 +22,8 @@ public class DatetimeExtension implements LanguageExtension {
 
   @Override
   public List<RelShuttle> postAnalysisRules() {
-    return List.of(DatetimeUdtNormalizeRule.INSTANCE, DatetimeOutputCastRule.INSTANCE);
+    // Fresh instances per plan() because RelHomogeneousShuttle inherits a stateful stack.
+    return List.of(new DatetimeUdtNormalizeRule(), new DatetimeOutputCastRule());
   }
 
   /** Maps datetime UDT types to their standard Calcite equivalents. */

api/src/main/java/org/opensearch/sql/api/spec/datetime/DatetimeOutputCastRule.java

Lines changed: 6 additions & 6 deletions
@@ -9,8 +9,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -21,12 +19,14 @@
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 
-/** Wraps the root output with CAST(datetime → VARCHAR) for PPL wire-format compatibility. */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+/**
+ * Wraps the root output with CAST(datetime → VARCHAR) for PPL wire-format compatibility.
+ *
+ * <p>Not a singleton: {@link RelHomogeneousShuttle} inherits a stateful {@code stack} field from
+ * {@link org.apache.calcite.rel.RelShuttleImpl}, so a fresh instance must be used per plan().
+ */
 class DatetimeOutputCastRule extends RelHomogeneousShuttle {
 
-  static final DatetimeOutputCastRule INSTANCE = new DatetimeOutputCastRule();
-
   @Override
   public RelNode visit(RelNode other) {
     List<RelDataTypeField> fields = other.getRowType().getFieldList();

api/src/main/java/org/opensearch/sql/api/spec/datetime/DatetimeUdtNormalizeRule.java

Lines changed: 3 additions & 5 deletions
@@ -6,8 +6,6 @@
 package org.opensearch.sql.api.spec.datetime;
 
 import java.util.Optional;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
@@ -22,12 +20,12 @@
 /**
  * Temporary patch that rewrites datetime UDT return types on RexCall nodes to standard Calcite
  * types.
+ *
+ * <p>Not a singleton: {@link RelHomogeneousShuttle} inherits a stateful {@code stack} field from
+ * {@link org.apache.calcite.rel.RelShuttleImpl}, so a fresh instance must be used per plan().
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
 class DatetimeUdtNormalizeRule extends RelHomogeneousShuttle {
 
-  static final DatetimeUdtNormalizeRule INSTANCE = new DatetimeUdtNormalizeRule();
-
   @Override
   public RelNode visit(RelNode other) {
     RelNode visited = super.visit(other);

api/src/test/java/org/opensearch/sql/api/spec/datetime/DatetimeExtensionTest.java

Lines changed: 21 additions & 0 deletions
@@ -154,6 +154,27 @@ public void testNonDatetimeFieldsNotWrapped() {
         """);
   }
 
+  @Test
+  public void testSequentialPlanCallsDoNotCorruptShuttleStack() {
+    // Regression test: DatetimeUdtNormalizeRule extends RelHomogeneousShuttle which inherits a
+    // stateful Deque<RelNode> stack field. Earlier implementations used a static INSTANCE shared
+    // across all plan() calls; under workloads with aggregations (especially count + distinct_count
+    // over datetime columns), the shared stack would desynchronize and visitChild's pop would throw
+    // NoSuchElementException on subsequent plan calls. Running several distinct plans through the
+    // same context confirms each invocation gets a fresh shuttle.
+    for (int i = 0; i < 5; i++) {
+      planner.plan(
+          "source = catalog.events"
+              + " | stats count() as field_count, distinct_count(created_at) as distinct_count");
+      planner.plan(
+          "source = catalog.events"
+              + " | eval ts = TIMESTAMP(name)"
+              + " | stats count() as field_count, distinct_count(ts) as distinct_count");
+      planner.plan(
+          "source = catalog.events | where created_at > \"2024-01-01\" | fields hire_date");
+    }
+  }
+
   @Test
   public void testOutputCastCanCompileAndExecute() throws Exception {
     RelNode plan =
Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.remote;
+
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATE_FORMATS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+import org.opensearch.sql.ppl.PPLIntegTestCase;
+
+/**
+ * Integration tests for {@code UnifiedQueryPlanner} state isolation under concurrent load.
+ *
+ * <p>The planner's post-analysis pipeline (extensions registered via {@code
+ * LanguageSpec.postAnalysisRules}) uses Calcite {@code RelShuttle} subclasses. {@code
+ * RelShuttleImpl} inherits a non-thread-safe {@code ArrayDeque<RelNode>} stack used by {@code
+ * visitChild}'s push/pop. Any extension that returns the same shuttle instance across {@code
+ * plan()} calls is unsafe under concurrent load: cluster threads call {@code plan()} simultaneously
+ * and their push/pop on the shared stack interleave, leaving residual entries that surface on a
+ * subsequent traversal as {@code NoSuchElementException} at {@code RelShuttleImpl.visitChild} line
+ * 67 (the {@code stack.pop()} in the {@code finally} block).
+ *
+ * <p>The test methods here fire many queries through a thread pool to exercise concurrent {@code
+ * plan()} invocations. New planner-level concurrency / state-isolation regressions belong in this
+ * class. The current cases cover {@code DatetimeExtension}'s {@code RelHomogeneousShuttle}
+ * subclasses ({@code DatetimeUdtNormalizeRule}, {@code DatetimeOutputCastRule}) which were
+ * previously returned as static {@code INSTANCE}s and caused the production failure that motivated
+ * this suite.
+ *
+ * <p>Run via:
+ *
+ * <pre>{@code
+ * ./gradlew :integ-test:integTestRemote \
+ *     -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9300 \
+ *     -Dtests.clustername=runTask \
+ *     -Dtests.analytics.force_routing=true \
+ *     -Dtests.analytics.parquet_indices=true \
+ *     --tests org.opensearch.sql.calcite.remote.CalcitePlannerConcurrencyIT
+ * }</pre>
+ */
+public class CalcitePlannerConcurrencyIT extends PPLIntegTestCase {
+
+  /** Concurrency level: matches the rough parallelism of a dashboard field-stats panel. */
+  private static final int PARALLELISM = 8;
+
+  /** Total queries fired per test. */
+  private static final int QUERIES = 80;
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+    enableCalcite();
+
+    // DATE_FORMATS has many datetime columns of different formats/precisions. With
+    // -Dtests.analytics.parquet_indices=true the helper provisions it as a parquet-backed
+    // composite index, which is required for analytics-engine routing.
+    loadIndex(Index.DATE_FORMATS);
+  }
+
+  @Test
+  public void testConcurrentStatsDistinctCountOverDatetime() throws Exception {
+    String[] fields = {
+      "epoch_millis", "epoch_second", "date_optional_time", "strict_date_optional_time"
+    };
+    List<String> queries = new ArrayList<>(QUERIES);
+    for (int i = 0; i < QUERIES; i++) {
+      String field = fields[i % fields.length];
+      queries.add(
+          String.format(
+              "source=%s | stats count() as field_count, distinct_count(%s) as distinct_count",
+              TEST_INDEX_DATE_FORMATS, field));
+    }
+    executeConcurrent(queries);
+  }
+
+  @Test
+  public void testConcurrentMixedDatetimePlans() throws Exception {
+    // Mix three plan shapes: stats+distinct_count, plain field projection (datetime cast), and
+    // stats by a different field. Different plan shapes push the planner's post-analysis shuttles
+    // through different visitChild call counts, amplifying any cross-query stack pollution.
+    List<String> shapes =
+        List.of(
+            "source=%s | stats count() as field_count, distinct_count(epoch_millis) as"
+                + " distinct_count",
+            "source=%s | fields epoch_millis, epoch_second, date_optional_time",
+            "source=%s | stats count() as field_count, distinct_count(epoch_second) as"
+                + " distinct_count by date_optional_time");
+    List<String> queries = new ArrayList<>(QUERIES);
+    for (int i = 0; i < QUERIES; i++) {
+      queries.add(String.format(shapes.get(i % shapes.size()), TEST_INDEX_DATE_FORMATS));
+    }
+    executeConcurrent(queries);
+  }
+
+  /**
+   * Fire all queries through a fixed-size thread pool. Asserts every query completes without
+   * exception. With a shuttle-state-leak bug present this triggers {@code NoSuchElementException}
+   * on at least one task once the stack interleaving corrupts state.
+   */
+  private void executeConcurrent(List<String> queries) throws Exception {
+    var executor = Executors.newFixedThreadPool(PARALLELISM);
+    try {
+      List<CompletableFuture<Void>> futures = new ArrayList<>(queries.size());
+      AtomicInteger failures = new AtomicInteger();
+      List<Throwable> errors = new ArrayList<>();
+      for (String query : queries) {
+        futures.add(
+            CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    executeQuery(query);
+                  } catch (Exception e) {
+                    failures.incrementAndGet();
+                    synchronized (errors) {
+                      errors.add(e);
+                    }
+                  }
+                },
+                executor));
+      }
+      for (CompletableFuture<Void> f : futures) {
+        try {
+          f.get(60, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+          failures.incrementAndGet();
+          synchronized (errors) {
+            errors.add(e.getCause());
+          }
+        }
+      }
+      if (failures.get() > 0) {
+        StringBuilder msg = new StringBuilder();
+        msg.append(failures.get()).append("/").append(queries.size()).append(" queries failed:");
+        synchronized (errors) {
+          for (int i = 0; i < Math.min(3, errors.size()); i++) {
+            msg.append("\n - ").append(errors.get(i));
+          }
+        }
+        throw new AssertionError(msg.toString());
+      }
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(30, TimeUnit.SECONDS);
+    }
+  }
+}
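
For readers who want to try the fan-out-and-collect pattern from `executeConcurrent` outside the OpenSearch test harness, here is a reduced, JDK-only sketch. It is an illustration under assumptions, not part of the commit: `ConcurrentHarnessSketch` and `runAll` are hypothetical names, plain `Runnable` tasks stand in for `executeQuery(query)`, and a `ConcurrentLinkedQueue` replaces the synchronized `ArrayList` purely to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentHarnessSketch {

  /** Runs every task on a fixed pool and returns the exceptions thrown, in no particular order. */
  static List<Throwable> runAll(List<Runnable> tasks, int parallelism) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(parallelism);
    Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>(tasks.size());
      for (Runnable task : tasks) {
        futures.add(
            CompletableFuture.runAsync(
                () -> {
                  try {
                    task.run();
                  } catch (Throwable t) {
                    // Record the failure instead of failing the future, so all tasks still run.
                    errors.add(t);
                  }
                },
                executor));
      }
      // Wait for every task; a timeout propagates to the caller as TimeoutException.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(60, TimeUnit.SECONDS);
    } finally {
      executor.shutdown();
      executor.awaitTermination(30, TimeUnit.SECONDS);
    }
    return new ArrayList<>(errors);
  }

  public static void main(String[] args) throws Exception {
    List<Runnable> tasks = new ArrayList<>();
    for (int i = 0; i < 80; i++) {
      int n = i;
      tasks.add(
          () -> {
            if (n % 40 == 0) {
              throw new IllegalStateException("task " + n);
            }
          });
    }
    System.out.println(runAll(tasks, 8).size() + "/80 tasks failed"); // prints "2/80 tasks failed"
  }
}
```

Collecting failures rather than aborting on the first one mirrors the reporting style of the IT, where the assertion message lists how many of the queries failed.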
