Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public enum BuiltinFunctionName {
/** Collection functions */
ARRAY(FunctionName.of("array")),
ARRAY_LENGTH(FunctionName.of("array_length")),
ARRAY_SLICE(FunctionName.of("array_slice"), true),
MAP_APPEND(FunctionName.of("map_append"), true),
MAP_CONCAT(FunctionName.of("map_concat"), true),
MAP_REMOVE(FunctionName.of("map_remove"), true),
MVAPPEND(FunctionName.of("mvappend")),
MVJOIN(FunctionName.of("mvjoin")),
MVINDEX(FunctionName.of("mvindex")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
FILTER(FunctionName.of("filter")),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import static org.opensearch.sql.expression.function.BuiltinFunctionName.ADDFUNCTION;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ARRAY_LENGTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ARRAY_SLICE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IF;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.INTERNAL_ITEM;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LESS;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUBTRACT;

import java.math.BigDecimal;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.opensearch.sql.expression.function.PPLFuncImpTable;

/**
* MVINDEX function implementation that returns a subset of a multivalue array.
*
* <p>Usage:
*
* <ul>
* <li>mvindex(array, start) - returns single element at index (0-based)
* <li>mvindex(array, start, end) - returns array slice from start to end (inclusive, 0-based)
* </ul>
*
* <p>Supports negative indexing where -1 refers to the last element.
*
* <p>Implementation notes:
*
* <ul>
* <li>Single element access uses Calcite's ITEM operator (1-based indexing)
* <li>Range access uses Calcite's ARRAY_SLICE operator (0-based indexing with length parameter)
* <li>Index conversion handles the difference between PPL's 0-based indexing and Calcite's
* conventions
* </ul>
*/
public class MVIndexFunctionImp implements PPLFuncImpTable.FunctionImp {

@Override
public RexNode resolve(RexBuilder builder, RexNode... args) {
RexNode array = args[0];
RexNode startIdx = args[1];

// Use resolve to get array length instead of direct makeCall
RexNode arrayLen = PPLFuncImpTable.INSTANCE.resolve(builder, ARRAY_LENGTH, array);

if (args.length == 2) {
// Single element access using ITEM (1-based indexing)
return resolveSingleElement(builder, array, startIdx, arrayLen);
} else {
// Range access using ARRAY_SLICE (0-based indexing)
RexNode endIdx = args[2];
return resolveRange(builder, array, startIdx, endIdx, arrayLen);
}
}

/**
* Resolves single element access: mvindex(array, index)
*
* <p>Uses Calcite's ITEM operator which uses 1-based indexing. Converts PPL's 0-based index to
* 1-based by adding 1.
*/
private RexNode resolveSingleElement(
RexBuilder builder, RexNode array, RexNode startIdx, RexNode arrayLen) {
// Convert 0-based PPL index to 1-based Calcite ITEM index
RexNode zero = builder.makeExactLiteral(BigDecimal.ZERO);
RexNode one = builder.makeExactLiteral(BigDecimal.ONE);

RexNode isNegative = PPLFuncImpTable.INSTANCE.resolve(builder, LESS, startIdx, zero);
RexNode sumArrayLenStart =
PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, arrayLen, startIdx);
RexNode negativeCase =
PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, sumArrayLenStart, one);
RexNode positiveCase = PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, startIdx, one);

RexNode normalizedStart =
PPLFuncImpTable.INSTANCE.resolve(builder, IF, isNegative, negativeCase, positiveCase);

return PPLFuncImpTable.INSTANCE.resolve(builder, INTERNAL_ITEM, array, normalizedStart);
}

/**
* Resolves range access: mvindex(array, start, end)
*
* <p>Uses Calcite's ARRAY_SLICE operator which uses 0-based indexing and a length parameter.
* PPL's end index is inclusive, so length = (end - start) + 1.
*/
private RexNode resolveRange(
RexBuilder builder, RexNode array, RexNode startIdx, RexNode endIdx, RexNode arrayLen) {
// Normalize negative indices for ARRAY_SLICE (0-based)
RexNode zero = builder.makeExactLiteral(BigDecimal.ZERO);
RexNode one = builder.makeExactLiteral(BigDecimal.ONE);

RexNode isStartNegative = PPLFuncImpTable.INSTANCE.resolve(builder, LESS, startIdx, zero);
RexNode startNegativeCase =
PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, arrayLen, startIdx);
RexNode normalizedStart =
PPLFuncImpTable.INSTANCE.resolve(builder, IF, isStartNegative, startNegativeCase, startIdx);

RexNode isEndNegative = PPLFuncImpTable.INSTANCE.resolve(builder, LESS, endIdx, zero);
RexNode endNegativeCase =
PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, arrayLen, endIdx);
RexNode normalizedEnd =
PPLFuncImpTable.INSTANCE.resolve(builder, IF, isEndNegative, endNegativeCase, endIdx);

// Calculate length: (normalizedEnd - normalizedStart) + 1
RexNode diff =
PPLFuncImpTable.INSTANCE.resolve(builder, SUBTRACT, normalizedEnd, normalizedStart);
RexNode length = PPLFuncImpTable.INSTANCE.resolve(builder, ADDFUNCTION, diff, one);

