Skip to content

Commit b220be4

Browse files
authored
Support distinct_count/dc in eventstats (opensearch-project#4084)
* Support distinct_count/dc in eventstats Signed-off-by: Kai Huang <ahkcs@amazon.com> * Add UT and optimize code Signed-off-by: Kai Huang <ahkcs@amazon.com> * doc Signed-off-by: Kai Huang <ahkcs@amazon.com> * fix CI Signed-off-by: Kai Huang <ahkcs@amazon.com> * remove unit test and registration Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <ahkcs@amazon.com> Signed-off-by: Kai Huang <105710027+ahkcs@users.noreply.github.com>
1 parent 284ecc4 commit b220be4

7 files changed

Lines changed: 167 additions & 24 deletions

File tree

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,9 @@ public enum BuiltinFunctionName {
365365
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
366366
// .put("earliest", BuiltinFunctionName.EARLIEST)
367367
// .put("latest", BuiltinFunctionName.LATEST)
368+
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
369+
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
370+
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
368371
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
369372
.build();
370373

docs/user/ppl/cmd/eventstats.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,28 @@ Example::
277277
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+
278278

279279

280+
DISTINCT_COUNT, DC(Since 3.3)
281+
-----------------------------
282+
283+
Description
284+
>>>>>>>>>>>
285+
286+
Usage: DISTINCT_COUNT(expr), DC(expr). Returns the approximate number of distinct values of expr using the HyperLogLog++ algorithm. ``DISTINCT_COUNT`` and ``DC`` are aliases and behave identically.
287+
288+
Example::
289+
290+
PPL> source=accounts | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender;
291+
fetched rows / total rows = 4/4
292+
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+
293+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | distinct_states | dc_states_alt |
294+
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------|
295+
| 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | 1 |
296+
| 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3 | 3 |
297+
| 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3 | 3 |
298+
| 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3 | 3 |
299+
+----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+
300+
301+
280302
Configuration
281303
=============
282304
This command requires Calcite enabled.
@@ -312,6 +334,8 @@ Eventstats::
312334
source = table | where a < 50 | eventstats count(c)
313335
source = table | eventstats min(c), max(c) by b
314336
source = table | eventstats count(c) as count_by by b | where count_by > 1000
337+
source = table | eventstats dc(field) as distinct_count
338+
source = table | eventstats distinct_count(category) by region
315339

316340

317341
Example 1: Calculate the average, sum and count of a field by group

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,27 @@ public void supportPushDownScriptOnTextField() throws IOException {
234234
assertJsonEqualsIgnoreId(expected, result);
235235
}
236236

237+
@Test
238+
public void testEventstatsDistinctCountExplain() throws IOException {
239+
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
240+
String query =
241+
"source=opensearch-sql_test_index_account | eventstats dc(state) as distinct_states";
242+
var result = explainQueryToString(query);
243+
String expected = loadFromFile("expectedOutput/calcite/explain_eventstats_dc.json");
244+
assertJsonEqualsIgnoreId(expected, result);
245+
}
246+
247+
@Test
248+
public void testEventstatsDistinctCountFunctionExplain() throws IOException {
249+
Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled());
250+
String query =
251+
"source=opensearch-sql_test_index_account | eventstats distinct_count(state) as"
252+
+ " distinct_states by gender";
253+
var result = explainQueryToString(query);
254+
String expected = loadFromFile("expectedOutput/calcite/explain_eventstats_distinct_count.json");
255+
assertJsonEqualsIgnoreId(expected, result);
256+
}
257+
237258
// Only for Calcite, as v2 gets unstable serialized string for function
238259
@Test
239260
public void testExplainOnAggregationWithSumEnhancement() throws IOException {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java

Lines changed: 105 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import org.json.JSONObject;
1414
import org.junit.Ignore;
1515
import org.junit.jupiter.api.Test;
16-
import org.opensearch.client.Request;
17-
import org.opensearch.sql.legacy.TestsConstants;
1816
import org.opensearch.sql.ppl.PPLIntegTestCase;
1917

2018
public class CalcitePPLEventstatsIT extends PPLIntegTestCase {
@@ -290,28 +288,6 @@ public void testUnsupportedWindowFunctions() {
290288
}
291289
}
292290

293-
@Ignore("DC should fail in window function")
294-
public void testDistinctCountShouldFail() throws IOException {
295-
Request request1 =
296-
new Request("PUT", "/" + TestsConstants.TEST_INDEX_STATE_COUNTRY + "/_doc/5?refresh=true");
297-
request1.setJsonEntity(
298-
"{\"name\":\"Jim\",\"age\":27,\"state\":\"Ontario\",\"country\":\"Canada\",\"year\":2023,\"month\":4}");
299-
client().performRequest(request1);
300-
JSONObject actual =
301-
executeQuery(
302-
String.format(
303-
"source=%s | eventstats distinct_count(state) by country",
304-
TEST_INDEX_STATE_COUNTRY));
305-
306-
verifyDataRows(
307-
actual,
308-
rows("John", "Canada", "Ontario", 4, 2023, 25, 3),
309-
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 3),
310-
rows("Jim", "Canada", "Ontario", 4, 2023, 27, 3),
311-
rows("Jake", "USA", "California", 4, 2023, 70, 2),
312-
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
313-
}
314-
315291
@Test
316292
public void testMultipleEventstat() throws IOException {
317293
JSONObject actual =
@@ -599,6 +575,111 @@ public void testEventstatVarianceWithNullBy() throws IOException {
599575
rows("Hello", "USA", "New York", 4, 2023, 30, 20, 28.284271247461902, 400, 800));
600576
}
601577

578+
@Test
579+
public void testEventstatDistinctCount() throws IOException {
580+
JSONObject actual =
581+
executeQuery(
582+
String.format(
583+
"source=%s | eventstats dc(state) as dc_state", TEST_INDEX_STATE_COUNTRY));
584+
585+
verifySchemaInOrder(
586+
actual,
587+
schema("name", "string"),
588+
schema("country", "string"),
589+
schema("state", "string"),
590+
schema("month", "int"),
591+
schema("year", "int"),
592+
schema("age", "int"),
593+
schema("dc_state", "bigint"));
594+
595+
verifyDataRows(
596+
actual,
597+
rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
598+
rows("Jake", "USA", "California", 4, 2023, 70, 4),
599+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
600+
rows("Hello", "USA", "New York", 4, 2023, 30, 4));
601+
}
602+
603+
@Test
604+
public void testEventstatDistinctCountByCountry() throws IOException {
605+
JSONObject actual =
606+
executeQuery(
607+
String.format(
608+
"source=%s | eventstats dc(state) as dc_state by country",
609+
TEST_INDEX_STATE_COUNTRY));
610+
611+
verifySchemaInOrder(
612+
actual,
613+
schema("name", "string"),
614+
schema("country", "string"),
615+
schema("state", "string"),
616+
schema("month", "int"),
617+
schema("year", "int"),
618+
schema("age", "int"),
619+
schema("dc_state", "bigint"));
620+
621+
verifyDataRows(
622+
actual,
623+
rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
624+
rows("Jake", "USA", "California", 4, 2023, 70, 2),
625+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
626+
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
627+
}
628+
629+
@Test
630+
public void testEventstatDistinctCountFunction() throws IOException {
631+
JSONObject actual =
632+
executeQuery(
633+
String.format(
634+
"source=%s | eventstats distinct_count(country) as dc_country",
635+
TEST_INDEX_STATE_COUNTRY));
636+
637+
verifySchemaInOrder(
638+
actual,
639+
schema("name", "string"),
640+
schema("country", "string"),
641+
schema("state", "string"),
642+
schema("month", "int"),
643+
schema("year", "int"),
644+
schema("age", "int"),
645+
schema("dc_country", "bigint"));
646+
647+
verifyDataRows(
648+
actual,
649+
rows("John", "Canada", "Ontario", 4, 2023, 25, 2),
650+
rows("Jake", "USA", "California", 4, 2023, 70, 2),
651+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 2),
652+
rows("Hello", "USA", "New York", 4, 2023, 30, 2));
653+
}
654+
655+
@Test
656+
public void testEventstatDistinctCountWithNull() throws IOException {
657+
JSONObject actual =
658+
executeQuery(
659+
String.format(
660+
"source=%s | eventstats dc(state) as dc_state",
661+
TEST_INDEX_STATE_COUNTRY_WITH_NULL));
662+
663+
verifySchemaInOrder(
664+
actual,
665+
schema("name", "string"),
666+
schema("country", "string"),
667+
schema("state", "string"),
668+
schema("month", "int"),
669+
schema("year", "int"),
670+
schema("age", "int"),
671+
schema("dc_state", "bigint"));
672+
673+
verifyDataRows(
674+
actual,
675+
rows(null, "Canada", null, 4, 2023, 10, 4),
676+
rows("Kevin", null, null, 4, 2023, null, 4),
677+
rows("John", "Canada", "Ontario", 4, 2023, 25, 4),
678+
rows("Jake", "USA", "California", 4, 2023, 70, 4),
679+
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 4),
680+
rows("Hello", "USA", "New York", 4, 2023, 30, 4));
681+
}
682+
602683
@Ignore
603684
@Test
604685
public void testEventstatEarliestAndLatest() throws IOException {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,8 @@ scalarWindowFunctionName
415415
| LAST
416416
| NTH
417417
| NTILE
418+
| DISTINCT_COUNT
419+
| DC
418420
;
419421

420422
// aggregation terms

0 commit comments

Comments
 (0)