Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@
*
* @author dujie
*/
public class BinlogInputFormatBuilder extends BaseRichInputFormatBuilder {

protected BinlogInputFormat format;
public class BinlogInputFormatBuilder extends BaseRichInputFormatBuilder<BinlogInputFormat> {

public BinlogInputFormatBuilder() {
super.format = this.format = new BinlogInputFormat();
super(new BinlogInputFormat());
}

public void setBinlogConf(BinlogConf binlogConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@
* @author: lany
* @create: 2021/06/27 17:27
*/
public class ElasticsearchInputFormatBuilder extends BaseRichInputFormatBuilder {

protected ElasticsearchInputFormat format;
public class ElasticsearchInputFormatBuilder
extends BaseRichInputFormatBuilder<ElasticsearchInputFormat> {

public ElasticsearchInputFormatBuilder() {
super.format = this.format = new ElasticsearchInputFormat();
super(new ElasticsearchInputFormat());
}

public void setEsConf(ElasticsearchConf esConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
* @create 2021-06-02 10:16
* @description
*/
public class EmqxInputFormatBuilder extends BaseRichInputFormatBuilder {

protected EmqxInputFormat format;
public class EmqxInputFormatBuilder extends BaseRichInputFormatBuilder<EmqxInputFormat> {

public EmqxInputFormatBuilder() {
super.format = this.format = new EmqxInputFormat();
super(new EmqxInputFormat());
}

public void setEmqxConf(EmqxConf emqxConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@
* @author: xiuzhu
* @create: 2021/06/24
*/
public class FileInputFormatBuilder extends BaseRichInputFormatBuilder {

private FileInputFormat format;
public class FileInputFormatBuilder extends BaseRichInputFormatBuilder<FileInputFormat> {

public FileInputFormatBuilder() {
format = new FileInputFormat();
super.format = format;
super(new FileInputFormat());
}

public void setFileConf(BaseFileConf fileConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@
import org.apache.commons.lang.StringUtils;

/** @author jiangbo */
public class FtpInputFormatBuilder extends BaseRichInputFormatBuilder {

private FtpInputFormat format;
public class FtpInputFormatBuilder extends BaseRichInputFormatBuilder<FtpInputFormat> {

public FtpInputFormatBuilder() {
this.format = new FtpInputFormat();
super.format = format;
super(new FtpInputFormat());
}

public void setFtpConfig(FtpConfig ftpConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
*
* @author huyifan.zju@163.com
*/
public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder {

private final HBaseInputFormat format;
public class HBaseInputFormatBuilder extends BaseRichInputFormatBuilder<HBaseInputFormat> {

public HBaseInputFormatBuilder() {
super.format = format = new HBaseInputFormat();
super(new HBaseInputFormat());
}

public void setHbaseConfig(Map<String, Object> hbaseConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
columnList.add(field);
}
hdfsConf.setColumn(columnList);
HdfsInputFormatBuilder builder = new HdfsInputFormatBuilder(hdfsConf.getFileType());
HdfsInputFormatBuilder builder = HdfsInputFormatBuilder.newBuild(hdfsConf.getFileType());
builder.setHdfsConf(hdfsConf);
AbstractRowConverter rowConverter;
switch (FileType.getByName(hdfsConf.getFileType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@
*
* @author tudou
*/
public class HdfsInputFormatBuilder extends BaseRichInputFormatBuilder {
public class HdfsInputFormatBuilder extends BaseRichInputFormatBuilder<BaseHdfsInputFormat> {

private final BaseHdfsInputFormat format;

public HdfsInputFormatBuilder(String type) {
public static HdfsInputFormatBuilder newBuild(String type) {
BaseHdfsInputFormat format;
switch (FileType.getByName(type)) {
case ORC:
format = new HdfsOrcInputFormat();
Expand All @@ -41,7 +40,11 @@ public HdfsInputFormatBuilder(String type) {
default:
format = new HdfsTextInputFormat();
}
super.format = format;
return new HdfsInputFormatBuilder(format);
}

private HdfsInputFormatBuilder(BaseHdfsInputFormat format) {
super(format);
}

public void setHdfsConf(HdfsConf hdfsConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public HdfsSourceFactory(SyncConf config, StreamExecutionEnvironment env) {

@Override
public DataStream<RowData> createSource() {
HdfsInputFormatBuilder builder = new HdfsInputFormatBuilder(hdfsConf.getFileType());
HdfsInputFormatBuilder builder = HdfsInputFormatBuilder.newBuild(hdfsConf.getFileType());
builder.setHdfsConf(hdfsConf);
AbstractRowConverter rowConverter =
HdfsUtil.createRowConverter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@
* @author : shifang
* @date : 2020/3/12
*/
public class HttpInputFormatBuilder extends BaseRichInputFormatBuilder {
protected HttpInputFormat format;
public class HttpInputFormatBuilder extends BaseRichInputFormatBuilder<HttpInputFormat> {

public HttpInputFormatBuilder() {
super.format = format = new HttpInputFormat();
super(new HttpInputFormat());
}

public void setHttpRestConfig(HttpRestConfig httpRestConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@
*/
public class InfluxdbInputFormatBuilder extends BaseRichInputFormatBuilder<InfluxdbInputFormat> {

protected InfluxdbInputFormat format;

public InfluxdbInputFormatBuilder() {
super.format = this.format = new InfluxdbInputFormat();
super(new InfluxdbInputFormat());
}

public void setInfluxdbConfig(InfluxdbSourceConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@
*
* @author huyifan.zju@163.com
*/
public class JdbcInputFormatBuilder extends BaseRichInputFormatBuilder {

protected JdbcInputFormat format;
public class JdbcInputFormatBuilder extends BaseRichInputFormatBuilder<JdbcInputFormat> {

public JdbcInputFormatBuilder(JdbcInputFormat format) {
super.format = this.format = format;
super(format);
}

public void setJdbcConf(JdbcConf jdbcConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
* @author tiezhu
* @since 2021/6/9 星期三
*/
public class KuduInputFormatBuilder extends BaseRichInputFormatBuilder {

private final KuduInputFormat format;
public class KuduInputFormatBuilder extends BaseRichInputFormatBuilder<KuduInputFormat> {

public KuduInputFormatBuilder() {
super.format = format = new KuduInputFormat();
super(new KuduInputFormat());
}

public void setKuduSourceConf(KuduSourceConf conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,29 @@
* @program flinkx
* @create 2021/06/24
*/
public class MongodbInputFormatBuilder extends BaseRichInputFormatBuilder {
public class MongodbInputFormatBuilder extends BaseRichInputFormatBuilder<MongodbInputFormat> {

private MongoClientConf mongoClientConf;

public MongodbInputFormatBuilder(MongodbDataSyncConf mongodbDataSyncConf) {
mongoClientConf = MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf);
public static MongodbInputFormatBuilder newBuild(MongodbDataSyncConf mongodbDataSyncConf) {
MongoClientConf clientConf =
MongoClientConfFactory.createMongoClientConf(mongodbDataSyncConf);
Bson filter = parseFilter(mongodbDataSyncConf.getFilter());
this.format =
new MongodbInputFormat(mongoClientConf, filter, mongodbDataSyncConf.getFetchSize());
return newBuild(clientConf, filter, mongodbDataSyncConf.getFetchSize());
}

public static MongodbInputFormatBuilder newBuild(
MongoClientConf mongoClientConf, Bson filter, int fetchSize) {
MongodbInputFormat format = new MongodbInputFormat(mongoClientConf, filter, fetchSize);
return new MongodbInputFormatBuilder(format);
}

public MongodbInputFormatBuilder(MongoClientConf mongoClientConf, Bson filter, int fetchSize) {
this.format = new MongodbInputFormat(mongoClientConf, filter, fetchSize);
private MongodbInputFormatBuilder(MongodbInputFormat format) {
super(format);
}

@Override
protected void checkFormat() {}

private Bson parseFilter(String str) {
private static Bson parseFilter(String str) {
if (StringUtils.isNotEmpty(str)) {
return BasicDBObject.parse(str);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public RawTypeConverter getRawTypeConverter() {

@Override
public DataStream<RowData> createSource() {
MongodbInputFormatBuilder builder = new MongodbInputFormatBuilder(mongodbDataSyncConf);
MongodbInputFormatBuilder builder = MongodbInputFormatBuilder.newBuild(mongodbDataSyncConf);
MongoConverterFactory mongoConverterFactory =
new MongoConverterFactory(mongodbDataSyncConf);
AbstractRowConverter converter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@
* @author jiangbo
* @date 2019/12/16
*/
public class OracleLogMinerInputFormatBuilder extends BaseRichInputFormatBuilder {

private final OracleLogMinerInputFormat format;
public class OracleLogMinerInputFormatBuilder
extends BaseRichInputFormatBuilder<OracleLogMinerInputFormat> {

public OracleLogMinerInputFormatBuilder() {
super.format = format = new OracleLogMinerInputFormat();
super(new OracleLogMinerInputFormat());
}

public void setLogMinerConfig(LogMinerConf logMinerConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class PGWalInputFormatBuilder extends BaseRichInputFormatBuilder<PGWalInputFormat> {

public PGWalInputFormatBuilder() {
format = new PGWalInputFormat();
super(new PGWalInputFormat());
}

public void setConf(PGWalConf conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* @author by kunni@dtstack.com
*/
public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder {
public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder<SocketInputFormat> {

protected SocketInputFormat format;

Expand All @@ -40,7 +40,7 @@ public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder {
private static final int ADDRESS_SPLITS = 2;

public SocketInputFormatBuilder() {
super.format = format = new SocketInputFormat();
super(new SocketInputFormat());
}

public void setSocketConfig(SocketConfig socketConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,20 @@
* @program flinkx
* @create 2021/06/28
*/
public class SolrInputFormatBuilder extends BaseRichInputFormatBuilder {
public class SolrInputFormatBuilder extends BaseRichInputFormatBuilder<SolrInputFormat> {

private final SolrConf solrConf;

public SolrInputFormatBuilder(SolrConf solrConf) {
this.solrConf = solrConf;
public static SolrInputFormatBuilder newBuild(SolrConf solrConf) {
List<FieldConf> fields = solrConf.getColumn();
String[] fieldNames = new String[fields.size()];
for (int i = 0; i < fields.size(); i++) {
fieldNames[i] = fields.get(i).getName();
}
this.format = new SolrInputFormat(solrConf, fieldNames);
SolrInputFormat format = new SolrInputFormat(solrConf, fieldNames);
return new SolrInputFormatBuilder(format, solrConf);
}

private SolrInputFormatBuilder(SolrInputFormat solrInputFormat, SolrConf solrConf) {
super(solrInputFormat);
setConfig(solrConf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public RawTypeConverter getRawTypeConverter() {

@Override
public DataStream<RowData> createSource() {
SolrInputFormatBuilder builder = new SolrInputFormatBuilder(solrConf);
SolrInputFormatBuilder builder = SolrInputFormatBuilder.newBuild(solrConf);
SolrConverterFactory converterFactory = new SolrConverterFactory(solrConf);
AbstractRowConverter converter;
if (useAbstractBaseColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@
*
* @author tudou
*/
public class SqlServerCdcInputFormatBuilder extends BaseRichInputFormatBuilder {
public class SqlServerCdcInputFormatBuilder
extends BaseRichInputFormatBuilder<SqlServerCdcInputFormat> {

protected String tableFormat = "%s.%s";

protected SqlServerCdcInputFormat format;

public SqlServerCdcInputFormatBuilder() {
super.format = this.format = new SqlServerCdcInputFormat();
super(new SqlServerCdcInputFormat());
}

public void setSqlServerCdcConf(SqlServerCdcConf sqlServerCdcConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
*
* @author jiangbo
*/
public class StreamInputFormatBuilder extends BaseRichInputFormatBuilder {

private final StreamInputFormat format;
public class StreamInputFormatBuilder extends BaseRichInputFormatBuilder<StreamInputFormat> {

public StreamInputFormatBuilder() {
super.format = format = new StreamInputFormat();
super(new StreamInputFormat());
}

public void setStreamConf(StreamConf streamConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public abstract class BaseRichInputFormatBuilder<T extends BaseRichInputFormat>

protected T format;

public BaseRichInputFormatBuilder(T format) {
this.format = format;
}

public void setConfig(FlinkxCommonConf config) {
format.setConfig(config);
}
Expand Down