From 35b567d730bb8450bc4f1202257c02c4b5c2fec7 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 3 Mar 2026 01:15:35 +0000 Subject: [PATCH 1/2] Add Oracle UPSERT --- .../main/java/io/cdap/plugin/db/DBRecord.java | 5 +- .../oracle/OracleETLDBOutputFormat.java | 126 +++++++++++++++++ .../io/cdap/plugin/oracle/OracleSink.java | 12 +- .../plugin/oracle/OracleSinkDBRecord.java | 15 +- .../plugin/oracle/OracleSourceDBRecord.java | 2 +- .../oracle/OracleETLDBOutputFormatTest.java | 129 ++++++++++++++++++ oracle-plugin/widgets/Oracle-batchsink.json | 28 +++- 7 files changed, 308 insertions(+), 9 deletions(-) create mode 100644 oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java create mode 100644 oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java index a5a9fcf5f..b187c7670 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java @@ -314,10 +314,7 @@ protected void upsertOperation(PreparedStatement stmt) throws SQLException { } private boolean fillUpdateParams(List updatedKeyList, ColumnType columnType) { - if (operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName())) { - return true; - } - return false; + return operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName()); } private Schema getNonNullableSchema(Schema.Field field) { diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java new file mode 100644 index 000000000..4a7b18278 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java @@ -0,0 +1,126 @@ +/* + * Copyright © 2026 Cask Data, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.plugin.db.sink.ETLDBOutputFormat; + +/** + * Class that extends {@link ETLDBOutputFormat} to implement the abstract methods + */ +public class OracleETLDBOutputFormat extends ETLDBOutputFormat { + + /** + * This method is used to construct the upsert query for Oracle using MERGE statement. + * Example - MERGE INTO my_table target + * USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source + * ON (target.id = source.id) + * WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age + * WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age) + * @param table - Name of the table + * @param fieldNames - All the columns of the table + * @param listKeys - The columns used as keys for matching + * @return Upsert query in the form of string + */ + @Override + public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) { + if (listKeys == null) { + throw new IllegalArgumentException( + "'Relation Table Key' must be specified for upsert operations. " + + "Please provide the list of key columns used to match records in the target table."); + } else if (fieldNames == null) { + throw new IllegalArgumentException( + "'Field Names' must be specified for upsert operations. 
" + + "Please provide the list of columns to be written to the target table."); + } else { + StringBuilder query = new StringBuilder(); + + // MERGE INTO target_table target + query.append("MERGE INTO ").append(table).append(" target "); + + // USING (SELECT ? AS col1, ? AS col2, ... FROM dual) source + query.append("USING (SELECT "); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("? AS ").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(" FROM dual) source "); + + // ON (target.key1 = source.key1 AND target.key2 = source.key2 ...) + query.append("ON ("); + for (int i = 0; i < listKeys.length; ++i) { + query.append("target.").append(listKeys[i]).append(" = source.").append(listKeys[i]); + if (i != listKeys.length - 1) { + query.append(" AND "); + } + } + query.append(") "); + + // WHEN MATCHED THEN UPDATE SET target.col1 = source.col1, target.col2 = source.col2 ... + // Only update non-key columns + query.append("WHEN MATCHED THEN UPDATE SET "); + boolean firstUpdateColumn = true; + for (String fieldName : fieldNames) { + boolean isKeyColumn = false; + for (String listKey : listKeys) { + String listKeyNoQuote = listKey.replace("\"", ""); + if (listKeyNoQuote.equals(fieldName)) { + isKeyColumn = true; + break; + } + } + if (!isKeyColumn) { + if (!firstUpdateColumn) { + query.append(", "); + } + query.append("target.").append(fieldName).append(" = source.").append(fieldName); + firstUpdateColumn = false; + } + } + + // WHEN NOT MATCHED THEN INSERT (col1, col2, ...) VALUES (source.col1, source.col2, ...) 
+ query.append(" WHEN NOT MATCHED THEN INSERT ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(") VALUES ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("source.").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(")"); + + return query.toString(); + } + } + + @Override + public String constructUpdateQuery(String table, String[] fieldNames, String[] listKeys) { + // Oracle JDBC does not accept a trailing semicolon in prepared statements. + String query = super.constructUpdateQuery(table, fieldNames, listKeys); + if (query.endsWith(";")) { + return query.substring(0, query.length() - 1); + } + return query; + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 40ecfbe9e..0cf6320de 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; @@ -31,6 +32,7 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; @@ -60,7 +62,8 @@ public OracleSink(OracleSinkConfig oracleSinkConfig) { @Override protected DBRecord getDBRecord(StructuredRecord output) { - return new OracleSinkDBRecord(output, 
columnTypes); + return new OracleSinkDBRecord(output, columnTypes, oracleSinkConfig.getOperationName(), + oracleSinkConfig.getRelationTableKey()); } @Override @@ -72,6 +75,13 @@ protected FieldsValidator getFieldsValidator() { protected SchemaReader getSchemaReader() { return new OracleSinkSchemaReader(); } + + @Override + protected void addOutputContext(BatchSinkContext context) { + context.addOutput(Output.of(oracleSinkConfig.getReferenceName(), + new SinkOutputFormatProvider(OracleETLDBOutputFormat.class, getConfiguration()))); + } + @Override protected LineageRecorder getLineageRecorder(BatchSinkContext context) { String fqn = DBUtils.constructFQN("oracle", diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java index 01b9a8247..ee77a7fc9 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java @@ -19,6 +19,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.db.Operation; import io.cdap.plugin.db.SchemaReader; import java.sql.PreparedStatement; @@ -31,9 +32,12 @@ */ public class OracleSinkDBRecord extends OracleSourceDBRecord { - public OracleSinkDBRecord(StructuredRecord record, List columnTypes) { + public OracleSinkDBRecord(StructuredRecord record, List columnTypes, Operation operationName, + String relationTableKey) { this.record = record; this.columnTypes = columnTypes; + this.operationName = operationName; + this.relationTableKey = relationTableKey; } @Override @@ -50,4 +54,13 @@ protected void insertOperation(PreparedStatement stmt) throws SQLException { writeToDB(stmt, field, fieldIndex); } } + + @Override + protected void upsertOperation(PreparedStatement stmt) throws SQLException { + for (int fieldIndex = 0; fieldIndex < 
columnTypes.size(); fieldIndex++) { + ColumnType columnType = columnTypes.get(fieldIndex); + Schema.Field field = record.getSchema().getField(columnType.getName()); + writeToDB(stmt, field, fieldIndex); + } + } } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 7d7c69d2b..44131a01b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -116,8 +116,8 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB @Override protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, String fieldName, int fieldIndex) throws SQLException { - int sqlType = columnTypes.get(fieldIndex).getType(); int sqlIndex = fieldIndex + 1; + int sqlType = modifiableColumnTypes.get(fieldIndex).getType(); // TIMESTAMP and TIMESTAMPTZ types needs to be handled using the specific oracle types to ensure that the data // inserted matches with the provided value. As Oracle driver internally alters the values provided diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java new file mode 100644 index 000000000..994c86b55 --- /dev/null +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java @@ -0,0 +1,129 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import org.junit.Assert; +import org.junit.Test; + +public class OracleETLDBOutputFormatTest { + + private final OracleETLDBOutputFormat outputFormat = new OracleETLDBOutputFormat(); + + @Test + public void testConstructUpsertQueryBasic() { + String[] fieldNames = {"id", "name", "age"}; + String[] listKeys = {"id"}; + String table = "my_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO my_table target " + + "USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age " + + "WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryMultipleKeys() { + String[] fieldNames = {"id", "code", "name", "value"}; + String[] listKeys = {"id", "code"}; + String table = "composite_key_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO composite_key_table target " + + "USING (SELECT ? AS id, ? AS code, ? AS name, ? 
AS value FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value " + + "WHEN NOT MATCHED THEN INSERT (id, code, name, value) VALUES (source.id, source.code, source.name, source" + + ".value)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQuerySingleField() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "single_field_update_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO single_field_update_table target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullListKeys() { + String[] fieldNames = {"id", "name", "age"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, fieldNames, null); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullFieldNames() { + String[] listKeys = {"id"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, null, listKeys); + } + + @Test + public void testConstructUpsertQueryAllFieldsAreKeys() { + String[] fieldNames = {"id", "code"}; + String[] listKeys = {"id", "code"}; + String table = "all_keys_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + // When all fields are keys, the UPDATE SET clause will be empty after "SET " + // Note: There's an extra space before "WHEN NOT MATCHED" due to implementation + String expected = "MERGE INTO all_keys_table target " + + "USING (SELECT ? AS id, ? 
AS code FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET  " + + "WHEN NOT MATCHED THEN INSERT (id, code) VALUES (source.id, source.code)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryWithSpecialTableName() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "SCHEMA.MY_TABLE"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO SCHEMA.MY_TABLE target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } +} diff --git a/oracle-plugin/widgets/Oracle-batchsink.json b/oracle-plugin/widgets/Oracle-batchsink.json index 8d8fc79a2..d48a1cb1c 100644 --- a/oracle-plugin/widgets/Oracle-batchsink.json +++ b/oracle-plugin/widgets/Oracle-batchsink.json @@ -192,6 +192,29 @@ "label": "Schema Name", "name": "dbSchemaName" }, + { + "widget-type": "radio-group", + "label": "Operation Name", + "name": "operationName", + "widget-attributes": { + "default": "insert", + "layout": "inline", + "options": [ + { + "id": "insert", + "label": "INSERT" + }, + { + "id": "update", + "label": "UPDATE" + }, + { + "id": "upsert", + "label": "UPSERT" + } + ] + } + }, { "widget-type": "hidden", "label": "Operation Name", @@ -201,9 +224,10 @@ } }, { - "widget-type": "hidden", + "name": "relationTableKey", + "widget-type": "csv", "label": "Table Key", - "name": "relationTableKey" + "widget-attributes": {} } ] }, From c44d8af6347282b313292ddea058dbec5f297796 Mon Sep 17 00:00:00 2001 From: suryakumari Date: Tue, 17 Mar 2026 19:05:21 +0530 Subject: [PATCH 2/2] e2e oracle sink update upsert tests --- .../sink/OracleDesignTimeValidation.feature | 48 ++++++++ 
.../features/sink/OracleRunTime.feature | 108 ++++++++++++++++++ .../features/sink/OracleRunTimeMacro.feature | 84 +++++++++++++- .../resources/errorMessage.properties | 1 + .../resources/pluginParameters.properties | 6 + 5 files changed, 246 insertions(+), 1 deletion(-) diff --git a/oracle-plugin/src/e2e-test/features/sink/OracleDesignTimeValidation.feature b/oracle-plugin/src/e2e-test/features/sink/OracleDesignTimeValidation.feature index 5b69fbbb2..d9cb71e38 100644 --- a/oracle-plugin/src/e2e-test/features/sink/OracleDesignTimeValidation.feature +++ b/oracle-plugin/src/e2e-test/features/sink/OracleDesignTimeValidation.feature @@ -262,3 +262,51 @@ Feature: Oracle sink- Verify Oracle sink plugin design time validation scenarios Then Click on the Validate button Then Verify that the Plugin is displaying an error message: "blank.HostBlank.message" on the header + @ORACLE_SOURCE_DATATYPES_TEST @ORACLE_TARGET_DATATYPES_TEST @Oracle_Required + Scenario Outline: To verify Oracle sink plugin validation error message for update, upsert operation name and table key + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Oracle" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Oracle" from the plugins list as: "Sink" + Then Connect plugins: "Oracle" and "Oracle2" to establish connection + Then Navigate to the properties page of plugin: "Oracle" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" 
with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Enter textarea plugin property: "importQuery" with value: "selectQuery" + Then Click on the Get Schema button + Then Verify the Output Schema matches the Expected Schema: "outputDatatypesSchema" + Then Validate "Oracle" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Oracle2" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Replace input plugin property: "tableName" with value: "targetTable" + Then Replace input plugin property: "dbSchemaName" with value: "schema" + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input 
plugin property: "referenceName" with value: "targetRef" + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Select radio button plugin property: "operationName" with value: "<options>" + Then Click on the Validate button + Then Verify that the Plugin Property: "operationName" is displaying an in-line error message: "errorMessageUpdateUpsertOperationName" + Then Verify that the Plugin Property: "relationTableKey" is displaying an in-line error message: "errorMessageUpdateUpsertOperationName" + Examples: + | options | + | upsert | + | update | diff --git a/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature b/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature index 8bd0cd536..7faae82a4 100644 --- a/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature +++ b/oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature @@ -272,3 +272,111 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink Then Verify the pipeline status is "Succeeded" Then Validate records transferred to target table with record counts of BigQuery table Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table + + @ORACLE_SOURCE_DATATYPES_TEST @ORACLE_TARGET_DATATYPES_TEST @Oracle_Required + Scenario Outline: To verify pipeline preview failed with invalid table key + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Oracle" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Oracle" from the plugins list as: "Sink" + Then Connect plugins: "Oracle" and "Oracle2" to establish connection + Then Navigate to the properties page of plugin: "Oracle" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: 
"driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Enter textarea plugin property: "importQuery" with value: "selectQuery" + Then Click on the Get Schema button + Then Verify the Output Schema matches the Expected Schema: "outputDatatypesSchema" + Then Validate "Oracle" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Oracle2" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + 
Then Replace input plugin property: "tableName" with value: "targetTable" + Then Replace input plugin property: "dbSchemaName" with value: "schema" + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input plugin property: "referenceName" with value: "targetRef" + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Select radio button plugin property: "operationName" with value: "<options>" + Then Click on the Add Button of the property: "relationTableKey" with value: + | invalidOracleTableKey | + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Verify the preview of pipeline is "Failed" + Examples: + | options | + | upsert | + | update | + + @BQ_SOURCE_TEST @ORACLE_TEST_TABLE @Oracle_Required + Scenario Outline: To verify data is getting transferred from BigQuery to Oracle successfully using upsert,update operation with table key + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Oracle" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "Oracle" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Enter input plugin property: "datasetProject" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: 
"bqSourceTable" + Then Click on the Get Schema button + Then Verify the Output Schema matches the Expected Schema: "bqOutputDatatypesSchema" + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Oracle" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Replace input plugin property: "tableName" with value: "targetTable" + Then Replace input plugin property: "dbSchemaName" with value: "schema" + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input plugin property: "referenceName" with value: "targetRef" + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Select radio button plugin property: "operationName" with value: "<options>" + Then Click on the Add Button of the property: "relationTableKey" with value: + | oracleTableKey | + Then Validate "Oracle" plugin properties + Then 
Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Verify the preview of pipeline is "success" + Then Click on the Preview Data link on the Sink plugin node: "Oracle" + Then Close the preview data + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table + Examples: + | options | + | upsert | + | update | diff --git a/oracle-plugin/src/e2e-test/features/sink/OracleRunTimeMacro.feature b/oracle-plugin/src/e2e-test/features/sink/OracleRunTimeMacro.feature index 218a1a29e..7727a23cb 100644 --- a/oracle-plugin/src/e2e-test/features/sink/OracleRunTimeMacro.feature +++ b/oracle-plugin/src/e2e-test/features/sink/OracleRunTimeMacro.feature @@ -90,7 +90,7 @@ Feature: Oracle - Verify data transfer to Oracle sink with macro arguments Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table @ORACLE_SOURCE_DATATYPES_TEST @ORACLE_TARGET_DATATYPES_TEST @Oracle_Required - Scenario: To verify data is getting transferred from Oracle to Oracle successfully with connection argument, transaction isolation macro enabled + Scenario: To verify data is getting transferred from Oracle to Oracle successfully with connection argument, transaction isolation, operationName macro enabled Given Open Datafusion Project to configure pipeline When Expand Plugin group in the LHS plugins list: "Source" When Select plugin: "Oracle" from the plugins list as: "Source" @@ -126,12 +126,16 @@ Feature: Oracle - Verify data transfer to Oracle sink with macro arguments Then Select radio button plugin property: "role" with value: "normal" Then Click on the Macro button of Property: "connectionArguments" and set the value to: "connArgumentsSink" Then 
Click on the Macro button of Property: "transactionIsolationLevel" and set the value to: "transactionIsolationLevel" + Then Click on the Macro button of Property: "operationName" and set the value to: "oracleOperationName" + Then Click on the Macro button of Property: "relationTableKey" and set the value to: "oracleTableKey" Then Validate "Oracle2" plugin properties Then Close the Plugin Properties page Then Save the pipeline Then Preview and run the pipeline Then Enter runtime argument value "connectionArguments" for key "connArgumentsSink" Then Enter runtime argument value "transactionIsolationLevel" for key "transactionIsolationLevel" + Then Enter runtime argument value "upsertOperationName" for key "oracleOperationName" + Then Enter runtime argument value "upsertRelationTableKey" for key "oracleTableKey" Then Run the preview of pipeline with runtime arguments Then Wait till pipeline preview is in running state Then Open and capture pipeline preview logs @@ -142,6 +146,8 @@ Feature: Oracle - Verify data transfer to Oracle sink with macro arguments Then Run the Pipeline in Runtime Then Enter runtime argument value "connectionArguments" for key "connArgumentsSink" Then Enter runtime argument value "transactionIsolationLevel" for key "transactionIsolationLevel" + Then Enter runtime argument value "upsertOperationName" for key "oracleOperationName" + Then Enter runtime argument value "upsertRelationTableKey" for key "oracleTableKey" Then Run the Pipeline in Runtime with runtime arguments Then Wait till pipeline is in running state Then Open and capture logs @@ -149,3 +155,79 @@ Feature: Oracle - Verify data transfer to Oracle sink with macro arguments Then Close the pipeline logs Then Validate the values of records transferred to target table is equal to the values from source table + @BQ_SOURCE_TEST @ORACLE_TEST_TABLE @Oracle_Required + Scenario: To verify data is getting transferred from BigQuery source to Oracle sink using macro arguments for operation name + 
Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "Oracle" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "Oracle" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Click on the Macro button of Property: "projectId" and set the value to: "bqProjectId" + Then Click on the Macro button of Property: "datasetProjectId" and set the value to: "bqDatasetProjectId" + Then Click on the Macro button of Property: "dataset" and set the value to: "bqDataset" + Then Click on the Macro button of Property: "table" and set the value to: "bqTable" + Then Validate "BigQuery" plugin properties + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "Oracle" + Then Click on the Macro button of Property: "jdbcPluginName" and set the value to: "oracleDriverName" + Then Click on the Macro button of Property: "host" and set the value to: "oracleHost" + Then Click on the Macro button of Property: "port" and set the value to: "oraclePort" + Then Click on the Macro button of Property: "user" and set the value to: "oracleUsername" + Then Click on the Macro button of Property: "password" and set the value to: "oraclePassword" + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Click on the Macro button of Property: "database" and set the value to: "oracleDatabaseName" + Then Click on the Macro button of Property: "tableName" and set the value to: "oracleTableName" + Then Click on the Macro button of Property: "dbSchemaName" and set the value to: "oracleSchemaName" + Then Click on the Macro button of 
Property: "operationName" and set the value to: "oracleOperationName" + Then Click on the Macro button of Property: "relationTableKey" and set the value to: "oracleTableKey" + Then Enter input plugin property: "referenceName" with value: "targetRef" + Then Validate "Oracle" plugin properties + Then Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Enter runtime argument value "projectId" for key "bqProjectId" + Then Enter runtime argument value "projectId" for key "bqDatasetProjectId" + Then Enter runtime argument value "dataset" for key "bqDataset" + Then Enter runtime argument value "bqSourceTable" for key "bqTable" + Then Enter runtime argument value "driverName" for key "oracleDriverName" + Then Enter runtime argument value from environment variable "host" for key "oracleHost" + Then Enter runtime argument value from environment variable "port" for key "oraclePort" + Then Enter runtime argument value from environment variable "username" for key "oracleUsername" + Then Enter runtime argument value from environment variable "password" for key "oraclePassword" + Then Enter runtime argument value "databaseName" for key "oracleDatabaseName" + Then Enter runtime argument value "targetTable" for key "oracleTableName" + Then Enter runtime argument value "schema" for key "oracleSchemaName" + Then Enter runtime argument value "operationName" for key "oracleOperationName" + Then Enter runtime argument value "relationTableKey" for key "oracleTableKey" + Then Run the preview of pipeline with runtime arguments + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Enter runtime argument value "projectId" for key "bqProjectId" + Then Enter runtime argument value "projectId" for key 
"bqDatasetProjectId" + Then Enter runtime argument value "dataset" for key "bqDataset" + Then Enter runtime argument value "bqSourceTable" for key "bqTable" + Then Enter runtime argument value "driverName" for key "oracleDriverName" + Then Enter runtime argument value from environment variable "host" for key "oracleHost" + Then Enter runtime argument value from environment variable "port" for key "oraclePort" + Then Enter runtime argument value from environment variable "username" for key "oracleUsername" + Then Enter runtime argument value from environment variable "password" for key "oraclePassword" + Then Enter runtime argument value "databaseName" for key "oracleDatabaseName" + Then Enter runtime argument value "targetTable" for key "oracleTableName" + Then Enter runtime argument value "schema" for key "oracleSchemaName" + Then Enter runtime argument value "operationName" for key "oracleOperationName" + Then Enter runtime argument value "relationTableKey" for key "oracleTableKey" + Then Run the Pipeline in Runtime with runtime arguments + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Close the pipeline logs + Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table diff --git a/oracle-plugin/src/e2e-test/resources/errorMessage.properties b/oracle-plugin/src/e2e-test/resources/errorMessage.properties index 895444408..10d147773 100644 --- a/oracle-plugin/src/e2e-test/resources/errorMessage.properties +++ b/oracle-plugin/src/e2e-test/resources/errorMessage.properties @@ -19,3 +19,4 @@ errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: blank.database.message=Required property 'database' has no value. 
blank.connection.message=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '72000', errorCode: '1005', errorMessage: SQL Exception occurred: [Message='ORA-01005: null password given; logon denied ', SQLState='72000', ErrorCode='1005'].' blank.HostBlank.message=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '08006', errorCode: '17002', errorMessage: SQL Exception occurred: [Message='IO Error: The Network Adapter could not establish the connection', SQLState='08006', ErrorCode='17002'].' +errorMessageUpdateUpsertOperationName=Table key must be set if the operation is 'Update' or 'Upsert'. diff --git a/oracle-plugin/src/e2e-test/resources/pluginParameters.properties b/oracle-plugin/src/e2e-test/resources/pluginParameters.properties index e3f81f1bc..55dba1533 100644 --- a/oracle-plugin/src/e2e-test/resources/pluginParameters.properties +++ b/oracle-plugin/src/e2e-test/resources/pluginParameters.properties @@ -86,6 +86,7 @@ invalidPassword=testPassword invalidBoundingQuery=SELECT MIN(id),MAX(id) FROM table invalidBoundingQueryValue=select; invalidTable=table +invalidOracleTableKey=asdas #ORACLE Valid Properties connectionArgumentsList=[{"key":"queryTimeout","value":"-1"}] @@ -97,6 +98,11 @@ splitByColumn=ID importQuery=where $CONDITIONS connectionArguments=queryTimeout=50 transactionIsolationLevel=TRANSACTION_READ_COMMITTED +operationName=update +oracleTableKey=LASTNAME +relationTableKey=ID +upsertOperationName=upsert +upsertRelationTableKey=COL2 #bq properties projectId=cdf-athena