Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.jdbc.sink;

import com.dtstack.chunjun.conf.FieldConf;
Expand All @@ -25,7 +24,6 @@
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.enums.EWriteMode;
import com.dtstack.chunjun.util.TableUtil;

import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -73,7 +71,6 @@ public static DynamicPreparedStmt buildStmt(
RowKind rowKind,
Connection connection,
JdbcDialect jdbcDialect,
JdbcConf jdbcConf,
boolean writeExtInfo)
throws SQLException {
DynamicPreparedStmt dynamicPreparedStmt = new DynamicPreparedStmt();
Expand All @@ -84,14 +81,7 @@ public static DynamicPreparedStmt buildStmt(
dynamicPreparedStmt.getColumnMeta(schemaName, tableName, connection);
dynamicPreparedStmt.buildRowConvert();

String sql =
dynamicPreparedStmt.prepareTemplates(
rowKind,
schemaName,
tableName,
jdbcConf.getUniqueKey().toArray(new String[0]),
jdbcConf.getMode(),
jdbcConf.isAllReplace());
String sql = dynamicPreparedStmt.prepareTemplates(rowKind, schemaName, tableName);
String[] fieldNames = new String[dynamicPreparedStmt.columnNameList.size()];
dynamicPreparedStmt.columnNameList.toArray(fieldNames);
dynamicPreparedStmt.fieldNamedPreparedStatement =
Expand All @@ -105,29 +95,20 @@ public static DynamicPreparedStmt buildStmt(
RowKind rowKind,
Connection connection,
JdbcDialect jdbcDialect,
JdbcConf jdbcConf,
List<FieldConf> fieldConfList,
AbstractRowConverter<?, ?, ?, ?> rowConverter)
throws SQLException {
List<FieldConf> fieldConfList = jdbcConf.getColumn();
DynamicPreparedStmt dynamicPreparedStmt = new DynamicPreparedStmt();
dynamicPreparedStmt.jdbcDialect = jdbcDialect;
dynamicPreparedStmt.rowConverter = rowConverter;
dynamicPreparedStmt.jdbcConf = jdbcConf;
String[] fieldNames = new String[fieldConfList.size()];
for (int i = 0; i < fieldConfList.size(); i++) {
FieldConf fieldConf = fieldConfList.get(i);
fieldNames[i] = fieldConf.getName();
dynamicPreparedStmt.columnNameList.add(fieldConf.getName());
dynamicPreparedStmt.columnTypeList.add(fieldConf.getType());
}
String sql =
dynamicPreparedStmt.prepareTemplates(
rowKind,
schemaName,
tableName,
jdbcConf.getUniqueKey().toArray(new String[0]),
jdbcConf.getMode(),
jdbcConf.isAllReplace());
String sql = dynamicPreparedStmt.prepareTemplates(rowKind, schemaName, tableName);
dynamicPreparedStmt.fieldNamedPreparedStatement =
FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames);
return dynamicPreparedStmt;
Expand All @@ -150,20 +131,14 @@ public static DynamicPreparedStmt buildStmt(
return dynamicPreparedStmt;
}

protected String prepareTemplates(
RowKind rowKind,
String schemaName,
String tableName,
String[] uniqueKeys,
String mode,
boolean allReplace) {
protected String prepareTemplates(RowKind rowKind, String schemaName, String tableName) {
String singleSql = null;
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
singleSql =
this.getInsertStatementWithWriteMode(
mode, schemaName, tableName, uniqueKeys, allReplace);
jdbcDialect.getInsertIntoStatement(
schemaName, tableName, columnNameList.toArray(new String[0]));
break;
case DELETE:
case UPDATE_BEFORE:
Expand All @@ -179,40 +154,6 @@ protected String prepareTemplates(
return singleSql;
}

protected String getInsertStatementWithWriteMode(
String mode,
String schemaName,
String tableName,
String[] uniqueKeys,
boolean allReplace) {
String singleSql;
if (EWriteMode.INSERT.name().equalsIgnoreCase(mode)) {
singleSql =
jdbcDialect.getInsertIntoStatement(
schemaName, tableName, columnNameList.toArray(new String[0]));
} else if (EWriteMode.REPLACE.name().equalsIgnoreCase(mode)) {
singleSql =
jdbcDialect
.getReplaceStatement(
schemaName, tableName, columnNameList.toArray(new String[0]))
.get();
} else if (EWriteMode.UPDATE.name().equalsIgnoreCase(mode)) {
singleSql =
jdbcDialect
.getUpsertStatement(
schemaName,
tableName,
columnNameList.toArray(new String[0]),
uniqueKeys,
allReplace)
.get();
} else {
throw new IllegalArgumentException("Unknown write mode:" + mode);
}

return singleSql;
}

public void getColumnNameList(Map<String, Integer> header, Set<String> extHeader) {
if (writeExtInfo) {
columnNameList.addAll(header.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.UPDATE_BEFORE)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.chunjun.connector.jdbc.sink;

import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf;
Expand Down Expand Up @@ -71,7 +70,7 @@ public class PreparedStmtProxy implements FieldNamedPreparedStatement {

private final int cacheDurationMin = 10;

/** LRU cache key info: database_table_rowkind * */
/** LUR cache key info: database_table_rowkind * */
protected Cache<String, DynamicPreparedStmt> pstmtCache;

/** 当前的执行sql的preparestatement */
Expand Down Expand Up @@ -160,7 +159,6 @@ public void getOrCreateFieldNamedPstmt(RowData row) throws ExecutionException {
columnRowData.getRowKind(),
connection,
jdbcDialect,
jdbcConf,
writeExtInfo);
} catch (SQLException e) {
LOG.warn("", e);
Expand All @@ -184,7 +182,7 @@ public void getOrCreateFieldNamedPstmt(RowData row) throws ExecutionException {
row.getRowKind(),
connection,
jdbcDialect,
jdbcConf,
jdbcConf.getColumn(),
currentRowConverter);
} catch (SQLException e) {
LOG.warn("", e);
Expand Down
84 changes: 84 additions & 0 deletions chunjun-examples/json/logminer/logminer_stream_insert_mode.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{
"job": {
"content": [
{
"nameMapping": {
"schemaMappings": {
"LOGMINER_TEST": "LOGMINER_TEST"
},
"tableMappings": {
"LOGMINER_TEST": {
"ZJH_LG_FROM2": "ZJH_LG_TO2"
}
},
"fieldMappings": {
"LOGMINER_TEST": {
"ZJH_LG_FROM2": {
"ID": "TO2_ID",
"NAME": "NAME"
}
}
}
},
"reader": {
"parameter": {
"jdbcUrl": "jdbc:oracle:thin:@localhost:1521:helowin",
"username": "logminer_test",
"password": "123456",
"supportAutoAddLog": false,
"table": [
"LOGMINER_TEST.ZJH_LG_FROM2"
],
"pavingData": false,
"split": true,
"cat": "INSERT,UPDATE,DELETE",
"readPosition": "CURRENT",
"queryTimeout": 3000
},
"name": "oraclelogminerreader"
},
"writer": {
"parameter": {
"writeMode": "insert",
"uniqueKey": [
],
"allReplace": true,
"username": "logminer_test",
"password": "123456",
"connection": [
{
"schema": "LOGMINER_TEST",
"jdbcUrl": "jdbc:oracle:thin:@localhost:1521:helowin",
"table": [
"*"
]
}
]
},
"name": "oraclewriter"
}
}
],
"setting": {
"speed": {
"bytes": 0,
"channel": 1
},
"errorLimit": {
"record": 1
},
"restore": {
"maxRowNumForCheckpoint": 0,
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
},
"log": {
"isLogger": true,
"level": "debug",
"path": "",
"pattern": ""
}
}
}
}