Skip to content

Commit 2c46164

Browse files
[Backport 2.19-dev] Feature/mvcombine (#5025) (#5096)
* Feature/mvcombine (#5025) * MvCombine Command Feature Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * MvCombine Command Feature Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add doctests to MvCombine Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * spotlesscheck apply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * spotlesscheck apply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * spotlesscheck apply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * spotlessapply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address coderrabbit comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add mvcombine to index.md Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Remove the nomv related implementation as that command is still not yet implemented Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Remove the nomv related implementation as that command is still not yet implemented Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Remove the nomv related implementation as that command is still not yet implemented Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Remove the nomv related implementation as that command is still not yet implemented Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * complete the checklist from ppl-commands.md Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * spotlessApply Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add visitMvCombine method to the FieldResolutionVisitor Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Apply spotlesscheck Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Add changes to exclude the metadata fields and remove the CAST logic Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address CrossClusterSearchIT comment Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address CrossClusterSearchIT comment Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address CrossClusterSearchIT comment Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Coderrabbit issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Coderrabbit issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Coderrabbit issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Coderrabbit issues Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> * Address comments Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> --------- Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> Signed-off-by: Srikanth Padakanti <srikanth29.9@gmail.com> Co-authored-by: Srikanth Padakanti <srikanth_padakanti@apple.com> (cherry picked from commit 92e73ea) * Fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix compile error Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> Signed-off-by: Srikanth Padakanti <srikanth29.9@gmail.com> Signed-off-by: Lantao Jin <ltjin@amazon.com> Co-authored-by: Srikanth Padakanti <srikanth29.9@gmail.com> Co-authored-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent a23b553 commit 2c46164

30 files changed

Lines changed: 1125 additions & 3 deletions

File tree

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.opensearch.sql.ast.tree.Lookup;
8181
import org.opensearch.sql.ast.tree.ML;
8282
import org.opensearch.sql.ast.tree.Multisearch;
83+
import org.opensearch.sql.ast.tree.MvCombine;
8384
import org.opensearch.sql.ast.tree.Paginate;
8485
import org.opensearch.sql.ast.tree.Parse;
8586
import org.opensearch.sql.ast.tree.Patterns;
@@ -536,6 +537,11 @@ public LogicalPlan visitAddColTotals(AddColTotals node, AnalysisContext context)
536537
throw getOnlyForCalciteException("addcoltotals");
537538
}
538539

540+
@Override
541+
public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) {
542+
throw getOnlyForCalciteException("mvcombine");
543+
}
544+
539545
/** Build {@link ParseExpression} to context and skip to child nodes. */
540546
@Override
541547
public LogicalPlan visitParse(Parse node, AnalysisContext context) {

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.opensearch.sql.ast.tree.Lookup;
6969
import org.opensearch.sql.ast.tree.ML;
7070
import org.opensearch.sql.ast.tree.Multisearch;
71+
import org.opensearch.sql.ast.tree.MvCombine;
7172
import org.opensearch.sql.ast.tree.Paginate;
7273
import org.opensearch.sql.ast.tree.Parse;
7374
import org.opensearch.sql.ast.tree.Patterns;
@@ -467,4 +468,8 @@ public T visitAddTotals(AddTotals node, C context) {
467468
public T visitAddColTotals(AddColTotals node, C context) {
468469
return visitChildren(node, context);
469470
}
471+
472+
public T visitMvCombine(MvCombine node, C context) {
473+
return visitChildren(node, context);
474+
}
470475
}

core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.sql.ast.tree.Join;
4545
import org.opensearch.sql.ast.tree.Lookup;
4646
import org.opensearch.sql.ast.tree.Multisearch;
47+
import org.opensearch.sql.ast.tree.MvCombine;
4748
import org.opensearch.sql.ast.tree.Parse;
4849
import org.opensearch.sql.ast.tree.Patterns;
4950
import org.opensearch.sql.ast.tree.Project;
@@ -634,6 +635,22 @@ public Node visitExpand(Expand node, FieldResolutionContext context) {
634635
return node;
635636
}
636637

638+
@Override
639+
public Node visitMvCombine(MvCombine node, FieldResolutionContext context) {
640+
Set<String> mvCombineFields = extractFieldsFromExpression(node.getField());
641+
642+
FieldResolutionResult current = context.getCurrentRequirements();
643+
644+
Set<String> regularFields = new HashSet<>(current.getRegularFields());
645+
regularFields.addAll(mvCombineFields);
646+
647+
context.pushRequirements(new FieldResolutionResult(regularFields, Set.of(ALL_FIELDS)));
648+
649+
visitChildren(node, context);
650+
context.popRequirements();
651+
return node;
652+
}
653+
637654
private Set<String> extractFieldsFromAggregation(UnresolvedExpression expr) {
638655
Set<String> fields = new HashSet<>();
639656
if (expr instanceof Alias) {

core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.sql.ast.tree.Head;
6363
import org.opensearch.sql.ast.tree.Limit;
6464
import org.opensearch.sql.ast.tree.MinSpanBin;
65+
import org.opensearch.sql.ast.tree.MvCombine;
6566
import org.opensearch.sql.ast.tree.Parse;
6667
import org.opensearch.sql.ast.tree.Patterns;
6768
import org.opensearch.sql.ast.tree.Project;
@@ -468,6 +469,14 @@ public static List<Argument> defaultDedupArgs() {
468469
argument("consecutive", booleanLiteral(false)));
469470
}
470471

472+
public static MvCombine mvcombine(Field field) {
473+
return new MvCombine(field, null);
474+
}
475+
476+
public static MvCombine mvcombine(Field field, String delim) {
477+
return new MvCombine(field, delim);
478+
}
479+
471480
public static List<Argument> sortOptions() {
472481
return exprList(argument("desc", booleanLiteral(false)));
473482
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import javax.annotation.Nullable;
11+
import lombok.EqualsAndHashCode;
12+
import lombok.Getter;
13+
import lombok.ToString;
14+
import org.opensearch.sql.ast.AbstractNodeVisitor;
15+
import org.opensearch.sql.ast.expression.Field;
16+
17+
@Getter
18+
@ToString(callSuper = true)
19+
@EqualsAndHashCode(callSuper = false)
20+
public class MvCombine extends UnresolvedPlan {
21+
22+
private final Field field;
23+
private final String delim;
24+
@Nullable private UnresolvedPlan child;
25+
26+
public MvCombine(Field field, @Nullable String delim) {
27+
this.field = field;
28+
this.delim = (delim == null) ? " " : delim;
29+
}
30+
31+
public MvCombine attach(UnresolvedPlan child) {
32+
this.child = child;
33+
return this;
34+
}
35+
36+
@Override
37+
public List<UnresolvedPlan> getChild() {
38+
return child == null ? ImmutableList.of() : ImmutableList.of(child);
39+
}
40+
41+
@Override
42+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
43+
return nodeVisitor.visitMvCombine(this, context);
44+
}
45+
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.calcite.rex.RexVisitorImpl;
6464
import org.apache.calcite.rex.RexWindowBounds;
6565
import org.apache.calcite.sql.SqlKind;
66+
import org.apache.calcite.sql.fun.SqlLibraryOperators;
6667
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
6768
import org.apache.calcite.sql.fun.SqlTrimFunction;
6869
import org.apache.calcite.sql.type.ArraySqlType;
@@ -124,6 +125,7 @@
124125
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
125126
import org.opensearch.sql.ast.tree.ML;
126127
import org.opensearch.sql.ast.tree.Multisearch;
128+
import org.opensearch.sql.ast.tree.MvCombine;
127129
import org.opensearch.sql.ast.tree.Paginate;
128130
import org.opensearch.sql.ast.tree.Parse;
129131
import org.opensearch.sql.ast.tree.Patterns;
@@ -3189,6 +3191,174 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) {
31893191
return context.relBuilder.peek();
31903192
}
31913193

3194+
/**
3195+
* mvcombine command visitor to collapse rows that are identical on all non-target fields.
3196+
*
3197+
* <p>Grouping semantics:
3198+
*
3199+
* <ul>
3200+
* <li>The target field is always excluded from the GROUP BY keys.
3201+
* <li>Metadata fields (for example {@code _id}, {@code _index}, {@code _score}) are excluded
3202+
* from GROUP BY keys <strong>unless</strong> they were explicitly projected earlier (for
3203+
* example, via {@code fields}).
3204+
* </ul>
3205+
*
3206+
* <p>The target field values are aggregated using {@code ARRAY_AGG}, with {@code NULL} values
3207+
* filtered out. The aggregation result replaces the original target column and produces an {@code
3208+
* ARRAY<T>} output.
3209+
*
3210+
* <p>The original output column order is preserved. Metadata fields are projected as typed {@code
3211+
* NULL} literals after aggregation only when they are not part of grouping (since they were
3212+
* skipped).
3213+
*
3214+
* @param node mvcombine command to be visited
3215+
* @param context CalcitePlanContext containing the RelBuilder and planning context
3216+
* @return RelNode representing collapsed records with the target combined into a multivalue array
3217+
* @throws SemanticCheckException if the mvcombine target is not a direct field reference
3218+
*/
3219+
@Override
3220+
public RelNode visitMvCombine(MvCombine node, CalcitePlanContext context) {
3221+
// 1) Lower the child plan first so the RelBuilder has the input schema on the stack.
3222+
visitChildren(node, context);
3223+
3224+
final RelBuilder relBuilder = context.relBuilder;
3225+
3226+
final RelNode input = relBuilder.peek();
3227+
final List<String> inputFieldNames = input.getRowType().getFieldNames();
3228+
final List<RelDataType> inputFieldTypes =
3229+
input.getRowType().getFieldList().stream().map(RelDataTypeField::getType).collect(Collectors.toList());
3230+
3231+
// If true, we should NOT auto-skip meta fields (because user explicitly projected them)
3232+
final boolean includeMetaFields = context.isProjectVisited();
3233+
3234+
// 2) Resolve the mvcombine target to an input column index (must be a direct field reference).
3235+
final Field targetField = node.getField();
3236+
final int targetIndex = resolveTargetIndex(targetField, context);
3237+
final String targetName = inputFieldNames.get(targetIndex);
3238+
3239+
// 3) Group by all non-target fields, skipping meta fields unless explicitly projected.
3240+
final List<RexNode> groupExprs =
3241+
buildGroupExpressionsExcludingTarget(
3242+
targetIndex, inputFieldNames, relBuilder, includeMetaFields);
3243+
3244+
// 4) Aggregate target values using ARRAY_AGG, filtering out NULLs.
3245+
performArrayAggAggregation(relBuilder, targetIndex, targetName, groupExprs);
3246+
3247+
// 5) Restore original output column order (ARRAY_AGG already returns ARRAY<T>).
3248+
restoreColumnOrderAfterArrayAgg(
3249+
relBuilder, inputFieldNames, inputFieldTypes, targetIndex, groupExprs, includeMetaFields);
3250+
3251+
return relBuilder.peek();
3252+
}
3253+
3254+
/** Resolves the mvcombine target expression to an input field index. */
3255+
private int resolveTargetIndex(Field targetField, CalcitePlanContext context) {
3256+
final RexNode targetRex;
3257+
try {
3258+
targetRex = rexVisitor.analyze(targetField, context);
3259+
} catch (IllegalArgumentException e) {
3260+
// Make missing-field behavior deterministic (and consistently mapped to 4xx)
3261+
// instead of leaking RelBuilder/rexVisitor exception wording.
3262+
throw new SemanticCheckException(
3263+
"mvcombine target field not found: " + targetField.getField().toString(), e);
3264+
}
3265+
3266+
if (!isInputRef(targetRex)) {
3267+
throw new SemanticCheckException(
3268+
"mvcombine target must be a direct field reference, but got: " + targetField);
3269+
}
3270+
3271+
final int index = ((RexInputRef) targetRex).getIndex();
3272+
3273+
final RelDataType fieldType =
3274+
context.relBuilder.peek().getRowType().getFieldList().get(index).getType();
3275+
3276+
if (SqlTypeUtil.isArray(fieldType) || SqlTypeUtil.isMultiset(fieldType)) {
3277+
throw new SemanticCheckException(
3278+
"mvcombine target cannot be an array/multivalue type, but got: " + fieldType);
3279+
}
3280+
3281+
return index;
3282+
}
3283+
3284+
/**
3285+
* Builds group-by expressions for mvcombine: all non-target input fields; meta fields are skipped
3286+
* unless includeMetaFields is true.
3287+
*/
3288+
private List<RexNode> buildGroupExpressionsExcludingTarget(
3289+
int targetIndex,
3290+
List<String> inputFieldNames,
3291+
RelBuilder relBuilder,
3292+
boolean includeMetaFields) {
3293+
3294+
final List<RexNode> groupExprs = new ArrayList<>(Math.max(0, inputFieldNames.size() - 1));
3295+
for (int i = 0; i < inputFieldNames.size(); i++) {
3296+
if (i == targetIndex) {
3297+
continue;
3298+
}
3299+
if (isMetadataField(inputFieldNames.get(i)) && !includeMetaFields) {
3300+
continue;
3301+
}
3302+
groupExprs.add(relBuilder.field(i));
3303+
}
3304+
return groupExprs;
3305+
}
3306+
3307+
/** Applies mvcombine aggregation. */
3308+
private void performArrayAggAggregation(
3309+
RelBuilder relBuilder, int targetIndex, String targetName, List<RexNode> groupExprs) {
3310+
3311+
final RexNode targetRef = relBuilder.field(targetIndex);
3312+
final RexNode notNullTarget = relBuilder.isNotNull(targetRef);
3313+
3314+
final RelBuilder.AggCall aggCall =
3315+
relBuilder
3316+
.aggregateCall(SqlLibraryOperators.ARRAY_AGG, targetRef)
3317+
.filter(notNullTarget)
3318+
.as(targetName);
3319+
3320+
relBuilder.aggregate(relBuilder.groupKey(groupExprs), aggCall);
3321+
}
3322+
3323+
/**
3324+
* Restores the original output column order after the aggregate step. Meta fields are set to
3325+
* typed NULL only when they were skipped from grouping (includeMetaFields=false).
3326+
*/
3327+
private void restoreColumnOrderAfterArrayAgg(
3328+
RelBuilder relBuilder,
3329+
List<String> inputFieldNames,
3330+
List<RelDataType> inputFieldTypes,
3331+
int targetIndex,
3332+
List<RexNode> groupExprs,
3333+
boolean includeMetaFields) {
3334+
3335+
final int aggregatedTargetPos = groupExprs.size();
3336+
3337+
final List<RexNode> projections = new ArrayList<>(inputFieldNames.size());
3338+
final List<String> projectionNames = new ArrayList<>(inputFieldNames.size());
3339+
3340+
int groupPos = 0;
3341+
for (int i = 0; i < inputFieldNames.size(); i++) {
3342+
final String fieldName = inputFieldNames.get(i);
3343+
projectionNames.add(fieldName);
3344+
3345+
if (i == targetIndex) {
3346+
// aggregated target is always the last field in the aggregate output
3347+
projections.add(relBuilder.field(aggregatedTargetPos));
3348+
} else if (isMetadataField(fieldName) && !includeMetaFields) {
3349+
// meta fields were skipped from grouping => not present in aggregate output => keep schema
3350+
// stable
3351+
projections.add(relBuilder.getRexBuilder().makeNullLiteral(inputFieldTypes.get(i)));
3352+
} else {
3353+
// grouped field (including meta fields when includeMetaFields=true)
3354+
projections.add(relBuilder.field(groupPos));
3355+
groupPos++;
3356+
}
3357+
}
3358+
3359+
relBuilder.project(projections, projectionNames, /* force= */ true);
3360+
}
3361+
31923362
@Override
31933363
public RelNode visitValues(Values values, CalcitePlanContext context) {
31943364
if (values.getValues() == null || values.getValues().isEmpty()) {

docs/category.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"user/ppl/cmd/head.md",
2424
"user/ppl/cmd/join.md",
2525
"user/ppl/cmd/lookup.md",
26+
"user/ppl/cmd/mvcombine.md",
2627
"user/ppl/cmd/parse.md",
2728
"user/ppl/cmd/patterns.md",
2829
"user/ppl/cmd/rare.md",

docs/user/dql/metadata.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Example 1: Show All Indices Information
3737
SQL query::
3838

3939
os> SHOW TABLES LIKE '%'
40-
fetched rows / total rows = 23/23
40+
fetched rows / total rows = 24/24
4141
+----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+
4242
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION |
4343
|----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------|
@@ -50,6 +50,7 @@ SQL query::
5050
| docTestCluster | null | events_many_hosts | BASE TABLE | null | null | null | null | null | null |
5151
| docTestCluster | null | events_null | BASE TABLE | null | null | null | null | null | null |
5252
| docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null |
53+
| docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null |
5354
| docTestCluster | null | nested | BASE TABLE | null | null | null | null | null | null |
5455
| docTestCluster | null | nyc_taxi | BASE TABLE | null | null | null | null | null | null |
5556
| docTestCluster | null | occupation | BASE TABLE | null | null | null | null | null | null |

0 commit comments

Comments
 (0)