Skip to content

Commit 23893c5

Browse files
committed
feat: implement removeInput method for regression, covariance, and correlation accumulators
1 parent 0d473a9 commit 23893c5

5 files changed

Lines changed: 197 additions & 8 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5669,6 +5669,54 @@ public void statFunctionsInHavingTest() {
56695669
DATABASE_NAME);
56705670
}
56715671

5672+
@Test
5673+
public void statFunctionsWindowFrameTest() {
5674+
String[] expectedHeader =
5675+
new String[] {
5676+
"device_id", "time", "corr_v", "covar_pop_v", "covar_samp_v", "slope_v", "intercept_v"
5677+
};
5678+
String[] retArray =
5679+
new String[] {
5680+
"d1,1970-01-01T00:00:00.001Z,1.0,0.25,0.5,1.0,0.0,",
5681+
"d1,1970-01-01T00:00:00.002Z,0.8660254037844386,0.3333333333333333,0.49999999999999994,1.5,-0.5,",
5682+
"d1,1970-01-01T00:00:00.003Z,-0.8660254037844385,-0.3333333333333333,-0.5,-1.4999999999999998,5.5,",
5683+
"d1,1970-01-01T00:00:00.004Z,-0.8660254037844385,-0.3333333333333333,-0.49999999999999994,-1.4999999999999998,6.0,",
5684+
"d1,1970-01-01T00:00:00.005Z,null,0.0,0.0,null,null,",
5685+
};
5686+
tableResultSetEqualTest(
5687+
"select device_id, time, "
5688+
+ "corr(s1, s2) over (partition by device_id order by time rows between 1 preceding and 1 following) as corr_v, "
5689+
+ "covar_pop(s1, s2) over (partition by device_id order by time rows between 1 preceding and 1 following) as covar_pop_v, "
5690+
+ "covar_samp(s1, s2) over (partition by device_id order by time rows between 1 preceding and 1 following) as covar_samp_v, "
5691+
+ "regr_slope(s1, s2) over (partition by device_id order by time rows between 1 preceding and 1 following) as slope_v, "
5692+
+ "regr_intercept(s1, s2) over (partition by device_id order by time rows between 1 preceding and 1 following) as intercept_v "
5693+
+ "from stat_table where device_id = 'd1' order by device_id, time",
5694+
expectedHeader,
5695+
retArray,
5696+
DATABASE_NAME);
5697+
}
5698+
5699+
@Test
5700+
public void statMomentsWindowFrameTest() {
5701+
String[] expectedHeader = new String[] {"device_id", "time", "skew_v", "kurt_v"};
5702+
String[] retArray =
5703+
new String[] {
5704+
"d1,1970-01-01T00:00:00.001Z,null,null,",
5705+
"d1,1970-01-01T00:00:00.002Z,null,null,",
5706+
"d1,1970-01-01T00:00:00.003Z,0.0,null,",
5707+
"d1,1970-01-01T00:00:00.004Z,0.0,-1.200000000000001,",
5708+
"d1,1970-01-01T00:00:00.005Z,0.0,-1.200000000000001,",
5709+
};
5710+
tableResultSetEqualTest(
5711+
"select device_id, time, "
5712+
+ "skewness(s1) over (partition by device_id order by time rows between 3 preceding and current row) as skew_v, "
5713+
+ "kurtosis(s1) over (partition by device_id order by time rows between 3 preceding and current row) as kurt_v "
5714+
+ "from stat_table where device_id = 'd1' order by device_id, time",
5715+
expectedHeader,
5716+
retArray,
5717+
DATABASE_NAME);
5718+
}
5719+
56725720
@Test
56735721
public void statFunctionsErrorTest() {
56745722
String typeError = "only support numeric data types [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]";

iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableCentralMomentAccumulator.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class TableCentralMomentAccumulator implements TableAccumulator {
3636
private static final long INSTANCE_SIZE =
3737
RamUsageEstimator.shallowSizeOfInstance(TableCentralMomentAccumulator.class);
3838
private static final int INTERMEDIATE_SIZE = Long.BYTES + 4 * Double.BYTES;
39+
private static final double EPSILON = 1e-12;
3940

4041
private final TSDataType seriesDataType;
4142
private final CentralMomentAccumulator.MomentType momentType;
@@ -229,7 +230,45 @@ public TableAccumulator copy() {
229230

230231
@Override
231232
public void removeInput(Column[] arguments) {
232-
throw new UnsupportedOperationException();
233+
checkArgument(arguments.length == 1, "Input of CentralMoment should be 1");
234+
if (count == 0 || arguments[0].isNull(0)) {
235+
return;
236+
}
237+
238+
double value = getDoubleValue(arguments[0], 0);
239+
if (count == 1) {
240+
reset();
241+
return;
242+
}
243+
244+
long nTotal = count;
245+
long nA = nTotal - 1;
246+
247+
double meanA = (nTotal * mean - value) / nA;
248+
249+
double delta = value - meanA;
250+
double delta2 = delta * delta;
251+
double delta3 = delta * delta2;
252+
double delta4 = delta2 * delta2;
253+
254+
double m2A = m2 - delta2 * nA / nTotal;
255+
double m3A =
256+
m3 - delta3 * nA * (nA - 1) / ((double) nTotal * nTotal) + 3.0 * delta * m2A / nTotal;
257+
double m4A =
258+
m4
259+
- delta4 * nA * ((double) nA * nA - nA + 1) / ((double) nTotal * nTotal * nTotal)
260+
- 6.0 * delta2 * m2A / ((double) nTotal * nTotal)
261+
+ 4.0 * delta * m3A / nTotal;
262+
263+
count = nA;
264+
mean = meanA;
265+
m2 = normalizeZero(m2A);
266+
m3 = normalizeZero(m3A);
267+
m4 = normalizeZero(m4A);
268+
}
269+
270+
private double normalizeZero(double value) {
271+
return Math.abs(value) < EPSILON ? 0 : value;
233272
}
234273

235274
@Override
@@ -253,6 +292,6 @@ public void reset() {
253292

254293
@Override
255294
public boolean removable() {
256-
return false;
295+
return true;
257296
}
258297
}

iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableCorrelationAccumulator.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class TableCorrelationAccumulator implements TableAccumulator {
3636
private static final long INSTANCE_SIZE =
3737
RamUsageEstimator.shallowSizeOfInstance(TableCorrelationAccumulator.class);
3838
private static final int INTERMEDIATE_SIZE = Long.BYTES + 5 * Double.BYTES;
39+
private static final double EPSILON = 1e-12;
3940
private final TSDataType xDataType;
4041
private final TSDataType yDataType;
4142
private final CorrelationAccumulator.CorrelationType correlationType;
@@ -126,7 +127,42 @@ private void update(double x, double y) {
126127

127128
@Override
128129
public void removeInput(Column[] arguments) {
129-
throw new UnsupportedOperationException("Remove not implemented for Correlation Accumulator");
130+
checkArgument(arguments.length == 2, "Input of Correlation should be 2");
131+
if (count == 0 || arguments[0].isNull(0) || arguments[1].isNull(0)) {
132+
return;
133+
}
134+
135+
double otherX = getDoubleValue(arguments[0], 0, xDataType);
136+
double otherY = getDoubleValue(arguments[1], 0, yDataType);
137+
138+
if (count == 1) {
139+
reset();
140+
return;
141+
}
142+
143+
long totalCount = count;
144+
long newCount = totalCount - 1;
145+
146+
double newMeanX = (totalCount * meanX - otherX) / newCount;
147+
double newMeanY = (totalCount * meanY - otherY) / newCount;
148+
149+
double deltaX = otherX - newMeanX;
150+
double deltaY = otherY - newMeanY;
151+
double correction = (double) newCount / totalCount;
152+
153+
c2 = c2 - deltaX * deltaY * correction;
154+
m2X = m2X - deltaX * deltaX * correction;
155+
m2Y = m2Y - deltaY * deltaY * correction;
156+
meanX = newMeanX;
157+
meanY = newMeanY;
158+
c2 = normalizeZero(c2);
159+
m2X = normalizeZero(m2X);
160+
m2Y = normalizeZero(m2Y);
161+
count = newCount;
162+
}
163+
164+
private double normalizeZero(double value) {
165+
return Math.abs(value) < EPSILON ? 0 : value;
130166
}
131167

132168
@Override
@@ -247,6 +283,6 @@ public void reset() {
247283

248284
@Override
249285
public boolean removable() {
250-
return false;
286+
return true;
251287
}
252288
}

iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableCovarianceAccumulator.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class TableCovarianceAccumulator implements TableAccumulator {
3636
private static final long INSTANCE_SIZE =
3737
RamUsageEstimator.shallowSizeOfInstance(TableCovarianceAccumulator.class);
3838
private static final int INTERMEDIATE_SIZE = Long.BYTES + 3 * Double.BYTES;
39+
private static final double EPSILON = 1e-12;
3940

4041
private final TSDataType xDataType;
4142
private final TSDataType yDataType;
@@ -123,7 +124,38 @@ private void update(double x, double y) {
123124

124125
@Override
125126
public void removeInput(Column[] arguments) {
126-
throw new UnsupportedOperationException("Remove not implemented for Covariance Accumulator");
127+
checkArgument(arguments.length == 2, "Input of Covariance should be 2");
128+
if (count == 0 || arguments[0].isNull(0) || arguments[1].isNull(0)) {
129+
return;
130+
}
131+
132+
double otherX = getDoubleValue(arguments[0], 0, xDataType);
133+
double otherY = getDoubleValue(arguments[1], 0, yDataType);
134+
135+
if (count == 1) {
136+
reset();
137+
return;
138+
}
139+
140+
long totalCount = count;
141+
long newCount = totalCount - 1;
142+
143+
double newMeanX = (totalCount * meanX - otherX) / newCount;
144+
double newMeanY = (totalCount * meanY - otherY) / newCount;
145+
146+
double deltaX = otherX - newMeanX;
147+
double deltaY = otherY - newMeanY;
148+
double correction = (double) newCount / totalCount;
149+
150+
c2 = c2 - deltaX * deltaY * correction;
151+
meanX = newMeanX;
152+
meanY = newMeanY;
153+
c2 = normalizeZero(c2);
154+
count = newCount;
155+
}
156+
157+
private double normalizeZero(double value) {
158+
return Math.abs(value) < EPSILON ? 0 : value;
127159
}
128160

129161
@Override
@@ -235,6 +267,6 @@ public void reset() {
235267

236268
@Override
237269
public boolean removable() {
238-
return false;
270+
return true;
239271
}
240272
}

iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableRegressionAccumulator.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class TableRegressionAccumulator implements TableAccumulator {
3636
private static final long INSTANCE_SIZE =
3737
RamUsageEstimator.shallowSizeOfInstance(TableRegressionAccumulator.class);
3838
private static final int INTERMEDIATE_SIZE = Long.BYTES + 4 * Double.BYTES;
39+
private static final double EPSILON = 1e-12;
3940

4041
private final TSDataType yDataType;
4142
private final TSDataType xDataType;
@@ -220,7 +221,40 @@ public void evaluateFinal(ColumnBuilder columnBuilder) {
220221

221222
@Override
222223
public void removeInput(Column[] arguments) {
223-
throw new UnsupportedOperationException();
224+
checkArgument(arguments.length == 2, "Input of Regression should be 2");
225+
if (count == 0 || arguments[0].isNull(0) || arguments[1].isNull(0)) {
226+
return;
227+
}
228+
229+
double otherY = getDoubleValue(arguments[0], 0, yDataType);
230+
double otherX = getDoubleValue(arguments[1], 0, xDataType);
231+
232+
if (count == 1) {
233+
reset();
234+
return;
235+
}
236+
237+
long totalCount = count;
238+
long newCount = totalCount - 1;
239+
240+
double newMeanX = (totalCount * meanX - otherX) / newCount;
241+
double newMeanY = (totalCount * meanY - otherY) / newCount;
242+
243+
double deltaX = otherX - newMeanX;
244+
double deltaY = otherY - newMeanY;
245+
double correction = (double) newCount / totalCount;
246+
247+
c2 = c2 - deltaX * deltaY * correction;
248+
m2X = m2X - deltaX * deltaX * correction;
249+
meanX = newMeanX;
250+
meanY = newMeanY;
251+
c2 = normalizeZero(c2);
252+
m2X = normalizeZero(m2X);
253+
count = newCount;
254+
}
255+
256+
private double normalizeZero(double value) {
257+
return Math.abs(value) < EPSILON ? 0 : value;
224258
}
225259

226260
@Override
@@ -244,6 +278,6 @@ public void reset() {
244278

245279
@Override
246280
public boolean removable() {
247-
return false;
281+
return true;
248282
}
249283
}

0 commit comments

Comments
 (0)