diff --git a/flinkx-clients/src/main/java/com/dtstack/flinkx/client/Launcher.java b/flinkx-clients/src/main/java/com/dtstack/flinkx/client/Launcher.java index 0f3aa9f5b7..3bbb55168f 100644 --- a/flinkx-clients/src/main/java/com/dtstack/flinkx/client/Launcher.java +++ b/flinkx-clients/src/main/java/com/dtstack/flinkx/client/Launcher.java @@ -28,7 +28,6 @@ import com.dtstack.flinkx.options.OptionParser; import com.dtstack.flinkx.options.Options; import com.dtstack.flinkx.util.ExecuteProcessHelper; -import com.dtstack.flinkx.util.JsonModifyUtil; import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.configuration.ConfigConstants; @@ -72,12 +71,6 @@ public static void main(String[] args) throws Exception { for (int i = 0; i < argList.size(); i += 2) { temp.put(argList.get(i), argList.get(i + 1)); } - // 对json中的值进行修改 - String s = temp.get("-p"); - if (StringUtils.isNotBlank(s)) { - HashMap parameter = JsonModifyUtil.CommandTransform(s); - temp.put("-job", JsonModifyUtil.JsonValueReplace(temp.get("-job"), parameter)); - } // 清空list,填充修改后的参数值 argList.clear(); @@ -88,7 +81,7 @@ public static void main(String[] args) throws Exception { JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList); - ClusterClientHelper clusterClientHelper = null; + ClusterClientHelper clusterClientHelper; switch (ClusterMode.getByName(launcherOptions.getMode())) { case local: clusterClientHelper = new LocalClusterClientHelper(); diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java index 1ba3d8856c..7a3668cb6e 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/Main.java @@ -40,6 +40,7 @@ import com.dtstack.flinkx.util.DataSyncFactoryUtil; import com.dtstack.flinkx.util.ExecuteProcessHelper; import com.dtstack.flinkx.util.FactoryHelper; +import com.dtstack.flinkx.util.JobUtil; import com.dtstack.flinkx.util.PluginUtil; import com.dtstack.flinkx.util.PrintUtil; import com.dtstack.flinkx.util.PropertiesUtil; @@ -96,6 +97,7 @@ public static void main(String[] args) throws Exception { Options options = new OptionParser(args).getOptions(); String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name()); + String replacedJob = JobUtil.replaceJobParameter(options.getP(), job); Properties confProperties = PropertiesUtil.parseConf(options.getConfProp()); StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options); StreamTableEnvironment tEnv = @@ -105,10 +107,10 @@ public static void main(String[] args) throws Exception { tEnv.getConfig().getConfiguration().toString()); switch (EJobType.getByName(options.getJobType())) { case SQL: - exeSqlJob(env, tEnv, job, options); + exeSqlJob(env, tEnv, replacedJob, options); break; case SYNC: - exeSyncJob(env, tEnv, job, options); + exeSyncJob(env, tEnv, replacedJob, options); break; default: throw new FlinkxRuntimeException( diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/util/JsonModifyUtil.java b/flinkx-core/src/main/java/com/dtstack/flinkx/util/JobUtil.java similarity index 69% rename from flinkx-core/src/main/java/com/dtstack/flinkx/util/JsonModifyUtil.java rename to flinkx-core/src/main/java/com/dtstack/flinkx/util/JobUtil.java index 5481a27305..6ccec4128a 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/util/JsonModifyUtil.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/util/JobUtil.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,9 +23,24 @@ import org.apache.commons.lang3.StringUtils; import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; -/** @author tiezhu */ -public class JsonModifyUtil { +public class JobUtil { + + private JobUtil() throws IllegalAccessException { + throw new IllegalAccessException(getClass().getName() + " can not be instantiated"); + } + + public static String replaceJobParameter(String p, String job) { + if (StringUtils.isNotBlank(p)) { + HashMap parameters = CommandTransform(p); + for (Map.Entry entry : parameters.entrySet()) { + job = job.replaceAll(Pattern.quote(entry.getKey()), entry.getValue()); + } + } + return job; + } public static String JsonValueReplace(String json, HashMap parameter) { for (String item : parameter.keySet()) { @@ -42,7 +57,7 @@ public static HashMap CommandTransform(String command) { String[] split = StringUtils.split(command, ConstantValue.COMMA_SYMBOL); for (String item : split) { String[] temp = item.split(ConstantValue.EQUAL_SYMBOL); - parameter.put(temp[0], temp[1]); + parameter.put(temp[0].trim(), temp[1].trim()); } return parameter; } diff --git a/flinkx-local-test/src/main/java/com/dtstack/flinkx/local/test/LocalTest.java b/flinkx-local-test/src/main/java/com/dtstack/flinkx/local/test/LocalTest.java index f48679929e..8de386d458 100644 --- a/flinkx-local-test/src/main/java/com/dtstack/flinkx/local/test/LocalTest.java +++ b/flinkx-local-test/src/main/java/com/dtstack/flinkx/local/test/LocalTest.java @@ -35,9 +35,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; /** @author jiangbo */ @@ -54,8 +52,7 @@ public static void main(String[] args) throws Exception { // confProperties.setProperty("state.checkpoints.dir", "file:///ck"); String userDir = System.getProperty("user.dir"); - String jobPath = "/Users/wtz/work_place/job_place/json/1.12/binlog-stream.json"; - // userDir + "/flinkx-examples/json/stream/stream.json"; + String jobPath = userDir + "/flinkx-examples/json/stream/stream.json"; String flinkxDistDir = userDir + "/flinkx-dist"; String s = ""; @@ -63,6 +60,9 @@ public static void main(String[] args) throws Exception { List argsList = new ArrayList<>(); argsList.add("-mode"); argsList.add("local"); + // 替换脚本中的值 + // argsList.add("-p"); + // argsList.add("$aa=aaa, $bb=bbb"); String content = readFile(jobPath); if (StringUtils.endsWith(jobPath, "json")) { argsList.add("-jobType"); @@ -159,18 +159,20 @@ public static void main(String[] args) throws Exception { python.requirements -pyreq */ /* ---------------------------------------- pyFlink 测试 start --------------------------------------- */ - Map config = new HashMap<>(); - config.put( - "python.files", - "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/test1.py"); - config.put("python.client.executable", "python3"); - config.put("python.executable", "python3"); - config.put( - "python.requirements", - "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/requirements3.txt#/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/cached_dir_binary3"); - String configJsonString = GsonUtil.GSON.toJson(config); - argsList.add("-confProp"); - argsList.add(configJsonString); + // Map config = new HashMap<>(); + // config.put( + // "python.files", + // + // "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/test1.py"); + // config.put("python.client.executable", "python3"); + // config.put("python.executable", "python3"); + // config.put( + // "python.requirements", + // + // "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/requirements3.txt#/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/cached_dir_binary3"); + // String configJsonString = GsonUtil.GSON.toJson(config); + // argsList.add("-confProp"); + // argsList.add(configJsonString); /* ---------------------------------------- pyFlink 测试 end --------------------------------------- */ } // 防止加载flinkx-connector-kafka/target/classes/META-INF/services/下的spi文件