Skip to content

Commit c051220

Browse files
authored
Added UDT for IP and Binary support (#5463)
* Added UDT for IP and Binary support
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* refactored cidr and updated comments
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* spotless fix
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
---------
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
1 parent 0d80347 commit c051220

6 files changed

Lines changed: 177 additions & 49 deletions

File tree

core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.calcite.sql.type.SqlTypeName;
4747
import org.apache.calcite.sql.type.SqlTypeUtil;
4848
import org.checkerframework.checker.nullness.qual.Nullable;
49+
import org.opensearch.analytics.schema.BinaryType;
50+
import org.opensearch.analytics.schema.IpType;
4951
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
5052
import org.opensearch.sql.calcite.type.ExprBinaryType;
5153
import org.opensearch.sql.calcite.type.ExprDateType;
@@ -275,6 +277,25 @@ public static ExprType convertRelDataTypeToExprType(RelDataType type) {
275277
return exprType;
276278
}
277279

280+
/**
281+
* Result-schema-only variant of {@link #convertRelDataTypeToExprType} that recognizes the
282+
* analytics-engine {@link IpType} / {@link BinaryType} markers as {@link ExprCoreType#IP} /
283+
* {@link ExprCoreType#BINARY}.
284+
*
285+
* <p>Kept off the general path because Calcite's planner-internal coercion would round-trip
286+
* through {@link #convertExprTypeToRelDataType} and synthesize {@code CAST(... AS ExprIPType)}
287+
* casts the substrait converter can't handle.
288+
*/
289+
public static ExprType convertAnalyticsEngineRelDataTypeToExprType(RelDataType type) {
290+
if (type instanceof IpType) {
291+
return IP;
292+
}
293+
if (type instanceof BinaryType) {
294+
return BINARY;
295+
}
296+
return convertRelDataTypeToExprType(type);
297+
}
298+
278299
public static ExprValue getExprValueByExprType(ExprType type, Object value) {
279300
switch (type) {
280301
case UNDEFINED:

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
package org.opensearch.sql.executor.analytics;
77

8+
import java.net.InetAddress;
9+
import java.net.UnknownHostException;
810
import java.util.ArrayList;
11+
import java.util.Base64;
912
import java.util.LinkedHashMap;
1013
import java.util.List;
1114
import java.util.Map;
@@ -14,6 +17,9 @@
1417
import org.apache.calcite.rel.type.RelDataType;
1518
import org.apache.calcite.rel.type.RelDataTypeField;
1619
import org.opensearch.analytics.exec.QueryPlanExecutor;
20+
import org.opensearch.analytics.schema.BinaryType;
21+
import org.opensearch.analytics.schema.IpType;
22+
import org.opensearch.common.network.InetAddresses;
1723
import org.opensearch.core.action.ActionListener;
1824
import org.opensearch.sql.ast.statement.ExplainMode;
1925
import org.opensearch.sql.calcite.CalcitePlanContext;
@@ -123,15 +129,46 @@ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeFie
123129
for (Object[] row : rows) {
124130
Map<String, ExprValue> valueMap = new LinkedHashMap<>();
125131
for (int i = 0; i < fields.size(); i++) {
126-
String columnName = fields.get(i).getName();
132+
RelDataTypeField field = fields.get(i);
127133
Object value = (i < row.length) ? row[i] : null;
128-
valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
134+
valueMap.put(field.getName(), toExprValue(value, field.getType()));
129135
}
130136
results.add(ExprTupleValue.fromExprValueMap(valueMap));
131137
}
132138
return results;
133139
}
134140

141+
/**
142+
* Converts a single result cell to an {@link ExprValue}, dispatching on the column's UDT when
143+
* present so {@code byte[]} payloads are rendered correctly:
144+
*
145+
* <ul>
146+
* <li>{@link IpType} + {@code byte[]} &rarr; canonical address string (matches {@code
147+
* IpFieldMapper}'s {@code valueFetcher} output).
148+
* <li>{@link BinaryType} + {@code byte[]} &rarr; base64-encoded string (matches the OpenSearch
149+
* {@code binary} field wire format).
150+
* <li>Anything else &rarr; existing {@link ExprValueUtils#fromObjectValue} path.
151+
* </ul>
152+
*
153+
* <p>Without this dispatch, {@code fromObjectValue} throws {@code unsupported object class [B} on
154+
* byte[] cells, and IP buffers leak through as raw 16-byte ipv4-mapped-ipv6 garbage.
155+
*/
156+
private static ExprValue toExprValue(Object value, RelDataType type) {
157+
if (value instanceof byte[] bytes) {
158+
if (type instanceof IpType) {
159+
try {
160+
return ExprValueUtils.stringValue(
161+
InetAddresses.toAddrString(InetAddress.getByAddress(bytes)));
162+
} catch (UnknownHostException e) {
163+
throw new IllegalStateException("invalid IP buffer length: " + bytes.length, e);
164+
}
165+
} else if (type instanceof BinaryType) {
166+
return ExprValueUtils.stringValue(Base64.getEncoder().encodeToString(bytes));
167+
}
168+
}
169+
return ExprValueUtils.fromObjectValue(value);
170+
}
171+
135172
private Schema buildSchema(List<RelDataTypeField> fields) {
136173
List<Schema.Column> columns = new ArrayList<>();
137174
for (RelDataTypeField field : fields) {
@@ -143,7 +180,7 @@ private Schema buildSchema(List<RelDataTypeField> fields) {
143180

144181
private ExprType convertType(RelDataType type) {
145182
try {
146-
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
183+
return OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(type);
147184
} catch (IllegalArgumentException e) {
148185
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
149186
}

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@
270270
import static org.opensearch.sql.expression.function.BuiltinFunctionName.YEARWEEK;
271271

272272
import com.google.common.collect.ImmutableMap;
273-
import inet.ipaddr.IPAddress;
274273
import java.math.BigDecimal;
275274
import java.util.ArrayList;
276275
import java.util.Arrays;
@@ -285,7 +284,6 @@
285284
import java.util.stream.Collectors;
286285
import java.util.stream.Stream;
287286
import javax.annotation.Nullable;
288-
import org.apache.calcite.avatica.util.ByteString;
289287
import org.apache.calcite.rel.type.RelDataType;
290288
import org.apache.calcite.rex.RexBuilder;
291289
import org.apache.calcite.rex.RexLambda;
@@ -315,10 +313,8 @@
315313
import org.opensearch.sql.calcite.utils.PlanUtils;
316314
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
317315
import org.opensearch.sql.exception.ExpressionEvaluationException;
318-
import org.opensearch.sql.exception.SemanticCheckException;
319316
import org.opensearch.sql.executor.QueryType;
320317
import org.opensearch.sql.expression.function.CollectionUDF.MVIndexFunctionImp;
321-
import org.opensearch.sql.utils.IPUtils;
322318

323319
public class PPLFuncImpTable {
324320
private static final Logger logger = LogManager.getLogger(PPLFuncImpTable.class);
@@ -914,29 +910,6 @@ void populate() {
914910
registerDivideFunction(DIVIDEFUNCTION);
915911
registerOperator(SHA2, PPLBuiltinOperators.SHA2);
916912
registerOperator(CIDRMATCH, PPLBuiltinOperators.CIDRMATCH);
917-
// (VARBINARY, VARCHAR) overload for ip / binary columns. The lambda parses the cidr
918-
// literal at plan time and emits AND(col >= low, col <= high) directly.
919-
// Only literal cidrs are expanded.
920-
register(
921-
CIDRMATCH,
922-
(FunctionImp2)
923-
(builder, col, cidr) -> {
924-
if (cidr instanceof RexLiteral lit
925-
&& col.getType().getSqlTypeName() == SqlTypeName.VARBINARY) {
926-
byte[][] range = parseCidrToIpv6Range(lit.getValueAs(String.class));
927-
RelDataType varbinary =
928-
builder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
929-
RexNode low = builder.makeLiteral(new ByteString(range[0]), varbinary, false);
930-
RexNode high = builder.makeLiteral(new ByteString(range[1]), varbinary, false);
931-
// makeCall(AND, ...) auto-flattens at construction, so no Filter.isFlat issue.
932-
return builder.makeCall(
933-
SqlStdOperatorTable.AND,
934-
builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, col, low),
935-
builder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, col, high));
936-
}
937-
return builder.makeCall(PPLBuiltinOperators.CIDRMATCH, col, cidr);
938-
},
939-
PPLTypeChecker.family(SqlTypeFamily.BINARY, SqlTypeFamily.STRING));
940913
registerOperator(INTERNAL_GROK, PPLBuiltinOperators.GROK);
941914
registerOperator(INTERNAL_PARSE, PPLBuiltinOperators.PARSE);
942915
registerOperator(MATCH, PPLBuiltinOperators.MATCH);
@@ -1627,22 +1600,4 @@ private static SqlOperandTypeChecker extractTypeCheckerFromUDF(SqlOperator opera
16271600
}
16281601
return typeChecker;
16291602
}
1630-
1631-
/**
1632-
* Parses a CIDR string and returns its lower and upper bounds in canonical 16-byte IPv6-mapped
1633-
* form. Used by the (BINARY, STRING) {@code cidrmatch} overload to expand into a byte-range
1634-
* conjunction at plan time.
1635-
*
1636-
* <p>Delegates to {@link IPUtils#toRange(String)} for parsing; converts both bounds to IPv6 to
1637-
* guarantee 16-byte output regardless of whether the input cidr is IPv4 or IPv6.
1638-
*/
1639-
private static byte[][] parseCidrToIpv6Range(String cidr) {
1640-
if (cidr == null) {
1641-
throw new SemanticCheckException("cidrmatch range argument is null");
1642-
}
1643-
IPAddress range = IPUtils.toRange(cidr);
1644-
byte[] low = range.getLower().toIPv6().getBytes();
1645-
byte[] high = range.getUpper().toIPv6().getBytes();
1646-
return new byte[][] {low, high};
1647-
}
16481603
}

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/CidrMatchFunction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
* <ul>
3131
* <li>(STRING, STRING) -> BOOLEAN
3232
* <li>(IP, STRING) -> BOOLEAN
33+
* <li>(BINARY, STRING) -> BOOLEAN — accepts VARBINARY-backed ip columns in the analytics-engine
34+
* schema; the backend's CidrMatchFunctionAdapter rewrites these before they reach DataFusion.
3335
* </ul>
3436
*/
3537
public class CidrMatchFunction extends ImplementorUDF {
@@ -50,7 +52,8 @@ public UDFOperandMetadata getOperandMetadata() {
5052
return UDFOperandMetadata.wrapUDT(
5153
List.of(
5254
List.of(ExprCoreType.IP, ExprCoreType.STRING),
53-
List.of(ExprCoreType.STRING, ExprCoreType.STRING)));
55+
List.of(ExprCoreType.STRING, ExprCoreType.STRING),
56+
List.of(ExprCoreType.BINARY, ExprCoreType.STRING)));
5457
}
5558

5659
public static class CidrMatchImplementor implements NotNullImplementor {

core/src/test/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactoryTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
import org.apache.calcite.rel.type.RelDataType;
1717
import org.apache.calcite.sql.type.SqlTypeName;
1818
import org.junit.jupiter.api.Test;
19+
import org.opensearch.analytics.schema.BinaryType;
20+
import org.opensearch.analytics.schema.IpType;
1921
import org.opensearch.sql.calcite.type.AbstractExprRelDataType;
2022
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
2123
import org.opensearch.sql.data.type.ExprCoreType;
24+
import org.opensearch.sql.data.type.ExprType;
2225

2326
public class OpenSearchTypeFactoryTest {
2427

@@ -282,4 +285,49 @@ public void testConvertExprTypeBinaryToNullableVarbinary() {
282285
assertEquals(SqlTypeName.VARBINARY, result.getSqlTypeName());
283286
assertTrue(result.isNullable());
284287
}
288+
289+
// ---------- convertAnalyticsEngineRelDataTypeToExprType ----------
290+
// UDT-aware variant for the response-schema path. Must agree with the
291+
// planner-internal convertRelDataTypeToExprType on every non-UDT input.
292+
293+
@Test
294+
public void testConvertAnalyticsEngineIpTypeReturnsIpExprType() {
295+
ExprType result =
296+
OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(new IpType(true));
297+
assertEquals(ExprCoreType.IP, result);
298+
}
299+
300+
@Test
301+
public void testConvertAnalyticsEngineBinaryTypeReturnsBinaryExprType() {
302+
ExprType result =
303+
OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(new BinaryType(true));
304+
assertEquals(ExprCoreType.BINARY, result);
305+
}
306+
307+
@Test
308+
public void testConvertAnalyticsEnginePlainVarbinaryFallsBackToBinary() {
309+
// Plain VARBINARY (no UDT) must still resolve to BINARY via the delegated path.
310+
RelDataType varbinary = TYPE_FACTORY.createSqlType(SqlTypeName.VARBINARY);
311+
ExprType result = OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(varbinary);
312+
assertEquals(ExprCoreType.BINARY, result);
313+
}
314+
315+
@Test
316+
public void testConvertAnalyticsEngineDelegatesParityForNonUdtTypes() {
317+
// Parity check: drift would mean response-schema labels diverge from Calcite's view.
318+
RelDataType[] samples =
319+
new RelDataType[] {
320+
TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT),
321+
TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR),
322+
TYPE_FACTORY.createSqlType(SqlTypeName.BOOLEAN),
323+
TYPE_FACTORY.createSqlType(SqlTypeName.DOUBLE),
324+
TYPE_FACTORY.createSqlType(SqlTypeName.TIMESTAMP),
325+
};
326+
for (RelDataType t : samples) {
327+
assertEquals(
328+
OpenSearchTypeFactory.convertRelDataTypeToExprType(t),
329+
OpenSearchTypeFactory.convertAnalyticsEngineRelDataTypeToExprType(t),
330+
"Analytics-engine variant must agree with the general variant for " + t.getSqlTypeName());
331+
}
332+
}
285333
}

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.junit.jupiter.api.BeforeEach;
2929
import org.junit.jupiter.api.Test;
3030
import org.opensearch.analytics.exec.QueryPlanExecutor;
31+
import org.opensearch.analytics.schema.BinaryType;
32+
import org.opensearch.analytics.schema.IpType;
3133
import org.opensearch.core.action.ActionListener;
3234
import org.opensearch.sql.calcite.CalcitePlanContext;
3335
import org.opensearch.sql.calcite.SysLimit;
@@ -180,6 +182,58 @@ void executeRelNode_temporalTypes() {
180182
// Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches
181183
// AnalyticsExecutionEngine. The engine trusts the executor to honor the limit.
182184

185+
/** Raw 16-byte ipv6-mapped buffer + IpType → canonical IP string + schema reports "ip". */
186+
@Test
187+
void executeRelNode_ipColumnRendersAsAddressString() {
188+
RelNode relNode = mockRelNodeWithType("host", new IpType(true));
189+
// 1.2.3.4 in ipv4-mapped-ipv6 form: 10 zero bytes + ff ff + 4 IPv4 bytes.
190+
byte[] ipv4 = new byte[16];
191+
ipv4[10] = (byte) 0xff;
192+
ipv4[11] = (byte) 0xff;
193+
ipv4[12] = 1;
194+
ipv4[13] = 2;
195+
ipv4[14] = 3;
196+
ipv4[15] = 4;
197+
// ::1 in pure ipv6 form.
198+
byte[] ipv6 = new byte[16];
199+
ipv6[15] = 1;
200+
Iterable<Object[]> rows = Arrays.asList(new Object[] {ipv4}, new Object[] {ipv6});
201+
stubExecutorWith(relNode, rows);
202+
203+
QueryResponse response = executeAndCapture(relNode);
204+
String dump = dumpResponse(response);
205+
206+
// Schema: column reports "ip", not "binary".
207+
assertEquals(ExprCoreType.IP, response.getSchema().getColumns().get(0).getExprType(), dump);
208+
// Cells: byte[] → formatted address string.
209+
assertEquals(
210+
"1.2.3.4",
211+
response.getResults().get(0).tupleValue().get("host").value(),
212+
"ipv4-mapped IPv6 buffer should render as dotted quad. " + dump);
213+
assertEquals(
214+
"::1",
215+
response.getResults().get(1).tupleValue().get("host").value(),
216+
"pure IPv6 buffer should render as RFC 5952 compressed form. " + dump);
217+
}
218+
219+
/** Raw byte buffer + BinaryType → base64 string + schema reports "binary". */
220+
@Test
221+
void executeRelNode_binaryColumnRendersAsBase64() {
222+
RelNode relNode = mockRelNodeWithType("blob", new BinaryType(true));
223+
Iterable<Object[]> rows =
224+
Collections.singletonList(new Object[] {"Some binary blob".getBytes()});
225+
stubExecutorWith(relNode, rows);
226+
227+
QueryResponse response = executeAndCapture(relNode);
228+
String dump = dumpResponse(response);
229+
230+
assertEquals(ExprCoreType.BINARY, response.getSchema().getColumns().get(0).getExprType(), dump);
231+
assertEquals(
232+
"U29tZSBiaW5hcnkgYmxvYg==",
233+
response.getResults().get(0).tupleValue().get("blob").value(),
234+
"byte[] should base64-encode to match OpenSearch binary wire format. " + dump);
235+
}
236+
183237
@Test
184238
void executeRelNode_emptyResults() {
185239
RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR);
@@ -380,6 +434,16 @@ private RelNode mockRelNode(Object... nameTypePairs) {
380434
return relNode;
381435
}
382436

437+
/** Variant of {@link #mockRelNode} that accepts a pre-built RelDataType (e.g. UDTs). */
438+
private RelNode mockRelNodeWithType(String name, RelDataType type) {
439+
SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
440+
RelDataType rowType = typeFactory.builder().add(name, type).build();
441+
442+
RelNode relNode = mock(RelNode.class);
443+
when(relNode.getRowType()).thenReturn(rowType);
444+
return relNode;
445+
}
446+
383447
private ResponseListener<QueryResponse> captureListener(AtomicReference<QueryResponse> ref) {
384448
return new ResponseListener<QueryResponse>() {
385449
@Override

0 commit comments

Comments
 (0)