|
63 | 63 | import org.apache.calcite.rex.RexVisitorImpl; |
64 | 64 | import org.apache.calcite.rex.RexWindowBounds; |
65 | 65 | import org.apache.calcite.sql.SqlKind; |
| 66 | +import org.apache.calcite.sql.fun.SqlLibraryOperators; |
66 | 67 | import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
67 | 68 | import org.apache.calcite.sql.fun.SqlTrimFunction; |
68 | 69 | import org.apache.calcite.sql.type.ArraySqlType; |
|
124 | 125 | import org.opensearch.sql.ast.tree.Lookup.OutputStrategy; |
125 | 126 | import org.opensearch.sql.ast.tree.ML; |
126 | 127 | import org.opensearch.sql.ast.tree.Multisearch; |
| 128 | +import org.opensearch.sql.ast.tree.MvCombine; |
127 | 129 | import org.opensearch.sql.ast.tree.Paginate; |
128 | 130 | import org.opensearch.sql.ast.tree.Parse; |
129 | 131 | import org.opensearch.sql.ast.tree.Patterns; |
@@ -3169,6 +3171,174 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { |
3169 | 3171 | return context.relBuilder.peek(); |
3170 | 3172 | } |
3171 | 3173 |
|
| 3174 | + /** |
| 3175 | + * mvcombine command visitor to collapse rows that are identical on all non-target fields. |
| 3176 | + * |
| 3177 | + * <p>Grouping semantics: |
| 3178 | + * |
| 3179 | + * <ul> |
| 3180 | + * <li>The target field is always excluded from the GROUP BY keys. |
| 3181 | + * <li>Metadata fields (for example {@code _id}, {@code _index}, {@code _score}) are excluded |
| 3182 | + * from GROUP BY keys <strong>unless</strong> they were explicitly projected earlier (for |
| 3183 | + * example, via {@code fields}). |
| 3184 | + * </ul> |
| 3185 | + * |
| 3186 | + * <p>The target field values are aggregated using {@code ARRAY_AGG}, with {@code NULL} values |
| 3187 | + * filtered out. The aggregation result replaces the original target column and produces an {@code |
| 3188 | + * ARRAY<T>} output. |
| 3189 | + * |
| 3190 | + * <p>The original output column order is preserved. Metadata fields are projected as typed {@code |
| 3191 | + * NULL} literals after aggregation only when they are not part of grouping (since they were |
| 3192 | + * skipped). |
| 3193 | + * |
| 3194 | + * @param node mvcombine command to be visited |
| 3195 | + * @param context CalcitePlanContext containing the RelBuilder and planning context |
| 3196 | + * @return RelNode representing collapsed records with the target combined into a multivalue array |
| 3197 | + * @throws SemanticCheckException if the mvcombine target is not a direct field reference |
| 3198 | + */ |
| 3199 | + @Override |
| 3200 | + public RelNode visitMvCombine(MvCombine node, CalcitePlanContext context) { |
| 3201 | + // 1) Lower the child plan first so the RelBuilder has the input schema on the stack. |
| 3202 | + visitChildren(node, context); |
| 3203 | + |
| 3204 | + final RelBuilder relBuilder = context.relBuilder; |
| 3205 | + |
| 3206 | + final RelNode input = relBuilder.peek(); |
| 3207 | + final List<String> inputFieldNames = input.getRowType().getFieldNames(); |
| 3208 | + final List<RelDataType> inputFieldTypes = |
| 3209 | + input.getRowType().getFieldList().stream().map(RelDataTypeField::getType).toList(); |
| 3210 | + |
| 3211 | + // If true, we should NOT auto-skip meta fields (because user explicitly projected them) |
| 3212 | + final boolean includeMetaFields = context.isProjectVisited(); |
| 3213 | + |
| 3214 | + // 2) Resolve the mvcombine target to an input column index (must be a direct field reference). |
| 3215 | + final Field targetField = node.getField(); |
| 3216 | + final int targetIndex = resolveTargetIndex(targetField, context); |
| 3217 | + final String targetName = inputFieldNames.get(targetIndex); |
| 3218 | + |
| 3219 | + // 3) Group by all non-target fields, skipping meta fields unless explicitly projected. |
| 3220 | + final List<RexNode> groupExprs = |
| 3221 | + buildGroupExpressionsExcludingTarget( |
| 3222 | + targetIndex, inputFieldNames, relBuilder, includeMetaFields); |
| 3223 | + |
| 3224 | + // 4) Aggregate target values using ARRAY_AGG, filtering out NULLs. |
| 3225 | + performArrayAggAggregation(relBuilder, targetIndex, targetName, groupExprs); |
| 3226 | + |
| 3227 | + // 5) Restore original output column order (ARRAY_AGG already returns ARRAY<T>). |
| 3228 | + restoreColumnOrderAfterArrayAgg( |
| 3229 | + relBuilder, inputFieldNames, inputFieldTypes, targetIndex, groupExprs, includeMetaFields); |
| 3230 | + |
| 3231 | + return relBuilder.peek(); |
| 3232 | + } |
| 3233 | + |
| 3234 | + /** Resolves the mvcombine target expression to an input field index. */ |
| 3235 | + private int resolveTargetIndex(Field targetField, CalcitePlanContext context) { |
| 3236 | + final RexNode targetRex; |
| 3237 | + try { |
| 3238 | + targetRex = rexVisitor.analyze(targetField, context); |
| 3239 | + } catch (IllegalArgumentException e) { |
| 3240 | + // Make missing-field behavior deterministic (and consistently mapped to 4xx) |
| 3241 | + // instead of leaking RelBuilder/rexVisitor exception wording. |
| 3242 | + throw new SemanticCheckException( |
| 3243 | + "mvcombine target field not found: " + targetField.getField().toString(), e); |
| 3244 | + } |
| 3245 | + |
| 3246 | + if (!isInputRef(targetRex)) { |
| 3247 | + throw new SemanticCheckException( |
| 3248 | + "mvcombine target must be a direct field reference, but got: " + targetField); |
| 3249 | + } |
| 3250 | + |
| 3251 | + final int index = ((RexInputRef) targetRex).getIndex(); |
| 3252 | + |
| 3253 | + final RelDataType fieldType = |
| 3254 | + context.relBuilder.peek().getRowType().getFieldList().get(index).getType(); |
| 3255 | + |
| 3256 | + if (SqlTypeUtil.isArray(fieldType) || SqlTypeUtil.isMultiset(fieldType)) { |
| 3257 | + throw new SemanticCheckException( |
| 3258 | + "mvcombine target cannot be an array/multivalue type, but got: " + fieldType); |
| 3259 | + } |
| 3260 | + |
| 3261 | + return index; |
| 3262 | + } |
| 3263 | + |
| 3264 | + /** |
| 3265 | + * Builds group-by expressions for mvcombine: all non-target input fields; meta fields are skipped |
| 3266 | + * unless includeMetaFields is true. |
| 3267 | + */ |
| 3268 | + private List<RexNode> buildGroupExpressionsExcludingTarget( |
| 3269 | + int targetIndex, |
| 3270 | + List<String> inputFieldNames, |
| 3271 | + RelBuilder relBuilder, |
| 3272 | + boolean includeMetaFields) { |
| 3273 | + |
| 3274 | + final List<RexNode> groupExprs = new ArrayList<>(Math.max(0, inputFieldNames.size() - 1)); |
| 3275 | + for (int i = 0; i < inputFieldNames.size(); i++) { |
| 3276 | + if (i == targetIndex) { |
| 3277 | + continue; |
| 3278 | + } |
| 3279 | + if (isMetadataField(inputFieldNames.get(i)) && !includeMetaFields) { |
| 3280 | + continue; |
| 3281 | + } |
| 3282 | + groupExprs.add(relBuilder.field(i)); |
| 3283 | + } |
| 3284 | + return groupExprs; |
| 3285 | + } |
| 3286 | + |
| 3287 | + /** Applies mvcombine aggregation. */ |
| 3288 | + private void performArrayAggAggregation( |
| 3289 | + RelBuilder relBuilder, int targetIndex, String targetName, List<RexNode> groupExprs) { |
| 3290 | + |
| 3291 | + final RexNode targetRef = relBuilder.field(targetIndex); |
| 3292 | + final RexNode notNullTarget = relBuilder.isNotNull(targetRef); |
| 3293 | + |
| 3294 | + final RelBuilder.AggCall aggCall = |
| 3295 | + relBuilder |
| 3296 | + .aggregateCall(SqlLibraryOperators.ARRAY_AGG, targetRef) |
| 3297 | + .filter(notNullTarget) |
| 3298 | + .as(targetName); |
| 3299 | + |
| 3300 | + relBuilder.aggregate(relBuilder.groupKey(groupExprs), aggCall); |
| 3301 | + } |
| 3302 | + |
| 3303 | + /** |
| 3304 | + * Restores the original output column order after the aggregate step. Meta fields are set to |
| 3305 | + * typed NULL only when they were skipped from grouping (includeMetaFields=false). |
| 3306 | + */ |
| 3307 | + private void restoreColumnOrderAfterArrayAgg( |
| 3308 | + RelBuilder relBuilder, |
| 3309 | + List<String> inputFieldNames, |
| 3310 | + List<RelDataType> inputFieldTypes, |
| 3311 | + int targetIndex, |
| 3312 | + List<RexNode> groupExprs, |
| 3313 | + boolean includeMetaFields) { |
| 3314 | + |
| 3315 | + final int aggregatedTargetPos = groupExprs.size(); |
| 3316 | + |
| 3317 | + final List<RexNode> projections = new ArrayList<>(inputFieldNames.size()); |
| 3318 | + final List<String> projectionNames = new ArrayList<>(inputFieldNames.size()); |
| 3319 | + |
| 3320 | + int groupPos = 0; |
| 3321 | + for (int i = 0; i < inputFieldNames.size(); i++) { |
| 3322 | + final String fieldName = inputFieldNames.get(i); |
| 3323 | + projectionNames.add(fieldName); |
| 3324 | + |
| 3325 | + if (i == targetIndex) { |
| 3326 | + // aggregated target is always the last field in the aggregate output |
| 3327 | + projections.add(relBuilder.field(aggregatedTargetPos)); |
| 3328 | + } else if (isMetadataField(fieldName) && !includeMetaFields) { |
| 3329 | + // meta fields were skipped from grouping => not present in aggregate output => keep schema |
| 3330 | + // stable |
| 3331 | + projections.add(relBuilder.getRexBuilder().makeNullLiteral(inputFieldTypes.get(i))); |
| 3332 | + } else { |
| 3333 | + // grouped field (including meta fields when includeMetaFields=true) |
| 3334 | + projections.add(relBuilder.field(groupPos)); |
| 3335 | + groupPos++; |
| 3336 | + } |
| 3337 | + } |
| 3338 | + |
| 3339 | + relBuilder.project(projections, projectionNames, /* force= */ true); |
| 3340 | + } |
| 3341 | + |
3172 | 3342 | @Override |
3173 | 3343 | public RelNode visitValues(Values values, CalcitePlanContext context) { |
3174 | 3344 | if (values.getValues() == null || values.getValues().isEmpty()) { |
|
0 commit comments