From f77fb14c095b9de3134fbc18947d1e514b37a7fe Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Wed, 26 Oct 2022 12:21:04 +0530 Subject: [PATCH 1/5] feat: Enable UDF access to SQLTransformers Currently, in a Dagger Flink SQL query we can access all the Java- and Python-based UDFs, but the same UDFs are not accessible in the post-processor (SQLTransformer). The Flink StreamTableEnvironment instance is used to register the UDFs in the registerFunctions() method of StreamManager.java. Because the same instance is not used to create Flink tables in SQLTransformer.java, the UDFs are not accessible there. We can solve this with either of the two approaches below. Approach-1: Introduce a DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instances, so that they can be used throughout the application. This context object is initialized only once, in the driver class KafkaProtoSQLProcessor.java. The DaggerContext instance is made available to SQLTransformer through constructors (KafkaProtoSQLProcessor -> StreamManager -> PostProcessorFactory -> ParentPostProcessor -> TransformProcessor). With this DaggerContext we can register the Flink table in SQLTransformer.java and have access to the UDFs that were registered earlier. Approach-2: In SQLTransformer.java, create a new instance of StreamManager and call the registerFunctions() method for each SQLTransformer configuration. With this approach, if the user configures SQLTransformer n times, the UDFs are registered n times and the objects are initialized n times. Here we have followed Approach-1. 
--- .../dagger/common/core/DaggerContext.java | 67 ++++++++++++++++ .../exceptions/DaggerContextException.java | 16 ++++ .../common/core/DaggerContextTestBase.java | 60 +++++++++++++++ .../dagger/core/KafkaProtoSQLProcessor.java | 13 +--- .../io/odpf/dagger/core/StreamManager.java | 25 +++--- .../core/processors/ParentPostProcessor.java | 27 ++++--- .../core/processors/PostProcessorFactory.java | 18 ++--- .../core/processors/PreProcessorFactory.java | 37 +-------- .../processors/PreProcessorOrchestrator.java | 43 +++++++++-- .../transformers/TransformProcessor.java | 20 ++--- .../odpf/dagger/core/StreamManagerTest.java | 49 +++++------- .../processors/ParentPostProcessorTest.java | 6 +- .../processors/PostProcessorFactoryTest.java | 14 ++-- .../processors/PreProcessorFactoryTest.java | 36 +-------- .../PreProcessorOrchestratorTest.java | 77 +++++++++++++++++-- .../transformers/MockTransformer.java | 3 +- .../transformers/TransformProcessorTest.java | 46 +++++------ .../transformers/ClearColumnTransformer.java | 5 +- .../DeDuplicationTransformer.java | 5 +- .../transformers/FeatureTransformer.java | 5 +- .../FeatureWithTypeTransformer.java | 5 +- .../transformers/HashTransformer.java | 14 ++-- .../InvalidRecordFilterTransformer.java | 5 +- .../transformers/SQLTransformer.java | 22 ++---- .../ClearColumnTransformerTest.java | 26 +++---- .../DeDuplicationTransformerTest.java | 26 +++---- .../transformers/FeatureTransformerTest.java | 34 ++++---- .../FeatureWithTypeTransformerTest.java | 32 ++++---- .../transformers/HashTransformerTest.java | 29 +++---- .../InvalidRecordFilterTransformerTest.java | 11 +-- .../transformers/SQLTransformerTest.java | 74 +++++++++++++----- ...sExternalPostProcessorIntegrationTest.java | 5 +- ...cExternalPostProcessorIntegrationTest.java | 5 +- ...pExternalPostProcessorIntegrationTest.java | 7 +- ...sExternalPostProcessorIntegrationTest.java | 57 +++++++++++--- 35 files changed, 554 insertions(+), 370 deletions(-) create mode 100644 
dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java create mode 100644 dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java create mode 100644 dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java new file mode 100644 index 000000000..a8685cbfc --- /dev/null +++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java @@ -0,0 +1,67 @@ +package io.odpf.dagger.common.core; + +import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.exceptions.DaggerContextException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The DaggerContext singleton object initializes with StreamExecutionEnvironment, StreamTableEnvironment and Configuration + */ +public class DaggerContext { + private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName()); + private static volatile DaggerContext daggerContext = null; + private final StreamExecutionEnvironment executionEnvironment; + private final StreamTableEnvironment tableEnvironment; + private final Configuration configuration; + + /** + * Instantiates a new DaggerContext. 
+ * + * @param configuration the Configuration + */ + private DaggerContext(Configuration configuration) { + this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); + tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings); + this.configuration = configuration; + } + + /** + * Get the instance of DaggerContext + */ + public static DaggerContext getInstance() { + if (daggerContext == null) { + throw new DaggerContextException("DaggerContext object is not initialized"); + } + return daggerContext; + } + + /** + * Initialization of a new DaggerContext. + * @param configuration the Configuration + */ + public synchronized static DaggerContext init(Configuration configuration) { + if (daggerContext != null) { + throw new DaggerContextException("DaggerContext object is already initialized"); + } + daggerContext = new DaggerContext(configuration); + LOGGER.info("DaggerContext is initialized"); + return daggerContext; + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return executionEnvironment; + } + + public StreamTableEnvironment getTableEnvironment() { + return tableEnvironment; + } + + public Configuration getConfiguration() { + return configuration; + } +} diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java new file mode 100644 index 000000000..13d7e0f1b --- /dev/null +++ b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java @@ -0,0 +1,16 @@ +package io.odpf.dagger.common.exceptions; + +/** + * The class Exception if there is something wrong with Dagger context object. 
+ */ +public class DaggerContextException extends RuntimeException{ + + /** + * Instantiates a new Dagger context exception with specified error message. + * + * @param message the message + */ + public DaggerContextException(String message) { + super(message); + } +} diff --git a/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java b/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java new file mode 100644 index 000000000..19608a028 --- /dev/null +++ b/dagger-common/src/test/java/io/odpf/dagger/common/core/DaggerContextTestBase.java @@ -0,0 +1,60 @@ +package io.odpf.dagger.common.core; + +import io.odpf.dagger.common.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.After; +import org.junit.Before; + +import java.lang.reflect.Field; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public abstract class DaggerContextTestBase { + + protected DaggerContext daggerContext; + + protected DataStream inputStream; + + protected StreamExecutionEnvironment streamExecutionEnvironment; + + protected StreamTableEnvironment streamTableEnvironment; + + protected Configuration configuration; + + @Before + public void setupBase() { + daggerContext = mock(DaggerContext.class); + configuration = mock(Configuration.class); + inputStream = mock(DataStream.class); + streamExecutionEnvironment = mock(StreamExecutionEnvironment.class); + streamTableEnvironment = mock(StreamTableEnvironment.class); + initMocks(this); + setMock(daggerContext); + when(daggerContext.getConfiguration()).thenReturn(configuration); + when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + 
when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment); + } + + @After + public void resetSingletonBase() throws Exception { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, null); + } + + private void setMock(DaggerContext mock) { + try { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(instance, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java index 41e79e232..add69b254 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java @@ -4,9 +4,7 @@ import io.odpf.dagger.core.config.ConfigurationProvider; import io.odpf.dagger.core.config.ConfigurationProviderFactory; import org.apache.flink.client.program.ProgramInvocationException; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import io.odpf.dagger.common.core.DaggerContext; import java.util.TimeZone; @@ -26,11 +24,8 @@ public static void main(String[] args) throws ProgramInvocationException { ConfigurationProvider provider = new ConfigurationProviderFactory(args).provider(); Configuration configuration = provider.get(); TimeZone.setDefault(TimeZone.getTimeZone("UTC")); - StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); - EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment 
tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings); - - StreamManager streamManager = new StreamManager(configuration, executionEnvironment, tableEnvironment); + DaggerContext daggerContext = DaggerContext.init(configuration); + StreamManager streamManager = new StreamManager(daggerContext); streamManager .registerConfigs() .registerSourceWithPreProcessors() @@ -42,4 +37,4 @@ public static void main(String[] args) throws ProgramInvocationException { throw new ProgramInvocationException(e); } } -} +} \ No newline at end of file diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index ea6b4f4ee..baeb8a328 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.udfs.UdfFactory; @@ -53,17 +54,18 @@ public class StreamManager { private StencilClientOrchestrator stencilClientOrchestrator; private DaggerStatsDReporter daggerStatsDReporter; + private final DaggerContext daggerContext; + /** * Instantiates a new Stream manager. 
* - * @param configuration the configuration in form of param - * @param executionEnvironment the execution environment - * @param tableEnvironment the table environment + * @param daggerContext the daggerContext in form of param */ - public StreamManager(Configuration configuration, StreamExecutionEnvironment executionEnvironment, StreamTableEnvironment tableEnvironment) { - this.configuration = configuration; - this.executionEnvironment = executionEnvironment; - this.tableEnvironment = tableEnvironment; + public StreamManager(DaggerContext daggerContext) { + this.daggerContext = daggerContext; + this.configuration = daggerContext.getConfiguration(); + this.executionEnvironment = daggerContext.getExecutionEnvironment(); + this.tableEnvironment = daggerContext.getTableEnvironment(); } /** @@ -99,7 +101,6 @@ public StreamManager registerConfigs() { public StreamManager registerSourceWithPreProcessors() { long watermarkDelay = configuration.getLong(FLINK_WATERMARK_DELAY_MS_KEY, FLINK_WATERMARK_DELAY_MS_DEFAULT); Boolean enablePerPartitionWatermark = configuration.getBoolean(FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter) .forEach(stream -> { String tableName = stream.getStreamName(); @@ -112,7 +113,7 @@ public StreamManager registerSourceWithPreProcessors() { TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType()); StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames()); - streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig); + streamInfo = addPreProcessor(streamInfo, tableName); Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo)); tableEnvironment.createTemporaryView(tableName, table); @@ -211,15 +212,15 @@ protected 
StreamInfo createStreamInfo(Table table) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); for (PostProcessor postProcessor : postProcessors) { streamInfo = postProcessor.process(streamInfo); } return streamInfo; } - private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName, PreProcessorConfig preProcessorConfig) { - List preProcessors = PreProcessorFactory.getPreProcessors(configuration, preProcessorConfig, tableName, telemetryExporter); + private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) { + List preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, tableName, telemetryExporter); for (Preprocessor preprocessor : preProcessors) { streamInfo = preprocessor.process(streamInfo); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java index 12875b26a..0373155e2 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/ParentPostProcessor.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -26,23 +27,29 @@ */ public class ParentPostProcessor implements PostProcessor { private final PostProcessorConfig postProcessorConfig; - private Configuration configuration; + private final StencilClientOrchestrator stencilClientOrchestrator; private TelemetrySubscriber telemetrySubscriber; + private final DaggerContext daggerContext; 
+ /** * Instantiates a new Parent post processor. * - * @param postProcessorConfig the post processor config - * @param configuration the configuration + * @param daggerContext the daggerContext * @param stencilClientOrchestrator the stencil client orchestrator * @param telemetrySubscriber the telemetry subscriber */ - public ParentPostProcessor(PostProcessorConfig postProcessorConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) { - this.postProcessorConfig = postProcessorConfig; - this.configuration = configuration; + public ParentPostProcessor(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) { + this.daggerContext = daggerContext; this.stencilClientOrchestrator = stencilClientOrchestrator; this.telemetrySubscriber = telemetrySubscriber; + this.postProcessorConfig = parsePostProcessorConfig(daggerContext.getConfiguration()); + } + + private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) { + String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, ""); + return PostProcessorConfig.parse(postProcessorConfigString); } @Override @@ -56,7 +63,7 @@ public StreamInfo process(StreamInfo streamInfo) { InitializationDecorator initializationDecorator = new InitializationDecorator(columnNameManager); resultStream = initializationDecorator.decorate(resultStream); streamInfo = new StreamInfo(resultStream, streamInfo.getColumnNames()); - SchemaConfig schemaConfig = new SchemaConfig(configuration, stencilClientOrchestrator, columnNameManager); + SchemaConfig schemaConfig = new SchemaConfig(daggerContext.getConfiguration(), stencilClientOrchestrator, columnNameManager); List enabledPostProcessors = getEnabledPostProcessors(telemetrySubscriber, schemaConfig); for (PostProcessor postProcessor : enabledPostProcessors) { @@ -66,7 +73,7 @@ public 
StreamInfo process(StreamInfo streamInfo) { FetchOutputDecorator fetchOutputDecorator = new FetchOutputDecorator(schemaConfig, postProcessorConfig.hasSQLTransformer()); resultStream = fetchOutputDecorator.decorate(streamInfo.getDataStream()); StreamInfo resultantStreamInfo = new StreamInfo(resultStream, columnNameManager.getOutputColumnNames()); - TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), configuration); + TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), daggerContext); if (transformProcessor.canProcess(postProcessorConfig)) { transformProcessor.notifySubscriber(telemetrySubscriber); resultantStreamInfo = transformProcessor.process(resultantStreamInfo); @@ -80,11 +87,11 @@ public boolean canProcess(PostProcessorConfig config) { } private List getEnabledPostProcessors(TelemetrySubscriber subscriber, SchemaConfig schemaConfig) { - if (!configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { + if (!daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { return new ArrayList<>(); } - ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(configuration, subscriber); + ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(daggerContext.getConfiguration(), subscriber); ArrayList processors = new ArrayList<>(); processors.add(new ExternalPostProcessor(schemaConfig, postProcessorConfig.getExternalSource(), externalMetricConfig)); processors.add(new InternalPostProcessor(postProcessorConfig, schemaConfig)); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java index 19b950bcd..b7480b282 100644 --- 
a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PostProcessorFactory.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core.processors; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.core.processors.longbow.LongbowFactory; import io.odpf.dagger.core.processors.longbow.LongbowSchema; @@ -22,22 +23,22 @@ public class PostProcessorFactory { /** * Gets post processors. * - * @param configuration the configuration + * @param daggerContext the daggerContext * @param stencilClientOrchestrator the stencil client orchestrator * @param columnNames the column names * @param metricsTelemetryExporter the metrics telemetry exporter * @return the post processors */ - public static List getPostProcessors(Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) { + public static List getPostProcessors(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) { List postProcessors = new ArrayList<>(); if (Arrays.stream(columnNames).anyMatch(s -> Pattern.compile(".*\\blongbow.*key\\b.*").matcher(s).find())) { - postProcessors.add(getLongBowProcessor(columnNames, configuration, metricsTelemetryExporter, stencilClientOrchestrator)); + postProcessors.add(getLongBowProcessor(columnNames, daggerContext.getConfiguration(), metricsTelemetryExporter, stencilClientOrchestrator)); } - if (configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { - postProcessors.add(new ParentPostProcessor(parsePostProcessorConfig(configuration), configuration, stencilClientOrchestrator, metricsTelemetryExporter)); + if 
(daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) { + postProcessors.add(new ParentPostProcessor(daggerContext, stencilClientOrchestrator, metricsTelemetryExporter)); } - if (configuration.getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) { + if (daggerContext.getConfiguration().getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) { postProcessors.add(new TelemetryProcessor(metricsTelemetryExporter)); } return postProcessors; @@ -49,9 +50,4 @@ private static PostProcessor getLongBowProcessor(String[] columnNames, Configura return longbowFactory.getLongbowProcessor(); } - - private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) { - String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, ""); - return PostProcessorConfig.parse(postProcessorConfigString); - } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java index 6e79136c0..ed411f1c1 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorFactory.java @@ -1,14 +1,8 @@ package io.odpf.dagger.core.processors; -import com.google.gson.FieldNamingPolicy; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonSyntaxException; -import com.jayway.jsonpath.InvalidJsonException; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.Preprocessor; -import io.odpf.dagger.core.utils.Constants; import 
java.util.Collections; import java.util.List; @@ -17,38 +11,15 @@ * The factory class for Preprocessor. */ public class PreProcessorFactory { - private static final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); - - /** - * Parse config preprocessor config. - * - * @param configuration the configuration - * @return the preprocessor config - */ - public static PreProcessorConfig parseConfig(Configuration configuration) { - if (!configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)) { - return null; - } - String configJson = configuration.getString(Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY, ""); - PreProcessorConfig config; - try { - config = GSON.fromJson(configJson, PreProcessorConfig.class); - } catch (JsonSyntaxException exception) { - throw new InvalidJsonException("Invalid JSON Given for " + Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY); - } - return config; - } - /** * Gets preprocessors. 
* - * @param configuration the configuration - * @param processorConfig the processor config + * @param daggerContext the daggerContext * @param tableName the table name * @param metricsTelemetryExporter the metrics telemetry exporter * @return the preprocessors */ - public static List getPreProcessors(Configuration configuration, PreProcessorConfig processorConfig, String tableName, MetricsTelemetryExporter metricsTelemetryExporter) { - return Collections.singletonList(new PreProcessorOrchestrator(configuration, processorConfig, metricsTelemetryExporter, tableName)); + public static List getPreProcessors(DaggerContext daggerContext, String tableName, MetricsTelemetryExporter metricsTelemetryExporter) { + return Collections.singletonList(new PreProcessorOrchestrator(daggerContext, metricsTelemetryExporter, tableName)); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java index e3ad929a3..ed7055c6f 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java @@ -1,12 +1,19 @@ package io.odpf.dagger.core.processors; +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; +import com.jayway.jsonpath.InvalidJsonException; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.core.metrics.telemetry.TelemetryTypes; import io.odpf.dagger.core.processors.common.ValidRecordsDecorator; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.transformers.TransformProcessor; import 
io.odpf.dagger.core.processors.types.Preprocessor; +import io.odpf.dagger.core.utils.Constants; import java.util.ArrayList; import java.util.List; @@ -17,32 +24,52 @@ public class PreProcessorOrchestrator implements Preprocessor { private final MetricsTelemetryExporter metricsTelemetryExporter; - private Configuration configuration; private final PreProcessorConfig processorConfig; private final String tableName; + private final DaggerContext daggerContext; + private final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); /** * Instantiates a new Preprocessor orchestrator. * - * @param configuration the configuration - * @param processorConfig the processor config + * @param daggerContext the daggerContext * @param metricsTelemetryExporter the metrics telemetry exporter * @param tableName the table name */ - public PreProcessorOrchestrator(Configuration configuration, PreProcessorConfig processorConfig, MetricsTelemetryExporter metricsTelemetryExporter, String tableName) { - this.configuration = configuration; - this.processorConfig = processorConfig; + public PreProcessorOrchestrator(DaggerContext daggerContext, MetricsTelemetryExporter metricsTelemetryExporter, String tableName) { + this.daggerContext = daggerContext; + this.processorConfig = parseConfig(daggerContext.getConfiguration()); this.metricsTelemetryExporter = metricsTelemetryExporter; this.tableName = tableName; } + /** + * Parse config preprocessor config. 
+ * + * @param configuration the configuration + * @return the preprocessor config + */ + public PreProcessorConfig parseConfig(Configuration configuration) { + if (!configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)) { + return null; + } + String configJson = configuration.getString(Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY, ""); + PreProcessorConfig config; + try { + config = GSON.fromJson(configJson, PreProcessorConfig.class); + } catch (JsonSyntaxException exception) { + throw new InvalidJsonException("Invalid JSON Given for " + Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY); + } + return config; + } + @Override public StreamInfo process(StreamInfo streamInfo) { for (Preprocessor processor : getProcessors()) { streamInfo = processor.process(streamInfo); } return new StreamInfo( - new ValidRecordsDecorator(tableName, streamInfo.getColumnNames(), configuration) + new ValidRecordsDecorator(tableName, streamInfo.getColumnNames(), daggerContext.getConfiguration()) .decorate(streamInfo.getDataStream()), streamInfo.getColumnNames()); } @@ -64,7 +91,7 @@ protected List getProcessors() { elem.getTableName(), TelemetryTypes.PRE_PROCESSOR_TYPE, elem.getTransformers(), - configuration); + daggerContext); processor.notifySubscriber(metricsTelemetryExporter); preprocessors.add(processor); }); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java index 642b83400..ba5cff334 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/transformers/TransformProcessor.java @@ -1,6 +1,6 @@ package io.odpf.dagger.core.processors.transformers; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContext; import 
io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.core.exception.TransformClassNotDefinedException; @@ -37,16 +37,16 @@ public String getTableName() { protected final String tableName; private final Map> metrics = new HashMap<>(); protected final TelemetryTypes type; - private Configuration configuration; + private final DaggerContext daggerContext; /** * Instantiates a new Transform processor. * * @param transformConfigs the transform configs - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public TransformProcessor(List transformConfigs, Configuration configuration) { - this("NULL", TelemetryTypes.POST_PROCESSOR_TYPE, transformConfigs, configuration); + public TransformProcessor(List transformConfigs, DaggerContext daggerContext) { + this("NULL", TelemetryTypes.POST_PROCESSOR_TYPE, transformConfigs, daggerContext); } /** @@ -54,13 +54,13 @@ public TransformProcessor(List transformConfigs, Configuration * * @param tableName the table name * @param type the type - * @param configuration the configuration + * @param daggerContext the configuration */ - public TransformProcessor(String tableName, TelemetryTypes type, List transformConfigs, Configuration configuration) { + public TransformProcessor(String tableName, TelemetryTypes type, List transformConfigs, DaggerContext daggerContext) { this.transformConfigs = transformConfigs == null ? 
new ArrayList<>() : transformConfigs; this.tableName = tableName; this.type = type; - this.configuration = configuration; + this.daggerContext = daggerContext; TransformerUtils.populateDefaultArguments(this); } @@ -126,7 +126,7 @@ private void addMetric(String key, String value) { */ protected Transformer getTransformMethod(TransformConfig transformConfig, String className, String[] columnNames) throws ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { Class transformerClass = Class.forName(className); - Constructor transformerClassConstructor = transformerClass.getConstructor(Map.class, String[].class, Configuration.class); - return (Transformer) transformerClassConstructor.newInstance(transformConfig.getTransformationArguments(), columnNames, configuration); + Constructor transformerClassConstructor = transformerClass.getConstructor(Map.class, String[].class, DaggerContext.class); + return (Transformer) transformerClassConstructor.newInstance(transformConfig.getTransformationArguments(), columnNames, daggerContext); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java index 652daf9db..0194a7b88 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/StreamManagerTest.java @@ -1,6 +1,7 @@ package io.odpf.dagger.core; -import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.common.core.DaggerContext; import io.odpf.dagger.common.core.StreamInfo; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -11,13 +12,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import 
org.apache.flink.streaming.api.environment.CheckpointConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.junit.Before; import org.junit.Test; @@ -38,7 +37,7 @@ @PrepareForTest(TableSchema.class) @RunWith(PowerMockRunner.class) -public class StreamManagerTest { +public class StreamManagerTest extends DaggerContextTestBase { private StreamManager streamManager; @@ -55,9 +54,6 @@ public class StreamManagerTest { + " }\n" + "]"; - @Mock - private StreamExecutionEnvironment env; - @Mock private ExecutionConfig executionConfig; @@ -67,9 +63,6 @@ public class StreamManagerTest { @Mock private CheckpointConfig checkpointConfig; - @Mock - private StreamTableEnvironment tableEnvironment; - @Mock private DataStream dataStream; @@ -85,10 +78,6 @@ public class StreamManagerTest { @Mock private TableSchema schema; - @Mock - private Configuration configuration; - - @Mock private org.apache.flink.configuration.Configuration flinkConfiguration; @@ -113,26 +102,26 @@ public void setup() { when(configuration.getString("FLINK_SQL_QUERY", "")).thenReturn(""); when(configuration.getInteger("FLINK_RETENTION_IDLE_STATE_MINUTE", 10)).thenReturn(10); when(flinkConfiguration.getString(any())).thenReturn("10"); - when(env.getConfig()).thenReturn(executionConfig); - when(env.getConfiguration()).thenReturn(flinkConfiguration); - when(env.getCheckpointConfig()).thenReturn(checkpointConfig); - when(env.addSource(any(FlinkKafkaConsumerBase.class))).thenReturn(source); - when(tableEnvironment.getConfig()).thenReturn(tableConfig); - when(env.fromSource(any(KafkaSource.class), 
any(WatermarkStrategy.class), any(String.class))).thenReturn(source); + when(streamExecutionEnvironment.getConfig()).thenReturn(executionConfig); + when(streamExecutionEnvironment.getConfiguration()).thenReturn(flinkConfiguration); + when(streamExecutionEnvironment.getCheckpointConfig()).thenReturn(checkpointConfig); + when(streamExecutionEnvironment.addSource(any(FlinkKafkaConsumerBase.class))).thenReturn(source); + when(streamTableEnvironment.getConfig()).thenReturn(tableConfig); + when(streamExecutionEnvironment.fromSource(any(KafkaSource.class), any(WatermarkStrategy.class), any(String.class))).thenReturn(source); when(source.getType()).thenReturn(typeInformation); when(typeInformation.getTypeClass()).thenReturn(Row.class); when(schema.getFieldNames()).thenReturn(new String[0]); PowerMockito.mockStatic(TableSchema.class); when(TableSchema.fromTypeInfo(typeInformation)).thenReturn(schema); - streamManager = new StreamManager(configuration, env, tableEnvironment); + streamManager = new StreamManager(daggerContext); } @Test public void shouldRegisterRequiredConfigsOnExecutionEnvironment() { streamManager.registerConfigs(); - verify(env, Mockito.times(1)).setParallelism(1); - verify(env, Mockito.times(1)).enableCheckpointing(30000); + verify(streamExecutionEnvironment, Mockito.times(1)).setParallelism(1); + verify(streamExecutionEnvironment, Mockito.times(1)).enableCheckpointing(30000); verify(executionConfig, Mockito.times(1)).setAutoWatermarkInterval(10000); verify(executionConfig, Mockito.times(1)).setGlobalJobParameters(configuration.getParam()); verify(checkpointConfig, Mockito.times(1)).setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); @@ -147,33 +136,33 @@ public void shouldRegisterSourceWithPreprocessorsWithWaterMarks() { when(source.assignTimestampsAndWatermarks(any(WatermarkStrategy.class))).thenReturn(singleOutputStream); when(singleOutputStream.getType()).thenReturn(typeInformation); - StreamManagerStub streamManagerStub = new 
StreamManagerStub(configuration, env, tableEnvironment, new StreamInfo(dataStream, new String[]{})); + StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); streamManagerStub.registerConfigs(); streamManagerStub.registerSourceWithPreProcessors(); - verify(tableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{}); + verify(streamTableEnvironment, Mockito.times(1)).fromDataStream(any(), new ApiExpression[]{}); } @Test public void shouldCreateOutputStream() { - StreamManagerStub streamManagerStub = new StreamManagerStub(configuration, env, tableEnvironment, new StreamInfo(dataStream, new String[]{})); + StreamManagerStub streamManagerStub = new StreamManagerStub(daggerContext, new StreamInfo(dataStream, new String[]{})); streamManagerStub.registerOutputStream(); - verify(tableEnvironment, Mockito.times(1)).sqlQuery(""); + verify(streamTableEnvironment, Mockito.times(1)).sqlQuery(""); } @Test public void shouldExecuteJob() throws Exception { streamManager.execute(); - verify(env, Mockito.times(1)).execute("SQL Flink job"); + verify(streamExecutionEnvironment, Mockito.times(1)).execute("SQL Flink job"); } final class StreamManagerStub extends StreamManager { private final StreamInfo streamInfo; - private StreamManagerStub(Configuration configuration, StreamExecutionEnvironment executionEnvironment, StreamTableEnvironment tableEnvironment, StreamInfo streamInfo) { - super(configuration, executionEnvironment, tableEnvironment); + private StreamManagerStub(DaggerContext daggerContext, StreamInfo streamInfo) { + super(daggerContext); this.streamInfo = streamInfo; } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java index 01d72ab2b..02769f6b7 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java +++ 
b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java @@ -15,13 +15,13 @@ public class ParentPostProcessorTest { @Test public void shouldNotBeAbleToProcessWhenConfigIsNull() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor( null, null, telemetrySubscriber); assertFalse(parentPostProcessor.canProcess(null)); } @Test public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(true); assertFalse(parentPostProcessor.canProcess(mockConfig)); @@ -29,7 +29,7 @@ public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { @Test public void shouldBeAbleToProcessWhenConfigIsNotEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor( null, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(false); assertTrue(parentPostProcessor.canProcess(mockConfig)); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java index 8f8c23f4e..54d2478a5 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorFactoryTest.java @@ -1,6 +1,6 @@ package io.odpf.dagger.core.processors; -import io.odpf.dagger.common.configuration.Configuration; +import 
io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.core.processors.longbow.LongbowProcessor; import io.odpf.dagger.core.processors.telemetry.TelemetryProcessor; @@ -19,9 +19,7 @@ import static org.mockito.MockitoAnnotations.initMocks; -public class PostProcessorFactoryTest { - @Mock - private Configuration configuration; +public class PostProcessorFactoryTest extends DaggerContextTestBase { @Mock private StencilClientOrchestrator stencilClientOrchestrator; @@ -58,7 +56,7 @@ public void shouldReturnLongbowProcessor() { when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); when(configuration.getString(INPUT_STREAMS, "")).thenReturn(jsonArray); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(LongbowProcessor.class, postProcessors.get(0).getClass()); @@ -69,7 +67,7 @@ public void shouldReturnParentPostProcessor() { when(configuration.getString(FLINK_SQL_QUERY_KEY, FLINK_SQL_QUERY_DEFAULT)).thenReturn("test-sql"); when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(ParentPostProcessor.class, postProcessors.get(0).getClass()); @@ -81,7 +79,7 @@ public void shouldReturnTelemetryPostProcessor() { 
when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); when(configuration.getBoolean(METRIC_TELEMETRY_ENABLE_KEY, METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)).thenReturn(true); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(1, postProcessors.size()); assertEquals(TelemetryProcessor.class, postProcessors.get(0).getClass()); @@ -91,7 +89,7 @@ public void shouldReturnTelemetryPostProcessor() { public void shouldNotReturnAnyPostProcessor() { when(configuration.getString(FLINK_SQL_QUERY_KEY, FLINK_SQL_QUERY_DEFAULT)).thenReturn("test-sql"); when(configuration.getBoolean(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, columnNames, metricsTelemetryExporter); assertEquals(0, postProcessors.size()); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java index ae5d9eaa6..cf3a98872 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java @@ -2,6 +2,7 @@ import com.jayway.jsonpath.InvalidJsonException; import io.odpf.dagger.common.configuration.Configuration; +import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import 
io.odpf.dagger.core.processors.types.Preprocessor; import org.junit.Before; @@ -20,7 +21,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class PreProcessorFactoryTest { +public class PreProcessorFactoryTest extends DaggerContextTestBase { @Mock private MetricsTelemetryExporter metricsTelemetryExporter; @@ -46,9 +47,6 @@ public class PreProcessorFactoryTest { + " ]\n" + "}"; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); @@ -58,35 +56,7 @@ public void setup() { public void shouldReturnPreProcessors() { when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertNotNull(preProcessorConfig); - List preProcessors = PreProcessorFactory.getPreProcessors(configuration, preProcessorConfig, "booking", metricsTelemetryExporter); + List preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, "booking", metricsTelemetryExporter); assertEquals(1, preProcessors.size()); } - - @Test - public void shouldParseConfig() { - when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertEquals(2, preProcessorConfig.getTableTransformers().size()); - assertEquals(2, preProcessorConfig.getTableTransformers().get(0).getTransformers().size()); - assertEquals("PreProcessorClass", preProcessorConfig.getTableTransformers().get(0).getTransformers().get(0).getTransformationClass()); - } - - @Test - public void shouldThrowExceptionForInvalidJson() { - 
when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn("blah"); - InvalidJsonException exception = assertThrows(InvalidJsonException.class, () -> PreProcessorFactory.parseConfig(configuration)); - assertEquals("Invalid JSON Given for PROCESSOR_PREPROCESSOR_CONFIG", exception.getMessage()); - } - - @Test - public void shouldNotParseConfigWhenDisabled() { - when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); - when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); - PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration); - assertNull(preProcessorConfig); - } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java index 85dc2e4ca..04272e764 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java @@ -1,5 +1,7 @@ package io.odpf.dagger.core.processors; +import com.jayway.jsonpath.InvalidJsonException; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -20,10 +22,14 @@ import java.util.HashMap; import java.util.List; -import static org.junit.Assert.assertEquals; +import static io.odpf.dagger.core.utils.Constants.*; +import static io.odpf.dagger.core.utils.Constants.PROCESSOR_PREPROCESSOR_CONFIG_KEY; +import static org.junit.Assert.*; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class 
PreProcessorOrchestratorTest { +public class PreProcessorOrchestratorTest extends DaggerContextTestBase { @Mock private MetricsTelemetryExporter exporter; @@ -34,14 +40,32 @@ public class PreProcessorOrchestratorTest { @Mock private DataStream stream; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); } + private String preProcessorConfigJson = "{\n" + + " \"table_transformers\": [{\n" + + " \"table_name\": \"booking\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"PreProcessorClass\"\n" + + " }, {\n" + + " \"transformation_class\": \"PreProcessorClass\",\n" + + " \"transformation_arguments\": {\n" + + " \"key\": \"value\"\n" + + " }\n" + + " }]\n" + + " },\n" + + " {\n" + + " \"table_name\": \"another_booking\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"PreProcessorClass\"\n" + + " }]\n" + + " }\n" + + " ]\n" + + "}"; + @Test public void shouldGetProcessors() { PreProcessorConfig config = new PreProcessorConfig(); @@ -49,7 +73,7 @@ public void shouldGetProcessors() { transformConfigs.add(new TransformConfig("InvalidRecordFilterTransformer", new HashMap<>())); TableTransformConfig ttc = new TableTransformConfig("test", transformConfigs); config.tableTransformers = Collections.singletonList(ttc); - PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(configuration, config, exporter, "test"); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); Mockito.when(streamInfo.getColumnNames()).thenReturn(new String[0]); Mockito.when(streamInfo.getDataStream()).thenReturn(stream); @@ -61,8 +85,7 @@ public void shouldGetProcessors() { @Test public void shouldNotGetProcessors() { - PreProcessorConfig config = new PreProcessorConfig(); - PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(configuration, config, exporter, "test"); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); 
Mockito.when(streamInfo.getColumnNames()).thenReturn(new String[0]); Mockito.when(streamInfo.getDataStream()).thenReturn(stream); @@ -70,4 +93,42 @@ public void shouldNotGetProcessors() { assertEquals(0, processors.size()); } + + @Test + public void shouldNotNullConfig() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertNotNull(preProcessorConfig); + } + + @Test + public void shouldParseConfig() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertEquals(2, preProcessorConfig.getTableTransformers().size()); + assertEquals(2, preProcessorConfig.getTableTransformers().get(0).getTransformers().size()); + assertEquals("PreProcessorClass", preProcessorConfig.getTableTransformers().get(0).getTransformers().get(0).getTransformationClass()); + } + + @Test + public void shouldThrowExceptionForInvalidJson() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn("blah"); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + InvalidJsonException exception = assertThrows(InvalidJsonException.class, () -> ppo.parseConfig(configuration)); + assertEquals("Invalid JSON Given for 
PROCESSOR_PREPROCESSOR_CONFIG", exception.getMessage()); + } + + @Test + public void shouldNotParseConfigWhenDisabled() { + when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(false); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorConfigJson); + PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); + PreProcessorConfig preProcessorConfig = ppo.parseConfig(configuration); + assertNull(preProcessorConfig); + } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java index 4a8358642..2c51cb376 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -12,7 +13,7 @@ import java.util.Map; public class MockTransformer implements Transformer, MapFunction { - public MockTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public MockTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { } @Override diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java index d797b9b1b..e36db10bc 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java +++ 
b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java @@ -1,11 +1,9 @@ package io.odpf.dagger.core.processors.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.core.metrics.telemetry.TelemetryTypes; @@ -28,7 +26,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class TransformProcessorTest { +public class TransformProcessorTest extends DaggerContextTestBase { @Mock @@ -37,18 +35,12 @@ public class TransformProcessorTest { @Mock private StreamInfo streamInfo; - @Mock - private DataStream dataStream; - @Mock private SingleOutputStreamOperator mappedDataStream; @Mock private Transformer transformer; - @Mock - private Configuration configuration; - @Mock private MetricsTelemetryExporter metricsTelemetryExporter; @@ -59,35 +51,35 @@ public void setup() { @Test public void shouldThrowExceptionInCaseOfWrongClassName() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("wrongClassName", transformationArguments)); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); RuntimeException exception = 
assertThrows(RuntimeException.class, () -> transformProcessor.process(streamInfo)); assertEquals("wrongClassName", exception.getMessage()); } @Test public void shouldThrowExceptionInCaseOfWrongConstructorTypeSupported() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("io.odpf.dagger.core.processors.transformers.TransformProcessor", transformationArguments)); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); RuntimeException exception = assertThrows(RuntimeException.class, () -> transformProcessor.process(streamInfo)); assertEquals("io.odpf.dagger.core.processors.transformers.TransformProcessor.(java.util.Map, [Ljava.lang.String;, io.odpf.dagger.common.configuration.Configuration)", exception.getMessage()); } @Test public void shouldProcessClassExtendingMapFunction() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -102,7 +94,7 @@ public void shouldProcessClassExtendingMapFunction() { @Test public void shouldAddPostProcessorTypeMetrics() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -122,7 +114,7 @@ public void shouldAddPostProcessorTypeMetrics() { @Test public 
void shouldAddPreProcessorTypeMetrics() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -142,7 +134,7 @@ public void shouldAddPreProcessorTypeMetrics() { @Test public void shouldNotifySubscribers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("keyField", "keystore"); @@ -158,9 +150,9 @@ public void shouldNotifySubscribers() { @Test public void shouldProcessTwoPostTransformers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); - when(dataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); + when(inputStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("io.odpf.dagger.core.processors.transformers.MockTransformer", new HashMap() {{ put("keyField", "keystore"); @@ -169,16 +161,16 @@ public void shouldProcessTwoPostTransformers() { put("keyField", "keystore"); }})); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); transformProcessor.process(streamInfo); verify(mappedDataStream, times(1)).map(any()); } @Test public void shouldProcessMultiplePostTransformers() { - when(streamInfo.getDataStream()).thenReturn(dataStream); + when(streamInfo.getDataStream()).thenReturn(inputStream); when(streamInfo.getColumnNames()).thenReturn(null); - 
when(dataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); + when(inputStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); when(mappedDataStream.map(any(MapFunction.class))).thenReturn(mappedDataStream); transfromConfigs = new ArrayList<>(); transfromConfigs.add(new TransformConfig("io.odpf.dagger.core.processors.transformers.MockTransformer", new HashMap() {{ @@ -191,7 +183,7 @@ public void shouldProcessMultiplePostTransformers() { put("keyField", "keystore"); }})); - TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, configuration); + TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); transformProcessor.process(streamInfo); verify(mappedDataStream, times(2)).map(any()); @@ -202,7 +194,7 @@ public void shouldPopulateDefaultArguments() { TransformConfig config = new TransformConfig("io.odpf.TestProcessor", new HashMap() {{ put("test-key", "test-value"); }}); - TransformProcessor processor = new TransformProcessor("test_table", PRE_PROCESSOR_TYPE, Collections.singletonList(config), configuration); + TransformProcessor processor = new TransformProcessor("test_table", PRE_PROCESSOR_TYPE, Collections.singletonList(config), daggerContext); assertEquals("test_table", processor.tableName); assertEquals(PRE_PROCESSOR_TYPE, processor.type); assertEquals(1, processor.transformConfigs.size()); @@ -223,12 +215,12 @@ final class TransformProcessorMock extends TransformProcessor { private Transformer mockMapFunction; private TransformProcessorMock(Transformer mockMapFunction, List transformConfigs) { - super(transformConfigs, configuration); + super(transformConfigs, daggerContext); this.mockMapFunction = mockMapFunction; } private TransformProcessorMock(String table, TelemetryTypes type, Transformer mockMapFunction, List transformConfigs) { - super(table, type, transformConfigs, configuration); + super(table, type, transformConfigs, daggerContext); this.mockMapFunction = 
mockMapFunction; } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java index 22da89c38..e6985a488 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java @@ -1,6 +1,7 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -26,9 +27,9 @@ public class ClearColumnTransformer implements MapFunction, Transforme * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public ClearColumnTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public ClearColumnTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = columnNames; this.targetColumnName = transformationArguments.get(TARGET_KEY_COLUMN_NAME); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java index bd7eaa0aa..d667c0b8f 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import 
org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; @@ -31,9 +32,9 @@ public class DeDuplicationTransformer extends RichFilterFunction implements * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public DeDuplicationTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public DeDuplicationTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { keyIndex = Arrays.asList(columnNames).indexOf(String.valueOf(transformationArguments.get("key_column"))); ttlInSeconds = Integer.valueOf(String.valueOf(transformationArguments.get("ttl_in_seconds"))); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java index af92c774f..dba8714e1 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -30,9 +31,9 @@ public class FeatureTransformer implements MapFunction, Transformer { * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public FeatureTransformer(Map transformationArguments, String[] columnNames, 
Configuration configuration) { + public FeatureTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = columnNames; this.keyColumn = transformationArguments.get(KEY_COLUMN_NAME); this.valueColumn = transformationArguments.get(VALUE_COLUMN_NAME); diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java index ac510eec1..afbbabca5 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -26,9 +27,9 @@ public class FeatureWithTypeTransformer implements MapFunction, Transf * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public FeatureWithTypeTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public FeatureWithTypeTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.featureWithTypeHandler = new FeatureWithTypeHandler(transformationArguments, columnNames); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java index f39ac6a86..68613e43f 100644 --- 
a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -28,8 +29,7 @@ public class HashTransformer extends RichMapFunction implements Serial private static final String SINK_KAFKA_PROTO_MESSAGE = "SINK_KAFKA_PROTO_MESSAGE"; private static final String ENCRYPTION_FIELD_KEY = "maskColumns"; private final List fieldsToHash; - private Configuration configuration; - + private final DaggerContext daggerContext; private final String[] columnNames; private Map rowHasherMap; @@ -38,12 +38,12 @@ public class HashTransformer extends RichMapFunction implements Serial * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public HashTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public HashTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.fieldsToHash = getFieldsToHash(transformationArguments); this.columnNames = columnNames; - this.configuration = configuration; + this.daggerContext = daggerContext; } private ArrayList getFieldsToHash(Map transformationArguments) { @@ -71,8 +71,8 @@ public StreamInfo transform(StreamInfo streamInfo) { * @return the map */ protected Map createRowHasherMap() { - String outputProtoClassName = configuration.getString(SINK_KAFKA_PROTO_MESSAGE, ""); - StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration); + String 
outputProtoClassName = daggerContext.getConfiguration().getString(SINK_KAFKA_PROTO_MESSAGE, ""); + StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(daggerContext.getConfiguration()); Descriptors.Descriptor outputDescriptor = stencilClientOrchestrator.getStencilClient().get(outputProtoClassName); if (outputDescriptor == null) { throw new DescriptorNotFoundException("Output Descriptor for class: " + outputProtoClassName diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java index 622b4e0c8..81cbd0890 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.types.Row; @@ -34,9 +35,9 @@ public class InvalidRecordFilterTransformer extends RichFilterFunction impl * * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public InvalidRecordFilterTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public InvalidRecordFilterTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.tableName = (String) transformationArguments.getOrDefault("table_name", ""); validationIndex = Arrays.asList(columnNames).indexOf(INTERNAL_VALIDATION_FILED); } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java 
b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java index a2b228401..5ab75d350 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java @@ -1,13 +1,12 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.common.watermark.RowtimeFieldWatermark; @@ -27,19 +26,21 @@ public class SQLTransformer implements Serializable, Transformer { private final String tableName; private final long allowedLatenessInMs; private static final String ROWTIME = "rowtime"; + private final DaggerContext daggerContext; /** * Instantiates a new Sql transformer. 
* * @param transformationArguments the transformation arguments * @param columnNames the column names - * @param configuration the configuration + * @param daggerContext the daggerContext */ - public SQLTransformer(Map transformationArguments, String[] columnNames, Configuration configuration) { + public SQLTransformer(Map transformationArguments, String[] columnNames, DaggerContext daggerContext) { this.columnNames = columnNames; this.sqlQuery = transformationArguments.get("sqlQuery"); this.tableName = transformationArguments.getOrDefault("tableName", "data_stream"); this.allowedLatenessInMs = Long.parseLong(transformationArguments.getOrDefault("allowedLatenessInMs", "0")); + this.daggerContext = daggerContext; } @Override @@ -53,8 +54,7 @@ public StreamInfo transform(StreamInfo inputStreamInfo) { schema = schema.replace(ROWTIME, ROWTIME + ".rowtime"); inputStream = assignTimeAttribute(inputStream); } - StreamExecutionEnvironment streamExecutionEnvironment = inputStream.getExecutionEnvironment(); - StreamTableEnvironment streamTableEnvironment = getStreamTableEnvironment(streamExecutionEnvironment); + StreamTableEnvironment streamTableEnvironment = daggerContext.getTableEnvironment(); streamTableEnvironment.registerDataStream(tableName, inputStream, schema); Table table = streamTableEnvironment.sqlQuery(sqlQuery); @@ -65,16 +65,6 @@ public StreamInfo transform(StreamInfo inputStreamInfo) { return new StreamInfo(outputStream, table.getSchema().getFieldNames()); } - /** - * Gets stream table environment. 
- * - * @param streamExecutionEnvironment the stream execution environment - * @return the stream table environment - */ - protected StreamTableEnvironment getStreamTableEnvironment(StreamExecutionEnvironment streamExecutionEnvironment) { - return StreamTableEnvironment.create(streamExecutionEnvironment); - } - private DataStream assignTimeAttribute(DataStream inputStream) { StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new RowtimeFieldWatermark(columnNames)); return streamWatermarkAssigner.assignTimeStampAndWatermark(inputStream, allowedLatenessInMs); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java index 10eee0d7e..89e0af749 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/ClearColumnTransformerTest.java @@ -1,13 +1,11 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import java.util.HashMap; import java.util.Map; @@ -20,13 +18,7 @@ import static org.mockito.MockitoAnnotations.initMocks; -public class ClearColumnTransformerTest { - - @Mock - private DataStream dataStream; - - @Mock - private Configuration configuration; +public class ClearColumnTransformerTest extends DaggerContextTestBase { @Before public void setup() { @@ -44,7 +36,7 @@ public void shouldSetTargetColumnToEmpty() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, 
commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = clearColumnTransformer.map(inputRow); assertEquals("", outputRow.getField(1)); } @@ -58,7 +50,7 @@ public void shouldThrowExceptionWhenTargetColumnIsNotPresent() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, "random"); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); clearColumnTransformer.map(inputRow); } @@ -73,10 +65,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); clearColumnTransformer.transform(inputStreamInfo); - verify(dataStream, times(1)).map(any(ClearColumnTransformer.class)); + verify(inputStream, times(1)).map(any(ClearColumnTransformer.class)); } @Test @@ -90,8 +82,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "NEWDEVICE.FREEZE.CR.UPDATE.PIN"); inputRow.setField(1, "wallet-id-123"); inputRow.setField(2, commsData); - ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, 
columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + ClearColumnTransformer clearColumnTransformer = new ClearColumnTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = clearColumnTransformer.transform(inputStreamInfo); assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java index cae7fae16..37856190d 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java @@ -1,14 +1,13 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; @@ -27,27 +26,20 @@ import static org.mockito.MockitoAnnotations.initMocks; @RunWith(MockitoJUnitRunner.class) -public class DeDuplicationTransformerTest { +public class DeDuplicationTransformerTest extends DaggerContextTestBase { @Mock private RuntimeContext runtimeContext; - @Mock - private Configuration configuration; - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; 
@Mock private MapState mapState; - @Mock - private DataStream dataStream; - @Mock private KeyedStream keyedStream; - @Before public void setup() { initMocks(this); @@ -114,7 +106,7 @@ public void shouldFilterRecordsIfKeyAlreadyPresent() throws Exception { @Test public void shouldTransformInputStreamWithKeyByAndApplyFilterFunction() { - when(dataStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); + when(inputStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("key_column", "status"); transformationArguments.put("ttl_in_seconds", 10); @@ -123,15 +115,15 @@ public void shouldTransformInputStreamWithKeyByAndApplyFilterFunction() { inputRow.setField(0, "123"); inputRow.setField(1, "TEST_SERVICE_TYPE"); inputRow.setField(2, "TEST_STATUS"); - DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); deDuplicationTransformer.transform(inputStreamInfo); verify(keyedStream, times(1)).filter(any(DeDuplicationTransformer.class)); } @Test public void shouldReturnSameColumnNames() { - when(dataStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); + when(inputStream.keyBy(any(KeySelector.class))).thenReturn(keyedStream); HashMap transformationArguments = new HashMap<>(); transformationArguments.put("key_column", "status"); transformationArguments.put("ttl_in_seconds", 10); @@ -140,8 +132,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "123"); inputRow.setField(1, "TEST_SERVICE_TYPE"); inputRow.setField(2, "TEST_STATUS"); - DeDuplicationTransformer deDuplicationTransformer = new 
DeDuplicationTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + DeDuplicationTransformer deDuplicationTransformer = new DeDuplicationTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = deDuplicationTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } @@ -149,7 +141,7 @@ public void shouldReturnSameColumnNames() { public class DeDuplicationTransformerStub extends DeDuplicationTransformer { public DeDuplicationTransformerStub(Map transformationArguments, String[] columnNames) { - super(transformationArguments, columnNames, configuration); + super(transformationArguments, columnNames, daggerContext); } @Override diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java index 9c01caff5..0a8da869a 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java @@ -1,14 +1,12 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import java.util.HashMap; @@ -17,14 +15,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; -public class FeatureTransformerTest { - - @Mock - private DataStream dataStream; - - @Mock - 
private Configuration configuration; - +public class FeatureTransformerTest extends DaggerContextTestBase { + @Before public void setup() { initMocks(this); @@ -40,7 +32,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingFloatAsValue() throws Exce inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = featureTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -61,7 +53,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingIntegerAsValue() throws Ex inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = featureTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -81,7 +73,7 @@ public void shouldThrowExceptionForUnSupportedDataType() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, "value".getBytes()); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -95,7 +87,7 @@ public void shouldHandleWhenKeyIsNotPresent() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new 
FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -109,7 +101,7 @@ public void shouldHandleWhenValueIsNotPresent() throws Exception { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2f); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); featureTransformer.map(inputRow); } @@ -123,10 +115,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); featureTransformer.transform(inputStreamInfo); - verify(dataStream, times(1)).map(any(FeatureTransformer.class)); + verify(inputStream, times(1)).map(any(FeatureTransformer.class)); } @Test @@ -139,8 +131,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "test"); inputRow.setField(1, 1L); inputRow.setField(2, 2); - FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureTransformer featureTransformer = new FeatureTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = 
featureTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java index bc354c1a4..83f99eb15 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformerTest.java @@ -1,9 +1,8 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import org.junit.Assert; import org.junit.Before; @@ -19,17 +18,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; -public class FeatureWithTypeTransformerTest { +public class FeatureWithTypeTransformerTest extends DaggerContextTestBase { - @Mock - private DataStream dataStream; @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; - @Mock - private Configuration configuration; - @Before public void setup() { initMocks(this); @@ -51,7 +45,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingStringAsValue() throws Exc inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "test_order_number"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transfromationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transfromationArguments, columnNames, daggerContext); Row outputRow = 
featureWithTypeTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -78,7 +72,7 @@ public void shouldReturnOutputRowAsFeatureRowOnPassingFloatType() throws Excepti inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); Row outputRow = featureWithTypeTransformer.map(inputRow); Assert.assertEquals(3, outputRow.getArity()); Assert.assertEquals(inputRow.getField(0), outputRow.getField(0)); @@ -106,7 +100,7 @@ public void shouldThrowExceptionForUnSupportedDataType() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -126,7 +120,7 @@ public void shouldThrowIfKeyColumnNameNotExist() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -146,7 +140,7 @@ public void shouldThrowIfValueColumnNameNotExist() throws Exception { 
inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -166,7 +160,7 @@ public void shouldThrowIfVoutputColumnNameNotExist() throws Exception { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "value".getBytes()); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); featureWithTypeTransformer.map(inputRow); } @@ -186,10 +180,10 @@ public void shouldTransformInputStreamWithItsTransformer() { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, "test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); featureWithTypeTransformer.transform(inputStreamInfo); - verify(dataStream, times(1)).map(any(FeatureWithTypeTransformer.class)); + verify(inputStream, times(1)).map(any(FeatureWithTypeTransformer.class)); } @Test @@ -208,8 +202,8 @@ public void shouldReturnSameColumnNames() { inputRow.setField(0, "test_customer_id"); inputRow.setField(1, "1.4"); inputRow.setField(2, 
"test_features"); - FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, configuration); - StreamInfo inputStreamInfo = new StreamInfo(dataStream, columnNames); + FeatureWithTypeTransformer featureWithTypeTransformer = new FeatureWithTypeTransformer(transformationArguments, columnNames, daggerContext); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); StreamInfo outputStreamInfo = featureWithTypeTransformer.transform(inputStreamInfo); Assert.assertArrayEquals(columnNames, outputStreamInfo.getColumnNames()); } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java index aeb2bd30f..22295bb1e 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java @@ -1,10 +1,9 @@ package io.odpf.dagger.functions.transformers; -import org.apache.flink.streaming.api.datastream.DataStream; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.types.Row; import com.google.protobuf.Timestamp; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.exceptions.DescriptorNotFoundException; import io.odpf.dagger.consumer.TestBookingLogMessage; @@ -24,16 +23,10 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class HashTransformerTest { +public class HashTransformerTest extends DaggerContextTestBase { @Rule public ExpectedException thrown = ExpectedException.none(); - - @Mock - private DataStream inputStream; - - @Mock - private Configuration configuration; - + @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; @@ 
-64,7 +57,7 @@ public void shouldHashSingleStringFieldInInputRow() throws Exception { inputRow.setField(1, 1); inputRow.setField(2, false); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -93,7 +86,7 @@ public void shouldHashMultipleFieldsOfSupportedDatatypeInInputRow() throws Excep inputRow.setField(1, 1); inputRow.setField(2, false); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -125,7 +118,7 @@ public void shouldHashAllFieldsOfSupportedDataTypesInInputRow() throws Exception inputRow.setField(1, 1); inputRow.setField(2, 1L); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -170,7 +163,7 @@ public void shouldHashNestedFields() throws Exception { inputRow.setField(0, bookingLogRow); - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); Row outputRow = hashTransformer.map(inputRow); @@ -195,7 +188,7 @@ public void shouldCreateRowHasherMapInOpen() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", 
"cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); verify(configuration, times(1)).getString("SINK_KAFKA_PROTO_MESSAGE", ""); @@ -215,7 +208,7 @@ public void shouldThrowErrorIfUnableToCreateRowHasherMap() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); } @@ -234,7 +227,7 @@ public void shouldThrowErrorIfUnableToFindOpDescriptor() throws Exception { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.open(flinkInternalConfig); } @@ -249,7 +242,7 @@ public void shouldTransformInputStreamToOutputStream() { transformationArguments.put("valueColumnName", fieldsToEncrypt); String[] columnNames = {"order_number", "cancel_reason_id", "is_reblast"}; - HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, configuration); + HashTransformer hashTransformer = new HashTransformer(transformationArguments, columnNames, daggerContext); hashTransformer.transform(new StreamInfo(inputStream, columnNames)); verify(inputStream, times(1)).map(hashTransformer); diff --git 
a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java index 8954e47cb..22bc83e7b 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; @@ -7,7 +8,6 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.consumer.TestBookingLogMessage; import org.junit.Assert; import org.junit.Before; @@ -26,7 +26,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class InvalidRecordFilterTransformerTest { +public class InvalidRecordFilterTransformerTest extends DaggerContextTestBase { @Mock private RuntimeContext runtimeContext; @@ -35,9 +35,6 @@ public class InvalidRecordFilterTransformerTest { @Mock private Counter counter; - @Mock - private Configuration configuration; - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; @@ -85,7 +82,7 @@ private Row createDefaultValidRow(DynamicMessage defaultInstance) { public void shouldFilterBadRecords() throws Exception { InvalidRecordFilterTransformer filter = new InvalidRecordFilterTransformer(new HashMap() {{ put("table_name", "test"); - }}, getColumns(), configuration); + }}, getColumns(), daggerContext); filter.setRuntimeContext(runtimeContext); when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); 
when(metricGroup.addGroup("per_table", "test")).thenReturn(metricGroup); @@ -106,7 +103,7 @@ public void shouldFilterBadRecords() throws Exception { public void shouldPassValidRecords() throws Exception { InvalidRecordFilterTransformer filter = new InvalidRecordFilterTransformer(new HashMap() {{ put("table_name", "test"); - }}, getColumns(), configuration); + }}, getColumns(), daggerContext); filter.setRuntimeContext(runtimeContext); when(runtimeContext.getMetricGroup()).thenReturn(metricGroup); when(metricGroup.addGroup("per_table", "test")).thenReturn(metricGroup); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java index c00ec5ca2..51f8bc770 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java @@ -1,22 +1,24 @@ package io.odpf.dagger.functions.transformers; +import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.common.core.DaggerContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; +import org.junit.After; import 
org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; @@ -28,16 +30,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -public class SQLTransformerTest { - - @Mock - private DataStream inputStream; - - @Mock - private StreamExecutionEnvironment streamExecutionEnvironment; - - @Mock - private StreamTableEnvironment streamTableEnvironment; +public class SQLTransformerTest extends DaggerContextTestBase { @Mock private Table table; @@ -57,13 +50,34 @@ public class SQLTransformerTest { @Mock private SingleOutputStreamOperator watermarkedStream; - @Mock - private Configuration configuration; + private Multiply multiply; @Before public void setup() { initMocks(this); + setMock(daggerContext); + when(daggerContext.getConfiguration()).thenReturn(configuration); when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment); + multiply = new Multiply(); + } + + @After + public void resetSingleton() throws Exception { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, null); + } + + private void setMock(DaggerContext mock) { + try { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(instance, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Test @@ -89,6 +103,27 @@ public void shouldApplyThePassedSQL() { assertEquals(outputStream, outputStreamInfo.getDataStream()); } + @Test + public void shouldReturnColumnNamesReturnedBySQLAndUdf() { + HashMap transformationArguments = new HashMap<>(); + String sqlQuery = "SELECT order_number, service_type, multiply(1,100) product FROM data_stream"; 
+ transformationArguments.put("sqlQuery", sqlQuery); + String[] columnNames = {"order_number", "service_type", "status", "product"}; + streamTableEnvironment.createTemporaryFunction("multiply",multiply); + + when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table); + when(table.getSchema()).thenReturn(tableSchema); + String[] outputColumns = {"order_number", "service_type","product"}; + when(tableSchema.getFieldNames()).thenReturn(outputColumns); + when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream); + when(retractStream.filter(any())).thenReturn(filteredRetractStream); + when(filteredRetractStream.map(any())).thenReturn(outputStream); + SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames); + StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames); + StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo); + + assertArrayEquals(outputColumns, outputStreamInfo.getColumnNames()); + } @Test public void shouldReturnColumnNamesReturnedBySQL() { HashMap transformationArguments = new HashMap<>(); @@ -195,12 +230,13 @@ public void shouldThrowExceptionIfSqlNotProvided() { class SQLTransformerStub extends SQLTransformer { SQLTransformerStub(Map transformationArguments, String[] columnNames) { - super(transformationArguments, columnNames, configuration); + super(transformationArguments, columnNames, daggerContext); } + } - @Override - protected StreamTableEnvironment getStreamTableEnvironment(StreamExecutionEnvironment executionEnvironment) { - return streamTableEnvironment; + class Multiply extends ScalarFunction{ + public Integer eval(Integer i1, Integer i2){ + return i1 * i2; } } diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java index 1b11bbe79..b8d3db9f8 100644 --- 
a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/EsExternalPostProcessorIntegrationTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -39,7 +40,7 @@ import static org.junit.Assert.assertTrue; -public class EsExternalPostProcessorIntegrationTest { +public class EsExternalPostProcessorIntegrationTest extends DaggerContextTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(EsExternalPostProcessorIntegrationTest.class.getName()); @@ -319,7 +320,7 @@ private RestClient getESClient() { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java index adf04470a..52dbb09c2 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/GrpcExternalPostProcessorIntegrationTest.java @@ -1,5 +1,6 @@ package 
io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -37,7 +38,7 @@ import static org.junit.Assert.assertTrue; -public class GrpcExternalPostProcessorIntegrationTest { +public class GrpcExternalPostProcessorIntegrationTest extends DaggerContextTestBase { private StencilClientOrchestrator stencilClientOrchestrator; private MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter(); @@ -321,7 +322,7 @@ public synchronized void invoke(Row inputRow, Context context) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java index fa2574cbb..21d3338dd 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java @@ -1,7 +1,8 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; -import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +//import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -33,7 +34,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class HttpExternalPostProcessorIntegrationTest { +public class HttpExternalPostProcessorIntegrationTest extends DaggerContextTestBase { @Rule public WireMockRule wireMockRule = new WireMockRule(8089); @@ -408,7 +409,7 @@ public synchronized void invoke(Row inputRow, Context context) { } private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java index f16eb36a4..0d4bb6e87 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java @@ -1,10 +1,13 @@ package io.odpf.dagger.integrationtest; +import io.odpf.dagger.common.core.DaggerContext; +import io.odpf.dagger.common.core.DaggerContextTestBase; import 
org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; @@ -18,13 +21,11 @@ import io.vertx.pgclient.PgPool; import io.vertx.sqlclient.PoolOptions; import org.apache.commons.lang3.StringUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; @@ -38,6 +39,9 @@ import static io.vertx.pgclient.PgPool.pool; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; @Ignore public class PostGresExternalPostProcessorIntegrationTest { @@ -56,6 +60,15 @@ public class PostGresExternalPostProcessorIntegrationTest { private StencilClientOrchestrator stencilClientOrchestrator; private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter(); + private static DaggerContext daggerContext; + + private static StreamExecutionEnvironment streamExecutionEnvironment; + + private static StreamTableEnvironment streamTableEnvironment; + + private static Configuration configuration; + + @BeforeClass public static void setUp() { host = System.getenv("PG_HOST"); @@ -63,11 +76,20 @@ public static void setUp() { host = "localhost"; } - try { + String streams = 
"[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"dummy-topic\",\"INPUT_SCHEMA_TABLE\":\"testbooking\",\"INPUT_SCHEMA_PROTO_CLASS\":\"io.odpf.dagger.consumer.TestBookingLogMessage\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"41\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"localhost:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"test-consumer\",\"SOURCE_KAFKA_NAME\":\"localkafka\"}]"; + configurationMap.put(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, "true"); + configurationMap.put(INPUT_STREAMS, streams); + configuration = new Configuration(ParameterTool.fromMap(configurationMap)); - String streams = "[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"dummy-topic\",\"INPUT_SCHEMA_TABLE\":\"testbooking\",\"INPUT_SCHEMA_PROTO_CLASS\":\"io.odpf.dagger.consumer.TestBookingLogMessage\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"41\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"localhost:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"test-consumer\",\"SOURCE_KAFKA_NAME\":\"localkafka\"}]"; - configurationMap.put(PROCESSOR_POSTPROCESSOR_ENABLE_KEY, "true"); - configurationMap.put(INPUT_STREAMS, streams); + daggerContext = mock(DaggerContext.class); + streamExecutionEnvironment = mock(StreamExecutionEnvironment.class); + streamTableEnvironment = mock(StreamTableEnvironment.class); + setMock(daggerContext); + when(daggerContext.getConfiguration()).thenReturn(configuration); + when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment); + when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment); + + try { pgClient = getPGClient(); @@ -98,6 +120,23 @@ public static void setUp() { } } + @AfterClass + public static void resetSingleton() throws Exception { + Field instance = 
DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(null, null); + } + + private static void setMock(DaggerContext mock) { + try { + Field instance = DaggerContext.class.getDeclaredField("daggerContext"); + instance.setAccessible(true); + instance.set(instance, mock); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private static PgPool getPGClient() { String dbHost = System.getenv("PG_HOST"); @@ -316,7 +355,7 @@ public void shouldPopulateFieldFromPostgresOnSuccessResponseWithAllThreeSourcesI private StreamInfo addPostProcessor(StreamInfo streamInfo) { - List postProcessors = PostProcessorFactory.getPostProcessors(new Configuration(ParameterTool.fromMap(configurationMap)), stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); + List postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter); StreamInfo postProcessedStream = streamInfo; for (PostProcessor postProcessor : postProcessors) { postProcessedStream = postProcessor.process(postProcessedStream); From 6a7afa2b6a597ad1e329d5c305ca83a40bf4cbd8 Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Wed, 26 Oct 2022 12:38:06 +0530 Subject: [PATCH 2/5] fix: Changes to accommodate JavadocStyle --- .../java/io/odpf/dagger/common/core/DaggerContext.java | 10 ++++++---- .../common/exceptions/DaggerContextException.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java index a8685cbfc..3aee9c749 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/DaggerContext.java @@ -9,7 +9,8 @@ import org.slf4j.LoggerFactory; /** - * The DaggerContext singleton object initializes with 
StreamExecutionEnvironment, StreamTableEnvironment and Configuration + * The DaggerContext singleton object. + * It initializes with StreamExecutionEnvironment, StreamTableEnvironment and Configuration. */ public class DaggerContext { private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName()); @@ -31,7 +32,7 @@ private DaggerContext(Configuration configuration) { } /** - * Get the instance of DaggerContext + * Get the instance of DaggerContext. */ public static DaggerContext getInstance() { if (daggerContext == null) { @@ -42,9 +43,10 @@ public static DaggerContext getInstance() { /** * Initialization of a new DaggerContext. - * @param configuration the Configuration + * + * @param configuration the Configuration */ - public synchronized static DaggerContext init(Configuration configuration) { + public static synchronized DaggerContext init(Configuration configuration) { if (daggerContext != null) { throw new DaggerContextException("DaggerContext object is already initialized"); } diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java index 13d7e0f1b..b0e79da2a 100644 --- a/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java +++ b/dagger-common/src/main/java/io/odpf/dagger/common/exceptions/DaggerContextException.java @@ -3,7 +3,7 @@ /** * The class Exception if there is something wrong with Dagger context object. */ -public class DaggerContextException extends RuntimeException{ +public class DaggerContextException extends RuntimeException { /** * Instantiates a new Dagger context exception with specified error message. 
From 7b6c80ed889815d974ed0d1c48077133e93f7366 Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Wed, 26 Oct 2022 12:51:00 +0530 Subject: [PATCH 3/5] fix: Changes to accommodate JavadocStyle --- .../main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java | 2 +- .../src/main/java/io/odpf/dagger/core/StreamManager.java | 1 - .../odpf/dagger/core/processors/PreProcessorOrchestrator.java | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java index add69b254..0001b1326 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/KafkaProtoSQLProcessor.java @@ -37,4 +37,4 @@ public static void main(String[] args) throws ProgramInvocationException { throw new ProgramInvocationException(e); } } -} \ No newline at end of file +} diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index baeb8a328..72df6b6ce 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -12,7 +12,6 @@ import io.odpf.dagger.core.exception.UDFFactoryClassNotDefinedException; import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; import io.odpf.dagger.core.processors.PostProcessorFactory; -import io.odpf.dagger.core.processors.PreProcessorConfig; import io.odpf.dagger.core.processors.PreProcessorFactory; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.PostProcessor; diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java index ed7055c6f..4adb63f56 100644 
--- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/PreProcessorOrchestrator.java @@ -27,7 +27,7 @@ public class PreProcessorOrchestrator implements Preprocessor { private final PreProcessorConfig processorConfig; private final String tableName; private final DaggerContext daggerContext; - private final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); + private static final Gson GSON = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); /** * Instantiates a new Preprocessor orchestrator. From f072f463a21dd5061331a9d25730353ce5bc02eb Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Wed, 26 Oct 2022 13:04:40 +0530 Subject: [PATCH 4/5] fix: Changes to accommodate JavadocStyle --- .../odpf/dagger/core/processors/ParentPostProcessorTest.java | 4 ++-- .../odpf/dagger/core/processors/PreProcessorFactoryTest.java | 5 ----- .../dagger/core/processors/PreProcessorOrchestratorTest.java | 1 - .../dagger/core/processors/transformers/MockTransformer.java | 1 - .../HttpExternalPostProcessorIntegrationTest.java | 2 +- 5 files changed, 3 insertions(+), 10 deletions(-) diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java index 02769f6b7..2109f3ce0 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java @@ -15,7 +15,7 @@ public class ParentPostProcessorTest { @Test public void shouldNotBeAbleToProcessWhenConfigIsNull() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor( null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, 
telemetrySubscriber); assertFalse(parentPostProcessor.canProcess(null)); } @@ -29,7 +29,7 @@ public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { @Test public void shouldBeAbleToProcessWhenConfigIsNotEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor( null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(false); assertTrue(parentPostProcessor.canProcess(mockConfig)); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java index cf3a98872..45a92edd9 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorFactoryTest.java @@ -1,7 +1,5 @@ package io.odpf.dagger.core.processors; -import com.jayway.jsonpath.InvalidJsonException; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.types.Preprocessor; @@ -15,9 +13,6 @@ import static io.odpf.dagger.core.utils.Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT; import static io.odpf.dagger.core.utils.Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java 
b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java index 04272e764..eb965628a 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java @@ -5,7 +5,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter; import io.odpf.dagger.core.processors.transformers.TableTransformConfig; diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java index 2c51cb376..0b5332f69 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/MockTransformer.java @@ -6,7 +6,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java index 21d3338dd..429978ad0 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/HttpExternalPostProcessorIntegrationTest.java @@ -2,7 +2,7 @@ import io.odpf.dagger.common.core.DaggerContextTestBase; 
import org.apache.flink.api.java.utils.ParameterTool; -//import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; From 530bae74ee7b0954d2ec18272bc2c8bf694359d0 Mon Sep 17 00:00:00 2001 From: Shreyansh Date: Thu, 27 Oct 2022 08:09:50 +0530 Subject: [PATCH 5/5] fix: Changes to accommodate JavadocStyle --- .../core/processors/ParentPostProcessorTest.java | 15 +++++++++++---- .../processors/PreProcessorOrchestratorTest.java | 15 +++++++++++++-- .../transformers/TransformProcessorTest.java | 2 +- .../transformers/ClearColumnTransformer.java | 1 - .../transformers/DeDuplicationTransformer.java | 1 - .../transformers/FeatureTransformer.java | 1 - .../transformers/FeatureWithTypeTransformer.java | 1 - .../functions/transformers/HashTransformer.java | 1 - .../InvalidRecordFilterTransformer.java | 1 - .../DeDuplicationTransformerTest.java | 2 +- .../transformers/FeatureTransformerTest.java | 1 - .../transformers/HashTransformerTest.java | 1 - .../transformers/SQLTransformerTest.java | 9 +++++---- ...tGresExternalPostProcessorIntegrationTest.java | 3 --- 14 files changed, 31 insertions(+), 23 deletions(-) diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java index 2109f3ce0..0125fe382 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/ParentPostProcessorTest.java @@ -1,6 +1,8 @@ package io.odpf.dagger.core.processors; +import io.odpf.dagger.common.core.DaggerContextTestBase; import io.odpf.dagger.core.metrics.telemetry.TelemetrySubscriber; 
+import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -8,20 +10,25 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ParentPostProcessorTest { +public class ParentPostProcessorTest extends DaggerContextTestBase { @Mock private TelemetrySubscriber telemetrySubscriber; + @Before + public void init() { + configuration = null; + } + @Test public void shouldNotBeAbleToProcessWhenConfigIsNull() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); assertFalse(parentPostProcessor.canProcess(null)); } @Test public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(true); assertFalse(parentPostProcessor.canProcess(mockConfig)); @@ -29,7 +36,7 @@ public void shouldNotBeAbleToProcessWhenConfigIsEmpty() { @Test public void shouldBeAbleToProcessWhenConfigIsNotEmpty() { - ParentPostProcessor parentPostProcessor = new ParentPostProcessor(null, null, telemetrySubscriber); + ParentPostProcessor parentPostProcessor = new ParentPostProcessor(daggerContext, null, telemetrySubscriber); PostProcessorConfig mockConfig = mock(PostProcessorConfig.class); when(mockConfig.isEmpty()).thenReturn(false); assertTrue(parentPostProcessor.canProcess(mockConfig)); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java index eb965628a..635bd84b5 100644 --- 
a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PreProcessorOrchestratorTest.java @@ -2,6 +2,7 @@ import com.jayway.jsonpath.InvalidJsonException; import io.odpf.dagger.common.core.DaggerContextTestBase; +import io.odpf.dagger.core.utils.Constants; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; @@ -64,6 +65,14 @@ public void setup() { + " }\n" + " ]\n" + "}"; + private String preProcessorFilterConfigJson = "{\n" + + " \"table_transformers\": [{\n" + + " \"table_name\": \"test\",\n" + + " \"transformers\": [{\n" + + " \"transformation_class\": \"InvalidRecordFilterTransformer\"\n" + + " }]\n" + + " }]\n" + + "}"; @Test public void shouldGetProcessors() { @@ -72,6 +81,8 @@ public void shouldGetProcessors() { transformConfigs.add(new TransformConfig("InvalidRecordFilterTransformer", new HashMap<>())); TableTransformConfig ttc = new TableTransformConfig("test", transformConfigs); config.tableTransformers = Collections.singletonList(ttc); + when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn(preProcessorFilterConfigJson); + when(configuration.getBoolean(Constants.PROCESSOR_PREPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); Mockito.when(streamInfo.getColumnNames()).thenReturn(new String[0]); Mockito.when(streamInfo.getDataStream()).thenReturn(stream); @@ -117,8 +128,8 @@ public void shouldParseConfig() { public void shouldThrowExceptionForInvalidJson() { when(configuration.getBoolean(PROCESSOR_PREPROCESSOR_ENABLE_KEY, PROCESSOR_PREPROCESSOR_ENABLE_DEFAULT)).thenReturn(true); when(configuration.getString(PROCESSOR_PREPROCESSOR_CONFIG_KEY, "")).thenReturn("blah"); - PreProcessorOrchestrator ppo = new PreProcessorOrchestrator(daggerContext, exporter, "test"); 
- InvalidJsonException exception = assertThrows(InvalidJsonException.class, () -> ppo.parseConfig(configuration)); + InvalidJsonException exception = assertThrows(InvalidJsonException.class, + () -> new PreProcessorOrchestrator(daggerContext, exporter, "test")); assertEquals("Invalid JSON Given for PROCESSOR_PREPROCESSOR_CONFIG", exception.getMessage()); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java index e36db10bc..c18c1fb94 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/transformers/TransformProcessorTest.java @@ -74,7 +74,7 @@ public void shouldThrowExceptionInCaseOfWrongConstructorTypeSupported() { TransformProcessor transformProcessor = new TransformProcessor(transfromConfigs, daggerContext); RuntimeException exception = assertThrows(RuntimeException.class, () -> transformProcessor.process(streamInfo)); - assertEquals("io.odpf.dagger.core.processors.transformers.TransformProcessor.(java.util.Map, [Ljava.lang.String;, io.odpf.dagger.common.configuration.Configuration)", exception.getMessage()); + assertEquals("io.odpf.dagger.core.processors.transformers.TransformProcessor.(java.util.Map, [Ljava.lang.String;, io.odpf.dagger.common.core.DaggerContext)", exception.getMessage()); } @Test diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java index e6985a488..e37446eca 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/ClearColumnTransformer.java @@ -7,7 +7,6 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java index d667c0b8f..611b28434 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformer.java @@ -11,7 +11,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java index dba8714e1..bba768a45 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureTransformer.java @@ -6,7 +6,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.functions.udfs.aggregate.feast.FeatureUtils; diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java index 
afbbabca5..4a0a5f789 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/FeatureWithTypeTransformer.java @@ -6,7 +6,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.functions.transformers.feature.FeatureWithTypeHandler; diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java index 68613e43f..c4b7e333c 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java @@ -7,7 +7,6 @@ import org.apache.flink.types.Row; import com.google.protobuf.Descriptors; -import io.odpf.dagger.common.configuration.Configuration; import io.odpf.dagger.common.core.StencilClientOrchestrator; import io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java index 81cbd0890..5fbf1434f 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/InvalidRecordFilterTransformer.java @@ -5,7 +5,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.types.Row; -import io.odpf.dagger.common.configuration.Configuration; import 
io.odpf.dagger.common.core.StreamInfo; import io.odpf.dagger.common.core.Transformer; import io.odpf.dagger.common.metrics.managers.CounterStatsManager; diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java index 37856190d..17bedef79 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/DeDuplicationTransformerTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class DeDuplicationTransformerTest extends DaggerContextTestBase { @Mock diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java index 0a8da869a..fbde1dc32 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/FeatureTransformerTest.java @@ -16,7 +16,6 @@ import static org.mockito.MockitoAnnotations.initMocks; public class FeatureTransformerTest extends DaggerContextTestBase { - @Before public void setup() { initMocks(this); diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java index 22295bb1e..8d8de8459 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/HashTransformerTest.java @@ -26,7 +26,6 @@ 
public class HashTransformerTest extends DaggerContextTestBase { @Rule public ExpectedException thrown = ExpectedException.none(); - @Mock private org.apache.flink.configuration.Configuration flinkInternalConfig; diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java index 51f8bc770..8330fa7c8 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java @@ -109,11 +109,11 @@ public void shouldReturnColumnNamesReturnedBySQLAndUdf() { String sqlQuery = "SELECT order_number, service_type, multiply(1,100) product FROM data_stream"; transformationArguments.put("sqlQuery", sqlQuery); String[] columnNames = {"order_number", "service_type", "status", "product"}; - streamTableEnvironment.createTemporaryFunction("multiply",multiply); + streamTableEnvironment.createTemporaryFunction("multiply", multiply); when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table); when(table.getSchema()).thenReturn(tableSchema); - String[] outputColumns = {"order_number", "service_type","product"}; + String[] outputColumns = {"order_number", "service_type", "product"}; when(tableSchema.getFieldNames()).thenReturn(outputColumns); when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream); when(retractStream.filter(any())).thenReturn(filteredRetractStream); @@ -124,6 +124,7 @@ public void shouldReturnColumnNamesReturnedBySQLAndUdf() { assertArrayEquals(outputColumns, outputStreamInfo.getColumnNames()); } + @Test public void shouldReturnColumnNamesReturnedBySQL() { HashMap transformationArguments = new HashMap<>(); @@ -234,8 +235,8 @@ class SQLTransformerStub extends SQLTransformer { } } - class Multiply extends ScalarFunction{ - public Integer eval(Integer i1, Integer i2){ 
+ class Multiply extends ScalarFunction { + public Integer eval(Integer i1, Integer i2) { return i1 * i2; } } diff --git a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java index 0d4bb6e87..cc4918a29 100644 --- a/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java +++ b/dagger-tests/src/integrationtest/java/io/odpf/dagger/integrationtest/PostGresExternalPostProcessorIntegrationTest.java @@ -1,7 +1,6 @@ package io.odpf.dagger.integrationtest; import io.odpf.dagger.common.core.DaggerContext; -import io.odpf.dagger.common.core.DaggerContextTestBase; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -41,8 +40,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - @Ignore public class PostGresExternalPostProcessorIntegrationTest {