diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Align.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Align.java new file mode 100644 index 0000000..6751eb1 --- /dev/null +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Align.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 dengliming. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.dengliming.redismodule.redistimeseries; + +/** + * Aggregation Align type + * + * @author xdev.developer + */ +public enum Align { + START("start"), END("end"); + + private String key; + + Align(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RangeOptions.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RangeOptions.java index ac1515e..0689195 100644 --- a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RangeOptions.java +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RangeOptions.java @@ -27,6 +27,7 @@ public class RangeOptions { private int count; private Aggregation aggregationType; + private Align aggregationAlign; private long timeBucket; private boolean withLabels; @@ -41,6 +42,13 @@ public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket return this; } + public RangeOptions aggregationType(Aggregation aggregationType, long timeBucket, Align align) { + this.aggregationType = aggregationType; + this.timeBucket = timeBucket; + this.aggregationAlign = align; + return this; + } + public RangeOptions withLabels() { this.withLabels = true; return this; @@ -55,6 +63,10 @@ public void build(List args) { args.add(Keywords.AGGREGATION); args.add(aggregationType.getKey()); args.add(timeBucket); + if (aggregationAlign != null) { + args.add(Keywords.ALIGN); + args.add(aggregationAlign.getKey()); + } } if (withLabels) { args.add(Keywords.WITHLABELS); diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java index 7fba89d..a581f66 100644 --- a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java @@ -21,6 +21,6 @@ */ public enum Keywords { - RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE; + RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN; } diff --git a/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java b/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java index 60183ca..298c30c 100644 --- a/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java +++ b/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java @@ -17,9 +17,12 @@ package io.github.dengliming.redismodule.redistimeseries; import org.junit.Assert; +import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.redisson.client.RedisException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; @@ -152,6 +155,76 @@ public void testRange() { assertThat(timeSeries).isEmpty(); } + @Test + public void testAggregations() { + RedisTimeSeries redisTimeSeries = getRedisTimeSeries(); + long timestamp = System.currentTimeMillis(); + String sensor = "temperature:sum"; + + assertThat(redisTimeSeries.incrBy(sensor, 10, timestamp, new TimeSeriesOptions() + .retentionTime(6000L) + .unCompressed()).longValue()).isEqualTo(timestamp); + + assertThat(redisTimeSeries.incrBy(sensor, 20, timestamp + 1).longValue()).isEqualTo(timestamp + 1); + + List values = redisTimeSeries.range(sensor, timestamp, timestamp + 1); + assertThat(values).hasSize(2); + + List sum = redisTimeSeries.range(sensor, timestamp, timestamp + 10, new RangeOptions() + .aggregationType(Aggregation.SUM, 60000L)); + + assertThat(sum).hasSize(1); + // Timestamp trimmed to timeBucket (minutes) + assertThat(sum.get(0).getTimestamp()).isEqualTo(Instant.ofEpochMilli(timestamp).truncatedTo(ChronoUnit.MINUTES).toEpochMilli()); + assertThat(sum.get(0).getValue()).isEqualTo(40.0d); + } + + @Ignore("Only for redis timeseries > 1.6.0") + public void testAggregationsAlign() { + RedisTimeSeries redisTimeSeries = getRedisTimeSeries(); + long from = 1L; + long to = 10000L; + long timeBucket = 3000L; + + String sensor = "temperature:sum:align"; + TimeSeriesOptions options = new TimeSeriesOptions().unCompressed(); + + /* + TS: 1000 | 2000 | 3000 | 4000 + VAL: 1 | 1 | 10 | 10 + BT : -------------------|----- + */ + + assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(1000L, 1.0d)), options).longValue()).isEqualTo(1000L); + assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(2000L, 1.0d)), options).longValue()).isEqualTo(2000L); + assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(3000L, 10.0d)), options).longValue()).isEqualTo(3000L); + assertThat(redisTimeSeries.add(new Sample(sensor, Value.of(4000L, 10.0d)), options).longValue()).isEqualTo(4000L); + + List values = redisTimeSeries.range(sensor, from, to); + assertThat(values).hasSize(4); + + List start = redisTimeSeries.range(sensor, from, to, new RangeOptions() + .aggregationType(Aggregation.SUM, timeBucket, Align.START)); + + assertThat(start).hasSize(2); + assertThat(start.get(0).getTimestamp()).isEqualTo(from); + assertThat(start.get(0).getValue()).isEqualTo(12.0d); + + assertThat(start.get(1).getTimestamp()).isEqualTo(from + timeBucket); + assertThat(start.get(1).getValue()).isEqualTo(10.0d); + + List end = redisTimeSeries.range(sensor, from, to, new RangeOptions() + .aggregationType(Aggregation.SUM, timeBucket, Align.END)); + + assertThat(end).hasSize(2); + + assertThat(end.get(0).getTimestamp()).isEqualTo(1000L); + assertThat(end.get(0).getValue()).isEqualTo(12.0d); + + assertThat(end.get(1).getTimestamp()).isEqualTo(4000L); + assertThat(end.get(1).getValue()).isEqualTo(10.0d); + } + @Test public void testQueryIndex() { RedisTimeSeries redisTimeSeries = getRedisTimeSeries();