Skip to content

Commit 6eb5ac5

Browse files
Fix #5150: Fix dedup aggregation pushdown nullifying renamed fields
When the DedupPushdownRule converts a dedup to an aggregation-based top_hits query, fields that were renamed (via rename or eval) would return null values. This happened because the TopHitsParser returned results using original OpenSearch field names, but the output schema expected the renamed names. Added a field name mapping to TopHitsParser so it can translate original OS field names to their renamed output names in the LITERAL_AGG (dedup) aggregation response path. Signed-off-by: opensearchpplteam <opensearchpplteam@gmail.com>
1 parent e30b2f8 commit 6eb5ac5

4 files changed

Lines changed: 175 additions & 2 deletions

File tree

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: test_5150
5+
- do:
6+
query.settings:
7+
body:
8+
transient:
9+
plugins.calcite.enabled : true
10+
11+
- do:
12+
bulk:
13+
index: test_5150
14+
refresh: true
15+
body:
16+
- '{"index": {}}'
17+
- '{"category":"X","value":100}'
18+
- '{"index": {}}'
19+
- '{"category":"X","value":200}'
20+
- '{"index": {}}'
21+
- '{"category":"Y","value":300}'
22+
- '{"index": {}}'
23+
- '{"category":"Y","value":400}'
24+
25+
---
26+
teardown:
27+
- do:
28+
query.settings:
29+
body:
30+
transient:
31+
plugins.calcite.enabled : false
32+
33+
---
34+
"5150: Rename non-dedup field then dedup retains renamed values":
35+
- skip:
36+
features:
37+
- headers
38+
- allowed_warnings
39+
- do:
40+
allowed_warnings:
41+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
42+
headers:
43+
Content-Type: 'application/json'
44+
ppl:
45+
body:
46+
query: source=test_5150 | rename value as val | dedup category | sort category | fields category, val
47+
48+
- match: { total: 2 }
49+
- match: { schema: [{"name": "category", "type": "string"}, {"name": "val", "type": "bigint"}] }
50+
- length: { datarows: 2 }
51+
# Each row should have non-null val
52+
- is_true: datarows.0.1
53+
- is_true: datarows.1.1
54+
55+
---
56+
"5150: Eval new field then dedup on different field retains eval values":
57+
- skip:
58+
features:
59+
- headers
60+
- allowed_warnings
61+
- do:
62+
allowed_warnings:
63+
- 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
64+
headers:
65+
Content-Type: 'application/json'
66+
ppl:
67+
body:
68+
query: source=test_5150 | eval doubled = value * 2 | dedup category | sort category | fields category, value, doubled
69+
70+
- match: { total: 2 }
71+
- length: { datarows: 2 }
72+
# Each row should have non-null doubled
73+
- is_true: datarows.0.2
74+
- is_true: datarows.1.2

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.common.collect.ImmutableList;
3939
import java.util.ArrayList;
4040
import java.util.Collections;
41+
import java.util.HashMap;
4142
import java.util.HashSet;
4243
import java.util.List;
4344
import java.util.Map;
@@ -601,7 +602,21 @@ yield switch (functionName) {
601602
TopHitsAggregationBuilder topHitsAggregationBuilder =
602603
createTopHitsBuilder(
603604
aggCall, args, aggName, helper, dedupNumber, false, false, null, null);
604-
yield Pair.of(topHitsAggregationBuilder, new TopHitsParser(aggName, false, false));
605+
// Build field name mapping for renamed fields (e.g., rename value as val).
606+
// The top_hits response uses original OS field names, but the output schema expects
607+
// the renamed names from the project.
608+
Map<String, String> fieldNameMapping = new HashMap<>();
609+
for (Pair<RexNode, String> arg : args) {
610+
if (arg.getKey() instanceof RexInputRef) {
611+
String originalName = helper.inferNamedField(arg.getKey()).getRootName();
612+
String outputName = arg.getValue();
613+
if (!originalName.equals(outputName)) {
614+
fieldNameMapping.put(originalName, outputName);
615+
}
616+
}
617+
}
618+
yield Pair.of(
619+
topHitsAggregationBuilder, new TopHitsParser(aggName, false, false, fieldNameMapping));
605620
}
606621
default ->
607622
throw new AggregateAnalyzer.AggregateAnalyzerException(

opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/TopHitsParser.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,27 @@ public class TopHitsParser implements MetricParser {
2727
private final boolean returnSingleValue;
2828
private final boolean returnMergeValue;
2929

30+
/**
31+
* Mapping from original OpenSearch field names to output field names (e.g., renamed via {@code
32+
* rename} command). When a field is renamed (e.g., {@code rename value as val}), the top_hits
33+
* response still contains the original field name ({@code value}), but the output schema expects
34+
* the renamed name ({@code val}). This mapping enables the translation.
35+
*/
36+
private final Map<String, String> fieldNameMapping;
37+
3038
/**
 * Creates a parser that performs no field renaming.
 *
 * <p>Delegates to the four-argument constructor with an empty field name mapping.
 *
 * @param name value stored as this parser's name
 * @param returnSingleValue flag stored on the parser as-is
 * @param returnMergeValue flag stored on the parser as-is
 */
public TopHitsParser(String name, boolean returnSingleValue, boolean returnMergeValue) {
  this(name, returnSingleValue, returnMergeValue, Collections.emptyMap());
}

/**
 * Creates a parser that can rename hit fields while building result rows.
 *
 * @param name value stored as this parser's name
 * @param returnSingleValue flag stored on the parser as-is
 * @param returnMergeValue flag stored on the parser as-is
 * @param fieldNameMapping mapping from original OpenSearch field names to output field names;
 *     {@code null} is treated as an empty mapping
 */
public TopHitsParser(
    String name,
    boolean returnSingleValue,
    boolean returnMergeValue,
    Map<String, String> fieldNameMapping) {
  this.name = name;
  this.returnSingleValue = returnSingleValue;
  this.returnMergeValue = returnMergeValue;
  // Take an unmodifiable snapshot so that later changes to the caller's map cannot alter how
  // this parser renames fields, and so that a null argument does not surface as a
  // NullPointerException only when a response is parsed.
  this.fieldNameMapping =
      fieldNameMapping == null
          ? Collections.emptyMap()
          : Collections.unmodifiableMap(new LinkedHashMap<>(fieldNameMapping));
}
3552

3653
@Override
@@ -129,12 +146,28 @@ public List<Map<String, Object>> parse(Aggregation agg) {
129146
? new LinkedHashMap<>()
130147
: new LinkedHashMap<>(hit.getSourceAsMap());
131148
hit.getFields().values().forEach(f -> map.put(f.getName(), f.getValue()));
132-
return map;
149+
return applyFieldNameMapping(map);
133150
})
134151
.toList();
135152
}
136153
}
137154

