Skip to content

Commit 2b43b8b

Browse files
authored
Dedup pushdown (TopHits Agg) should work with Object fields (#4991) (#4995)
(cherry picked from commit 1192376) Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent fe77730 commit 2b43b8b

6 files changed

Lines changed: 53 additions & 27 deletions

File tree

integ-test/src/test/java/org/opensearch/sql/calcite/big5/CalcitePPLBig5IT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.calcite.big5;
77

8+
import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;
9+
810
import java.io.IOException;
911
import org.junit.FixMethodOrder;
1012
import org.junit.Test;
@@ -42,4 +44,13 @@ public void coalesce_nonexistent_field_fallback() throws IOException {
4244
String ppl = sanitize(loadExpectedQuery("coalesce_nonexistent_field_fallback.ppl"));
4345
timing(summary, "coalesce_nonexistent_field_fallback", ppl);
4446
}
47+
48+
/** Tests deduplication on the metrics.size object field, sorted by @timestamp descending, and verifies the explain plan against the expected YAML. */
49+
@Test
50+
public void dedup_metrics_size_field() throws IOException {
51+
String ppl = sanitize(loadExpectedQuery("dedup_metrics_size_field.ppl"));
52+
timing(summary, "dedup_metrics_size_field", ppl);
53+
String expected = loadExpectedPlan("big5/dedup_metrics_size_field.yaml");
54+
assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl));
55+
}
4556
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
{
3+
"name": "dedup_metrics_size_field",
4+
"operation-type": "search",
5+
"index": "{{index_name | default('big5')}}",
6+
"body": {
7+
"query": {
8+
"exists": {
9+
"field": "metrics.size",
10+
"boost": 1.0
11+
}
12+
},
13+
"_source": {
14+
"includes": ["agent", "process", "log", "message", "tags", "cloud", "input", "@timestamp", "ecs", "data_stream", "meta", "host", "metrics", "metrics.size", "aws", "event"],
15+
"excludes": []
16+
}
17+
}
18+
}
19+
*/
20+
source = big5
21+
| dedup metrics.size
22+
| sort - @timestamp
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], ecs=[$18], data_stream=[$20], meta=[$24], host=[$26], metrics=[$27], aws=[$30], event=[$35])
5+
LogicalSort(sort0=[$17], dir0=[DESC-nulls-last])
6+
LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44])
7+
LogicalFilter(condition=[<=($45, 1)])
8+
LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $28)])
9+
LogicalFilter(condition=[IS NOT NULL($28)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","agent.ephemeral_id","agent.id","agent.name","agent.type","agent.version","process","process.name","log","log.file","log.file.path","message","tags","cloud","cloud.region","input","input.type","@timestamp","ecs","ecs.version","data_stream","data_stream.dataset","data_stream.namespace","data_stream.type","meta","meta.file","host","metrics","metrics.tmin","aws","aws.cloudwatch","aws.cloudwatch.ingestion_time","aws.cloudwatch.log_group","aws.cloudwatch.log_stream","event","event.dataset","event.id","event.ingested","_id","_index","_score","_maxscore","_sort","_routing"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/ObjectContent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public boolean isBoolean() {
141141

142142
@Override
143143
public boolean isArray() {
144-
return value instanceof ArrayNode;
144+
return value instanceof ArrayNode || value instanceof List;
145145
}
146146

147147
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ public ExprValue construct(String jsonString, boolean supportArrays) {
182182
* @return ExprValue
183183
*/
184184
public ExprValue construct(String field, Object value, boolean supportArrays) {
185-
Object extractedValue = extractFinalPrimitiveValue(value);
186-
return parse(new ObjectContent(extractedValue), field, type(field), supportArrays);
185+
return parse(new ObjectContent(value), field, type(field), supportArrays);
187186
}
188187

189188
private ExprValue parse(
@@ -214,7 +213,9 @@ private ExprValue parse(
214213
|| type == STRUCT) {
215214
return parseStruct(content, field, supportArrays);
216215
} else if (typeActionMap.containsKey(type)) {
217-
return typeActionMap.get(type).apply(content, type);
216+
return content.isArray()
217+
? parseArray(content, field, type, supportArrays)
218+
: typeActionMap.get(type).apply(content, type);
218219
} else {
219220
throw new IllegalStateException(
220221
String.format(
@@ -581,26 +582,4 @@ private ExprValue parseInnerArrayValue(
581582
private String makeField(String path, String field) {
582583
return path.equalsIgnoreCase(TOP_PATH) ? field : String.join(".", path, field);
583584
}
584-
585-
/**
586-
* Recursively extracts the final primitive value from nested Map structures. For example:
587-
* {attributes={telemetry={sdk={language=java}}}} -> "java"
588-
*
589-
* @param value The value to extract from
590-
* @return The extracted primitive value, or the original value if extraction is not possible
591-
*/
592-
@SuppressWarnings("unchecked")
593-
private Object extractFinalPrimitiveValue(Object value) {
594-
if (value == null || !(value instanceof Map)) {
595-
return value;
596-
}
597-
598-
Map<String, Object> map = (Map<String, Object>) value;
599-
if (map.size() == 1) {
600-
Object singleValue = map.values().iterator().next();
601-
return extractFinalPrimitiveValue(singleValue);
602-
}
603-
604-
return value;
605-
}
606585
}

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected void apply(
9393
.filter(pair -> ((RexInputRef) pair.getKey()).getIndex() == i)
9494
.map(Pair::getValue)
9595
.findFirst()
96-
.get())
96+
.orElse(projectWithWindow.getInput().getRowType().getFieldNames().get(i)))
9797
.collect(Collectors.toList());
9898
if (dedupColumnIndices.size() != dedupColumnNames.size()) {
9999
return;

0 commit comments

Comments
 (0)