Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.odpf.dagger.common.core;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.exceptions.DaggerContextException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The DaggerContext singleton object.
* It initializes with StreamExecutionEnvironment, StreamTableEnvironment and Configuration.
*/
public class DaggerContext {
private static final Logger LOGGER = LoggerFactory.getLogger(DaggerContext.class.getName());
private static volatile DaggerContext daggerContext = null;
private final StreamExecutionEnvironment executionEnvironment;
private final StreamTableEnvironment tableEnvironment;
private final Configuration configuration;

/**
* Instantiates a new DaggerContext.
*
* @param configuration the Configuration
*/
private DaggerContext(Configuration configuration) {
this.executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings);
this.configuration = configuration;
}

/**
* Get the instance of DaggerContext.
*/
public static DaggerContext getInstance() {
if (daggerContext == null) {
throw new DaggerContextException("DaggerContext object is not initialized");
}
return daggerContext;
}

/**
* Initialization of a new DaggerContext.
*
* @param configuration the Configuration
*/
public static synchronized DaggerContext init(Configuration configuration) {
if (daggerContext != null) {
throw new DaggerContextException("DaggerContext object is already initialized");
}
daggerContext = new DaggerContext(configuration);
LOGGER.info("DaggerContext is initialized");
return daggerContext;
}

public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}

public StreamTableEnvironment getTableEnvironment() {
return tableEnvironment;
}

