diff --git a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java index 2a6ece2186..432c2624ad 100644 --- a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java +++ b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java @@ -80,7 +80,7 @@ public void openInternal(InputSplit inputSplit) { columnCount = resultSet.getMetaData().getColumnCount(); } // 增量任务 - isUpdateLocation = + needUpdateEndLocation = jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc(); RowType rowType = TableUtil.createRowType( diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java index 95ef89b645..cd2f3b9613 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java @@ -55,7 +55,7 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable { private String orderByColumn; private String querySql; private String splitPk; - private String splitStrategy = "range"; + private String splitStrategy; private int fetchSize = 0; private int queryTimeOut = 0; // 连接超时时间 diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java index 56f5d95e39..46c43ad8f0 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java @@ -357,7 +357,7 @@ default String getSplitRangeFilter(JdbcInputSplit split, String splitPkName) { sql.append(" AND "); } sql.append(quoteIdentifier(splitPkName)) - .append(" < ") + .append(split.getRangeEndLocationOperator()) .append(split.getEndLocationOfSplit()); } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java index 2d5b208759..2d3cfd967f 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java @@ -55,6 +55,7 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -85,7 +86,7 @@ public class JdbcInputFormat extends BaseRichInputFormat { protected boolean hasNext; protected int columnCount; - protected boolean isUpdateLocation; + protected boolean needUpdateEndLocation; protected Object state = null; protected StringAccumulator maxValueAccumulator; @@ -95,15 +96,17 @@ public class JdbcInputFormat extends BaseRichInputFormat { // 轮询增量标识字段类型 protected ColumnType type; + protected JdbcInputSplit currentJdbcInputSplit; + @Override public void openInternal(InputSplit inputSplit) { - JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit; - initMetric(jdbcInputSplit); - if (!canReadData(jdbcInputSplit)) { + this.currentJdbcInputSplit = (JdbcInputSplit) inputSplit; + initMetric(currentJdbcInputSplit); + if (!canReadData(currentJdbcInputSplit)) { LOG.warn( "Not read data when the start location are equal to end location, start = {}, end = {}", - jdbcInputSplit.getStartLocation(), - jdbcInputSplit.getEndLocation()); + currentJdbcInputSplit.getStartLocation(), + currentJdbcInputSplit.getEndLocation()); hasNext = false; return; } @@ -127,14 +130,14 @@ public void openInternal(InputSplit inputSplit) { columnNameList = columnPair.getLeft(); columnTypeList = columnPair.getRight(); - querySQL = buildQuerySql(jdbcInputSplit); + querySQL = buildQuerySql(currentJdbcInputSplit); jdbcConf.setQuerySql(querySQL); - executeQuery(jdbcInputSplit.getStartLocation()); + executeQuery(currentJdbcInputSplit.getStartLocation()); if (!resultSet.isClosed()) { columnCount = resultSet.getMetaData().getColumnCount(); } // 增量任务 - isUpdateLocation = + needUpdateEndLocation = jdbcConf.isIncrement() && !jdbcConf.isPolling() && !jdbcConf.isUseMaxFunc(); RowType rowType = TableUtil.createRowType( @@ -158,39 +161,70 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) { "numTaskVertices is [%s], but parallelism in jdbcConf is [%s]", minNumSplits, jdbcConf.getParallelism())); } - JdbcInputSplit[] splits; if (jdbcConf.getParallelism() > 1 && StringUtils.equalsIgnoreCase("range", jdbcConf.getSplitStrategy())) { - splits = createSplitsInternalBySplitRange(minNumSplits); + // splitStrategy = range + return createSplitsInternalBySplitRange(minNumSplits); } else { - splits = new JdbcInputSplit[minNumSplits]; - if (StringUtils.isNotBlank(jdbcConf.getStartLocation())) { - String[] startLocations = - jdbcConf.getStartLocation().split(ConstantValue.COMMA_SYMBOL); - if (startLocations.length == 1) { - for (int i = 0; i < minNumSplits; i++) { - splits[i] = - new JdbcInputSplit( - i, minNumSplits, i, startLocations[0], null, null, null); - } - } else if (startLocations.length != jdbcConf.getParallelism()) { - throw new ChunJunRuntimeException( - String.format( - "startLocations is %s, but parallelism in jdbcConf is [%s]", - Arrays.toString(startLocations), jdbcConf.getParallelism())); - } else { - for (int i = 0; i < minNumSplits; i++) { - splits[i] = - new JdbcInputSplit( - i, minNumSplits, i, startLocations[i], null, null, null); - } + // default,splitStrategy = mod + return createSplitsInternalBySplitMod(minNumSplits, jdbcConf.getStartLocation()); + } + } + + /** Create split for modSplitStrategy */ + public JdbcInputSplit[] createSplitsInternalBySplitMod(int minNumSplits, String startLocation) { + JdbcInputSplit[] splits = new JdbcInputSplit[minNumSplits]; + if (StringUtils.isNotBlank(startLocation)) { + String[] startLocations = startLocation.split(ConstantValue.COMMA_SYMBOL); + if (startLocations.length == 1) { + for (int i = 0; i < minNumSplits; i++) { + splits[i] = + new JdbcInputSplit( + i, + minNumSplits, + i, + startLocations[0], + null, + null, + null, + "mod", + jdbcConf.isPolling()); } + } else if (startLocations.length != jdbcConf.getParallelism()) { + throw new ChunJunRuntimeException( + String.format( + "startLocations is %s, but parallelism in jdbcConf is [%s]", + Arrays.toString(startLocations), jdbcConf.getParallelism())); } else { for (int i = 0; i < minNumSplits; i++) { - splits[i] = new JdbcInputSplit(i, minNumSplits, i, null, null, null, null); + splits[i] = + new JdbcInputSplit( + i, + minNumSplits, + i, + startLocations[i], + null, + null, + null, + "mod", + jdbcConf.isPolling()); } } + } else { + for (int i = 0; i < minNumSplits; i++) { + splits[i] = + new JdbcInputSplit( + i, + minNumSplits, + i, + null, + null, + null, + null, + "mod", + jdbcConf.isPolling()); + } } LOG.info("createInputSplitsInternal success, splits is {}", GsonUtil.GSON.toJson(splits)); @@ -202,7 +236,7 @@ public boolean reachedEnd() { if (hasNext) { return false; } else { - if (jdbcConf.isPolling()) { + if (currentJdbcInputSplit.isPolling()) { try { TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval()); // 间隔轮询检测数据库连接是否断开,超时时间三秒,断开后自动重连 @@ -250,7 +284,7 @@ public RowData nextRecordInternal(RowData rowData) throws ReadRecordException { try { @SuppressWarnings("unchecked") RowData finalRowData = rowConverter.toInternal(resultSet); - if (isUpdateLocation) { + if (needUpdateEndLocation) { Object obj; switch (type) { case DATETIME: @@ -498,7 +532,7 @@ private Pair getSplitRangeFromDb() { */ protected boolean canReadData(JdbcInputSplit jdbcInputSplit) { // 只排除增量同步 - if (!jdbcConf.isIncrement() || jdbcConf.isPolling()) { + if (!jdbcConf.isIncrement() || currentJdbcInputSplit.isPolling()) { return true; } @@ -688,7 +722,7 @@ protected void buildLocationFilter(JdbcInputSplit jdbcInputSplit, List w startLocation, jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()), jdbcConf.getIncreColumnType(), - jdbcConf.isPolling(), + jdbcInputSplit.isPolling(), this::getTimeStr); } if (StringUtils.isNotBlank(jdbcInputSplit.getEndLocation())) { @@ -714,25 +748,50 @@ protected String buildQuerySqlBySplit(JdbcInputSplit jdbcInputSplit, List splits = new ArrayList<>(); Pair splitRangeFromDb = getSplitRangeFromDb(); - if (StringUtils.isBlank(splitRangeFromDb.getLeft()) - || "null".equalsIgnoreCase(splitRangeFromDb.getLeft())) { - // 没有数据,返回空数组 - return new JdbcInputSplit[minNumSplits]; + BigDecimal left, right = null; + if (StringUtils.isNotBlank(splitRangeFromDb.getLeft()) + && !"null".equalsIgnoreCase(splitRangeFromDb.getLeft())) { + left = new BigDecimal(splitRangeFromDb.getLeft()); + right = new BigDecimal(splitRangeFromDb.getRight()); + splits.addAll(createRangeSplits(left, right, minNumSplits)); + if (jdbcConf.isPolling()) { + // rangeSplit in polling mode,range first then mod.we need to change the last range + // shard here to <= endLocationOfSplit + splits.get(splits.size() - 1).setRangeEndLocationOperator(" <= "); + } } - BigDecimal left = new BigDecimal(splitRangeFromDb.getLeft()); - BigDecimal right = new BigDecimal(splitRangeFromDb.getRight()); - LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right); + // create modSplit for polling + if (jdbcConf.isPolling()) { + splits.addAll( + Arrays.asList( + createSplitsInternalBySplitMod( + jdbcConf.getParallelism(), + right == null + ? jdbcConf.getStartLocation() + : String.valueOf(right)))); + } + // Reverse,when pollingMode configures rangeStrategy, be sure to do rangeSplit first, then + // modSplit + Collections.reverse(splits); + return splits.toArray(new JdbcInputSplit[0]); + } + + protected List createRangeSplits( + BigDecimal left, BigDecimal right, int minNumSplits) { BigDecimal endAndStartGap = right.subtract(left); + if (endAndStartGap.compareTo(BigDecimal.ZERO) < 0) return new ArrayList<>(); + JdbcInputSplit[] splits; + LOG.info("create splitsInternal,the splitKey range is {} --> {}", left, right); BigDecimal remainder = endAndStartGap.remainder(new BigDecimal(minNumSplits)); endAndStartGap = endAndStartGap.subtract(remainder); BigDecimal step = endAndStartGap.divide(new BigDecimal(minNumSplits)); if (step.compareTo(BigDecimal.ZERO) == 0) { - // left = right时,step和remainder都为0 + // if left = right,step and remainder is 0 if (remainder.compareTo(BigDecimal.ZERO) == 0) { minNumSplits = 1; } else { @@ -750,9 +809,14 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) { end = end.add(BigDecimal.ONE); remainder = remainder.subtract(BigDecimal.ONE); } - // 分片范围是 splitPk >=start and splitPk < end 最后一个分片范围是splitPk >= start + // incrementalMode,The final rangeSplit scope is splitPk >= start + // pollingMode,The final rangeSplit scope is splitPk >= start and splitPk < =end if (i == minNumSplits - 1) { - end = null; + if (jdbcConf.isPolling()) { + end = right; + } else { + end = null; + } } splits[i] = new JdbcInputSplit( @@ -762,10 +826,12 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) { jdbcConf.getStartLocation(), null, start.toString(), - Objects.isNull(end) ? null : end.toString()); + Objects.isNull(end) ? null : end.toString(), + "range", + false); } - return splits; + return Arrays.asList(splits); } /** @@ -775,7 +841,7 @@ protected JdbcInputSplit[] createSplitsInternalBySplitRange(int minNumSplits) { * @throws SQLException */ protected void executeQuery(String startLocation) throws SQLException { - if (jdbcConf.isPolling()) { + if (currentJdbcInputSplit.isPolling()) { if (StringUtils.isBlank(startLocation)) { // Get the startLocation from the database queryPollingWithOutStartLocation(); diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatBuilder.java index 999d11b109..56fab9808a 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatBuilder.java @@ -68,9 +68,6 @@ protected void checkFormat() { sb.append("increColumn can't be empty when increment is true;\n"); } conf.setSplitPk(conf.getIncreColumn()); - if (conf.getParallelism() > 1) { - conf.setSplitStrategy("mod"); - } } if (conf.getParallelism() > 1) { diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputSplit.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputSplit.java index 84f3538b79..6edd61a3c8 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputSplit.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputSplit.java @@ -39,6 +39,13 @@ public class JdbcInputSplit extends GenericInputSplit { /** 分片endLocation * */ private String endLocationOfSplit; + private boolean isPolling; + + private String splitStrategy; + + /** only latest range split use '<=' */ + private String rangeEndLocationOperator = " < "; + /** * Creates a generic input split with the given split number. * @@ -52,13 +59,17 @@ public JdbcInputSplit( String startLocation, String endLocation, String startLocationOfSplit, - String endLocationOfSplit) { + String endLocationOfSplit, + String splitStrategy, + boolean isPolling) { super(partitionNumber, totalNumberOfPartitions); this.mod = mod; this.startLocation = startLocation; this.endLocation = endLocation; this.startLocationOfSplit = startLocationOfSplit; this.endLocationOfSplit = endLocationOfSplit; + this.splitStrategy = splitStrategy; + this.isPolling = isPolling; } public int getMod() { @@ -101,6 +112,30 @@ public void setEndLocationOfSplit(String endLocationOfSplit) { this.endLocationOfSplit = endLocationOfSplit; } + public String getRangeEndLocationOperator() { + return rangeEndLocationOperator; + } + + public void setRangeEndLocationOperator(String rangeEndLocationOperator) { + this.rangeEndLocationOperator = rangeEndLocationOperator; + } + + public boolean isPolling() { + return isPolling; + } + + public void setPolling(boolean polling) { + isPolling = polling; + } + + public String getSplitStrategy() { + return splitStrategy; + } + + public void setSplitStrategy(String splitStrategy) { + this.splitStrategy = splitStrategy; + } + @Override public String toString() { return "JdbcInputSplit{" @@ -118,6 +153,15 @@ public String toString() { + ", endLocationOfSplit='" + endLocationOfSplit + '\'' - + '}'; + + ", isPolling=" + + isPolling + + ", splitStrategy='" + + splitStrategy + + '\'' + + ", rangeEndLocationOperator='" + + rangeEndLocationOperator + + '\'' + + '}' + + super.toString(); } } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java index 08d85a5abf..fdd6e282a1 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcSourceFactory.java @@ -92,6 +92,7 @@ ConnectionConf.class, new ConnectionAdapter("SourceConnectionConf")) } } initIncrementConfig(jdbcConf); + setDefaultSplitStrategy(jdbcConf); super.initCommonConf(jdbcConf); if (StringUtils.isBlank(jdbcConf.getCustomSql())) { rebuildJdbcConf(); @@ -140,6 +141,17 @@ protected JdbcInputFormatBuilder getBuilder() { return new JdbcInputFormatBuilder(new JdbcInputFormat()); } + /** set default split strategy if splitStrategy is blank */ + private void setDefaultSplitStrategy(JdbcConf jdbcConf) { + if (jdbcConf.getSplitStrategy() == null || jdbcConf.getSplitStrategy().equals("")) { + if (jdbcConf.isIncrement() && jdbcConf.getParallelism() > 1) { + jdbcConf.setSplitStrategy("mod"); + } else { + jdbcConf.setSplitStrategy("range"); + } + } + } + /** * 初始化增量或间隔轮询任务配置 * diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java index ef63cffe2f..a5cb91d794 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java @@ -90,7 +90,12 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) { for (int i = 0; i < subList.size(); i++) { DistributedJdbcInputSplit split = - new DistributedJdbcInputSplit(i, minNumSplits, subList.get(i)); + new DistributedJdbcInputSplit( + i, + minNumSplits, + subList.get(i), + jdbcConf.getSplitStrategy(), + jdbcConf.isPolling()); inputSplits[i] = split; } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputSplit.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputSplit.java index b4c502bbe2..2eeb28f247 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputSplit.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputSplit.java @@ -32,8 +32,21 @@ public class DistributedJdbcInputSplit extends JdbcInputSplit { private List sourceList; public DistributedJdbcInputSplit( - int partitionNumber, int totalNumberOfPartitions, List sourceList) { - super(partitionNumber, totalNumberOfPartitions, partitionNumber, null, null, null, null); + int partitionNumber, + int totalNumberOfPartitions, + List sourceList, + String splitStrategy, + boolean isPolling) { + super( + partitionNumber, + totalNumberOfPartitions, + partitionNumber, + null, + null, + null, + null, + splitStrategy, + isPolling); this.sourceList = sourceList; } diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java index 297ffcc137..ba1986b12a 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/SqlUtil.java @@ -110,7 +110,10 @@ public static String buildQuerySqlBySplit( } splitFilter = buildSplitFilterSql( - jdbcConf.getSplitStrategy(), jdbcDialect, jdbcInputSplit, splitColumn); + jdbcInputSplit.getSplitStrategy(), + jdbcDialect, + jdbcInputSplit, + splitColumn); } String querySql; diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java index 211198b8b6..243aa5a8df 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/test/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormatTest.java @@ -33,6 +33,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.math.BigDecimal; import java.util.Arrays; /** @author liuliu 2022/4/15 */ @@ -49,6 +50,8 @@ public void setup() { Whitebox.setInternalState(jdbcInputFormat, "LOG", LOG); JdbcConf jdbcConf = PowerMockito.mock(JdbcConf.class); Whitebox.setInternalState(jdbcInputFormat, "jdbcConf", jdbcConf); + PowerMockito.when(jdbcConf.isPolling()).thenReturn(true); + PowerMockito.when(jdbcConf.getParallelism()).thenReturn(3); PowerMockito.when(jdbcConf.getStartLocation()).thenReturn("10"); } @@ -61,9 +64,19 @@ public void createSplitsInternalBySplitRangeTest() PowerMockito.method(JdbcInputFormat.class, "getSplitRangeFromDb"); Mockito.when(getSplitRangeFromDb.invoke(jdbcInputFormat)) .thenReturn(Pair.of("12.123", "345534.12")); + Mockito.when( + jdbcInputFormat.createRangeSplits( + Mockito.any(BigDecimal.class), + Mockito.any(BigDecimal.class), + Mockito.anyInt())) + .thenCallRealMethod(); + Mockito.when( + jdbcInputFormat.createSplitsInternalBySplitMod( + Mockito.anyInt(), Mockito.anyString())) + .thenCallRealMethod(); JdbcInputSplit[] splitsInternalBySplitRange = jdbcInputFormat.createSplitsInternalBySplitRange(3); - Arrays.stream(splitsInternalBySplitRange).forEach(split -> System.out.println(split)); - assert splitsInternalBySplitRange.length == 3; + Arrays.stream(splitsInternalBySplitRange).forEach(System.out::println); + assert splitsInternalBySplitRange.length == 6; } }