Skip to content

Commit bde93ca

Browse files
committed
Calcite PPL search result highlighting
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent 019cd26 commit bde93ca

14 files changed

Lines changed: 307 additions & 15 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,11 @@ private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
227227
public RelNode visitSearch(Search node, CalcitePlanContext context) {
228228
// Visit the Relation child to get the scan
229229
node.getChild().get(0).accept(this, context);
230+
231+
// Mark the scan as originating from a search command so that the optimizer
232+
// can scope auto-highlight injection to search queries only.
233+
PPLHintUtils.markSearchCommand(context.relBuilder);
234+
230235
// Create query_string function
231236
Function queryStringFunc =
232237
AstDSL.function(

core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintUtils.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
package org.opensearch.sql.calcite.utils;
77

88
import com.google.common.base.Suppliers;
9+
import java.util.List;
910
import java.util.function.Supplier;
1011
import lombok.experimental.UtilityClass;
12+
import org.apache.calcite.rel.RelNode;
1113
import org.apache.calcite.rel.core.Aggregate;
14+
import org.apache.calcite.rel.core.TableScan;
1215
import org.apache.calcite.rel.hint.HintStrategyTable;
16+
import org.apache.calcite.rel.hint.Hintable;
1317
import org.apache.calcite.rel.hint.RelHint;
1418
import org.apache.calcite.rel.logical.LogicalAggregate;
1519
import org.apache.calcite.tools.RelBuilder;
@@ -19,6 +23,7 @@ public class PPLHintUtils {
1923
private static final String HINT_AGG_ARGUMENTS = "AGG_ARGS";
2024
private static final String KEY_IGNORE_NULL_BUCKET = "ignoreNullBucket";
2125
private static final String KEY_HAS_NESTED_AGG_CALL = "hasNestedAggCall";
26+
public static final String HINT_SEARCH_COMMAND = "SEARCH_COMMAND";
2227

2328
private static final Supplier<HintStrategyTable> HINT_STRATEGY_TABLE =
2429
Suppliers.memoize(
@@ -29,7 +34,7 @@ public class PPLHintUtils {
2934
(hint, rel) -> {
3035
return rel instanceof LogicalAggregate;
3136
})
32-
// add more here
37+
.hintStrategy(HINT_SEARCH_COMMAND, (hint, rel) -> rel instanceof TableScan)
3338
.build());
3439

3540
/**
@@ -81,4 +86,34 @@ public static boolean hasNestedAggCall(Aggregate aggregate) {
8186
.getOrDefault(KEY_HAS_NESTED_AGG_CALL, "false")
8287
.equals("true"));
8388
}
89+
90+
/**
91+
* Mark a scan node as originating from a PPL search command. The scan node may be on top of the
92+
* relBuilder stack directly, or wrapped in a Project (due to alias field wrapping). This hint is
93+
* used to scope auto-highlight injection to search command queries only.
94+
*/
95+
public static void markSearchCommand(RelBuilder relBuilder) {
96+
final RelHint hint = RelHint.builder(HINT_SEARCH_COMMAND).build();
97+
RelNode top = relBuilder.peek();
98+
if (top instanceof Hintable) {
99+
// Scan is directly on top of the stack
100+
relBuilder.hints(hint);
101+
} else if (top instanceof org.apache.calcite.rel.core.Project proj) {
102+
RelNode input = proj.getInput();
103+
if (input instanceof Hintable hintable) {
104+
RelNode hintedInput = hintable.attachHints(List.of(hint));
105+
RelNode newProject = proj.copy(proj.getTraitSet(), List.of(hintedInput));
106+
relBuilder.build(); // pop old project
107+
relBuilder.push(newProject);
108+
}
109+
}
110+
if (relBuilder.getCluster().getHintStrategies() == HintStrategyTable.EMPTY) {
111+
relBuilder.getCluster().setHintStrategies(HINT_STRATEGY_TABLE.get());
112+
}
113+
}
114+
115+
/** Return true if the scan has the SEARCH_COMMAND hint. */
116+
public static boolean isSearchCommand(TableScan scan) {
117+
return scan.getHints().stream().anyMatch(hint -> hint.hintName.equals(HINT_SEARCH_COMMAND));
118+
}
84119
}

core/src/main/java/org/opensearch/sql/expression/HighlightExpression.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
/** Highlight Expression. */
2424
@Getter
2525
public class HighlightExpression extends FunctionExpression {
26+
/** The field name used to store highlight data on ExprTupleValue rows. */
27+
public static final String HIGHLIGHT_FIELD = "_highlight";
28+
2629
private final Expression highlightField;
2730
private final ExprType type;
2831

@@ -46,7 +49,7 @@ public HighlightExpression(Expression highlightField) {
4649
*/
4750
@Override
4851
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
49-
String refName = "_highlight";
52+
String refName = HIGHLIGHT_FIELD;
5053
// Not a wilcard expression
5154
if (this.type == ExprCoreType.ARRAY) {
5255
refName += "." + StringUtils.unquoteText(getHighlightField().toString());

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
5454
import org.opensearch.sql.executor.Explain;
5555
import org.opensearch.sql.executor.pagination.PlanSerializer;
56+
import org.opensearch.sql.expression.HighlightExpression;
5657
import org.opensearch.sql.expression.function.BuiltinFunctionName;
5758
import org.opensearch.sql.expression.function.PPLFuncImpTable;
5859
import org.opensearch.sql.monitor.profile.MetricName;
@@ -63,6 +64,7 @@
6364
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
6465
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
6566
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
67+
import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexEnumerator;
6668
import org.opensearch.sql.planner.physical.PhysicalPlan;
6769
import org.opensearch.sql.storage.TableScanOperator;
6870
import org.opensearch.transport.client.node.NodeClient;
@@ -211,6 +213,7 @@ public void execute(
211213
client.schedule(
212214
() -> {
213215
try (PreparedStatement statement = OpenSearchRelRunners.run(context, rel)) {
216+
OpenSearchIndexEnumerator.clearCollectedHighlights();
214217
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
215218
long execTime = System.nanoTime();
216219
ResultSet result = statement.executeQuery();
@@ -279,6 +282,21 @@ private QueryResponse buildResultSet(
279282
values.add(ExprTupleValue.fromExprValueMap(row));
280283
}
281284

285+
// Merge highlight data collected by the enumerator back into ExprTupleValues.
286+
// The Calcite row pipeline only carries schema column values, so highlight metadata
287+
// is collected as a side channel in OpenSearchIndexEnumerator and merged here.
288+
List<ExprValue> collectedHighlights =
289+
OpenSearchIndexEnumerator.getAndClearCollectedHighlights();
290+
for (int i = 0; i < Math.min(values.size(), collectedHighlights.size()); i++) {
291+
ExprValue hl = collectedHighlights.get(i);
292+
if (hl != null) {
293+
Map<String, ExprValue> rowWithHighlight =
294+
new LinkedHashMap<>(ExprValueUtils.getTupleValue(values.get(i)));
295+
rowWithHighlight.put(HighlightExpression.HIGHLIGHT_FIELD, hl);
296+
values.set(i, ExprTupleValue.fromExprValueMap(rowWithHighlight));
297+
}
298+
}
299+
282300
List<Column> columns = new ArrayList<>(metaData.getColumnCount());
283301
for (int i = 1; i <= columnCount; ++i) {
284302
String columnName = metaData.getColumnName(i);

opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.opensearch.response;
77

8+
import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;
89
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATAFIELD_TYPE_MAP;
910
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;
1011
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_INDEX;
@@ -200,7 +201,7 @@ private void addHighlightsToBuilder(
200201
.map(Text::toString)
201202
.collect(Collectors.toList())));
202203
}
203-
builder.put("_highlight", ExprTupleValue.fromExprValueMap(hlBuilder.build()));
204+
builder.put(HIGHLIGHT_FIELD, ExprTupleValue.fromExprValueMap(hlBuilder.build()));
204205
}
205206
}
206207

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public double estimateRowCount(RelMetadataQuery mq) {
130130
(rowCount, operation) ->
131131
switch (operation.type()) {
132132
case AGGREGATION -> mq.getRowCount((RelNode) operation.digest());
133-
case PROJECT, SORT, SORT_EXPR -> rowCount;
133+
case PROJECT, SORT, SORT_EXPR, HIGHLIGHT -> rowCount;
134134
case SORT_AGG_METRICS ->
135135
NumberUtil.min(rowCount, osIndex.getQueryBucketSize().doubleValue());
136136
// Refer the org.apache.calcite.rel.metadata.RelMdRowCount
@@ -176,8 +176,8 @@ public double estimateRowCount(RelMetadataQuery mq) {
176176
dRows = mq.getRowCount((RelNode) operation.digest());
177177
dCpu += dRows * getAggMultiplier(operation);
178178
}
179-
// Ignored Project in cost accumulation, but it will affect the external cost
180-
case PROJECT -> {}
179+
// Ignored Project and Highlight in cost accumulation
180+
case PROJECT, HIGHLIGHT -> {}
181181
case SORT -> dCpu += dRows;
182182
case SORT_AGG_METRICS -> {
183183
dRows = dRows * .9 / 10; // *.9 because always bucket IS_NOT_NULL
@@ -266,6 +266,11 @@ public Map<String, String> getAliasMapping() {
266266
return osIndex.getAliasMapping();
267267
}
268268

269+
@Override
270+
public RelNode withHints(List<RelHint> hintList) {
271+
return buildScan(getCluster(), traitSet, hintList, table, osIndex, schema, pushDownContext);
272+
}
273+
269274
public abstract AbstractCalciteIndexScan copy();
270275

271276
protected List<String> getCollationNames(List<RelFieldCollation> collations) {

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.logging.log4j.Logger;
4141
import org.opensearch.search.aggregations.AggregationBuilder;
4242
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
43+
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
4344
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
4445
import org.opensearch.sql.calcite.utils.PPLHintUtils;
4546
import org.opensearch.sql.common.setting.Settings;
@@ -158,6 +159,27 @@ public AbstractRelNode pushDownFilter(Filter filter) {
158159
(OSRequestBuilderAction)
159160
requestBuilder -> requestBuilder.pushDownFilterForCalcite(queryExpression.builder()));
160161

162+
// Auto-inject wildcard highlight for PPL search command result highlighting.
163+
// Only adds highlight when the scan is marked with a SEARCH_COMMAND hint
164+
// (set by CalciteRelNodeVisitor.visitSearch), scoping it to the search command only.
165+
// Uses OSD custom tags so the frontend getHighlightHtml() can convert to <mark>.
166+
if (PPLHintUtils.isSearchCommand(this)) {
167+
newScan.pushDownContext.add(
168+
PushDownType.HIGHLIGHT,
169+
"auto_highlight",
170+
(OSRequestBuilderAction)
171+
requestBuilder -> {
172+
if (requestBuilder.getSourceBuilder().highlighter() == null) {
173+
HighlightBuilder highlightBuilder =
174+
new HighlightBuilder()
175+
.field(new HighlightBuilder.Field("*").numOfFragments(0))
176+
.preTags("@opensearch-dashboards-highlighted-field@")
177+
.postTags("@/opensearch-dashboards-highlighted-field@");
178+
requestBuilder.getSourceBuilder().highlighter(highlightBuilder);
179+
}
180+
});
181+
}
182+
161183
// If the query expression is partial, we need to replace the input of the filter with the
162184
// partial pushed scan and the filter condition with non-pushed-down conditions.
163185
if (queryExpression.isPartial()) {

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55

66
package org.opensearch.sql.opensearch.storage.scan;
77

8+
import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;
9+
10+
import java.util.ArrayList;
811
import java.util.Collections;
912
import java.util.Iterator;
1013
import java.util.List;
14+
import java.util.Map;
1115
import lombok.EqualsAndHashCode;
1216
import lombok.ToString;
1317
import org.apache.calcite.linq4j.Enumerator;
@@ -27,6 +31,27 @@
2731
*/
2832
public class OpenSearchIndexEnumerator implements Enumerator<Object> {
2933

34+
/**
35+
* Thread-local collector for highlight data. Since the Calcite row pipeline only carries schema
36+
* column values, highlight metadata from OpenSearch hits is collected here as a side channel.
37+
* After execution, {@link #getAndClearCollectedHighlights()} retrieves the collected data so it
38+
* can be merged back into the ExprTupleValues for the JDBC response.
39+
*/
40+
private static final ThreadLocal<List<ExprValue>> COLLECTED_HIGHLIGHTS =
41+
ThreadLocal.withInitial(ArrayList::new);
42+
43+
/** Retrieve collected highlights and clear the ThreadLocal. */
44+
public static List<ExprValue> getAndClearCollectedHighlights() {
45+
List<ExprValue> result = new ArrayList<>(COLLECTED_HIGHLIGHTS.get());
46+
COLLECTED_HIGHLIGHTS.get().clear();
47+
return result;
48+
}
49+
50+
/** Clear collected highlights (call before starting a new execution). */
51+
public static void clearCollectedHighlights() {
52+
COLLECTED_HIGHLIGHTS.get().clear();
53+
}
54+
3055
/** OpenSearch client. */
3156
private final OpenSearchClient client;
3257

@@ -111,6 +136,12 @@ public boolean moveNext() {
111136
}
112137
if (iterator.hasNext()) {
113138
current = iterator.next();
139+
// Collect highlight data as a side channel for the JDBC response.
140+
// The Calcite row (from current()) only carries schema column values,
141+
// so _highlight must be preserved separately.
142+
Map<String, ExprValue> tuple = ExprValueUtils.getTupleValue(current);
143+
ExprValue hl = tuple.get(HIGHLIGHT_FIELD);
144+
COLLECTED_HIGHLIGHTS.get().add(hl != null && !hl.isMissing() ? hl : null);
114145
queryCount++;
115146
return true;
116147
} else {
@@ -123,6 +154,7 @@ public void reset() {
123154
bgScanner.reset(request);
124155
iterator = bgScanner.fetchNextBatch(request).iterator();
125156
queryCount = 0;
157+
COLLECTED_HIGHLIGHTS.get().clear();
126158
}
127159

128160
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public enum PushDownType {
1515
SCRIPT, // script in predicate
1616
SORT_AGG_METRICS, // convert composite aggregate to terms or multi-terms bucket aggregate
1717
RARE_TOP, // convert composite aggregate to nested aggregate
18-
SORT_EXPR
19-
// HIGHLIGHT,
18+
SORT_EXPR,
19+
HIGHLIGHT
2020
// NESTED
2121
}

protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55

66
package org.opensearch.sql.protocol.response;
77

8+
import static org.opensearch.sql.expression.HighlightExpression.HIGHLIGHT_FIELD;
9+
810
import java.util.Collection;
911
import java.util.Iterator;
1012
import java.util.LinkedHashMap;
13+
import java.util.List;
1114
import java.util.Locale;
1215
import java.util.Map;
16+
import java.util.stream.Collectors;
1317
import lombok.Getter;
1418
import org.opensearch.sql.data.model.ExprValue;
1519
import org.opensearch.sql.data.model.ExprValueUtils;
@@ -82,19 +86,48 @@ public Map<String, String> columnNameTypes() {
8286

8387
@Override
8488
public Iterator<Object[]> iterator() {
85-
// Any chance to avoid copy for json response generation?
8689
return exprValues.stream()
8790
.map(ExprValueUtils::getTupleValue)
88-
.map(Map::values)
89-
.map(this::convertExprValuesToValues)
91+
.map(
92+
tuple ->
93+
tuple.entrySet().stream()
94+
.filter(e -> !HIGHLIGHT_FIELD.equals(e.getKey()))
95+
.map(e -> e.getValue().value())
96+
.toArray(Object[]::new))
9097
.iterator();
9198
}
9299

93-
private String getColumnName(Column column) {
94-
return (column.getAlias() != null) ? column.getAlias() : column.getName();
100+
/**
101+
* Extract highlight data from each result row. Each row may contain a {@code _highlight} field
102+
* added by {@code OpenSearchResponse.addHighlightsToBuilder()} and preserved through projection.
103+
* Returns a list parallel to datarows where each entry is either a map of field name to highlight
104+
* fragments, or null if no highlight data exists for that row.
105+
*
106+
* @return list of highlight maps, one per row
107+
*/
108+
public List<Map<String, Object>> highlights() {
109+
return exprValues.stream()
110+
.map(ExprValueUtils::getTupleValue)
111+
.map(
112+
tuple -> {
113+
ExprValue hl = tuple.get(HIGHLIGHT_FIELD);
114+
if (hl == null || hl.isMissing()) {
115+
return null;
116+
}
117+
Map<String, Object> hlMap = new LinkedHashMap<>();
118+
for (Map.Entry<String, ExprValue> entry : hl.tupleValue().entrySet()) {
119+
hlMap.put(
120+
entry.getKey(),
121+
entry.getValue().collectionValue().stream()
122+
.map(ExprValue::stringValue)
123+
.collect(Collectors.toList()));
124+
}
125+
return (Map<String, Object>) hlMap;
126+
})
127+
.collect(Collectors.toList());
95128
}
96129

97-
private Object[] convertExprValuesToValues(Collection<ExprValue> exprValues) {
98-
return exprValues.stream().map(ExprValue::value).toArray(Object[]::new);
130+
private String getColumnName(Column column) {
131+
return (column.getAlias() != null) ? column.getAlias() : column.getName();
99132
}
100133
}

0 commit comments

Comments
 (0)