diff --git a/README.md b/README.md index f45c216fbb..ca572bee4d 100644 --- a/README.md +++ b/README.md @@ -21,13 +21,13 @@ Official website of ChunJun: https://dtstack.github.io/chunjun/ ChunJun abstracts different databases into reader/source plugins, writer/sink plugins and lookup plugins, and it has the following features: - Based on the real-time computing engine--Flink, and supports JSON template and SQL script configuration tasks. The SQL script is compatible with Flink SQL syntax; -- Support distributed operation, support flink-standalone, yarn-session, yarn-per job and other submission methods; -- Support Docker one-click deployment, support deploy and run on k8s; +- Supports distributed operation, support flink-standalone, yarn-session, yarn-per job and other submission methods; +- Supports Docker one-click deployment, support deploy and run on k8s; - Supports a variety of heterogeneous data sources, and supports synchronization and calculation of more than 20 data sources such as MySQL, Oracle, SQLServer, Hive, Kudu, etc. - Easy to expand, highly flexible, newly expanded data source plugins can integrate with existing data source plugins instantly, plugin developers do not need to care about the code logic of other plugins; - Not only supports full synchronization, but also supports incremental synchronization and interval training; - Not only supports offline synchronization and calculation, but also compatible with real-time scenarios; -- Support dirty data storage, and provide indicator monitoring, etc.; +- Supports dirty data storage, and provide indicator monitoring, etc.; - Cooperate with the flink checkpoint mechanism to achieve breakpoint resuming, task disaster recovery; - Not only supports synchronizing DML data, but also supports DDL synchronization, like 'CREATE TABLE', 'ALTER COLUMN', etc.; diff --git a/chunjun-connectors/chunjun-connector-elasticsearch-base/src/main/java/com/dtstack/chunjun/connector/elasticsearch/ElasticsearchRowConverter.java b/chunjun-connectors/chunjun-connector-elasticsearch-base/src/main/java/com/dtstack/chunjun/connector/elasticsearch/ElasticsearchRowConverter.java index f1df11dd97..b180529910 100644 --- a/chunjun-connectors/chunjun-connector-elasticsearch-base/src/main/java/com/dtstack/chunjun/connector/elasticsearch/ElasticsearchRowConverter.java +++ b/chunjun-connectors/chunjun-connector-elasticsearch-base/src/main/java/com/dtstack/chunjun/connector/elasticsearch/ElasticsearchRowConverter.java @@ -21,15 +21,22 @@ import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.util.ExternalDataUtil; +import com.dtstack.chunjun.util.GsonUtil; +import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -45,6 +52,7 @@ import java.time.LocalDate; import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -68,15 +76,15 @@ public class ElasticsearchRowConverter public ElasticsearchRowConverter(RowType rowType) { super(rowType); - List fieldNames = rowType.getFieldNames(); - for (int i = 0; i < rowType.getFieldCount(); i++) { + List fields = rowType.getFields(); + + for (int i = 0; i < fields.size(); i++) { toInternalConverters.add( - wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i)))); + wrapIntoNullableInternalConverter(createInternalConverter(fields.get(i)))); toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i])); - typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i))); + createExternalConverter(fields.get(i)), fields.get(i).getType())); + typeIndexList.add(new Tuple3<>(fields.get(i).getName(), i, fields.get(i).getType())); } } @@ -135,9 +143,9 @@ public Map toExternal(RowData rowData, Map outpu return output; } - @Override protected ISerializationConverter> createExternalConverter( - LogicalType type) { + RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case TINYINT: return (val, index, output) -> @@ -203,37 +211,77 @@ protected ISerializationConverter> createExternalConverter( throw new RuntimeException("Converter error.", e); } }; + case ROW: + return (val, index, output) -> { + List fields = ((RowType) type).getFields(); + HashMap map = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + ExternalDataUtil.rowDataToExternal( + val.getRow(index, fields.size()), + i, + fields.get(i).getType(), + map, + fields.get(i).getName()); + } + output.put(typeIndexList.get(index)._1(), map); + }; + + case ARRAY: + return (val, index, output) -> { + ArrayData array = val.getArray(index); + Object[] obj = new Object[array.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array); + output.put(typeIndexList.get(index)._1(), obj); + }; + case MAP: + return (val, index, output) -> { + MapData map = val.getMap(index); + Map resultMap = new HashMap<>(); + ExternalDataUtil.mapDataToExternal( + map, + ((MapType) type).getKeyType(), + ((MapType) type).getValueType(), + resultMap); + output.put(typeIndexList.get(index)._1(), resultMap); + }; + case MULTISET: + return (val, index, output) -> { + MapData map = val.getMap(index); + ArrayData arrayData = map.keyArray(); + Object[] obj = new Object[arrayData.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData); + output.put(typeIndexList.get(index)._1(), obj); + }; default: throw new UnsupportedOperationException("Unsupported type:" + type); } } - @Override - protected IDeserializationConverter createInternalConverter(LogicalType type) { - + protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case NULL: return val -> null; case BOOLEAN: - return val -> new Boolean(String.valueOf(val)); + return val -> val == null ? null : new Boolean(String.valueOf(val)); case FLOAT: - return val -> new Float(String.valueOf(val)); + return val -> val == null ? null : new Float(String.valueOf(val)); case DOUBLE: - return val -> new Double(String.valueOf(val)); + return val -> val == null ? null : new Double(String.valueOf(val)); case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: - return val -> Time.valueOf(String.valueOf(val)); + return val -> val == null ? null : Time.valueOf(String.valueOf(val)); case INTEGER: - return val -> new Integer(String.valueOf(val)); + return val -> val == null ? null : new Integer(String.valueOf(val)); case BIGINT: - return val -> new Long(String.valueOf(val)); + return val -> val == null ? null : new Long(String.valueOf(val)); case TINYINT: - return val -> new Integer(String.valueOf(val)).byteValue(); + return val -> val == null ? null : new Integer(String.valueOf(val)).byteValue(); case SMALLINT: // Converter for small type that casts value to int and then return short value, // since // JDBC 1.0 use int type for small values. - return val -> new Integer(String.valueOf(val)).shortValue(); + return val -> val == null ? null : new Integer(String.valueOf(val)).shortValue(); case DECIMAL: final int precision = ((DecimalType) type).getPrecision(); final int scale = ((DecimalType) type).getScale(); @@ -241,31 +289,105 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { // decimal(20, 0) in SQL, // but other precision like decimal(30, 0) can work too from lenient consideration. return val -> - val instanceof BigInteger - ? DecimalData.fromBigDecimal( - new BigDecimal((BigInteger) val, 0), precision, scale) - : DecimalData.fromBigDecimal( - new BigDecimal(String.valueOf(val)), precision, scale); + val == null + ? null + : val instanceof BigInteger + ? DecimalData.fromBigDecimal( + new BigDecimal((BigInteger) val, 0), + precision, + scale) + : DecimalData.fromBigDecimal( + new BigDecimal(String.valueOf(val)), + precision, + scale); case DATE: return val -> - (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); + val == null + ? null + : (int) + ((Date.valueOf(String.valueOf(val))) + .toLocalDate() + .toEpochDay()); case TIME_WITHOUT_TIME_ZONE: return val -> - (int) - ((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay() - / 1_000_000L); + val == null + ? null + : (int) + ((Time.valueOf(String.valueOf(val))) + .toLocalTime() + .toNanoOfDay() + / 1_000_000L); case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return val -> TimestampData.fromTimestamp(Timestamp.valueOf(String.valueOf(val))); + return val -> + val == null + ? null + : TimestampData.fromTimestamp( + Timestamp.valueOf(String.valueOf(val))); case CHAR: case VARCHAR: - return val -> StringData.fromString(val.toString()); + return val -> val == null ? null : StringData.fromString(val.toString()); case BINARY: case VARBINARY: - return val -> (byte[]) val; - case ARRAY: + return val -> val == null ? null : (byte[]) val; case ROW: + return val -> { + List childrenFields = ((RowType) type).getFields(); + Map val1 = (Map) val; + GenericRowData genericRowData = new GenericRowData(childrenFields.size()); + for (int i = 0; i < childrenFields.size(); i++) { + if (val1.get(childrenFields.get(i).getName()) == null) { + genericRowData.setField(i, null); + } else { + Object value = + createInternalConverter(childrenFields.get(i)) + .deserialize(val1.get(childrenFields.get(i).getName())); + genericRowData.setField(i, value); + } + } + return genericRowData; + }; + case ARRAY: + return (val) -> { + ArrayList list = (ArrayList) val; + Object[] result = new Object[list.size()]; + LogicalType logicalType = type.getChildren().get(0); + RowType.RowField rowField1 = new RowType.RowField("", logicalType, ""); + IDeserializationConverter internalConverter = + createInternalConverter(rowField1); + for (int i = 0; i < list.size(); i++) { + if (list.get(i) == null) { + result[i] = null; + } else { + Object value = internalConverter.deserialize(list.get(i)); + result[i] = value; + } + } + return new GenericArrayData(result); + }; case MAP: + return val -> { + if (val == null) { + return null; + } + HashMap resultMap = new HashMap<>(); + Map map = GsonUtil.GSON.fromJson(val.toString(), Map.class); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + RowType.RowField keyRowField = new RowType.RowField("", keyType, ""); + RowType.RowField valueRowField = new RowType.RowField("", valueType, ""); + IDeserializationConverter keyInternalConverter = + createInternalConverter(keyRowField); + IDeserializationConverter valueInternalConverter = + createInternalConverter(valueRowField); + for (Object key : map.keySet()) { + resultMap.put( + keyInternalConverter.deserialize(key), + valueInternalConverter.deserialize(map.get(key))); + } + + return new GenericMapData(resultMap); + }; case MULTISET: case RAW: default: diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcRowConverter.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcRowConverter.java index 3ec7648dc8..969b21f79b 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcRowConverter.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcRowConverter.java @@ -22,16 +22,23 @@ import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.util.ExternalDataUtil; +import com.dtstack.chunjun.util.GsonUtil; import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil; +import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.utils.TypeConversions; @@ -40,12 +47,17 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.sql.Array; +import java.sql.Connection; import java.sql.Date; import java.sql.ResultSet; import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** Base class for all converters that convert between JDBC object and Flink internal object. */ public class JdbcRowConverter @@ -56,13 +68,13 @@ public class JdbcRowConverter public JdbcRowConverter(RowType rowType) { super(rowType); - for (int i = 0; i < rowType.getFieldCount(); i++) { + List fields = rowType.getFields(); + for (int i = 0; i < fields.size(); i++) { toInternalConverters.add( - wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i)))); + wrapIntoNullableInternalConverter(createInternalConverter(fields.get(i)))); toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i])); + createExternalConverter(fields.get(i)), fields.get(i).getType())); } } @@ -71,15 +83,22 @@ public JdbcRowConverter(RowType rowType) { wrapIntoNullableExternalConverter( ISerializationConverter serializationConverter, LogicalType type) { - final int sqlType = - JdbcTypeUtil.typeInformationToSqlType( - TypeConversions.fromDataTypeToLegacyInfo( - TypeConversions.fromLogicalToDataType(type))); + int sqlType = 0; + try { + // Exclude nested data types, such as ROW(id int,data ROW(id string)) + sqlType = + JdbcTypeUtil.typeInformationToSqlType( + TypeConversions.fromDataTypeToLegacyInfo( + TypeConversions.fromLogicalToDataType(type))); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage()); + } + int finalSqlType = sqlType; return (val, index, statement) -> { if (val == null || val.isNullAt(index) || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) { - statement.setNull(index, sqlType); + statement.setNull(index, finalSqlType); } else { serializationConverter.serialize(val, index, statement); } @@ -115,8 +134,8 @@ public FieldNamedPreparedStatement toExternal( return statement; } - @Override - protected IDeserializationConverter createInternalConverter(LogicalType type) { + protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case NULL: return val -> null; @@ -159,23 +178,73 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { return val -> TimestampData.fromTimestamp((Timestamp) val); case CHAR: case VARCHAR: - return val -> StringData.fromString(val.toString()); + return val -> val == null ? null : StringData.fromString(val.toString()); case BINARY: case VARBINARY: return val -> (byte[]) val; case ARRAY: + return (val) -> { + Array val1 = (Array) val; + Object[] array = (Object[]) val1.getArray(); + Object[] result = new Object[array.length]; + LogicalType logicalType = type.getChildren().get(0); + RowType.RowField rowField1 = new RowType.RowField("", logicalType, ""); + IDeserializationConverter internalConverter = + createInternalConverter(rowField1); + for (int i = 0; i < array.length; i++) { + Object value = internalConverter.deserialize(array[i]); + result[i] = value; + } + return new GenericArrayData(result); + }; + case ROW: + return val -> { + List childrenFields = ((RowType) type).getFields(); + HashMap childrenData = GsonUtil.GSON.fromJson(val.toString(), HashMap.class); + GenericRowData genericRowData = new GenericRowData(childrenFields.size()); + for (int i = 0; i < childrenFields.size(); i++) { + Object value = + createInternalConverter(childrenFields.get(i)) + .deserialize( + childrenData.get(childrenFields.get(i).getName())); + genericRowData.setField(i, value); + } + return genericRowData; + }; case MAP: - case MULTISET: + return val -> { + if (val == null) { + return null; + } + HashMap resultMap = new HashMap<>(); + Map map = GsonUtil.GSON.fromJson(val.toString(), Map.class); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + RowType.RowField keyRowField = new RowType.RowField("", keyType, ""); + RowType.RowField valueRowField = new RowType.RowField("", valueType, ""); + IDeserializationConverter keyInternalConverter = + createInternalConverter(keyRowField); + IDeserializationConverter valueInternalConverter = + createInternalConverter(valueRowField); + for (Object key : map.keySet()) { + resultMap.put( + keyInternalConverter.deserialize(key), + valueInternalConverter.deserialize(map.get(key))); + } + + return new GenericMapData(resultMap); + }; + case STRUCTURED_TYPE: case RAW: default: throw new UnsupportedOperationException("Unsupported type:" + type); } } - @Override protected ISerializationConverter createExternalConverter( - LogicalType type) { + RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case BOOLEAN: return (val, index, statement) -> @@ -226,10 +295,57 @@ protected ISerializationConverter createExternalCon index, val.getDecimal(index, decimalPrecision, decimalScale) .toBigDecimal()); + case ROW: + return (val, index, statement) -> { + List fields = ((RowType) type).getFields(); + HashMap map = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + ExternalDataUtil.rowDataToExternal( + val.getRow(index, fields.size()), + i, + fields.get(i).getType(), + map, + fields.get(i).getName()); + } + + statement.setObject(index, GsonUtil.GSON.toJson(map)); + }; + case ARRAY: - case MAP: + return (val, index, statement) -> { + Connection connection = statement.getConnection(); + ArrayData array = val.getArray(index); + Object[] obj = new Object[array.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array); + Array result = + connection.createArrayOf( + type.getChildren().get(0).getTypeRoot().name(), obj); + statement.setArray(index, result); + }; case MULTISET: - case ROW: + return (val, index, statement) -> { + Connection connection = statement.getConnection(); + MapData map = val.getMap(index); + ArrayData arrayData = map.keyArray(); + Object[] obj = new Object[arrayData.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData); + Array result = + connection.createArrayOf( + type.getChildren().get(0).getTypeRoot().name(), obj); + statement.setArray(index, result); + }; + case MAP: + return (val, index, statement) -> { + MapData map = val.getMap(index); + Map resultMap = new HashMap<>(); + ExternalDataUtil.mapDataToExternal( + map, + ((MapType) type).getKeyType(), + ((MapType) type).getValueType(), + resultMap); + statement.setObject(index, resultMap); + }; + case STRUCTURED_TYPE: case RAW: default: throw new UnsupportedOperationException("Unsupported type:" + type); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java index 7fe9681009..6ef65e88a4 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/sink/PreparedStmtProxy.java @@ -382,6 +382,11 @@ public void reOpen(Connection connection) throws SQLException { } } + @Override + public Connection getConnection() throws SQLException { + return currentFieldNamedPstmt.getConnection(); + } + public void clearStatementCache() { pstmtCache.invalidateAll(); } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java index 374ec70b62..2d68871364 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatement.java @@ -266,4 +266,7 @@ static FieldNamedPreparedStatement prepareStatement( /** *Reopen the Statement */ void reOpen(Connection connection) throws SQLException; + + /** get the connection */ + Connection getConnection() throws SQLException; } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java index cdf6be5f30..d076a3138b 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java @@ -279,4 +279,9 @@ public void reOpen(Connection connection) throws SQLException { statement = null; statement = connection.prepareStatement(parsedSQL); } + + @Override + public Connection getConnection() throws SQLException { + return statement.getConnection(); + } } diff --git a/chunjun-connectors/chunjun-connector-stream/src/main/java/com/dtstack/chunjun/connector/stream/converter/StreamRowConverter.java b/chunjun-connectors/chunjun-connector-stream/src/main/java/com/dtstack/chunjun/connector/stream/converter/StreamRowConverter.java index 5c75fd1332..c9a6b5178a 100644 --- a/chunjun-connectors/chunjun-connector-stream/src/main/java/com/dtstack/chunjun/connector/stream/converter/StreamRowConverter.java +++ b/chunjun-connectors/chunjun-connector-stream/src/main/java/com/dtstack/chunjun/connector/stream/converter/StreamRowConverter.java @@ -21,9 +21,15 @@ import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.converter.ISerializationConverter; +import com.dtstack.chunjun.util.ExternalDataUtil; +import com.dtstack.chunjun.util.GsonUtil; +import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -31,6 +37,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -42,6 +49,10 @@ import java.sql.Time; import java.time.LocalDate; import java.time.LocalTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; import static java.time.temporal.ChronoField.MILLI_OF_DAY; @@ -55,15 +66,17 @@ public class StreamRowConverter private static final long serialVersionUID = 1L; + private Random random = new Random(); + public StreamRowConverter(RowType rowType) { super(rowType); - for (int i = 0; i < rowType.getFieldCount(); i++) { + List fields = rowType.getFields(); + for (int i = 0; i < fields.size(); i++) { toInternalConverters.add( - wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i)))); + wrapIntoNullableInternalConverter(createInternalConverter(fields.get(i)))); toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i])); + createExternalConverter(fields.get(i)), fields.get(i).getType())); } } @@ -98,8 +111,8 @@ public RowData toExternal(RowData rowData, RowData output) throws Exception { return output; } - @Override - protected IDeserializationConverter createInternalConverter(LogicalType type) { + protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case NULL: return val -> null; @@ -152,17 +165,70 @@ protected IDeserializationConverter createInternalConverter(LogicalType type) { case VARBINARY: return val -> JMockData.mock(byte[].class); case ARRAY: + return val -> { + Object[] result = new Object[random.nextInt(10)]; + LogicalType logicalType = type.getChildren().get(0); + RowType.RowField rowField1 = new RowType.RowField("", logicalType, ""); + IDeserializationConverter internalConverter = + createInternalConverter(rowField1); + for (int i = 0; i < result.length; i++) { + Object value = internalConverter.deserialize(null); + result[i] = value; + } + return new GenericArrayData(result); + }; case ROW: + return val -> { + List childrenFields = ((RowType) type).getFields(); + GenericRowData genericRowData = new GenericRowData(childrenFields.size()); + for (int i = 0; i < childrenFields.size(); i++) { + Object value = + createInternalConverter(childrenFields.get(i)).deserialize(null); + genericRowData.setField(i, value); + } + return genericRowData; + }; case MAP: + return val -> { + HashMap resultMap = new HashMap<>(); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + RowType.RowField keyRowField = new RowType.RowField("", keyType, ""); + RowType.RowField valueRowField = new RowType.RowField("", valueType, ""); + IDeserializationConverter keyInternalConverter = + createInternalConverter(keyRowField); + IDeserializationConverter valueInternalConverter = + createInternalConverter(valueRowField); + for (int i = 0; i < random.nextInt(5); i++) { + resultMap.put( + keyInternalConverter.deserialize(null), + valueInternalConverter.deserialize(null)); + } + + return new GenericMapData(resultMap); + }; case MULTISET: + return val -> { + HashMap resultMap = new HashMap<>(); + LogicalType logicalType = type.getChildren().get(0); + RowType.RowField keyRowField = new RowType.RowField("", logicalType, ""); + IDeserializationConverter keyInternalConverter = + createInternalConverter(keyRowField); + for (int i = 0; i < random.nextInt(5); i++) { + resultMap.put(keyInternalConverter.deserialize(null), 1); + } + + return new GenericMapData(resultMap); + }; case RAW: default: throw new UnsupportedOperationException("Unsupported type:" + type); } } - @Override - protected ISerializationConverter createExternalConverter(LogicalType type) { + protected ISerializationConverter createExternalConverter( + RowType.RowField rowField) { + LogicalType type = rowField.getType(); switch (type.getTypeRoot()) { case BOOLEAN: return (val, index, rowData) -> rowData.setField(index, val.getBoolean(index)); @@ -219,9 +285,45 @@ protected ISerializationConverter createExternalConverter(Logica val.getDecimal(index, decimalPrecision, decimalScale) .toBigDecimal()); case ARRAY: + return (val, index, rowData) -> { + ArrayData array = val.getArray(index); + Object[] obj = new Object[array.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array); + rowData.setField(index, GsonUtil.GSON.toJson(obj)); + }; case MAP: + return (val, index, rowData) -> { + MapData map = val.getMap(index); + Map resultMap = new HashMap<>(); + ExternalDataUtil.mapDataToExternal( + map, + ((MapType) type).getKeyType(), + ((MapType) type).getValueType(), + resultMap); + rowData.setField(index, GsonUtil.GSON.toJson(resultMap)); + }; case MULTISET: + return (val, index, rowData) -> { + MapData map = val.getMap(index); + ArrayData arrayData = map.keyArray(); + Object[] obj = new Object[arrayData.size()]; + ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData); + rowData.setField(index, GsonUtil.GSON.toJson(obj)); + }; case ROW: + return (val, index, rowData) -> { + List fields = ((RowType) type).getFields(); + HashMap map = new HashMap<>(); + for (int i = 0; i < fields.size(); i++) { + ExternalDataUtil.rowDataToExternal( + val.getRow(index, fields.size()), + i, + fields.get(i).getType(), + map, + fields.get(i).getName()); + } + rowData.setField(index, GsonUtil.GSON.toJson(map)); + }; case RAW: default: throw new UnsupportedOperationException("Unsupported type:" + type); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/ExternalDataUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/ExternalDataUtil.java new file mode 100644 index 0000000000..6bdb22c939 --- /dev/null +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/ExternalDataUtil.java @@ -0,0 +1,368 @@ +package com.dtstack.chunjun.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.sql.Date; +import java.sql.Time; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author: gaoasi + * @email: aschaser@163,com + * @date: 2022/10/25 7:00 下午 + */ +public class ExternalDataUtil { + + public static void arrayDataToExternal(LogicalType type, Object[] to, ArrayData from) { + switch (type.getTypeRoot()) { + case BOOLEAN: + for (int i = 0; i < to.length; i++) { + to[i] = from.getBoolean(i); + } + break; + case TINYINT: + for (int i = 0; i < to.length; i++) { + to[i] = from.getByte(i); + } + break; + case SMALLINT: + for (int i = 0; i < to.length; i++) { + to[i] = from.getShort(i); + } + break; + case INTEGER: + case INTERVAL_YEAR_MONTH: + for (int i = 0; i < to.length; i++) { + to[i] = from.getInt(i); + } + break; + case BIGINT: + case INTERVAL_DAY_TIME: + for (int i = 0; i < to.length; i++) { + to[i] = from.getLong(i); + } + break; + case FLOAT: + for (int i = 0; i < to.length; i++) { + to[i] = from.getFloat(i); + } + break; + case DOUBLE: + for (int i = 0; i < to.length; i++) { + to[i] = from.getDouble(i); + } + break; + case CHAR: + case VARCHAR: + // value is BinaryString + for (int i = 0; i < to.length; i++) { + to[i] = from.getString(i) == null ? null : from.getString(i).toString(); + } + break; + case BINARY: + case VARBINARY: + for (int i = 0; i < to.length; i++) { + to[i] = from.getBinary(i); + } + break; + case DATE: + for (int i = 0; i < to.length; i++) { + to[i] = Date.valueOf(LocalDate.ofEpochDay(from.getInt(i))); + } + break; + case TIME_WITHOUT_TIME_ZONE: + for (int i = 0; i < to.length; i++) { + to[i] = Time.valueOf(LocalTime.ofNanoOfDay(from.getInt(i) * 1_000_000L)); + } + break; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + for (int i = 0; i < to.length; i++) { + to[i] = from.getTimestamp(i, timestampPrecision).toTimestamp(); + } + break; + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + for (int i = 0; i < to.length; i++) { + to[i] = from.getDecimal(i, decimalPrecision, decimalScale).toBigDecimal(); + } + break; + case ROW: + List childrenFields = ((RowType) type).getFields(); + HashMap map = new HashMap<>(); + for (int i = 0; i < to.length; i++) { + RowData row = from.getRow(i, childrenFields.size()); + HashMap map1 = new HashMap<>(); + for (int j = 0; j < childrenFields.size(); j++) { + rowDataToExternal( + row, + j, + childrenFields.get(j).getType(), + map1, + childrenFields.get(j).getName()); + } + map.put(childrenFields.get(i).getName(), map1); + to[i] = map; + } + break; + case ARRAY: + for (int i = 0; i < to.length; i++) { + LogicalType logicalType = type.getChildren().get(0); + ArrayData array = from.getArray(i); + Object[] objects = new Object[array.size()]; + arrayDataToExternal(logicalType, objects, array); + to[i] = objects; + } + break; + case MAP: + for (int i = 0; i < to.length; i++) { + MapData tmpMap = from.getMap(i); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + Map resultMap = new HashMap<>(); + mapDataToExternal(tmpMap, keyType, valueType, resultMap); + to[i] = resultMap; + } + break; + case MULTISET: + for (int i = 0; i < to.length; i++) { + MapData tmpMap = from.getMap(i); + ArrayData arrayData = tmpMap.keyArray(); + LogicalType logicalType = type.getChildren().get(0); + Object[] obj = new Object[arrayData.size()]; + ExternalDataUtil.arrayDataToExternal(logicalType, obj, arrayData); + to[i] = obj; + } + break; + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public static void rowDataToExternal( + RowData rowData, int pos, LogicalType type, Map map, String fieldName) { + switch (type.getTypeRoot()) { + case BOOLEAN: + map.put(fieldName, rowData.getBoolean(pos)); + break; + case TINYINT: + map.put(fieldName, rowData.getByte(pos)); + break; + case SMALLINT: + map.put(fieldName, rowData.getShort(pos)); + break; + case INTEGER: + case INTERVAL_YEAR_MONTH: + map.put(fieldName, rowData.getInt(pos)); + break; + case BIGINT: + case INTERVAL_DAY_TIME: + map.put(fieldName, rowData.getLong(pos)); + break; + case FLOAT: + map.put(fieldName, rowData.getFloat(pos)); + break; + case DOUBLE: + map.put(fieldName, rowData.getDouble(pos)); + break; + case CHAR: + case VARCHAR: + // value is BinaryString + String result = + rowData.getString(pos) == null ? null : rowData.getString(pos).toString(); + map.put(fieldName, result); + break; + case BINARY: + case VARBINARY: + map.put(fieldName, rowData.getBinary(pos)); + break; + case DATE: + map.put(fieldName, Date.valueOf(LocalDate.ofEpochDay(rowData.getInt(pos)))); + break; + case TIME_WITHOUT_TIME_ZONE: + map.put( + fieldName, + Time.valueOf(LocalTime.ofNanoOfDay(rowData.getInt(pos) * 1_000_000L))); + break; + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + map.put(fieldName, rowData.getTimestamp(pos, timestampPrecision).toTimestamp()); + break; + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + map.put( + fieldName, + rowData.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal()); + break; + case ROW: + List childrenFields = ((RowType) type).getFields(); + RowData row = rowData.getRow(pos, childrenFields.size()); + HashMap map1 = new HashMap<>(); + for (int i = 0; i < childrenFields.size(); i++) { + rowDataToExternal( + row, + i, + childrenFields.get(i).getType(), + map1, + childrenFields.get(i).getName()); + } + map.put(fieldName, map1); + break; + case ARRAY: + ArrayData array = rowData.getArray(pos); + Object[] objects = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + LogicalType logicalType = type.getChildren().get(0); + arrayDataToExternal(logicalType, objects, array); + } + map.put(fieldName, objects); + break; + case MAP: + MapData tmpMap = rowData.getMap(pos); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + Map resultMap = new HashMap<>(); + mapDataToExternal(tmpMap, keyType, valueType, resultMap); + map.put(fieldName, resultMap); + break; + case MULTISET: + MapData map2 = rowData.getMap(pos); + ArrayData arrayData = map2.keyArray(); + LogicalType logicalType = type.getChildren().get(0); + Object[] obj = new Object[arrayData.size()]; + ExternalDataUtil.arrayDataToExternal(logicalType, obj, arrayData); + map.put(fieldName, obj); + break; + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + + public static void mapDataToExternal( + MapData mapData, LogicalType keyType, LogicalType valueType, Map map) { + ArrayData keyArray = mapData.keyArray(); + ArrayData valueArray = mapData.valueArray(); + int size = keyArray.size(); + for (int i = 0; i < size; i++) { + Object key = toExternal(keyArray, i, keyType); + Object value = toExternal(valueArray, i, valueType); + map.put(key, value); + } + } + + public static Object toExternal(ArrayData arrayData, int pos, LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return arrayData.getBoolean(pos); + case TINYINT: + return arrayData.getByte(pos); + case SMALLINT: + return arrayData.getShort(pos); + case INTEGER: + case INTERVAL_YEAR_MONTH: + return arrayData.getInt(pos); + case BIGINT: + case INTERVAL_DAY_TIME: + return arrayData.getLong(pos); + case FLOAT: + return arrayData.getFloat(pos); + case DOUBLE: + return arrayData.getDouble(pos); + case CHAR: + case VARCHAR: + // value is BinaryString + + return arrayData.getString(pos) == null + ? null + : arrayData.getString(pos).toString(); + case BINARY: + case VARBINARY: + return arrayData.getBinary(pos); + case DATE: + return Date.valueOf(LocalDate.ofEpochDay(arrayData.getInt(pos))); + case TIME_WITHOUT_TIME_ZONE: + return Time.valueOf(LocalTime.ofNanoOfDay(arrayData.getInt(pos) * 1_000_000L)); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = ((TimestampType) type).getPrecision(); + return arrayData.getTimestamp(pos, timestampPrecision).toTimestamp(); + case DECIMAL: + final int decimalPrecision = ((DecimalType) type).getPrecision(); + final int decimalScale = ((DecimalType) type).getScale(); + return arrayData.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal(); + case ROW: + List childrenFields = ((RowType) type).getFields(); + RowData row = arrayData.getRow(pos, childrenFields.size()); + HashMap map1 = new HashMap<>(); + for (int i = 0; i < childrenFields.size(); i++) { + rowDataToExternal( + row, + i, + childrenFields.get(i).getType(), + map1, + childrenFields.get(i).getName()); + } + return map1; + case ARRAY: + ArrayData array = arrayData.getArray(pos); + Object[] objects = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + LogicalType logicalType = type.getChildren().get(0); + arrayDataToExternal(logicalType, objects, array); + } + return objects; + case MAP: + MapData map = arrayData.getMap(pos); + LogicalType keyType = ((MapType) type).getKeyType(); + LogicalType valueType = ((MapType) type).getValueType(); + Map resultMap = new HashMap<>(); + mapDataToExternal(map, keyType, valueType, resultMap); + return resultMap; + case MULTISET: + MapData map2 = arrayData.getMap(pos); + ArrayData keyArray = map2.keyArray(); + LogicalType logicalType = type.getChildren().get(0); + Object[] obj = new Object[keyArray.size()]; + ExternalDataUtil.arrayDataToExternal(logicalType, obj, arrayData); + return obj; + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/chunjun-examples/sql/elasticsearch7/insert_row_array_map_data_type.sql b/chunjun-examples/sql/elasticsearch7/insert_row_array_map_data_type.sql new file mode 100644 index 0000000000..19fc874bc4 --- /dev/null +++ b/chunjun-examples/sql/elasticsearch7/insert_row_array_map_data_type.sql @@ -0,0 +1,52 @@ +CREATE TABLE source +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) WITH ( + 'connector' = 'stream-x', + 'number-of-rows' = '10', -- 输入条数,默认无限 + 'rows-per-second' = '1' -- 每秒输入条数,默认不限制 + ); + +CREATE TABLE es7_sink +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) + WITH ( + 'connector' = 'elasticsearch7-x', + 'hosts' = '127.0.0.1:9200', + 'username' = 'elastic', + 'password' = 'elastic', + 'index' = 'students_4', + 'client.connect-timeout' = '10000', + 'security.ssl-keystore-file'='ca.crt', + 'security.ssl-keystore-password'='', + 'security.ssl-type'='ca' + ); + + +insert into es7_sink +select * from source; diff --git a/chunjun-examples/sql/elasticsearch7/source_row_array_map_data_type.sql.sql b/chunjun-examples/sql/elasticsearch7/source_row_array_map_data_type.sql.sql new file mode 100644 index 0000000000..70a5b6c492 --- /dev/null +++ b/chunjun-examples/sql/elasticsearch7/source_row_array_map_data_type.sql.sql @@ -0,0 +1,53 @@ + +CREATE TABLE source +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) + WITH ( + 'connector' = 'elasticsearch7-x', + 'hosts' = '127.0.0.1:9200', + 'username' = 'elastic', + 'password' = 'elastic', + 'index' = 'students_4', + 'client.connect-timeout' = '10000', + 'security.ssl-keystore-file'='ca.crt', + 'security.ssl-keystore-password'='', + 'security.ssl-type'='ca' + ); + +CREATE TABLE sink +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) WITH ( + 'connector' = 'stream-x', + 'number-of-rows' = '10', -- 输入条数,默认无限 + 'rows-per-second' = '1' -- 每秒输入条数,默认不限制 + ); + + +insert into sink +select * from source; diff --git a/chunjun-examples/sql/postgresql/row_array_map_data_type.sql b/chunjun-examples/sql/postgresql/row_array_map_data_type.sql new file mode 100644 index 0000000000..096a237913 --- /dev/null +++ b/chunjun-examples/sql/postgresql/row_array_map_data_type.sql @@ -0,0 +1,29 @@ + +CREATE TABLE test +( + id bigint, + deployment_pictures ARRAY, + deployment_images MAP, + deployment_contacts ROW(Name varchar,Contact varchar), + origin_id varchar +) WITH ( + 'connector' = 'postgresql-x', + 'url' = 'jdbc:postgresql://localhost:5432/visual_test1', + 'table-name' = 'test', + 'username' = 'pg', + 'password' = 'pg' + ); + +CREATE TABLE sink( + id bigint, + deployment_pictures ARRAY, + deployment_images MAP, + deployment_contacts ROW(Name varchar,Contact varchar), + origin_id varchar +)WITH( + 'connector' = 'stream-x', + 'print' = 'true' + ); + +insert into sink +select * from test; diff --git a/chunjun-examples/sql/stream/row_array_map_data_type.sql b/chunjun-examples/sql/stream/row_array_map_data_type.sql new file mode 100644 index 0000000000..b640a15115 --- /dev/null +++ b/chunjun-examples/sql/stream/row_array_map_data_type.sql @@ -0,0 +1,44 @@ +CREATE TABLE source +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) WITH ( + 'connector' = 'stream-x', + 'number-of-rows' = '10', -- 输入条数,默认无限 + 'rows-per-second' = '1' -- 每秒输入条数,默认不限制 + ); + +CREATE TABLE sink +( + id INT, + name STRING, + money DECIMAL(32, 2), + dateone timestamp, + age bigint, + datethree timestamp, + datesix timestamp(6), + datenigth timestamp(9), + dtdate date, + dttime time, + `row` ROW(id int,name varchar), + plateText ARRAY, + pt MAP +) WITH ( + 'connector' = 'stream-x', + 'print' = 'true' + ); + +insert into sink +select * +from source;