diff --git a/commons/src/main/java/io/github/dengliming/redismodule/common/api/RBatch.java b/commons/src/main/java/io/github/dengliming/redismodule/common/api/RBatch.java
new file mode 100644
index 0000000..d425118
--- /dev/null
+++ b/commons/src/main/java/io/github/dengliming/redismodule/common/api/RBatch.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2022 dengliming.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.github.dengliming.redismodule.common.api;
+
+import org.redisson.api.BatchResult;
+import org.redisson.api.RFuture;
+import org.redisson.client.RedisException;
+
+public interface RBatch {
+
+ /**
+ * Executes all operations accumulated during async methods invocations.
+ *
+ * If cluster configuration used then operations are grouped by slot ids
+ * and may be executed on different servers. Thus command execution order could be changed
+ *
+ * @return List with result object for each command
+ * @throws RedisException in case of any error
+ *
+ */
+ BatchResult> execute() throws RedisException;
+
+ /**
+ * Executes all operations accumulated during async methods invocations asynchronously.
+ *
+ * In cluster configurations operations grouped by slot ids
+ * so may be executed on different servers. Thus command execution order could be changed
+ *
+ * @return List with result object for each command
+ */
+ RFuture> executeAsync();
+
+ /**
+ * Discard batched commands and release allocated buffers used for parameters encoding.
+ */
+ void discard();
+
+ /**
+ * Discard batched commands and release allocated buffers used for parameters encoding.
+ *
+ * @return void
+ */
+ RFuture discardAsync();
+}
diff --git a/commons/src/main/java/io/github/dengliming/redismodule/common/api/RCommonBatch.java b/commons/src/main/java/io/github/dengliming/redismodule/common/api/RCommonBatch.java
new file mode 100644
index 0000000..5d2a717
--- /dev/null
+++ b/commons/src/main/java/io/github/dengliming/redismodule/common/api/RCommonBatch.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2022 dengliming.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.github.dengliming.redismodule.common.api;
+
+import org.redisson.api.BatchOptions;
+import org.redisson.api.BatchResult;
+import org.redisson.api.RFuture;
+import org.redisson.command.CommandAsyncExecutor;
+import org.redisson.command.CommandBatchService;
+
+public abstract class RCommonBatch implements RBatch {
+
+ private final CommandBatchService executorService;
+
+ public RCommonBatch(CommandAsyncExecutor executor, BatchOptions options) {
+ this.executorService = new CommandBatchService(executor, options);
+ }
+
+ public CommandBatchService getExecutorService() {
+ return executorService;
+ }
+
+ @Override
+ public BatchResult> execute() {
+ return executorService.execute();
+ }
+
+ @Override
+ public RFuture> executeAsync() {
+ return executorService.executeAsync();
+ }
+
+ @Override
+ public void discard() {
+ executorService.discard();
+ }
+
+ @Override
+ public RFuture discardAsync() {
+ return executorService.discardAsync();
+ }
+}
diff --git a/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSON.java b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSON.java
index 17d7dae..ed54105 100644
--- a/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSON.java
+++ b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSON.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 dengliming.
+ * Copyright 2021-2022 dengliming.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -70,8 +70,12 @@ public class RedisJSON {
}
public RedisJSON(CommandAsyncExecutor commandExecutor) {
+ this(commandExecutor, commandExecutor.getConnectionManager().getCodec());
+ }
+
+ public RedisJSON(CommandAsyncExecutor commandExecutor, Codec codec) {
this.commandExecutor = commandExecutor;
- this.codec = commandExecutor.getConnectionManager().getCodec();
+ this.codec = codec;
}
/**
diff --git a/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSONBatch.java b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSONBatch.java
new file mode 100644
index 0000000..1c95092
--- /dev/null
+++ b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/RedisJSONBatch.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2022 dengliming.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.github.dengliming.redismodule.redisjson;
+
+import io.github.dengliming.redismodule.common.api.RCommonBatch;
+import org.redisson.api.BatchOptions;
+import org.redisson.command.CommandAsyncExecutor;
+
+public class RedisJSONBatch extends RCommonBatch {
+
+ public RedisJSONBatch(CommandAsyncExecutor executor, BatchOptions options) {
+ super(executor, options);
+ }
+
+ public RedisJSON getRedisJSON() {
+ return new RedisJSON(getExecutorService());
+ }
+}
diff --git a/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/client/RedisJSONClient.java b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/client/RedisJSONClient.java
index 855bc8c..2375e3f 100644
--- a/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/client/RedisJSONClient.java
+++ b/redisjson/src/main/java/io/github/dengliming/redismodule/redisjson/client/RedisJSONClient.java
@@ -17,7 +17,9 @@
package io.github.dengliming.redismodule.redisjson.client;
import io.github.dengliming.redismodule.redisjson.RedisJSON;
+import io.github.dengliming.redismodule.redisjson.RedisJSONBatch;
import org.redisson.Redisson;
+import org.redisson.api.BatchOptions;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.Config;
@@ -32,6 +34,14 @@ public RedisJSON getRedisJSON() {
return new RedisJSON(getCommandExecutor());
}
+ public RedisJSONBatch createRedisJSONBatch() {
+ return this.createRedisJSONBatch(BatchOptions.defaults());
+ }
+
+ public RedisJSONBatch createRedisJSONBatch(BatchOptions options) {
+ return new RedisJSONBatch(getCommandExecutor(), options);
+ }
+
public Void flushall() {
CommandAsyncExecutor commandExecutor = getCommandExecutor();
return commandExecutor.get(commandExecutor.writeAllAsync(RedisCommands.FLUSHALL));
diff --git a/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/AbstractTest.java b/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/AbstractTest.java
index aff26b0..a118fb5 100644
--- a/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/AbstractTest.java
+++ b/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/AbstractTest.java
@@ -47,4 +47,8 @@ public void destroy() {
public RedisJSON getRedisJSON() {
return redisJSONClient == null ? null : redisJSONClient.getRedisJSON();
}
+
+ public RedisJSONBatch getRedisJSONBatch() {
+ return redisJSONClient == null ? null : redisJSONClient.createRedisJSONBatch();
+ }
}
diff --git a/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/RedisJSONTest.java b/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/RedisJSONTest.java
index ef6a21d..e16a478 100644
--- a/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/RedisJSONTest.java
+++ b/redisjson/src/test/java/io/github/dengliming/redismodule/redisjson/RedisJSONTest.java
@@ -20,6 +20,7 @@
import io.github.dengliming.redismodule.redisjson.args.SetArgs;
import io.github.dengliming.redismodule.redisjson.utils.GsonUtils;
import org.junit.jupiter.api.Test;
+import org.redisson.api.BatchResult;
import java.util.ArrayList;
import java.util.Arrays;
@@ -177,4 +178,23 @@ public void testObjKeys() {
assertThat(redisJSON.objKeys(key, ".")).containsExactly("a", "nested");
assertThat(redisJSON.objKeys(key, "$..a")).containsExactly(null, Arrays.asList("b", "c"));
}
+
+ @Test
+ public void testPipelining() {
+ String key = "foo";
+ Map m = new HashMap<>();
+ m.put("id", 1);
+ m.put("names", new ArrayList<>());
+
+ RedisJSONBatch batch = getRedisJSONBatch();
+ RedisJSON redisJSON = batch.getRedisJSON();
+ redisJSON.setAsync(key, SetArgs.Builder.create(".", GsonUtils.toJson(m)));
+ redisJSON.objLenAsync(key, ".");
+ redisJSON.objLenAsync("not exist", ".");
+ BatchResult res = batch.execute();
+ assertThat(res.getResponses().size()).isEqualTo(3);
+ assertThat(res.getResponses().get(0)).isEqualTo("OK");
+ assertThat(res.getResponses().get(1)).isEqualTo(2L);
+ assertThat(res.getResponses().get(2)).isEqualTo(0L);
+ }
}