|
5 | 5 |
|
6 | 6 | package org.opensearch.sql.calcite.remote; |
7 | 7 |
|
8 | | -import static org.junit.Assert.assertNotNull; |
9 | 8 | import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATE_FORMATS; |
10 | 9 |
|
11 | | -import java.io.IOException; |
12 | | -import org.json.JSONObject; |
| 10 | +import java.util.ArrayList; |
| 11 | +import java.util.List; |
| 12 | +import java.util.concurrent.CompletableFuture; |
| 13 | +import java.util.concurrent.ExecutionException; |
| 14 | +import java.util.concurrent.Executors; |
| 15 | +import java.util.concurrent.TimeUnit; |
| 16 | +import java.util.concurrent.atomic.AtomicInteger; |
13 | 17 | import org.junit.jupiter.api.Test; |
14 | 18 | import org.opensearch.sql.ppl.PPLIntegTestCase; |
15 | 19 |
|
16 | 20 | /** |
17 | 21 | * Regression IT for the {@code DatetimeUdtNormalizeRule} / {@code DatetimeOutputCastRule} |
18 | 22 | * singleton-stack-corruption bug. |
19 | 23 | * |
20 | | - * <p>Both rules extend Calcite's {@code RelHomogeneousShuttle}, which inherits a stateful {@code |
21 | | - * Deque<RelNode>} stack from {@code RelShuttleImpl}. Earlier code returned the same {@code |
22 | | - * INSTANCE} of each rule from {@code DatetimeExtension.postAnalysisRules()} on every {@code |
23 | | - * UnifiedQueryPlanner.plan()} call. If any traversal ever finished with an unbalanced stack, |
24 | | - * residual entries persisted to the next query and the next {@code visitChild()} popped a stale or |
25 | | - * empty stack — surfacing as {@code NoSuchElementException} at {@code RelShuttleImpl.visitChild} |
26 | | - * line 67. |
| 24 | + * <p>Both rules extend Calcite's {@code RelHomogeneousShuttle}, which inherits a stateful |
| 25 | + * non-thread-safe {@code ArrayDeque<RelNode>} stack from {@code RelShuttleImpl}. Earlier code |
| 26 | + * returned the same {@code INSTANCE} of each rule from {@code |
| 27 | + * DatetimeExtension.postAnalysisRules()} on every {@code UnifiedQueryPlanner.plan()} call. Under |
| 28 | + * parallel query load — exactly what a dashboard "field statistics" panel issues when it probes |
| 29 | + * every field in an index concurrently — multiple cluster threads call {@code plan()} |
| 30 | + * simultaneously and their push/pop on the shared stack interleave, leaving residual entries that |
| 31 | + * surface on a subsequent traversal as {@code NoSuchElementException} at {@code |
| 32 | + * RelShuttleImpl.visitChild} line 67 (the {@code stack.pop()} in the {@code finally} block). |
27 | 33 | * |
28 | | - * <p>The failure was reported as intermittent on dashboards issuing field-statistics queries of the |
29 | | - * shape {@code stats count() as field_count, distinct_count(field)} against parquet-backed indices. |
30 | | - * This IT runs the same query shape repeatedly against a parquet-backed (composite) index with |
31 | | - * multiple datetime columns to exercise the analytics-engine route end-to-end. |
| 34 | + * <p>This IT reproduces the production failure by firing many {@code distinct_count} queries over |
| 35 | + * datetime fields concurrently against a parquet-backed (composite) index. |
32 | 36 | * |
33 | 37 | * <p>Run via: |
34 | 38 | * |
|
43 | 47 | */ |
44 | 48 | public class CalciteDatetimeUdtNormalizeRegressionIT extends PPLIntegTestCase { |
45 | 49 |
|
46 | | - /** Number of repetitions per test. Singleton bug surfaces order-dependent across plan() calls. */ |
47 | | - private static final int ITERATIONS = 20; |
| 50 | + /** Concurrency level — matches the rough parallelism of a dashboard field-stats panel. */ |
| 51 | + private static final int PARALLELISM = 8; |
| 52 | + |
| 53 | + /** Total queries fired per test. */ |
| 54 | + private static final int QUERIES = 80; |
48 | 55 |
|
49 | 56 | @Override |
50 | 57 | public void init() throws Exception { |
51 | 58 | super.init(); |
52 | 59 | enableCalcite(); |
53 | 60 |
|
54 | | - // DATE_FORMATS index has many date columns of different formats / precisions, which is what |
55 | | - // dashboard field-statistics panels iterate over. Loaded through the helper so that with |
56 | | - // -Dtests.analytics.parquet_indices=true it gets provisioned as a parquet-backed composite |
57 | | - // index — required for analytics-engine routing. |
| 61 | + // DATE_FORMATS has many datetime columns of different formats/precisions. With |
| 62 | + // -Dtests.analytics.parquet_indices=true the helper provisions it as a parquet-backed |
| 63 | + // composite index — required for analytics-engine routing. |
58 | 64 | loadIndex(Index.DATE_FORMATS); |
59 | 65 | } |
60 | 66 |
|
61 | 67 | @Test |
62 | | - public void testSequentialStatsDistinctCountOverDatetime() throws IOException { |
63 | | - // Bug repro: each iteration runs a stats + distinct_count over a datetime field. With the |
64 | | - // singleton rule, residual entries on the shuttle's internal stack from the previous plan() |
65 | | - // would cause NoSuchElementException on a subsequent traversal. With fresh instances per |
66 | | - // plan(), the stack is always empty at entry and the iterations all succeed. |
67 | | - String query = |
68 | | - String.format( |
69 | | - "source=%s | stats count() as field_count, distinct_count(epoch_millis) as" |
70 | | - + " distinct_count", |
71 | | - TEST_INDEX_DATE_FORMATS); |
72 | | - for (int i = 0; i < ITERATIONS; i++) { |
73 | | - JSONObject result = executeQuery(query); |
74 | | - // Sanity: response was returned without a cluster-side exception. |
75 | | - assertNotNull("iteration " + i + " produced no result", result); |
| 68 | + public void testConcurrentStatsDistinctCountOverDatetime() throws Exception { |
| 69 | + String[] fields = { |
| 70 | + "epoch_millis", "epoch_second", "date_optional_time", "strict_date_optional_time" |
| 71 | + }; |
| 72 | + List<String> queries = new ArrayList<>(QUERIES); |
| 73 | + for (int i = 0; i < QUERIES; i++) { |
| 74 | + String field = fields[i % fields.length]; |
| 75 | + queries.add( |
| 76 | + String.format( |
| 77 | + "source=%s | stats count() as field_count, distinct_count(%s) as distinct_count", |
| 78 | + TEST_INDEX_DATE_FORMATS, field)); |
76 | 79 | } |
| 80 | + executeConcurrent(queries); |
77 | 81 | } |
78 | 82 |
|
79 | 83 | @Test |
80 | | - public void testInterleavedStatsAndDatetimeProjection() throws IOException { |
81 | | - // Bug repro variant: interleave stats+distinct_count with a plain datetime projection that |
82 | | - // exercises the DatetimeOutputCastRule. Different plan shapes per iteration push the rule |
83 | | - // through different visitChild paths and surface stack desync faster. |
84 | | - for (int i = 0; i < ITERATIONS; i++) { |
85 | | - executeQuery( |
86 | | - String.format( |
87 | | - "source=%s | stats count() as field_count, distinct_count(epoch_millis) as" |
88 | | - + " distinct_count", |
89 | | - TEST_INDEX_DATE_FORMATS)); |
90 | | - executeQuery( |
91 | | - String.format( |
92 | | - "source=%s | fields epoch_millis, epoch_second, date_optional_time", |
93 | | - TEST_INDEX_DATE_FORMATS)); |
94 | | - executeQuery( |
95 | | - String.format( |
96 | | - "source=%s | stats count() as field_count, distinct_count(epoch_second) as" |
97 | | - + " distinct_count", |
98 | | - TEST_INDEX_DATE_FORMATS)); |
| 84 | + public void testConcurrentMixedDatetimePlans() throws Exception { |
| 85 | + // Mix three plan shapes: stats+distinct_count, plain field projection (datetime cast), and |
| 86 | + // stats by a different field. Different plan shapes push the singleton shuttle through |
| 87 | + // different visitChild call counts — making cross-query stack pollution more likely. |
| 88 | + List<String> shapes = |
| 89 | + List.of( |
| 90 | + "source=%s | stats count() as field_count, distinct_count(epoch_millis) as" |
| 91 | + + " distinct_count", |
| 92 | + "source=%s | fields epoch_millis, epoch_second, date_optional_time", |
| 93 | + "source=%s | stats count() as field_count, distinct_count(epoch_second) as" |
| 94 | + + " distinct_count by date_optional_time"); |
| 95 | + List<String> queries = new ArrayList<>(QUERIES); |
| 96 | + for (int i = 0; i < QUERIES; i++) { |
| 97 | + queries.add(String.format(shapes.get(i % shapes.size()), TEST_INDEX_DATE_FORMATS)); |
99 | 98 | } |
| 99 | + executeConcurrent(queries); |
100 | 100 | } |
101 | 101 |
|
102 | | - @Test |
103 | | - public void testDistinctCountOverMultipleDatetimeFields() throws IOException { |
104 | | - // Bug repro variant: iterate distinct_count over different datetime fields in sequence — |
105 | | - // mirrors the dashboard field-statistics tab that probes every field in the index. |
106 | | - String[] datetimeFields = { |
107 | | - "epoch_millis", "epoch_second", "date_optional_time", "strict_date_optional_time" |
108 | | - }; |
109 | | - for (int i = 0; i < ITERATIONS; i++) { |
110 | | - String field = datetimeFields[i % datetimeFields.length]; |
111 | | - executeQuery( |
112 | | - String.format( |
113 | | - "source=%s | stats count() as field_count, distinct_count(%s) as distinct_count", |
114 | | - TEST_INDEX_DATE_FORMATS, field)); |
| 102 | + /** |
| 103 | + * Fire all queries through a fixed-size thread pool. Asserts every query completes without |
| 104 | + * exception. With the singleton bug present this triggers {@code NoSuchElementException} on at |
| 105 | + * least one task once the stack interleaving corrupts state. |
| 106 | + */ |
| 107 | + private void executeConcurrent(List<String> queries) throws Exception { |
| 108 | + var executor = Executors.newFixedThreadPool(PARALLELISM); |
| 109 | + try { |
| 110 | + List<CompletableFuture<Void>> futures = new ArrayList<>(queries.size()); |
| 111 | + AtomicInteger failures = new AtomicInteger(); |
| 112 | + List<Throwable> errors = new ArrayList<>(); |
| 113 | + for (String query : queries) { |
| 114 | + futures.add( |
| 115 | + CompletableFuture.runAsync( |
| 116 | + () -> { |
| 117 | + try { |
| 118 | + executeQuery(query); |
| 119 | + } catch (Exception e) { |
| 120 | + failures.incrementAndGet(); |
| 121 | + synchronized (errors) { |
| 122 | + errors.add(e); |
| 123 | + } |
| 124 | + } |
| 125 | + }, |
| 126 | + executor)); |
| 127 | + } |
| 128 | + for (CompletableFuture<Void> f : futures) { |
| 129 | + try { |
| 130 | + f.get(60, TimeUnit.SECONDS); |
| 131 | + } catch (ExecutionException e) { |
| 132 | + failures.incrementAndGet(); |
| 133 | + synchronized (errors) { |
| 134 | + errors.add(e.getCause()); |
| 135 | + } |
| 136 | + } |
| 137 | + } |
| 138 | + if (failures.get() > 0) { |
| 139 | + StringBuilder msg = new StringBuilder(); |
| 140 | + msg.append(failures.get()).append("/").append(queries.size()).append(" queries failed:"); |
| 141 | + synchronized (errors) { |
| 142 | + for (int i = 0; i < Math.min(3, errors.size()); i++) { |
| 143 | + msg.append("\n - ").append(errors.get(i)); |
| 144 | + } |
| 145 | + } |
| 146 | + throw new AssertionError(msg.toString()); |
| 147 | + } |
| 148 | + } finally { |
| 149 | + executor.shutdown(); |
| 150 | + executor.awaitTermination(30, TimeUnit.SECONDS); |
115 | 151 | } |
116 | 152 | } |
117 | 153 | } |
0 commit comments