public Configuration getConfiguration() {
return configuration;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.odpf.dagger.common.exceptions;

/**
* The class Exception if there is something wrong with Dagger context object.
*/
public class DaggerContextException extends RuntimeException {

/**
* Instantiates a new Dagger context exception with specified error message.
*
* @param message the message
*/
public DaggerContextException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.odpf.dagger.common.core;

import io.odpf.dagger.common.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.Before;

import java.lang.reflect.Field;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public abstract class DaggerContextTestBase {

protected DaggerContext daggerContext;

protected DataStream<Row> inputStream;

protected StreamExecutionEnvironment streamExecutionEnvironment;

protected StreamTableEnvironment streamTableEnvironment;

protected Configuration configuration;

@Before
public void setupBase() {
daggerContext = mock(DaggerContext.class);
configuration = mock(Configuration.class);
inputStream = mock(DataStream.class);
streamExecutionEnvironment = mock(StreamExecutionEnvironment.class);
streamTableEnvironment = mock(StreamTableEnvironment.class);
initMocks(this);
setMock(daggerContext);
when(daggerContext.getConfiguration()).thenReturn(configuration);
when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment);
when(daggerContext.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment);
when(daggerContext.getTableEnvironment()).thenReturn(streamTableEnvironment);
}

@After
public void resetSingletonBase() throws Exception {
Field instance = DaggerContext.class.getDeclaredField("daggerContext");
instance.setAccessible(true);
instance.set(null, null);
}

private void setMock(DaggerContext mock) {
try {
Field instance = DaggerContext.class.getDeclaredField("daggerContext");
instance.setAccessible(true);
instance.set(instance, mock);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import io.odpf.dagger.core.config.ConfigurationProvider;
import io.odpf.dagger.core.config.ConfigurationProviderFactory;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import io.odpf.dagger.common.core.DaggerContext;

import java.util.TimeZone;

Expand All @@ -26,11 +24,8 @@ public static void main(String[] args) throws ProgramInvocationException {
ConfigurationProvider provider = new ConfigurationProviderFactory(args).provider();
Configuration configuration = provider.get();
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, environmentSettings);

StreamManager streamManager = new StreamManager(configuration, executionEnvironment, tableEnvironment);
DaggerContext daggerContext = DaggerContext.init(configuration);
StreamManager streamManager = new StreamManager(daggerContext);
streamManager
.registerConfigs()
.registerSourceWithPreProcessors()
Expand Down
26 changes: 13 additions & 13 deletions dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.odpf.dagger.core;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.DaggerContext;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.core.StreamInfo;
import io.odpf.dagger.common.udfs.UdfFactory;
Expand All @@ -11,7 +12,6 @@
import io.odpf.dagger.core.exception.UDFFactoryClassNotDefinedException;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.processors.PostProcessorFactory;
import io.odpf.dagger.core.processors.PreProcessorConfig;
import io.odpf.dagger.core.processors.PreProcessorFactory;
import io.odpf.dagger.core.processors.telemetry.processor.MetricsTelemetryExporter;
import io.odpf.dagger.core.processors.types.PostProcessor;
Expand Down Expand Up @@ -53,17 +53,18 @@ public class StreamManager {
private StencilClientOrchestrator stencilClientOrchestrator;
private DaggerStatsDReporter daggerStatsDReporter;

private final DaggerContext daggerContext;

/**
* Instantiates a new Stream manager.
*
* @param configuration the configuration in form of param
* @param executionEnvironment the execution environment
* @param tableEnvironment the table environment
* @param daggerContext the daggerContext in form of param
*/
public StreamManager(Configuration configuration, StreamExecutionEnvironment executionEnvironment, StreamTableEnvironment tableEnvironment) {
this.configuration = configuration;
this.executionEnvironment = executionEnvironment;
this.tableEnvironment = tableEnvironment;
public StreamManager(DaggerContext daggerContext) {
this.daggerContext = daggerContext;
this.configuration = daggerContext.getConfiguration();
this.executionEnvironment = daggerContext.getExecutionEnvironment();
this.tableEnvironment = daggerContext.getTableEnvironment();
}

/**
Expand Down Expand Up @@ -99,7 +100,6 @@ public StreamManager registerConfigs() {
public StreamManager registerSourceWithPreProcessors() {
long watermarkDelay = configuration.getLong(FLINK_WATERMARK_DELAY_MS_KEY, FLINK_WATERMARK_DELAY_MS_DEFAULT);
Boolean enablePerPartitionWatermark = configuration.getBoolean(FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT);
PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration);
StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter)
.forEach(stream -> {
String tableName = stream.getStreamName();
Expand All @@ -112,7 +112,7 @@ public StreamManager registerSourceWithPreProcessors() {

TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig);
streamInfo = addPreProcessor(streamInfo, tableName);

Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));
tableEnvironment.createTemporaryView(tableName, table);
Expand Down Expand Up @@ -211,15 +211,15 @@ protected StreamInfo createStreamInfo(Table table) {
}

private StreamInfo addPostProcessor(StreamInfo streamInfo) {
List<PostProcessor> postProcessors = PostProcessorFactory.getPostProcessors(configuration, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter);
List<PostProcessor> postProcessors = PostProcessorFactory.getPostProcessors(daggerContext, stencilClientOrchestrator, streamInfo.getColumnNames(), telemetryExporter);
for (PostProcessor postProcessor : postProcessors) {
streamInfo = postProcessor.process(streamInfo);
}
return streamInfo;
}

private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName, PreProcessorConfig preProcessorConfig) {
List<Preprocessor> preProcessors = PreProcessorFactory.getPreProcessors(configuration, preProcessorConfig, tableName, telemetryExporter);
private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) {
List<Preprocessor> preProcessors = PreProcessorFactory.getPreProcessors(daggerContext, tableName, telemetryExporter);
for (Preprocessor preprocessor : preProcessors) {
streamInfo = preprocessor.process(streamInfo);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.dagger.core.processors;

import io.odpf.dagger.common.core.DaggerContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

Expand All @@ -26,23 +27,29 @@
*/
public class ParentPostProcessor implements PostProcessor {
private final PostProcessorConfig postProcessorConfig;
private Configuration configuration;

private final StencilClientOrchestrator stencilClientOrchestrator;
private TelemetrySubscriber telemetrySubscriber;

private final DaggerContext daggerContext;

/**
* Instantiates a new Parent post processor.
*
* @param postProcessorConfig the post processor config
* @param configuration the configuration
* @param daggerContext the daggerContext
* @param stencilClientOrchestrator the stencil client orchestrator
* @param telemetrySubscriber the telemetry subscriber
*/
public ParentPostProcessor(PostProcessorConfig postProcessorConfig, Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) {
this.postProcessorConfig = postProcessorConfig;
this.configuration = configuration;
public ParentPostProcessor(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, TelemetrySubscriber telemetrySubscriber) {
this.daggerContext = daggerContext;
this.stencilClientOrchestrator = stencilClientOrchestrator;
this.telemetrySubscriber = telemetrySubscriber;
this.postProcessorConfig = parsePostProcessorConfig(daggerContext.getConfiguration());
}

private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) {
String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, "");
return PostProcessorConfig.parse(postProcessorConfigString);
}

@Override
Expand All @@ -56,7 +63,7 @@ public StreamInfo process(StreamInfo streamInfo) {
InitializationDecorator initializationDecorator = new InitializationDecorator(columnNameManager);
resultStream = initializationDecorator.decorate(resultStream);
streamInfo = new StreamInfo(resultStream, streamInfo.getColumnNames());
SchemaConfig schemaConfig = new SchemaConfig(configuration, stencilClientOrchestrator, columnNameManager);
SchemaConfig schemaConfig = new SchemaConfig(daggerContext.getConfiguration(), stencilClientOrchestrator, columnNameManager);

List<PostProcessor> enabledPostProcessors = getEnabledPostProcessors(telemetrySubscriber, schemaConfig);
for (PostProcessor postProcessor : enabledPostProcessors) {
Expand All @@ -66,7 +73,7 @@ public StreamInfo process(StreamInfo streamInfo) {
FetchOutputDecorator fetchOutputDecorator = new FetchOutputDecorator(schemaConfig, postProcessorConfig.hasSQLTransformer());
resultStream = fetchOutputDecorator.decorate(streamInfo.getDataStream());
StreamInfo resultantStreamInfo = new StreamInfo(resultStream, columnNameManager.getOutputColumnNames());
TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), configuration);
TransformProcessor transformProcessor = new TransformProcessor(postProcessorConfig.getTransformers(), daggerContext);
if (transformProcessor.canProcess(postProcessorConfig)) {
transformProcessor.notifySubscriber(telemetrySubscriber);
resultantStreamInfo = transformProcessor.process(resultantStreamInfo);
Expand All @@ -80,11 +87,11 @@ public boolean canProcess(PostProcessorConfig config) {
}

private List<PostProcessor> getEnabledPostProcessors(TelemetrySubscriber subscriber, SchemaConfig schemaConfig) {
if (!configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) {
if (!daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) {
return new ArrayList<>();
}

ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(configuration, subscriber);
ExternalMetricConfig externalMetricConfig = getExternalMetricConfig(daggerContext.getConfiguration(), subscriber);
ArrayList<PostProcessor> processors = new ArrayList<>();
processors.add(new ExternalPostProcessor(schemaConfig, postProcessorConfig.getExternalSource(), externalMetricConfig));
processors.add(new InternalPostProcessor(postProcessorConfig, schemaConfig));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.odpf.dagger.core.processors;

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.DaggerContext;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.core.processors.longbow.LongbowFactory;
import io.odpf.dagger.core.processors.longbow.LongbowSchema;
Expand All @@ -22,22 +23,22 @@ public class PostProcessorFactory {
/**
* Gets post processors.
*
* @param configuration the configuration
* @param daggerContext the daggerContext
* @param stencilClientOrchestrator the stencil client orchestrator
* @param columnNames the column names
* @param metricsTelemetryExporter the metrics telemetry exporter
* @return the post processors
*/
public static List<PostProcessor> getPostProcessors(Configuration configuration, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) {
public static List<PostProcessor> getPostProcessors(DaggerContext daggerContext, StencilClientOrchestrator stencilClientOrchestrator, String[] columnNames, MetricsTelemetryExporter metricsTelemetryExporter) {
List<PostProcessor> postProcessors = new ArrayList<>();

if (Arrays.stream(columnNames).anyMatch(s -> Pattern.compile(".*\\blongbow.*key\\b.*").matcher(s).find())) {
postProcessors.add(getLongBowProcessor(columnNames, configuration, metricsTelemetryExporter, stencilClientOrchestrator));
postProcessors.add(getLongBowProcessor(columnNames, daggerContext.getConfiguration(), metricsTelemetryExporter, stencilClientOrchestrator));
}
if (configuration.getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) {
postProcessors.add(new ParentPostProcessor(parsePostProcessorConfig(configuration), configuration, stencilClientOrchestrator, metricsTelemetryExporter));
if (daggerContext.getConfiguration().getBoolean(Constants.PROCESSOR_POSTPROCESSOR_ENABLE_KEY, Constants.PROCESSOR_POSTPROCESSOR_ENABLE_DEFAULT)) {
postProcessors.add(new ParentPostProcessor(daggerContext, stencilClientOrchestrator, metricsTelemetryExporter));
}
if (configuration.getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) {
if (daggerContext.getConfiguration().getBoolean(Constants.METRIC_TELEMETRY_ENABLE_KEY, Constants.METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT)) {
postProcessors.add(new TelemetryProcessor(metricsTelemetryExporter));
}
return postProcessors;
Expand All @@ -49,9 +50,4 @@ private static PostProcessor getLongBowProcessor(String[] columnNames, Configura

return longbowFactory.getLongbowProcessor();
}

private static PostProcessorConfig parsePostProcessorConfig(Configuration configuration) {
String postProcessorConfigString = configuration.getString(Constants.PROCESSOR_POSTPROCESSOR_CONFIG_KEY, "");
return PostProcessorConfig.parse(postProcessorConfigString);
}
}
Loading