Skip to content

Commit 972f42f

Browse files
authored
[feat][jdbc] Separate the relationship between incrementColumn with RestoreColumn and splitPk (#1258)
1 parent cac4515 commit 972f42f

22 files changed

Lines changed: 892 additions & 425 deletions

File tree

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
6666
private boolean polling = false;
6767
/** 字段名称 */
6868
private String increColumn;
69+
/** Whether an OrderBy sort is required,increment mode need set to true. */
70+
private boolean isOrderBy = true;
6971
/** 字段索引 */
7072
private int increColumnIndex = -1;
7173
/** 字段类型 */
@@ -295,6 +297,14 @@ public void setIncreColumn(String increColumn) {
295297
this.increColumn = increColumn;
296298
}
297299

300+
public boolean isOrderBy() {
301+
return isOrderBy;
302+
}
303+
304+
public void setOrderBy(boolean orderBy) {
305+
isOrderBy = orderBy;
306+
}
307+
298308
public int getIncreColumnIndex() {
299309
return increColumnIndex;
300310
}

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/dialect/JdbcDialect.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@
2424
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit;
2525
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
2626
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
27+
import com.dtstack.chunjun.connector.jdbc.util.key.DateTypeUtil;
28+
import com.dtstack.chunjun.connector.jdbc.util.key.KeyUtil;
29+
import com.dtstack.chunjun.connector.jdbc.util.key.NumericTypeUtil;
30+
import com.dtstack.chunjun.connector.jdbc.util.key.TimestampTypeUtil;
2731
import com.dtstack.chunjun.converter.AbstractRowConverter;
2832
import com.dtstack.chunjun.converter.RawTypeConverter;
33+
import com.dtstack.chunjun.enums.ColumnType;
34+
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
2935

3036
import org.apache.flink.table.api.TableSchema;
3137
import org.apache.flink.table.api.ValidationException;
@@ -36,6 +42,7 @@
3642
import org.apache.commons.lang3.StringUtils;
3743

3844
import java.io.Serializable;
45+
import java.math.BigInteger;
3946
import java.sql.ResultSet;
4047
import java.util.Arrays;
4148
import java.util.Objects;
@@ -370,4 +377,23 @@ default String getSplitModFilter(JdbcInputSplit split, String splitPkName) {
370377
" mod(%s, %s) = %s",
371378
quoteIdentifier(splitPkName), split.getTotalNumberOfSplits(), split.getMod());
372379
}
380+
381+
default KeyUtil<?, BigInteger> initKeyUtil(String incrementName, String incrementType) {
382+
switch (ColumnType.getType(incrementType)) {
383+
case TIMESTAMP:
384+
case DATETIME:
385+
return new TimestampTypeUtil();
386+
case DATE:
387+
return new DateTypeUtil();
388+
default:
389+
if (ColumnType.isNumberType(incrementType)) {
390+
return new NumericTypeUtil();
391+
} else {
392+
throw new ChunJunRuntimeException(
393+
String.format(
394+
"Unsupported columnType [%s], columnName [%s]",
395+
incrementType, incrementName));
396+
}
397+
}
398+
}
373399
}

chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcDynamicTableSource.java

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
2424
import com.dtstack.chunjun.connector.jdbc.lookup.JdbcAllTableFunction;
2525
import com.dtstack.chunjun.connector.jdbc.lookup.JdbcLruTableFunction;
26+
import com.dtstack.chunjun.connector.jdbc.util.key.KeyUtil;
2627
import com.dtstack.chunjun.enums.CacheType;
2728
import com.dtstack.chunjun.lookup.conf.LookupConf;
2829
import com.dtstack.chunjun.source.DtInputFormatSourceFunction;
@@ -45,6 +46,7 @@
4546

4647
import org.apache.commons.lang3.StringUtils;
4748

49+
import java.math.BigInteger;
4850
import java.util.ArrayList;
4951
import java.util.List;
5052
import java.util.Objects;
@@ -127,34 +129,70 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
127129

