diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/conf/LogMinerConf.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/conf/LogMinerConf.java index 881162ac8a..7d063418a4 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/conf/LogMinerConf.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/conf/LogMinerConf.java @@ -88,11 +88,17 @@ public class LogMinerConf extends ChunJunCommonConf { /** 缓存的日志数 * */ private long transactionCacheNumSize = 1000; + /** 每个事务缓存的事件总数 * */ + private long transactionEventSize = 5000; + private Properties properties; /** 缓存的日志时间 * */ private long transactionExpireTime = 20; + /** 是否开启全量同步 * */ + private boolean enableFetchAll = false; + public boolean getSupportAutoAddLog() { return supportAutoAddLog; } @@ -277,6 +283,14 @@ public void setProperties(Properties properties) { this.properties = properties; } + public long getTransactionEventSize() { + return transactionEventSize; + } + + public void setTransactionEventSize(long transactionEventSize) { + this.transactionEventSize = transactionEventSize; + } + public boolean isDdlSkip() { return ddlSkip; } @@ -293,6 +307,14 @@ public void setInitialTableStructure(boolean initialTableStructure) { this.initialTableStructure = initialTableStructure; } + public void setEnableFetchAll(boolean isFetchAll) { + this.enableFetchAll = isFetchAll; + } + + public boolean getEnableFetchAll() { + return this.enableFetchAll; + } + @Override public String toString() { return "LogMinerConf{" @@ -351,6 +373,8 @@ public String toString() { + retryTimes + ", transactionCacheNumSize=" + transactionCacheNumSize + + ", transactionEventSize=" + + transactionEventSize + ", transactionExpireTime=" + transactionExpireTime + ", properties=" diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java index 5b0e412e42..89ff1fcddc 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/converter/OracleRawTypeConverter.java @@ -52,15 +52,14 @@ public static DataType apply(String type) { case "NCHAR": case "NVARCHAR2": case "LONG": + case "RAW": + case "LONG RAW": case "BLOB": case "CLOB": case "NCLOB": case "INTERVAL YEAR": case "INTERVAL DAY": return DataTypes.STRING(); - case "RAW": - case "LONG RAW": - return DataTypes.BYTES(); case "INT": case "INTEGER": return DataTypes.INT(); diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java index 31c55cc378..8c5a32d3b4 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/inputformat/OracleLogMinerInputFormatBuilder.java @@ -92,6 +92,14 @@ protected void checkFormat() { sb.append("fetchSize must bigger than 0;\n"); } + if (config.getTransactionEventSize() <= 1) { + sb.append("transactionEventSize must bigger than 1;\n"); + } + + if (config.getTransactionCacheNumSize() <= 1) { + sb.append("transactionCacheNumSize must bigger than 1;\n"); + } + if (config.getPavingData() && config.isSplit()) { throw new IllegalArgumentException("can't use pavingData and split at the same time"); } diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerConnection.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerConnection.java index 463104fc00..f6dbcb0538 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerConnection.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerConnection.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -339,8 +340,14 @@ public boolean queryData(String logMinerSelectSql) { } /** 根据rollback的信息 找出对应的dml语句 */ - public void queryDataForDeleteRollback(RecordLog recordLog, String sql) { + public void queryDataForDeleteRollback( + RecordLog recordLog, + BigInteger startScn, + BigInteger endScn, + BigInteger earliestEndScn, + String sql) { try { + this.CURRENT_STATE.set(STATE.LOADING); closeStmt(); logMinerSelectStmt = connection.prepareStatement( @@ -348,23 +355,26 @@ public void queryDataForDeleteRollback(RecordLog recordLog, String sql) { configStatement(logMinerSelectStmt); logMinerSelectStmt.setFetchSize(logMinerConfig.getFetchSize()); - logMinerSelectStmt.setString(1, recordLog.getScn().toString()); - logMinerSelectStmt.setString(2, recordLog.getRowId()); - logMinerSelectStmt.setString(3, recordLog.getXidUsn()); - logMinerSelectStmt.setString(4, recordLog.getXidSlt()); - logMinerSelectStmt.setString(5, recordLog.getXidSqn()); - logMinerSelectStmt.setString(6, recordLog.getTableName()); - logMinerSelectStmt.setInt(7, 0); - logMinerSelectStmt.setInt(8, 1); - logMinerSelectStmt.setInt(9, 3); - logMinerSelectStmt.setString(10, recordLog.getRowId()); - logMinerSelectStmt.setString(11, recordLog.getXidUsn()); - logMinerSelectStmt.setString(12, recordLog.getXidSlt()); - logMinerSelectStmt.setString(13, recordLog.getXidSqn()); - logMinerSelectStmt.setString(14, recordLog.getScn().toString()); + logMinerSelectStmt.setString(1, recordLog.getXidUsn()); + logMinerSelectStmt.setString(2, recordLog.getXidSlt()); + logMinerSelectStmt.setString(3, recordLog.getXidSqn()); + logMinerSelectStmt.setString(4, recordLog.getTableName()); + logMinerSelectStmt.setInt(5, 0); + logMinerSelectStmt.setInt(6, 1); + logMinerSelectStmt.setInt(7, 3); + logMinerSelectStmt.setString(8, String.valueOf(startScn)); + logMinerSelectStmt.setString(9, String.valueOf(endScn)); + logMinerSelectStmt.setString(10, String.valueOf(earliestEndScn)); + long before = System.currentTimeMillis(); logMinerData = logMinerSelectStmt.executeQuery(); - + long timeConsuming = (System.currentTimeMillis() - before) / 1000; + LOG.info( + "queryDataForDeleteRollback, startScn:{},endScn:{},timeConsuming {}", + startScn, + endScn, + timeConsuming); + this.CURRENT_STATE.set(STATE.READABLE); } catch (SQLException e) { String message = String.format( @@ -624,9 +634,10 @@ protected Pair getEndScn( } LOG.info( - "getEndScn success,startScn:{},endScn:{}, loadRedoLog:{}", + "getEndScn success,startScn:{},endScn:{}, addRedoLog:{}, loadRedoLog:{}", startScn, endScn, + addRedoLog, loadRedoLog); return Pair.of(endScn, loadRedoLog); } @@ -655,6 +666,11 @@ private List queryAddedLogFiles() throws SQLException { } public boolean hasNext() throws SQLException, UnsupportedEncodingException, DecoderException { + return hasNext(null, null); + } + + public boolean hasNext(BigInteger endScn, String endRowid) + throws SQLException, UnsupportedEncodingException, DecoderException { if (null == logMinerData || logMinerData.isClosed() || this.CURRENT_STATE.get().equals(STATE.READEND)) { @@ -690,12 +706,22 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco String rowId = logMinerData.getString(KEY_ROW_ID); boolean rollback = logMinerData.getBoolean(KEY_ROLLBACK); - // 操作类型为commit,清理缓存 - if (operationCode == 7) { + // 操作类型为commit / rollback,清理缓存 + // refer to + // https://docs.oracle.com/cd/B19306_01/server.102/b14237/dynviews_1154.htm#REFRN30132 + if (operationCode == 7 || operationCode == 36) { transactionManager.cleanCache(xidUsn, xidSLt, xidSqn); continue; } + if (endScn != null && rowId != null) { + if (scn.compareTo(endScn) > 0) { + return false; + } + if (scn.compareTo(endScn) == 0 && rowId.equals(endRowid)) { + return false; + } + } // 用CSF来判断一条sql是在当前这一行结束,sql超过4000 字节,会处理成多行 boolean isSqlNotEnd = logMinerData.getBoolean(KEY_CSF); // 是否存在多条SQL @@ -739,10 +765,19 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco // 从缓存里查找rollback对应的DML语句 RecordLog recordLog = - transactionManager.queryUndoLogFromCache( - xidUsn, xidSLt, xidSqn, rowId, scn); + transactionManager.queryUndoLogFromCache(xidUsn, xidSLt, xidSqn); if (Objects.isNull(recordLog)) { + + Pair earliestRollbackOperation = + transactionManager.getEarliestRollbackOperation(xidUsn, xidSLt, xidSqn); + BigInteger earliestScn = scn; + String earliestRowid = rowId; + if (earliestRollbackOperation != null) { + earliestScn = earliestRollbackOperation.getLeft(); + earliestRowid = earliestRollbackOperation.getRight(); + } + // 如果DML语句不在缓存 或者 和rollback不再同一个日志文件里 会递归从日志文件里查找 recordLog = recursionQueryDataForRollback( @@ -756,7 +791,9 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco rowId, operationCode, false, - logMinerData.getString(KEY_TABLE_NAME))); + logMinerData.getString(KEY_TABLE_NAME)), + earliestScn, + earliestRowid); } if (Objects.nonNull(recordLog)) { @@ -792,18 +829,6 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco String redo = sqlRedo.toString(); String hexStr = new String(Hex.encodeHex(redo.getBytes("GBK"))); boolean hasChange = false; - - // delete 条件不以'结尾 如 where id = '1 以?结尾的需要加上' - if (operationCode == 2 && hexStr.endsWith("3f")) { - LOG.info( - "current scn is: {},\noriginal redo sql is: {},\nhex redo string is: {}", - scn, - redo, - hexStr); - hexStr = hexStr + "27"; - hasChange = true; - } - if (operationCode == 1 && hexStr.contains("3f2c")) { LOG.info( "current scn is: {},\noriginal redo sql is: {},\nhex redo string is: {}", @@ -1018,7 +1043,8 @@ private void closeStmt(Statement statement) { * @param rollbackRecord rollback参数 * @return insert语句 */ - public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) + public RecordLog recursionQueryDataForRollback( + RecordLog rollbackRecord, BigInteger earliestEndScn, String earliestRowid) throws SQLException, UnsupportedEncodingException, DecoderException { if (Objects.isNull(queryDataForRollbackConnection)) { queryDataForRollbackConnection = @@ -1031,49 +1057,49 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) queryDataForRollbackConnection.connect(); } - // 查找出当前加载归档日志文件里的最小scn 递归查找此scn之前的文件 - List logFiles = - queryAddedLogFiles().stream() - .filter( - i -> - i.getStatus() != 4 - && i.getType().equalsIgnoreCase(LOG_TYPE_ARCHIVED)) - .collect(Collectors.toList()); - - // 默认每次往前查询4000个scn - BigInteger step = new BigInteger("4000"); - if (CollectionUtils.isNotEmpty(logFiles)) { - // nextChange-firstChange 为一个文件包含多少的scn,将其*2 代表加载此scn之前2个文件 - step = - logFiles.get(0) - .getNextChange() - .subtract(logFiles.get(0).getFirstChange()) - .multiply(new BigInteger("2")); - } - - BigInteger startScn = rollbackRecord.getScn().subtract(step); - BigInteger endScn = rollbackRecord.getScn(); - - for (int i = 0; i < 2; i++) { - queryDataForRollbackConnection.startOrUpdateLogMiner(startScn, endScn); + BigInteger endScn = earliestEndScn; + BigInteger minScn = getMinScn(); + for (int i = 0; ; i++) { + BigInteger startScn = + endScn.subtract(new BigInteger("5000")) + .subtract(new BigInteger((2000 * i) + "")); + if (startScn.compareTo(minScn) <= 0) { + startScn = minScn; + } + LOG.info( + "queryDataForRollbackConnection startScn{}, endScn{}, earliestEndScn:{}, rowid:{},table:{}", + startScn, + endScn, + earliestEndScn, + earliestRowid, + rollbackRecord.getTableName()); + queryDataForRollbackConnection.startOrUpdateLogMiner( + startScn, endScn.add(BigInteger.ONE)); queryDataForRollbackConnection.queryDataForDeleteRollback( - rollbackRecord, SqlUtil.queryDataForRollback); + rollbackRecord, startScn, endScn, earliestEndScn, SqlUtil.queryDataForRollback); // while循环查找所有数据 并都加载到缓存里去 - while (queryDataForRollbackConnection.hasNext()) {} + while (queryDataForRollbackConnection.hasNext(earliestEndScn, earliestRowid)) {} // 从缓存里取 RecordLog dmlLog = transactionManager.queryUndoLogFromCache( rollbackRecord.getXidUsn(), rollbackRecord.getXidSlt(), - rollbackRecord.getXidSqn(), - rollbackRecord.getRowId(), - rollbackRecord.getScn()); + rollbackRecord.getXidSqn()); if (Objects.nonNull(dmlLog)) { return dmlLog; } endScn = startScn; - startScn = startScn.subtract(step); + if (startScn.compareTo(minScn) <= 0) { + LOG.warn( + "select all file but not found log for rollback data, xidUsn {},xidSlt {},xidSqn {},scn {}", + rollbackRecord.getXidUsn(), + rollbackRecord.getXidSlt(), + rollbackRecord.getXidSqn(), + rollbackRecord.getScn()); + break; + } } + return null; } @@ -1166,7 +1192,10 @@ protected Connection getConnection() { if (Objects.nonNull(logMinerConfig.getProperties())) { logMinerConfig.getProperties().forEach(info::put); } - LOG.info("connection properties is {}", info); + Properties printProperties = new Properties(); + printProperties.putAll(info); + printProperties.put("password", "******"); + LOG.info("connection properties is {}", printProperties); return RetryUtil.executeWithRetry( () -> DriverManager.getConnection(logMinerConfig.getJdbcUrl(), info), RETRY_TIMES, diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerHelper.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerHelper.java index 26fb552f05..bd122a8afb 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerHelper.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerHelper.java @@ -77,6 +77,7 @@ public LogMinerHelper( this.transactionManager = new TransactionManager( logMinerConfig.getTransactionCacheNumSize(), + logMinerConfig.getTransactionEventSize(), logMinerConfig.getTransactionExpireTime()); this.startScn = startScn; this.endScn = startScn; diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java index f61cd22688..0ce7479cb5 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogMinerListener.java @@ -28,7 +28,9 @@ import com.dtstack.chunjun.connector.oraclelogminer.util.OraUtil; import com.dtstack.chunjun.connector.oraclelogminer.util.SqlUtil; import com.dtstack.chunjun.converter.AbstractCDCRowConverter; +import com.dtstack.chunjun.element.ColumnRowData; import com.dtstack.chunjun.element.ErrorMsgRowData; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; import com.dtstack.chunjun.util.ExceptionUtil; import com.dtstack.chunjun.util.RetryUtil; @@ -49,6 +51,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -75,6 +78,7 @@ * @date 2020/3/27 */ public class LogMinerListener implements Runnable { + private final BigInteger MINUS_ONE = new BigInteger("-1"); public static Logger LOG = LoggerFactory.getLogger(LogMinerListener.class); private final LogMinerConf logMinerConf; @@ -115,16 +119,11 @@ public void init() { namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); - logParser = new LogParser(); + logParser = new LogParser(logMinerConf); } public void start() { - BigInteger startScn = logMinerHelper.getStartScn(positionManager.getPosition()); - - positionManager.updatePosition(startScn); - logMinerHelper.setStartScn(startScn); - logMinerHelper.init(); - + BigInteger startScn; Connection connection = RetryUtil.executeWithRetry( () -> @@ -135,13 +134,28 @@ public void start() { RETRY_TIMES, SLEEP_TIME, false); + if (logMinerConf.getEnableFetchAll()) { + BigInteger cacheScn = positionManager.getPosition(); + if (null != cacheScn && cacheScn.compareTo(BigInteger.valueOf(-1)) != 0) { + startScn = cacheScn; + } else { + startScn = oracleFullSyncOperation(connection); + } + } else { + startScn = logMinerHelper.getStartScn(positionManager.getPosition()); + } + + positionManager.updatePosition(startScn); + logMinerHelper.setStartScn(startScn); + logMinerHelper.init(); + // LogMinerColumnConverter 需要connection获取元数据 if (rowConverter instanceof LogMinerColumnConverter) { ((LogMinerColumnConverter) rowConverter).setConnection(connection); } // 初始化 - if (logMinerConf.isInitialTableStructure()) { + if (logMinerConf.isInitialTableStructure() && !logMinerConf.getEnableFetchAll()) { initialTableStruct(connection); } @@ -211,6 +225,11 @@ public void stop() { } private void processData(QueueData log) throws Exception { + if (log.getData() instanceof DdlRowData) { + rowConverter.clearConverterCache(); + queue.put((new QueueData(log.getScn(), log.getData()))); + return; + } LinkedList rowDatalist = logParser.parse(log, rowConverter); RowData rowData; try { @@ -249,7 +268,9 @@ public RowData getData() { } rowData = null; } else { - positionManager.updatePosition(poll.getScn()); + if (poll.getScn().compareTo(MINUS_ONE) != 0) { + positionManager.updatePosition(poll.getScn()); + } failedTimes = 0; } } @@ -259,10 +280,6 @@ public RowData getData() { return rowData; } - public BigInteger getCurrentPosition() { - return positionManager.getPosition(); - } - private void initialTableStruct(Connection connection) { if (CollectionUtils.isNotEmpty(logMinerConf.getTable())) { Set schemas = new HashSet<>(); @@ -448,6 +465,10 @@ private void initialTableStruct(Connection connection) { } } + public BigInteger getCurrentPosition() { + return positionManager.getPosition(); + } + private List readPrimaryKeyNames(DatabaseMetaData metadata, String schema, String table) throws SQLException { final List pkColumnNames = new ArrayList<>(); @@ -459,4 +480,134 @@ private List readPrimaryKeyNames(DatabaseMetaData metadata, String schem } return pkColumnNames; } + + /** + * lock table get current scn generate create table ddl release lock generate insert event by + * scn + */ + public BigInteger oracleFullSyncOperation(Connection connection) { + try { + if (logMinerConf.getTable().size() != 1) { + throw new ChunJunRuntimeException( + "oracle logminer full sync, only support one table"); + } + + String tbnWithSchema = logMinerConf.getTable().get(0); + BigInteger scn = getLockTableScn(connection, tbnWithSchema); + + /* select data by scn, to columnRowData */ + queryDataByScnToColumnRowData(connection, tbnWithSchema, scn); + + return scn; + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + private BigInteger getLockTableScn(Connection conn, String tbnWithSchema) { + try { + Statement stmt = conn.createStatement(); + String lockTableSql = SqlUtil.formatLockTableWithRowShare(tbnWithSchema); + + /* lock table */ + stmt.execute(lockTableSql); + + /* get current scn */ + ResultSet rs = stmt.executeQuery(SqlUtil.SQL_GET_CURRENT_SCN); + + String scn = null; + if (rs.next()) { + scn = rs.getString("CURRENT_SCN"); + } else { + throw new ChunJunRuntimeException( + String.format("can't get scn of [%s]", tbnWithSchema)); + } + + /* generate create table ddl */ + initialTableStruct(conn); + + /* release lock */ + stmt.execute(SqlUtil.releaseTableLock()); + + if (stmt != null) { + stmt.close(); + } + return new BigInteger(scn); + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + private void queryDataByScnToColumnRowData( + Connection conn, String tbnWithSchema, BigInteger scn) { + try { + List columnInfos = getColumnInfoByTable(conn, tbnWithSchema); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SqlUtil.queryDataByScn(tbnWithSchema, scn)); + List columnRowDatas = + SqlUtil.jdbcColumnRowColumnConvert(columnInfos, rs); + + for (ColumnRowData rowData : columnRowDatas) { + queue.put(new QueueData(new BigInteger("-1"), rowData)); + } + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + private List getColumnInfoByTable(Connection conn, String tbnWithSchema) { + String schema = tbnWithSchema.substring(0, tbnWithSchema.indexOf('.')); + String tbn = tbnWithSchema.substring(tbnWithSchema.indexOf('.') + 1); + List columnInfos = new ArrayList<>(); + + try { + Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery(SqlUtil.formatGetTableInfoSql(schema, tbn)); + + while (resultSet.next()) { + String columnName = resultSet.getString(3); + String dataType = resultSet.getString(4); + Object dataPrecision = resultSet.getObject(5); + Object charLength = resultSet.getObject(6); + Object dataLength = resultSet.getObject(7); + Object dataScale = resultSet.getObject(8); + Object defaultValue = resultSet.getObject(9); + String nullable = resultSet.getString(10); + String comment = resultSet.getString(11); + + /* isPk is not useful, set false */ + boolean isPk = false; + + ColumnInfo columnInfo = + new ColumnInfo( + columnName, + dataType, + Objects.isNull(dataPrecision) + ? null + : Integer.valueOf(dataPrecision.toString()), + Objects.isNull(charLength) + ? null + : Integer.valueOf(charLength.toString()), + Objects.isNull(dataLength) + ? null + : Integer.valueOf(dataLength.toString()), + Objects.isNull(dataScale) + ? null + : Integer.valueOf(dataScale.toString()), + Objects.isNull(defaultValue) ? null : defaultValue.toString(), + "Y".equalsIgnoreCase(nullable), + comment, + isPk); + + columnInfos.add(columnInfo); + } + + if (stmt != null) { + stmt.close(); + } + return columnInfos; + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } } diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogParser.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogParser.java index 717d98c3c7..ecad8ed746 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogParser.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/LogParser.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.oraclelogminer.listener; +import com.dtstack.chunjun.connector.oraclelogminer.conf.LogMinerConf; import com.dtstack.chunjun.connector.oraclelogminer.entity.EventRow; import com.dtstack.chunjun.connector.oraclelogminer.entity.EventRowData; import com.dtstack.chunjun.connector.oraclelogminer.entity.QueueData; @@ -64,6 +65,12 @@ public class LogParser { public static SnowflakeIdWorker idWorker = new SnowflakeIdWorker(1, 1); + private final LogMinerConf config; + + public LogParser(LogMinerConf config) { + this.config = config; + } + private static String cleanString(String str) { if ("NULL".equalsIgnoreCase(str)) { return null; @@ -196,7 +203,6 @@ public static String parseTime(String value) { if (value.startsWith("TO_TIMESTAMP_TZ('")) { return value.substring(17, value.length() - 2); } - return value; } diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/TransactionManager.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/TransactionManager.java index cc06d61fc5..00820b919b 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/TransactionManager.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/listener/TransactionManager.java @@ -22,17 +22,16 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; -import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; -import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -45,26 +44,25 @@ public class TransactionManager { public static Logger LOG = LoggerFactory.getLogger(TransactionManager.class); - /** 缓存的结构为 xidUsn+xidSLt+xidSqn(事务id)+rowId,Transaction* */ - private final Cache> recordCache; + /** 缓存的结构为 xidUsn+xidSLt+xidSqn(事务id),当前事务内数据 */ + private final Cache> recordCache; /** - * recordCache的二级缓存,缓存二级key,结构为: xidUsn+xidSLt+xidSqn(事务id), xidUsn+xidSLt+xidSqn(事务id)+rowId列表 - * * + * 缓存的结构为 xidUsn+xidSLt+xidSqn(事务id),最后一次处理回滚数据对应的业务操作scn以及rowid + * 回滚事务id以及最新回滚数据对应的业务操作scn和rowid的对应关系 */ - private final Cache> secondKeyCache; + private final Map> earliestResolveOperateForRollback; - public TransactionManager(Long transactionSize, long transactionExpireTime) { + private final Long eventSize; + + public TransactionManager(Long transactionSize, Long eventSize, long transactionExpireTime) { this.recordCache = CacheBuilder.newBuilder() .maximumSize(transactionSize) .expireAfterWrite(transactionExpireTime, TimeUnit.MINUTES) .build(); - this.secondKeyCache = - CacheBuilder.newBuilder() - .maximumSize(transactionSize) - .expireAfterWrite(transactionExpireTime, TimeUnit.MINUTES) - .build(); + this.eventSize = eventSize; + this.earliestResolveOperateForRollback = new HashMap<>(); } /** @@ -77,72 +75,33 @@ public void putCache(RecordLog recordLog) { if (recordLog.getOperationCode() == 2) { return; } - - recordLog.setSqlUndo(recordLog.getSqlUndo().replace("IS NULL", "= NULL")); - recordLog.setSqlRedo(recordLog.getSqlRedo().replace("IS NULL", "= NULL")); - - String key = - recordLog.getXidUsn() - + recordLog.getXidSlt() - + recordLog.getXidSqn() - + recordLog.getRowId(); - List transactionList = recordCache.getIfPresent(key); - if (CollectionUtils.isEmpty(transactionList)) { - List data = new ArrayList<>(32); + String key = recordLog.getXidUsn() + recordLog.getXidSlt() + recordLog.getXidSqn(); + LinkedList recordList = recordCache.getIfPresent(key); + if (Objects.isNull(recordList)) { + LinkedList data = new LinkedList(); recordCache.put(key, data); - transactionList = data; - } - Optional transaction = - transactionList.stream() - .filter(i -> i.getScn().compareTo(recordLog.getScn()) == 0) - .findFirst(); - - if (!transaction.isPresent()) { - transactionList.add(new Transaction(recordLog.getScn(), Lists.newArrayList(recordLog))); - } else { - transaction.get().addRecord(recordLog); + recordList = data; } - String txId = recordLog.getXidUsn() + recordLog.getXidSlt() + recordLog.getXidSqn(); - LinkedList keyList = secondKeyCache.getIfPresent(txId); - if (Objects.isNull(keyList)) { - keyList = new LinkedList<>(); + recordLog.setSqlUndo(recordLog.getSqlUndo().replace("IS NULL", "= NULL")); + recordLog.setSqlRedo(recordLog.getSqlRedo().replace("IS NULL", "= NULL")); + recordList.add(recordLog); + if (recordList.size() > eventSize) { + recordList.removeFirst(); } - keyList.add(key); - LOG.debug( - "add cache,XidSqn = {}, RowId = {}, recordLog = {}", - recordLog.getXidSqn(), - recordLog.getRowId(), - recordLog); - secondKeyCache.put(txId, keyList); - LOG.debug( - "after add,recordCache size = {}, secondKeyCache size = {}", - recordCache.size(), - secondKeyCache.size()); } /** 清理已提交事务的缓存 */ public void cleanCache(String xidUsn, String xidSLt, String xidSqn) { String txId = xidUsn + xidSLt + xidSqn; - LinkedList keyList = secondKeyCache.getIfPresent(txId); - if (Objects.isNull(keyList)) { - return; - } - for (String key : keyList) { - LOG.debug("clean recordCache,key = {}", key); - recordCache.invalidate(key); - } LOG.debug( "clean secondKeyCache,xidSqn = {}, xidUsn = {} ,xidSLt = {} ", xidSqn, xidUsn, xidSLt); - secondKeyCache.invalidate(txId); - - LOG.debug( - "after clean,recordCache size = {}, secondKeyCache size = {}", - recordCache.size(), - secondKeyCache.size()); + recordCache.invalidate(txId); + earliestResolveOperateForRollback.remove(xidUsn + xidSLt + xidSqn); + LOG.debug("after clean,current recordCache size = {}", recordCache.size()); } /** @@ -151,41 +110,22 @@ public void cleanCache(String xidUsn, String xidSLt, String xidSqn) { * @param xidUsn * @param xidSlt * @param xidSqn - * @param rowId - * @param scn scn of rollback * @return dml Log */ - public RecordLog queryUndoLogFromCache( - String xidUsn, String xidSlt, String xidSqn, String rowId, BigInteger scn) { - String key = xidUsn + xidSlt + xidSqn + rowId; - List transactionList = recordCache.getIfPresent(key); - if (CollectionUtils.isEmpty(transactionList)) { + public RecordLog queryUndoLogFromCache(String xidUsn, String xidSlt, String xidSqn) { + String key = xidUsn + xidSlt + xidSqn; + LinkedList recordLogs = recordCache.getIfPresent(key); + if (CollectionUtils.isEmpty(recordLogs)) { return null; } - // 根据scn号查找 如果scn号相同 则取此对应的最后DML语句 dml按顺序添加,rollback倒序取对应的语句 - Optional transactionOptional = - transactionList.stream().filter(i -> i.getScn().compareTo(scn) == 0).findFirst(); - // 如果scn相同的DML语句没有 则取同一个事务里rowId相同的最后一个 - Transaction transaction = - transactionOptional.orElse(transactionList.get(transactionList.size() - 1)); - RecordLog recordLog = null; - - if (!transaction.isEmpty()) { - recordLog = transaction.getLast(); - transaction.removeLast(); - LOG.info("query a insert sql for rollback in cache,rollback scn is {}", scn); - } - - if (transaction.isEmpty()) { - transactionList.remove(transaction); - } - - if (transactionList.isEmpty()) { - recordCache.invalidate(key); - } - - secondKeyCache.invalidate(xidUsn + xidSlt + xidSqn); - + RecordLog recordLog = recordLogs.removeLast(); + earliestResolveOperateForRollback.put( + key, Pair.of(recordLog.getScn(), recordLog.getRowId())); return recordLog; } + + public Pair getEarliestRollbackOperation( + String xidUsn, String xidSlt, String xidSqn) { + return earliestResolveOperateForRollback.get(xidUsn + xidSlt + xidSqn); + } } diff --git a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/util/SqlUtil.java b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/util/SqlUtil.java index 2fc212c55a..7a3e992c15 100644 --- a/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/util/SqlUtil.java +++ b/chunjun-connectors/chunjun-connector-oraclelogminer/src/main/java/com/dtstack/chunjun/connector/oraclelogminer/util/SqlUtil.java @@ -19,16 +19,31 @@ import com.dtstack.chunjun.connector.oraclelogminer.entity.ColumnInfo; import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.element.AbstractBaseColumn; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; +import com.dtstack.chunjun.throwable.ChunJunRuntimeException; +import com.dtstack.chunjun.util.DateUtil; + +import org.apache.flink.types.RowKind; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.ResultSet; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.stream.Collectors; /** @@ -307,6 +322,7 @@ public class SqlUtil { + " and a.TABLE_NAME = c.TABLE_NAME\n" + " and a.COLUMN_NAME = b.COLUMN_NAME\n" + "%s"; + // 查找加载到logminer的日志文件 public static final String SQL_QUERY_ADDED_LOG = "select filename ,thread_id ,low_scn,next_scn,type,filesize,status,type from V$LOGMNR_LOGS "; @@ -319,7 +335,7 @@ public class SqlUtil { public static final String SQL_GET_MAX_SCN_IN_CONTENTS = "select max(scn) as scn from v$logmnr_contents"; public static final String SQL_GET_LOG_FILE_START_POSITION = - "select min(FIRST_CHANGE#) FIRST_CHANGE# from (select FIRST_CHANGE# from v$log union select FIRST_CHANGE# from v$archived_log where standby_dest='NO' and name is not null)"; + "select min(FIRST_CHANGE#) FIRST_CHANGE# from (select FIRST_CHANGE# from v$log l where l.STATUS = 'CURRENT' OR l.STATUS = 'ACTIVE' union select FIRST_CHANGE# from v$archived_log where standby_dest='NO' and name is not null)"; public static final String SQL_GET_LOG_FILE_START_POSITION_BY_TIME = "select min(FIRST_CHANGE#) FIRST_CHANGE# from (select FIRST_CHANGE# from v$log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') between FIRST_TIME and NVL(NEXT_TIME, TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')) union select FIRST_CHANGE# from v$archived_log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') between FIRST_TIME and NEXT_TIME and standby_dest='NO' and name is not null)"; public static final String SQL_GET_LOG_FILE_START_POSITION_BY_TIME_10 = @@ -353,34 +369,35 @@ public class SqlUtil { private static final List SUPPORTED_OPERATIONS = Arrays.asList("UPDATE", "INSERT", "DELETE"); public static Logger LOG = LoggerFactory.getLogger(SqlUtil.class); - /** - * 查找delete的rollback语句对应的insert语句 存在一个事务里rowid相同的其他语句 - * 所以需要子查询过滤掉scn相同rowid相同的语句(这是一对rollback和DML) - */ + + /** 查找事务里 回滚数据(delete 和 update)对应的操作数据 */ public static String queryDataForRollback = "SELECT\n" - + " scn,\n" + + " scn," + // oracle 10 没有该字段 // " commit_scn,\n" + - " timestamp,\n" - + " operation,\n" - + " operation_code,\n" - + " seg_owner,\n" - + " table_name,\n" - + " sql_redo,\n" - + " sql_undo,\n" - + " xidusn,\n" - + " xidslt,\n" - + " xidsqn,\n" - + " row_id,\n" - + " rollback,\n" + " timestamp," + + " operation," + + " operation_code," + + " seg_owner," + + " table_name," + + " sql_redo," + + " sql_undo," + + " xidusn," + + " xidslt," + + " xidsqn," + + " row_id," + + " rollback," + " csf\n" + "FROM\n" + " v$logmnr_contents a \n" + "where \n" - + "scn <=? and row_id=? and xidusn = ? and xidslt = ? and xidsqn = ? and table_name = ? and rollback =? and OPERATION_CODE in (?,?) \n" - + "and scn not in (select scn from v$logmnr_contents where row_id = ? and xidusn = ? and xidslt = ? and xidsqn = ? and scn !=? group by scn HAVING count(scn) >1 and sum(rollback)>0) \n"; + + " ( xidusn = ? and xidslt = ? and xidsqn = ? and table_name = ? and rollback =? and OPERATION_CODE in (?,?)) " + + "AND \n" + + " (scn >= ? " + + " AND scn < ?) \n" + + " or (scn = ?) \n"; public static List EXCLUDE_SCHEMAS = Collections.singletonList("SYS"); @@ -394,7 +411,6 @@ public class SqlUtil { public static String buildSelectSql( String listenerOptions, boolean ddlSkip, String listenerTables, boolean isCdb) { StringBuilder sqlBuilder = new StringBuilder(SQL_SELECT_DATA); - sqlBuilder.append(" and ( "); if (StringUtils.isNotEmpty(listenerTables)) { sqlBuilder.append(buildSchemaTableFilter(listenerTables, isCdb)); @@ -418,6 +434,7 @@ public static String buildSelectSql( * 构建需要采集操作类型字符串的过滤条件 * * @param listenerOptions 需要采集操作类型字符串 delete,insert,update + * @param ddlSkip 需要采集ddl数据 * @return */ private static String buildOperationFilter(String listenerOptions, boolean ddlSkip) { @@ -570,4 +587,111 @@ public static String formatGetTableInfoSql(List> tables) { } return String.format(SQL_QUERY_TABLE_COLUMN_INFO_TEMPLATE, where); } + + public static String formatGetTableInfoSql(String schema, String table) { + StringBuilder sb = new StringBuilder(); + sb.append(" and "); + sb.append(" a.OWNER = "); + sb.append("'"); + sb.append(schema); + sb.append("'"); + sb.append(" and a.TABLE_NAME = "); + sb.append("'"); + sb.append(table); + sb.append("'"); + String where = sb.toString(); + return String.format(SQL_QUERY_TABLE_COLUMN_INFO_TEMPLATE, where); + } + + public static String formatLockTableWithRowShare(String table) { + StringBuilder sb = new StringBuilder(); + sb.append("lock table "); + sb.append(table); + sb.append(" IN ROW SHARE MODE"); + return sb.toString(); + } + + public static String queryDataByScn(String tableWithSchema, BigInteger scn) { + String sql = "select * from %s as of scn %d"; + return String.format(sql, tableWithSchema, scn); + } + + public static String releaseTableLock() { + return "rollback"; + } + + /* convert resultSet to columnRowData */ + public static List jdbcColumnRowColumnConvert( + List columnInfos, ResultSet rs) { + List columnRowDatas = new ArrayList<>(); + try { + while (rs.next()) { + ColumnRowData columnRowData = new ColumnRowData(RowKind.INSERT, columnInfos.size()); + for (ColumnInfo columnInfo : columnInfos) { + Object value = rs.getObject(columnInfo.getName()); + columnRowData.addHeader(columnInfo.getName()); + columnRowData.addField(convertJdbcType(columnInfo.getType(), value)); + } + columnRowDatas.add(columnRowData); + } + return columnRowDatas; + } catch (Exception e) { + throw new ChunJunRuntimeException(e); + } + } + + private static AbstractBaseColumn convertJdbcType(String tpe, Object val) { + String substring = tpe; + int index = tpe.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); + if (index > 0) { + substring = tpe.substring(0, index); + } + + switch (substring.toUpperCase(Locale.ENGLISH)) { + case "NUMBER": + case "SMALLINT": + case "INT": + case "INTEGER": + case "FLOAT": + case "DECIMAL": + case "NUMERIC": + case "BINARY_FLOAT": + case "BINARY_DOUBLE": + return new BigDecimalColumn((BigDecimal) val); + + case "CHAR": + case "NCHAR": + case "NVARCHAR2": + case "ROWID": + case "VARCHAR2": + case "VARCHAR": + case "LONG": + case "RAW": + case "LONG RAW": + case "INTERVAL YEAR": + case "INTERVAL DAY": + case "BLOB": + case "CLOB": + case "NCLOB": + return new StringColumn((String) val); + + case "DATE": + if (val instanceof Timestamp) { + String formatTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS").format(val); + return new TimestampColumn(DateUtil.getTimestampFromStr(formatTime), 0); + } + + return new TimestampColumn(DateUtil.getTimestampFromStr((String) val), 0); + + case "TIMESTAMP": + return new TimestampColumn( + DateUtil.getTimestampFromStr(((oracle.sql.TIMESTAMP) val).stringValue()), + 0); + + case "BFILE": + case "XMLTYPE": + default: + throw new UnsupportedOperationException("Unsupported type:" + tpe); + } + } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractCDCRowConverter.java b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractCDCRowConverter.java index abc5f9b821..5b10a919f5 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractCDCRowConverter.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractCDCRowConverter.java @@ -155,4 +155,8 @@ protected RowData createRowDataByConverters( } return genericRowData; } + + public void clearConverterCache() { + cdcConverterCacheMap.clear(); + } } diff --git a/chunjun-examples/json/logminer/logminer_full_sync_stream.json b/chunjun-examples/json/logminer/logminer_full_sync_stream.json new file mode 100644 index 0000000000..103e41df54 --- /dev/null +++ b/chunjun-examples/json/logminer/logminer_full_sync_stream.json @@ -0,0 +1,34 @@ +{ + "job": { + "content": [ + { + "reader": { + "parameter": { + "jdbcUrl": "jdbc:oracle:thin:@//127.0.0.1:1521/orcl11g.us.oracle.com", + "username": "oracle", + "password": "oracle", + "supportAutoAddLog": false, + "table": ["xxx.xxxx"], + "cat": "UPDATE,INSERT,DELETE", + "pavingData" : true, + "enableFetchAll": true, + "queryTimeout": 3000 + }, + "name": "oraclelogminerreader" + }, + "writer": { + "parameter": { + "print": true + }, + "name": "streamwriter" + } + } + ], + "setting" : { + "speed" : { + "bytes" : 0, + "channel" : 1 + } + } + } +} diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/logminer/LogMiner-source.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/logminer/LogMiner-source.md" index 938232f2ba..367c64669d 100644 --- "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/logminer/LogMiner-source.md" +++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/logminer/LogMiner-source.md" @@ -123,6 +123,12 @@ oracle10,oracle11,oracle12,oracle19,支持RAC,主备架构 - 字段类型:boolean - 默认值:false +- **enableFetchAll** + - 描述:是否同步表的历史数据 + - 必选:否 + - 字段类型:boolean + - 默认值:false + ### 2、SQL - **url**