Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@
<spark3.version>3.1.1</spark3.version>
<spark-bq-connector.version>0.23.1</spark-bq-connector.version>
<testSourceLocation>${project.basedir}/src/test/java/</testSourceLocation>

<!--FC related properties-->
<version.cloud-metadata-provider>0.0.2-SNAPSHOT</version.cloud-metadata-provider>
<version.cloud-metadata-common>0.0.2-SNAPSHOT</version.cloud-metadata-common>
<apache.avro.version>1.11.3</apache.avro.version>
<version.commons-lang3>3.17.0</version.commons-lang3>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -782,6 +788,39 @@
<version>0.2.0</version>
</dependency>
<!-- End: dependency used by the Dataplex connector -->
<!-- Start: FC related dependencies -->
<dependency>
<groupId>ai.festcloud</groupId>
<artifactId>cloud-metadata-provider</artifactId>
<version>${version.cloud-metadata-provider}</version>
</dependency>
<dependency>
<groupId>ai.festcloud</groupId>
<artifactId>cloud-metadata-model</artifactId>
<version>${version.cloud-metadata-common}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache.avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${version.commons-lang3}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<dependency>
<groupId>ai.festcloud</groupId>
<artifactId>cloud-data-plugins-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

<!-- End: FC related dependencies -->
</dependencies>

<build>
Expand Down Expand Up @@ -867,6 +906,11 @@
org.apache.hadoop.hbase.mapreduce.*;
org.apache.hadoop.hbase.security.token.*;
com.google.cloud.spark.bigquery.*;
org.apache.commons.collections4.*;
ai.festcloud.model.*;
ai.festcloud.datafabric.plugins.common.integrity.*;
org.apache.avro.*;
org.h2.*;
</_exportcontents>
</instructions>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.cdap.plugin.gcp.bigquery.fctransform;

