diff --git a/docs/connectors-en/redis/redis-sink.md b/docs/connectors-en/redis/redis-sink.md
index 6d27444a7d..307f9ccfb8 100644
--- a/docs/connectors-en/redis/redis-sink.md
+++ b/docs/connectors-en/redis/redis-sink.md
@@ -181,6 +181,23 @@ All major versions
- Default:(none)
+- **type和mode**
+ - Description:Type indicates the data type and mode indicates the write mode
+ - Options:string/list/set/zset/hash
+
+ | Type | Mode |
+ | ---- | ---- |
+ | string | set |
+ | list | lpush |
+ | list | rpush |
+ | set | sadd |
+ | zset | zadd |
+ | hash | hset |
+ - Required:yes
+ - Default:(none)
+
+
+
## 五、Data Types
| support | BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、STRING、VARCHAR、CHAR、TIMESTAMP、DATE、BINARY |
| ---------- | --- |
diff --git a/docs/connectors/redis/redis-sink.md b/docs/connectors/redis/redis-sink.md
index 251d2f21d5..33b629102e 100644
--- a/docs/connectors/redis/redis-sink.md
+++ b/docs/connectors/redis/redis-sink.md
@@ -180,6 +180,23 @@ redis sink
- 默认值:无
+- **type和mode**
+ - 描述:type 表示 value 的类型,mode 表示在选定的数据类型下的写入模式。
+ - 选项:string/list/set/zset/hash
+
+ | type | 描述 | mode | 说明 | 注意 |
+ | ---- | ---- | ---- | ---- | ---- |
+ | string | 字符串 | set | 存储这个数据,如果已经存在则覆盖 | |
+ | list | 字符串列表 | lpush | 在 list 最左边存储这个数据 | |
+ | list | 字符串列表 | rpush | 在 list 最右边存储这个数据 | |
+ | set | 字符串集合 | sadd | 向 set 集合中存储这个数据,如果已经存在则覆盖 | |
+ | zset | 有序字符串集合 | zadd | 向 zset 有序集合中存储这个数据,如果已经存在则覆盖 | 当 value 类型是 zset 时,数据源的每一行记录需要遵循相应的规范,即每一行记录除 key 以外,只能有一对 score 和 value,并且 score 必须在 value 前面,rediswriter 方能解析出哪一个 column 是 score,哪一个 column 是 value。 |
+ | hash | 哈希 | hset | 向 hash 有序集合中存储这个数据,如果已经存在则覆盖 | 当 value 类型是 hash 时,数据源的每一行记录需要遵循相应的规范,即每一行记录除 key 以外,只能有一对 attribute 和 value,并且 attribute 必须在 value 前面,Rediswriter 方能解析出哪一个 column 是 attribute,哪一个 column 是 value。 |
+ - 必选:是
+ - 默认值:无
+
+
+
## 五、数据类型
| 支持 | BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、FLOAT、DOUBLE、DECIMAL、STRING、VARCHAR、CHAR、TIMESTAMP、DATE、BINARY |
| --- | --- |
diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/options/RedisOptions.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/options/RedisOptions.java
index 6013b6f13d..9608fe1e20 100644
--- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/options/RedisOptions.java
+++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/options/RedisOptions.java
@@ -50,7 +50,7 @@ public class RedisOptions {
ConfigOptions.key("redis.default.port")
.stringType()
.defaultValue("6379")
- .withDescription("edis.default.port");
+ .withDescription("redis.default.port");
public static final ConfigOption URL =
ConfigOptions.key("url").stringType().noDefaultValue().withDescription("url");
@@ -93,4 +93,10 @@ public class RedisOptions {
.intType()
.defaultValue(0)
.withDescription("keyExpiredTime");
+
+ public static final ConfigOption REDIS_DATA_TYPE =
+ ConfigOptions.key("type").stringType().noDefaultValue().withDescription("type");
+
+ public static final ConfigOption REDIS_DATA_MODE =
+ ConfigOptions.key("mode").stringType().noDefaultValue().withDescription("mode");
}
diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/table/RedisDynamicTableFactory.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/table/RedisDynamicTableFactory.java
index f8876134bc..2c086099e0 100644
--- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/table/RedisDynamicTableFactory.java
+++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/table/RedisDynamicTableFactory.java
@@ -20,6 +20,8 @@
import com.dtstack.flinkx.connector.redis.conf.RedisConf;
import com.dtstack.flinkx.connector.redis.enums.RedisConnectType;
+import com.dtstack.flinkx.connector.redis.enums.RedisDataMode;
+import com.dtstack.flinkx.connector.redis.enums.RedisDataType;
import com.dtstack.flinkx.connector.redis.sink.RedisDynamicTableSink;
import com.dtstack.flinkx.connector.redis.source.RedisDynamicTableSource;
import com.dtstack.flinkx.lookup.conf.LookupConf;
@@ -50,6 +52,8 @@
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.MINIDLE;
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.PASSWORD;
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDISTYPE;
+import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_DATA_MODE;
+import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_DATA_TYPE;
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.TABLENAME;
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.TIMEOUT;
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.URL;
@@ -119,6 +123,8 @@ public Set> requiredOptions() {
Set> requiredOptions = new HashSet<>();
requiredOptions.add(URL);
requiredOptions.add(TABLENAME);
+ requiredOptions.add(REDIS_DATA_TYPE);
+ requiredOptions.add(REDIS_DATA_MODE);
return requiredOptions;
}
@@ -162,6 +168,8 @@ private RedisConf getRedisConf(ReadableConfig config, TableSchema schema) {
redisConf.setMaxIdle(config.get(MAXIDLE));
redisConf.setMinIdle(config.get(MINIDLE));
redisConf.setExpireTime(config.get(KEYEXPIREDTIME));
+ redisConf.setType(RedisDataType.getDataType(config.get(REDIS_DATA_TYPE)));
+ redisConf.setMode(RedisDataMode.getDataMode(config.get(REDIS_DATA_MODE)));
List keyFields = schema.getPrimaryKey().map(pk -> pk.getColumns()).orElse(null);
redisConf.setUpdateKey(keyFields);
diff --git a/flinkx-examples/sql/redis/stream_redis.sql b/flinkx-examples/sql/redis/stream_redis.sql
index 5f80e670ce..2ee3c05100 100644
--- a/flinkx-examples/sql/redis/stream_redis.sql
+++ b/flinkx-examples/sql/redis/stream_redis.sql
@@ -58,6 +58,8 @@ CREATE TABLE sink
,'min.idle' = '0' -- 最小空闲连接数 ,默认:0
-- ,'keyExpiredTime' = '1000' -- redis sink的key的过期时间。默认是0(永不过期),单位是s。默认:0
,'sink.parallelism' = '3' -- sink并行度
+ ,'type' = 'hash'
+ ,'mode' = 'hset'
);