diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java index 62045c9e0b..56b2e64ca2 100644 --- a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/dialect/GreenplumDialect.java @@ -24,11 +24,6 @@ import java.util.Optional; -/** - * company www.dtstack.com - * - * @author jier - */ public class GreenplumDialect extends PostgresqlDialect { private static final String DIALECT_NAME = "Greenplum"; diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java index 9c19d5cdfc..8769e69c20 100644 --- a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumOutputFormat.java @@ -41,10 +41,6 @@ import java.sql.Connection; import java.sql.SQLException; -/** - * @program: flinkx - * @author: jier - */ public class GreenplumOutputFormat extends JdbcOutputFormat { // pg 字符串里含有\u0000 会报错 ERROR: invalid byte sequence for encoding "UTF8": 0x00 diff --git a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java index fa38a5d998..a27638644f 100644 --- a/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-greenplum/src/main/java/com/dtstack/chunjun/connector/greenplum/sink/GreenplumSinkFactory.java @@ -28,11 +28,6 @@ import static com.dtstack.chunjun.connector.greenplum.sink.GreenplumOutputFormat.INSERT_SQL_MODE_TYPE; -/** - * company www.dtstack.com - * - * @author jier - */ public class GreenplumSinkFactory extends JdbcSinkFactory { public GreenplumSinkFactory(SyncConf syncConf) { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java b/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java index 34963b12fc..5ecf35718d 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/Main.java @@ -52,6 +52,7 @@ import com.dtstack.chunjun.util.TableUtil; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; @@ -143,6 +144,8 @@ private static void exeSqlJob( try { configStreamExecutionEnvironment(env, options, null); List jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar()); + String runMode = options.getRunMode(); + if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH); StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv); TableResult execute = statementSet.execute(); if (env instanceof MyLocalStreamEnvironment) { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/options/Options.java b/chunjun-core/src/main/java/com/dtstack/chunjun/options/Options.java index de82f1bc86..ef27544de5 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/options/Options.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/options/Options.java @@ -88,6 +88,9 @@ public class Options { @OptionRequired(description = "file add to ship file") private String addShipfile; + @OptionRequired(description = "flink run mode") + private String runMode; + private Configuration flinkConfiguration = null; public Configuration loadFlinkConfiguration() { @@ -235,6 +238,14 @@ public void setJobType(String jobType) { this.jobType = jobType; } + public String getRunMode() { + return runMode; + } + + public void setRunMode(String runMode) { + this.runMode = runMode; + } + @Override public String toString() { return new StringJoiner(", ", Options.class.getSimpleName() + "[", "]") @@ -248,10 +259,12 @@ public String toString() { .add("flinkLibDir='" + flinkLibDir + "'") .add("confProp='" + confProp + "'") .add("p='" + p + "'") + .add("pj='" + pj + "'") .add("pluginLoadMode='" + pluginLoadMode + "'") .add("remoteChunJunDistDir='" + remoteChunJunDistDir + "'") .add("addjar='" + addjar + "'") .add("addShipfile='" + addShipfile + "'") + .add("runMode='" + runMode + "'") .add("flinkConfiguration=" + flinkConfiguration) .toString(); }