Skip to content

Commit 922330e

Browse files
Add APPROX_PERCENTILE aggregation function
1 parent d03bcff commit 922330e

16 files changed

Lines changed: 1964 additions & 8 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4309,6 +4309,42 @@ public void approxMostFrequentTest() {
43094309
DATABASE_NAME);
43104310
}
43114311

4312+
@Test
4313+
public void approxPercentileTest() {
4314+
tableResultSetEqualTest(
4315+
"select approx_percentile(time, 0.5),approx_percentile(s1,0.5),approx_percentile(s2,0.5),approx_percentile(s3,0.5),approx_percentile(s4,0.5) from table1",
4316+
buildHeaders(5),
4317+
new String[] {"2024-09-24T06:15:40.000Z,40,46000,40.0,46.0,"},
4318+
DATABASE_NAME);
4319+
4320+
tableResultSetEqualTest(
4321+
"select time,province,approx_percentile(s1,0.5),approx_percentile(s2,0.5) from table1 group by 1,2 order by 2,1",
4322+
new String[] {"time", "province", "_col2", "_col3"},
4323+
new String[] {
4324+
"2024-09-24T06:15:30.000Z,beijing,30,0,",
4325+
"2024-09-24T06:15:31.000Z,beijing,0,31000,",
4326+
"2024-09-24T06:15:35.000Z,beijing,0,35000,",
4327+
"2024-09-24T06:15:36.000Z,beijing,36,0,",
4328+
"2024-09-24T06:15:40.000Z,beijing,40,40000,",
4329+
"2024-09-24T06:15:41.000Z,beijing,41,0,",
4330+
"2024-09-24T06:15:46.000Z,beijing,0,46000,",
4331+
"2024-09-24T06:15:50.000Z,beijing,0,50000,",
4332+
"2024-09-24T06:15:51.000Z,beijing,0,0,",
4333+
"2024-09-24T06:15:55.000Z,beijing,55,0,",
4334+
"2024-09-24T06:15:30.000Z,shanghai,30,0,",
4335+
"2024-09-24T06:15:31.000Z,shanghai,0,31000,",
4336+
"2024-09-24T06:15:35.000Z,shanghai,0,35000,",
4337+
"2024-09-24T06:15:36.000Z,shanghai,36,0,",
4338+
"2024-09-24T06:15:40.000Z,shanghai,40,40000,",
4339+
"2024-09-24T06:15:41.000Z,shanghai,41,0,",
4340+
"2024-09-24T06:15:46.000Z,shanghai,0,46000,",
4341+
"2024-09-24T06:15:50.000Z,shanghai,0,50000,",
4342+
"2024-09-24T06:15:51.000Z,shanghai,0,0,",
4343+
"2024-09-24T06:15:55.000Z,shanghai,55,0,",
4344+
},
4345+
DATABASE_NAME);
4346+
}
4347+
43124348
@Test
43134349
public void exceptionTest() {
43144350
tableAssertTestFail(
@@ -4371,6 +4407,22 @@ public void exceptionTest() {
43714407
"select approx_most_frequent() from table1",
43724408
"701: Aggregation functions [approx_most_frequent] should only have three arguments",
43734409
DATABASE_NAME);
4410+
tableAssertTestFail(
4411+
"select approx_percentile() from table1",
4412+
"701: Aggregation functions [approx_percentile] should only have two or three arguments",
4413+
DATABASE_NAME);
4414+
tableAssertTestFail(
4415+
"select approx_percentile(s1,1.1) from table1",
4416+
"701: percentage should be in [0,1], got 1.1",
4417+
DATABASE_NAME);
4418+
tableAssertTestFail(
4419+
"select approx_percentile(s1,'test') from table1",
4420+
"701: The second argument of 'approx_percentile' function percentage must be a double literal",
4421+
DATABASE_NAME);
4422+
tableAssertTestFail(
4423+
"select approx_percentile(s5,0.5) from table1",
4424+
"701: Aggregation functions [approx_percentile] should have value column as numeric type [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]",
4425+
DATABASE_NAME);
43744426
}
43754427

43764428
// ==================================================================
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
16+
17+
import org.apache.iotdb.db.exception.sql.SemanticException;
18+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
19+
20+
import org.apache.tsfile.block.column.Column;
21+
import org.apache.tsfile.block.column.ColumnBuilder;
22+
import org.apache.tsfile.enums.TSDataType;
23+
import org.apache.tsfile.file.metadata.statistics.Statistics;
24+
import org.apache.tsfile.utils.Binary;
25+
import org.apache.tsfile.utils.RamUsageEstimator;
26+
import org.apache.tsfile.utils.ReadWriteIOUtils;
27+
import org.apache.tsfile.write.UnSupportedDataTypeException;
28+
29+
import java.nio.ByteBuffer;
30+
31+
public abstract class AbstractApproxPercentileAccumulator implements TableAccumulator {
32+
private static final long INSTANCE_SIZE =
33+
RamUsageEstimator.shallowSizeOfInstance(ApproxPercentileAccumulator.class);
34+
35+
protected final TDigest tDigest = new TDigest();
36+
protected final TSDataType seriesDataType;
37+
protected double percentage;
38+
39+
AbstractApproxPercentileAccumulator(TSDataType seriesDataType) {
40+
this.seriesDataType = seriesDataType;
41+
}
42+
43+
@Override
44+
public long getEstimatedSize() {
45+
return INSTANCE_SIZE + tDigest.getEstimatedSize();
46+
}
47+
48+
@Override
49+
public TableAccumulator copy() {
50+
return new ApproxPercentileAccumulator(seriesDataType);
51+
}
52+
53+
@Override
54+
public void addInput(Column[] arguments, AggregationMask mask) {
55+
if (arguments.length == 2) {
56+
percentage = arguments[1].getDouble(0);
57+
} else if (arguments.length == 3) {
58+
percentage = arguments[2].getDouble(0);
59+
} else {
60+
throw new SemanticException(
61+
String.format(
62+
"APPROX_PERCENTILE requires 2 or 3 arguments, but got %d", arguments.length));
63+
}
64+
switch (seriesDataType) {
65+
case INT32:
66+
addIntInput(arguments, mask);
67+
return;
68+
case INT64:
69+
case TIMESTAMP:
70+
addLongInput(arguments, mask);
71+
return;
72+
case FLOAT:
73+
addFloatInput(arguments, mask);
74+
return;
75+
case DOUBLE:
76+
addDoubleInput(arguments, mask);
77+
return;
78+
default:
79+
throw new UnSupportedDataTypeException(
80+
String.format(
81+
"Unsupported data type in APPROX_PERCENTILE Aggregation: %s", seriesDataType));
82+
}
83+
}
84+
85+
@Override
86+
public void addIntermediate(Column argument) {
87+
for (int i = 0; i < argument.getPositionCount(); i++) {
88+
if (!argument.isNull(i)) {
89+
byte[] data = argument.getBinary(i).getValues();
90+
// Read percentage from the first 8 bytes and TDigest from the rest
91+
ByteBuffer buffer = ByteBuffer.wrap(data);
92+
this.percentage = ReadWriteIOUtils.readDouble(buffer);
93+
TDigest other = TDigest.fromByteBuffer(buffer);
94+
tDigest.add(other);
95+
}
96+
}
97+
}
98+
99+
@Override
100+
public void evaluateIntermediate(ColumnBuilder columnBuilder) {
101+
int tDigestDataLength = tDigest.byteSize();
102+
// Create a buffer with space for percentage (8 bytes) + TDigest data
103+
ByteBuffer buffer = ByteBuffer.allocate(8 + tDigestDataLength);
104+
ReadWriteIOUtils.write(percentage, buffer);
105+
tDigest.toByteArray(buffer);
106+
columnBuilder.writeBinary(new Binary(buffer.array()));
107+
}
108+
109+
@Override
110+
public void evaluateFinal(ColumnBuilder columnBuilder) {
111+
switch (seriesDataType) {
112+
case INT32:
113+
columnBuilder.writeInt((int) tDigest.quantile(percentage));
114+
break;
115+
case INT64:
116+
case TIMESTAMP:
117+
columnBuilder.writeLong((long) tDigest.quantile(percentage));
118+
break;
119+
case FLOAT:
120+
columnBuilder.writeFloat((float) tDigest.quantile(percentage));
121+
break;
122+
case DOUBLE:
123+
columnBuilder.writeDouble(tDigest.quantile(percentage));
124+
break;
125+
default:
126+
throw new UnSupportedDataTypeException(
127+
String.format(
128+
"Unsupported data type in APPROX_PERCENTILE Aggregation: %s", seriesDataType));
129+
}
130+
}
131+
132+
@Override
133+
public boolean hasFinalResult() {
134+
return false;
135+
}
136+
137+
@Override
138+
public void addStatistics(Statistics[] statistics) {
139+
throw new UnsupportedOperationException(
140+
"ApproxPercentileAccumulator does not support statistics");
141+
}
142+
143+
@Override
144+
public void reset() {
145+
tDigest.reset();
146+
}
147+
148+
public abstract void addIntInput(Column[] arguments, AggregationMask mask);
149+
150+
public abstract void addLongInput(Column[] arguments, AggregationMask mask);
151+
152+
public abstract void addFloatInput(Column[] arguments, AggregationMask mask);
153+
154+
public abstract void addDoubleInput(Column[] arguments, AggregationMask mask);
155+
156+
public static double toDoubleExact(long value) {
157+
double doubleValue = (double) value;
158+
if ((long) doubleValue != value) {
159+
throw new SemanticException(
160+
String.format("no exact double representation for long: %s", value));
161+
}
162+
return value;
163+
}
164+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.FloatGroupedApproxMostFrequentAccumulator;
3030
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
3131
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxCountDistinctAccumulator;
32+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxPercentileAccumulator;
33+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxPercentileWithWeightAccumulator;
3234
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
3335
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountAccumulator;
3436
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountAllAccumulator;
@@ -261,6 +263,12 @@ private static GroupedAccumulator createBuiltinGroupedAccumulator(
261263
return new GroupedApproxCountDistinctAccumulator(inputDataTypes.get(0));
262264
case APPROX_MOST_FREQUENT:
263265
return getGroupedApproxMostFrequentAccumulator(inputDataTypes.get(0));
266+
case APPROX_PERCENTILE:
267+
if (inputDataTypes.size() == 2) {
268+
return new GroupedApproxPercentileAccumulator(inputDataTypes.get(0));
269+
} else {
270+
return new GroupedApproxPercentileWithWeightAccumulator(inputDataTypes.get(0));
271+
}
264272
default:
265273
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
266274
}
@@ -331,6 +339,12 @@ public static TableAccumulator createBuiltinAccumulator(
331339
return new ApproxCountDistinctAccumulator(inputDataTypes.get(0));
332340
case APPROX_MOST_FREQUENT:
333341
return getApproxMostFrequentAccumulator(inputDataTypes.get(0));
342+
case APPROX_PERCENTILE:
343+
if (inputDataTypes.size() == 2) {
344+
return new ApproxPercentileAccumulator(inputDataTypes.get(0));
345+
} else {
346+
return new ApproxPercentileWithWeightAccumulator(inputDataTypes.get(0));
347+
}
334348
default:
335349
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
336350
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
16+
17+
import org.apache.tsfile.block.column.Column;
18+
import org.apache.tsfile.enums.TSDataType;
19+
20+
public class ApproxPercentileAccumulator extends AbstractApproxPercentileAccumulator {
21+
22+
public ApproxPercentileAccumulator(TSDataType seriesDataType) {
23+
super(seriesDataType);
24+
}
25+
26+
@Override
27+
public void addIntInput(Column[] arguments, AggregationMask mask) {
28+
29+
Column valueColumn = arguments[0];
30+
int positionCount = mask.getPositionCount();
31+
32+
if (mask.isSelectAll()) {
33+
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
34+
if (!valueColumn.isNull(i)) {
35+
tDigest.add(valueColumn.getInt(i));
36+
}
37+
}
38+
} else {
39+
int[] selectedPositions = mask.getSelectedPositions();
40+
int position;
41+
for (int i = 0; i < positionCount; i++) {
42+
position = selectedPositions[i];
43+
if (!valueColumn.isNull(position)) {
44+
tDigest.add(valueColumn.getInt(position));
45+
}
46+
}
47+
}
48+
}
49+
50+
@Override
51+
public void addLongInput(Column[] arguments, AggregationMask mask) {
52+
53+
Column valueColumn = arguments[0];
54+
int positionCount = mask.getPositionCount();
55+
56+
if (mask.isSelectAll()) {
57+
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
58+
if (!valueColumn.isNull(i)) {
59+
tDigest.add(toDoubleExact(valueColumn.getLong(i)));
60+
}
61+
}
62+
} else {
63+
int[] selectedPositions = mask.getSelectedPositions();
64+
int position;
65+
for (int i = 0; i < positionCount; i++) {
66+
position = selectedPositions[i];
67+
if (!valueColumn.isNull(position)) {
68+
tDigest.add(toDoubleExact(valueColumn.getLong(position)));
69+
}
70+
}
71+
}
72+
}
73+
74+
@Override
75+
public void addFloatInput(Column[] arguments, AggregationMask mask) {
76+
77+
Column valueColumn = arguments[0];
78+
int positionCount = mask.getPositionCount();
79+
80+
if (mask.isSelectAll()) {
81+
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
82+
if (!valueColumn.isNull(i)) {
83+
tDigest.add(valueColumn.getFloat(i));
84+
}
85+
}
86+
} else {
87+
int[] selectedPositions = mask.getSelectedPositions();
88+
int position;
89+
for (int i = 0; i < positionCount; i++) {
90+
position = selectedPositions[i];
91+
if (!valueColumn.isNull(position)) {
92+
tDigest.add(valueColumn.getFloat(position));
93+
}
94+
}
95+
}
96+
}
97+
98+
@Override
99+
public void addDoubleInput(Column[] arguments, AggregationMask mask) {
100+
Column valueColumn = arguments[0];
101+
int positionCount = mask.getPositionCount();
102+
if (mask.isSelectAll()) {
103+
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
104+
if (!valueColumn.isNull(i)) {
105+
tDigest.add(valueColumn.getDouble(i));
106+
}
107+
}
108+
} else {
109+
int[] selectedPositions = mask.getSelectedPositions();
110+
int position;
111+
for (int i = 0; i < positionCount; i++) {
112+
position = selectedPositions[i];
113+
if (!valueColumn.isNull(position)) {
114+
tDigest.add(valueColumn.getDouble(position));
115+
}
116+
}
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)