Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,17 @@ public class LogMinerConf extends ChunJunCommonConf {
/** 缓存的日志数 * */
private long transactionCacheNumSize = 1000;

/** 每个事务缓存的事件总数 * */
private long transactionEventSize = 5000;

private Properties properties;

/** 缓存的日志时间 * */
private long transactionExpireTime = 20;

/** 是否开启全量同步 * */
private boolean enableFetchAll = false;

public boolean getSupportAutoAddLog() {
return supportAutoAddLog;
}
Expand Down Expand Up @@ -277,6 +283,14 @@ public void setProperties(Properties properties) {
this.properties = properties;
}

public long getTransactionEventSize() {
return transactionEventSize;
}

public void setTransactionEventSize(long transactionEventSize) {
this.transactionEventSize = transactionEventSize;
}

public boolean isDdlSkip() {
return ddlSkip;
}
Expand All @@ -293,6 +307,14 @@ public void setInitialTableStructure(boolean initialTableStructure) {
this.initialTableStructure = initialTableStructure;
}

public void setEnableFetchAll(boolean isFetchAll) {
this.enableFetchAll = isFetchAll;
}

public boolean getEnableFetchAll() {
return this.enableFetchAll;
}

@Override
public String toString() {
return "LogMinerConf{"
Expand Down Expand Up @@ -351,6 +373,8 @@ public String toString() {
+ retryTimes
+ ", transactionCacheNumSize="
+ transactionCacheNumSize
+ ", transactionEventSize="
+ transactionEventSize
+ ", transactionExpireTime="
+ transactionExpireTime
+ ", properties="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ public static DataType apply(String type) {
case "NCHAR":
case "NVARCHAR2":
case "LONG":
case "RAW":
case "LONG RAW":
case "BLOB":
case "CLOB":
case "NCLOB":
case "INTERVAL YEAR":
case "INTERVAL DAY":
return DataTypes.STRING();
case "RAW":
case "LONG RAW":
return DataTypes.BYTES();
case "INT":
case "INTEGER":
return DataTypes.INT();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ protected void checkFormat() {
sb.append("fetchSize must bigger than 0;\n");
}

if (config.getTransactionEventSize() <= 1) {
sb.append("transactionEventSize must bigger than 1;\n");
}

if (config.getTransactionCacheNumSize() <= 1) {
sb.append("transactionCacheNumSize must bigger than 1;\n");
}

if (config.getPavingData() && config.isSplit()) {
throw new IllegalArgumentException("can't use pavingData and split at the same time");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -339,32 +340,41 @@ public boolean queryData(String logMinerSelectSql) {
}

/** 根据rollback的信息 找出对应的dml语句 */
public void queryDataForDeleteRollback(RecordLog recordLog, String sql) {
public void queryDataForDeleteRollback(
RecordLog recordLog,
BigInteger startScn,
BigInteger endScn,
BigInteger earliestEndScn,
String sql) {
try {
this.CURRENT_STATE.set(STATE.LOADING);
closeStmt();
logMinerSelectStmt =
connection.prepareStatement(
sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
configStatement(logMinerSelectStmt);

logMinerSelectStmt.setFetchSize(logMinerConfig.getFetchSize());
logMinerSelectStmt.setString(1, recordLog.getScn().toString());
logMinerSelectStmt.setString(2, recordLog.getRowId());
logMinerSelectStmt.setString(3, recordLog.getXidUsn());
logMinerSelectStmt.setString(4, recordLog.getXidSlt());
logMinerSelectStmt.setString(5, recordLog.getXidSqn());
logMinerSelectStmt.setString(6, recordLog.getTableName());
logMinerSelectStmt.setInt(7, 0);
logMinerSelectStmt.setInt(8, 1);
logMinerSelectStmt.setInt(9, 3);
logMinerSelectStmt.setString(10, recordLog.getRowId());
logMinerSelectStmt.setString(11, recordLog.getXidUsn());
logMinerSelectStmt.setString(12, recordLog.getXidSlt());
logMinerSelectStmt.setString(13, recordLog.getXidSqn());
logMinerSelectStmt.setString(14, recordLog.getScn().toString());
logMinerSelectStmt.setString(1, recordLog.getXidUsn());
logMinerSelectStmt.setString(2, recordLog.getXidSlt());
logMinerSelectStmt.setString(3, recordLog.getXidSqn());
logMinerSelectStmt.setString(4, recordLog.getTableName());
logMinerSelectStmt.setInt(5, 0);
logMinerSelectStmt.setInt(6, 1);
logMinerSelectStmt.setInt(7, 3);
logMinerSelectStmt.setString(8, String.valueOf(startScn));
logMinerSelectStmt.setString(9, String.valueOf(endScn));
logMinerSelectStmt.setString(10, String.valueOf(earliestEndScn));

long before = System.currentTimeMillis();
logMinerData = logMinerSelectStmt.executeQuery();

long timeConsuming = (System.currentTimeMillis() - before) / 1000;
LOG.info(
"queryDataForDeleteRollback, startScn:{},endScn:{},timeConsuming {}",
startScn,
endScn,
timeConsuming);
this.CURRENT_STATE.set(STATE.READABLE);
} catch (SQLException e) {
String message =
String.format(
Expand Down Expand Up @@ -624,9 +634,10 @@ protected Pair<BigInteger, Boolean> getEndScn(
}

LOG.info(
"getEndScn success,startScn:{},endScn:{}, loadRedoLog:{}",
"getEndScn success,startScn:{},endScn:{}, addRedoLog:{}, loadRedoLog:{}",
startScn,
endScn,
addRedoLog,
loadRedoLog);
return Pair.of(endScn, loadRedoLog);
}
Expand Down Expand Up @@ -655,6 +666,11 @@ private List<LogFile> queryAddedLogFiles() throws SQLException {
}

public boolean hasNext() throws SQLException, UnsupportedEncodingException, DecoderException {
return hasNext(null, null);
}

public boolean hasNext(BigInteger endScn, String endRowid)
throws SQLException, UnsupportedEncodingException, DecoderException {
if (null == logMinerData
|| logMinerData.isClosed()
|| this.CURRENT_STATE.get().equals(STATE.READEND)) {
Expand Down Expand Up @@ -690,12 +706,22 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco
String rowId = logMinerData.getString(KEY_ROW_ID);
boolean rollback = logMinerData.getBoolean(KEY_ROLLBACK);

// 操作类型为commit,清理缓存
if (operationCode == 7) {
// 操作类型为commit / rollback,清理缓存
// refer to
// https://docs.oracle.com/cd/B19306_01/server.102/b14237/dynviews_1154.htm#REFRN30132
if (operationCode == 7 || operationCode == 36) {
transactionManager.cleanCache(xidUsn, xidSLt, xidSqn);
continue;
}

if (endScn != null && rowId != null) {
if (scn.compareTo(endScn) > 0) {
return false;
}
if (scn.compareTo(endScn) == 0 && rowId.equals(endRowid)) {
return false;
}
}
// 用CSF来判断一条sql是在当前这一行结束,sql超过4000 字节,会处理成多行
boolean isSqlNotEnd = logMinerData.getBoolean(KEY_CSF);
// 是否存在多条SQL
Expand Down Expand Up @@ -739,10 +765,19 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco

// 从缓存里查找rollback对应的DML语句
RecordLog recordLog =
transactionManager.queryUndoLogFromCache(
xidUsn, xidSLt, xidSqn, rowId, scn);
transactionManager.queryUndoLogFromCache(xidUsn, xidSLt, xidSqn);

if (Objects.isNull(recordLog)) {

Pair<BigInteger, String> earliestRollbackOperation =
transactionManager.getEarliestRollbackOperation(xidUsn, xidSLt, xidSqn);
BigInteger earliestScn = scn;
String earliestRowid = rowId;
if (earliestRollbackOperation != null) {
earliestScn = earliestRollbackOperation.getLeft();
earliestRowid = earliestRollbackOperation.getRight();
}

// 如果DML语句不在缓存 或者 和rollback不再同一个日志文件里 会递归从日志文件里查找
recordLog =
recursionQueryDataForRollback(
Expand All @@ -756,7 +791,9 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco
rowId,
operationCode,
false,
logMinerData.getString(KEY_TABLE_NAME)));
logMinerData.getString(KEY_TABLE_NAME)),
earliestScn,
earliestRowid);
}

if (Objects.nonNull(recordLog)) {
Expand Down Expand Up @@ -792,18 +829,6 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco
String redo = sqlRedo.toString();
String hexStr = new String(Hex.encodeHex(redo.getBytes("GBK")));
boolean hasChange = false;

// delete 条件不以'结尾 如 where id = '1 以?结尾的需要加上'
if (operationCode == 2 && hexStr.endsWith("3f")) {
LOG.info(
"current scn is: {},\noriginal redo sql is: {},\nhex redo string is: {}",
scn,
redo,
hexStr);
hexStr = hexStr + "27";
hasChange = true;
}

if (operationCode == 1 && hexStr.contains("3f2c")) {
LOG.info(
"current scn is: {},\noriginal redo sql is: {},\nhex redo string is: {}",
Expand Down Expand Up @@ -1018,7 +1043,8 @@ private void closeStmt(Statement statement) {
* @param rollbackRecord rollback参数
* @return insert语句
*/
public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord)
public RecordLog recursionQueryDataForRollback(
RecordLog rollbackRecord, BigInteger earliestEndScn, String earliestRowid)
throws SQLException, UnsupportedEncodingException, DecoderException {
if (Objects.isNull(queryDataForRollbackConnection)) {
queryDataForRollbackConnection =
Expand All @@ -1031,49 +1057,49 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord)
queryDataForRollbackConnection.connect();
}

// 查找出当前加载归档日志文件里的最小scn 递归查找此scn之前的文件
List<LogFile> logFiles =
queryAddedLogFiles().stream()
.filter(
i ->
i.getStatus() != 4
&& i.getType().equalsIgnoreCase(LOG_TYPE_ARCHIVED))
.collect(Collectors.toList());

// 默认每次往前查询4000个scn
BigInteger step = new BigInteger("4000");
if (CollectionUtils.isNotEmpty(logFiles)) {
// nextChange-firstChange 为一个文件包含多少的scn,将其*2 代表加载此scn之前2个文件
step =
logFiles.get(0)
.getNextChange()
.subtract(logFiles.get(0).getFirstChange())
.multiply(new BigInteger("2"));
}

BigInteger startScn = rollbackRecord.getScn().subtract(step);
BigInteger endScn = rollbackRecord.getScn();

for (int i = 0; i < 2; i++) {
queryDataForRollbackConnection.startOrUpdateLogMiner(startScn, endScn);
BigInteger endScn = earliestEndScn;
BigInteger minScn = getMinScn();
for (int i = 0; ; i++) {
BigInteger startScn =
endScn.subtract(new BigInteger("5000"))
.subtract(new BigInteger((2000 * i) + ""));
if (startScn.compareTo(minScn) <= 0) {
startScn = minScn;
}
LOG.info(
"queryDataForRollbackConnection startScn{}, endScn{}, earliestEndScn:{}, rowid:{},table:{}",
startScn,
endScn,
earliestEndScn,
earliestRowid,
rollbackRecord.getTableName());
queryDataForRollbackConnection.startOrUpdateLogMiner(
startScn, endScn.add(BigInteger.ONE));
queryDataForRollbackConnection.queryDataForDeleteRollback(
rollbackRecord, SqlUtil.queryDataForRollback);
rollbackRecord, startScn, endScn, earliestEndScn, SqlUtil.queryDataForRollback);
// while循环查找所有数据 并都加载到缓存里去
while (queryDataForRollbackConnection.hasNext()) {}
while (queryDataForRollbackConnection.hasNext(earliestEndScn, earliestRowid)) {}
// 从缓存里取
RecordLog dmlLog =
transactionManager.queryUndoLogFromCache(
rollbackRecord.getXidUsn(),
rollbackRecord.getXidSlt(),
rollbackRecord.getXidSqn(),
rollbackRecord.getRowId(),
rollbackRecord.getScn());
rollbackRecord.getXidSqn());
if (Objects.nonNull(dmlLog)) {
return dmlLog;
}
endScn = startScn;
startScn = startScn.subtract(step);
if (startScn.compareTo(minScn) <= 0) {
LOG.warn(
"select all file but not found log for rollback data, xidUsn {},xidSlt {},xidSqn {},scn {}",
rollbackRecord.getXidUsn(),
rollbackRecord.getXidSlt(),
rollbackRecord.getXidSqn(),
rollbackRecord.getScn());
break;
}
}

return null;
}

Expand Down Expand Up @@ -1166,7 +1192,10 @@ protected Connection getConnection() {
if (Objects.nonNull(logMinerConfig.getProperties())) {
logMinerConfig.getProperties().forEach(info::put);
}
LOG.info("connection properties is {}", info);
Properties printProperties = new Properties();
printProperties.putAll(info);
printProperties.put("password", "******");
LOG.info("connection properties is {}", printProperties);
return RetryUtil.executeWithRetry(
() -> DriverManager.getConnection(logMinerConfig.getJdbcUrl(), info),
RETRY_TIMES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public LogMinerHelper(
this.transactionManager =
new TransactionManager(
logMinerConfig.getTransactionCacheNumSize(),
logMinerConfig.getTransactionEventSize(),
logMinerConfig.getTransactionExpireTime());
this.startScn = startScn;
this.endScn = startScn;
Expand Down
Loading