diff --git a/pom.xml b/pom.xml
index 83dc4603de..dac54092bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,12 @@
3.1.1
0.23.1
${project.basedir}/src/test/java/
+
+
+ 0.0.2-SNAPSHOT
+ 0.0.2-SNAPSHOT
+ 1.11.3
+ 3.17.0
@@ -782,6 +788,39 @@
0.2.0
+
+
+ ai.festcloud
+ cloud-metadata-provider
+ ${version.cloud-metadata-provider}
+
+
+ ai.festcloud
+ cloud-metadata-model
+ ${version.cloud-metadata-common}
+
+
+ org.apache.avro
+ avro
+ ${apache.avro.version}
+
+
+ org.apache.commons
+ commons-lang3
+ ${version.commons-lang3}
+
+
+ com.h2database
+ h2
+ 2.2.224
+
+
+ ai.festcloud
+ cloud-data-plugins-common
+ 0.0.1-SNAPSHOT
+
+
+
@@ -867,6 +906,11 @@
org.apache.hadoop.hbase.mapreduce.*;
org.apache.hadoop.hbase.security.token.*;
com.google.cloud.spark.bigquery.*;
+ org.apache.commons.collections4.*;
+ ai.festcloud.model.*;
+ ai.festcloud.datafabric.plugins.common.integrity.*;
+ org.apache.avro.*;
+ org.h2.*;
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java
new file mode 100644
index 0000000000..ec288cd534
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java
@@ -0,0 +1,192 @@
+package io.cdap.plugin.gcp.bigquery.fctransform;
+
+import ai.festcloud.datafabric.plugins.common.integrity.CDAPUtils;
+import ai.festcloud.datafabric.plugins.common.integrity.IntegrityService;
+import ai.festcloud.datafabric.plugins.common.integrity.IntegrityServiceBQ;
+import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingEntryConfig;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingObj;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingParsingService;
+import ai.festcloud.metadata.model.TypeRecord;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.InvalidEntry;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageSubmitterContext;
+import io.cdap.cdap.etl.api.Transform;
+import io.cdap.cdap.etl.api.TransformContext;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Plugin(type = Transform.PLUGIN_TYPE)
+@Name("BigQueryMdmIntegrityValidation")
+@Description("Verify whether the requested values are present in MDM and add new specified field.")
+public class MdmIntegrityBigQueryTransformer extends Transform {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MdmIntegrityBigQueryTransformer.class);
+
+ private static final String OPERATION = "operation";
+ private static final String OPERATION_CREATE = "create";
+ private static final String OPERATION_UPDATE = "update";
+
+ private final MdmIntegrityBigQueryTransformerConfig config;
+ private Schema outputSchema;
+
+ private MappingObj mapping;
+ private Map entities;
+ private IntegrityService integrityService;
+ private boolean containsOperationField = false;
+
+ public MdmIntegrityBigQueryTransformer(MdmIntegrityBigQueryTransformerConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void initialize(TransformContext context) throws Exception {
+ LOG.info("Initializing BigQuery integrity validation...");
+ super.initialize(context);
+
+ FailureCollector failureCollector = context.getFailureCollector();
+ outputSchema = config.getSchema(failureCollector);
+
+ String configServerUrl = context.getArguments()
+ .get(MetadataUtils.CONFIGSERVER_METADATA_SCHEMA_URL);
+ String metadataRootPath = context.getArguments().get(MetadataUtils.METADATA_ROOT_PATH);
+ entities = MetadataUtils.getTypeRecordByUrl(configServerUrl,
+ metadataRootPath);
+ config.validate(failureCollector, entities, context.getInputSchema());
+
+ MappingParsingService mappingParsingService
+ = new MappingParsingService(config.getMapping(),
+ config.getFullyQualifiedEntityName(),
+ failureCollector,
+ entities,
+ outputSchema);
+ Optional mappingOpt = mappingParsingService.getMapping();
+ mapping = mappingOpt.orElse(null);
+
+ Credentials credentials = config.getConnection().getCredentials(failureCollector);
+ BigQuery bigQuery = GCPUtils.getBigQuery(config.getConnection().getProject(), credentials);
+
+ failureCollector.getOrThrowException();
+
+ integrityService = new IntegrityServiceBQ(bigQuery, entities, mapping);
+ containsOperationField = outputSchema.getFields()
+ .stream().anyMatch(field -> field.getName().equals(OPERATION));
+
+ LOG.info("BigQueryMdmIntegrityValidation initialized.");
+ }
+
+ @Override
+ public void onRunFinish(boolean succeeded, StageSubmitterContext context) {
+ super.onRunFinish(succeeded, context);
+
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ integrityService.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(collector));
+ }
+
+
+ @Override
+ public void transform(StructuredRecord input, Emitter emitter)
+ throws Exception {
+ try {
+ StructuredRecord structuredRecord = fillIds(input);
+ emitter.emit(structuredRecord);
+ } catch (Exception e) {
+ emitter.emitError(new InvalidEntry<>(MetadataUtils.ERROR_CODE, e.getMessage(), input));
+ }
+ }
+
+ private StructuredRecord fillIds(StructuredRecord input) {
+
+ Map result = new HashMap<>();
+ Map> mappingEntryConfigs = mapping.getMappingEntryConfigs();
+
+ mappingEntryConfigs.forEach((targetFieldName, mappingEntryConfig) -> {
+
+ for (MappingEntryConfig entryConfig : mappingEntryConfig) {
+ List ids = integrityService.getIds(entryConfig, input);
+ if (ids.size() > 1) {
+ throw new RuntimeException(
+ "More than one id found for request: " + entryConfig.toString());
+ }
+ if (ids.size() == 1) {
+ result.put(targetFieldName, ids.get(0));
+ break;
+ }
+ }
+ });
+ if (result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null && config.getFcidRequired()) {
+ throw new RuntimeException("ID is required but not provided.");
+ }
+
+ if (containsOperationField) {
+ String operationType = result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null
+ ? OPERATION_CREATE
+ : OPERATION_UPDATE;
+ result.put(OPERATION, operationType);
+ }
+
+ return setValuesToTargetFields(input, result);
+ }
+
+
+ private StructuredRecord setValuesToTargetFields(StructuredRecord input,
+ Map values) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
+ setFieldValues(input, values, builder, outputSchema);
+ return builder.build();
+ }
+
+ private void setFieldValues(StructuredRecord input,
+ Map values,
+ StructuredRecord.Builder builder,
+ Schema schema) {
+ for (Schema.Field field : schema.getFields()) {
+ String fieldName = field.getName();
+ Object fieldValue = input.get(fieldName);
+
+ if (CDAPUtils.isRecordType(field) && fieldValue != null) {
+ StructuredRecord nestedRecord = (StructuredRecord) fieldValue;
+ Schema nestedSchema = CDAPUtils.getNonNullableSchema(field.getSchema());
+
+ StructuredRecord.Builder nestedBuilder = StructuredRecord.builder(nestedSchema);
+ setFieldValues(nestedRecord, values, nestedBuilder, nestedSchema);
+ builder.set(fieldName, nestedBuilder.build());
+ } else {
+ builder.set(fieldName, values.getOrDefault(fieldName, fieldValue));
+ }
+ }
+ }
+
+
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
new file mode 100644
index 0000000000..955e32b3c5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
@@ -0,0 +1,137 @@
+package io.cdap.plugin.gcp.bigquery.fctransform;
+
+
+import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
+import ai.festcloud.metadata.model.TypeField;
+import ai.festcloud.metadata.model.TypeRecord;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.ConfigUtil;
+import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+public class MdmIntegrityBigQueryTransformerConfig extends PluginConfig {
+
+ public static final String MAPPING = "mapping";
+ public static final String FULLY_QUALIFIED_ENTITY_NAME = "fullyQualifiedEntityName";
+ public static final String SCHEMA = "schema";
+
+
+ @Name(ConfigUtil.NAME_CONNECTION)
+ @Macro
+ @Nullable
+ @Description("The existing connection to use.")
+ public BigQueryConnectorConfig connection;
+
+
+ @Name(ConfigUtil.NAME_USE_CONNECTION)
+ @Nullable
+ @Description("Whether to use an existing connection.")
+ public Boolean useConnection;
+
+ @Name(MAPPING)
+ @Description("Properties to validate")
+ @Macro
+ private final String mapping;
+
+ @Name(FULLY_QUALIFIED_ENTITY_NAME)
+ @Description("Metadata server url")
+ @Macro
+ private final String fullyQualifiedEntityName;
+
+ @Name("fcidRequired")
+ @Description("Indicates whether FestCloudID is required. "
+ + "If true, records without a FestCloudID are sent to the error flow. "
+ + "If false, records without a FestCloudID are processed and not sent to the error flow.")
+ @Macro
+ private final Boolean fcidRequired;
+
+ @Name(SCHEMA)
+ @Description("Schema of the output records.")
+ @Macro
+ private final String schema;
+
+
+ public MdmIntegrityBigQueryTransformerConfig(BigQueryConnectorConfig connection,
+ Boolean useConnection, String mapping,
+ String fullyQualifiedEntityName,
+ Boolean fcidRequired, String schema) {
+ this.connection = connection;
+ this.useConnection = useConnection;
+ this.mapping = mapping;
+ this.fullyQualifiedEntityName = fullyQualifiedEntityName;
+ this.fcidRequired = fcidRequired;
+ this.schema = schema;
+ }
+
+
+ public void validate(FailureCollector collector, Map entities,
+ Schema outputSchema) {
+ ConfigUtil.validateConnection(this, useConnection, connection, collector);
+ TypeRecord typeRecord = entities.get(fullyQualifiedEntityName);
+ List fields = typeRecord.getFields();
+
+ fields.stream().filter(MetadataUtils::integrityRequired).map(MetadataUtils::getFieldName)
+ .forEach(fieldName -> validateField(fieldName, outputSchema, collector));
+ }
+
+ private void validateField(String fieldName, Schema outputSchema, FailureCollector collector) {
+ if (outputSchema.getField(fieldName) == null) {
+ collector.addFailure(String.format("Can't find field %s in output record", fieldName),
+ String.format(
+ "Field %s mandatory for integrity validation according to metadata definition",
+ fieldName));
+ }
+ }
+
+ public Schema getSchema(FailureCollector collector) {
+ try {
+ return Schema.parseJson(schema);
+ } catch (IOException e) {
+ collector.addFailure(String.format("Failed to parse schema: %s", schema), null);
+ throw collector.getOrThrowException();
+ }
+ }
+
+ @Nullable
+ public BigQueryConnectorConfig getConnection() {
+ return connection;
+ }
+
+ public void setConnection(@Nullable BigQueryConnectorConfig connection) {
+ this.connection = connection;
+ }
+
+ @Nullable
+ public Boolean getUseConnection() {
+ return useConnection;
+ }
+
+ public void setUseConnection(@Nullable Boolean useConnection) {
+ this.useConnection = useConnection;
+ }
+
+ public String getFullyQualifiedEntityName() {
+ return fullyQualifiedEntityName;
+ }
+
+ public Boolean getFcidRequired() {
+ return fcidRequired;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getMapping() {
+ return mapping;
+ }
+}
+
diff --git a/widgets/BigQueryMdmIntegrityValidation-transform.json b/widgets/BigQueryMdmIntegrityValidation-transform.json
new file mode 100644
index 0000000000..7d170f33a8
--- /dev/null
+++ b/widgets/BigQueryMdmIntegrityValidation-transform.json
@@ -0,0 +1,254 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "MDM BigQuery integrity validation",
+ "configuration-groups": [
+ {
+ "label": "Connection",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Use connection",
+ "name": "useConnection",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "connection-select",
+ "label": "Connection",
+ "name": "connection",
+ "widget-attributes": {
+ "connectionType": "BigQuery"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Project ID",
+ "name": "project",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Dataset Project ID",
+ "name": "datasetProject",
+ "widget-attributes": {
+ "placeholder": "Project the dataset belongs to, if different from the Project ID."
+ }
+ },
+ {
+ "name": "serviceAccountType",
+ "label": "Service Account Type",
+ "widget-type": "radio-group",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "filePath",
+ "options": [
+ {
+ "id": "filePath",
+ "label": "File Path"
+ },
+ {
+ "id": "JSON",
+ "label": "JSON"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account File Path",
+ "name": "serviceFilePath",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account JSON",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ },
+ {
+ "label": "FC integrity configs",
+ "properties": [
+ {
+ "name": "mapping",
+ "label": "Mapping",
+ "widget-type": "ds-multiplevalues",
+ "widget-attributes": {
+ "numValues": "3",
+ "placeholders": [
+ "Mdm entity name",
+ "Source field",
+ "External field"
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Fully Qualified Entity name",
+ "name": "fullyQualifiedEntityName"
+ },
+ {
+ "name": "fcidRequired",
+ "label": "Is FestCloudID required",
+ "widget-type": "toggle",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "true"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Views",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Enable querying views",
+ "name": "enableQueryingViews",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Temporary Table Creation Project",
+ "name": "viewMaterializationProject"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Temporary Table Creation Dataset",
+ "name": "viewMaterializationDataset"
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "label": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "schema-types": [
+ "boolean",
+ "long",
+ "double",
+ "bytes",
+ "string",
+ "array"
+ ],
+ "schema-default-type": "string"
+ }
+ }
+ ],
+ "filters": [
+ {
+ "name": "ViewsProperties",
+ "condition": {
+ "expression": "enableQueryingViews == true "
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "viewMaterializationProject"
+ },
+ {
+ "type": "property",
+ "name": "viewMaterializationDataset"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeFilePath",
+ "condition": {
+ "expression": "useConnection == false && serviceAccountType == 'filePath'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceFilePath"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeJSON",
+ "condition": {
+ "expression": "useConnection == false && serviceAccountType == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ },
+ {
+ "name": "showConnectionProperties ",
+ "condition": {
+ "expression": "useConnection == false"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "project"
+ },
+ {
+ "type": "property",
+ "name": "datasetProject"
+ },
+ {
+ "type": "property",
+ "name": "serviceAccountType"
+ }
+ ]
+ },
+ {
+ "name": "showConnectionId",
+ "condition": {
+ "expression": "useConnection == true"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "connection"
+ }
+ ]
+ }
+ ],
+ "jump-config": {
+ "datasets": [
+ {
+ "ref-property-name": "referenceName"
+ }
+ ]
+ }
+}