diff --git a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java index f856a27033..f6bd8ac262 100644 --- a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/inputformat/BinlogInputFormatBuilder.java @@ -48,12 +48,10 @@ * * @author dujie */ -public class BinlogInputFormatBuilder extends BaseRichInputFormatBuilder { - - protected BinlogInputFormat format; +public class BinlogInputFormatBuilder extends BaseRichInputFormatBuilder { public BinlogInputFormatBuilder() { - super.format = this.format = new BinlogInputFormat(); + super(new BinlogInputFormat()); } public void setBinlogConf(BinlogConf binlogConf) { diff --git a/flinkx-connectors/flinkx-connector-elasticsearch7/src/main/java/com/dtstack/flinkx/connector/elasticsearch7/source/ElasticsearchInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-elasticsearch7/src/main/java/com/dtstack/flinkx/connector/elasticsearch7/source/ElasticsearchInputFormatBuilder.java index 5f15f890a7..434af40ef4 100644 --- a/flinkx-connectors/flinkx-connector-elasticsearch7/src/main/java/com/dtstack/flinkx/connector/elasticsearch7/source/ElasticsearchInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-elasticsearch7/src/main/java/com/dtstack/flinkx/connector/elasticsearch7/source/ElasticsearchInputFormatBuilder.java @@ -29,12 +29,11 @@ * @author: lany * @create: 2021/06/27 17:27 */ -public class ElasticsearchInputFormatBuilder extends BaseRichInputFormatBuilder { - - protected ElasticsearchInputFormat format; +public class ElasticsearchInputFormatBuilder + extends BaseRichInputFormatBuilder { public ElasticsearchInputFormatBuilder() { - super.format = this.format = new ElasticsearchInputFormat(); + super(new ElasticsearchInputFormat()); } public void setEsConf(ElasticsearchConf esConf) { diff --git a/flinkx-connectors/flinkx-connector-emqx/src/main/java/com/dtstack/flinkx/connector/emqx/source/EmqxInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-emqx/src/main/java/com/dtstack/flinkx/connector/emqx/source/EmqxInputFormatBuilder.java index 5e1087dcc8..9e0b42c713 100644 --- a/flinkx-connectors/flinkx-connector-emqx/src/main/java/com/dtstack/flinkx/connector/emqx/source/EmqxInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-emqx/src/main/java/com/dtstack/flinkx/connector/emqx/source/EmqxInputFormatBuilder.java @@ -28,12 +28,10 @@ * @create 2021-06-02 10:16 * @description */ -public class EmqxInputFormatBuilder extends BaseRichInputFormatBuilder { - - protected EmqxInputFormat format; +public class EmqxInputFormatBuilder extends BaseRichInputFormatBuilder { public EmqxInputFormatBuilder() { - super.format = this.format = new EmqxInputFormat(); + super(new EmqxInputFormat()); } public void setEmqxConf(EmqxConf emqxConf) { diff --git a/flinkx-connectors/flinkx-connector-file/src/main/java/com/dtstack/flinkx/connector/file/source/FileInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-file/src/main/java/com/dtstack/flinkx/connector/file/source/FileInputFormatBuilder.java index 643578b6d2..663a8c957b 100644 --- a/flinkx-connectors/flinkx-connector-file/src/main/java/com/dtstack/flinkx/connector/file/source/FileInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-file/src/main/java/com/dtstack/flinkx/connector/file/source/FileInputFormatBuilder.java @@ -28,13 +28,10 @@ * @author: xiuzhu * @create: 2021/06/24 */ -public class FileInputFormatBuilder extends BaseRichInputFormatBuilder { - - private FileInputFormat format; +public class FileInputFormatBuilder extends BaseRichInputFormatBuilder { public FileInputFormatBuilder() { - format = new FileInputFormat(); - super.format = format; + super(new FileInputFormat()); } public void setFileConf(BaseFileConf fileConf) { diff --git a/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/source/FtpInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/source/FtpInputFormatBuilder.java index f6cd7ef8ef..a1cca8962e 100644 --- a/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/source/FtpInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/source/FtpInputFormatBuilder.java @@ -25,13 +25,10 @@ import org.apache.commons.lang.StringUtils; /** @author jiangbo */ -public class FtpInputFormatBuilder extends BaseRichInputFormatBuilder { - - private FtpInputFormat format; +public class FtpInputFormatBuilder extends BaseRichInputFormatBuilder { public FtpInputFormatBuilder() { - this.format = new FtpInputFormat(); - super.format = format; + super(new FtpInputFormat()); } public void setFtpConfig(FtpConfig ftpConfig) { diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormatBuilder.java index cba4b4f6ad..0d1fc8273d 100644 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/source/HBaseInputFormatBuilder.java @@ -36,12 +36,10 @@ * * @author huyifan.zju@163.com */ -public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder { - - private final HBaseInputFormat format; +public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder { public HBaseInputFormatBuilder() { - super.format = format = new HBaseInputFormat(); + super(new HBaseInputFormat()); } public void setHbaseConfig(Map hbaseConfig) { diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsDynamicTableSource.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsDynamicTableSource.java index d450950d63..50b1f38237 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsDynamicTableSource.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsDynamicTableSource.java @@ -68,7 +68,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon columnList.add(field); } hdfsConf.setColumn(columnList); - HdfsInputFormatBuilder builder = new HdfsInputFormatBuilder(hdfsConf.getFileType()); + HdfsInputFormatBuilder builder = HdfsInputFormatBuilder.newBuild(hdfsConf.getFileType()); builder.setHdfsConf(hdfsConf); AbstractRowConverter rowConverter; switch (FileType.getByName(hdfsConf.getFileType())) { diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsInputFormatBuilder.java index 8feebe07c8..987560b4e6 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsInputFormatBuilder.java @@ -26,11 +26,10 @@ * * @author tudou */ -public class HdfsInputFormatBuilder extends BaseRichInputFormatBuilder { +public class HdfsInputFormatBuilder extends BaseRichInputFormatBuilder { - private final BaseHdfsInputFormat format; - - public HdfsInputFormatBuilder(String type) { + public static HdfsInputFormatBuilder newBuild(String type) { + BaseHdfsInputFormat format; switch (FileType.getByName(type)) { case ORC: format = new HdfsOrcInputFormat(); @@ -41,7 +40,11 @@ public HdfsInputFormatBuilder(String type) { default: format = new HdfsTextInputFormat(); } - super.format = format; + return new HdfsInputFormatBuilder(format); + } + + private HdfsInputFormatBuilder(BaseHdfsInputFormat format) { + super(format); } public void setHdfsConf(HdfsConf hdfsConf) { diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsSourceFactory.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsSourceFactory.java index ba7936359f..6da48ba076 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsSourceFactory.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/source/HdfsSourceFactory.java @@ -49,7 +49,7 @@ public HdfsSourceFactory(SyncConf config, StreamExecutionEnvironment env) { @Override public DataStream createSource() { - HdfsInputFormatBuilder builder = new HdfsInputFormatBuilder(hdfsConf.getFileType()); + HdfsInputFormatBuilder builder = HdfsInputFormatBuilder.newBuild(hdfsConf.getFileType()); builder.setHdfsConf(hdfsConf); AbstractRowConverter rowConverter = HdfsUtil.createRowConverter( diff --git a/flinkx-connectors/flinkx-connector-http/src/main/java/com/dtstack/flinkx/connector/http/inputformat/HttpInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-http/src/main/java/com/dtstack/flinkx/connector/http/inputformat/HttpInputFormatBuilder.java index a288f5f575..125fb7a028 100644 --- a/flinkx-connectors/flinkx-connector-http/src/main/java/com/dtstack/flinkx/connector/http/inputformat/HttpInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-http/src/main/java/com/dtstack/flinkx/connector/http/inputformat/HttpInputFormatBuilder.java @@ -51,11 +51,10 @@ * @author : shifang * @date : 2020/3/12 */ -public class HttpInputFormatBuilder extends BaseRichInputFormatBuilder { - protected HttpInputFormat format; +public class HttpInputFormatBuilder extends BaseRichInputFormatBuilder { public HttpInputFormatBuilder() { - super.format = format = new HttpInputFormat(); + super(new HttpInputFormat()); } public void setHttpRestConfig(HttpRestConfig httpRestConfig) { diff --git a/flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/source/InfluxdbInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/source/InfluxdbInputFormatBuilder.java index f26ed7f0a0..cd765bc078 100644 --- a/flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/source/InfluxdbInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/source/InfluxdbInputFormatBuilder.java @@ -36,10 +36,8 @@ */ public class InfluxdbInputFormatBuilder extends BaseRichInputFormatBuilder { - protected InfluxdbInputFormat format; - public InfluxdbInputFormatBuilder() { - super.format = this.format = new InfluxdbInputFormat(); + super(new InfluxdbInputFormat()); } public void setInfluxdbConfig(InfluxdbSourceConfig config) { diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/source/JdbcInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/source/JdbcInputFormatBuilder.java index 07d097da8d..6318a83be7 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/source/JdbcInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/source/JdbcInputFormatBuilder.java @@ -37,12 +37,10 @@ * * @author huyifan.zju@163.com */ -public class JdbcInputFormatBuilder extends BaseRichInputFormatBuilder { - - protected JdbcInputFormat format; +public class JdbcInputFormatBuilder extends BaseRichInputFormatBuilder { public JdbcInputFormatBuilder(JdbcInputFormat format) { - super.format = this.format = format; + super(format); } public void setJdbcConf(JdbcConf jdbcConf) { diff --git a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/source/KuduInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/source/KuduInputFormatBuilder.java index 83007d96bc..daff2cc7f5 100644 --- a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/source/KuduInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/source/KuduInputFormatBuilder.java @@ -30,12 +30,10 @@ * @author tiezhu * @since 2021/6/9 ζ˜ŸζœŸδΈ‰ */ -public class KuduInputFormatBuilder extends BaseRichInputFormatBuilder { - - private final KuduInputFormat format; +public class KuduInputFormatBuilder extends BaseRichInputFormatBuilder { public KuduInputFormatBuilder() { - super.format = format = new KuduInputFormat(); + super(new KuduInputFormat()); } public void setKuduSourceConf(KuduSourceConf conf) { diff --git a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbInputFormatBuilder.java index 3efff3226f..9c6be7ac2d 100644 --- a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbInputFormatBuilder.java @@ -32,25 +32,29 @@ * @program flinkx * @create 2021/06/24 */ -public class MongodbInputFormatBuilder extends BaseRichInputFormatBuilder { +public class MongodbInputFormatBuilder extends BaseRichInputFormatBuilder { - private MongoClientConf mongoClientConf; - - public MongodbInputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) { - mongoClientConf = MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf); + public static MongodbInputFormatBuilder newBuild(MongodbDataSyncConf mongodbDataSyncConf) { + MongoClientConf clientConf = + MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf); Bson filter = parseFilter(mongodbDataSyncConf.getFilter()); - this.format = - new MongodbInputFormat(mongoClientConf, filter, mongodbDataSyncConf.getFetchSize()); + return newBuild(clientConf, filter, mongodbDataSyncConf.getFetchSize()); + } + + public static MongodbInputFormatBuilder newBuild( + MongoClientConf mongoClientConf, Bson filter, int fetchSize) { + MongodbInputFormat format = new MongodbInputFormat(mongoClientConf, filter, fetchSize); + return new MongodbInputFormatBuilder(format); } - public MongodbInputFormatBuilder(MongoClientConf mongoClientConf, Bson filter, int fetchSize) { - this.format = new MongodbInputFormat(mongoClientConf, filter, fetchSize); + private MongodbInputFormatBuilder(MongodbInputFormat format) { + super(format); } @Override protected void checkFormat() {} - private Bson parseFilter(String str) { + private static Bson parseFilter(String str) { if (StringUtils.isNotEmpty(str)) { return BasicDBObject.parse(str); } diff --git a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbSourceFactory.java b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbSourceFactory.java index 20e188088c..2ea0e0bc65 100644 --- a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbSourceFactory.java +++ b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/source/MongodbSourceFactory.java @@ -60,7 +60,7 @@ public RawTypeConverter getRawTypeConverter() { @Override public DataStream createSource() { - MongodbInputFormatBuilder builder = new MongodbInputFormatBuilder(mongodbDataSyncConf); + MongodbInputFormatBuilder builder = MongodbInputFormatBuilder.newBuild(mongodbDataSyncConf); MongoConverterFactory mongoConverterFactory = new MongoConverterFactory(mongodbDataSyncConf); AbstractRowConverter converter; diff --git a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java index 001c6d6bff..787c6e4b87 100644 --- a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java @@ -50,12 +50,11 @@ * @author jiangbo * @date 2019/12/16 */ -public class OracleLogMinerInputFormatBuilder extends BaseRichInputFormatBuilder { - - private final OracleLogMinerInputFormat format; +public class OracleLogMinerInputFormatBuilder + extends BaseRichInputFormatBuilder { public OracleLogMinerInputFormatBuilder() { - super.format = format = new OracleLogMinerInputFormat(); + super(new OracleLogMinerInputFormat()); } public void setLogMinerConfig(LogMinerConf logMinerConf) { diff --git a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/inputformat/PGWalInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/inputformat/PGWalInputFormatBuilder.java index d5f6d078de..c894565a6a 100644 --- a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/inputformat/PGWalInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/inputformat/PGWalInputFormatBuilder.java @@ -37,7 +37,7 @@ public class PGWalInputFormatBuilder extends BaseRichInputFormatBuilder { public PGWalInputFormatBuilder() { - format = new PGWalInputFormat(); + super(new PGWalInputFormat()); } public void setConf(PGWalConf conf) { diff --git a/flinkx-connectors/flinkx-connector-socket/src/main/java/com/dtstack/flinkx/connector/socket/inputformat/SocketInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-socket/src/main/java/com/dtstack/flinkx/connector/socket/inputformat/SocketInputFormatBuilder.java index c309e87dae..34e7862457 100644 --- a/flinkx-connectors/flinkx-connector-socket/src/main/java/com/dtstack/flinkx/connector/socket/inputformat/SocketInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-socket/src/main/java/com/dtstack/flinkx/connector/socket/inputformat/SocketInputFormatBuilder.java @@ -31,7 +31,7 @@ * * @author by kunni@dtstack.com */ -public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder { +public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder { protected SocketInputFormat format; @@ -40,7 +40,7 @@ public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder { private static final int ADDRESS_SPLITS = 2; public SocketInputFormatBuilder() { - super.format = format = new SocketInputFormat(); + super(new SocketInputFormat()); } public void setSocketConfig(SocketConfig socketConfig) { diff --git a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormatBuilder.java index ffbe22dd2f..131e3ce7c5 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrInputFormatBuilder.java @@ -29,18 +29,20 @@ * @program flinkx * @create 2021/06/28 */ -public class SolrInputFormatBuilder extends BaseRichInputFormatBuilder { +public class SolrInputFormatBuilder extends BaseRichInputFormatBuilder { - private final SolrConf solrConf; - - public SolrInputFormatBuilder(SolrConf solrConf) { - this.solrConf = solrConf; + public static SolrInputFormatBuilder newBuild(SolrConf solrConf) { List fields = solrConf.getColumn(); String[] fieldNames = new String[fields.size()]; for (int i = 0; i < fields.size(); i++) { fieldNames[i] = fields.get(i).getName(); } - this.format = new SolrInputFormat(solrConf, fieldNames); + SolrInputFormat format = new SolrInputFormat(solrConf, fieldNames); + return new SolrInputFormatBuilder(format, solrConf); + } + + private SolrInputFormatBuilder(SolrInputFormat solrInputFormat, SolrConf solrConf) { + super(solrInputFormat); setConfig(solrConf); } diff --git a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrSourceFactory.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrSourceFactory.java index 7affd71345..acf8f57e12 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrSourceFactory.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/source/SolrSourceFactory.java @@ -57,7 +57,7 @@ public RawTypeConverter getRawTypeConverter() { @Override public DataStream createSource() { - SolrInputFormatBuilder builder = new SolrInputFormatBuilder(solrConf); + SolrInputFormatBuilder builder = SolrInputFormatBuilder.newBuild(solrConf); SolrConverterFactory converterFactory = new SolrConverterFactory(solrConf); AbstractRowConverter converter; if (useAbstractBaseColumn) { diff --git a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/inputFormat/SqlServerCdcInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/inputFormat/SqlServerCdcInputFormatBuilder.java index 566731ced0..d49066b246 100644 --- a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/inputFormat/SqlServerCdcInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/inputFormat/SqlServerCdcInputFormatBuilder.java @@ -50,14 +50,15 @@ * * @author tudou */ -public class SqlServerCdcInputFormatBuilder extends BaseRichInputFormatBuilder { +public class SqlServerCdcInputFormatBuilder + extends BaseRichInputFormatBuilder { protected String tableFormat = "%s.%s"; protected SqlServerCdcInputFormat format; public SqlServerCdcInputFormatBuilder() { - super.format = this.format = new SqlServerCdcInputFormat(); + super(new SqlServerCdcInputFormat()); } public void setSqlServerCdcConf(SqlServerCdcConf sqlServerCdcConf) { diff --git a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/source/StreamInputFormatBuilder.java b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/source/StreamInputFormatBuilder.java index 85b484ad73..a7667959ba 100644 --- a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/source/StreamInputFormatBuilder.java +++ b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/source/StreamInputFormatBuilder.java @@ -28,12 +28,10 @@ * * @author jiangbo */ -public class StreamInputFormatBuilder extends BaseRichInputFormatBuilder { - - private final StreamInputFormat format; +public class StreamInputFormatBuilder extends BaseRichInputFormatBuilder { public StreamInputFormatBuilder() { - super.format = format = new StreamInputFormat(); + super(new StreamInputFormat()); } public void setStreamConf(StreamConf streamConf) { diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/source/format/BaseRichInputFormatBuilder.java b/flinkx-core/src/main/java/com/dtstack/flinkx/source/format/BaseRichInputFormatBuilder.java index 279dc00c3b..a4e958d97e 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/source/format/BaseRichInputFormatBuilder.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/source/format/BaseRichInputFormatBuilder.java @@ -38,6 +38,10 @@ public abstract class BaseRichInputFormatBuilder protected T format; + public BaseRichInputFormatBuilder(T format) { + this.format = format; + } + public void setConfig(FlinkxCommonConf config) { format.setConfig(config); }