Skip to content

Commit cb51d8e

Browse files
authored
mvjoin support in PPL Calcite (opensearch-project#4217)
* mvjoin support in PPL Calcite Signed-off-by: ps48 <pshenoy36@gmail.com> * fix texts Signed-off-by: ps48 <pshenoy36@gmail.com> * update docs Signed-off-by: ps48 <pshenoy36@gmail.com> * update doc examples Signed-off-by: ps48 <pshenoy36@gmail.com> * rebase main, update test Signed-off-by: ps48 <pshenoy36@gmail.com> * update test with real array fields Signed-off-by: ps48 <pshenoy36@gmail.com> * use verifyQueryThrowsException in CalcitePPLFunctionTypeTest Signed-off-by: ps48 <pshenoy36@gmail.com> * spotless check fix Signed-off-by: ps48 <pshenoy36@gmail.com> * remove string,string registration for mvjoin Signed-off-by: ps48 <pshenoy36@gmail.com> * remove string,string test Signed-off-by: ps48 <pshenoy36@gmail.com> --------- Signed-off-by: ps48 <pshenoy36@gmail.com>
1 parent bd40af5 commit cb51d8e

12 files changed

Lines changed: 325 additions & 1 deletion

File tree

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public enum BuiltinFunctionName {
6262
/** Collection functions */
6363
ARRAY(FunctionName.of("array")),
6464
ARRAY_LENGTH(FunctionName.of("array_length")),
65+
MVJOIN(FunctionName.of("mvjoin")),
6566
FORALL(FunctionName.of("forall")),
6667
EXISTS(FunctionName.of("exists")),
6768
FILTER(FunctionName.of("filter")),

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@
144144
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
145145
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION;
146146
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH;
147+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
147148
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
148149
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
149150
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
@@ -816,6 +817,15 @@ void populate() {
816817
registerOperator(WEEKOFYEAR, PPLBuiltinOperators.WEEK);
817818

818819
registerOperator(INTERNAL_PATTERN_PARSER, PPLBuiltinOperators.PATTERN_PARSER);
820+
821+
// Register MVJOIN to use Calcite's ARRAY_JOIN
822+
register(
823+
MVJOIN,
824+
(FunctionImp2)
825+
(builder, array, delimiter) ->
826+
builder.makeCall(SqlLibraryOperators.ARRAY_JOIN, array, delimiter),
827+
PPLTypeChecker.family(SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER));
828+
819829
registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
820830
registerOperator(ARRAY_LENGTH, SqlLibraryOperators.ARRAY_LENGTH);
821831
registerOperator(FORALL, PPLBuiltinOperators.FORALL);

docs/user/ppl/functions/collection.rst

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,37 @@ Example::
198198
| result |
199199
|------------|
200200
| 80 |
201-
+------------+
201+
+------------+
202+
203+
MVJOIN
204+
------
205+
206+
Description
207+
>>>>>>>>>>>
208+
209+
Version: 3.3.0
210+
211+
Usage: mvjoin(array, delimiter) joins string array elements into a single string, separated by the specified delimiter. NULL elements are excluded from the output. Only string arrays are supported.
212+
213+
Argument type: array: ARRAY of STRING, delimiter: STRING
214+
215+
Return type: STRING
216+
217+
Example::
218+
219+
PPL> source=people | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head 1
220+
fetched rows / total rows = 1/1
221+
+------------------------------------+
222+
| result |
223+
|------------------------------------|
224+
| "a,b,c" |
225+
+------------------------------------+
226+
227+
PPL> source=accounts | eval names_array = array(firstname, lastname) | eval result = mvjoin(names_array, ', ') | fields result | head 1
228+
fetched rows / total rows = 1/1
229+
+------------------------------------------+
230+
| result |
231+
|------------------------------------------|
232+
| "Amber, Duke" |
233+
+------------------------------------------+
234+

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.io.IOException;
1212
import java.util.List;
13+
import org.json.JSONArray;
1314
import org.json.JSONObject;
1415
import org.junit.jupiter.api.Test;
1516
import org.opensearch.client.ResponseException;
@@ -21,6 +22,7 @@ public void init() throws Exception {
2122
super.init();
2223
enableCalcite();
2324
loadIndex(Index.BANK);
25+
loadIndex(Index.ARRAY);
2426
}
2527

2628
@Test
@@ -241,4 +243,131 @@ public void testReduceWithUDF() throws IOException {
241243

242244
verifyDataRows(actual, rows(60));
243245
}
246+
247+
@Test
248+
public void testMvjoinWithStringArray() throws IOException {
249+
JSONObject actual =
250+
executeQuery(
251+
String.format(
252+
"source=%s | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head"
253+
+ " 1",
254+
TEST_INDEX_BANK));
255+
256+
verifySchema(actual, schema("result", "string"));
257+
verifyDataRows(actual, rows("a,b,c"));
258+
}
259+
260+
@Test
261+
public void testMvjoinWithStringifiedNumbers() throws IOException {
262+
// Note: mvjoin only supports string arrays
263+
JSONObject actual =
264+
executeQuery(
265+
String.format(
266+
"source=%s | eval result = mvjoin(array('1', '2', '3'), ' | ') | fields result |"
267+
+ " head 1",
268+
TEST_INDEX_BANK));
269+
270+
verifySchema(actual, schema("result", "string"));
271+
verifyDataRows(actual, rows("1 | 2 | 3"));
272+
}
273+
274+
@Test
275+
public void testMvjoinWithMixedStringValues() throws IOException {
276+
// mvjoin only supports string arrays
277+
JSONObject actual =
278+
executeQuery(
279+
String.format(
280+
"source=%s | eval result = mvjoin(array('1', 'text', '2.5'), ';') | fields result |"
281+
+ " head 1",
282+
TEST_INDEX_BANK));
283+
284+
verifySchema(actual, schema("result", "string"));
285+
verifyDataRows(actual, rows("1;text;2.5"));
286+
}
287+
288+
@Test
289+
public void testMvjoinWithEmptyArray() throws IOException {
290+
JSONObject actual =
291+
executeQuery(
292+
String.format(
293+
"source=%s | eval result = mvjoin(array(), '-') | fields result | head 1",
294+
TEST_INDEX_BANK));
295+
296+
verifySchema(actual, schema("result", "string"));
297+
verifyDataRows(actual, rows(""));
298+
}
299+
300+
@Test
301+
public void testMvjoinWithStringBooleans() throws IOException {
302+
// mvjoin only supports string arrays
303+
JSONObject actual =
304+
executeQuery(
305+
String.format(
306+
"source=%s | eval result = mvjoin(array('true', 'false', 'true'), '|') | fields"
307+
+ " result | head 1",
308+
TEST_INDEX_BANK));
309+
310+
verifySchema(actual, schema("result", "string"));
311+
verifyDataRows(actual, rows("true|false|true"));
312+
}
313+
314+
@Test
315+
public void testMvjoinWithSpecialDelimiters() throws IOException {
316+
JSONObject actual =
317+
executeQuery(
318+
String.format(
319+
"source=%s | eval result = mvjoin(array('apple', 'banana', 'cherry'), ' AND ') |"
320+
+ " fields result | head 1",
321+
TEST_INDEX_BANK));
322+
323+
verifySchema(actual, schema("result", "string"));
324+
verifyDataRows(actual, rows("apple AND banana AND cherry"));
325+
}
326+
327+
@Test
328+
public void testMvjoinWithArrayFromRealFields() throws IOException {
329+
// Test mvjoin on arrays created from real fields using array() function
330+
JSONObject actual =
331+
executeQuery(
332+
String.format(
333+
"source=%s | eval names_array = array(firstname, lastname) | eval result ="
334+
+ " mvjoin(names_array, ',') | fields firstname, lastname, result | head 1",
335+
TEST_INDEX_BANK));
336+
337+
verifySchema(
338+
actual,
339+
schema("firstname", "string"),
340+
schema("lastname", "string"),
341+
schema("result", "string"));
342+
// Verify that mvjoin correctly joins the firstname and lastname fields
343+
JSONArray dataRows = actual.getJSONArray("datarows");
344+
assertTrue(dataRows.length() > 0);
345+
JSONArray firstRow = dataRows.getJSONArray(0);
346+
assertEquals(firstRow.getString(0) + "," + firstRow.getString(1), firstRow.getString(2));
347+
}
348+
349+
@Test
350+
public void testMvjoinWithMultipleRealFields() throws IOException {
351+
// Test mvjoin with arrays created from multiple real fields
352+
JSONObject actual =
353+
executeQuery(
354+
String.format(
355+
"source=%s | eval info_array = array(city, state, employer) | eval result ="
356+
+ " mvjoin(info_array, ' | ') | fields city, state, employer, result | head 1",
357+
TEST_INDEX_BANK));
358+
359+
verifySchema(
360+
actual,
361+
schema("city", "string"),
362+
schema("state", "string"),
363+
schema("employer", "string"),
364+
schema("result", "string"));
365+
// Verify that mvjoin correctly joins the city, state, and employer fields
366+
JSONArray dataRows = actual.getJSONArray("datarows");
367+
assertTrue(dataRows.length() > 0);
368+
JSONArray firstRow = dataRows.getJSONArray(0);
369+
assertEquals(
370+
firstRow.getString(0) + " | " + firstRow.getString(1) + " | " + firstRow.getString(2),
371+
firstRow.getString(3));
372+
}
244373
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.junit.Assert.assertTrue;
9+
import static org.opensearch.sql.legacy.TestUtils.*;
810
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
911
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
1012
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
@@ -558,6 +560,16 @@ public void testExplainAppendCommand() throws IOException {
558560
TEST_INDEX_BANK)));
559561
}
560562

