diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 878ba89..ca2d891 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,7 +32,7 @@ jobs: ports: - 6380:6379 redistimeseries: - image: redislabs/redistimeseries:latest + image: redislabs/redistimeseries:1.6.0 options: >- --health-cmd "redis-cli ping" --health-interval 10s diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/GroupByOptions.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/GroupByOptions.java new file mode 100644 index 0000000..4596bdb --- /dev/null +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/GroupByOptions.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020 dengliming. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.dengliming.redismodule.redistimeseries; + +import io.github.dengliming.redismodule.redistimeseries.protocol.Keywords; + +import java.util.List; + +/** + * @author xdev.developer + */ +public class GroupByOptions { + + private String groupByLabel; + private Reducer reducer; + + /** + * Group by label using reducer aggregation + * @param label grouping label + * @param reducer reducer + * @return RangeOptions + */ + public GroupByOptions groupBy(String label, Reducer reducer) { + this.groupByLabel = label; + this.reducer = reducer; + return this; + } + + public void build(List args) { + if (groupByLabel != null && reducer != null) { + args.add(Keywords.GROUPBY); + args.add(groupByLabel); + args.add(Keywords.REDUCE); + args.add(reducer.getKey()); + } + } +} diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeries.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeries.java index 5e0bec7..8873f75 100644 --- a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeries.java +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeries.java @@ -292,6 +292,41 @@ public RFuture> mrangeAsync(long from, long to, RangeOptions ra return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TS_MRANGE, args.toArray()); } + /** + * Query a timestamp range across multiple time-series by filters. + * + * @param from fromTimestamp + * @param to to timestamp + * @param rangeOptions Optional args + * @param groupBy Optional group by args + * @param filters list of filters + * @return List of TimeSeries + */ + public List mrange(long from, long to, RangeOptions rangeOptions, GroupByOptions groupBy, String... filters) { + return commandExecutor.get(mrangeAsync(from, to, rangeOptions, groupBy, filters)); + } + + public RFuture> mrangeAsync(long from, long to, RangeOptions rangeOptions, GroupByOptions groupBy, String... filters) { + RAssert.notEmpty(filters, "filters must not be empty"); + + List args = new ArrayList<>(); + args.add(from); + args.add(to); + if (rangeOptions != null) { + rangeOptions.build(args); + } + args.add(Keywords.FILTER); + for (String filter : filters) { + args.add(filter); + } + + if (groupBy != null) { + groupBy.build(args); + } + + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, TS_MRANGE, args.toArray()); + } + /** * Get the last sample. * diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Reducer.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Reducer.java new file mode 100644 index 0000000..4945e7c --- /dev/null +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/Reducer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 dengliming. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.dengliming.redismodule.redistimeseries; + +/** + * Group by reducer + * + * @author xdev.developer + */ +public enum Reducer { + SUM("sum"), MIN("min"), MAX("max"); + + private String key; + + Reducer(String key) { + this.key = key; + } + + public String getKey() { + return key; + } +} diff --git a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java index a581f66..250512d 100644 --- a/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java +++ b/redistimeseries/src/main/java/io/github/dengliming/redismodule/redistimeseries/protocol/Keywords.java @@ -21,6 +21,6 @@ */ public enum Keywords { - RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN; + RETENTION, UNCOMPRESSED, LABELS, TIMESTAMP, AGGREGATION, COUNT, WITHLABELS, FILTER, DUPLICATE_POLICY, ON_DUPLICATE, ALIGN, GROUPBY, REDUCE; } diff --git a/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java b/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java index 298c30c..d25896c 100644 --- a/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java +++ b/redistimeseries/src/test/java/io/github/dengliming/redismodule/redistimeseries/RedisTimeSeriesTest.java @@ -17,7 +17,6 @@ package io.github.dengliming.redismodule.redistimeseries; import org.junit.Assert; -import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.redisson.client.RedisException; @@ -28,6 +27,7 @@ import static io.github.dengliming.redismodule.redistimeseries.Sample.Value; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.tuple; /** * @author dengliming @@ -179,7 +179,7 @@ public void testAggregations() { assertThat(sum.get(0).getValue()).isEqualTo(40.0d); } - @Ignore("Only for redis timeseries > 1.6.0") + @Test public void testAggregationsAlign() { RedisTimeSeries redisTimeSeries = getRedisTimeSeries(); long from = 1L; @@ -225,6 +225,87 @@ public void testAggregationsAlign() { assertThat(end.get(1).getValue()).isEqualTo(10.0d); } + @Test + public void testGroupBy() { + RedisTimeSeries redisTimeSeries = getRedisTimeSeries(); + long from = 1L; + long to = 10; + + TimeSeriesOptions cpuSystem = new TimeSeriesOptions() + .labels(new Label("metric", "cpu"), new Label("name", "system")) + .unCompressed(); + + TimeSeriesOptions cpuUser = new TimeSeriesOptions() + .labels(new Label("metric", "cpu"), new Label("name", "user")) + .unCompressed(); + + assertThat(redisTimeSeries.add(new Sample("ts1", Value.of(1L, 90.0d)), cpuSystem).longValue()).isEqualTo(1L); + assertThat(redisTimeSeries.add(new Sample("ts1", Value.of(2L, 45.0d)), cpuSystem).longValue()).isEqualTo(2L); + assertThat(redisTimeSeries.add(new Sample("ts2", Value.of(2L, 99.0d)), cpuUser).longValue()).isEqualTo(2L); + assertThat(redisTimeSeries.add(new Sample("ts3", Value.of(2L, 2.0d)), cpuSystem).longValue()).isEqualTo(2L); + + List max = redisTimeSeries.mrange(from, to, + new RangeOptions().withLabels(), + new GroupByOptions().groupBy("name", Reducer.MAX), "metric=cpu"); + + assertThat(max).hasSize(2); + + assertThat(max.get(0).getLabels()) + .extracting(Label::getKey, Label::getValue) + .containsExactlyInAnyOrder( + tuple("name", "system"), + tuple("__reducer__", "max"), + tuple("__source__", "ts1,ts3")); + + assertThat(max.get(0).getValues()) + .extracting(Value::getTimestamp, Value::getValue) + .containsExactly( + tuple(1L, 90.0d), + tuple(2L, 45.0d)); + + assertThat(max.get(1).getLabels()) + .extracting(Label::getKey, Label::getValue) + .containsExactlyInAnyOrder( + tuple("name", "user"), + tuple("__reducer__", "max"), + tuple("__source__", "ts2")); + + assertThat(max.get(1).getValues()) + .extracting(Value::getTimestamp, Value::getValue) + .containsExactly(tuple(2L, 99.0d)); + + List min = redisTimeSeries.mrange(from, to, + new RangeOptions().withLabels(), + new GroupByOptions().groupBy("name", Reducer.MIN), "metric=cpu"); + + assertThat(min).hasSize(2); + + assertThat(min.get(0).getLabels()) + .extracting(Label::getKey, Label::getValue) + .containsExactlyInAnyOrder( + tuple("name", "system"), + tuple("__reducer__", "min"), + tuple("__source__", "ts1,ts3")); + + assertThat(min.get(0).getValues()) + .extracting(Value::getTimestamp, Value::getValue) + .containsExactly( + tuple(1L, 90.0d), + tuple(2L, 2.0d)); + + assertThat(min.get(1).getLabels()) + .extracting(Label::getKey, Label::getValue) + .containsExactlyInAnyOrder( + tuple("name", "user"), + tuple("__reducer__", "min"), + tuple("__source__", "ts2")); + + assertThat(min.get(1).getValues()) + .extracting(Value::getTimestamp, Value::getValue) + .containsExactly(tuple(2L, 99.0d)); + + } + @Test public void testQueryIndex() { RedisTimeSeries redisTimeSeries = getRedisTimeSeries();