diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java index 4567f1f751..6b6648fd33 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java @@ -21,6 +21,7 @@ import com.dtstack.chunjun.conf.ChunJunCommonConf; import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; @@ -88,12 +89,25 @@ public JdbcColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { @SuppressWarnings("unchecked") public RowData toInternal(ResultSet resultSet) throws Exception { List fieldConfList = commonConf.getColumn(); - ColumnRowData result = new ColumnRowData(fieldConfList.size()); + ColumnRowData result; + if (fieldConfList.size() == 1 + && ConstantValue.STAR_SYMBOL.equals(fieldConfList.get(0).getName())) { + result = new ColumnRowData(fieldTypes.length); + for (int index = 0; index < fieldTypes.length; index++) { + Object field = resultSet.getObject(index + 1); + AbstractBaseColumn baseColumn = + (AbstractBaseColumn) toInternalConverters.get(index).deserialize(field); + result.addField(baseColumn); + } + return result; + } int converterIndex = 0; + result = new ColumnRowData(fieldConfList.size()); for (FieldConf fieldConf : fieldConfList) { AbstractBaseColumn baseColumn = null; if (StringUtils.isBlank(fieldConf.getValue())) { Object field = resultSet.getObject(converterIndex + 1); + baseColumn = (AbstractBaseColumn) toInternalConverters.get(converterIndex).deserialize(field); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java index d4f55c5b9a..8a4a6952dd 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java @@ -38,6 +38,7 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; import java.sql.Date; import java.sql.ResultSet; @@ -354,6 +355,11 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException { currentFieldNamedPstmt.setClob(fieldIndex, reader); } + @Override + public void setArray(int fieldIndex, Array array) throws SQLException { + currentFieldNamedPstmt.setArray(fieldIndex, array); + } + @Override public void close() throws SQLException { currentFieldNamedPstmt.close(); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java index 6fec50924e..0ae01b9bae 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; @@ -250,6 +251,9 @@ static FieldNamedPreparedStatement prepareStatement( void setBlob(int fieldIndex, InputStream is) throws SQLException; void setClob(int fieldIndex, Reader reader) throws SQLException; + + void setArray(int fieldIndex, Array array) throws SQLException; + /** * Releases this Statement object's database and JDBC resources immediately instead * of waiting for this to happen when it is automatically closed. It is generally good practice diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java index 9e0e178791..f6072e6885 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; @@ -258,6 +259,13 @@ public void setClob(int fieldIndex, Reader reader) throws SQLException { } } + @Override + public void setArray(int fieldIndex, Array array) throws SQLException { + for (int index : indexMapping[fieldIndex]) { + statement.setArray(index, array); + } + } + @Override public void close() throws SQLException { statement.close(); diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlColumnConverter.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlColumnConverter.java new file mode 100644 index 0000000000..18a6be5293 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlColumnConverter.java @@ -0,0 +1,227 @@ +/* + * + * * + * * * Licensed to the Apache Software Foundation (ASF) under one + * * * or more contributor license agreements. See the NOTICE file + * * * distributed with this work for additional information + * * * regarding copyright ownership. The ASF licenses this file + * * * to you under the Apache License, Version 2.0 (the + * * * "License"); you may not use this file except in compliance + * * * with the License. You may obtain a copy of the License at + * * * + * * * http://www.apache.org/licenses/LICENSE-2.0 + * * * + * * * Unless required by applicable law or agreed to in writing, software + * * * distributed under the License is distributed on an "AS IS" BASIS, + * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * * See the License for the specific language governing permissions and + * * * limitations under the License. + * * + * + */ + +package com.dtstack.chunjun.connector.postgresql.converter; + +import com.dtstack.chunjun.conf.ChunJunCommonConf; +import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.ArrayColumn; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.BooleanColumn; +import com.dtstack.chunjun.element.column.BytesColumn; +import com.dtstack.chunjun.element.column.SqlDateColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimeColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; + +import org.postgresql.core.BaseConnection; +import org.postgresql.core.Oid; +import org.postgresql.jdbc.PgArray; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Company:www.dtstack.com. + * + * @author shitou + * @date 2022/4/14 + */ +public class PostgresqlColumnConverter extends JdbcColumnConverter { + + private List fieldTypeList; + private transient BaseConnection connection; + private static final Map arrayType = new HashMap<>(); + + public PostgresqlColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { + super(rowType, commonConf); + } + + static { + arrayType.put("_int4", Oid.INT4_ARRAY); + arrayType.put("_int8", Oid.INT8_ARRAY); + arrayType.put("_float4", Oid.FLOAT4_ARRAY); + arrayType.put("_text", Oid.TEXT_ARRAY); + } + + @Override + public FieldNamedPreparedStatement toExternal( + RowData rowData, FieldNamedPreparedStatement statement) throws Exception { + for (int index = 0; index < rowData.getArity(); index++) { + if (arrayType.containsKey(fieldTypeList.get(index))) { + // eg: {1000,1000,10001}、{{1000,1000,10001},{1,2,3}} + String field = ((ColumnRowData) rowData).getField(index).asString(); + Array array = + new PgArray(connection, arrayType.get(fieldTypeList.get(index)), field); + AbstractBaseColumn arrayColumn = new ArrayColumn(array); + ((ColumnRowData) rowData).setField(index, arrayColumn); + } + toExternalConverters.get(index).serialize(rowData, index, statement); + } + return statement; + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return val -> new BooleanColumn(Boolean.parseBoolean(val.toString())); + case TINYINT: + return val -> new BigDecimalColumn(((Integer) val).byteValue()); + case SMALLINT: + case INTEGER: + return val -> new BigDecimalColumn((Integer) val); + case INTERVAL_YEAR_MONTH: + return (IDeserializationConverter) + val -> { + YearMonthIntervalType yearMonthIntervalType = + (YearMonthIntervalType) type; + switch (yearMonthIntervalType.getResolution()) { + case YEAR: + return new BigDecimalColumn( + Integer.parseInt(String.valueOf(val).substring(0, 4))); + case MONTH: + case YEAR_TO_MONTH: + default: + throw new UnsupportedOperationException( + "jdbc converter only support YEAR"); + } + }; + case FLOAT: + return val -> new BigDecimalColumn((Float) val); + case DOUBLE: + return val -> new BigDecimalColumn((Double) val); + case BIGINT: + return val -> new BigDecimalColumn((Long) val); + case DECIMAL: + return val -> new BigDecimalColumn((BigDecimal) val); + case CHAR: + case VARCHAR: + return val -> new StringColumn((String) val); + case DATE: + return val -> new SqlDateColumn((Date) val); + case TIME_WITHOUT_TIME_ZONE: + return val -> new TimeColumn((Time) val); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (IDeserializationConverter) + val -> + new TimestampColumn( + (Timestamp) val, ((TimestampType) (type)).getPrecision()); + + case BINARY: + case VARBINARY: + return val -> new BytesColumn((byte[]) val); + case ARRAY: + // integer[] -> {1000,1000,10001} + // integer[][]-> {{1000,1000,10001},{1,2,3}} + return val -> new StringColumn(((Array) val).toString()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + @Override + protected ISerializationConverter createExternalConverter( + LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return (val, index, statement) -> + statement.setBoolean( + index, ((ColumnRowData) val).getField(index).asBoolean()); + case TINYINT: + return (val, index, statement) -> statement.setByte(index, val.getByte(index)); + case SMALLINT: + case INTEGER: + case INTERVAL_YEAR_MONTH: + return (val, index, statement) -> + statement.setInt(index, ((ColumnRowData) val).getField(index).asYearInt()); + case FLOAT: + return (val, index, statement) -> + statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat()); + case DOUBLE: + return (val, index, statement) -> + statement.setDouble( + index, ((ColumnRowData) val).getField(index).asDouble()); + + case BIGINT: + return (val, index, statement) -> + statement.setLong(index, ((ColumnRowData) val).getField(index).asLong()); + case DECIMAL: + return (val, index, statement) -> + statement.setBigDecimal( + index, ((ColumnRowData) val).getField(index).asBigDecimal()); + case CHAR: + case VARCHAR: + return (val, index, statement) -> + statement.setString( + index, ((ColumnRowData) val).getField(index).asString()); + case DATE: + return (val, index, statement) -> + statement.setDate(index, ((ColumnRowData) val).getField(index).asSqlDate()); + case TIME_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + statement.setTime(index, ((ColumnRowData) val).getField(index).asTime()); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return (val, index, statement) -> + statement.setTimestamp( + index, ((ColumnRowData) val).getField(index).asTimestamp()); + + case BINARY: + case VARBINARY: + return (val, index, statement) -> + statement.setBytes(index, ((ColumnRowData) val).getField(index).asBytes()); + case ARRAY: + return (val, index, statement) -> + statement.setArray( + index, (Array) ((ColumnRowData) val).getField(index).getData()); + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public void setFieldTypeList(List fieldTypeList) { + this.fieldTypeList = fieldTypeList; + } + + public void setConnection(BaseConnection connection) { + this.connection = connection; + } +} diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlRawTypeConverter.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlRawTypeConverter.java index 08dab1f5a1..4e5de7f988 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/converter/PostgresqlRawTypeConverter.java @@ -21,7 +21,11 @@ import com.dtstack.chunjun.throwable.UnsupportedTypeException; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.VarCharType; import java.util.Locale; @@ -95,6 +99,13 @@ public static DataType apply(String type) { case "BOOLEAN": case "BOOL": return DataTypes.BOOLEAN(); + case "_INT4": + case "_INT8": + return DataTypes.ARRAY(new AtomicDataType(new IntType())); + case "_TEXT": + return DataTypes.ARRAY(new AtomicDataType(new VarCharType())); + case "_FLOAT4": + return DataTypes.ARRAY(new AtomicDataType(new FloatType())); // 以下类型无法支持 // Enumerated Types diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java index 4fc5225456..783e5fe56f 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/dialect/PostgresqlDialect.java @@ -18,13 +18,22 @@ package com.dtstack.chunjun.connector.postgresql.dialect; +import com.dtstack.chunjun.conf.ChunJunCommonConf; import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement; import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; +import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter; import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlRawTypeConverter; +import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.RawTypeConverter; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import io.vertx.core.json.JsonArray; import org.apache.commons.lang3.StringUtils; +import java.sql.ResultSet; import java.util.Arrays; import java.util.Optional; import java.util.stream.Collectors; @@ -58,6 +67,12 @@ public RawTypeConverter getRawTypeConverter() { return PostgresqlRawTypeConverter::apply; } + @Override + public AbstractRowConverter + getColumnConverter(RowType rowType, ChunJunCommonConf commonConf) { + return new PostgresqlColumnConverter(rowType, commonConf); + } + @Override public Optional defaultDriverName() { return Optional.of(DRIVER); diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java index 674f4dc4b3..9058805fab 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/sink/PostgresOutputFormat.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter; import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; +import com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter; import com.dtstack.chunjun.connector.postgresql.dialect.PostgresqlDialect; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.element.ColumnRowData; @@ -87,6 +88,10 @@ protected void openInternal(int taskNumber, int numTasks) { LOG.info("write sql:{}", copySql); } checkUpsert(); + if (rowConverter instanceof PostgresqlColumnConverter) { + ((PostgresqlColumnConverter) rowConverter).setConnection((BaseConnection) dbConn); + ((PostgresqlColumnConverter) rowConverter).setFieldTypeList(columnTypeList); + } } catch (SQLException sqe) { throw new IllegalArgumentException("checkUpsert() failed.", sqe); } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java b/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java new file mode 100644 index 0000000000..c4ddfde39c --- /dev/null +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/element/column/ArrayColumn.java @@ -0,0 +1,91 @@ +/* + * + * * + * * * Licensed to the Apache Software Foundation (ASF) under one + * * * or more contributor license agreements. See the NOTICE file + * * * distributed with this work for additional information + * * * regarding copyright ownership. The ASF licenses this file + * * * to you under the Apache License, Version 2.0 (the + * * * "License"); you may not use this file except in compliance + * * * with the License. You may obtain a copy of the License at + * * * + * * * http://www.apache.org/licenses/LICENSE-2.0 + * * * + * * * Unless required by applicable law or agreed to in writing, software + * * * distributed under the License is distributed on an "AS IS" BASIS, + * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * * See the License for the specific language governing permissions and + * * * limitations under the License. + * * + * + */ + +package com.dtstack.chunjun.element.column; + +import com.dtstack.chunjun.element.AbstractBaseColumn; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * Company:www.dtstack.com. + * + * @author shitou + * @date 2022/4/15 + */ +public class ArrayColumn extends AbstractBaseColumn { + + private static final long serialVersionUID = 1L; + protected Array data; + + public ArrayColumn(final Array data) { + super(data); + this.data = data; + } + + @Override + public Boolean asBoolean() { + return null; + } + + @Override + public byte[] asBytes() { + return new byte[0]; + } + + @Override + public String asString() { + if (data == null) { + return null; + } + return data.toString(); + } + + @Override + public BigDecimal asBigDecimal() { + return null; + } + + @Override + public Timestamp asTimestamp() { + return null; + } + + @Override + public Time asTime() { + return null; + } + + @Override + public Date asSqlDate() { + return null; + } + + @Override + public String asTimestampStr() { + return null; + } +}