563+
@Test
564+
public void testMvjoinExplain() throws IOException {
565+
String query =
566+
"source=opensearch-sql_test_index_account | eval result = mvjoin(array('a', 'b', 'c'), ',')"
567+
+ " | fields result | head 1";
568+
var result = explainQueryToString(query);
569+
String expected = loadExpectedPlan("explain_mvjoin.json");
570+
assertJsonEqualsIgnoreId(expected, result);
571+
}
572+
561573
@Test
562574
public void testPushdownLimitIntoAggregation() throws IOException {
563575
enabledOnlyWhenPushdownIsEnabled();
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->1, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":1,\"timeout\":\"1m\"}, requestedTotalSize=1, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n EnumerableLimit(fetch=[1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
5+
}
6+
}

ppl/src/main/antlr/OpenSearchPPLLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ ISBLANK: 'ISBLANK';
421421
// COLLECTION FUNCTIONS
422422
ARRAY: 'ARRAY';
423423
ARRAY_LENGTH: 'ARRAY_LENGTH';
424+
MVJOIN: 'MVJOIN';
424425
FORALL: 'FORALL';
425426
FILTER: 'FILTER';
426427
TRANSFORM: 'TRANSFORM';

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,7 @@ geoipFunctionName
975975
collectionFunctionName
976976
: ARRAY
977977
| ARRAY_LENGTH
978+
| MVJOIN
978979
| FORALL
979980
| EXISTS
980981
| FILTER
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ppl.calcite;
7+
8+
import org.apache.calcite.rel.RelNode;
9+
import org.apache.calcite.test.CalciteAssert;
10+
import org.junit.Test;
11+
12+
public class CalcitePPLArrayFunctionTest extends CalcitePPLAbstractTest {
13+
14+
public CalcitePPLArrayFunctionTest() {
15+
super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL);
16+
}
17+
18+
@Test
19+
public void testMvjoinWithStringArray() {
20+
String ppl =
21+
"source=EMP | eval joined = mvjoin(array('a', 'b', 'c'), ',') | head 1 | fields joined";
22+
RelNode root = getRelNode(ppl);
23+
24+
String expectedLogical =
25+
"LogicalProject(joined=[$8])\n"
26+
+ " LogicalSort(fetch=[1])\n"
27+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
28+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n"
29+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
30+
verifyLogical(root, expectedLogical);
31+
32+
String expectedResult = "joined=a,b,c\n";
33+
verifyResult(root, expectedResult);
34+
35+
String expectedSparkSql =
36+
"SELECT ARRAY_JOIN(`array`('a', 'b', 'c'), ',') `joined`\n"
37+
+ "FROM `scott`.`EMP`\n"
38+
+ "LIMIT 1";
39+
verifyPPLToSparkSQL(root, expectedSparkSql);
40+
}
41+
42+
@Test
43+
public void testMvjoinWithDifferentDelimiter() {
44+
String ppl =
45+
"source=EMP | eval joined = mvjoin(array('apple', 'banana', 'cherry'), ' | ') | head 1 |"
46+
+ " fields joined";
47+
RelNode root = getRelNode(ppl);
48+
49+
String expectedLogical =
50+
"LogicalProject(joined=[$8])\n"
51+
+ " LogicalSort(fetch=[1])\n"
52+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
53+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('apple':VARCHAR,"
54+
+ " 'banana':VARCHAR, 'cherry':VARCHAR), ' | ':VARCHAR)])\n"
55+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
56+
verifyLogical(root, expectedLogical);
57+
58+
String expectedResult = "joined=apple | banana | cherry\n";
59+
verifyResult(root, expectedResult);
60+
61+
String expectedSparkSql =
62+
"SELECT ARRAY_JOIN(`array`('apple', 'banana', 'cherry'), ' | ') `joined`\n"
63+
+ "FROM `scott`.`EMP`\n"
64+
+ "LIMIT 1";
65+
verifyPPLToSparkSQL(root, expectedSparkSql);
66+
}
67+
68+
@Test
69+
public void testMvjoinWithEmptyArray() {
70+
String ppl = "source=EMP | eval joined = mvjoin(array(), ',') | head 1 | fields joined";
71+
RelNode root = getRelNode(ppl);
72+
73+
String expectedLogical =
74+
"LogicalProject(joined=[$8])\n"
75+
+ " LogicalSort(fetch=[1])\n"
76+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
77+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array(), ',')])\n"
78+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
79+
verifyLogical(root, expectedLogical);
80+
81+
String expectedResult = "joined=\n";
82+
verifyResult(root, expectedResult);
83+
84+
String expectedSparkSql =
85+
"SELECT ARRAY_JOIN(`array`(), ',') `joined`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1";
86+
verifyPPLToSparkSQL(root, expectedSparkSql);
87+
}
88+
89+
@Test
90+
public void testMvjoinWithFieldReference() {
91+
String ppl =
92+
"source=EMP | eval joined = mvjoin(array(ENAME, JOB), '-') | head 1 | fields joined";
93+
RelNode root = getRelNode(ppl);
94+
95+
String expectedLogical =
96+
"LogicalProject(joined=[$8])\n"
97+
+ " LogicalSort(fetch=[1])\n"
98+
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
99+
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array($1, $2), '-')])\n"
100+
+ " LogicalTableScan(table=[[scott, EMP]])\n";
101+
verifyLogical(root, expectedLogical);
102+
103+
String expectedSparkSql =
104+
"SELECT ARRAY_JOIN(`array`(`ENAME`, `JOB`), '-') `joined`\n"
105+
+ "FROM `scott`.`EMP`\n"
106+
+ "LIMIT 1";
107+
verifyPPLToSparkSQL(root, expectedSparkSql);
108+
}
109+
}

0 commit comments

Comments
 (0)