diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java index 5dcd686fcb..45d00d2f0e 100644 --- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.flinkx.connector.redis.converter; +import com.dtstack.flinkx.conf.FieldConf; import com.dtstack.flinkx.connector.redis.conf.RedisConf; import com.dtstack.flinkx.connector.redis.enums.RedisDataMode; import com.dtstack.flinkx.connector.redis.enums.RedisDataType; @@ -25,6 +26,7 @@ import com.dtstack.flinkx.element.ColumnRowData; import com.dtstack.flinkx.element.column.StringColumn; import com.dtstack.flinkx.element.column.TimestampColumn; +import com.dtstack.flinkx.util.JsonUtil; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -34,7 +36,10 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_CRITICAL_TIME; import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_KEY_VALUE_SIZE; @@ -141,7 +146,17 @@ private String[] getValues(ColumnRowData row) { } private String concatValues(ColumnRowData row) { - return StringUtils.join(getValues(row), redisConf.getValueFieldDelimiter()); + List columns = redisConf.getColumn(); + Map fieldMap = new HashMap<>(); + int index = 0; + + for (FieldConf fieldConf : columns) { + if (Objects.nonNull(row.getField(index))) { + fieldMap.put(fieldConf.getName(), row.getField(index).getData()); + } + index++; + } + return JsonUtil.toJson(fieldMap); } private String concatKey(ColumnRowData row) {