Skip to content

Commit d1a62c4

Browse files
committed
Support spath with dynamic fields
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
1 parent 65baa2a commit d1a62c4

11 files changed

Lines changed: 321 additions & 85 deletions

File tree

common/src/main/java/org/opensearch/sql/common/utils/DebugUtils.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import java.util.Collection;
99
import java.util.Map;
1010
import java.util.stream.Collectors;
11-
import org.apache.logging.log4j.LogManager;
12-
import org.apache.logging.log4j.Logger;
1311

1412
/**
1513
* Utility class for debugging operations. This class is only for debugging purpose, and not
@@ -18,7 +16,6 @@
1816
public class DebugUtils {
1917
// Update this to true while you are debugging. (Safe guard to avoid usage in production code. )
2018
private static final boolean IS_DEBUG = false;
21-
private static final Logger logger = LogManager.getLogger(DebugUtils.class);
2219

2320
public static <T> T debug(T obj, String message) {
2421
verifyDebug();
@@ -39,7 +36,7 @@ private static void verifyDebug() {
3936
}
4037

4138
private static void print(String format, Object... args) {
42-
logger.info(String.format(format, args));
39+
System.out.println(String.format(format, args));
4340
}
4441

4542
private static String getCalledFrom(int pos) {

core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ public boolean hasWildcards() {
7878
return wildcard != NULL_WILDCARD;
7979
}
8080

81+
public boolean hasPartialWildcards() {
82+
return wildcard != NULL_WILDCARD && wildcard != ANY_WILDCARD;
83+
}
84+
8185
public boolean hasRegularFields() {
8286
return !regularFields.isEmpty();
8387
}

core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ private void acceptAndVerifyNodeVisited(Node node, FieldResolutionContext contex
110110

111111
@Override
112112
public Node visitProject(Project node, FieldResolutionContext context) {
113-
boolean isSelectAll =
114-
node.getProjectList().stream().anyMatch(expr -> expr instanceof AllFields);
113+
boolean isSingleSelectAll =
114+
node.getProjectList().size() == 1 && node.getProjectList().getFirst() instanceof AllFields;
115115

116-
if (isSelectAll) {
116+
if (isSingleSelectAll) {
117117
visitChildren(node, context);
118118
} else {
119119
Set<String> projectFields = new HashSet<>();
@@ -181,10 +181,9 @@ public Node visitSpath(SPath node, FieldResolutionContext context) {
181181
// set requirements for spath command;
182182
context.setResult(node, context.getCurrentRequirements());
183183
FieldResolutionResult requirements = context.getCurrentRequirements();
184-
if (requirements.hasWildcards()) {
184+
if (requirements.hasPartialWildcards()) {
185185
throw new IllegalArgumentException(
186-
"Spath command cannot extract arbitrary fields. Please project fields explicitly by"
187-
+ " fields command without wildcard or stats command.");
186+
"Spath command cannot be used with partial wildcard such as `prefix*`.");
188187
}
189188

190189
context.pushRequirements(context.getCurrentRequirements().or(Set.of(node.getInField())));
@@ -237,6 +236,8 @@ private Set<String> extractFieldsFromExpression(UnresolvedExpression expr) {
237236

238237
if (expr instanceof Field field) {
239238
fields.add(field.getField().toString());
239+
} else if (expr instanceof AllFields) {
240+
fields.add("*");
240241
} else if (expr instanceof QualifiedName name) {
241242
fields.add(name.toString());
242243
} else if (expr instanceof Alias alias) {

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_DESC;
1515
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1616
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
17+
import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP;
1718
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupNotNull;
1819
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupOrNull;
1920
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
@@ -422,7 +423,7 @@ private List<RexNode> expandProjectFields(
422423
if (WildcardUtils.containsWildcard(fieldName)) {
423424
List<String> matchingFields =
424425
WildcardUtils.expandWildcardPattern(fieldName, currentFields).stream()
425-
.filter(f -> !isMetadataField(f))
426+
.filter(f -> !isMetadataField(f) && !f.equals(DYNAMIC_FIELDS_MAP))
426427
.filter(addedFields::add)
427428
.toList();
428429
if (matchingFields.isEmpty()) {
@@ -714,12 +715,6 @@ private RelNode spathExtractAll(SPath node, CalcitePlanContext context) {
714715
visitChildren(node, context);
715716

716717
FieldResolutionResult resolutionResult = context.resolveFields(node);
717-
if (resolutionResult.hasWildcards()) {
718-
// Logic for handling wildcards (dynamic fields) will be implemented later.
719-
throw new IllegalArgumentException(
720-
"spath command failed to identify fields to extract. Use fields/stats command to specify"
721-
+ " output fields.");
722-
}
723718

724719
// 1. Extract all fields from JSON in `inField`
725720
RexNode inField = rexVisitor.analyze(AstDSL.field(node.getInField()), context);
@@ -748,10 +743,51 @@ private RelNode spathExtractAll(SPath node, CalcitePlanContext context) {
748743
fields.add(context.relBuilder.alias(item, fieldName));
749744
}
750745

746+
// 3. Add _MAP field for dynamic fields when wildcards present
747+
if (resolutionResult.hasWildcards()) {
748+
RexNode dynamicMapField =
749+
createDynamicMapField(map, resolutionResult.getRegularFields(), context);
750+
fields.add(context.relBuilder.alias(dynamicMapField, "_MAP"));
751+
}
752+
751753
context.relBuilder.project(fields);
752754
return context.relBuilder.peek();
753755
}
754756

757+
/**
758+
* Creates a dynamic map field containing all JSON attributes not mapped to static fields.
759+
*
760+
* @param fullMap The complete map from JSON_EXTRACT_ALL
761+
* @param regularFields Set of field names that are extracted as static columns
762+
* @param context CalcitePlanContext
763+
* @return RexNode representing MAP_REMOVE(fullMap, ARRAY[regularFields])
764+
*/
765+
private RexNode createDynamicMapField(
766+
RexNode fullMap, Set<String> regularFields, CalcitePlanContext context) {
767+
// Build array of keys to remove: ARRAY['field1', 'field2', ...]
768+
List<RexNode> keysToRemove =
769+
regularFields.stream()
770+
.sorted()
771+
.map(name -> context.rexBuilder.makeLiteral(name))
772+
.collect(Collectors.toList());
773+
774+
// Create ARRAY<VARCHAR> type
775+
RelDataType arrayType =
776+
context
777+
.rexBuilder
778+
.getTypeFactory()
779+
.createArrayType(
780+
context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1);
781+
782+
// Build ARRAY value constructor
783+
RexNode keyArray =
784+
context.rexBuilder.makeCall(
785+
arrayType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, keysToRemove);
786+
787+
// MAP_REMOVE(fullMap, keyArray) → filtered map with only unmapped fields
788+
return makeCall(context, BuiltinFunctionName.MAP_REMOVE, fullMap, keyArray);
789+
}
790+
755791
private RexNode itemCall(RexNode node, String key, CalcitePlanContext context) {
756792
return makeCall(
757793
context, BuiltinFunctionName.INTERNAL_ITEM, node, context.rexBuilder.makeLiteral(key));
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan;
7+
8+
import lombok.experimental.UtilityClass;
9+
10+
@UtilityClass
11+
public class DynamicFieldsConstants {
12+
public static String DYNAMIC_FIELDS_MAP = "_MAP";
13+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.utils;
7+
8+
import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP;
9+
10+
import java.util.ArrayList;
11+
import java.util.HashSet;
12+
import java.util.LinkedHashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Set;
16+
import java.util.stream.Collectors;
17+
import lombok.experimental.UtilityClass;
18+
import org.opensearch.sql.data.model.ExprNullValue;
19+
import org.opensearch.sql.data.model.ExprStringValue;
20+
import org.opensearch.sql.data.model.ExprTupleValue;
21+
import org.opensearch.sql.data.model.ExprValue;
22+
import org.opensearch.sql.data.model.ExprValueUtils;
23+
import org.opensearch.sql.data.type.ExprCoreType;
24+
import org.opensearch.sql.data.type.ExprType;
25+
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
26+
import org.opensearch.sql.executor.ExecutionEngine.Schema;
27+
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
28+
29+
/** Utility class for expanding dynamic fields in QueryResponse into result columns */
30+
@UtilityClass
31+
public class DynamicFieldsResultProcessor {
32+
33+
/**
34+
* Expand dynamic fields into individual columns in QueryResponse.
35+
*
36+
* @param response Original QueryResponse with _MAP column
37+
* @return New QueryResponse with expanded individual columns
38+
*/
39+
public static QueryResponse expandDynamicFields(QueryResponse response) {
40+
if (!hasDynamicFields(response)) {
41+
return response;
42+
}
43+
44+
Map<String, ExprType> dynamicFieldTypes = getDynamicFieldTypes(response.getResults());
45+
Schema expandedSchema = createExpandedSchema(response.getSchema(), dynamicFieldTypes);
46+
List<ExprValue> expandedRows = expandResultRows(response.getResults(), expandedSchema);
47+
48+
return new QueryResponse(expandedSchema, expandedRows, response.getCursor());
49+
}
50+
51+
private static boolean hasDynamicFields(QueryResponse response) {
52+
return response.getSchema().getColumns().stream()
53+
.anyMatch(column -> DYNAMIC_FIELDS_MAP.equals(column.getName()));
54+
}
55+
56+
private static Schema createExpandedSchema(
57+
Schema originalSchema, Map<String, ExprType> dynamicFieldTypes) {
58+
List<Column> expandedColumns =
59+
originalSchema.getColumns().stream()
60+
.filter(col -> !DYNAMIC_FIELDS_MAP.equals(col.getName()))
61+
.collect(Collectors.toList());
62+
Set<String> staticFields =
63+
expandedColumns.stream().map(col -> col.getName()).collect(Collectors.toSet());
64+
65+
List<String> sortedDynamicFields =
66+
dynamicFieldTypes.keySet().stream().sorted().collect(Collectors.toList());
67+
for (String dynamicFieldName : sortedDynamicFields) {
68+
ExprType fieldType = dynamicFieldTypes.get(dynamicFieldName);
69+
if (!staticFields.contains(dynamicFieldName)) {
70+
expandedColumns.add(new Column(dynamicFieldName, null, fieldType));
71+
}
72+
}
73+
74+
return new Schema(expandedColumns);
75+
}
76+
77+
/** Expands result rows by extracting MAP field values into individual columns. */
78+
private static List<ExprValue> expandResultRows(
79+
List<ExprValue> originalResults, Schema expandedSchema) {
80+
List<ExprValue> expandedResults = new ArrayList<>();
81+
82+
for (ExprValue row : originalResults) {
83+
if (row instanceof ExprTupleValue) {
84+
expandedResults.add(expandRow((ExprTupleValue) row, expandedSchema));
85+
} else {
86+
// Non-tuple rows are passed through unchanged
87+
expandedResults.add(row);
88+
}
89+
}
90+
91+
return expandedResults;
92+
}
93+
94+
private static ExprTupleValue expandRow(ExprTupleValue row, Schema expandedSchema) {
95+
Map<String, ExprValue> expandedRow = new LinkedHashMap<>();
96+
Map<String, ExprValue> originalRow = row.tupleValue();
97+
Map<String, ExprValue> dynamicFields = getDynamicFields(row);
98+
99+
for (Column column : expandedSchema.getColumns()) {
100+
String colName = column.getName();
101+
expandedRow.put(colName, getColValue(originalRow, dynamicFields, colName));
102+
}
103+
return ExprTupleValue.fromExprValueMap(expandedRow);
104+
}
105+
106+
private static ExprValue getColValue(
107+
Map<String, ExprValue> originalRow, Map<String, ExprValue> dynamicFields, String colName) {
108+
if (originalRow.containsKey(colName)) {
109+
return originalRow.get(colName);
110+
} else if (dynamicFields.containsKey(colName)) {
111+
// All the dynamic fields are converted to STRING for type consistency.
112+
return convertToStringValue(dynamicFields.get(colName));
113+
} else {
114+
return ExprValueUtils.nullValue();
115+
}
116+
}
117+
118+
private static ExprValue convertToStringValue(ExprValue v) {
119+
if (v instanceof ExprStringValue || v instanceof ExprNullValue) {
120+
return v;
121+
} else {
122+
return new ExprStringValue(String.valueOf(v));
123+
}
124+
}
125+
126+
private static Map<String, ExprType> getDynamicFieldTypes(List<ExprValue> results) {
127+
Set<String> fieldNames = collectDynamicFields(results);
128+
Map<String, ExprType> inferredTypes = new LinkedHashMap<>();
129+
for (String fieldName : fieldNames) {
130+
inferredTypes.put(fieldName, ExprCoreType.STRING);
131+
}
132+
return inferredTypes;
133+
}
134+
135+
private Set<String> collectDynamicFields(List<ExprValue> results) {
136+
Set<String> fieldNames = new HashSet<>();
137+
138+
for (ExprValue row : results) {
139+
if (row instanceof ExprTupleValue) {
140+
Map<String, ExprValue> dynamicFields = getDynamicFields(row);
141+
fieldNames.addAll(dynamicFields.keySet());
142+
}
143+
}
144+
return fieldNames;
145+
}
146+
147+
private static Map<String, ExprValue> getDynamicFields(ExprValue row) {
148+
ExprValue mapValue = row.tupleValue().get(DYNAMIC_FIELDS_MAP);
149+
if (mapValue == null || mapValue.isNull() || mapValue.isMissing()) {
150+
return Map.of();
151+
}
152+
153+
if (mapValue instanceof ExprTupleValue) {
154+
return mapValue.tupleValue();
155+
}
156+
157+
return Map.of();
158+
}
159+
}

core/src/test/java/org/opensearch/sql/ast/analysis/FieldResolutionResultTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,28 @@ void testFieldResolutionResultAndWithBothNullWildcards() {
343343
assertFalse(combined.hasWildcards());
344344
}
345345

346+
@Test
347+
void testWildcardAnyAndAny() {
348+
FieldResolutionResult result1 = new FieldResolutionResult(Set.of("a"), "*");
349+
FieldResolutionResult result2 = new FieldResolutionResult(Set.of("b"), "*");
350+
351+
FieldResolutionResult combined = result1.and(result2);
352+
353+
assertEquals(Set.of("a", "b"), combined.getRegularFields());
354+
assertEquals("*", combined.getWildcard().toString());
355+
}
356+
357+
@Test
358+
void testWildcardAnyAndNull() {
359+
FieldResolutionResult result1 = new FieldResolutionResult(Set.of("a"), "*");
360+
FieldResolutionResult result2 = new FieldResolutionResult(Set.of("b"));
361+
362+
FieldResolutionResult combined = result1.and(result2);
363+
364+
assertEquals(Set.of("b"), combined.getRegularFields());
365+
assertEquals("", combined.getWildcard().toString());
366+
}
367+
346368
@Test
347369
void testFieldResolutionResultOrOperation() {
348370
FieldResolutionResult result = new FieldResolutionResult(Set.of("field1"), "user*");

0 commit comments

Comments
 (0)