diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java index e15140cbe9..a4df99af9a 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -61,8 +62,8 @@ public class HdfsOrcColumnConverter private List ColumnNameList; private transient Map decimalColInfo; - public HdfsOrcColumnConverter(List fieldConfList) { - super(fieldConfList.size()); + public HdfsOrcColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -83,12 +84,15 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConfList.get(i), + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java index f5d5bdac4f..63f54be4ff 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.connector.hdfs.util.HdfsUtil; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; @@ -64,8 +65,8 @@ public class HdfsParquetColumnConverter private List columnNameList; private transient Map decimalColInfo; - public HdfsParquetColumnConverter(List fieldConfList) { - super(fieldConfList.size()); + public HdfsParquetColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -86,12 +87,15 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConfList.get(i), + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java index ae00fc3e41..fd016203ab 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -51,8 +52,8 @@ public class HdfsTextColumnConverter extends AbstractRowConverter { - public HdfsTextColumnConverter(List fieldConfList) { - super(fieldConfList.size()); + public HdfsTextColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -73,12 +74,15 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConfList.get(i), + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java index 8242a951a3..00b3006018 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java @@ -57,7 +57,8 @@ public DataStreamSink createSink(DataStream dataSet) { useAbstractBaseColumn, hdfsConf.getFileType(), hdfsConf.getColumn(), - getRawTypeConverter()); + getRawTypeConverter(), + hdfsConf); builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createOutput(dataSet, builder.finish()); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java index a6dcec67fb..f4aa0b50e1 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java @@ -56,7 +56,8 @@ public DataStream createSource() { useAbstractBaseColumn, hdfsConf.getFileType(), hdfsConf.getColumn(), - getRawTypeConverter()); + getRawTypeConverter(), + hdfsConf); builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createInput(builder.finish()); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java index b83d51daab..6757e0a625 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.util; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcColumnConverter; import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcRowConverter; import com.dtstack.chunjun.connector.hdfs.converter.HdfsParquetColumnConverter; @@ -331,18 +332,19 @@ public static AbstractRowConverter createRowConverter( boolean useAbstractBaseColumn, String fileType, List fieldConfList, - RawTypeConverter converter) { + RawTypeConverter converter, + HdfsConf hdfsConf) { AbstractRowConverter rowConverter; if (useAbstractBaseColumn) { switch (FileType.getByName(fileType)) { case ORC: - rowConverter = new HdfsOrcColumnConverter(fieldConfList); + rowConverter = new HdfsOrcColumnConverter(fieldConfList, hdfsConf); break; case PARQUET: - rowConverter = new HdfsParquetColumnConverter(fieldConfList); + rowConverter = new HdfsParquetColumnConverter(fieldConfList, hdfsConf); break; default: - rowConverter = new HdfsTextColumnConverter(fieldConfList); + rowConverter = new HdfsTextColumnConverter(fieldConfList, hdfsConf); } } else { RowType rowType = TableUtil.createRowType(fieldConfList, converter); diff --git a/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java b/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java index 0bf1034861..11e0092bb4 100644 --- a/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java @@ -322,7 +322,8 @@ private BaseHdfsOutputFormat createHdfsOutputFormat( useAbstractBaseColumn, copyHiveConf.getFileType(), fieldConfList, - HdfsRawTypeConverter::apply), + HdfsRawTypeConverter::apply, + hiveConf), useAbstractBaseColumn); builder.setInitAccumulatorAndDirty(false); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java index 99615653ca..74700fa6fb 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java @@ -87,6 +87,12 @@ public AbstractRowConverter(int converterSize) { this.toExternalConverters = new ArrayList<>(converterSize); } + public AbstractRowConverter(int converterSize, ChunJunCommonConf commonConf) { + this.toInternalConverters = new ArrayList<>(converterSize); + this.toExternalConverters = new ArrayList<>(converterSize); + this.commonConf = commonConf; + } + protected IDeserializationConverter wrapIntoNullableInternalConverter( IDeserializationConverter IDeserializationConverter) { return val -> { @@ -187,6 +193,7 @@ protected ISerializationConverter wrapIntoNullableExternalConverter( public RowData toInternalLookup(LookupT input) throws Exception { throw new RuntimeException("Subclass need rewriting"); } + /** * BinaryRowData *