Skip to content

Commit a4d156e

Browse files
committed
[review-ready] highligting with ppl api pass-through design
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent caf435f commit a4d156e

11 files changed

Lines changed: 143 additions & 72 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,26 @@ public class CalcitePlanContext {
4545
private static final ThreadLocal<Boolean> legacyPreferredFlag =
4646
ThreadLocal.withInitial(() -> true);
4747

48+
/**
49+
* Thread-local highlight configuration from the PPL request body. Set by PPLService before query
50+
* execution and read by CalciteEnumerableIndexScan when building the OpenSearch request. The map
51+
* represents the highlight JSON object (fields, pre_tags, post_tags, fragment_size) that the
52+
* caller provides and the backend forwards as-is to OpenSearch.
53+
*/
54+
private static final ThreadLocal<Map<String, Object>> highlightConfig = new ThreadLocal<>();
55+
56+
public static void setHighlightConfig(Map<String, Object> config) {
57+
highlightConfig.set(config);
58+
}
59+
60+
public static Map<String, Object> getHighlightConfig() {
61+
return highlightConfig.get();
62+
}
63+
64+
public static void clearHighlightConfig() {
65+
highlightConfig.remove();
66+
}
67+
4868
@Getter @Setter private boolean isResolvingJoinCondition = false;
4969
@Getter @Setter private boolean isResolvingSubquery = false;
5070
@Getter @Setter private boolean inCoalesceFunction = false;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,6 @@ public RelNode visitSearch(Search node, CalcitePlanContext context) {
228228
// Visit the Relation child to get the scan
229229
node.getChild().get(0).accept(this, context);
230230

231-
// Mark the scan as originating from a search command so that the optimizer
232-
// can scope auto-highlight injection to search queries only.
233-
PPLHintUtils.markSearchCommand(context.relBuilder);
234-
235231
// Create query_string function
236232
Function queryStringFunc =
237233
AstDSL.function(

core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintUtils.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,10 @@
66
package org.opensearch.sql.calcite.utils;
77

88
import com.google.common.base.Suppliers;
9-
import java.util.List;
109
import java.util.function.Supplier;
1110
import lombok.experimental.UtilityClass;
12-
import org.apache.calcite.rel.RelNode;
1311
import org.apache.calcite.rel.core.Aggregate;
14-
import org.apache.calcite.rel.core.TableScan;
1512
import org.apache.calcite.rel.hint.HintStrategyTable;
16-
import org.apache.calcite.rel.hint.Hintable;
1713
import org.apache.calcite.rel.hint.RelHint;
1814
import org.apache.calcite.rel.logical.LogicalAggregate;
1915
import org.apache.calcite.tools.RelBuilder;
@@ -23,8 +19,6 @@ public class PPLHintUtils {
2319
private static final String HINT_AGG_ARGUMENTS = "AGG_ARGS";
2420
private static final String KEY_IGNORE_NULL_BUCKET = "ignoreNullBucket";
2521
private static final String KEY_HAS_NESTED_AGG_CALL = "hasNestedAggCall";
26-
public static final String HINT_SEARCH_COMMAND = "SEARCH_COMMAND";
27-
2822
private static final Supplier<HintStrategyTable> HINT_STRATEGY_TABLE =
2923
Suppliers.memoize(
3024
() ->
@@ -34,7 +28,6 @@ public class PPLHintUtils {
3428
(hint, rel) -> {
3529
return rel instanceof LogicalAggregate;
3630
})
37-
.hintStrategy(HINT_SEARCH_COMMAND, (hint, rel) -> rel instanceof TableScan)
3831
.build());
3932

4033
/**
@@ -86,34 +79,4 @@ public static boolean hasNestedAggCall(Aggregate aggregate) {
8679
.getOrDefault(KEY_HAS_NESTED_AGG_CALL, "false")
8780
.equals("true"));
8881
}
89-
90-
/**
91-
* Mark a scan node as originating from a PPL search command. The scan node may be on top of the
92-
* relBuilder stack directly, or wrapped in a Project (due to alias field wrapping). This hint is
93-
* used to scope auto-highlight injection to search command queries only.
94-
*/
95-
public static void markSearchCommand(RelBuilder relBuilder) {
96-
final RelHint hint = RelHint.builder(HINT_SEARCH_COMMAND).build();
97-
RelNode top = relBuilder.peek();
98-
if (top instanceof Hintable) {
99-
// Scan is directly on top of the stack
100-
relBuilder.hints(hint);
101-
} else if (top instanceof org.apache.calcite.rel.core.Project proj) {
102-
RelNode input = proj.getInput();
103-
if (input instanceof Hintable hintable) {
104-
RelNode hintedInput = hintable.attachHints(List.of(hint));
105-
RelNode newProject = proj.copy(proj.getTraitSet(), List.of(hintedInput));
106-
relBuilder.build(); // pop old project
107-
relBuilder.push(newProject);
108-
}
109-
}
110-
if (relBuilder.getCluster().getHintStrategies() == HintStrategyTable.EMPTY) {
111-
relBuilder.getCluster().setHintStrategies(HINT_STRATEGY_TABLE.get());
112-
}
113-
}
114-
115-
/** Return true if the scan has the SEARCH_COMMAND hint. */
116-
public static boolean isSearchCommand(TableScan scan) {
117-
return scan.getHints().stream().anyMatch(hint -> hint.hintName.equals(HINT_SEARCH_COMMAND));
118-
}
11982
}

core/src/main/java/org/opensearch/sql/executor/execution/AbstractPlan.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
package org.opensearch.sql.executor.execution;
77

8+
import java.util.Map;
89
import lombok.Getter;
910
import lombok.RequiredArgsConstructor;
11+
import lombok.Setter;
1012
import org.opensearch.sql.ast.statement.ExplainMode;
1113
import org.opensearch.sql.common.response.ResponseListener;
1214
import org.opensearch.sql.executor.ExecutionEngine;
@@ -22,6 +24,14 @@ public abstract class AbstractPlan {
2224

2325
@Getter protected final QueryType queryType;
2426

27+
/**
28+
* Highlight configuration from the PPL request body. Set by PPLService before submitting the plan
29+
* to the query manager. The plan carries this config across the thread boundary (REST handler
30+
* thread → sql-worker thread), and the worker thread sets it as a ThreadLocal before Calcite
31+
* planning and execution begin.
32+
*/
33+
@Getter @Setter private Map<String, Object> highlightConfig;
34+
2535
/** Start query execution. */
2636
public abstract void execute();
2737

core/src/main/java/org/opensearch/sql/executor/execution/ExplainPlan.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package org.opensearch.sql.executor.execution;
77

8+
import java.util.Map;
89
import org.opensearch.sql.ast.statement.ExplainMode;
10+
import org.opensearch.sql.calcite.CalcitePlanContext;
911
import org.opensearch.sql.common.response.ResponseListener;
1012
import org.opensearch.sql.executor.ExecutionEngine;
1113
import org.opensearch.sql.executor.QueryId;
@@ -34,7 +36,19 @@ public ExplainPlan(
3436

3537
@Override
3638
public void execute() {
37-
plan.explain(explainListener, mode);
39+
setHighlightThreadLocal();
40+
try {
41+
plan.explain(explainListener, mode);
42+
} finally {
43+
CalcitePlanContext.clearHighlightConfig();
44+
}
45+
}
46+
47+
private void setHighlightThreadLocal() {
48+
Map<String, Object> config = getHighlightConfig();
49+
if (config != null) {
50+
CalcitePlanContext.setHighlightConfig(config);
51+
}
3852
}
3953

4054
@Override

core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55

66
package org.opensearch.sql.executor.execution;
77

8+
import java.util.Map;
89
import java.util.Optional;
910
import org.apache.commons.lang3.NotImplementedException;
1011
import org.opensearch.sql.ast.statement.ExplainMode;
1112
import org.opensearch.sql.ast.tree.Paginate;
1213
import org.opensearch.sql.ast.tree.UnresolvedPlan;
14+
import org.opensearch.sql.calcite.CalcitePlanContext;
1315
import org.opensearch.sql.common.response.ResponseListener;
1416
import org.opensearch.sql.executor.ExecutionEngine;
1517
import org.opensearch.sql.executor.QueryId;
@@ -60,10 +62,15 @@ public QueryPlan(
6062

6163
@Override
6264
public void execute() {
63-
if (pageSize.isPresent()) {
64-
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener);
65-
} else {
66-
queryService.execute(plan, getQueryType(), listener);
65+
setHighlightThreadLocal();
66+
try {
67+
if (pageSize.isPresent()) {
68+
queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener);
69+
} else {
70+
queryService.execute(plan, getQueryType(), listener);
71+
}
72+
} finally {
73+
CalcitePlanContext.clearHighlightConfig();
6774
}
6875
}
6976

@@ -78,4 +85,11 @@ public void explain(
7885
queryService.explain(plan, getQueryType(), listener, mode);
7986
}
8087
}
88+
89+
private void setHighlightThreadLocal() {
90+
Map<String, Object> config = getHighlightConfig();
91+
if (config != null) {
92+
CalcitePlanContext.setHighlightConfig(config);
93+
}
94+
}
8195
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,18 @@
4545
import org.apache.logging.log4j.LogManager;
4646
import org.apache.logging.log4j.Logger;
4747
import org.checkerframework.checker.nullness.qual.Nullable;
48+
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
4849
import org.opensearch.search.sort.ScoreSortBuilder;
4950
import org.opensearch.search.sort.ScriptSortBuilder.ScriptSortType;
5051
import org.opensearch.search.sort.SortBuilder;
5152
import org.opensearch.search.sort.SortBuilders;
5253
import org.opensearch.search.sort.SortOrder;
54+
import org.opensearch.sql.calcite.CalcitePlanContext;
5355
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
5456
import org.opensearch.sql.common.setting.Settings.Key;
5557
import org.opensearch.sql.data.type.ExprType;
5658
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
59+
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
5760
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
5861
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
5962
import org.opensearch.sql.opensearch.storage.scan.context.AbstractAction;
@@ -106,13 +109,55 @@ public RelDataType deriveRowType() {
106109
public RelWriter explainTerms(RelWriter pw) {
107110
String explainString = String.valueOf(pushDownContext);
108111
if (pw instanceof RelWriterImpl) {
109-
// Only add request builder to the explain plan
110-
explainString += ", " + pushDownContext.createRequestBuilder();
112+
OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder();
113+
applyHighlightConfig(requestBuilder);
114+
explainString += ", " + requestBuilder;
111115
}
112116
return super.explainTerms(pw)
113117
.itemIf("PushDownContext", explainString, !pushDownContext.isEmpty());
114118
}
115119

120+
/**
121+
* Apply highlight configuration from the ThreadLocal to the OpenSearch request builder. The
122+
* highlight config is set on a ThreadLocal by the plan's execute() method (on the worker thread)
123+
* and forwarded as-is to OpenSearch.
124+
*
125+
* @param requestBuilder the OpenSearch request builder to attach the highlight clause to
126+
*/
127+
@SuppressWarnings("unchecked")
128+
protected static void applyHighlightConfig(OpenSearchRequestBuilder requestBuilder) {
129+
Map<String, Object> config = CalcitePlanContext.getHighlightConfig();
130+
if (config == null) {
131+
return;
132+
}
133+
HighlightBuilder highlightBuilder = new HighlightBuilder();
134+
Object fieldsObj = config.get("fields");
135+
if (fieldsObj instanceof Map) {
136+
Map<String, Object> fields = (Map<String, Object>) fieldsObj;
137+
for (String fieldName : fields.keySet()) {
138+
highlightBuilder.field(new HighlightBuilder.Field(fieldName));
139+
}
140+
}
141+
Object preTagsObj = config.get("pre_tags");
142+
if (preTagsObj instanceof List) {
143+
List<String> preTags = (List<String>) preTagsObj;
144+
highlightBuilder.preTags(preTags.toArray(new String[0]));
145+
}
146+
Object postTagsObj = config.get("post_tags");
147+
if (postTagsObj instanceof List) {
148+
List<String> postTags = (List<String>) postTagsObj;
149+
highlightBuilder.postTags(postTags.toArray(new String[0]));
150+
}
151+
Object fragmentSizeObj = config.get("fragment_size");
152+
if (fragmentSizeObj instanceof Number) {
153+
int fragmentSize = ((Number) fragmentSizeObj).intValue();
154+
for (HighlightBuilder.Field field : highlightBuilder.fields()) {
155+
field.fragmentSize(fragmentSize);
156+
}
157+
}
158+
requestBuilder.getSourceBuilder().highlighter(highlightBuilder);
159+
}
160+
116161
protected Integer getQuerySizeLimit() {
117162
return osIndex.getSettings().getSettingValue(Key.QUERY_SIZE_LIMIT);
118163
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableIndexScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
119119
@Override
120120
public Enumerator<Object> enumerator() {
121121
OpenSearchRequestBuilder requestBuilder = pushDownContext.createRequestBuilder();
122+
applyHighlightConfig(requestBuilder);
122123
return new OpenSearchIndexEnumerator(
123124
osIndex.getClient(),
124125
getRowType().getFieldNames(),

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.logging.log4j.Logger;
4141
import org.opensearch.search.aggregations.AggregationBuilder;
4242
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
43-
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
4443
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
4544
import org.opensearch.sql.calcite.utils.PPLHintUtils;
4645
import org.opensearch.sql.common.setting.Settings;
@@ -159,27 +158,6 @@ public AbstractRelNode pushDownFilter(Filter filter) {
159158
(OSRequestBuilderAction)
160159
requestBuilder -> requestBuilder.pushDownFilterForCalcite(queryExpression.builder()));
161160

162-
// Auto-inject wildcard highlight for PPL search command result highlighting.
163-
// Only adds highlight when the scan is marked with a SEARCH_COMMAND hint
164-
// (set by CalciteRelNodeVisitor.visitSearch), scoping it to the search command only.
165-
// Uses OSD custom tags so the frontend getHighlightHtml() can convert to <mark>.
166-
if (PPLHintUtils.isSearchCommand(this)) {
167-
newScan.pushDownContext.add(
168-
PushDownType.HIGHLIGHT,
169-
"auto_highlight",
170-
(OSRequestBuilderAction)
171-
requestBuilder -> {
172-
if (requestBuilder.getSourceBuilder().highlighter() == null) {
173-
HighlightBuilder highlightBuilder =
174-
new HighlightBuilder()
175-
.field(new HighlightBuilder.Field("*").numOfFragments(0))
176-
.preTags("@opensearch-dashboards-highlighted-field@")
177-
.postTags("@/opensearch-dashboards-highlighted-field@");
178-
requestBuilder.getSourceBuilder().highlighter(highlightBuilder);
179-
}
180-
});
181-
}
182-
183161
// If the query expression is partial, we need to replace the input of the filter with the
184162
// partial pushed scan and the filter condition with non-pushed-down conditions.
185163
if (queryExpression.isPartial()) {

ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import lombok.extern.log4j.Log4j2;
1212
import org.antlr.v4.runtime.tree.ParseTree;
13+
import org.json.JSONObject;
1314
import org.opensearch.sql.ast.statement.Statement;
1415
import org.opensearch.sql.common.response.ResponseListener;
1516
import org.opensearch.sql.common.setting.Settings;
@@ -64,7 +65,9 @@ public void execute(
6465
ResponseListener<QueryResponse> queryListener,
6566
ResponseListener<ExplainResponse> explainListener) {
6667
try {
67-
queryManager.submit(plan(request, queryListener, explainListener));
68+
AbstractPlan plan = plan(request, queryListener, explainListener);
69+
setHighlightOnPlan(plan, request);
70+
queryManager.submit(plan);
6871
} catch (Exception e) {
6972
queryListener.onFailure(e);
7073
}
@@ -79,12 +82,25 @@ public void execute(
7982
*/
8083
public void explain(PPLQueryRequest request, ResponseListener<ExplainResponse> listener) {
8184
try {
82-
queryManager.submit(plan(request, NO_CONSUMER_RESPONSE_LISTENER, listener));
85+
AbstractPlan plan = plan(request, NO_CONSUMER_RESPONSE_LISTENER, listener);
86+
setHighlightOnPlan(plan, request);
87+
queryManager.submit(plan);
8388
} catch (Exception e) {
8489
listener.onFailure(e);
8590
}
8691
}
8792

93+
/**
94+
* Set highlight configuration on the plan so it can be carried across the thread boundary. The
95+
* plan's execute() method will set the ThreadLocal on the worker thread.
96+
*/
97+
private void setHighlightOnPlan(AbstractPlan plan, PPLQueryRequest request) {
98+
JSONObject highlight = request.getHighlight();
99+
if (highlight != null) {
100+
plan.setHighlightConfig(highlight.toMap());
101+
}
102+
}
103+
88104
private AbstractPlan plan(
89105
PPLQueryRequest request,
90106
ResponseListener<QueryResponse> queryListener,

0 commit comments

Comments
 (0)