Skip to content

Commit 5cf2bfa

Browse files
committed
Support profile options for PPL - Part I Implement phases level metrics. (opensearch-project#4983)
* Init Signed-off-by: Peng Huo <penghuo@gmail.com> * Cleanup ThreadLocal Signed-off-by: Peng Huo <penghuo@gmail.com> * Update doc Signed-off-by: Peng Huo <penghuo@gmail.com> * Update Doc Signed-off-by: Peng Huo <penghuo@gmail.com> * Refactor Code Signed-off-by: Peng Huo <penghuo@gmail.com> * Remove unused code Signed-off-by: Peng Huo <penghuo@gmail.com> * Address comments Signed-off-by: Peng Huo <penghuo@gmail.com> * Add Task 1 - Add Phases level metrics Signed-off-by: Peng Huo <penghuo@gmail.com> * Reformat doc Signed-off-by: Peng Huo <penghuo@gmail.com> --------- Signed-off-by: Peng Huo <penghuo@gmail.com> (cherry picked from commit 47709a0)
1 parent e7177b1 commit 5cf2bfa

22 files changed

Lines changed: 675 additions & 29 deletions

File tree

common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class QueryContext {
2020
/** The key of the request id in the context map. */
2121
private static final String REQUEST_ID_KEY = "request_id";
2222

23+
private static final String PROFILE_KEY = "profile";
24+
2325
/**
2426
* Generates a random UUID and adds to the {@link ThreadContext} as the request id.
2527
*
@@ -66,4 +68,20 @@ private QueryContext() {
6668
throw new AssertionError(
6769
getClass().getCanonicalName() + " is a utility class and must not be initialized");
6870
}
71+
72+
/**
73+
* Store the profile flag in thread context.
74+
*
75+
* @param profileEnabled whether profiling is enabled
76+
*/
77+
public static void setProfile(boolean profileEnabled) {
78+
ThreadContext.put(PROFILE_KEY, Boolean.toString(profileEnabled));
79+
}
80+
81+
/**
82+
* @return true if profiling flag is set in the thread context.
83+
*/
84+
public static boolean isProfileEnabled() {
85+
return Boolean.parseBoolean(ThreadContext.get(PROFILE_KEY));
86+
}
6987
}

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
package org.opensearch.sql.calcite.utils;
2929

3030
import static java.util.Objects.requireNonNull;
31+
import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE;
3132

3233
import com.google.common.collect.ImmutableList;
3334
import java.lang.reflect.Type;
@@ -91,6 +92,8 @@
9192
import org.opensearch.sql.calcite.plan.OpenSearchRules;
9293
import org.opensearch.sql.calcite.plan.Scannable;
9394
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
95+
import org.opensearch.sql.monitor.profile.ProfileMetric;
96+
import org.opensearch.sql.monitor.profile.QueryProfiling;
9497

9598
/**
9699
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
@@ -340,6 +343,8 @@ public static class OpenSearchRelRunners {
340343
* org.apache.calcite.tools.RelRunners#run(RelNode)}
341344
*/
342345
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
346+
ProfileMetric optimizeTime = QueryProfiling.current().getOrCreateMetric(OPTIMIZE);
347+
long startTime = System.nanoTime();
343348
final RelShuttle shuttle =
344349
new RelHomogeneousShuttle() {
345350
@Override
@@ -358,7 +363,9 @@ public RelNode visit(TableScan scan) {
358363
// the line we changed here
359364
try (Connection connection = context.connection) {
360365
final RelRunner runner = connection.unwrap(RelRunner.class);
361-
return runner.prepareStatement(rel);
366+
PreparedStatement preparedStatement = runner.prepareStatement(rel);
367+
optimizeTime.set(System.nanoTime() - startTime);
368+
return preparedStatement;
362369
} catch (SQLException e) {
363370
throw Util.throwAsRuntime(e);
364371
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,19 @@
3838
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
3939
import org.opensearch.sql.common.response.ResponseListener;
4040
import org.opensearch.sql.common.setting.Settings;
41+
import org.opensearch.sql.common.utils.QueryContext;
42+
import org.opensearch.sql.common.utils.QueryContext;
4143
import org.opensearch.sql.datasource.DataSourceService;
4244
import org.opensearch.sql.exception.CalciteUnsupportedException;
4345
import org.opensearch.sql.exception.NonFallbackCalciteException;
46+
import org.opensearch.sql.monitor.profile.MetricName;
47+
import org.opensearch.sql.monitor.profile.ProfileContext;
48+
import org.opensearch.sql.monitor.profile.ProfileMetric;
49+
import org.opensearch.sql.monitor.profile.QueryProfiling;
50+
import org.opensearch.sql.monitor.profile.MetricName;
51+
import org.opensearch.sql.monitor.profile.ProfileContext;
52+
import org.opensearch.sql.monitor.profile.ProfileMetric;
53+
import org.opensearch.sql.monitor.profile.QueryProfiling;
4454
import org.opensearch.sql.planner.PlanContext;
4555
import org.opensearch.sql.planner.Planner;
4656
import org.opensearch.sql.planner.logical.LogicalPaginate;
@@ -93,6 +103,10 @@ public void executeWithCalcite(
93103
CalcitePlanContext.run(
94104
() -> {
95105
try {
106+
ProfileContext profileContext =
107+
QueryProfiling.activate(QueryContext.isProfileEnabled());
108+
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
109+
long analyzeStart = System.nanoTime();
96110
AccessController.doPrivileged(
97111
(PrivilegedAction<Void>)
98112
() -> {
@@ -102,6 +116,7 @@ public void executeWithCalcite(
102116
RelNode relNode = analyze(plan, context);
103117
RelNode optimized = optimize(relNode, context);
104118
RelNode calcitePlan = convertToCalcitePlan(optimized);
119+
analyzeMetric.set(System.nanoTime() - analyzeStart);
105120
executionEngine.execute(calcitePlan, context, listener);
106121
return null;
107122
});
@@ -136,6 +151,7 @@ public void explainWithCalcite(
136151
CalcitePlanContext.run(
137152
() -> {
138153
try {
154+
QueryProfiling.noop();
139155
AccessController.doPrivileged(
140156
(PrivilegedAction<Void>)
141157
() -> {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.concurrent.atomic.LongAdder;
9+
10+
/** Concrete metric backed by {@link LongAdder}. */
11+
final class DefaultMetricImpl implements ProfileMetric {
12+
13+
private final String name;
14+
private final LongAdder value = new LongAdder();
15+
16+
/**
17+
* Construct a metric with the provided name.
18+
*
19+
* @param name metric name
20+
*/
21+
DefaultMetricImpl(String name) {
22+
this.name = name;
23+
}
24+
25+
@Override
26+
public String name() {
27+
return name;
28+
}
29+
30+
@Override
31+
public long value() {
32+
return value.sum();
33+
}
34+
35+
@Override
36+
public void add(long delta) {
37+
value.add(delta);
38+
}
39+
40+
@Override
41+
public void set(long value) {
42+
this.value.reset();
43+
this.value.add(value);
44+
}
45+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.LinkedHashMap;
9+
import java.util.Map;
10+
import java.util.Objects;
11+
import java.util.concurrent.ConcurrentHashMap;
12+
13+
/** Default implementation that records profiling metrics. */
14+
public class DefaultProfileContext implements ProfileContext {
15+
16+
private final long startNanos = System.nanoTime();
17+
private boolean finished;
18+
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
19+
private QueryProfile profile;
20+
21+
public DefaultProfileContext() {}
22+
23+
/** {@inheritDoc} */
24+
@Override
25+
public ProfileMetric getOrCreateMetric(MetricName name) {
26+
Objects.requireNonNull(name, "name");
27+
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
28+
}
29+
30+
/** {@inheritDoc} */
31+
@Override
32+
public synchronized QueryProfile finish() {
33+
if (finished) {
34+
return profile;
35+
}
36+
finished = true;
37+
long endNanos = System.nanoTime();
38+
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
39+
for (MetricName metricName : MetricName.values()) {
40+
DefaultMetricImpl metric = metrics.get(metricName);
41+
double millis = metric == null ? 0d : roundToMillis(metric.value());
42+
snapshot.put(metricName, millis);
43+
}
44+
double totalMillis = roundToMillis(endNanos - startNanos);
45+
profile = new QueryProfile(totalMillis, snapshot);
46+
return profile;
47+
}
48+
49+
private double roundToMillis(long nanos) {
50+
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
51+
}
52+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Named metrics used by query profiling. */
9+
public enum MetricName {
10+
ANALYZE,
11+
OPTIMIZE,
12+
EXECUTE,
13+
FORMAT
14+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
import java.util.Objects;
9+
10+
/** Disabled profiling context. */
11+
public final class NoopProfileContext implements ProfileContext {
12+
13+
public static final NoopProfileContext INSTANCE = new NoopProfileContext();
14+
15+
private NoopProfileContext() {}
16+
17+
/** {@inheritDoc} */
18+
@Override
19+
public ProfileMetric getOrCreateMetric(MetricName name) {
20+
Objects.requireNonNull(name, "name");
21+
return NoopProfileMetric.INSTANCE;
22+
}
23+
24+
/** {@inheritDoc} */
25+
@Override
26+
public QueryProfile finish() {
27+
return null;
28+
}
29+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** No-op metric implementation. */
9+
final class NoopProfileMetric implements ProfileMetric {
10+
11+
static final NoopProfileMetric INSTANCE = new NoopProfileMetric();
12+
13+
private NoopProfileMetric() {}
14+
15+
/** {@inheritDoc} */
16+
@Override
17+
public String name() {
18+
return "";
19+
}
20+
21+
/** {@inheritDoc} */
22+
@Override
23+
public long value() {
24+
return 0;
25+
}
26+
27+
/** {@inheritDoc} */
28+
@Override
29+
public void add(long delta) {}
30+
31+
/** {@inheritDoc} */
32+
@Override
33+
public void set(long value) {}
34+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Context for collecting profiling metrics during query execution. */
9+
public interface ProfileContext {
10+
/**
11+
* Obtain or create a metric with the provided name.
12+
*
13+
* @param name fully qualified metric name
14+
* @return metric instance
15+
*/
16+
ProfileMetric getOrCreateMetric(MetricName name);
17+
18+
/**
19+
* Finalize profiling and return a snapshot.
20+
*
21+
* @return immutable query profile snapshot
22+
*/
23+
QueryProfile finish();
24+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.monitor.profile;
7+
8+
/** Metric for query profiling. */
9+
public interface ProfileMetric {
10+
/**
11+
* @return metric name.
12+
*/
13+
String name();
14+
15+
/**
16+
* @return current metric value.
17+
*/
18+
long value();
19+
20+
/**
21+
* Increment the metric by the given delta.
22+
*
23+
* @param delta amount to add
24+
*/
25+
void add(long delta);
26+
27+
/**
28+
* Set the metric to the provided value.
29+
*
30+
* @param value new metric value
31+
*/
32+
void set(long value);
33+
}

0 commit comments

Comments
 (0)