Skip to content

Commit 679d8ca

Browse files
authored
Support timeouts for Calcite queries (#4857)
1 parent 2f6cbfd commit 679d8ca

20 files changed

Lines changed: 237 additions & 44 deletions

File tree

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public enum Key {
2525

2626
/** PPL Settings. */
2727
PPL_ENABLED("plugins.ppl.enabled"),
28+
PPL_QUERY_TIMEOUT("plugins.ppl.query.timeout"),
2829
PATTERN_METHOD("plugins.ppl.pattern.method"),
2930
PATTERN_MODE("plugins.ppl.pattern.mode"),
3031
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),

docs/user/ppl/admin/settings.rst

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,42 @@ PPL query::
7373
"status": 400
7474
}
7575

76+
plugins.ppl.query.timeout
77+
=========================
78+
79+
Description
80+
-----------
81+
82+
This setting controls the maximum execution time for PPL queries. When a query exceeds this timeout, it will be interrupted and return a timeout error.
83+
84+
1. The default value is 300s (5 minutes).
85+
2. This setting is node scope.
86+
3. This setting can be updated dynamically.
87+
88+
Example
89+
-------
90+
91+
You can configure the query timeout:
92+
93+
PPL query::
94+
95+
sh$ curl -sS -H 'Content-Type: application/json' \
96+
... -X PUT localhost:9200/_plugins/_query/settings \
97+
... -d '{"transient" : {"plugins.ppl.query.timeout" : "60s"}}'
98+
{
99+
"acknowledged": true,
100+
"persistent": {},
101+
"transient": {
102+
"plugins": {
103+
"ppl": {
104+
"query": {
105+
"timeout": "60s"
106+
}
107+
}
108+
}
109+
}
110+
}
111+
76112
plugins.query.memory_limit
77113
==========================
78114

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,79 @@
77

88
import java.util.Map;
99
import lombok.RequiredArgsConstructor;
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.apache.logging.log4j.ThreadContext;
13+
import org.opensearch.OpenSearchTimeoutException;
1114
import org.opensearch.common.unit.TimeValue;
15+
import org.opensearch.sql.common.setting.Settings;
1216
import org.opensearch.sql.executor.QueryId;
1317
import org.opensearch.sql.executor.QueryManager;
1418
import org.opensearch.sql.executor.execution.AbstractPlan;
19+
import org.opensearch.threadpool.Scheduler;
1520
import org.opensearch.threadpool.ThreadPool;
1621
import org.opensearch.transport.client.node.NodeClient;
1722

