Skip to content

Commit a85c343

Browse files
ahkcsasifabashar
authored andcommitted
Support mvdedup eval function (opensearch-project#4828)
* Support eval function Signed-off-by: Kai Huang <ahkcs@amazon.com> * Updates Signed-off-by: Kai Huang <ahkcs@amazon.com> * update javadoc Signed-off-by: Kai Huang <ahkcs@amazon.com> * Update to use ARRAY_DISTINCT Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 18d506a commit a85c343

8 files changed

Lines changed: 206 additions & 0 deletions

File tree

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public enum BuiltinFunctionName {
7575
MVAPPEND(FunctionName.of("mvappend")),
7676
MVJOIN(FunctionName.of("mvjoin")),
7777
MVINDEX(FunctionName.of("mvindex")),
78+
MVDEDUP(FunctionName.of("mvdedup")),
7879
FORALL(FunctionName.of("forall")),
7980
EXISTS(FunctionName.of("exists")),
8081
FILTER(FunctionName.of("filter")),

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION;
151151
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH;
152152
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVAPPEND;
153+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP;
153154
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
154155
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
155156
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
@@ -991,6 +992,7 @@ void populate() {
991992

992993
registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
993994
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
995+
registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT);
994996
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
995997
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
996998
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);

docs/user/ppl/functions/collection.rst

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,44 @@ Example::
302302
| [1,text,2.5] |
303303
+--------------+
304304

305+
MVDEDUP
306+
-------
307+
308+
Description
309+
>>>>>>>>>>>
310+
311+
Usage: mvdedup(array) removes duplicate values from a multivalue array while preserving the order of first occurrence. NULL elements are filtered out. Returns an array with duplicates removed, or null if the input is null.
312+
313+
Argument type: array: ARRAY
314+
315+
Return type: ARRAY
316+
317+
Example::
318+
319+
os> source=people | eval array = array(1, 2, 2, 3, 1, 4), result = mvdedup(array) | fields result | head 1
320+
fetched rows / total rows = 1/1
321+
+-----------+
322+
| result |
323+
|-----------|
324+
| [1,2,3,4] |
325+
+-----------+
326+
327+
os> source=people | eval array = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(array) | fields result | head 1
328+
fetched rows / total rows = 1/1
329+
+-----------+
330+
| result |
331+
|-----------|
332+
| [z,a,b,c] |
333+
+-----------+
334+
335+
os> source=people | eval array = array(), result = mvdedup(array) | fields result | head 1
336+
fetched rows / total rows = 1/1
337+
+--------+
338+
| result |
339+
|--------|
340+
| [] |
341+
+--------+
342+
305343
MVINDEX
306344
-------
307345

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,4 +489,82 @@ public void testMvindexRangeSingleElement() throws IOException {
489489
verifySchema(actual, schema("result", "array"));
490490
verifyDataRows(actual, rows(List.of(3)));
491491
}
492+
493+
@Test
494+
public void testMvdedupWithDuplicates() throws IOException {
495+
JSONObject actual =
496+
executeQuery(
497+
String.format(
498+
"source=%s | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr) | head 1 |"
499+
+ " fields result",
500+
TEST_INDEX_BANK));
501+
502+
verifySchema(actual, schema("result", "array"));
503+
verifyDataRows(actual, rows(List.of(1, 2, 3, 4)));
504+
}
505+
506+
@Test
507+
public void testMvdedupWithNoDuplicates() throws IOException {
508+
JSONObject actual =
509+
executeQuery(
510+
String.format(
511+
"source=%s | eval arr = array(1, 2, 3, 4), result = mvdedup(arr) | head 1 |"
512+
+ " fields result",
513+
TEST_INDEX_BANK));
514+
515+
verifySchema(actual, schema("result", "array"));
516+
verifyDataRows(actual, rows(List.of(1, 2, 3, 4)));
517+
}
518+
519+
@Test
520+
public void testMvdedupWithAllDuplicates() throws IOException {
521+
JSONObject actual =
522+
executeQuery(
523+
String.format(
524+
"source=%s | eval arr = array(5, 5, 5, 5), result = mvdedup(arr) | head 1 |"
525+
+ " fields result",
526+
TEST_INDEX_BANK));
527+
528+
verifySchema(actual, schema("result", "array"));
529+
verifyDataRows(actual, rows(List.of(5)));
530+
}
531+
532+
@Test
533+
public void testMvdedupWithEmptyArray() throws IOException {
534+
JSONObject actual =
535+
executeQuery(
536+
String.format(
537+
"source=%s | eval arr = array(), result = mvdedup(arr) | head 1 | fields result",
538+
TEST_INDEX_BANK));
539+
540+
verifySchema(actual, schema("result", "array"));
541+
verifyDataRows(actual, rows(List.of()));
542+
}
543+
544+
@Test
545+
public void testMvdedupWithStrings() throws IOException {
546+
JSONObject actual =
547+
executeQuery(
548+
String.format(
549+
"source=%s | eval arr = array('apple', 'banana', 'apple', 'cherry', 'banana'),"
550+
+ " result = mvdedup(arr) | head 1 | fields result",
551+
TEST_INDEX_BANK));
552+
553+
verifySchema(actual, schema("result", "array"));
554+
verifyDataRows(actual, rows(List.of("apple", "banana", "cherry")));
555+
}
556+
557+
@Test
558+
public void testMvdedupPreservesOrder() throws IOException {
559+
JSONObject actual =
560+
executeQuery(
561+
String.format(
562+
"source=%s | eval arr = array('z', 'a', 'z', 'b', 'a', 'c'), result ="
563+
+ " mvdedup(arr) | head 1 | fields result",
564+
TEST_INDEX_BANK));
565+
566+
verifySchema(actual, schema("result", "array"));
567+
// Should preserve first occurrence order: z, a, b, c
568+
verifyDataRows(actual, rows(List.of("z", "a", "b", "c")));
569+
}
492570
}

