diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/pom.xml b/chunjun-connectors/chunjun-connector-jdbc-base/pom.xml index 63be088830..ba09aa5297 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/pom.xml +++ b/chunjun-connectors/chunjun-connector-jdbc-base/pom.xml @@ -15,7 +15,7 @@ 3.9.7 - 1.2.6 + 1.2.11 @@ -43,6 +43,16 @@ com.alibaba druid ${druid.version} + + + jconsole + com.sun + + + tools + com.sun + + diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/CdcConf.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/CdcConf.java new file mode 100644 index 0000000000..c8d939ca32 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/CdcConf.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.jdbc.conf; + +import com.dtstack.chunjun.conf.ChunJunCommonConf; + +import java.io.Serializable; +import java.util.List; + +public class CdcConf extends ChunJunCommonConf implements Serializable { + private static final long serialVersionUID = 1L; + + protected String host; + protected int port; + protected List databaseList; + protected List tableList; + + protected String username; + + protected String password; + + protected int serverId; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public List getDatabaseList() { + return databaseList; + } + + public void setDatabaseList(List databaseList) { + this.databaseList = databaseList; + } + + public List getTableList() { + return tableList; + } + + public void setTableList(List tableList) { + this.tableList = tableList; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getServerId() { + return serverId; + } + + public void setServerId(int serverId) { + this.serverId = serverId; + } +} diff --git a/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactoryTest.java b/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactoryTest.java index b7f0b22ae7..3e6260a720 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactoryTest.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactoryTest.java @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.data.RowData; -import com.sun.tools.javac.util.Assert; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -107,6 +107,6 @@ public void KafkaSourceFactoryTest() { DataStream source = new KafkaSourceFactory(config, env).createSource(); factory = new KafkaSinkFactory(config); - Assert.checkNonNull(factory.createSink(source)); + Assert.assertNotNull(factory.createSink(source)); } } diff --git a/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactoryTest.java b/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactoryTest.java index 5479b02ba2..6c6a3077cb 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactoryTest.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/test/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactoryTest.java @@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration; -import com.sun.tools.javac.util.Assert; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -92,11 +92,11 @@ public void KafkaSourceFactoryTest() { Main.parseConf( String.format(job, " \"mode\": \"earliest-offset\""), new Options()); factory = new KafkaSourceFactory(config, env); - Assert.checkNonNull(factory.createSource()); + Assert.assertNotNull(factory.createSource()); config = Main.parseConf(String.format(job, " \"mode\": \"latest-offset\""), new Options()); factory = new KafkaSourceFactory(config, env); - Assert.checkNonNull(factory.createSource()); + Assert.assertNotNull(factory.createSource()); config = Main.parseConf( @@ -106,7 +106,7 @@ public void KafkaSourceFactoryTest() { + System.currentTimeMillis()), new Options()); factory = new KafkaSourceFactory(config, env); - Assert.checkNonNull(factory.createSource()); + Assert.assertNotNull(factory.createSource()); config = Main.parseConf( @@ -115,6 +115,6 @@ public void KafkaSourceFactoryTest() { " \"mode\": \"specific-offsets\",\"offset\":\"partition:0,offset:42;partition:1,offset:300\""), new Options()); factory = new KafkaSourceFactory(config, env); - Assert.checkNonNull(factory.createSource()); + Assert.assertNotNull(factory.createSource()); } } diff --git a/chunjun-connectors/chunjun-connector-mysqlcdc/pom.xml b/chunjun-connectors/chunjun-connector-mysqlcdc/pom.xml new file mode 100644 index 0000000000..f9c5a90d5f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-mysqlcdc/pom.xml @@ -0,0 +1,110 @@ + + + + chunjun-connectors + com.dtstack.chunjun + 1.12-SNAPSHOT + + 4.0.0 + + chunjun-connector-mysqlcdc + ChunJun : Connectors : MySQLCDC + + + 8 + 8 + + + + + com.dtstack.chunjun + chunjun-connector-jdbc-base + ${project.version} + + + mysql + mysql-connector-java + 5.1.46 + + + com.alibaba.ververica + flink-connector-mysql-cdc + ${flinkcdc.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + false + + + org.slf4j:slf4j-api + log4j:log4j + ch.qos.logback:* + + + + + com.google.common + shade.core.com.google.common + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + diff --git a/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/converter/MysqlCdcRawTypeConverter.java b/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/converter/MysqlCdcRawTypeConverter.java new file mode 100644 index 0000000000..f3f2ace04f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/converter/MysqlCdcRawTypeConverter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.mysqlcdc.converter; + +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +import java.util.Locale; + +/** + * @program: ChunJun + * @author: wuren + * @create: 2021/04/14 + */ +public class MysqlCdcRawTypeConverter { + + /** + * 将MySQL数据库中的类型,转换成flink的DataType类型。 转换关系参考 com.mysql.jdbc.MysqlDefs 类里面的信息。 + * com.mysql.jdbc.ResultSetImpl.getObject(int) + */ + public static DataType apply(String type) { + switch (type.toUpperCase(Locale.ENGLISH)) { + case "BOOLEAN": + case "BIT": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "TINYINT UNSIGNED": + case "SMALLINT": + return DataTypes.SMALLINT(); + case "SMALLINT UNSIGNED": + case "MEDIUMINT": + case "MEDIUMINT UNSIGNED": + case "INT": + case "INTEGER": + case "INT24": + return DataTypes.INT(); + case "INT UNSIGNED": + case "BIGINT": + return DataTypes.BIGINT(); + case "REAL": + case "FLOAT": + case "FLOAT UNSIGNED": + return DataTypes.FLOAT(); + case "BIGINT UNSIGNED": + case "DECIMAL": + case "DECIMAL128": + case "DECIMAL UNSIGNED": + case "NUMERIC": + return DataTypes.DECIMAL(38, 18); + case "DOUBLE": + case "DOUBLE UNSIGNED": + return DataTypes.DOUBLE(); + case "CHAR": + case "VARCHAR": + case "STRING": + return DataTypes.STRING(); + case "DATE": + return DataTypes.DATE(); + case "TIME": + return DataTypes.TIME(); + case "YEAR": + return DataTypes.INTERVAL(DataTypes.YEAR()); + case "TIMESTAMP": + case "DATETIME": + return DataTypes.TIMESTAMP(0); + case "TINYBLOB": + case "BLOB": + case "MEDIUMBLOB": + case "LONGBLOB": + return DataTypes.BYTES(); + case "TINYTEXT": + case "TEXT": + case "MEDIUMTEXT": + case "LONGTEXT": + return DataTypes.STRING(); + case "BINARY": + case "VARBINARY": + // BYTES 底层调用的是VARBINARY最大长度 + return DataTypes.BYTES(); + case "JSON": + return DataTypes.STRING(); + case "ENUM": + case "SET": + return DataTypes.STRING(); + case "GEOMETRY": + return DataTypes.BYTES(); + + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/source/MysqlcdcSourceFactory.java b/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/source/MysqlcdcSourceFactory.java new file mode 100644 index 0000000000..c2589e027d --- /dev/null +++ b/chunjun-connectors/chunjun-connector-mysqlcdc/src/main/java/com/dtstack/chunjun/connector/mysqlcdc/source/MysqlcdcSourceFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.mysqlcdc.source; + +import com.dtstack.chunjun.conf.SyncConf; +import com.dtstack.chunjun.connector.jdbc.adapter.ConnectionAdapter; +import com.dtstack.chunjun.connector.jdbc.conf.CdcConf; +import com.dtstack.chunjun.connector.jdbc.conf.ConnectionConf; +import com.dtstack.chunjun.connector.jdbc.exclusion.FieldNameExclusionStrategy; +import com.dtstack.chunjun.connector.mysqlcdc.converter.MysqlCdcRawTypeConverter; +import com.dtstack.chunjun.converter.RawTypeConverter; +import com.dtstack.chunjun.source.SourceFactory; +import com.dtstack.chunjun.util.GsonUtil; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; +import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; +import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; + +public class MysqlcdcSourceFactory extends SourceFactory { + + protected CdcConf cdcConf; + + public MysqlcdcSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { + super(syncConf, env); + Gson gson = + new GsonBuilder() + .registerTypeAdapter( + ConnectionConf.class, new ConnectionAdapter("SourceConnectionConf")) + .addDeserializationExclusionStrategy( + new FieldNameExclusionStrategy("column")) + .create(); + GsonUtil.setTypeAdapter(gson); + cdcConf = gson.fromJson(gson.toJson(syncConf.getReader().getParameter()), getConfClass()); + } + + @Override + public RawTypeConverter getRawTypeConverter() { + return MysqlCdcRawTypeConverter::apply; + } + + @Override + public DataStream createSource() { + + List dataTypes = + new ArrayList<>(syncConf.getReader().getFieldList().size()); + + syncConf.getReader() + .getFieldList() + .forEach( + fieldConf -> { + dataTypes.add( + DataTypes.FIELD( + fieldConf.getName(), + getRawTypeConverter().apply(fieldConf.getType()))); + }); + final DataType dataType = DataTypes.ROW(dataTypes.toArray(new DataTypes.Field[0])); + + DebeziumSourceFunction mySqlSource = + new MySQLSource.Builder() + .hostname(cdcConf.getHost()) + .port(cdcConf.getPort()) + .databaseList(cdcConf.getDatabaseList().toArray(new String[0])) + .tableList( + cdcConf.getTableList() + .toArray(new String[cdcConf.getDatabaseList().size()])) + .username(cdcConf.getUsername()) + .password(cdcConf.getPassword()) + .serverId(cdcConf.getServerId()) + .deserializer(buildRowDataDebeziumDeserializeSchema(dataType)) + .build(); + + return env.addSource(mySqlSource, "MysqlCdcSource", getTypeInformation()); + } + + private DebeziumDeserializationSchema buildRowDataDebeziumDeserializeSchema( + DataType dataType) { + return new RowDataDebeziumDeserializeSchema( + (RowType) dataType.getLogicalType(), + getTypeInformation(), + new DemoValueValidator(), + ZoneOffset.UTC); + } + + protected Class getConfClass() { + return CdcConf.class; + } + + public static final class DemoValueValidator + implements RowDataDebeziumDeserializeSchema.ValueValidator { + + @Override + public void validate(RowData rowData, RowKind rowKind) { + // do nothing + } + } +} diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml index 9c70ba019e..09fa3722d7 100644 --- a/chunjun-connectors/pom.xml +++ b/chunjun-connectors/pom.xml @@ -74,6 +74,7 @@ chunjun-connector-oraclelogminer chunjun-connector-sqlservercdc chunjun-connector-pgwal + chunjun-connector-mysqlcdc chunjun-connector-http diff --git a/chunjun-core/src/test/java/com/dtstack/chunjun/conf/SyncConfTest.java b/chunjun-core/src/test/java/com/dtstack/chunjun/conf/SyncConfTest.java index 9ee91cf66d..77e216f235 100644 --- a/chunjun-core/src/test/java/com/dtstack/chunjun/conf/SyncConfTest.java +++ b/chunjun-core/src/test/java/com/dtstack/chunjun/conf/SyncConfTest.java @@ -18,10 +18,11 @@ package com.dtstack.chunjun.conf; -import com.sun.tools.javac.util.List; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import java.util.Arrays; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -35,7 +36,7 @@ public void toStringWhenAllFieldsAreNotNull() { syncConf.setPluginRoot("pluginRoot"); syncConf.setRemotePluginPath("remotePluginPath"); syncConf.setSavePointPath("savePointPath"); - syncConf.setSyncJarList(List.of("syncJarList")); + syncConf.setSyncJarList(Arrays.asList("syncJarList")); String expected = "SyncConf{job=null, pluginRoot='pluginRoot', remotePluginPath='remotePluginPath', savePointPath='savePointPath', syncJarList=syncJarList}"; diff --git a/chunjun-examples/json/mysqlcdc/mysql_mysql_cdc_transform.json b/chunjun-examples/json/mysqlcdc/mysql_mysql_cdc_transform.json new file mode 100644 index 0000000000..bcc173ff98 --- /dev/null +++ b/chunjun-examples/json/mysqlcdc/mysql_mysql_cdc_transform.json @@ -0,0 +1,80 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "host": "localhost", + "port": 3306, + "serverId": 1, + "databaseList": [ + "anan" + ], + "tableList": [ + "anan.a2" + ], + "username": "root", + "password": "root", + "column": [ + { + "name": "aa", + "type": "string" + }, + { + "name": "bb", + "type": "int" + } + ], + "writeMode": "insert" + }, + "table": { + "tableName": "a2" + }, + "name": "mysqlcdcreader" + }, + "writer": { + "parameter": { + "username": "root", + "password": "root", + "connection": [ + { + "jdbcUrl": "jdbc:mysql://localhost:3306/anan?useSSL=false", + "table": [ + "a3" + ] + } + ], + "column": [ + { + "name": "aa", + "type": "string" + }, + { + "name": "bb", + "type": "int" + } + ] + }, + "table": { + "tableName": "a3" + }, + "name": "mysqlwriter" + }, + "transformer": { + "transformSql": "select aa,bb from a2" + } + } + ], + "setting": { + "errorLimit": { + "record": 100 + }, + "speed": { + "bytes": 0, + "channel": 1, + "readerChannel": 1, + "writerChannel": 1 + } + } + } +} diff --git a/chunjun-local-test/pom.xml b/chunjun-local-test/pom.xml index 973f490945..b1fb1d1eca 100644 --- a/chunjun-local-test/pom.xml +++ b/chunjun-local-test/pom.xml @@ -156,6 +156,11 @@ chunjun-connector-mysql ${project.version} + + com.dtstack.chunjun + chunjun-connector-mysqlcdc + ${project.version} + com.dtstack.chunjun chunjun-connector-mysqld @@ -355,5 +360,10 @@ chunjun-ddl-mysql ${project.version} + + mysql + mysql-connector-java + 8.0.30 + diff --git a/pom.xml b/pom.xml index 44f4bacf91..1eef88dc65 100644 --- a/pom.xml +++ b/pom.xml @@ -21,13 +21,14 @@ chunjun-restore chunjun-sql chunjun-assembly - + UTF-8 1.8 1.12.7 + 1.3.0 2.12 0.8.1 3.1.4