1823
/** QueryManager implemented in OpenSearch cluster. */
1924
@RequiredArgsConstructor
2025
public class OpenSearchQueryManager implements QueryManager {
2126

27+
private static final Logger LOG = LogManager.getLogger(OpenSearchQueryManager.class);
28+
2229
private final NodeClient nodeClient;
2330

31+
private final Settings settings;
32+
2433
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
2534
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";
2635

2736
@Override
2837
public QueryId submit(AbstractPlan queryPlan) {
29-
schedule(nodeClient, () -> queryPlan.execute());
38+
TimeValue timeout = settings.getSettingValue(Settings.Key.PPL_QUERY_TIMEOUT);
39+
schedule(nodeClient, queryPlan::execute, timeout);
3040

3141
return queryPlan.getQueryId();
3242
}
3343

34-
private void schedule(NodeClient client, Runnable task) {
44+
private void schedule(NodeClient client, Runnable task, TimeValue timeout) {
3545
ThreadPool threadPool = client.threadPool();
36-
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
46+
47+
Runnable wrappedTask =
48+
withCurrentContext(
49+
() -> {
50+
final Thread executionThread = Thread.currentThread();
51+
52+
Scheduler.ScheduledCancellable timeoutTask =
53+
threadPool.schedule(
54+
() -> {
55+
LOG.warn(
56+
"Query execution timed out after {}. Interrupting execution thread.",
57+
timeout);
58+
executionThread.interrupt();
59+
},
60+
timeout,
61+
ThreadPool.Names.GENERIC);
62+
63+
try {
64+
task.run();
65+
timeoutTask.cancel();
66+
// Clear any leftover thread interrupts to keep the thread pool clean
67+
Thread.interrupted();
68+
} catch (Exception e) {
69+
timeoutTask.cancel();
70+
71+
// Special-case handling of timeout-related interruptions
72+
if (Thread.interrupted() || e.getCause() instanceof InterruptedException) {
73+
LOG.error("Query was interrupted due to timeout after {}", timeout);
74+
throw new OpenSearchTimeoutException(
75+
"Query execution timed out after " + timeout);
76+
}
77+
78+
throw e;
79+
}
80+
});
81+
82+
threadPool.schedule(wrappedTask, new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
3783
}
3884

3985
private Runnable withCurrentContext(final Runnable task) {

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/AggregateIndexScanRule.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.util.function.Predicate;
1414
import org.apache.calcite.plan.RelOptRuleCall;
1515
import org.apache.calcite.plan.RelOptUtil;
16-
import org.apache.calcite.plan.RelRule;
1716
import org.apache.calcite.rel.AbstractRelNode;
1817
import org.apache.calcite.rel.RelNode;
1918
import org.apache.calcite.rel.core.Aggregate;
@@ -40,15 +39,15 @@
4039

4140
/** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
4241
@Value.Enclosing
43-
public class AggregateIndexScanRule extends RelRule<AggregateIndexScanRule.Config> {
42+
public class AggregateIndexScanRule extends InterruptibleRelRule<AggregateIndexScanRule.Config> {
4443

4544
/** Creates a AggregateIndexScanRule. */
4645
protected AggregateIndexScanRule(Config config) {
4746
super(config);
4847
}
4948

5049
@Override
51-
public void onMatch(RelOptRuleCall call) {
50+
protected void onMatchImpl(RelOptRuleCall call) {
5251
if (call.rels.length == 5) {
5352
final LogicalAggregate aggregate = call.rel(0);
5453
final LogicalProject topProject = call.rel(1);

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.List;
1010
import java.util.function.Predicate;
1111
import org.apache.calcite.plan.RelOptRuleCall;
12-
import org.apache.calcite.plan.RelRule;
1312
import org.apache.calcite.rel.logical.LogicalAggregate;
1413
import org.apache.calcite.rel.logical.LogicalFilter;
1514
import org.apache.calcite.rel.logical.LogicalProject;
@@ -31,16 +30,17 @@
3130
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
3231

3332
@Value.Enclosing
34-
public class DedupPushdownRule extends RelRule<DedupPushdownRule.Config> {
33+
public class DedupPushdownRule extends InterruptibleRelRule<DedupPushdownRule.Config> {
3534
private static final Logger LOG = LogManager.getLogger();
3635

3736
protected DedupPushdownRule(Config config) {
3837
super(config);
3938
}
4039

4140
@Override
42-
public void onMatch(RelOptRuleCall call) {
41+
protected void onMatchImpl(RelOptRuleCall call) {
4342
final LogicalProject finalProject = call.rel(0);
43+
// TODO Used when number of duplication is more than 1
4444
final LogicalFilter numOfDedupFilter = call.rel(1);
4545
final LogicalProject projectWithWindow = call.rel(2);
4646
if (call.rels.length == 5) {

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ExpandCollationOnProjectExprRule.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.function.Predicate;
1212
import org.apache.calcite.adapter.enumerable.EnumerableProject;
1313
import org.apache.calcite.plan.RelOptRuleCall;
14-
import org.apache.calcite.plan.RelRule;
1514
import org.apache.calcite.plan.RelTrait;
1615
import org.apache.calcite.plan.RelTraitSet;
1716
import org.apache.calcite.plan.volcano.AbstractConverter;
@@ -44,14 +43,14 @@
4443
*/
4544
@Value.Enclosing
4645
public class ExpandCollationOnProjectExprRule
47-
extends RelRule<ExpandCollationOnProjectExprRule.Config> {
46+
extends InterruptibleRelRule<ExpandCollationOnProjectExprRule.Config> {
4847

4948
protected ExpandCollationOnProjectExprRule(Config config) {
5049
super(config);
5150
}
5251

5352
@Override
54-
public void onMatch(RelOptRuleCall call) {
53+
protected void onMatchImpl(RelOptRuleCall call) {
5554
final AbstractConverter converter = call.rel(0);
5655
final Project project = call.rel(1);
5756
final RelTraitSet toTraits = converter.getTraitSet();

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/FilterIndexScanRule.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.util.function.Predicate;
99
import org.apache.calcite.plan.RelOptRuleCall;
10-
import org.apache.calcite.plan.RelRule;
1110
import org.apache.calcite.rel.AbstractRelNode;
1211
import org.apache.calcite.rel.core.Filter;
1312
import org.apache.calcite.rel.logical.LogicalFilter;
@@ -18,15 +17,15 @@
1817

1918
/** Planner rule that push a {@link LogicalFilter} down to {@link CalciteLogicalIndexScan} */
2019
@Value.Enclosing
21-
public class FilterIndexScanRule extends RelRule<FilterIndexScanRule.Config> {
20+
public class FilterIndexScanRule extends InterruptibleRelRule<FilterIndexScanRule.Config> {
2221

2322
/** Creates a FilterIndexScanRule. */
2423
protected FilterIndexScanRule(Config config) {
2524
super(config);
2625
}
2726

2827
@Override
29-
public void onMatch(RelOptRuleCall call) {
28+
protected void onMatchImpl(RelOptRuleCall call) {
3029
if (call.rels.length == 2) {
3130
// the ordinary variant
3231
final LogicalFilter filter = call.rel(0);
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.opensearch.planner.rules;
7+
8+
import org.apache.calcite.plan.RelOptRuleCall;
9+
import org.apache.calcite.plan.RelRule;
10+
import org.opensearch.OpenSearchTimeoutException;
11+
import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig;
12+
13+
/**
14+
* Base class for OpenSearch planner rules that automatically checks for thread interruption during
15+
* query planning. This ensures that long-running planning operations can be interrupted when a
16+
* query timeout occurs.
17+
*
18+
* <p>All OpenSearch planner rules should extend this class instead of extending {@link RelRule}
19+
* directly. This provides automatic timeout support without requiring manual interruption checks in
20+
* each rule.
21+
*
22+
* <p>Example usage:
23+
*
24+
* <pre>{@code
25+
* public class MyCustomRule extends InterruptibleRelRule<MyCustomRule.Config> {
26+
* protected MyCustomRule(Config config) {
27+
* super(config);
28+
* }
29+
*
30+
* @Override
31+
* protected void onMatchImpl(RelOptRuleCall call) {
32+
* // Rule implementation - interruption is checked automatically
33+
* // before this method is called
34+
* }
35+
* }
36+
* }</pre>
37+
*
38+
* @param <C> the configuration type for this rule
39+
*/
40+
public abstract class InterruptibleRelRule<C extends OpenSearchRuleConfig> extends RelRule<C> {
41+
42+
/**
43+
* Constructs an InterruptibleRelRule with the given configuration.
44+
*
45+
* @param config the rule configuration
46+
*/
47+
protected InterruptibleRelRule(C config) {
48+
super(config);
49+
}
50+
51+
/**
52+
* Called when the rule matches. This method checks for thread interruption before delegating to
53+
* the implementation-specific {@link #onMatchImpl(RelOptRuleCall)} method.
54+
*
55+
* <p>Do not override this method in subclasses. Instead, override {@link
56+
* #onMatchImpl(RelOptRuleCall)}.
57+
*
58+
* @param call the rule call context
59+
* @throws RuntimeException wrapping {@link InterruptedException} if the thread has been
60+
* interrupted
61+
*/
62+
@Override
63+
public final void onMatch(RelOptRuleCall call) {
64+
if (Thread.currentThread().isInterrupted()) {
65+
throw new OpenSearchTimeoutException(
66+
new InterruptedException(
67+
"Query planning interrupted in rule: " + getClass().getSimpleName()));
68+
}
69+
70+
onMatchImpl(call);
71+
}
72+
73+
/**
74+
* Implementation-specific match handler. Subclasses must implement this method instead of
75+
* overriding {@link #onMatch(RelOptRuleCall)}.
76+
*
77+
* <p>This method is called after an automatic interruption check. If the thread has been
78+
* interrupted (due to a timeout), this method will not be called.
79+
*
80+
* @param call the rule call context
81+
*/
82+
protected abstract void onMatchImpl(RelOptRuleCall call);
83+
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/LimitIndexScanRule.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.util.Objects;
99
import org.apache.calcite.plan.RelOptRuleCall;
10-
import org.apache.calcite.plan.RelRule;
1110
import org.apache.calcite.rel.AbstractRelNode;
1211
import org.apache.calcite.rel.logical.LogicalSort;
1312
import org.apache.calcite.rex.RexLiteral;
@@ -22,14 +21,14 @@
2221
* down to {@link CalciteLogicalIndexScan}
2322
*/
2423
@Value.Enclosing
25-
public class LimitIndexScanRule extends RelRule<LimitIndexScanRule.Config> {
24+
public class LimitIndexScanRule extends InterruptibleRelRule<LimitIndexScanRule.Config> {
2625

2726
protected LimitIndexScanRule(Config config) {
2827
super(config);
2928
}
3029

3130
@Override
32-
public void onMatch(RelOptRuleCall call) {
31+
protected void onMatchImpl(RelOptRuleCall call) {
3332
final LogicalSort sort = call.rel(0);
3433
final CalciteLogicalIndexScan scan = call.rel(1);
3534

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/ProjectIndexScanRule.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import java.util.Objects;
1313
import org.apache.calcite.plan.RelOptRuleCall;
1414
import org.apache.calcite.plan.RelOptTable;
15-
import org.apache.calcite.plan.RelRule;
1615
import org.apache.calcite.rel.logical.LogicalProject;
1716
import org.apache.calcite.rex.RexInputRef;
1817
import org.apache.calcite.rex.RexNode;
@@ -27,15 +26,15 @@
2726

2827
/** Planner rule that push a {@link LogicalProject} down to {@link CalciteLogicalIndexScan} */
2928
@Value.Enclosing
30-
public class ProjectIndexScanRule extends RelRule<ProjectIndexScanRule.Config> {
29+
public class ProjectIndexScanRule extends InterruptibleRelRule<ProjectIndexScanRule.Config> {
3130

3231
/** Creates a ProjectIndexScanRule. */
3332
protected ProjectIndexScanRule(Config config) {
3433
super(config);
3534
}
3635

3736
@Override
38-
public void onMatch(RelOptRuleCall call) {
37+
protected void onMatchImpl(RelOptRuleCall call) {
3938
if (call.rels.length == 2) {
4039
// the ordinary variant
4140
final LogicalProject project = call.rel(0);

0 commit comments

Comments
 (0)