Skip to content

Commit 3ff2fbe

Browse files
author
whiletrue
authored
Merge pull request #589 from jefftlin/flinkx-connectors-redis/fixSinkValues
[fix][connectors redis]redis sink values保留字段类型方便反序列化
2 parents 10bc231 + 97a5cc3 commit 3ff2fbe

1 file changed

Lines changed: 16 additions & 1 deletion

File tree

flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
package com.dtstack.flinkx.connector.redis.converter;
2020

21+
import com.dtstack.flinkx.conf.FieldConf;
2122
import com.dtstack.flinkx.connector.redis.conf.RedisConf;
2223
import com.dtstack.flinkx.connector.redis.enums.RedisDataMode;
2324
import com.dtstack.flinkx.connector.redis.enums.RedisDataType;
2425
import com.dtstack.flinkx.converter.AbstractRowConverter;
2526
import com.dtstack.flinkx.element.ColumnRowData;
2627
import com.dtstack.flinkx.element.column.StringColumn;
2728
import com.dtstack.flinkx.element.column.TimestampColumn;
29+
import com.dtstack.flinkx.util.JsonUtil;
2830

2931
import org.apache.flink.table.data.RowData;
3032
import org.apache.flink.table.types.logical.LogicalType;
@@ -34,7 +36,10 @@
3436

3537
import java.text.SimpleDateFormat;
3638
import java.util.ArrayList;
39+
import java.util.HashMap;
3740
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Objects;
3843

3944
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_CRITICAL_TIME;
4045
import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_KEY_VALUE_SIZE;
@@ -141,7 +146,17 @@ private String[] getValues(ColumnRowData row) {
141146
}
142147

143148
private String concatValues(ColumnRowData row) {
144-
return StringUtils.join(getValues(row), redisConf.getValueFieldDelimiter());
149+
List<FieldConf> columns = redisConf.getColumn();
150+
Map<String, Object> fieldMap = new HashMap<>();
151+
int index = 0;
152+
153+
for (FieldConf fieldConf : columns) {
154+
if (Objects.nonNull(row.getField(index))) {
155+
fieldMap.put(fieldConf.getName(), row.getField(index).getData());
156+
}
157+
index++;
158+
}
159+
return JsonUtil.toJson(fieldMap);
145160
}
146161

147162
private String concatKey(ColumnRowData row) {

0 commit comments

Comments
 (0)