Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void openInternal(InputSplit inputSplit) {
columnCount = resultSet.getMetaData().getColumnCount();
}
// 增量任务
isUpdateLocation =
needUpdateEndLocation =
jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc();
RowType rowType =
TableUtil.createRowType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
private String orderByColumn;
private String querySql;
private String splitPk;
private String splitStrategy = "range";
private String splitStrategy;
private int fetchSize = 0;
private int queryTimeOut = 0;
// 连接超时时间
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ default String getSplitRangeFilter(JdbcInputSplit split, String splitPkName) {
sql.append(" AND ");
}
sql.append(quoteIdentifier(splitPkName))
.append(" < ")
.append(split.getRangeEndLocationOperator())
.append(split.getEndLocationOfSplit());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class JdbcInputFormat extends BaseRichInputFormat {
protected boolean hasNext;

protected int columnCount;
protected boolean isUpdateLocation;
protected boolean needUpdateEndLocation;
protected Object state = null;

protected StringAccumulator maxValueAccumulator;
Expand All @@ -95,15 +96,17 @@ public class JdbcInputFormat extends BaseRichInputFormat {
// 轮询增量标识字段类型
protected ColumnType type;

protected JdbcInputSplit currentJdbcInputSplit;

@Override
public void openInternal(InputSplit inputSplit) {
JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
initMetric(jdbcInputSplit);
if (!canReadData(jdbcInputSplit)) {
this.currentJdbcInputSplit = (JdbcInputSplit) inputSplit;
initMetric(currentJdbcInputSplit);
if (!canReadData(currentJdbcInputSplit)) {
LOG.warn(
"Not read data when the start location are equal to end location, start = {}, end = {}",
jdbcInputSplit.getStartLocation(),
jdbcInputSplit.getEndLocation());
currentJdbcInputSplit.getStartLocation(),
currentJdbcInputSplit.getEndLocation());
hasNext = false;
return;
}
Expand All @@ -127,14 +130,14 @@ public void openInternal(InputSplit inputSplit) {
columnNameList = columnPair.getLeft();
columnTypeList = columnPair.getRight();

querySQL = buildQuerySql(jdbcInputSplit);
querySQL = buildQuerySql(currentJdbcInputSplit);
jdbcConf.setQuerySql(querySQL);
executeQuery(jdbcInputSplit.getStartLocation());
executeQuery(currentJdbcInputSplit.getStartLocation());
if (!resultSet.isClosed()) {
columnCount = resultSet.getMetaData().getColumnCount();
}
// 增量任务
isUpdateLocation =
needUpdateEndLocation =
jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc();
RowType rowType =
TableUtil.createRowType(
Expand All @@ -158,39 +161,70 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) {
"numTaskVertices is [%s], but parallelism in jdbcConf is [%s]",
minNumSplits, jdbcConf.getParallelism()));
}
JdbcInputSplit[] splits;

if (jdbcConf.getParallelism() > 1
&& StringUtils.equalsIgnoreCase("range", jdbcConf.getSplitStrategy())) {
splits = createSplitsInternalBySplitRange(minNumSplits);
// splitStrategy = range
return createSplitsInternalBySplitRange(minNumSplits);
} else {
splits = new JdbcInputSplit[minNumSplits];
if (StringUtils.isNotBlank(jdbcConf.getStartLocation())) {
String[] startLocations =
jdbcConf.getStartLocation().split(ConstantValue.COMMA_SYMBOL);
if (startLocations.length == 1) {
for (int i = 0; i < minNumSplits; i++) {
splits[i] =
new JdbcInputSplit(
i, minNumSplits, i, startLocations[0], null, null, null);
}
} else if (startLocations.length != jdbcConf.getParallelism()) {
throw new ChunJunRuntimeException(
String.format(
"startLocations is %s, but parallelism in jdbcConf is [%s]",
Arrays.toString(startLocations), jdbcConf.getParallelism()));
} else {
for (int i = 0; i < minNumSplits; i++) {
splits[i] =
new JdbcInputSplit(
i, minNumSplits, i, startLocations[i], null, null, null);
}
// default,splitStrategy = mod
return createSplitsInternalBySplitMod(minNumSplits, jdbcConf.getStartLocation());
}
}

/** Create split for modSplitStrategy */
public JdbcInputSplit[] createSplitsInternalBySplitMod(int minNumSplits, String startLocation) {
JdbcInputSplit[] splits = new JdbcInputSplit[minNumSplits];
if (StringUtils.isNotBlank(startLocation)) {
String[] startLocations = startLocation.split(ConstantValue.COMMA_SYMBOL);
if (startLocations.length == 1) {
for (int i = 0; i < minNumSplits; i++) {
splits[i] =
new JdbcInputSplit(
i,
minNumSplits,
i,
startLocations[0],
null,
null,
null,
"mod",
jdbcConf.isPolling());
}
} else if (startLocations.length != jdbcConf.getParallelism()) {
throw new ChunJunRuntimeException(
String.format(
"startLocations is %s, but parallelism in jdbcConf is [%s]",
Arrays.toString(startLocations), jdbcConf.getParallelism()));
} else {
for (int i = 0; i < minNumSplits; i++) {
splits[i] = new JdbcInputSplit(i, minNumSplits, i, null, null, null, null);
splits[i] =
new JdbcInputSplit(
i,
minNumSplits,
i,
startLocations[i],
null,
null,
null,
"mod",
jdbcConf.isPolling());
}
}
} else {
for (int i = 0; i < minNumSplits; i++) {
splits[i] =
new JdbcInputSplit(
i,
minNumSplits,
i,
null,
null,
null,
null,
"mod",
jdbcConf.isPolling());
}
}

LOG.info("createInputSplitsInternal success, splits is {}", GsonUtil.GSON.toJson(splits));
Expand All @@ -202,7 +236,7 @@ public boolean reachedEnd() {
if (hasNext) {
return false;
} else {
if (jdbcConf.isPolling()) {
if (currentJdbcInputSplit.isPolling()) {
try {
TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
// 间隔轮询检测数据库连接是否断开,超时时间三秒,断开后自动重连
Expand Down Expand Up @@ -250,7 +284,7 @@ public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
try {
@SuppressWarnings("unchecked")
RowData finalRowData = rowConverter.toInternal(resultSet);
if (isUpdateLocation) {
if (needUpdateEndLocation) {
Object obj;
switch (type) {
case DATETIME:
Expand Down Expand Up @@ -498,7 +532,7 @@ private Pair<String, String> getSplitRangeFromDb() {
*/
protected boolean canReadData(JdbcInputSplit jdbcInputSplit) {
// 只排除增量同步
if (!jdbcConf.isIncrement() || jdbcConf.isPolling()) {
if (!jdbcConf.isIncrement() || currentJdbcInputSplit.isPolling()) {
return true;
}

Expand Down Expand Up @@ -688,7 +722,7 @@ protected void buildLocationFilter(JdbcInputSplit jdbcInputSplit, List<String> w
startLocation,
jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()),
jdbcConf.getIncreColumnType(),
jdbcConf.isPolling(),
jdbcInputSplit.isPolling(),
this::getTimeStr);
}
if (StringUtils.isNotBlank(jdbcInputSplit.getEndLocation())) {
Expand All @@ -714,25 +748,50 @@ protected String buildQuerySqlBySplit(JdbcInputSplit jdbcInputSplit, List<String
jdbcConf, jdbcDialect, whereList, columnNameList, jdbcInputSplit);
}

/** create split when splitStrategy is range * */
/** create split for rangeSplitStrategy */
protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) {
JdbcInputSplit[] splits;
List<JdbcInputSplit> splits = new ArrayList<>();
Pair<String, String> splitRangeFromDb = getSplitRangeFromDb();
if (StringUtils.isBlank(splitRangeFromDb.getLeft())
|| "null".equalsIgnoreCase(splitRangeFromDb.getLeft())) {
// 没有数据,返回空数组
return new JdbcInputSplit[minNumSplits];
BigDecimal left, right = null;
if (StringUtils.isNotBlank(splitRangeFromDb.getLeft())
&& !"null".equalsIgnoreCase(splitRangeFromDb.getLeft())) {
left = new BigDecimal(splitRangeFromDb.getLeft());
right = new BigDecimal(splitRangeFromDb.getRight());
splits.addAll(createRangeSplits(left, right, minNumSplits));
if (jdbcConf.isPolling()) {
// rangeSplit in polling mode,range first then mod.we need to change the last range
// shard here to <= endLocationOfSplit
splits.get(splits.size() - 1).setRangeEndLocationOperator(" <= ");
}
}
BigDecimal left = new BigDecimal(splitRangeFromDb.getLeft());
BigDecimal right = new BigDecimal(splitRangeFromDb.getRight());
LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right);
// create modSplit for polling
if (jdbcConf.isPolling()) {
splits.addAll(
Arrays.asList(
createSplitsInternalBySplitMod(
jdbcConf.getParallelism(),
right == null
? jdbcConf.getStartLocation()
: String.valueOf(right))));
}
// Reverse,when pollingMode configures rangeStrategy, be sure to do rangeSplit first, then
// modSplit
Collections.reverse(splits);
return splits.toArray(new JdbcInputSplit[0]);
}

protected List<JdbcInputSplit> createRangeSplits(
BigDecimal left, BigDecimal right, int minNumSplits) {
BigDecimal endAndStartGap = right.subtract(left);
if (endAndStartGap.compareTo(BigDecimal.ZERO) < 0) return new ArrayList<>();
JdbcInputSplit[] splits;
LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right);
BigDecimal remainder = endAndStartGap.remainder(new BigDecimal(minNumSplits));
endAndStartGap = endAndStartGap.subtract(remainder);
BigDecimal step = endAndStartGap.divide(new BigDecimal(minNumSplits));

if (step.compareTo(BigDecimal.ZERO) == 0) {
// left = right时,step和remainder都为0
// if left = right,step and remainder is 0
if (remainder.compareTo(BigDecimal.ZERO) == 0) {
minNumSplits = 1;
} else {
Expand All @@ -750,9 +809,14 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) {
end = end.add(BigDecimal.ONE);
remainder = remainder.subtract(BigDecimal.ONE);
}
// 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start
// incrementalMode,The final rangeSplit scope is splitPk >= start
// pollingMode,The final rangeSplit scope is splitPk >= start and splitPk < =end
if (i == minNumSplits - 1) {
end = null;
if (jdbcConf.isPolling()) {
end = right;
} else {
end = null;
}
}
splits[i] =
new JdbcInputSplit(
Expand All @@ -762,10 +826,12 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) {
jdbcConf.getStartLocation(),
null,
start.toString(),
Objects.isNull(end) ? null : end.toString());
Objects.isNull(end) ? null : end.toString(),
"range",
false);
}

return splits;
return Arrays.asList(splits);
}

/**
Expand All @@ -775,7 +841,7 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) {
* @throws SQLException
*/
protected void executeQuery(String startLocation) throws SQLException {
if (jdbcConf.isPolling()) {
if (currentJdbcInputSplit.isPolling()) {
if (StringUtils.isBlank(startLocation)) {
// Get the startLocation from the database
queryPollingWithOutStartLocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ protected void checkFormat() {
sb.append("increColumn can't be empty when increment is true;\n");
}
conf.setSplitPk(conf.getIncreColumn());
if (conf.getParallelism() > 1) {
conf.setSplitStrategy("mod");
}
}

if (conf.getParallelism() > 1) {
Expand Down
Loading