// Call ARRAY_SLICE(array, normalizedStart, length)
return PPLFuncImpTable.INSTANCE.resolve(builder, ARRAY_SLICE, array, normalizedStart, length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.AND;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ARRAY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ARRAY_LENGTH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ARRAY_SLICE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ASCII;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ASIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.ATAN;
Expand Down Expand Up @@ -149,6 +150,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVAPPEND;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
Expand Down Expand Up @@ -287,6 +289,7 @@
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.CollectionUDF.MVIndexFunctionImp;

public class PPLFuncImpTable {
private static final Logger logger = LogManager.getLogger(PPLFuncImpTable.class);
Expand Down Expand Up @@ -981,12 +984,25 @@ void populate() {
builder.makeCall(SqlLibraryOperators.ARRAY_JOIN, array, delimiter),
PPLTypeChecker.family(SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER));

// Register MVINDEX to use Calcite's ITEM/ARRAY_SLICE with index normalization
register(
MVINDEX,
new MVIndexFunctionImp(),
PPLTypeChecker.wrapComposite(
(CompositeOperandTypeChecker)
OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.INTEGER)
.or(
OperandTypes.family(
SqlTypeFamily.ARRAY, SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)),
false));

registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
registerOperator(ARRAY_LENGTH, SqlLibraryOperators.ARRAY_LENGTH);
registerOperator(ARRAY_SLICE, SqlLibraryOperators.ARRAY_SLICE);
registerOperator(FORALL, PPLBuiltinOperators.FORALL);
registerOperator(EXISTS, PPLBuiltinOperators.EXISTS);
registerOperator(FILTER, PPLBuiltinOperators.FILTER);
Expand Down
55 changes: 55 additions & 0 deletions docs/user/ppl/functions/collection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,58 @@ Example::
|--------------|
| [1,text,2.5] |
+--------------+

MVINDEX
-------

Description
>>>>>>>>>>>

Usage: mvindex(array, start, [end]) returns a subset of the multivalue array using the start and optional end index values. Indexes are 0-based (first element is at index 0). Supports negative indexing where -1 refers to the last element. When only start is provided, returns a single element. When both start and end are provided, returns an array of elements from start to end (inclusive).

Argument type: array: ARRAY, start: INTEGER, end: INTEGER (optional)

Return type: ANY (single element) or ARRAY (range)

Example::

os> source=people | eval array = array('a', 'b', 'c', 'd', 'e'), result = mvindex(array, 1) | fields result | head 1
fetched rows / total rows = 1/1
+--------+
| result |
|--------|
| b |
+--------+

os> source=people | eval array = array('a', 'b', 'c', 'd', 'e'), result = mvindex(array, -1) | fields result | head 1
fetched rows / total rows = 1/1
+--------+
| result |
|--------|
| e |
+--------+

os> source=people | eval array = array(1, 2, 3, 4, 5), result = mvindex(array, 1, 3) | fields result | head 1
fetched rows / total rows = 1/1
+---------+
| result |
|---------|
| [2,3,4] |
+---------+

os> source=people | eval array = array(1, 2, 3, 4, 5), result = mvindex(array, -3, -1) | fields result | head 1
fetched rows / total rows = 1/1
+---------+
| result |
|---------|
| [3,4,5] |
+---------+

os> source=people | eval array = array('alex', 'celestino', 'claudia', 'david'), result = mvindex(array, 0, 2) | fields result | head 1
fetched rows / total rows = 1/1
+--------------------------+
| result |
|--------------------------|
| [alex,celestino,claudia] |
+--------------------------+

Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,123 @@ public void testMvjoinWithMultipleRealFields() throws IOException {
firstRow.getString(0) + " | " + firstRow.getString(1) + " | " + firstRow.getString(2),
firstRow.getString(3));
}

@Test
public void testMvindexSingleElementPositive() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array('a', 'b', 'c', 'd', 'e'), result = mvindex(arr, 1)"
+ " | head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("b"));
}

@Test
public void testMvindexSingleElementNegative() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array('a', 'b', 'c', 'd', 'e'), result = mvindex(arr, -1)"
+ " | head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("e"));
}

@Test
public void testMvindexSingleElementNegativeMiddle() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array('a', 'b', 'c', 'd', 'e'), result = mvindex(arr, -3)"
+ " | head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("c"));
}

@Test
public void testMvindexRangePositive() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array(1, 2, 3, 4, 5), result = mvindex(arr, 1, 3) | head"
+ " 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(2, 3, 4)));
}

@Test
public void testMvindexRangeNegative() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array(1, 2, 3, 4, 5), result = mvindex(arr, -3, -1) |"
+ " head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(3, 4, 5)));
}

@Test
public void testMvindexRangeMixed() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array(1, 2, 3, 4, 5), result = mvindex(arr, -4, 2) | head"
+ " 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(2, 3)));
}

@Test
public void testMvindexRangeFirstThree() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array('alex', 'celestino', 'claudia', 'david'), result ="
+ " mvindex(arr, 0, 2) | head 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of("alex", "celestino", "claudia")));
}

@Test
public void testMvindexRangeLastThree() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array('buttercup', 'dash', 'flutter', 'honey', 'ivory',"
+ " 'minty', 'pinky', 'rarity'), result = mvindex(arr, -3, -1) | head 1 |"
+ " fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of("minty", "pinky", "rarity")));
}

@Test
public void testMvindexRangeSingleElement() throws IOException {
// When start == end, should return single element in array
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval arr = array(1, 2, 3, 4, 5), result = mvindex(arr, 2, 2) | head"
+ " 1 | fields result",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(3)));
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ ARRAY: 'ARRAY';
ARRAY_LENGTH: 'ARRAY_LENGTH';
MVAPPEND: 'MVAPPEND';
MVJOIN: 'MVJOIN';
MVINDEX: 'MVINDEX';
FORALL: 'FORALL';
FILTER: 'FILTER';
TRANSFORM: 'TRANSFORM';
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ collectionFunctionName
| ARRAY_LENGTH
| MVAPPEND
| MVJOIN
| MVINDEX
| FORALL
| EXISTS
| FILTER
Expand Down
Loading
Loading