import ai.festcloud.datafabric.plugins.common.integrity.CDAPUtils;
import ai.festcloud.datafabric.plugins.common.integrity.IntegrityService;
import ai.festcloud.datafabric.plugins.common.integrity.IntegrityServiceBQ;
import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingEntryConfig;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingObj;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingParsingService;
import ai.festcloud.metadata.model.TypeRecord;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transform that looks up identifiers for every incoming record through an
 * {@link IntegrityService} and writes them into the fields named by the configured mapping.
 *
 * <p>For each target field of the mapping the lookup candidates are tried in order; the first
 * candidate that yields exactly one id fills the target field. A candidate that yields more than
 * one id fails the record. Records that fail are emitted as errors instead of being propagated.
 *
 * <p>When the output schema declares an {@code operation} field, it is set to {@code update} if
 * a value was resolved for {@link MetadataUtils#DEFAULT_TARGET_FIELD} and to {@code create}
 * otherwise.
 */
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("BigQueryMdmIntegrityValidation")
@Description("Verify whether the requested values are present in MDM and add new specified field.")
public class MdmIntegrityBigQueryTransformer extends Transform<StructuredRecord, StructuredRecord> {

  private static final Logger LOG = LoggerFactory.getLogger(MdmIntegrityBigQueryTransformer.class);

  /** Name of the optional output field that receives the create/update marker. */
  private static final String OPERATION = "operation";
  private static final String OPERATION_CREATE = "create";
  private static final String OPERATION_UPDATE = "update";

  private final MdmIntegrityBigQueryTransformerConfig config;
  private Schema outputSchema;

  private MappingObj mapping;
  private Map<String, TypeRecord> entities;
  private IntegrityService integrityService;
  private boolean containsOperationField = false;

  public MdmIntegrityBigQueryTransformer(MdmIntegrityBigQueryTransformerConfig config) {
    this.config = config;
  }

  /**
   * Parses the output schema, loads the metadata entities, validates the configuration, parses
   * the mapping and creates the integrity service backed by a BigQuery client.
   *
   * @param context runtime context supplying arguments, the input schema and the failure collector
   * @throws Exception if validation failures were collected or a collaborator fails
   */
  @Override
  public void initialize(TransformContext context) throws Exception {
    LOG.info("Initializing BigQuery integrity validation...");
    super.initialize(context);

    FailureCollector failureCollector = context.getFailureCollector();
    outputSchema = config.getSchema(failureCollector);

    String configServerUrl = context.getArguments()
        .get(MetadataUtils.CONFIGSERVER_METADATA_SCHEMA_URL);
    String metadataRootPath = context.getArguments().get(MetadataUtils.METADATA_ROOT_PATH);
    entities = MetadataUtils.getTypeRecordByUrl(configServerUrl, metadataRootPath);
    // NOTE(review): the *input* schema is handed to a parameter the config calls "outputSchema";
    // confirm that checking the mandatory fields against the input schema is intended.
    config.validate(failureCollector, entities, context.getInputSchema());

    MappingParsingService mappingParsingService
        = new MappingParsingService(config.getMapping(),
                                    config.getFullyQualifiedEntityName(),
                                    failureCollector,
                                    entities,
                                    outputSchema);
    Optional<MappingObj> mappingOpt = mappingParsingService.getMapping();

    // The connection is declared @Nullable on the config; report it as a validation failure
    // (together with anything collected so far) rather than hitting a NullPointerException.
    if (config.getConnection() == null) {
      failureCollector.addFailure("BigQuery connection is not configured.",
                                  "Provide the connection properties or select an existing connection.");
      throw failureCollector.getOrThrowException();
    }
    Credentials credentials = config.getConnection().getCredentials(failureCollector);
    BigQuery bigQuery = GCPUtils.getBigQuery(config.getConnection().getProject(), credentials);

    failureCollector.getOrThrowException();

    // Without a mapping every record would fail later in fillIds(); fail once, up front.
    mapping = mappingOpt.orElseThrow(
        () -> new IllegalStateException("Mapping could not be parsed; no mapping is available."));

    integrityService = new IntegrityServiceBQ(bigQuery, entities, mapping);
    containsOperationField = outputSchema.getFields()
        .stream().anyMatch(field -> field.getName().equals(OPERATION));

    LOG.info("BigQueryMdmIntegrityValidation initialized.");
  }

  /** No stage-specific work is done on run completion; this only delegates to the superclass. */
  @Override
  public void onRunFinish(boolean succeeded, StageSubmitterContext context) {
    super.onRunFinish(succeeded, context);
  }

  /**
   * Closes the integrity service if one was created.
   *
   * @throws RuntimeException wrapping the {@link IOException} raised while closing the service
   */
  @Override
  public void destroy() {
    super.destroy();
    if (integrityService == null) {
      // initialize() did not get far enough to create the service; nothing to release.
      return;
    }
    try {
      integrityService.close();
    } catch (IOException e) {
      throw new RuntimeException("Failed to close the integrity service", e);
    }
  }

  /**
   * Registers the configured output schema with the pipeline.
   *
   * <p>The schema property is macro-enabled; while it still holds an unresolved macro there is
   * nothing to parse, so the output schema is left unset.
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    if (config.containsMacro(MdmIntegrityBigQueryTransformerConfig.SCHEMA)) {
      return;
    }
    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(collector));
  }

  /**
   * Resolves the ids for {@code input} and emits the enriched record; any failure turns the
   * original record into an error entry with code {@link MetadataUtils#ERROR_CODE}.
   */
  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
      throws Exception {
    try {
      emitter.emit(fillIds(input));
    } catch (Exception e) {
      LOG.debug("Integrity validation failed for a record; emitting it as an error.", e);
      // Some exceptions (e.g. NullPointerException) carry no message; fall back to toString().
      String message = e.getMessage() != null ? e.getMessage() : e.toString();
      emitter.emitError(new InvalidEntry<>(MetadataUtils.ERROR_CODE, message, input));
    }
  }

  /**
   * Looks up the id for every mapped target field and builds the output record.
   *
   * @param input record to enrich
   * @return a record of the output schema carrying the resolved values
   * @throws IllegalStateException if a lookup returns more than one id, or if no value was
   *     resolved for the default target field although the id is configured as required
   */
  private StructuredRecord fillIds(StructuredRecord input) {
    Map<String, Object> result = new HashMap<>();

    for (Map.Entry<String, List<MappingEntryConfig>> entry
        : mapping.getMappingEntryConfigs().entrySet()) {
      String targetFieldName = entry.getKey();
      for (MappingEntryConfig entryConfig : entry.getValue()) {
        List<String> ids = integrityService.getIds(entryConfig, input);
        if (ids.size() > 1) {
          throw new IllegalStateException(
              "More than one id found for request: " + entryConfig);
        }
        if (ids.size() == 1) {
          // First unambiguous match wins; remaining candidates for this field are not queried.
          result.put(targetFieldName, ids.get(0));
          break;
        }
      }
    }

    boolean idResolved = result.get(MetadataUtils.DEFAULT_TARGET_FIELD) != null;
    // getFcidRequired() returns a boxed Boolean; compare null-safely instead of auto-unboxing.
    if (!idResolved && Boolean.TRUE.equals(config.getFcidRequired())) {
      throw new IllegalStateException("ID is required but not provided.");
    }

    if (containsOperationField) {
      result.put(OPERATION, idResolved ? OPERATION_UPDATE : OPERATION_CREATE);
    }

    return setValuesToTargetFields(input, result);
  }

  /** Builds a record of the output schema from {@code input}, overridden by {@code values}. */
  private StructuredRecord setValuesToTargetFields(StructuredRecord input,
                                                   Map<String, Object> values) {
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    setFieldValues(input, values, builder, outputSchema);
    return builder.build();
  }

  /**
   * Copies every field of {@code schema} into {@code builder}. A value present in
   * {@code values} under the field's name takes precedence over the input value. Non-null
   * record-typed fields are rebuilt recursively with the same {@code values} map, so an
   * override applies to a matching field name at any nesting depth.
   */
  private void setFieldValues(StructuredRecord input,
                              Map<String, Object> values,
                              StructuredRecord.Builder builder,
                              Schema schema) {
    for (Schema.Field field : schema.getFields()) {
      String fieldName = field.getName();
      Object fieldValue = input.get(fieldName);

      if (CDAPUtils.isRecordType(field) && fieldValue != null) {
        StructuredRecord nestedRecord = (StructuredRecord) fieldValue;
        Schema nestedSchema = CDAPUtils.getNonNullableSchema(field.getSchema());

        StructuredRecord.Builder nestedBuilder = StructuredRecord.builder(nestedSchema);
        setFieldValues(nestedRecord, values, nestedBuilder, nestedSchema);
        builder.set(fieldName, nestedBuilder.build());
      } else {
        builder.set(fieldName, values.getOrDefault(fieldName, fieldValue));
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package io.cdap.plugin.gcp.bigquery.fctransform;


import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
import ai.festcloud.metadata.model.TypeField;
import ai.festcloud.metadata.model.TypeRecord;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Configuration of the BigQuery MDM integrity validation transform: the BigQuery connection,
 * the mapping to validate, the metadata entity name, whether an id is mandatory, and the
 * output schema.
 */
public class MdmIntegrityBigQueryTransformerConfig extends PluginConfig {

  public static final String MAPPING = "mapping";
  public static final String FULLY_QUALIFIED_ENTITY_NAME = "fullyQualifiedEntityName";
  public static final String SCHEMA = "schema";

  @Name(ConfigUtil.NAME_CONNECTION)
  @Macro
  @Nullable
  @Description("The existing connection to use.")
  public BigQueryConnectorConfig connection;

  @Name(ConfigUtil.NAME_USE_CONNECTION)
  @Nullable
  @Description("Whether to use an existing connection.")
  public Boolean useConnection;

  @Name(MAPPING)
  @Description("Properties to validate")
  @Macro
  private final String mapping;

  // The previous description ("Metadata server url") did not match this property: the value is
  // used as the key into the metadata entity map in validate().
  @Name(FULLY_QUALIFIED_ENTITY_NAME)
  @Description("Fully qualified name of the metadata entity to validate against.")
  @Macro
  private final String fullyQualifiedEntityName;

  @Name("fcidRequired")
  @Description("Indicates whether FestCloudID is required. "
      + "If true, records without a FestCloudID are sent to the error flow. "
      + "If false, records without a FestCloudID are processed and not sent to the error flow.")
  @Macro
  private final Boolean fcidRequired;

  @Name(SCHEMA)
  @Description("Schema of the output records.")
  @Macro
  private final String schema;

  public MdmIntegrityBigQueryTransformerConfig(BigQueryConnectorConfig connection,
                                               Boolean useConnection, String mapping,
                                               String fullyQualifiedEntityName,
                                               Boolean fcidRequired, String schema) {
    this.connection = connection;
    this.useConnection = useConnection;
    this.mapping = mapping;
    this.fullyQualifiedEntityName = fullyQualifiedEntityName;
    this.fcidRequired = fcidRequired;
    this.schema = schema;
  }

  /**
   * Validates the connection settings and checks that every metadata field of the configured
   * entity that requires integrity validation is present in the given schema. Problems are
   * added to {@code collector}; nothing is thrown from here.
   *
   * @param collector    receives the validation failures
   * @param entities     metadata entities keyed by fully qualified entity name
   * @param outputSchema schema in which the mandatory fields are looked up
   */
  public void validate(FailureCollector collector, Map<String, TypeRecord> entities,
                       Schema outputSchema) {
    ConfigUtil.validateConnection(this, useConnection, connection, collector);

    TypeRecord typeRecord = entities == null ? null : entities.get(fullyQualifiedEntityName);
    if (typeRecord == null) {
      // An unknown entity name would otherwise surface as a NullPointerException below.
      collector.addFailure(
          String.format("Entity %s is not defined in the metadata", fullyQualifiedEntityName),
          "Provide the fully qualified name of an entity that exists in the metadata.")
          .withConfigProperty(FULLY_QUALIFIED_ENTITY_NAME);
      return;
    }
    if (outputSchema == null) {
      collector.addFailure("Schema is not available for integrity validation",
                           "Make sure the schema of the records is defined.");
      return;
    }

    List<TypeField> fields = typeRecord.getFields();
    if (fields == null) {
      return;
    }
    fields.stream().filter(MetadataUtils::integrityRequired).map(MetadataUtils::getFieldName)
        .forEach(fieldName -> validateField(fieldName, outputSchema, collector));
  }

  /** Adds a failure when {@code fieldName} is missing from {@code outputSchema}. */
  private void validateField(String fieldName, Schema outputSchema, FailureCollector collector) {
    if (outputSchema.getField(fieldName) == null) {
      collector.addFailure(String.format("Can't find field %s in output record", fieldName),
          String.format(
              "Field %s mandatory for integrity validation according to metadata definition",
              fieldName));
    }
  }

  /**
   * Parses the configured output schema.
   *
   * @param collector receives a failure when the schema is missing or cannot be parsed
   * @return the parsed schema, or {@code null} while the schema property is an unresolved macro
   * @throws RuntimeException the validation exception produced by the collector when the schema
   *     is missing or is not valid schema JSON
   */
  @Nullable
  public Schema getSchema(FailureCollector collector) {
    if (containsMacro(SCHEMA)) {
      // Nothing to parse yet; the value is only known once macros have been evaluated.
      return null;
    }
    if (schema == null || schema.isEmpty()) {
      collector.addFailure("Output schema is not specified", "Provide the output schema.")
          .withConfigProperty(SCHEMA);
      throw collector.getOrThrowException();
    }
    try {
      return Schema.parseJson(schema);
    } catch (IOException e) {
      collector.addFailure(String.format("Failed to parse schema: %s", schema),
                           "Make sure the output schema is valid schema JSON.")
          .withConfigProperty(SCHEMA)
          .withStacktrace(e.getStackTrace());
      throw collector.getOrThrowException();
    }
  }

  @Nullable
  public BigQueryConnectorConfig getConnection() {
    return connection;
  }

  public void setConnection(@Nullable BigQueryConnectorConfig connection) {
    this.connection = connection;
  }

  @Nullable
  public Boolean getUseConnection() {
    return useConnection;
  }

  public void setUseConnection(@Nullable Boolean useConnection) {
    this.useConnection = useConnection;
  }

  public String getFullyQualifiedEntityName() {
    return fullyQualifiedEntityName;
  }

  public Boolean getFcidRequired() {
    return fcidRequired;
  }

  /** Returns the raw, unparsed output schema string as configured. */
  public String getSchema() {
    return schema;
  }

  public String getMapping() {
    return mapping;
  }
}

Loading