diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java new file mode 100644 index 000000000..3aee9c749 --- /dev/null +++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java @@ -0,0 +1,69 @@ +package io.odpf.dagger.common.core; + +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.exceptions.DaggerContextException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The DaggerContext singleton. + * It is initialized with a StreamExecutionEnvironment, a StreamTableEnvironment and a Configuration. + */ +public class DaggerContext { + private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName()); + private static volatile DaggerContext daggerContext = null; + private final StreamExecutionEnvironment executionEnvironment; + private final StreamTableEnvironment tableEnvironment; + private final Configuration configuration; + + /** + * Instantiates a new DaggerContext. + * + * @param configuration the Configuration + */ + private DaggerContext(Configuration configuration) { + this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); + this.tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings); + this.configuration = configuration; + } + + /** + * Gets the initialized instance of DaggerContext. + * + * @return the DaggerContext instance + * @throws DaggerContextException if the DaggerContext has not been initialized yet + */ + public static DaggerContext getInstance() { + if (daggerContext == null) { + throw new DaggerContextException("DaggerContext object is not initialized"); + } + return daggerContext; + } + + /** + * Initializes a new DaggerContext. Initialization can happen only once per job. + * + * @param configuration the Configuration + * @return the initialized DaggerContext + * @throws DaggerContextException if the DaggerContext is already initialized + */ + public static synchronized DaggerContext init(Configuration configuration) { + if (daggerContext != null) { + throw new DaggerContextException("DaggerContext object is already initialized"); + } + daggerContext = new DaggerContext(configuration); + LOGGER.info("DaggerContext is initialized"); + return daggerContext; + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return executionEnvironment; + } + + public StreamTableEnvironment getTableEnvironment() { + return tableEnvironment; + } + + public Configuration getConfiguration() { + return configuration; + } +} diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java new file mode 100644 index 000000000..b0e79da2a --- /dev/null +++ b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java @@ -0,0 +1,16 @@ +package io.odpf.dagger.common.exceptions; + +/** + * Exception thrown when there is something wrong with the DaggerContext object. + */ +public class DaggerContextException extends RuntimeException { + + /** + * Instantiates a new Dagger context exception with the specified error message.
+ * + * @param message the message + */ + public DaggerContextException(String message) { + super(message); + } +} diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java b/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java new file mode 100644 index 000000000..19608a028 --- /dev/null +++ b/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java @@ -0,0 +1,60 @@ +package io.odpf.dagger.common.core; + +import io.odpf.dagger.common.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.After; +import org.junit.Before; + +import java.lang.reflect.Field; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public abstract class DaggerContextTestBase { + + protected DaggerContext daggerContext; + + protected DataStream inputStream; + + protected StreamExecutionEnvironment streamExecutionEnvironment; + + protected StreamTableEnvironment streamTableEnvironment; + + protected Configuration configuration; + + @Before + public void setupBase() { + daggerContext = mock(DaggerContext.class); + configuration = mock(Configuration.class); + inputStream = mock(DataStream.class); + streamExecutionEnvironment = mock(StreamExecutionEnvironment.class); + streamTableEnvironment = mock(StreamTableEnvironment.class); + initMocks(this); + setMock(daggerContext); + when(daggerContext.getConfiguration()).thenReturn(configuration); + when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment); + } + + @After + public void resetSingletonBase() throws Exception { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, null); + } + + private void setMock(DaggerContext mock) { + try { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, mock); // target is null because the singleton field is static, mirroring resetSingletonBase + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java index 41e79e232..0001b1326 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java @@ -4,9 +4,7 @@ import io.odpf.dagger.core.config.ConfigurationProvider; import io.odpf.dagger.core.config.ConfigurationProviderFactory; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import io.odpf.dagger.common.core.DaggerContext; import java.util.TimeZone; @@ -26,11 +24,8 @@ public static void main(String[] args) throws ProgramInvocationException { ConfigurationProvider provider = new ConfigurationProviderFactory(args).provider(); Configuration configuration = provider.get();
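+ // DaggerContext.init(configuration) below may be called only once per job: a second init() throws DaggerContextException, and DaggerContext.getInstance() throws until init() has been called.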
TimeZone.setDefault(TimeZone.getTimeZone("UTC")); - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings); - - StreamManager streamManager = new StreamManager(configuration, executionEnvironment, tableEnvironment); + DaggerContext daggerContext = DaggerContext.init(configuration); + StreamManager streamManager = new StreamManager(daggerContext); streamManager .registerConfigs() .registerSourceWithPreProcessors() diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index ea6b4f4ee..72df6b6ce 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.udfs.UdfFactory; @@ -11,7 +12,6 @@ import io.odpf.dagger.core.exception.UDFFactoryClassNotDefinedException; import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; import io.odpf.dagger.core.processors.PostProcessorFactory; -import io.odpf.dagger.core.processors.PreProcessorConfig; import io.odpf.dagger.core.processors.PreProcessorFactory; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.PostProcessor; @@ -53,17 +53,18 @@ public class StreamManager { private StencilClientOrchestrator stencilClientOrchestrator; private DaggerStatsDReporter daggerStatsDReporter; + private final DaggerContext daggerContext; + /** * Instantiates a new Stream manager. 
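+ * <p>The configuration, execution environment and table environment are all taken from the supplied DaggerContext.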
* - * @param configuration the configuration in form of param - * @param executionEnvironment the execution environment - * @param tableEnvironment the table environment + * @param daggerContext the dagger context supplying the configuration and environments */ - public StreamManager(Configuration configuration, StreamExecutionEnvironment executionEnvironment, StreamTableEnvironment tableEnvironment) { - this.configuration = configuration; - this.executionEnvironment = executionEnvironment; - this.tableEnvironment = tableEnvironment; + public StreamManager(DaggerContext daggerContext) { + this.daggerContext = daggerContext; + this.configuration = daggerContext.getConfiguration(); + this.executionEnvironment = daggerContext.getExecutionEnvironment(); + this.tableEnvironment = daggerContext.getTableEnvironment(); } /** @@ -99,7 +100,6 @@ public StreamManager registerConfigs() { public StreamManager registerSourceWithPreProcessors() { long watermarkDelay = configuration.getLong(FLINK_WATERMARK_DELAY_MS_KEY, FLINK_WATERMARK_DELAY_MS_DEFAULT); Boolean enablePerPartitionWatermark = configuration.getBoolean(FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter) .forEach(stream -> { String tableName = stream.getStreamName(); @@ -112,7 +112,7 @@ public StreamManager registerSourceWithPreProcessors() { TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType()); StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames()); - streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig); + streamInfo = addPreProcessor(streamInfo, tableName); Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo)); tableEnvironment.createTemporaryView(tableName, table); @@ -211,15 +211,15 @@ protected StreamInfo createStreamInfo(Table table) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); for (PostProcessor postProcessor : postProcessors) { streamInfo = postProcessor.process(streamInfo); } return streamInfo; } - private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName, PreProcessorConfig preProcessorConfig) { - List preProcessors = PreProcessorFactory.getPreProcessors(configuration, preProcessorConfig, tableName, telemetryExporter); + private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) { + List preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, tableName, telemetryExporter); for (Preprocessor preprocessor : preProcessors) { streamInfo = preprocessor.process(streamInfo); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java index 12875b26a..0373155e2 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors; +import 
io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -26,23 +27,29 @@ */ public class ParentPostProcessor implements PostProcessor { private final PostProcessorConfig postProcessorConfig; - private Configuration configuration; + private final StencilClientOrchestrator stencilClientOrchestrator; private TelemetrySubscriber telemetrySubscriber; + private final DaggerContext daggerContext; + /** * Instantiates a new Parent post processor. * - * @param postProcessorConfig the post processor config - * @param configuration the configuration + * @param daggerContext the daggerContext * @param stencilClientOrchestrator the stencil client orchestrator * @param telemetrySubscriber the telemetry subscriber */ - public ParentPostProcessor(PostProcessorConfig postProcessorConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) { - this.postProcessorConfig = postProcessorConfig; - this.configuration = configuration; + public ParentPostProcessor(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) { + this.daggerContext = daggerContext; this.stencilClientOrchestrator = stencilClientOrchestrator; this.telemetrySubscriber = telemetrySubscriber; + this.postProcessorConfig = parsePostProcessorConfig(daggerContext.getConfiguration()); + } + + private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) { + String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, ""); + return PostProcessorConfig.parse(postProcessorConfigString); } @Override @@ -56,7 +63,7 @@ public StreamInfo process(StreamInfo streamInfo) { InitializationDecorator initializationDecorator = new InitializationDecorator(columnNameManager); resultStream = initializationDecorator.decorate(resultStream); streamInfo = new StreamInfo(resultStream, streamInfo.getColumnNames()); - SchemaConfig schemaConfig = new SchemaConfig(configuration, stencilClientOrchestrator, columnNameManager); + SchemaConfig schemaConfig = new SchemaConfig(daggerContext.getConfiguration(), stencilClientOrchestrator, columnNameManager); List enabledPostProcessors = getEnabledPostProcessors(telemetrySubscriber, schemaConfig); for (PostProcessor postProcessor : enabledPostProcessors) { @@ -66,7 +73,7 @@ public StreamInfo process(StreamInfo streamInfo) { FetchOutputDecorator fetchOutputDecorator = new FetchOutputDecorator(schemaConfig, postProcessorConfig.hasSQLTransformer()); resultStream = fetchOutputDecorator.decorate(streamInfo.getDataStream()); StreamInfo resultantStreamInfo = new StreamInfo(resultStream, columnNameManager.getOutputColumnNames()); - TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), configuration); + TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), daggerContext); if (transformProcessor.canProcess(postProcessorConfig)) { transformProcessor.notifySubscriber(telemetrySubscriber); resultantStreamInfo = transformProcessor.process(resultantStreamInfo); @@ -80,11 +87,11 @@ public boolean canProcess(PostProcessorConfig config) { } private List getEnabledPostProcessors(TelemetrySubscriber subscriber, SchemaConfig schemaConfig) { - if (!configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { + 
if (!daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { return new ArrayList<>(); } - ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(configuration, subscriber); + ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(daggerContext.getConfiguration(), subscriber); ArrayList processors = new ArrayList<>(); processors.add(new ExternalPostProcessor(schemaConfig, postProcessorConfig.getExternalSource(), externalMetricConfig)); processors.add(new InternalPostProcessor(postProcessorConfig, schemaConfig)); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java index 19b950bcd..b7480b282 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core.processors; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.core.processors.longbow.LongbowFactory; import io.odpf.dagger.core.processors.longbow.LongbowSchema; @@ -22,22 +23,22 @@ public class PostProcessorFactory { /** * Gets post processors. * - * @param configuration the configuration + * @param daggerContext the daggerContext * @param stencilClientOrchestrator the stencil client orchestrator * @param columnNames the column names * @param metricsTelemetryExporter the metrics telemetry exporter * @return the post processors */ - public static List getPostProcessors(Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) { + public static List getPostProcessors(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) { List postProcessors = new ArrayList<>(); if (Arrays.stream(columnNames).anyMatch(s -> Pattern.compile(".*\\blongbow.*key\\b.*").matcher(s).find())) { - postProcessors.add(getLongBowProcessor(columnNames, configuration, metricsTelemetryExporter, stencilClientOrchestrator)); + postProcessors.add(getLongBowProcessor(columnNames, daggerContext.getConfiguration(), metricsTelemetryExporter, stencilClientOrchestrator)); } - if (configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { - postProcessors.add(new ParentPostProcessor(parsePostProcessorConfig(configuration), configuration, stencilClientOrchestrator, metricsTelemetryExporter)); + if (daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { + postProcessors.add(new ParentPostProcessor(daggerContext, stencilClientOrchestrator, metricsTelemetryExporter)); } - if (configuration.getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) { + if (daggerContext.getConfiguration().getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) { postProcessors.add(new TelemetryProcessor(metricsTelemetryExporter)); } return postProcessors; @@ -49,9 +50,4 @@ private static PostProcessor getLongBowProcessor(String[] 
columnNames, Configura return longbowFactory.getLongbowProcessor(); } - - private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) { - String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, ""); - return PostProcessorConfig.parse(postProcessorConfigString); - } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java index 6e79136c0..ed411f1c1 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java @@ -1,14 +1,8 @@ package io.odpf.dagger.core.processors; -import com.google.gson.FieldNamingPolicy; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonSyntaxException; -import com.jayway.jsonpath.InvalidJsonException; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.Preprocessor; -import io.odpf.dagger.core.utils.Constants; import java.util.Collections; import java.util.List; @@ -17,38 +11,15 @@ * The factory class for Preprocessor. */ public class PreProcessorFactory { - private static final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); - - /** - * Parse config preprocessor config. - * - * @param configuration the configuration - * @return the preprocessor config - */ - public static PreProcessorConfig parseConfig(Configuration configuration) { - if (!configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)) { - return null; - } - String configJson = configuration.getString(Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY, ""); - PreProcessorConfig config; - try { - config = GSON.fromJson(configJson, PreProcessorConfig.class); - } catch (JsonSyntaxException exception) { - throw new InvalidJsonException("Invalid JSON Given for " + Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY); - } - return config; - } - /** * Gets preprocessors. 
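+ * <p>Each table gets a single PreProcessorOrchestrator, which now parses its own PreProcessorConfig from the DaggerContext configuration.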
* - * @param configuration the configuration - * @param processorConfig the processor config + * @param daggerContext the daggerContext * @param tableName the table name * @param metricsTelemetryExporter the metrics telemetry exporter * @return the preprocessors */ - public static List getPreProcessors(Configuration configuration, PreProcessorConfig processorConfig, String tableName, MetricsTelemetryExporter metricsTelemetryExporter) { - return Collections.singletonList(new PreProcessorOrchestrator(configuration, processorConfig, metricsTelemetryExporter, tableName)); + public static List getPreProcessors(DaggerContext daggerContext, String tableName, MetricsTelemetryExporter metricsTelemetryExporter) { + return Collections.singletonList(new PreProcessorOrchestrator(daggerContext, metricsTelemetryExporter, tableName)); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java index e3ad929a3..4adb63f56 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java @@ -1,12 +1,19 @@ package io.odpf.dagger.core.processors; +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; +import com.jayway.jsonpath.InvalidJsonException; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.core.metrics.telemetry.TelemetryTypes; import io.odpf.dagger.core.processors.common.ValidRecordsDecorator; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.transformers.TransformProcessor; import io.odpf.dagger.core.processors.types.Preprocessor; +import io.odpf.dagger.core.utils.Constants; import java.util.ArrayList; import java.util.List; @@ -17,32 +24,52 @@ public class PreProcessorOrchestrator implements Preprocessor { private final MetricsTelemetryExporter metricsTelemetryExporter; - private Configuration configuration; private final PreProcessorConfig processorConfig; private final String tableName; + private final DaggerContext daggerContext; + private static final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); /** * Instantiates a new Preprocessor orchestrator. * - * @param configuration the configuration - * @param processorConfig the processor config + * @param daggerContext the daggerContext * @param metricsTelemetryExporter the metrics telemetry exporter * @param tableName the table name */ - public PreProcessorOrchestrator(Configuration configuration, PreProcessorConfig processorConfig, MetricsTelemetryExporter metricsTelemetryExporter, String tableName) { - this.configuration = configuration; - this.processorConfig = processorConfig; + public PreProcessorOrchestrator(DaggerContext daggerContext, MetricsTelemetryExporter metricsTelemetryExporter, String tableName) { + this.daggerContext = daggerContext; + this.processorConfig = parseConfig(daggerContext.getConfiguration()); this.metricsTelemetryExporter = metricsTelemetryExporter; this.tableName = tableName; } + /** + * Parse config preprocessor config. 
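+ * <p>For reference, the JSON shape this parses, as exercised by the tests in this patch (keys are mapped through the GSON LOWER_CASE_WITH_UNDERSCORES policy declared above): {"table_transformers": [{"table_name": "...", "transformers": [{"transformation_class": "...", "transformation_arguments": {"...": "..."}}]}]}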
+ * + * @param configuration the configuration + * @return the preprocessor config + */ + public PreProcessorConfig parseConfig(Configuration configuration) { + if (!configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)) { + return null; + } + String configJson = configuration.getString(Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY, ""); + PreProcessorConfig config; + try { + config = GSON.fromJson(configJson, PreProcessorConfig.class); + } catch (JsonSyntaxException exception) { + throw new InvalidJsonException("Invalid JSON Given for " + Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY); + } + return config; + } + @Override public StreamInfo process(StreamInfo streamInfo) { for (Preprocessor processor : getProcessors()) { streamInfo = processor.process(streamInfo); } return new StreamInfo( - new ValidRecordsDecorator(tableName, streamInfo.getColumnNames(), configuration) + new ValidRecordsDecorator(tableName, streamInfo.getColumnNames(), daggerContext.getConfiguration()) .decorate(streamInfo.getDataStream()), streamInfo.getColumnNames()); } @@ -64,7 +91,7 @@ protected List getProcessors() { elem.getTableName(), TelemetryTypes.PRE_PROCESSOR_TYPE, elem.getTransformers(), - configuration); + daggerContext); processor.notifySubscriber(metricsTelemetryExporter); preprocessors.add(processor); }); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java index 642b83400..ba5cff334 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java @@ -1,6 +1,6 @@ package io.odpf.dagger.core.processors.transformers; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.core.exception.TransformClassNotDefinedException; @@ -37,16 +37,16 @@ public String getTableName() { protected final String tableName; private final Map<String, List<String>> metrics = new HashMap<>(); protected final TelemetryTypes type; - private Configuration configuration; + private final DaggerContext daggerContext; /** * Instantiates a new Transform processor. * * @param transformConfigs the transform configs - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public TransformProcessor(List transformConfigs, Configuration configuration) { - this("NULL", TelemetryTypes.POST_PROCESSOR_TYPE, transformConfigs, configuration); + public TransformProcessor(List transformConfigs, DaggerContext daggerContext) { + this("NULL", TelemetryTypes.POST_PROCESSOR_TYPE, transformConfigs, daggerContext); } /** @@ -54,13 +54,13 @@ public TransformProcessor(List transformConfigs, Configuration * * @param tableName the table name * @param type the type - * @param configuration the configuration + * @param daggerContext the dagger context */ - public TransformProcessor(String tableName, TelemetryTypes type, List transformConfigs, Configuration configuration) { + public TransformProcessor(String tableName, TelemetryTypes type, List transformConfigs, DaggerContext daggerContext) { this.transformConfigs = transformConfigs == null ? 
new ArrayList<>() : transformConfigs; this.tableName = tableName; this.type = type; - this.configuration = configuration; + this.daggerContext = daggerContext; TransformerUtils.populateDefaultArguments(this); } @@ -126,7 +126,7 @@ private void addMetric(String key, String value) { */ protected Transformer getTransformMethod(TransformConfig transformConfig, String className, String[] columnNames) throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { Class transformerClass = Class.forName(className); - Constructor transformerClassConstructor = transformerClass.getConstructor(Map.class, String[].class, Configuration.class); - return (Transformer) transformerClassConstructor.newInstance(transformConfig.getTransformationArguments(), columnNames, configuration); + Constructor transformerClassConstructor = transformerClass.getConstructor(Map.class, String[].class, DaggerContext.class); + return (Transformer) transformerClassConstructor.newInstance(transformConfig.getTransformationArguments(), columnNames, daggerContext); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java index 652daf9db..0194a7b88 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StreamInfo; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -11,13 +12,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.junit.Before; import org.junit.Test; @@ -38,7 +37,7 @@ @PrepareForTest(TableSchema.class) @RunWith(PowerMockRunner.class) -public class StreamManagerTest { +public class StreamManagerTest extends DaggerContextTestBase { private StreamManager streamManager; @@ -55,9 +54,6 @@ public class StreamManagerTest { + " }\n" + "]"; - @Mock - private StreamExecutionEnvironment env; - @Mock private ExecutionConfig executionConfig; @@ -67,9 +63,6 @@ public class StreamManagerTest { @Mock private CheckpointConfig checkpointConfig; - @Mock - private StreamTableEnvironment tableEnvironment; - @Mock private DataStream dataStream; @@ -85,10 +78,6 @@ public class StreamManagerTest { @Mock private TableSchema schema; - @Mock - private Configuration configuration; - - @Mock private org.apache.flink.configuration.Configuration flinkConfiguration; @@ -113,26 +102,26 @@ public void setup() { when(configuration.getString("FLINK_SQL_QUERY", "")).thenReturn(""); when(configuration.getInteger("FLINK_RETENTION_IDLE_STATE_MINUTE", 10)).thenReturn(10); 
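+ // configuration, streamExecutionEnvironment and streamTableEnvironment are the mocks that DaggerContextTestBase#setupBase wires into the daggerContext mock used below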
when(flinkConfiguration.getString(any())).thenReturn("10"); - when(env.getConfig()).thenReturn(executionConfig); - when(env.getConfiguration()).thenReturn(flinkConfiguration); - when(env.getCheckpointConfig()).thenReturn(checkpointConfig); - when(env.addSource(any(FlinkKafkaConsumerBase.class))).thenReturn(source); - when(tableEnvironment.getConfig()).thenReturn(tableConfig); - when(env.fromSource(any(KafkaSource.class), any(WatermarkStrategy.class), any(String.class))).thenReturn(source); + when(streamExecutionEnvironment.getConfig()).thenReturn(executionConfig); + when(streamExecutionEnvironment.getConfiguration()).thenReturn(flinkConfiguration); + when(streamExecutionEnvironment.getCheckpointConfig()).thenReturn(checkpointConfig); + when(streamExecutionEnvironment.addSource(any(FlinkKafkaConsumerBase.class))).thenReturn(source); + when(streamTableEnvironment.getConfig()).thenReturn(tableConfig); + when(streamExecutionEnvironment.fromSource(any(KafkaSource.class), any(WatermarkStrategy.class), any(String.class))).thenReturn(source); when(source.getType()).thenReturn(typeInformation); when(typeInformation.getTypeClass()).thenReturn(Row.class); when(schema.getFieldNames()).thenReturn(new String[0]); PowerMockito.mockStatic(TableSchema.class); when(TableSchema.fromTypeInfo(typeInformation)).thenReturn(schema); - streamManager = new StreamManager(configuration, env, tableEnvironment); + streamManager = new StreamManager(daggerContext); } @Test public void shouldRegisterRequiredConfigsOnExecutionEnvironment() { streamManager.registerConfigs(); - verify(env, Mockito.times(1)).setParallelism(1); - verify(env, Mockito.times(1)).enableCheckpointing(30000); + verify(streamExecutionEnvironment, Mockito.times(1)).setParallelism(1); + verify(streamExecutionEnvironment, Mockito.times(1)).enableCheckpointing(30000); verify(executionConfig, Mockito.times(1)).setAutoWatermarkInterval(10000); verify(executionConfig, Mockito.times(1)).setGlobalJobParameters(configuration.getParam()); verify(checkpointConfig, Mockito.times(1)).setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); @@ -147,33 +136,33 @@ public void shouldRegisterSourceWithPreprocessorsWithWaterMarks() { when(source.assignTimestampsAndWatermarks(any(WatermarkStrategy.class))).thenReturn(singleOutputStream); when(singleOutputStream.getType()).thenReturn(typeInformation); - StreamManagerStub streamManagerStub = new StreamManagerStub(configuration, env, tableEnvironment, new StreamInfo(dataStream, new String[]{})); + StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); streamManagerStub.registerConfigs(); streamManagerStub.registerSourceWithPreProcessors(); - verify(tableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{}); + verify(streamTableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{}); } @Test public void shouldCreateOutputStream() { - StreamManagerStub streamManagerStub = new StreamManagerStub(configuration, env, tableEnvironment, new StreamInfo(dataStream, new String[]{})); + StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); streamManagerStub.registerOutputStream(); - verify(tableEnvironment, Mockito.times(1)).sqlQuery(""); + verify(streamTableEnvironment, Mockito.times(1)).sqlQuery(""); } @Test public void shouldExecuteJob() throws Exception { streamManager.execute(); - verify(env, Mockito.times(1)).execute("SQL Flink job"); + 
verify(streamExecutionEnvironment, Mockito.times(1)).execute("SQL Flink job"); } final class StreamManagerStub extends StreamManager { private final StreamInfo streamInfo; - private StreamManagerStub(Configuration configuration, StreamExecutionEnvironment executionEnvironment, StreamTableEnvironment tableEnvironment, StreamInfo streamInfo) { - super(configuration, executionEnvironment, tableEnvironment); + private StreamManagerStub(DaggerContext daggerContext, StreamInfo streamInfo) { + super(daggerContext); this.streamInfo = streamInfo; } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java index 01d72ab2b..0125fe382 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java @@ -1,6 +1,8 @@ package io.odpf.dagger.core.processors; +import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.core.metrics.telemetry.TelemetrySubscriber; +import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -8,20 +10,25 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ParentPostProcessorTest { +public class ParentPostProcessorTest extends DaggerContextTestBase { @Mock private TelemetrySubscriber telemetrySubscriber; + @Before + public void init() { + configuration = null; + } + @Test public void shouldNotBeAbleToProcessWhenConfigIsNull() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); assertFalse(parentPostProcessor.canProcess(null)); } @Test public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(true); assertFalse(parentPostProcessor.canProcess(mockConfig)); @@ -29,7 +36,7 @@ public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { @Test public void shouldBeAbleToProcessWhenConfigIsNotEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(false); assertTrue(parentPostProcessor.canProcess(mockConfig)); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java index 8f8c23f4e..54d2478a5 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java @@ -1,6 +1,6 @@ package io.odpf.dagger.core.processors; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.core.processors.longbow.LongbowProcessor; 
import io.odpf.dagger.core.processors.telemetry.TelemetryProcessor; @@ -19,9 +19,7 @@ import static org.mockito.MockitoAnnotations.initMocks; -public class PostProcessorFactoryTest { - @Mock - private Configuration configuration; +public class PostProcessorFactoryTest extends DaggerContextTestBase { @Mock private StencilClientOrchestrator stencilClientOrchestrator; @@ -58,7 +56,7 @@ public void shouldReturnLongbowProcessor() { when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); when(configuration.getString(INPUT_STREAMS, "")).thenReturn(jsonArray); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(LongbowProcessor.class, postProcessors.get(0).getClass()); @@ -69,7 +67,7 @@ public void shouldReturnParentPostProcessor() { when(configuration.getString(FLINK_SQL_QUERY_KEY, FLINK_SQL_QUERY_DEFAULT)).thenReturn("test-sql"); when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(ParentPostProcessor.class, postProcessors.get(0).getClass()); @@ -81,7 +79,7 @@ public void shouldReturnTelemetryPostProcessor() { when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); when(configuration.getBoolean(METRIC_TELEMETRY_ENABLE_KEY, METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)).thenReturn(true); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(TelemetryProcessor.class, postProcessors.get(0).getClass()); @@ -91,7 +89,7 @@ public void shouldReturnTelemetryPostProcessor() { public void shouldNotReturnAnyPostProcessor() { when(configuration.getString(FLINK_SQL_QUERY_KEY, FLINK_SQL_QUERY_DEFAULT)).thenReturn("test-sql"); when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(0, postProcessors.size()); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java index ae5d9eaa6..45a92edd9 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java @@ -1,7 +1,6 @@ package 
io.odpf.dagger.core.processors; -import com.jayway.jsonpath.InvalidJsonException; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.Preprocessor; import org.junit.Before; @@ -14,13 +13,10 @@ import static io.odpf.dagger.core.utils.Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT; import static io.odpf.dagger.core.utils.Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class PreProcessorFactoryTest { +public class PreProcessorFactoryTest extends DaggerContextTestBase { @Mock private MetricsTelemetryExporter metricsTelemetryExporter; @@ -46,9 +42,6 @@ public class PreProcessorFactoryTest { + " ]\n" + "}"; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); @@ -58,35 +51,7 @@ public void setup() { public void shouldReturnPreProcessors() { when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertNotNull(preProcessorConfig); - List preProcessors = PreProcessorFactory.getPreProcessors(configuration, preProcessorConfig, "booking", metricsTelemetryExporter); + List preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, "booking", metricsTelemetryExporter); assertEquals(1, preProcessors.size()); } - - @Test - public void shouldParseConfig() { - when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertEquals(2, preProcessorConfig.getTableTransformers().size()); - assertEquals(2, preProcessorConfig.getTableTransformers().get(0).getTransformers().size()); - assertEquals("PreProcessorClass", preProcessorConfig.getTableTransformers().get(0).getTransformers().get(0).getTransformationClass()); - } - - @Test - public void shouldThrowExceptionForInvalidJson() { - when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn("blah"); - InvalidJsonException exception = assertThrows(InvalidJsonException.class, () -> PreProcessorFactory.parseConfig(configuration)); - assertEquals("Invalid JSON Given for PROCESSOR_PREPROCESSOR_CONFIG", exception.getMessage()); - } - - @Test - public void shouldNotParseConfigWhenDisabled() { - when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertNull(preProcessorConfig); - } } diff --git 
a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java index 85dc2e4ca..635bd84b5 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java @@ -1,9 +1,11 @@ package io.odpf.dagger.core.processors; +import com.jayway.jsonpath.InvalidJsonException; +import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.core.utils.Constants; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.transformers.TableTransformConfig; @@ -20,10 +22,14 @@ import java.util.HashMap; import java.util.List; -import static org.junit.Assert.assertEquals; +import static io.odpf.dagger.core.utils.Constants.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class PreProcessorOrchestratorTest { +public class PreProcessorOrchestratorTest extends DaggerContextTestBase { @Mock private MetricsTelemetryExporter exporter; @@ -34,14 +40,40 @@ public class PreProcessorOrchestratorTest { @Mock private DataStream stream; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); } + private String preProcessorConfigJson = "{\n" + + " \"table_transformers\": [{\n" + + " \"table_name\": \"booking\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"PreProcessorClass\"\n" + + " }, {\n" + + " \"transformation_class\": \"PreProcessorClass\",\n" + + " \"transformation_arguments\": {\n" + + " \"key\": \"value\"\n" + + " }\n" + + " }]\n" + + " },\n" + + " {\n" + + " \"table_name\": \"another_booking\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"PreProcessorClass\"\n" + + " }]\n" + + " }\n" + + " ]\n" + + "}"; + private String preProcessorFilterConfigJson = "{\n" + + " \"table_transformers\": [{\n" + + " \"table_name\": \"test\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"InvalidRecordFilterTransformer\"\n" + + " }]\n" + + " }]\n" + + "}"; + @Test public void shouldGetProcessors() { PreProcessorConfig config = new PreProcessorConfig(); @@ -49,7 +81,9 @@ transformConfigs.add(new TransformConfig("InvalidRecordFilterTransformer", new HashMap<>())); TableTransformConfig ttc = new TableTransformConfig("test", transformConfigs); config.tableTransformers = Collections.singletonList(ttc); - PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(configuration, config, exporter, "test"); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorFilterConfigJson); + when(configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); Mockito.when(streamInfo.getColumnNames()).thenReturn(new String[0]); 
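+ // the orchestrator above parses preProcessorFilterConfigJson from the stubbed configuration at construction time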
Mockito.when(streamInfo.getDataStream()).thenReturn(stream); @@ -61,8 +95,7 @@ public void shouldGetProcessors() { @Test public void shouldNotGetProcessors() { - PreProcessorConfig config = new PreProcessorConfig(); - PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(configuration, config, exporter, "test"); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); Mockito.when(streamInfo.getColumnNames()).thenReturn(new String[0]); Mockito.when(streamInfo.getDataStream()).thenReturn(stream); @@ -70,4 +103,42 @@ public void shouldNotGetProcessors() { assertEquals(0, processors.size()); } + + @Test + public void shouldReturnNonNullConfigWhenEnabled() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertNotNull(preProcessorConfig); + } + + @Test + public void shouldParseConfig() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertEquals(2, preProcessorConfig.getTableTransformers().size()); + assertEquals(2, preProcessorConfig.getTableTransformers().get(0).getTransformers().size()); + assertEquals("PreProcessorClass", preProcessorConfig.getTableTransformers().get(0).getTransformers().get(0).getTransformationClass()); + } + + @Test + public void shouldThrowExceptionForInvalidJson() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn("blah"); + InvalidJsonException exception = assertThrows(InvalidJsonException.class, + () -> new PreProcessorOrchestrator(daggerContext, exporter, "test")); + assertEquals("Invalid JSON Given for PROCESSOR_PREPROCESSOR_CONFIG", exception.getMessage()); + } + + @Test + public void shouldNotParseConfigWhenDisabled() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertNull(preProcessorConfig); + } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java index 4a8358642..0b5332f69 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java @@ -1,18 +1,18 @@ package io.odpf.dagger.core.processors.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import java.util.Map; public class MockTransformer implements Transformer, MapFunction { - public MockTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public MockTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { } @Override diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java index d797b9b1b..c18c1fb94 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java @@ -1,11 +1,9 @@ package io.odpf.dagger.core.processors.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.core.metrics.telemetry.TelemetryTypes; @@ -28,7 +26,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class TransformProcessorTest { +public class TransformProcessorTest extends DaggerContextTestBase { @Mock @@ -37,18 +35,12 @@ public class TransformProcessorTest { @Mock private StreamInfo streamInfo; - @Mock - private DataStream dataStream; - @Mock private SingleOutputStreamOperator mappedDataStream; @Mock private Transformer transformer; - @Mock - private Configuration configuration; - @Mock private MetricsTelemetryExporter metricsTelemetryExporter; @@ -59,35 +51,35 @@ public void setup() { @Test public void shouldThrowExceptionInCaseOfWrongClassName() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("wrongClassName", transformationArguments)); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); RuntimeException exception = assertThrows(RuntimeException.class, () -> transformProcessor.process(streamInfo)); assertEquals("wrongClassName", exception.getMessage()); } @Test public void shouldThrowExceptionInCaseOfWrongConstructorTypeSupported() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new 
TransformConfig("io.odpf.dagger.core.processors.transformers.TransformProcessor", transformationArguments)); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); RuntimeException exception = assertThrows(RuntimeException.class, () -> transformProcessor.process(streamInfo)); - assertEquals("io.odpf.dagger.core.processors.transformers.TransformProcessor.(java.util.Map, [Ljava.lang.String;, io.odpf.dagger.common.configuration.Configuration)", exception.getMessage()); + assertEquals("io.odpf.dagger.core.processors.transformers.TransformProcessor.(java.util.Map, [Ljava.lang.String;, io.odpf.dagger.common.core.DaggerContext)", exception.getMessage()); } @Test public void shouldProcessClassExtendingMapFunction() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -102,7 +94,7 @@ public void shouldProcessClassExtendingMapFunction() { @Test public void shouldAddPostProcessorTypeMetrics() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -122,7 +114,7 @@ public void shouldAddPostProcessorTypeMetrics() { @Test public void shouldAddPreProcessorTypeMetrics() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -142,7 +134,7 @@ public void shouldAddPreProcessorTypeMetrics() { @Test public void shouldNotifySubscribers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -158,9 +150,9 @@ public void shouldNotifySubscribers() { @Test public void shouldProcessTwoPostTransformers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); - when(dataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); + when(inputStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("io.odpf.dagger.core.processors.transformers.MockTransformer", new HashMap() {{ put("keyField", "keystore"); @@ -169,16 +161,16 @@ public void shouldProcessTwoPostTransformers() { put("keyField", "keystore"); }})); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); transformProcessor.process(streamInfo); verify(mappedDataStream, times(1)).map(any()); } @Test public void shouldProcessMultiplePostTransformers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); 
when(streamInfo.getColumnNames()).thenReturn(null); - when(dataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); + when(inputStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); when(mappedDataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("io.odpf.dagger.core.processors.transformers.MockTransformer", new HashMap() {{ @@ -191,7 +183,7 @@ public void shouldProcessMultiplePostTransformers() { put("keyField", "keystore"); }})); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); transformProcessor.process(streamInfo); verify(mappedDataStream, times(2)).map(any()); @@ -202,7 +194,7 @@ public void shouldPopulateDefaultArguments() { TransformConfig config = new TransformConfig("io.odpf.TestProcessor", new HashMap() {{ put("test-key", "test-value"); }}); - TransformProcessor processor = new TransformProcessor("test_table", PRE_PROCESSOR_TYPE, Collections.singletonList(config), configuration); + TransformProcessor processor = new TransformProcessor("test_table", PRE_PROCESSOR_TYPE, Collections.singletonList(config), daggerContext); assertEquals("test_table", processor.tableName); assertEquals(PRE_PROCESSOR_TYPE, processor.type); assertEquals(1, processor.transformConfigs.size()); @@ -223,12 +215,12 @@ final class TransformProcessorMock extends TransformProcessor { private Transformer mockMapFunction; private TransformProcessorMock(Transformer mockMapFunction, List transformConfigs) { - super(transformConfigs, configuration); + super(transformConfigs, daggerContext); this.mockMapFunction = mockMapFunction; } private TransformProcessorMock(String table, TelemetryTypes type, Transformer mockMapFunction, List transformConfigs) { - super(table, type, transformConfigs, configuration); + super(table, type, transformConfigs, daggerContext); this.mockMapFunction = mockMapFunction; } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java index 22da89c38..e37446eca 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java @@ -1,12 +1,12 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; @@ -26,9 +26,9 @@ public class ClearColumnTransformer implements MapFunction, Transforme * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public ClearColumnTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public ClearColumnTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = 
columnNames; this.targetColumnName = transformationArguments.get(TARGET_KEY_COLUMN_NAME); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java index bd7eaa0aa..611b28434 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -10,7 +11,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; @@ -31,9 +31,9 @@ public class DeDuplicationTransformer extends RichFilterFunction implements * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public DeDuplicationTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public DeDuplicationTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { keyIndex = Arrays.asList(columnNames).indexOf(String.valueOf(transformationArguments.get("key_column"))); ttlInSeconds = Integer.valueOf(String.valueOf(transformationArguments.get("ttl_in_seconds"))); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java index af92c774f..bba768a45 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java @@ -1,11 +1,11 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.functions.udfs.aggregate.feast.FeatureUtils; @@ -30,9 +30,9 @@ public class FeatureTransformer implements MapFunction, Transformer { * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public FeatureTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public FeatureTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = columnNames; this.keyColumn = transformationArguments.get(KEY_COLUMN_NAME); this.valueColumn = transformationArguments.get(VALUE_COLUMN_NAME); diff --git 
a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java index ac510eec1..4a0a5f789 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java @@ -1,11 +1,11 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.functions.transformers.feature.FeatureWithTypeHandler; @@ -26,9 +26,9 @@ public class FeatureWithTypeTransformer implements MapFunction, Transf * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public FeatureWithTypeTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public FeatureWithTypeTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.featureWithTypeHandler = new FeatureWithTypeHandler(transformationArguments, columnNames); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java index f39ac6a86..c4b7e333c 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java @@ -1,12 +1,12 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; import com.google.protobuf.Descriptors; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; @@ -28,8 +28,7 @@ public class HashTransformer extends RichMapFunction implements Serial private static final String SINK_KAFKA_PROTO_MESSAGE = "SINK_KAFKA_PROTO_MESSAGE"; private static final String ENCRYPTION_FIELD_KEY = "maskColumns"; private final List fieldsToHash; - private Configuration configuration; - + private final DaggerContext daggerContext; private final String[] columnNames; private Map rowHasherMap; @@ -38,12 +37,12 @@ public class HashTransformer extends RichMapFunction implements Serial * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public HashTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public HashTransformer(Map transformationArguments, String[] columnNames, 
DaggerContext daggerContext) { this.fieldsToHash = getFieldsToHash(transformationArguments); this.columnNames = columnNames; - this.configuration = configuration; + this.daggerContext = daggerContext; } private ArrayList getFieldsToHash(Map transformationArguments) { @@ -71,8 +70,8 @@ public StreamInfo transform(StreamInfo streamInfo) { * @return the map */ protected Map createRowHasherMap() { - String outputProtoClassName = configuration.getString(SINK_KAFKA_PROTO_MESSAGE, ""); - StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); + String outputProtoClassName = daggerContext.getConfiguration().getString(SINK_KAFKA_PROTO_MESSAGE, ""); + StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(daggerContext.getConfiguration()); Descriptors.Descriptor outputDescriptor = stencilClientOrchestrator.getStencilClient().get(outputProtoClassName); if (outputDescriptor == null) { throw new DescriptorNotFoundException("Output Descriptor for class: " + outputProtoClassName diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java index 622b4e0c8..5fbf1434f 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java @@ -1,10 +1,10 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.common.metrics.managers.CounterStatsManager; @@ -34,9 +34,9 @@ public class InvalidRecordFilterTransformer extends RichFilterFunction impl * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public InvalidRecordFilterTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public InvalidRecordFilterTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.tableName = (String) transformationArguments.getOrDefault("table_name", ""); validationIndex = Arrays.asList(columnNames).indexOf(INTERNAL_VALIDATION_FILED); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java index a2b228401..5ab75d350 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java @@ -1,13 +1,12 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.common.watermark.RowtimeFieldWatermark; @@ -27,19 +26,21 @@ public class SQLTransformer implements Serializable, Transformer { private final String tableName; private final long allowedLatenessInMs; private static final String ROWTIME = "rowtime"; + private final DaggerContext daggerContext; /** * Instantiates a new Sql transformer. * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public SQLTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public SQLTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = columnNames; this.sqlQuery = transformationArguments.get("sqlQuery"); this.tableName = transformationArguments.getOrDefault("tableName", "data_stream"); this.allowedLatenessInMs = Long.parseLong(transformationArguments.getOrDefault("allowedLatenessInMs", "0")); + this.daggerContext = daggerContext; } @Override @@ -53,8 +54,7 @@ public StreamInfo transform(StreamInfo inputStreamInfo) { schema = schema.replace(ROWTIME, ROWTIME + ".rowtime"); inputStream = assignTimeAttribute(inputStream); } - StreamExecutionEnvironment streamExecutionEnvironment = inputStream.getExecutionEnvironment(); - StreamTableEnvironment streamTableEnvironment = getStreamTableEnvironment(streamExecutionEnvironment); + StreamTableEnvironment streamTableEnvironment = daggerContext.getTableEnvironment(); streamTableEnvironment.registerDataStream(tableName, inputStream, schema); Table table = streamTableEnvironment.sqlQuery(sqlQuery); @@ -65,16 +65,6 @@ public StreamInfo transform(StreamInfo inputStreamInfo) { return new StreamInfo(outputStream, table.getSchema().getFieldNames()); } - /** - * Gets stream table environment. 
- * - * @param streamExecutionEnvironment the stream execution environment - * @return the stream table environment - */ - protected StreamTableEnvironment getStreamTableEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) { - return StreamTableEnvironment.create(streamExecutionEnvironment); - } - private DataStream assignTimeAttribute(DataStream inputStream) { StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new RowtimeFieldWatermark(columnNames)); return streamWatermarkAssigner.assignTimeStampAndWatermark(inputStream, allowedLatenessInMs); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java index 10eee0d7e..89e0af749 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java @@ -1,13 +1,11 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import java.util.HashMap; import java.util.Map; @@ -20,13 +18,7 @@ import static org.mockito.MockitoAnnotations.initMocks; -public class ClearColumnTransformerTest { - - @Mock - private DataStream dataStream; - - @Mock - private Configuration configuration; +public class ClearColumnTransformerTest extends DaggerContextTestBase { @Before public void setup() { @@ -44,7 +36,7 @@ public void shouldSetTargetColumnToEmpty() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = clearColumnTransformer.map(inputRow); assertEquals("", outputRow.getField(1)); } @@ -58,7 +50,7 @@ public void shouldThrowExceptionWhenTargetColumnIsNotPresent() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, "random"); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); clearColumnTransformer.map(inputRow); } @@ -73,10 +65,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); clearColumnTransformer.transform(inputStreamInfo); - 
verify(dataStream, times(1)).map(any(ClearColumnTransformer.class)); + verify(inputStream, times(1)).map(any(ClearColumnTransformer.class)); } @Test @@ -90,8 +82,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = clearColumnTransformer.transform(inputStreamInfo); assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java index cae7fae16..17bedef79 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java @@ -1,14 +1,13 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; @@ -26,28 +25,21 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -@RunWith(MockitoJUnitRunner.class) -public class DeDuplicationTransformerTest { +@RunWith(MockitoJUnitRunner.Silent.class) +public class DeDuplicationTransformerTest extends DaggerContextTestBase { @Mock private RuntimeContext runtimeContext; - @Mock - private Configuration configuration; - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; @Mock private MapState mapState; - @Mock - private DataStream dataStream; - @Mock private KeyedStream keyedStream; - @Before public void setup() { initMocks(this); @@ -114,7 +106,7 @@ public void shouldFilterRecordsIfKeyAlreadyPresent() throws Exception { @Test public void shouldTransformInputStreamWithKeyByAndApplyFilterFunction() { - when(dataStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); + when(inputStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("key_column", "status"); transformationArguments.put("ttl_in_seconds", 10); @@ -123,15 +115,15 @@ public void shouldTransformInputStreamWithKeyByAndApplyFilterFunction() { inputRow.setField(0, "123"); inputRow.setField(1, "TEST_SERVICE_TYPE"); inputRow.setField(2, "TEST_STATUS"); - DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new 
StreamInfo(dataStream, columnNames); + DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); deDuplicationTransformer.transform(inputStreamInfo); verify(keyedStream, times(1)).filter(any(DeDuplicationTransformer.class)); } @Test public void shouldReturnSameColumnNames() { - when(dataStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); + when(inputStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("key_column", "status"); transformationArguments.put("ttl_in_seconds", 10); @@ -140,8 +132,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "123"); inputRow.setField(1, "TEST_SERVICE_TYPE"); inputRow.setField(2, "TEST_STATUS"); - DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = deDuplicationTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } @@ -149,7 +141,7 @@ public void shouldReturnSameColumnNames() { public class DeDuplicationTransformerStub extends DeDuplicationTransformer { public DeDuplicationTransformerStub(Map transformationArguments, String[] columnNames) { - super(transformationArguments, columnNames, configuration); + super(transformationArguments, columnNames, daggerContext); } @Override diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java index 9c01caff5..fbde1dc32 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java @@ -1,14 +1,12 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import java.util.HashMap; @@ -17,14 +15,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; -public class FeatureTransformerTest { - - @Mock - private DataStream dataStream; - - @Mock - private Configuration configuration; - +public class FeatureTransformerTest extends DaggerContextTestBase { @Before public void setup() { initMocks(this); @@ -40,7 +31,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingFloatAsValue() throws Exce inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = 
featureTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -61,7 +52,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingIntegerAsValue() throws Ex inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = featureTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -81,7 +72,7 @@ public void shouldThrowExceptionForUnSupportedDataType() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, "value".getBytes()); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -95,7 +86,7 @@ public void shouldHandleWhenKeyIsNotPresent() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -109,7 +100,7 @@ public void shouldHandleWhenValueIsNotPresent() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -123,10 +114,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); featureTransformer.transform(inputStreamInfo); - verify(dataStream, times(1)).map(any(FeatureTransformer.class)); + verify(inputStream, times(1)).map(any(FeatureTransformer.class)); } @Test @@ -139,8 +130,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = featureTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git 
a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java index bc354c1a4..83f99eb15 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java @@ -1,9 +1,8 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; @@ -19,17 +18,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; -public class FeatureWithTypeTransformerTest { +public class FeatureWithTypeTransformerTest extends DaggerContextTestBase { - @Mock - private DataStream dataStream; @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); @@ -51,7 +45,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingStringAsValue() throws Exc inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "test_order_number"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transfromationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transfromationArguments, columnNames, daggerContext); Row outputRow = featureWithTypeTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -78,7 +72,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingFloatType() throws Excepti inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = featureWithTypeTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -106,7 +100,7 @@ public void shouldThrowExceptionForUnSupportedDataType() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -126,7 +120,7 @@ public void shouldThrowIfKeyColumnNameNotExist() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + 
FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -146,7 +140,7 @@ public void shouldThrowIfValueColumnNameNotExist() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -166,7 +160,7 @@ public void shouldThrowIfVoutputColumnNameNotExist() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -186,10 +180,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); featureWithTypeTransformer.transform(inputStreamInfo); - verify(dataStream, times(1)).map(any(FeatureWithTypeTransformer.class)); + verify(inputStream, times(1)).map(any(FeatureWithTypeTransformer.class)); } @Test @@ -208,8 +202,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = featureWithTypeTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java index aeb2bd30f..8d8de8459 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java @@ -1,10 +1,9 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; import 
com.google.protobuf.Timestamp; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.exceptions.DescriptorNotFoundException; import io.odpf.dagger.consumer.TestBookingLogMessage; @@ -24,16 +23,9 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class HashTransformerTest { +public class HashTransformerTest extends DaggerContextTestBase { @Rule public ExpectedException thrown = ExpectedException.none(); - - @Mock - private DataStream inputStream; - - @Mock - private Configuration configuration; - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; @@ -64,7 +56,7 @@ public void shouldHashSingleStringFieldInInputRow() throws Exception { inputRow.setField(1, 1); inputRow.setField(2, false); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -93,7 +85,7 @@ public void shouldHashMultipleFieldsOfSupportedDatatypeInInputRow() throws Excep inputRow.setField(1, 1); inputRow.setField(2, false); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -125,7 +117,7 @@ public void shouldHashAllFieldsOfSupportedDataTypesInInputRow() throws Exception inputRow.setField(1, 1); inputRow.setField(2, 1L); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -170,7 +162,7 @@ public void shouldHashNestedFields() throws Exception { inputRow.setField(0, bookingLogRow); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -195,7 +187,7 @@ public void shouldCreateRowHasherMapInOpen() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); verify(configuration, times(1)).getString("SINK_KAFKA_PROTO_MESSAGE", ""); @@ -215,7 +207,7 @@ public void shouldThrowErrorIfUnableToCreateRowHasherMap() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); 
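The verify on configuration.getString("SINK_KAFKA_PROTO_MESSAGE", "") in the hunk above passes only because DaggerContextTestBase routes daggerContext.getConfiguration() to the shared configuration mock. A self-contained sketch of that indirection in isolation (the test class name here is hypothetical):

```java
import io.odpf.dagger.common.core.DaggerContextTestBase;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

// Hypothetical sketch: keys stubbed on the configuration mock are visible
// through daggerContext.getConfiguration(), which the base class wires up.
public class ContextConfigurationSketchTest extends DaggerContextTestBase {

    @Test
    public void shouldResolveStubbedKeysThroughTheContext() {
        when(configuration.getString("SINK_KAFKA_PROTO_MESSAGE", ""))
                .thenReturn("io.odpf.dagger.consumer.TestBookingLogMessage");

        assertEquals("io.odpf.dagger.consumer.TestBookingLogMessage",
                daggerContext.getConfiguration().getString("SINK_KAFKA_PROTO_MESSAGE", ""));
    }
}
```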
hashTransformer.open(flinkInternalConfig); } @@ -234,7 +226,7 @@ public void shouldThrowErrorIfUnableToFindOpDescriptor() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); } @@ -249,7 +241,7 @@ public void shouldTransformInputStreamToOutputStream() { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.transform(new StreamInfo(inputStream, columnNames)); verify(inputStream, times(1)).map(hashTransformer); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java index 8954e47cb..22bc83e7b 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; @@ -7,7 +8,6 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.consumer.TestBookingLogMessage; import org.junit.Assert; import org.junit.Before; @@ -26,7 +26,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class InvalidRecordFilterTransformerTest { +public class InvalidRecordFilterTransformerTest extends DaggerContextTestBase { @Mock private RuntimeContext runtimeContext; @@ -35,9 +35,6 @@ public class InvalidRecordFilterTransformerTest { @Mock private Counter counter; - @Mock - private Configuration configuration; - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; @@ -85,7 +82,7 @@ private Row createDefaultValidRow(DynamicMessage defaultInstance) { public void shouldFilterBadRecords() throws Exception { InvalidRecordFilterTransformer filter = new InvalidRecordFilterTransformer(new HashMap() {{ put("table_name", "test"); - }}, getColumns(), configuration); + }}, getColumns(), daggerContext); filter.setRuntimeContext(runtimeContext); when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("per_table", "test")).thenReturn(metricGroup); @@ -106,7 +103,7 @@ public void shouldFilterBadRecords() throws Exception { public void shouldPassValidRecords() throws Exception { InvalidRecordFilterTransformer filter = new InvalidRecordFilterTransformer(new HashMap() {{ put("table_name", "test"); - }}, getColumns(), configuration); + }}, getColumns(), daggerContext); filter.setRuntimeContext(runtimeContext); 
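These tests, like the other transformer tests in this PR, depend on the reflection-based install/reset of the DaggerContext singleton. Isolated from any test base, the pattern amounts to the sketch below; the helper class name is hypothetical, while the field name daggerContext matches this PR:

```java
import io.odpf.dagger.common.core.DaggerContext;

import java.lang.reflect.Field;

// Hypothetical helper: swap a test double into the private static singleton
// field and clear it again so tests stay isolated from each other.
final class DaggerContextSingletonFixture {

    static void install(DaggerContext testDouble) throws ReflectiveOperationException {
        Field instance = DaggerContext.class.getDeclaredField("daggerContext");
        instance.setAccessible(true);
        instance.set(null, testDouble); // the target object is ignored for static fields
    }

    static void reset() throws ReflectiveOperationException {
        install(null);
    }
}
```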
when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("per_table", "test")).thenReturn(metricGroup); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java index c00ec5ca2..8330fa7c8 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java @@ -1,22 +1,24 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; @@ -28,16 +30,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class SQLTransformerTest { - - @Mock - private DataStream inputStream; - - @Mock - private StreamExecutionEnvironment streamExecutionEnvironment; - - @Mock - private StreamTableEnvironment streamTableEnvironment; +public class SQLTransformerTest extends DaggerContextTestBase { @Mock private Table table; @@ -57,13 +50,34 @@ public class SQLTransformerTest { @Mock private SingleOutputStreamOperator watermarkedStream; - @Mock - private Configuration configuration; + private Multiply multiply; @Before public void setup() { initMocks(this); + setMock(daggerContext); + when(daggerContext.getConfiguration()).thenReturn(configuration); when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment); + multiply = new Multiply(); + } + + @After + public void resetSingleton() throws Exception { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, null); + } + + private void setMock(DaggerContext mock) { + try { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Test @@ -89,6 +103,28 @@ public void shouldApplyThePassedSQL() { assertEquals(outputStream, outputStreamInfo.getDataStream()); } + + @Test + public void shouldReturnColumnNamesReturnedBySQLAndUdf() { + HashMap transformationArguments = new HashMap<>(); + String sqlQuery = "SELECT order_number, service_type, multiply(1,100) product FROM 
data_stream"; + transformationArguments.put("sqlQuery", sqlQuery); + String[] columnNames = {"order_number", "service_type", "status", "product"}; + streamTableEnvironment.createTemporaryFunction("multiply", multiply); + + when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table); + when(table.getSchema()).thenReturn(tableSchema); + String[] outputColumns = {"order_number", "service_type", "product"}; + when(tableSchema.getFieldNames()).thenReturn(outputColumns); + when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream); + when(retractStream.filter(any())).thenReturn(filteredRetractStream); + when(filteredRetractStream.map(any())).thenReturn(outputStream); + SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); + StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo); + + assertArrayEquals(outputColumns, outputStreamInfo.getColumnNames()); + } + @Test public void shouldReturnColumnNamesReturnedBySQL() { HashMap transformationArguments = new HashMap<>(); @@ -195,12 +231,13 @@ public void shouldThrowExceptionIfSqlNotProvided() { class SQLTransformerStub extends SQLTransformer { SQLTransformerStub(Map transformationArguments, String[] columnNames) { - super(transformationArguments, columnNames, configuration); + super(transformationArguments, columnNames, daggerContext); } + } - @Override - protected StreamTableEnvironment getStreamTableEnvironment(StreamExecutionEnvironment executionEnvironment) { - return streamTableEnvironment; + class Multiply extends ScalarFunction { + public Integer eval(Integer i1, Integer i2) { + return i1 * i2; } } diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java index 1b11bbe79..b8d3db9f8 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -39,7 +40,7 @@ import static org.junit.Assert.assertTrue; -public class EsExternalPostProcessorIntegrationTest { +public class EsExternalPostProcessorIntegrationTest extends DaggerContextTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(EsExternalPostProcessorIntegrationTest.class.getName()); @@ -319,7 +320,7 @@ private RestClient getESClient() { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff --git 
a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java index adf04470a..52dbb09c2 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -37,7 +38,7 @@ import static org.junit.Assert.assertTrue; -public class GrpcExternalPostProcessorIntegrationTest { +public class GrpcExternalPostProcessorIntegrationTest extends DaggerContextTestBase { private StencilClientOrchestrator stencilClientOrchestrator; private MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter(); @@ -321,7 +322,7 @@ public synchronized void invoke(Row inputRow, Context context) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java index fa2574cbb..429978ad0 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -33,7 +34,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class HttpExternalPostProcessorIntegrationTest { +public class HttpExternalPostProcessorIntegrationTest extends DaggerContextTestBase { @Rule public WireMockRule wireMockRule = new WireMockRule(8089); @@ -408,7 +409,7 @@ public synchronized void invoke(Row inputRow, Context context) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff 
diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java
index f16eb36a4..cc4918a29 100644
--- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java
+++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java
@@ -1,10 +1,12 @@
 package io.odpf.dagger.integrationtest;
 
+import io.odpf.dagger.common.core.DaggerContext;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
@@ -18,13 +20,11 @@
 import io.vertx.pgclient.PgPool;
 import io.vertx.sqlclient.PoolOptions;
 import org.apache.commons.lang3.StringUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +38,8 @@
 import static io.vertx.pgclient.PgPool.pool;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 @Ignore
 public class PostGresExternalPostProcessorIntegrationTest {
@@ -56,6 +57,15 @@ public class PostGresExternalPostProcessorIntegrationTest {
 
     private StencilClientOrchestrator stencilClientOrchestrator;
     private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();
+    private static DaggerContext daggerContext;
+
+    private static StreamExecutionEnvironment streamExecutionEnvironment;
+
+    private static StreamTableEnvironment streamTableEnvironment;
+
+    private static Configuration configuration;
+
+
     @BeforeClass
     public static void setUp() {
         host = System.getenv("PG_HOST");
@@ -63,11 +73,20 @@ public static void setUp() {
             host = "localhost";
         }
 
-        try {
+        String streams = "[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"dummy-topic\",\"INPUT_SCHEMA_TABLE\":\"testbooking\",\"INPUT_SCHEMA_PROTO_CLASS\":\"io.odpf.dagger.consumer.TestBookingLogMessage\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"41\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"localhost:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"test-consumer\",\"SOURCE_KAFKA_NAME\":\"localkafka\"}]";
+        configurationMap.put(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, "true");
+        configurationMap.put(INPUT_STREAMS, streams);
+        configuration = new Configuration(ParameterTool.fromMap(configurationMap));
+
+        daggerContext = mock(DaggerContext.class);
+        streamExecutionEnvironment = mock(StreamExecutionEnvironment.class);
+        streamTableEnvironment = mock(StreamTableEnvironment.class);
+        setMock(daggerContext);
+        when(daggerContext.getConfiguration()).thenReturn(configuration);
+        when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment);
+        when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment);
 
-        String streams = "[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"dummy-topic\",\"INPUT_SCHEMA_TABLE\":\"testbooking\",\"INPUT_SCHEMA_PROTO_CLASS\":\"io.odpf.dagger.consumer.TestBookingLogMessage\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"41\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"localhost:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"test-consumer\",\"SOURCE_KAFKA_NAME\":\"localkafka\"}]";
-        configurationMap.put(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, "true");
-        configurationMap.put(INPUT_STREAMS, streams);
+        try {
             pgClient = getPGClient();
@@ -98,6 +117,23 @@ public static void setUp() {
         }
     }
 
+    @AfterClass
+    public static void resetSingleton() throws Exception {
+        Field instance = DaggerContext.class.getDeclaredField("daggerContext");
+        instance.setAccessible(true);
+        instance.set(null, null);
+    }
+
+    private static void setMock(DaggerContext mock) {
+        try {
+            Field instance = DaggerContext.class.getDeclaredField("daggerContext");
+            instance.setAccessible(true);
+            instance.set(null, mock);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static PgPool getPGClient() {
 
         String dbHost = System.getenv("PG_HOST");
@@ -316,7 +352,7 @@ public void shouldPopulateFieldFromPostgresOnSuccessResponseWithAllThreeSourcesI
 
     private StreamInfo addPostProcessor(StreamInfo streamInfo) {
-        List postProcessors = PostProcessorFactory.getPostProcessors(new Configuration(ParameterTool.fromMap(configurationMap)), stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter);
+        List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter);
         StreamInfo postProcessedStream = streamInfo;
         for (PostProcessor postProcessor : postProcessors) {
             postProcessedStream = postProcessor.process(postProcessedStream);
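The `setMock`/`resetSingleton` pair above repeats the reflection trick used by the test base because this class configures the singleton once per class via `@BeforeClass` rather than per test. The same pattern in isolation, as a hypothetical helper (names are illustrative; note that for static fields `Field.set` ignores its receiver argument, which is why `null` is the idiomatic first argument):

import java.lang.reflect.Field;

public final class SingletonTestSupport {

    private SingletonTestSupport() {
    }

    // Overwrite a private static singleton field with a replacement value
    // (typically a mock), or pass null to reset it between test classes.
    public static void setSingleton(Class<?> holder, String fieldName, Object value) {
        try {
            Field instance = holder.getDeclaredField(fieldName);
            instance.setAccessible(true);
            // Static field: the receiver is ignored, so null is passed.
            instance.set(null, value);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}

// Usage mirroring the @BeforeClass/@AfterClass pair above:
//   SingletonTestSupport.setSingleton(DaggerContext.class, "daggerContext", mock(DaggerContext.class));
//   ...
//   SingletonTestSupport.setSingleton(DaggerContext.class, "daggerContext", null);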