ppl/src/main/antlr/OpenSearchPPLLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH';
443443
MVAPPEND: 'MVAPPEND';
444444
MVJOIN: 'MVJOIN';
445445
MVINDEX: 'MVINDEX';
446+
MVDEDUP: 'MVDEDUP';
446447
FORALL: 'FORALL';
447448
FILTER: 'FILTER';
448449
TRANSFORM: 'TRANSFORM';

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,7 @@ collectionFunctionName
10961096
| MVAPPEND
10971097
| MVJOIN
10981098
| MVINDEX
1099+
| MVDEDUP
10991100
| FORALL
11001101
| EXISTS
11011102
| FILTER

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,80 @@ public void testMvindexRangeNegative() {
214214
+ "LIMIT 1";
215215
verifyPPLToSparkSQL(root, expectedSparkSql);
216216
}
217+
218+
@Test
219+
public void testMvdedupWithDuplicates() {
220+
String ppl =
221+
"source=EMP | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr) | head 1 |"
222+
+ " fields result";
223+
RelNode root = getRelNode(ppl);
224+
225+
String expectedLogical =
226+
"LogicalProject(result=[$9])\n"
227+
+ " LogicalSort(fetch=[1])\n"
228+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
229+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 2, 3, 1, 4)],"
230+
+ " result=[ARRAY_DISTINCT(array(1, 2, 2, 3, 1, 4))])\n"
231+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
232+
verifyLogical(root, expectedLogical);
233+
234+
String expectedResult = "result=[1, 2, 3, 4]\n";
235+
verifyResult(root, expectedResult);
236+
237+
String expectedSparkSql =
238+
"SELECT ARRAY_DISTINCT(ARRAY(1, 2, 2, 3, 1, 4)) `result`\n"
239+
+ "FROM `scott`.`EMP`\n"
240+
+ "LIMIT 1";
241+
verifyPPLToSparkSQL(root, expectedSparkSql);
242+
}
243+
244+
@Test
245+
public void testMvdedupWithNoDuplicates() {
246+
String ppl =
247+
"source=EMP | eval arr = array(1, 2, 3, 4), result = mvdedup(arr) | head 1 | fields"
248+
+ " result";
249+
RelNode root = getRelNode(ppl);
250+
251+
String expectedLogical =
252+
"LogicalProject(result=[$9])\n"
253+
+ " LogicalSort(fetch=[1])\n"
254+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
255+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 3, 4)],"
256+
+ " result=[ARRAY_DISTINCT(array(1, 2, 3, 4))])\n"
257+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
258+
verifyLogical(root, expectedLogical);
259+
260+
String expectedResult = "result=[1, 2, 3, 4]\n";
261+
verifyResult(root, expectedResult);
262+
263+
String expectedSparkSql =
264+
"SELECT ARRAY_DISTINCT(ARRAY(1, 2, 3, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1";
265+
verifyPPLToSparkSQL(root, expectedSparkSql);
266+
}
267+
268+
@Test
269+
public void testMvdedupPreservesOrder() {
270+
String ppl =
271+
"source=EMP | eval arr = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(arr) |"
272+
+ " head 1 | fields result";
273+
RelNode root = getRelNode(ppl);
274+
275+
String expectedLogical =
276+
"LogicalProject(result=[$9])\n"
277+
+ " LogicalSort(fetch=[1])\n"
278+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
279+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array('z', 'a', 'z', 'b', 'a', 'c')],"
280+
+ " result=[ARRAY_DISTINCT(array('z', 'a', 'z', 'b', 'a', 'c'))])\n"
281+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
282+
verifyLogical(root, expectedLogical);
283+
284+
String expectedResult = "result=[z, a, b, c]\n";
285+
verifyResult(root, expectedResult);
286+
287+
String expectedSparkSql =
288+
"SELECT ARRAY_DISTINCT(ARRAY('z', 'a', 'z', 'b', 'a', 'c')) `result`\n"
289+
+ "FROM `scott`.`EMP`\n"
290+
+ "LIMIT 1";
291+
verifyPPLToSparkSQL(root, expectedSparkSql);
292+
}
217293
}

ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,15 @@ public void testMvindex() {
829829
anonymize("source=t | eval result=mvindex(array(1, 2, 3, 4, 5), 1, 3) | fields result"));
830830
}
831831

832+
@Test
833+
public void testMvdedup() {
834+
// Test mvdedup with array containing duplicates
835+
assertEquals(
836+
"source=table | eval identifier=mvdedup(array(***,***,***,***,***,***)) | fields +"
837+
+ " identifier",
838+
anonymize("source=t | eval result=mvdedup(array(1, 2, 2, 3, 1, 4)) | fields result"));
839+
}
840+
832841
@Test
833842
public void testRexWithOffsetField() {
834843
when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10);

0 commit comments

Comments
 (0)