Skip to content

Commit b025fef

Browse files
committed
resolved PR comments from expani
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
1 parent 786c059 commit b025fef

12 files changed

Lines changed: 99 additions & 57 deletions

File tree

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationMetadata.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
*
2121
* <p>A multi-level aggregation tree (e.g., terms → terms → avg) produces
2222
* multiple metadata instances — one per distinct GROUP BY key set.
23+
*
24+
* <p>TODO: Add tree structure (parent-child links between granularity levels) and pipeline
25+
* aggregation support.
2326
*/
2427
public class AggregationMetadata {
2528

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationMetadataBuilder.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.calcite.rel.core.AggregateCall;
1313
import org.apache.calcite.rel.type.RelDataType;
1414
import org.apache.calcite.rel.type.RelDataTypeFactory;
15+
import org.apache.calcite.rel.type.RelDataTypeField;
1516
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
1617
import org.apache.calcite.sql.type.SqlTypeName;
1718
import org.apache.calcite.util.ImmutableBitSet;
@@ -103,7 +104,7 @@ public AggregationMetadata build(RelDataType inputRowType, RelDataTypeFactory ty
103104
List<Integer> allGroupIndices = new ArrayList<>();
104105
List<String> allGroupFieldNames = new ArrayList<>();
105106
for (GroupingInfo g : groupings) {
106-
allGroupIndices.addAll(g.resolveIndices(inputRowType));
107+
allGroupIndices.addAll(resolveFieldIndices(g, inputRowType));
107108
allGroupFieldNames.addAll(g.getFieldNames());
108109
}
109110

@@ -152,4 +153,25 @@ public AggregationMetadata build(RelDataType inputRowType, RelDataTypeFactory ty
152153

153154
return new AggregationMetadata(ImmutableBitSet.of(allGroupIndices), allGroupFieldNames, allCalls, allFieldNames, bucketOrders);
154155
}
156+
157+
/**
158+
* Resolves field-based grouping names to column indices in the input schema.
159+
*
160+
* @param grouping the grouping info containing field names
161+
* @param inputRowType the schema before aggregation
162+
* @return column indices for each field name
163+
* @throws ConversionException if a field name is not found in the schema
164+
*/
165+
private static List<Integer> resolveFieldIndices(GroupingInfo grouping, RelDataType inputRowType) throws ConversionException {
166+
List<String> fieldNames = grouping.getFieldNames();
167+
List<Integer> indices = new ArrayList<>(fieldNames.size());
168+
for (String name : fieldNames) {
169+
RelDataTypeField field = inputRowType.getField(name, false, false);
170+
if (field == null) {
171+
throw new ConversionException("Group-by field '" + name + "' not found in schema");
172+
}
173+
indices.add(field.getIndex());
174+
}
175+
return indices;
176+
}
155177
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationRegistry.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
*/
2323
public class AggregationRegistry {
2424

25-
private final Map<Class<? extends AggregationBuilder>, AggregationType<?>> translators = new HashMap<>();
25+
private final Map<Class<? extends AggregationBuilder>, AggregationTranslator<?>> translators = new HashMap<>();
2626

2727
/** Creates an empty registry. */
2828
public AggregationRegistry() {}
@@ -32,7 +32,7 @@ public AggregationRegistry() {}
3232
*
3333
* @param translator the translator to register
3434
*/
35-
public void register(AggregationType<?> translator) {
35+
public void register(AggregationTranslator<?> translator) {
3636
translators.put(translator.getAggregationType(), translator);
3737
}
3838

@@ -43,7 +43,7 @@ public void register(AggregationType<?> translator) {
4343
* @param aggClass the aggregation builder class
4444
* @return the translator, or null
4545
*/
46-
public AggregationType<?> get(Class<? extends AggregationBuilder> aggClass) {
46+
public AggregationTranslator<?> get(Class<? extends AggregationBuilder> aggClass) {
4747
return translators.get(aggClass);
4848
}
4949

@@ -55,7 +55,7 @@ public AggregationType<?> get(Class<? extends AggregationBuilder> aggClass) {
5555
*/
5656
@SuppressWarnings("unchecked")
5757
public <T extends AggregationBuilder> MetricTranslator<T> getMetric(Class<? extends AggregationBuilder> aggClass) {
58-
AggregationType<?> translator = translators.get(aggClass);
58+
AggregationTranslator<?> translator = translators.get(aggClass);
5959
return translator instanceof MetricTranslator ? (MetricTranslator<T>) translator : null;
6060
}
6161

@@ -67,7 +67,7 @@ public <T extends AggregationBuilder> MetricTranslator<T> getMetric(Class<? exte
6767
*/
6868
@SuppressWarnings("unchecked")
6969
public <T extends AggregationBuilder> BucketTranslator<T> getBucket(Class<? extends AggregationBuilder> aggClass) {
70-
AggregationType<?> translator = translators.get(aggClass);
70+
AggregationTranslator<?> translator = translators.get(aggClass);
7171
return translator instanceof BucketTranslator ? (BucketTranslator<T>) translator : null;
7272
}
7373
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationType.java renamed to sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Provides type identification for the {@link AggregationRegistry}.
1616
* Bucket and metric subtypes define their own contracts.
1717
*/
18-
public interface AggregationType<T extends AggregationBuilder> {
18+
public interface AggregationTranslator<T extends AggregationBuilder> {
1919

2020
/** Returns the concrete AggregationBuilder class this type handles. */
2121
Class<T> getAggregationType();

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/AggregationTreeWalker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private void walkRecursive(
7575
RelDataType rowType
7676
) throws ConversionException {
7777
for (AggregationBuilder aggBuilder : aggs) {
78-
AggregationType<?> type = registry.get(aggBuilder.getClass());
78+
AggregationTranslator<?> type = registry.get(aggBuilder.getClass());
7979

8080
if (type == null) {
8181
throw new ConversionException("No translator registered for aggregation type: " + aggBuilder.getClass().getSimpleName());

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/FieldGrouping.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88

99
package org.opensearch.dsl.aggregation;
1010

11-
import org.apache.calcite.rel.type.RelDataType;
12-
import org.apache.calcite.rel.type.RelDataTypeField;
13-
import org.opensearch.dsl.converter.ConversionException;
14-
15-
import java.util.ArrayList;
1611
import java.util.List;
1712

1813
/**
@@ -36,17 +31,4 @@ public FieldGrouping(List<String> fieldNames) {
3631
public List<String> getFieldNames() {
3732
return fieldNames;
3833
}
39-
40-
@Override
41-
public List<Integer> resolveIndices(RelDataType inputRowType) throws ConversionException {
42-
List<Integer> indices = new ArrayList<>(fieldNames.size());
43-
for (String name : fieldNames) {
44-
RelDataTypeField field = inputRowType.getField(name, false, false);
45-
if (field == null) {
46-
throw new ConversionException("Group-by field '" + name + "' not found in schema");
47-
}
48-
indices.add(field.getIndex());
49-
}
50-
return indices;
51-
}
5234
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/GroupingInfo.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@
88

99
package org.opensearch.dsl.aggregation;
1010

11-
import org.apache.calcite.rel.type.RelDataType;
12-
import org.opensearch.dsl.converter.ConversionException;
13-
1411
import java.util.List;
1512

1613
/**
@@ -22,13 +19,4 @@ public interface GroupingInfo {
2219

2320
/** Returns the logical field names this grouping contributes. */
2421
List<String> getFieldNames();
25-
26-
/**
27-
* Resolves this grouping to column indices in the input schema.
28-
*
29-
* @param inputRowType the schema before aggregation
30-
* @return column indices for the GROUP BY bit set
31-
* @throws ConversionException if field lookup fails
32-
*/
33-
List<Integer> resolveIndices(RelDataType inputRowType) throws ConversionException;
3422
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/bucket/BucketTranslator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,20 @@
88

99
package org.opensearch.dsl.aggregation.bucket;
1010

11-
import org.opensearch.dsl.aggregation.AggregationType;
11+
import org.opensearch.dsl.aggregation.AggregationTranslator;
1212
import org.opensearch.dsl.aggregation.GroupingInfo;
1313
import org.opensearch.dsl.result.BucketEntry;
1414
import org.opensearch.search.aggregations.AggregationBuilder;
1515
import org.opensearch.search.aggregations.BucketOrder;
1616
import org.opensearch.search.aggregations.InternalAggregation;
1717

1818
import java.util.Collection;
19-
import java.util.List;
2019

2120
/**
2221
* Translates a bucket aggregation (terms, multi_terms, etc.) to a {@link GroupingInfo}
2322
* for GROUP BY resolution, and converts results back to InternalAggregation for response building.
2423
*/
25-
public interface BucketTranslator<T extends AggregationBuilder> extends AggregationType<T> {
24+
public interface BucketTranslator<T extends AggregationBuilder> extends AggregationTranslator<T> {
2625

2726
/**
2827
* Returns the grouping contribution for this bucket.
@@ -55,5 +54,5 @@ public interface BucketTranslator<T extends AggregationBuilder> extends Aggregat
5554
* @param buckets the bucket entries with keys, doc counts, and sub-aggs
5655
* @return the InternalAggregation
5756
*/
58-
InternalAggregation toBucketAggregation(T agg, List<BucketEntry> buckets);
57+
InternalAggregation toBucketAggregation(T agg, Iterable<BucketEntry> buckets);
5958
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/bucket/TermsBucketTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public BucketOrder getBucketOrder(TermsAggregationBuilder agg) {
5050

5151
// TODO: implement response conversion
5252
@Override
53-
public InternalAggregation toBucketAggregation(TermsAggregationBuilder agg, List<BucketEntry> buckets) {
53+
public InternalAggregation toBucketAggregation(TermsAggregationBuilder agg, Iterable<BucketEntry> buckets) {
5454
throw new UnsupportedOperationException("toBucketAggregation not yet implemented");
5555
}
5656
}

sandbox/plugins/dsl-query-executor/src/main/java/org/opensearch/dsl/aggregation/metric/MetricTranslator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import org.apache.calcite.rel.core.AggregateCall;
1212
import org.apache.calcite.rel.type.RelDataType;
13-
import org.opensearch.dsl.aggregation.AggregationType;
13+
import org.opensearch.dsl.aggregation.AggregationTranslator;
1414
import org.opensearch.dsl.converter.ConversionException;
1515
import org.opensearch.search.aggregations.AggregationBuilder;
1616
import org.opensearch.search.aggregations.InternalAggregation;
@@ -19,7 +19,7 @@
1919
* Translates a metric aggregation (AVG, SUM, MIN, MAX, etc.) to a Calcite AggregateCall,
2020
* and converts raw result values back to OpenSearch InternalAggregation for response building.
2121
*/
22-
public interface MetricTranslator<T extends AggregationBuilder> extends AggregationType<T> {
22+
public interface MetricTranslator<T extends AggregationBuilder> extends AggregationTranslator<T> {
2323

2424
/**
2525
* Converts the metric aggregation to a Calcite AggregateCall.
@@ -39,6 +39,9 @@ public interface MetricTranslator<T extends AggregationBuilder> extends Aggregat
3939
*/
4040
String getAggregateFieldName(T agg);
4141

42+
// TODO: Revisit signature — accept a stream/iterator of <String,Object> for bulk conversion
43+
// to avoid per-row virtual dispatch overhead, and use Arrow-native types once Analytics Core
44+
// exposes them.
4245
/**
4346
* Converts a raw result value from execution into an OpenSearch InternalAggregation.
4447
*

0 commit comments

Comments
 (0)