Skip to content

Commit 5f68f19

Browse files
Aggregation bucket align (for RedisTimeSeries >= 1.6.0) (#30)
Signed-off-by: Den Kovalevskyi <xdev.developer@gmail.com>
1 parent 4ebd561 commit 5f68f19

File tree

4 files changed

+122
-1
lines changed

4 files changed

+122
-1
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020 dengliming.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.github.dengliming.redismodule.redistimeseries;
18+
19+
/**
20+
* Aggregation Align type
21+
*
22+
* @author xdev.developer
23+
*/
24+
public enum Align {
25+
START("start"), END("end");
26+
27+
private String key;
28+
29+
Align(String key) {
30+
this.key = key;
31+
}
32+
33+
public String getKey() {
34+
return key;
35+
}
36+
}

redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RangeOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class RangeOptions {
2727

2828
private int count;
2929
private Aggregation aggregationType;
30+
private Align aggregationAlign;
3031
private long timeBucket;
3132
private boolean withLabels;
3233

@@ -41,6 +42,13 @@ public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket
4142
return this;
4243
}
4344

45+
public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket, Align align) {
46+
this.aggregationType = aggregationType;
47+
this.timeBucket = timeBucket;
48+
this.aggregationAlign = align;
49+
return this;
50+
}
51+
4452
public RangeOptions withLabels() {
4553
this.withLabels = true;
4654
return this;
@@ -55,6 +63,10 @@ public void build(List<Object> args) {
5563
args.add(Keywords.AGGREGATION);
5664
args.add(aggregationType.getKey());
5765
args.add(timeBucket);
66+
if (aggregationAlign != null) {
67+
args.add(Keywords.ALIGN);
68+
args.add(aggregationAlign.getKey());
69+
}
5870
}
5971
if (withLabels) {
6072
args.add(Keywords.WITHLABELS);

redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@
2121
*/
2222
public enum Keywords {
2323

24-
RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE;
24+
RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN;
2525

2626
}

redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package io.github.dengliming.redismodule.redistimeseries;
1818

1919
import org.junit.Assert;
20+
import org.junit.Ignore;
2021
import org.junit.jupiter.api.Test;
2122
import org.redisson.client.RedisException;
2223

24+
import java.time.Instant;
25+
import java.time.temporal.ChronoUnit;
2326
import java.util.List;
2427
import java.util.Map;
2528

@@ -152,6 +155,76 @@ public void testRange() {
152155
assertThat(timeSeries).isEmpty();
153156
}
154157

158+
@Test
159+
public void testAggregations() {
160+
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
161+
long timestamp = System.currentTimeMillis();
162+
String sensor = "temperature:sum";
163+
164+
assertThat(redisTimeSeries.incrBy(sensor, 10, timestamp, new TimeSeriesOptions()
165+
.retentionTime(6000L)
166+
.unCompressed()).longValue()).isEqualTo(timestamp);
167+
168+
assertThat(redisTimeSeries.incrBy(sensor, 20, timestamp + 1).longValue()).isEqualTo(timestamp + 1);
169+
170+
List<Value> values = redisTimeSeries.range(sensor, timestamp, timestamp + 1);
171+
assertThat(values).hasSize(2);
172+
173+
List<Value> sum = redisTimeSeries.range(sensor, timestamp, timestamp + 10, new RangeOptions()
174+
.aggregationType(Aggregation.SUM, 60000L));
175+
176+
assertThat(sum).hasSize(1);
177+
// Timestamp trimmed to timeBucket (minutes)
178+
assertThat(sum.get(0).getTimestamp()).isEqualTo(Instant.ofEpochMilli(timestamp).truncatedTo(ChronoUnit.MINUTES).toEpochMilli());
179+
assertThat(sum.get(0).getValue()).isEqualTo(40.0d);
180+
}
181+
182+
@Ignore("Only for redis timeseries > 1.6.0")
183+
public void testAggregationsAlign() {
184+
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();
185+
long from = 1L;
186+
long to = 10000L;
187+
long timeBucket = 3000L;
188+
189+
String sensor = "temperature:sum:align";
190+
TimeSeriesOptions options = new TimeSeriesOptions().unCompressed();
191+
192+
/*
193+
TS: 1000 | 2000 | 3000 | 4000
194+
VAL: 1 | 1 | 10 | 10
195+
BT : -------------------|-----
196+
*/
197+
198+
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(1000L, 1.0d)), options).longValue()).isEqualTo(1000L);
199+
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(2000L, 1.0d)), options).longValue()).isEqualTo(2000L);
200+
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(3000L, 10.0d)), options).longValue()).isEqualTo(3000L);
201+
assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(4000L, 10.0d)), options).longValue()).isEqualTo(4000L);
202+
203+
List<Value> values = redisTimeSeries.range(sensor, from, to);
204+
assertThat(values).hasSize(4);
205+
206+
List<Value> start = redisTimeSeries.range(sensor, from, to, new RangeOptions()
207+
.aggregationType(Aggregation.SUM, timeBucket, Align.START));
208+
209+
assertThat(start).hasSize(2);
210+
assertThat(start.get(0).getTimestamp()).isEqualTo(from);
211+
assertThat(start.get(0).getValue()).isEqualTo(12.0d);
212+
213+
assertThat(start.get(1).getTimestamp()).isEqualTo(from + timeBucket);
214+
assertThat(start.get(1).getValue()).isEqualTo(10.0d);
215+
216+
List<Value> end = redisTimeSeries.range(sensor, from, to, new RangeOptions()
217+
.aggregationType(Aggregation.SUM, timeBucket, Align.END));
218+
219+
assertThat(end).hasSize(2);
220+
221+
assertThat(end.get(0).getTimestamp()).isEqualTo(1000L);
222+
assertThat(end.get(0).getValue()).isEqualTo(12.0d);
223+
224+
assertThat(end.get(1).getTimestamp()).isEqualTo(4000L);
225+
assertThat(end.get(1).getValue()).isEqualTo(10.0d);
226+
}
227+
155228
@Test
156229
public void testQueryIndex() {
157230
RedisTimeSeries redisTimeSeries = getRedisTimeSeries();

0 commit comments

Comments
 (0)