Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.dtstack.flinkx.options.OptionParser;
import com.dtstack.flinkx.options.Options;
import com.dtstack.flinkx.util.ExecuteProcessHelper;
import com.dtstack.flinkx.util.JsonModifyUtil;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.configuration.ConfigConstants;
Expand Down Expand Up @@ -72,12 +71,6 @@ public static void main(String[] args) throws Exception {
for (int i = 0; i < argList.size(); i += 2) {
temp.put(argList.get(i), argList.get(i + 1));
}
// 对json中的值进行修改
String s = temp.get("-p");
if (StringUtils.isNotBlank(s)) {
HashMap<String, String> parameter = JsonModifyUtil.CommandTransform(s);
temp.put("-job", JsonModifyUtil.JsonValueReplace(temp.get("-job"), parameter));
}

// 清空list,填充修改后的参数值
argList.clear();
Expand All @@ -88,7 +81,7 @@ public static void main(String[] args) throws Exception {

JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);

ClusterClientHelper clusterClientHelper = null;
ClusterClientHelper clusterClientHelper;
switch (ClusterMode.getByName(launcherOptions.getMode())) {
case local:
clusterClientHelper = new LocalClusterClientHelper();
Expand Down
6 changes: 4 additions & 2 deletions flinkx-core/src/main/java/com/dtstack/flinkx/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.dtstack.flinkx.util.DataSyncFactoryUtil;
import com.dtstack.flinkx.util.ExecuteProcessHelper;
import com.dtstack.flinkx.util.FactoryHelper;
import com.dtstack.flinkx.util.JobUtil;
import com.dtstack.flinkx.util.PluginUtil;
import com.dtstack.flinkx.util.PrintUtil;
import com.dtstack.flinkx.util.PropertiesUtil;
Expand Down Expand Up @@ -96,6 +97,7 @@ public static void main(String[] args) throws Exception {

Options options = new OptionParser(args).getOptions();
String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
String replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
StreamTableEnvironment tEnv =
Expand All @@ -105,10 +107,10 @@ public static void main(String[] args) throws Exception {
tEnv.getConfig().getConfiguration().toString());
switch (EJobType.getByName(options.getJobType())) {
case SQL:
exeSqlJob(env, tEnv, job, options);
exeSqlJob(env, tEnv, replacedJob, options);
break;
case SYNC:
exeSyncJob(env, tEnv, job, options);
exeSyncJob(env, tEnv, replacedJob, options);
break;
default:
throw new FlinkxRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -23,9 +23,24 @@
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** @author tiezhu */
public class JsonModifyUtil {
public class JobUtil {

// Utility class guard: the private constructor prevents normal instantiation,
// and the unconditional throw also blocks reflective instantiation
// (e.g. Constructor.setAccessible(true).newInstance()).
private JobUtil() throws IllegalAccessException {
throw new IllegalAccessException(getClass().getName() + " can not be instantiated");
}

/**
 * Replaces placeholder keys in the job description text with values parsed from
 * the command-line parameter string (e.g. {@code "$aa=aaa, $bb=bbb"}).
 *
 * @param p   comma-separated {@code key=value} pairs from the {@code -p} option;
 *            blank or null means no replacement is performed
 * @param job the raw job description text
 * @return the job text with every literal occurrence of each key replaced by its
 *         value; returned unchanged when {@code p} is blank
 */
public static String replaceJobParameter(String p, String job) {
    if (StringUtils.isNotBlank(p)) {
        HashMap<String, String> parameters = CommandTransform(p);
        for (Map.Entry<String, String> entry : parameters.entrySet()) {
            // Use String.replace for a purely literal substitution on both sides.
            // The previous replaceAll(Pattern.quote(key), value) quoted only the
            // pattern: a value containing '$' or '\' was still interpreted as a
            // regex replacement (group reference/escape) and could throw or
            // corrupt the job text.
            job = job.replace(entry.getKey(), entry.getValue());
        }
    }
    return job;
}

public static String JsonValueReplace(String json, HashMap<String, String> parameter) {
for (String item : parameter.keySet()) {
Expand All @@ -42,7 +57,7 @@ public static HashMap<String, String> CommandTransform(String command) {
String[] split = StringUtils.split(command, ConstantValue.COMMA_SYMBOL);
for (String item : split) {
String[] temp = item.split(ConstantValue.EQUAL_SYMBOL);
parameter.put(temp[0], temp[1]);
parameter.put(temp[0].trim(), temp[1].trim());
}
return parameter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** @author jiangbo */
Expand All @@ -54,15 +52,17 @@ public static void main(String[] args) throws Exception {
// confProperties.setProperty("state.checkpoints.dir", "file:///ck");
String userDir = System.getProperty("user.dir");

String jobPath = "/Users/wtz/work_place/job_place/json/1.12/binlog-stream.json";
// userDir + "/flinkx-examples/json/stream/stream.json";
String jobPath = userDir + "/flinkx-examples/json/stream/stream.json";
String flinkxDistDir = userDir + "/flinkx-dist";
String s = "";

// 任务配置参数
List<String> argsList = new ArrayList<>();
argsList.add("-mode");
argsList.add("local");
// 替换脚本中的值
// argsList.add("-p");
// argsList.add("$aa=aaa, $bb=bbb");
String content = readFile(jobPath);
if (StringUtils.endsWith(jobPath, "json")) {
argsList.add("-jobType");
Expand Down Expand Up @@ -159,18 +159,20 @@ public static void main(String[] args) throws Exception {
python.requirements -pyreq
*/
/* ---------------------------------------- pyFlink 测试 start --------------------------------------- */
Map<String, String> config = new HashMap<>();
config.put(
"python.files",
"/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/test1.py");
config.put("python.client.executable", "python3");
config.put("python.executable", "python3");
config.put(
"python.requirements",
"/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/requirements3.txt#/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/cached_dir_binary3");
String configJsonString = GsonUtil.GSON.toJson(config);
argsList.add("-confProp");
argsList.add(configJsonString);
// Map<String, String> config = new HashMap<>();
// config.put(
// "python.files",
//
// "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/test1.py");
// config.put("python.client.executable", "python3");
// config.put("python.executable", "python3");
// config.put(
// "python.requirements",
//
// "/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/requirements3.txt#/Users/lzq/Desktop/Projects/Flink/PyFlinkDemo/enjoyment.code/PythonUDFProvideToJava/cached_dir_binary3");
// String configJsonString = GsonUtil.GSON.toJson(config);
// argsList.add("-confProp");
// argsList.add(configJsonString);
/* ---------------------------------------- pyFlink 测试 end --------------------------------------- */
}
// 防止加载flinkx-connector-kafka/target/classes/META-INF/services/下的spi文件
Expand Down