155+
/**
156+
* Apply field name mapping to translate original OpenSearch field names to output field names.
157+
* Fields not present in the mapping are kept as-is.
158+
*/
159+
private Map<String, Object> applyFieldNameMapping(Map<String, Object> map) {
160+
if (fieldNameMapping.isEmpty()) {
161+
return map;
162+
}
163+
Map<String, Object> result = new LinkedHashMap<>();
164+
for (Map.Entry<String, Object> entry : map.entrySet()) {
165+
String mappedName = fieldNameMapping.getOrDefault(entry.getKey(), entry.getKey());
166+
result.put(mappedName, entry.getValue());
167+
}
168+
return result;
169+
}
170+
138171
private boolean isEmptyHits(SearchHit[] hits) {
139172
return isFieldsEmpty(hits) && isSourceEmpty(hits);
140173
}

opensearch/src/test/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParserTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,57 @@ void two_bucket_percentiles_should_pass() {
570570
ImmutableMap.of("percentiles", List.of(21.0, 27.0, 30.0, 35.0, 55.0, 58.0, 60.0))));
571571
}
572572

573+
/**
574+
* Test for issue #5150: dedup aggregation pushdown with renamed fields. When a field is renamed
575+
* (e.g., rename value as val), the top_hits response uses original field names. The TopHitsParser
576+
* with fieldNameMapping should translate them to the renamed names.
577+
*/
578+
@Test
579+
void dedup_top_hits_with_field_name_mapping_should_remap_fields() {
580+
String response =
581+
"{\n"
582+
+ " \"composite#composite_buckets\": {\n"
583+
+ " \"buckets\": [\n"
584+
+ " {\n"
585+
+ " \"key\": {\n"
586+
+ " \"category\": \"X\"\n"
587+
+ " },\n"
588+
+ " \"doc_count\": 2,\n"
589+
+ " \"top_hits#dedup\": {\n"
590+
+ " \"hits\": {\n"
591+
+ " \"total\": { \"value\": 1, \"relation\": \"eq\" },\n"
592+
+ " \"max_score\": 1.0,\n"
593+
+ " \"hits\": [\n"
594+
+ " {\n"
595+
+ " \"_index\": \"test\",\n"
596+
+ " \"_id\": \"1\",\n"
597+
+ " \"_score\": 1.0,\n"
598+
+ " \"fields\": {\n"
599+
+ " \"category\": [\"X\"],\n"
600+
+ " \"value\": [100]\n"
601+
+ " }\n"
602+
+ " }\n"
603+
+ " ]\n"
604+
+ " }\n"
605+
+ " }\n"
606+
+ " }\n"
607+
+ " ]\n"
608+
+ " }\n"
609+
+ "}";
610+
// "value" is renamed to "val" — the mapping should translate it in the response.
611+
// Use BucketAggregationParser as used by the dedup aggregation pushdown path.
612+
OpenSearchAggregationResponseParser parser =
613+
new BucketAggregationParser(
614+
List.of(new TopHitsParser("dedup", false, false, Map.of("value", "val"))), List.of());
615+
List<Map<String, Object>> result = parse(parser, response);
616+
assertEquals(1, result.size());
617+
Map<String, Object> row = result.get(0);
618+
// The renamed field "val" should be present, not the original "value"
619+
assertEquals(100, row.get("val"));
620+
assertNull(row.get("value"));
621+
assertEquals("X", row.get("category"));
622+
}
623+
573624
public List<Map<String, Object>> parse(OpenSearchAggregationResponseParser parser, String json) {
574625
return parser.parse(fromJson(json));
575626
}

0 commit comments

Comments
 (0)