128130
// TODO sql任务使用增量同步或者间隔轮询时暂不支持增量指标写入外部存储,暂时设置为false
129131
jdbcConf.setInitReporter(false);
130-
String increColumn = jdbcConf.getIncreColumn();
131-
if (StringUtils.isNotBlank(increColumn)) {
132-
FieldConf fieldConf =
133-
FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), increColumn);
134-
if (fieldConf != null) {
135-
jdbcConf.setIncreColumnIndex(fieldConf.getIndex());
136-
jdbcConf.setIncreColumnType(fieldConf.getType());
137132

138-
jdbcConf.setRestoreColumn(increColumn);
139-
jdbcConf.setRestoreColumnIndex(fieldConf.getIndex());
140-
jdbcConf.setRestoreColumnType(fieldConf.getType());
141-
} else {
142-
throw new IllegalArgumentException("unknown incre column name: " + increColumn);
143-
}
144-
}
133+
KeyUtil<?, BigInteger> restoreKeyUtil = null;
134+
KeyUtil<?, BigInteger> splitKeyUtil = null;
135+
KeyUtil<?, BigInteger> incrementKeyUtil = null;
145136

137+
// init restore info
146138
String restoreColumn = jdbcConf.getRestoreColumn();
147139
if (StringUtils.isNotBlank(restoreColumn)) {
148140
FieldConf fieldConf =
149141
FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), restoreColumn);
150142
if (fieldConf != null) {
151143
jdbcConf.setRestoreColumnIndex(fieldConf.getIndex());
152144
jdbcConf.setRestoreColumnType(fieldConf.getType());
145+
restoreKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType());
153146
} else {
154147
throw new IllegalArgumentException("unknown restore column name: " + restoreColumn);
155148
}
156149
}
157150

151+
// init splitInfo
152+
String splitPk = jdbcConf.getSplitPk();
153+
if (StringUtils.isNotBlank(splitPk)) {
154+
FieldConf fieldConf = FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), splitPk);
155+
if (fieldConf != null) {
156+
jdbcConf.setSplitPk(fieldConf.getType());
157+
splitKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType());
158+
}
159+
}
160+
161+
// init incrementInfo
162+
String incrementColumn = jdbcConf.getIncreColumn();
163+
if (StringUtils.isNotBlank(incrementColumn)) {
164+
FieldConf fieldConf =
165+
FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), incrementColumn);
166+
int index;
167+
String name;
168+
String type;
169+
if (fieldConf != null) {
170+
index = fieldConf.getIndex();
171+
name = fieldConf.getName();
172+
type = fieldConf.getType();
173+
incrementKeyUtil = jdbcDialect.initKeyUtil(name, type);
174+
} else {
175+
throw new IllegalArgumentException(
176+
"unknown increment column name: " + incrementColumn);
177+
}
178+
jdbcConf.setIncreColumn(name);
179+
jdbcConf.setIncreColumnType(type);
180+
181+
jdbcConf.setRestoreColumn(name);
182+
jdbcConf.setRestoreColumnIndex(index);
183+
jdbcConf.setRestoreColumnType(type);
184+
restoreKeyUtil = incrementKeyUtil;
185+
186+
if (StringUtils.isBlank(jdbcConf.getSplitPk())) {
187+
splitKeyUtil = incrementKeyUtil;
188+
jdbcConf.setSplitPk(name);
189+
}
190+
}
191+
192+
builder.setRestoreKeyUtil(restoreKeyUtil);
193+
builder.setSplitKeyUtil(splitKeyUtil);
194+
builder.setIncrementKeyUtil(incrementKeyUtil);
195+
158196
builder.setJdbcDialect(jdbcDialect);
159197
builder.setJdbcConf(jdbcConf);
160198
builder.setRowConverter(jdbcDialect.getRowConverter(rowType));

0 commit comments

Comments
 (0)