diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 8aa62d7bc..e7a3e1643 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -13,5 +13,6 @@ jobs:
         java-version: 1.8
     - name: Grant execute permission for gradlew
       run: chmod +x gradlew
-    - name: Build
-      run: ./gradlew build
+    # Todo: remove the test exclusion part for the core repo after the tests get fixed
+    - name: Build dagger-core. All dependent subprojects will be built as part of this step
+      run: ./gradlew dagger-core:buildNeeded -x dagger-core:test
diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml
index bf4bfa406..b813e3803 100644
--- a/.github/workflows/package.yml
+++ b/.github/workflows/package.yml
@@ -15,8 +15,9 @@ jobs:
         java-version: 1.8
     - name: Grant execute permission for gradlew
       run: chmod +x gradlew
-    - name: Build
-      run: ./gradlew build
+    # Todo: remove the test exclusion part for the core repo after the tests get fixed
+    - name: Build dagger-core. All dependent subprojects will be built as part of this step
+      run: ./gradlew dagger-core:buildNeeded -x dagger-core:test
 
   publishJar:
     needs: build
@@ -26,7 +27,11 @@ jobs:
     - uses: actions/setup-java@v1
       with:
         java-version: 1.8
-    - name: Publish package
-      run: ./gradlew publish
+    - name: Publish packages of common subprojects
+      run: ./gradlew :dagger-common:publish
+      env:
+        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+    - name: Publish packages of core
+      run: ./gradlew :dagger-core:publish
       env:
         GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/dagger-common/build.gradle b/dagger-common/build.gradle
index 86adf1219..b2e7df00f 100644
--- a/dagger-common/build.gradle
+++ b/dagger-common/build.gradle
@@ -16,6 +16,7 @@ plugins {
 }
 
 def flinkVersion = System.getenv('flinkVersion') ?: '1.9.0'
+version "0.0.1"
 
 description = """common dependencies for dagger"""
 
@@ -50,7 +51,8 @@ sourceSets {
 dependencies {
     compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion
     compileOnly group: 'org.apache.flink', name: 'flink-table-common', version: flinkVersion
-    dependenciesCommonJar 'com.gojek:stencil:2.0.15'
+    compileOnly group: 'org.apache.flink', name: 'flink-table', version: flinkVersion
+    compileOnly group: 'org.apache.flink', name: 'flink-table-api-java-bridge_2.11', version: flinkVersion
 
     testImplementation 'junit:junit:4.12'
 }
@@ -72,22 +74,49 @@ jacocoTestReport {
 }
 
 jar {
-    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
     zip64 true
     from {
-        (configurations.compileClasspath).collect {
+        (configurations.runtime).collect {
             it.isDirectory() ? it : zipTree(it)
         }
     }
 }
 
-task fatJar(type: ShadowJar) {
-    description = "Builds a executable jar"
-    classifier = 'fat'
-    from(project.convention.getPlugin(JavaPluginConvention).sourceSets.main.output)
-    configurations = [project.configurations.runtimeClasspath, project.configurations.minimalCommonJar, project.configurations.dependenciesCommonJar]
-    exclude('META-INF/INDEX.LIST', 'META-INF/*.SF', 'META-INF/*.DSA', 'META-INF/*.RSA')
-    zip64 true
-    mergeServiceFiles()
-    append('reference.conf')
+publishing {
+    publications {
+        shadow(MavenPublication) {
+            publication ->
+                project.shadow.component(publication)
+        }
+    }
+
+    repositories {
+        maven {
+            name = "GitHubPackages"
+            url = "https://maven.pkg.github.com/odpf/dagger"
+            credentials {
+                username = System.getenv("GITHUB_ACTOR")
+                password = System.getenv("GITHUB_TOKEN")
+            }
+        }
+    }
+}
+
+jacocoTestReport {
+    reports {
+        xml.enabled false
+        html.enabled true
+        csv.enabled false
+    }
+    finalizedBy jacocoTestCoverageVerification
 }
+
+jacocoTestCoverageVerification {
+    violationRules {
+        rule {
+            limit {
+                minimum = 0.8
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/contracts/Transformer.java b/dagger-common/src/main/java/io/odpf/dagger/common/contracts/Transformer.java
deleted file mode 100644
index 70105cb82..000000000
--- a/dagger-common/src/main/java/io/odpf/dagger/common/contracts/Transformer.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package io.odpf.dagger.common.contracts;
-
-import io.odpf.dagger.common.core.StreamInfo;
-
-public interface Transformer {
-    StreamInfo transform(StreamInfo streamInfo);
-}
-
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/contracts/UDFFactory.java b/dagger-common/src/main/java/io/odpf/dagger/common/contracts/UDFFactory.java
deleted file mode 100644
index 61a71761e..000000000
--- a/dagger-common/src/main/java/io/odpf/dagger/common/contracts/UDFFactory.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package io.odpf.dagger.common.contracts;
-
-import org.apache.flink.table.functions.UserDefinedFunction;
-
-import java.util.Map;
-
-public interface UDFFactory {
-    void registerFunctions();
-
-    Map<String, UserDefinedFunction> addfunctions();
-}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java b/dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java
new file mode 100644
index 000000000..08e8b73e2
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java
@@ -0,0 +1,6 @@
+package io.odpf.dagger.common.core;
+
+public interface Transformer {
+    StreamInfo transform(StreamInfo streamInfo);
+}
+
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java
new file mode 100644
index 000000000..afdb4292b
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java
@@ -0,0 +1,19 @@
+package io.odpf.dagger.common.udfs;
+
+import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+
+public abstract class AggregateUdf<T, ACC> extends AggregateFunction<T, ACC> {
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
+        udfMetricsManager.registerGauge(getName());
+    }
+
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java
new file mode 100644
index 000000000..cc86e8fda
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java
@@ -0,0 +1,19 @@
+package io.odpf.dagger.common.udfs;
+
+import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public abstract class ScalarUdf extends ScalarFunction {
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
+        udfMetricsManager.registerGauge(getName());
+    }
+
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java
new file mode 100644
index 000000000..4a234acf6
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java
@@ -0,0 +1,18 @@
+package io.odpf.dagger.common.udfs;
+
+import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+
+public abstract class TableUdf<T> extends TableFunction<T> {
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
+        udfMetricsManager.registerGauge(getName());
+    }
+
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java
new file mode 100644
index 000000000..d64a1855b
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java
@@ -0,0 +1,28 @@
+package io.odpf.dagger.common.udfs;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.util.HashSet;
+
+public abstract class UdfFactory {
+    private final StreamTableEnvironment streamTableEnvironment;
+
+    public UdfFactory(StreamTableEnvironment streamTableEnvironment) {
+        this.streamTableEnvironment = streamTableEnvironment;
+    }
+
+    final public void registerFunctions() {
+        HashSet<ScalarUdf> scalarFunctions = getScalarUdfs();
+        HashSet<TableUdf> tableFunctions = getTableUdfs();
+        HashSet<AggregateUdf> aggregateFunctions = getAggregateUdfs();
+        scalarFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
+        tableFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
+        aggregateFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
+    }
+
+    public abstract HashSet<ScalarUdf> getScalarUdfs();
+
+    public abstract HashSet<TableUdf> getTableUdfs();
+
+    public abstract HashSet<AggregateUdf> getAggregateUdfs();
+}
diff --git a/dagger-common/src/main/java/io/odpf/dagger/common/udfs/telemetry/UdfMetricsManager.java b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/telemetry/UdfMetricsManager.java
new file mode 100644
index 000000000..2ecd2cae2
--- /dev/null
+++ b/dagger-common/src/main/java/io/odpf/dagger/common/udfs/telemetry/UdfMetricsManager.java
@@ -0,0 +1,21 @@
+package io.odpf.dagger.common.udfs.telemetry;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.functions.FunctionContext;
+
+public class UdfMetricsManager {
+    private FunctionContext context;
+    private Integer gaugeValue = 1;
+    private static final String UDF_TELEMETRY_GROUP_KEY = "udf";
+    private static final String GAUGE_ASPECT_NAME = "value";
+
+    public UdfMetricsManager(FunctionContext context) {
+        this.context = context;
+    }
+
+    public void registerGauge(String udfValue) {
+        MetricGroup metricGroup = context.getMetricGroup().addGroup(UDF_TELEMETRY_GROUP_KEY, udfValue);
+        metricGroup.gauge(GAUGE_ASPECT_NAME, (Gauge<Integer>) () -> gaugeValue);
+    }
+}
diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle
index 5fd5eadc8..137018825 100644
--- a/dagger-core/build.gradle
+++ b/dagger-core/build.gradle
@@ -23,16 +23,16 @@ plugins {
 
 def flinkVersion = System.getenv('flinkVersion') ?: '1.9.0'
 
-def daggersVersion = '12.4.0'
+def daggerVersion = '12.4.0'
 
 def dependenciesVersion = '0.1.0'
 
-version "${flinkVersion}_${daggersVersion}"
+version "${flinkVersion}_${daggerVersion}"
 
 def dependenciesArtifactVersion = "${flinkVersion}_${dependenciesVersion}"
 
 def minimalVersion = version + '_' + dependenciesVersion
 
-description = """daggers to the heart!"""
+description = """dagger to the heart!"""
 
 sourceCompatibility = 1.8
 targetCompatibility = 1.8
@@ -254,19 +254,32 @@ publishing {
                 project.shadow.component(publication)
         }
         minimalArtifact(MavenPublication) {
-            artifact file("$buildDir/libs/daggers-${minimalVersion}-minimal.jar")
+            artifact file("$buildDir/libs/dagger-core-${minimalVersion}-minimal.jar")
             groupId project.group
             artifactId project.name
             version = minimalVersion + '-minimal'
         }
         dependenciesArtifact(MavenPublication) {
-            artifact file("$buildDir/libs/daggers-${dependenciesArtifactVersion}-dependencies.jar")
+            artifact file("$buildDir/libs/dagger-core-${dependenciesArtifactVersion}-dependencies.jar")
             groupId project.group
             artifactId project.name
             version = dependenciesArtifactVersion + '-dependencies'
         }
     }
+
+
+    repositories {
+        maven {
+            name = "GitHubPackages"
+            url = "https://maven.pkg.github.com/odpf/dagger"
+            credentials {
+                username = System.getenv("GITHUB_ACTOR")
+                password = System.getenv("GITHUB_TOKEN")
+            }
+        }
+    }
 }
 
 artifactory {
     publish {
         defaults {
@@ -280,7 +293,7 @@ artifactory {
     }
 
     clientConfig.setIncludeEnvVars(true)
-    clientConfig.info.setBuildName('daggers')
+    clientConfig.info.setBuildName('dagger')
     clientConfig.info.setBuildNumber(System.env.BUILD_NUMBER)
 }
 
@@ -288,6 +301,9 @@ project.afterEvaluate {
     tasks.withType(PublishToMavenRepository) {
         dependsOn minimalJar, dependenciesJar
     }
+    tasks.withType(PublishToMavenLocal) {
+        dependsOn minimalJar, dependenciesJar
+    }
 }
diff --git a/dagger-core/env/local.properties b/dagger-core/env/local.properties
index 0f0807eb0..94ab9b908 100644
--- a/dagger-core/env/local.properties
+++ b/dagger-core/env/local.properties
@@ -24,5 +24,5 @@ SHUTDOWN_PERIOD=10000
 TELEMETRY_ENABLED=true
 
 # == Others ==
-FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.ScalarFuctionFactory
+FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
 ROWTIME_ATTRIBUTE_NAME=rowtime
\ No newline at end of file
diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
index fda99e8e7..1c60bc257 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
@@ -7,12 +7,11 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-import io.odpf.dagger.common.contracts.UDFFactory;
+import io.odpf.dagger.common.udfs.UdfFactory;
 import io.odpf.dagger.common.core.StreamInfo;
 import io.odpf.dagger.exception.UDFFactoryClassNotDefinedException;
 import io.odpf.dagger.processors.PostProcessorFactory;
@@ -27,9 +26,7 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class StreamManager {
 
@@ -88,14 +85,6 @@ public StreamManager registerSourceWithPreProcessors() {
         return this;
     }
 
-    // TODO : Do I even need this
-    private List<String> getStencilUrls() {
-        return Arrays.stream(configuration.getString(Constants.STENCIL_URL_KEY, Constants.STENCIL_URL_DEFAULT).split(","))
-                .map(String::trim)
-                .collect(Collectors.toList());
-    }
-
-
     public StreamManager registerFunctions() {
         String[] functionFactoryClasses = configuration
                 .getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
@@ -103,7 +92,7 @@ public StreamManager registerFunctions() {
 
         for (String className : functionFactoryClasses) {
             try {
-                UDFFactory udfFactory = getUDFFactory(className);
+                UdfFactory udfFactory = getUdfFactory(className);
                 udfFactory.registerFunctions();
             } catch (ReflectiveOperationException e) {
                 throw new UDFFactoryClassNotDefinedException(e.getMessage());
@@ -112,10 +101,11 @@
         return this;
     }
 
-    private UDFFactory getUDFFactory(String udfFactoryClassName) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+    private UdfFactory getUdfFactory(String udfFactoryClassName) throws ClassNotFoundException,
+            NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
         Class udfFactoryClass = Class.forName(udfFactoryClassName);
-        Constructor udfFactoryClassConstructor = udfFactoryClass.getConstructor(Configuration.class, TableEnvironment.class);
-        return (UDFFactory) udfFactoryClassConstructor.newInstance(configuration, tableEnvironment);
+        Constructor udfFactoryClassConstructor = udfFactoryClass.getConstructor(StreamTableEnvironment.class);
+        return (UdfFactory) udfFactoryClassConstructor.newInstance(tableEnvironment);
     }
 
     public StreamManager registerOutputStream() {
diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/Streams.java b/dagger-core/src/main/java/io/odpf/dagger/core/Streams.java
index 26cff9fd7..796881808 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/core/Streams.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/core/Streams.java
@@ -1,22 +1,24 @@
 package io.odpf.dagger.core;
 
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
-import io.odpf.dagger.source.FlinkKafkaConsumerCustom;
-import io.odpf.dagger.source.ProtoDeserializer;
-import com.google.gson.Gson;
-import io.odpf.dagger.utils.Constants;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.types.Row;
 
+import com.google.gson.Gson;
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.source.FlinkKafkaConsumerCustom;
+import io.odpf.dagger.source.ProtoDeserializer;
+import io.odpf.dagger.utils.Constants;
+
 import java.sql.Timestamp;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.*;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.INPUT_PROTO;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.INPUT_STREAM;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.INPUT_TOPIC;
 
 public class Streams implements TelemetryPublisher {
     private static final String KAFKA_PREFIX = "kafka_consumer_config_";
diff --git a/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisher.java b/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisher.java
deleted file mode 100644
index fa5334cd2..000000000
--- a/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisher.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package io.odpf.dagger.metrics.telemetry;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.AggregateFunction;
-
-import io.odpf.dagger.utils.Constants;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class only publishes metrics for UDFs of AggregatedFunction type
- * For ScalarFunction the metrics are sent directly from UDFs
- * For AggregatedFunction it can not be done due to this bug in flink
- * ISSUE : https://issues.apache.org/jira/browse/FLINK-15040
- */
-public class AggregatedUDFTelemetryPublisher implements TelemetryPublisher {
-
-    private Configuration configuration;
-    private Map<String, AggregateFunction> aggregateFunctions;
-    private Map<String, List<String>> metrics = new HashMap<>();
-
-    public AggregatedUDFTelemetryPublisher(Configuration configuration, Map<String, AggregateFunction> aggregateFunctions) {
-        this.configuration = configuration;
-        this.aggregateFunctions = aggregateFunctions;
-    }
-
-    @Override
-    public void preProcessBeforeNotifyingSubscriber() {
-        String lowerCaseSQLQuery = configuration.getString(Constants.SQL_QUERY, Constants.SQL_QUERY_DEFAULT).toLowerCase();
-        aggregateFunctions.keySet().forEach(aggregateFunctionName -> {
-            if (lowerCaseSQLQuery.contains(aggregateFunctionName.toLowerCase())) {
-                addMetric(TelemetryTypes.UDF.getValue(), aggregateFunctionName);
-            }
-        });
-    }
-
-    @Override
-    public Map<String, List<String>> getTelemetry() {
-        return metrics;
-    }
-
-    private void addMetric(String key, String value) {
-        metrics.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
-    }
-}
diff --git a/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/TelemetryTypes.java b/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/TelemetryTypes.java
index 422d2c271..590ac5509 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/TelemetryTypes.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/metrics/telemetry/TelemetryTypes.java
@@ -10,8 +10,7 @@ public enum TelemetryTypes {
     OUTPUT_STREAM("output_stream"),
     POST_PROCESSOR_TYPE("post_processor_type"),
     PRE_PROCESSOR_TYPE("pre_processor_type"),
-    SOURCE_METRIC_ID("source_metricId"),
-    UDF("udf");
+    SOURCE_METRIC_ID("source_metricId");
 
     public String getValue() {
         return value;
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/ParentPostProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/processors/ParentPostProcessor.java
index afc420901..fe79e3414 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/ParentPostProcessor.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/ParentPostProcessor.java
@@ -1,20 +1,20 @@
 package io.odpf.dagger.processors;
 
-import io.odpf.dagger.processors.types.PostProcessor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
+import io.odpf.dagger.metrics.telemetry.TelemetrySubscriber;
 import io.odpf.dagger.common.core.StreamInfo;
 import io.odpf.dagger.core.StencilClientOrchestrator;
-import io.odpf.dagger.metrics.telemetry.TelemetrySubscriber;
+import io.odpf.dagger.processors.common.FetchOutputDecorator;
+import io.odpf.dagger.processors.common.InitializationDecorator;
 import io.odpf.dagger.processors.external.ExternalMetricConfig;
 import io.odpf.dagger.processors.external.ExternalPostProcessor;
 import io.odpf.dagger.processors.external.SchemaConfig;
-import io.odpf.dagger.processors.common.FetchOutputDecorator;
-import io.odpf.dagger.processors.common.InitializationDecorator;
 import io.odpf.dagger.processors.internal.InternalPostProcessor;
 import io.odpf.dagger.processors.transformers.TransformProcessor;
+import io.odpf.dagger.processors.types.PostProcessor;
 import io.odpf.dagger.utils.Constants;
 
 import java.util.ArrayList;
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/PreProcessorOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/processors/PreProcessorOrchestrator.java
index 78b02cc93..761fdcb1e 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/PreProcessorOrchestrator.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/PreProcessorOrchestrator.java
@@ -1,12 +1,13 @@
 package io.odpf.dagger.processors;
 
+import org.apache.flink.configuration.Configuration;
+
 import io.odpf.dagger.common.core.StreamInfo;
 import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.common.ValidRecordsDecorator;
 import io.odpf.dagger.processors.telemetry.processor.MetricsTelemetryExporter;
 import io.odpf.dagger.processors.transformers.TransformProcessor;
 import io.odpf.dagger.processors.types.Preprocessor;
-import org.apache.flink.configuration.Configuration;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/external/AsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/processors/external/AsyncConnector.java
index 186d26af6..47b3f6534 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/external/AsyncConnector.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/external/AsyncConnector.java
@@ -5,22 +5,27 @@
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.types.Row;
 
+import com.google.protobuf.Descriptors;
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
 import io.odpf.dagger.exception.DescriptorNotFoundException;
 import io.odpf.dagger.exception.InvalidConfigurationException;
 import io.odpf.dagger.metrics.MeterStatsManager;
+import io.odpf.dagger.metrics.aspects.ExternalSourceAspects;
 import io.odpf.dagger.metrics.reporters.ErrorReporter;
 import io.odpf.dagger.metrics.reporters.ErrorReporterFactory;
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.ColumnNameManager;
 import io.odpf.dagger.processors.common.DescriptorManager;
 import io.odpf.dagger.processors.common.EndpointHandler;
 import io.odpf.dagger.processors.types.SourceConfig;
-import com.google.protobuf.Descriptors;
-import io.odpf.dagger.metrics.aspects.ExternalSourceAspects;
-import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IllegalFormatException;
+import java.util.List;
+import java.util.Map;
+import java.util.UnknownFormatConversionException;
 import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.singleton;
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/external/grpc/GrpcStreamDecorator.java b/dagger-core/src/main/java/io/odpf/dagger/processors/external/grpc/GrpcStreamDecorator.java
index ea9a855d2..7fa7348b9 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/external/grpc/GrpcStreamDecorator.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/external/grpc/GrpcStreamDecorator.java
@@ -1,13 +1,13 @@
 package io.odpf.dagger.processors.external.grpc;
 
-import io.odpf.dagger.processors.external.ExternalMetricConfig;
-import io.odpf.dagger.processors.external.SchemaConfig;
-import io.odpf.dagger.processors.types.StreamDecorator;
-
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
+import io.odpf.dagger.processors.external.ExternalMetricConfig;
+import io.odpf.dagger.processors.external.SchemaConfig;
+import io.odpf.dagger.processors.types.StreamDecorator;
+
 import java.util.concurrent.TimeUnit;
 
 public class GrpcStreamDecorator implements StreamDecorator {
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowReader.java b/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowReader.java
index 6ffb46073..1170ae774 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowReader.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowReader.java
@@ -1,31 +1,36 @@
 package io.odpf.dagger.processors.longbow.processor;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.types.Row;
+
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
 import io.odpf.dagger.metrics.MeterStatsManager;
 import io.odpf.dagger.metrics.aspects.LongbowReaderAspects;
 import io.odpf.dagger.metrics.reporters.ErrorReporter;
 import io.odpf.dagger.metrics.reporters.ErrorReporterFactory;
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.longbow.LongbowSchema;
 import io.odpf.dagger.processors.longbow.data.LongbowData;
 import io.odpf.dagger.processors.longbow.exceptions.LongbowReaderException;
 import io.odpf.dagger.processors.longbow.outputRow.ReaderOutputRow;
-import io.odpf.dagger.processors.longbow.request.ScanRequestFactory;
 import io.odpf.dagger.processors.longbow.range.LongbowRange;
+import io.odpf.dagger.processors.longbow.request.ScanRequestFactory;
 import io.odpf.dagger.processors.longbow.storage.LongbowStore;
 import io.odpf.dagger.processors.longbow.storage.ScanRequest;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.types.Row;
-
-import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.utils.Constants;
 import org.apache.hadoop.hbase.client.Result;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 import static java.time.Duration.between;
@@ -86,8 +91,9 @@ public void close() throws Exception {
         super.close();
         meterStatsManager.markEvent(LongbowReaderAspects.CLOSE_CONNECTION_ON_READER);
         LOGGER.error("LongbowReader : Connection closed");
-        if (longBowStore != null)
+        if (longBowStore != null) {
             longBowStore.close();
+        }
     }
 
     @Override
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowWriter.java b/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowWriter.java
index 4deeea712..5eaa6acd9 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowWriter.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/longbow/processor/LongbowWriter.java
@@ -1,22 +1,22 @@
 package io.odpf.dagger.processors.longbow.processor;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.types.Row;
+
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
 import io.odpf.dagger.metrics.MeterStatsManager;
 import io.odpf.dagger.metrics.aspects.LongbowWriterAspects;
 import io.odpf.dagger.metrics.reporters.ErrorReporter;
 import io.odpf.dagger.metrics.reporters.ErrorReporterFactory;
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.longbow.LongbowSchema;
 import io.odpf.dagger.processors.longbow.exceptions.LongbowWriterException;
 import io.odpf.dagger.processors.longbow.outputRow.WriterOutputRow;
 import io.odpf.dagger.processors.longbow.request.PutRequestFactory;
 import io.odpf.dagger.processors.longbow.storage.LongbowStore;
 import io.odpf.dagger.processors.longbow.storage.PutRequest;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.types.Row;
-
-import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.utils.Constants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
@@ -24,7 +24,11 @@
 import org.threeten.bp.Duration;
 
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
@@ -67,11 +71,13 @@ public LongbowWriter(Configuration configuration, LongbowSchema longbowSchema, P
 
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        if (longBowStore == null)
+        if (longBowStore == null) {
             longBowStore = LongbowStore.create(configuration);
+        }
 
-        if (meterStatsManager == null)
+        if (meterStatsManager == null) {
             meterStatsManager = new MeterStatsManager(getRuntimeContext(), true);
+        }
 
         meterStatsManager.register("longbow.writer", LongbowWriterAspects.values());
         if (errorReporter == null) {
@@ -139,8 +145,9 @@ public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception
     @Override
     public void close() throws Exception {
         super.close();
-        if (longBowStore != null)
+        if (longBowStore != null) {
             longBowStore.close();
+        }
         meterStatsManager.markEvent(LongbowWriterAspects.CLOSE_CONNECTION_ON_WRITER);
         LOGGER.error("LongbowWriter : Connection closed");
     }
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/telemetry/processor/MetricsTelemetryExporter.java b/dagger-core/src/main/java/io/odpf/dagger/processors/telemetry/processor/MetricsTelemetryExporter.java
index b7abc4f60..b07f87843 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/telemetry/processor/MetricsTelemetryExporter.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/telemetry/processor/MetricsTelemetryExporter.java
@@ -1,17 +1,21 @@
 package io.odpf.dagger.processors.telemetry.processor;
 
-import io.odpf.dagger.metrics.GaugeStatsManager;
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
-import io.odpf.dagger.metrics.telemetry.TelemetrySubscriber;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.metrics.telemetry.TelemetrySubscriber;
+import io.odpf.dagger.metrics.GaugeStatsManager;
 import io.odpf.dagger.metrics.aspects.TelemetryAspects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class MetricsTelemetryExporter extends RichMapFunction<Row, Row> implements TelemetrySubscriber {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetricsTelemetryExporter.class.getName());
diff --git a/dagger-core/src/main/java/io/odpf/dagger/processors/transformers/TransformProcessor.java b/dagger-core/src/main/java/io/odpf/dagger/processors/transformers/TransformProcessor.java
index ace8389df..43375632c 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/processors/transformers/TransformProcessor.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/processors/transformers/TransformProcessor.java
@@ -2,10 +2,10 @@
 
 import org.apache.flink.configuration.Configuration;
 
-import io.odpf.dagger.common.contracts.Transformer;
+import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.common.core.Transformer;
 import io.odpf.dagger.common.core.StreamInfo;
 import io.odpf.dagger.exception.TransformClassNotDefinedException;
-import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
 import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.PostProcessorConfig;
 import io.odpf.dagger.processors.PreProcessorConfig;
diff --git a/dagger-core/src/main/java/io/odpf/dagger/sink/SinkOrchestrator.java b/dagger-core/src/main/java/io/odpf/dagger/sink/SinkOrchestrator.java
index ce5e0b426..39f3b1875 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/sink/SinkOrchestrator.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/sink/SinkOrchestrator.java
@@ -6,8 +6,8 @@
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.types.Row;
 
-import io.odpf.dagger.core.StencilClientOrchestrator;
 import io.odpf.dagger.metrics.telemetry.TelemetryPublisher;
+import io.odpf.dagger.core.StencilClientOrchestrator;
 import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.sink.influx.ErrorHandler;
 import io.odpf.dagger.sink.influx.InfluxDBFactoryWrapper;
@@ -15,9 +15,15 @@
 import io.odpf.dagger.sink.log.LogSink;
 import io.odpf.dagger.utils.Constants;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
-import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.*;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.OUTPUT_PROTO;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.OUTPUT_TOPIC;
+import static io.odpf.dagger.metrics.telemetry.TelemetryTypes.SINK_TYPE;
 
 public class SinkOrchestrator implements TelemetryPublisher {
 
diff --git a/dagger-core/src/test/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisherTest.java b/dagger-core/src/test/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisherTest.java
deleted file mode 100644
index 4b1ffd45e..000000000
--- a/dagger-core/src/test/java/io/odpf/dagger/metrics/telemetry/AggregatedUDFTelemetryPublisherTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.odpf.dagger.metrics.telemetry;
-
-import com.gojek.dagger.udf.DistinctCount;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import static io.odpf.dagger.utils.Constants.SQL_QUERY;
-import static io.odpf.dagger.utils.Constants.SQL_QUERY_DEFAULT;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
-
-public class AggregatedUDFTelemetryPublisherTest {
-
-    @Mock
-    private Configuration configuration;
-
-    @Before
-    public void setUp() {
-        initMocks(this);
-    }
-
-    @Test
-    public void shouldRegisterAggregatedUDFs() {
-        ArrayList<String> aggregatedFunctionNames = new ArrayList<>();
-        aggregatedFunctionNames.add("DistinctCount");
-        HashMap<String, List<String>> expectedMetrics = new HashMap<>();
-        expectedMetrics.put("udf", aggregatedFunctionNames);
-
-        HashMap<String, AggregateFunction> aggregateFunctionHashMap = new HashMap<>();
-        aggregateFunctionHashMap.put("DistinctCount", new DistinctCount());
-        AggregatedUDFTelemetryPublisher aggregatedUDFTelemetryPublisher = new AggregatedUDFTelemetryPublisher(configuration, aggregateFunctionHashMap);
-        when(configuration.getString(SQL_QUERY, SQL_QUERY_DEFAULT)).thenReturn("SELECT DistinctCount(driver_id) AS " +
-                "distinctCountDrivers, TUMBLE_END(rowtime, INTERVAL '60' SECOND) AS window_timestamp from data_stream_0" +
-                " GROUP BY TUMBLE (rowtime, INTERVAL '60' SECOND)");
-        aggregatedUDFTelemetryPublisher.preProcessBeforeNotifyingSubscriber();
-
-        Assert.assertEquals(expectedMetrics, aggregatedUDFTelemetryPublisher.getTelemetry());
-    }
-
-    @Test
-    public void shouldNotRegisterNonAggregatedUDFs() {
-        HashMap<String, AggregateFunction> aggregateFunctionHashMap = new HashMap<>();
-        aggregateFunctionHashMap.put("DistinctCount", new DistinctCount());
-        AggregatedUDFTelemetryPublisher aggregatedUDFTelemetryPublisher = new AggregatedUDFTelemetryPublisher(configuration, aggregateFunctionHashMap);
-        when(configuration.getString(SQL_QUERY, SQL_QUERY_DEFAULT)).thenReturn("SELECT * FROM `booking`");
-        aggregatedUDFTelemetryPublisher.preProcessBeforeNotifyingSubscriber();
-
-        Assert.assertEquals(new HashMap<>(), aggregatedUDFTelemetryPublisher.getTelemetry());
-    }
-}
\ No newline at end of file
diff --git a/dagger-core/src/test/java/io/odpf/dagger/processors/transformers/TransformProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/processors/transformers/TransformProcessorTest.java
index 8dac410b6..6c5cbd33a 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/processors/transformers/TransformProcessorTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/processors/transformers/TransformProcessorTest.java
@@ -1,6 +1,6 @@
 package io.odpf.dagger.processors.transformers;
 
-import io.odpf.dagger.common.contracts.Transformer;
+import io.odpf.dagger.common.core.Transformer;
 import io.odpf.dagger.common.core.StreamInfo;
 import io.odpf.dagger.metrics.telemetry.TelemetryTypes;
 import io.odpf.dagger.processors.telemetry.processor.MetricsTelemetryExporter;
diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle
index 969b64485..7b0daa100 100644
--- a/dagger-functions/build.gradle
+++ b/dagger-functions/build.gradle
@@ -27,6 +27,7 @@ configurations {
     minimalFunctionsJar.extendsFrom runtime
 
     compileOnly.extendsFrom minimalFunctionsJar
+    compileOnly.extendsFrom dependenciesFunctionsJar
     testCompile.extendsFrom compileOnly
 }
 
@@ -53,7 +54,9 @@ dependencies {
     compileOnly group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: flinkVersion
     compileOnly group: 'org.apache.flink', name: 'flink-metrics-dropwizard', version: flinkVersion
 
-    testImplementation 'junit:junit:4.12'
+    testCompile group: 'junit', name: 'junit', version: '4.12'
+    testCompile 'org.mockito:mockito-core:2.0.99-beta'
+    testCompile gradleTestKit()
 }
 
 test {
@@ -71,3 +74,13 @@ jacocoTestReport {
     }
     finalizedBy jacocoTestCoverageVerification
 }
+
+jacocoTestCoverageVerification {
+    violationRules {
+        rule {
+            limit {
+                minimum = 0.9
+            }
+        }
+    }
+}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java
index 60b3d4e06..465bba780 100644
--- a/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/SQLTransformer.java
@@ -10,7 +10,7 @@
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-import io.odpf.dagger.common.contracts.Transformer;
+import io.odpf.dagger.common.core.Transformer;
 import io.odpf.dagger.common.core.StreamInfo;
 
 import java.io.Serializable;
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCount.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCount.java
new file mode 100644
index 000000000..52216ebf9
--- /dev/null
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCount.java
@@ -0,0 +1,32 @@
+package io.odpf.dagger.functions.udfs.aggregate;
+
+import io.odpf.dagger.common.udfs.AggregateUdf;
+
+import io.odpf.dagger.functions.udfs.aggregate.accumulator.distinctcount.DistinctCountAccumulator;
+
+public class DistinctCount extends AggregateUdf<Integer, DistinctCountAccumulator> {
+
+    @Override
+    public DistinctCountAccumulator createAccumulator() {
+        return new DistinctCountAccumulator();
+    }
+
+    @Override
+    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
+        return distinctCountAccumulator.count();
+    }
+
+    /**
+     * returns distinct count of a field in input stream.
+     *
+     * @param distinctCountAccumulator the distinct count accumulator
+     * @param item                     fieldName
+     * @author prakhar.m
+     */
+    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
+        if (item == null) {
+            return;
+        }
+        distinctCountAccumulator.add(item);
+    }
+}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulator.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulator.java
new file mode 100644
index 000000000..d6d416076
--- /dev/null
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulator.java
@@ -0,0 +1,16 @@
+package io.odpf.dagger.functions.udfs.aggregate.accumulator.distinctcount;
+
+import java.io.Serializable;
+import java.util.HashSet;
+
+public class DistinctCountAccumulator implements Serializable {
+    private HashSet<String> distinctItems = new HashSet<>();
+
+    public int count() {
+        return distinctItems.size();
+    }
+
+    public void add(String item) {
+        distinctItems.add(item);
+    }
+}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java
new file mode 100644
index 000000000..d17d84d25
--- /dev/null
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java
@@ -0,0 +1,41 @@
+package io.odpf.dagger.functions.udfs.factories;
+
+import io.odpf.dagger.common.udfs.AggregateUdf;
+import io.odpf.dagger.common.udfs.ScalarUdf;
+import io.odpf.dagger.common.udfs.TableUdf;
+import io.odpf.dagger.functions.udfs.aggregate.DistinctCount;
+import io.odpf.dagger.functions.udfs.scalar.EndOfMonth;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import io.odpf.dagger.common.udfs.UdfFactory;
+import io.odpf.dagger.functions.udfs.table.HistogramBucket;
+
+import java.util.HashSet;
+
+public class FunctionFactory extends UdfFactory {
+
+    public FunctionFactory(StreamTableEnvironment streamTableEnvironment) {
+        super(streamTableEnvironment);
+    }
+
+    @Override
+    public HashSet<ScalarUdf> getScalarUdfs() {
+        HashSet<ScalarUdf> scalarUdfs = new HashSet<>();
+        scalarUdfs.add(new EndOfMonth());
+        return scalarUdfs;
+    }
+
+    @Override
+    public HashSet<TableUdf> getTableUdfs() {
+        HashSet<TableUdf> tableUdfs = new HashSet<>();
+        tableUdfs.add(new HistogramBucket());
+        return tableUdfs;
+    }
+
+    @Override
+    public HashSet<AggregateUdf> getAggregateUdfs() {
+        HashSet<AggregateUdf> aggregateUdfs = new HashSet<>();
+        aggregateUdfs.add(new DistinctCount());
+        return aggregateUdfs;
+    }
+}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/ScalarFuctionFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/ScalarFuctionFactory.java
deleted file mode 100644
index 05959c23f..000000000
--- a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/ScalarFuctionFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package io.odpf.dagger.functions.udfs.factories;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.UserDefinedFunction;
-
-import io.odpf.dagger.common.contracts.UDFFactory;
-import io.odpf.dagger.functions.udfs.EndOfMonth;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ScalarFuctionFactory implements UDFFactory {
-    private Configuration daggerConfig;
-    private TableEnvironment tableEnvironment;
-
-    public ScalarFuctionFactory(Configuration daggerConfig, TableEnvironment tableEnvironment) {
-        this.daggerConfig = daggerConfig;
-        this.tableEnvironment = tableEnvironment;
-    }
-
-    public void registerFunctions() {
-        addfunctions().forEach((scalarFunctionName, scalarUDF) -> {
-            tableEnvironment.registerFunction(scalarFunctionName, (ScalarFunction) scalarUDF);
-        });
-    }
-
-    public Map<String, UserDefinedFunction> addfunctions() {
-        HashMap<String, UserDefinedFunction> scalarFunctions = new HashMap<>();
-        scalarFunctions.put("EndOfMonth", new EndOfMonth());
-        return scalarFunctions;
-    }
-}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/EndOfMonth.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonth.java
similarity index 80%
rename from dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/EndOfMonth.java
rename to dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonth.java
index a9bbd8cf7..4539c82f2 100644
--- a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/EndOfMonth.java
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonth.java
@@ -1,25 +1,18 @@
-package io.odpf.dagger.functions.udfs;
+package io.odpf.dagger.functions.udfs.scalar;
 
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.ScalarFunction;
+import io.odpf.dagger.common.udfs.ScalarUdf;
 
 import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
 
-public class EndOfMonth extends ScalarFunction {
+public class EndOfMonth extends ScalarUdf {
 
     private static final Integer END_OF_DAY_HOUR = 23;
     private static final Integer END_OF_DAY_MINUTE_AND_SECOND = 59;
     private static final Integer MAX_MILLISECONDS = 999;
     private static final Integer SECOND_IN_MILLIS = 1000;
 
-    @Override
-    public void open(FunctionContext context) throws Exception {
-        super.open(context);
-    }
-
-
     /**
      * Calculates the seconds in Unix timestamp for end of a month of a given timestamp second and timezone.
      *
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/table/HistogramBucket.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/table/HistogramBucket.java
new file mode 100644
index 000000000..b2052d81f
--- /dev/null
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/table/HistogramBucket.java
@@ -0,0 +1,27 @@
+package io.odpf.dagger.functions.udfs.table;
+
+import io.odpf.dagger.common.udfs.TableUdf;
+import org.apache.flink.api.java.tuple.Tuple1;
+
+import java.util.Arrays;
+
+
+public class HistogramBucket extends TableUdf<Tuple1<String>> {
+
+    /**
+     * This UDTF returns buckets for given value to calculate histograms.
+     * see https://github.com/influxdata/telegraf/tree/master/plugins/aggregators/histogram#tags
+     *
+     * @param dValue  Value to be compared
+     * @param buckets Buckets for Cumulative Histograms
+     * @author lavkesh.lahngir
+     */
+    public void eval(double dValue, String buckets) {
+        // Always emit the bucket for '+Inf'
+        collect(new Tuple1<>("+Inf"));
+        Arrays.stream(buckets.split(",")).
+                filter(bucket -> dValue <= Double.parseDouble(bucket)).
+                map(Tuple1::new).
+                forEach(this::collect);
+    }
+}
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java
new file mode 100644
index 000000000..1b6891a4b
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/transformers/SQLTransformerTest.java
@@ -0,0 +1,202 @@
+package io.odpf.dagger.functions.transformers;
+
+import io.odpf.dagger.common.core.StreamInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class SQLTransformerTest {
+
+    @Mock
+    private DataStream<Row> inputStream;
+
+    @Mock
+    private StreamExecutionEnvironment streamExecutionEnvironment;
+
+    @Mock
+    private StreamTableEnvironment streamTableEnvironment;
+
+    @Mock
+    private Table table;
+
+    @Mock
+    private TableSchema tableSchema;
+
+    @Mock
+    private DataStream<Tuple2<Boolean, Row>> retractStream;
+
+    @Mock
+    private SingleOutputStreamOperator<Tuple2<Boolean, Row>> filteredRetractStream;
+
+    @Mock
+    private SingleOutputStreamOperator<Row> outputStream;
+
+    @Mock
+    private SingleOutputStreamOperator<Row> watermarkedStream;
+
+    @Mock
+    private Configuration configuration;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+        when(inputStream.getExecutionEnvironment()).thenReturn(streamExecutionEnvironment);
+    }
+
+    @Test
+    public void shouldApplyThePassedSQL() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String sqlQuery = "SELECT * FROM data_stream";
+        transformationArguments.put("sqlQuery", sqlQuery);
+        String[] columnNames = {"order_number", "service_type", "status"};
+        String schema = String.join(",", columnNames);
+
+        when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table);
+        when(table.getSchema()).thenReturn(tableSchema);
+        when(tableSchema.getFieldNames()).thenReturn(columnNames);
+        when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream);
+        when(retractStream.filter(any())).thenReturn(filteredRetractStream);
+        when(filteredRetractStream.map(any())).thenReturn(outputStream);
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo);
+
+        verify(streamTableEnvironment, times(1)).registerDataStream("data_stream", inputStream, schema);
+        verify(streamTableEnvironment, times(1)).sqlQuery(sqlQuery);
+        assertEquals(outputStream, outputStreamInfo.getDataStream());
+    }
+
+    @Test
+    public void shouldReturnColumnNamesReturnedBySQL() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String sqlQuery = "SELECT order_number, service_type FROM data_stream";
+        transformationArguments.put("sqlQuery", sqlQuery);
+        String[] columnNames = {"order_number", "service_type", "status"};
+
+        when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table);
+        when(table.getSchema()).thenReturn(tableSchema);
+        String[] outputColumns = {"order_number", "service_type"};
+        when(tableSchema.getFieldNames()).thenReturn(outputColumns);
+        when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream);
+        when(retractStream.filter(any())).thenReturn(filteredRetractStream);
+        when(filteredRetractStream.map(any())).thenReturn(outputStream);
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo);
+
+        assertArrayEquals(outputColumns, outputStreamInfo.getColumnNames());
+    }
+
+    @Test
+    public void shouldAssignTimestampAndWatermarksIfRowtimeIsPassedInColumns() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String sqlQuery = "SELECT * FROM data_stream";
+        transformationArguments.put("sqlQuery", sqlQuery);
+        String[] columnNames = {"order_number", "service_type", "rowtime"};
+        String schema = "order_number,service_type,rowtime.rowtime";
+
+        when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table);
+        when(table.getSchema()).thenReturn(tableSchema);
+        when(tableSchema.getFieldNames()).thenReturn(columnNames);
+        when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream);
+        when(retractStream.filter(any())).thenReturn(filteredRetractStream);
+        when(filteredRetractStream.map(any())).thenReturn(outputStream);
+        when(inputStream.assignTimestampsAndWatermarks(any(BoundedOutOfOrdernessTimestampExtractor.class))).thenReturn(watermarkedStream);
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo);
+
+        verify(streamTableEnvironment, times(1)).registerDataStream("data_stream", watermarkedStream, schema);
+        verify(streamTableEnvironment, times(1)).sqlQuery(sqlQuery);
+        verify(inputStream, times(1)).assignTimestampsAndWatermarks(any(BoundedOutOfOrdernessTimestampExtractor.class));
+        assertEquals(outputStream, outputStreamInfo.getDataStream());
+    }
+
+    @Test
+    public void shouldNotAssignTimestampAndWatermarksIfRowtimeIsNotPassedInColumns() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String sqlQuery = "SELECT * FROM data_stream";
+        transformationArguments.put("sqlQuery", sqlQuery);
+        String[] columnNames = {"order_number", "service_type", "status"};
+        String schema = "order_number,service_type,status";
+
+        when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table);
+        when(table.getSchema()).thenReturn(tableSchema);
+        when(tableSchema.getFieldNames()).thenReturn(columnNames);
+        when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream);
+        when(retractStream.filter(any())).thenReturn(filteredRetractStream);
+        when(filteredRetractStream.map(any())).thenReturn(outputStream);
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo);
+
+        verify(streamTableEnvironment, times(1)).registerDataStream("data_stream", inputStream, schema);
+        verify(streamTableEnvironment, times(1)).sqlQuery(sqlQuery);
+        verify(inputStream, times(0)).assignTimestampsAndWatermarks(any(BoundedOutOfOrdernessTimestampExtractor.class));
+        assertEquals(outputStream, outputStreamInfo.getDataStream());
+    }
+
+    @Test
+    public void shouldAssignPassedTableNameIfPassedInArguments() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String sqlQuery = "SELECT * FROM data_stream";
+        transformationArguments.put("sqlQuery", sqlQuery);
+        transformationArguments.put("tableName", "booking");
+        String[] columnNames = {"order_number", "service_type", "status"};
+        String schema = "order_number,service_type,status";
+
+        when(streamTableEnvironment.sqlQuery(sqlQuery)).thenReturn(table);
+        when(table.getSchema()).thenReturn(tableSchema);
+        when(tableSchema.getFieldNames()).thenReturn(columnNames);
+        when(streamTableEnvironment.toRetractStream(table, Row.class)).thenReturn(retractStream);
+        when(retractStream.filter(any())).thenReturn(filteredRetractStream);
+        when(filteredRetractStream.map(any())).thenReturn(outputStream);
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        StreamInfo outputStreamInfo = sqlTransformer.transform(inputStreamInfo);
+
+        verify(streamTableEnvironment, times(1)).registerDataStream("booking", inputStream, schema);
+        verify(streamTableEnvironment, times(1)).sqlQuery(sqlQuery);
+        assertEquals(outputStream, outputStreamInfo.getDataStream());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowExceptionIfSqlNotProvided() {
+        HashMap<String, String> transformationArguments = new HashMap<>();
+        String[] columnNames = {"order_number", "service_type", "status"};
+        SQLTransformer sqlTransformer = new SQLTransformerStub(transformationArguments, columnNames);
+        StreamInfo inputStreamInfo = new StreamInfo(inputStream, columnNames);
+        sqlTransformer.transform(inputStreamInfo);
+    }
+
+    class SQLTransformerStub extends SQLTransformer {
+
+        SQLTransformerStub(Map<String, String> transformationArguments, String[] columnNames) {
+            super(transformationArguments, columnNames, configuration);
+        }
+
+        @Override
+        protected StreamTableEnvironment getStreamTableEnvironment(StreamExecutionEnvironment executionEnvironment) {
+            return streamTableEnvironment;
+        }
+    }
+
+}
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java
new file mode 100644
index 000000000..92adb8cbc
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java
@@ -0,0 +1,73 @@
+package io.odpf.dagger.functions.udfs.aggregate;
+
+import io.odpf.dagger.functions.udfs.aggregate.accumulator.distinctcount.DistinctCountAccumulator;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.functions.FunctionContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class DistinctCountTest {
+
+    @Mock
+    private FunctionContext functionContext;
+
+    @Mock
+    private MetricGroup metricGroup;
+
+    @Before
+    public void setup() {
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java
new file mode 100644
index 000000000..92adb8cbc
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/DistinctCountTest.java
@@ -0,0 +1,73 @@
+package io.odpf.dagger.functions.udfs.aggregate;
+
+import io.odpf.dagger.functions.udfs.aggregate.accumulator.distinctcount.DistinctCountAccumulator;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.functions.FunctionContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class DistinctCountTest {
+
+    @Mock
+    private FunctionContext functionContext;
+
+    @Mock
+    private MetricGroup metricGroup;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+        when(functionContext.getMetricGroup()).thenReturn(metricGroup);
+        when(metricGroup.addGroup("udf", "DistinctCount")).thenReturn(metricGroup);
+    }
+
+    @Test
+    public void shouldNotAddItemIfItAlreadyExistsInDistinctItems() {
+        DistinctCountAccumulator distinctCountAccumulator = new DistinctCountAccumulator();
+        DistinctCount distinctCount = new DistinctCount();
+        distinctCount.accumulate(distinctCountAccumulator, "1234");
+        distinctCount.accumulate(distinctCountAccumulator, "1234");
+        distinctCount.accumulate(distinctCountAccumulator, "1233");
+        assertEquals(new Integer(2), distinctCount.getValue(distinctCountAccumulator));
+    }
+
+    @Test
+    public void shouldNotAddNull() {
+        DistinctCountAccumulator distinctCountAccumulator = new DistinctCountAccumulator();
+        DistinctCount distinctCount = new DistinctCount();
+        distinctCount.accumulate(distinctCountAccumulator, null);
+        assertEquals(new Integer(0), distinctCount.getValue(distinctCountAccumulator));
+    }
+
+    @Test
+    public void shouldNotHoldState() {
+        DistinctCount distinctCount = new DistinctCount();
+        DistinctCountAccumulator acc1 = distinctCount.createAccumulator();
+        DistinctCountAccumulator acc2 = distinctCount.createAccumulator();
+
+        distinctCount.accumulate(acc1, "111");
+        distinctCount.accumulate(acc1, "222");
+        distinctCount.accumulate(acc1, "222");
+        distinctCount.accumulate(acc1, "333");
+
+        distinctCount.accumulate(acc2, "444");
+        distinctCount.accumulate(acc2, "555");
+
+        assertEquals(new Integer(3), distinctCount.getValue(acc1));
+        assertEquals(new Integer(2), distinctCount.getValue(acc2));
+    }
+
+    @Test
+    public void shouldRegisterGauge() throws Exception {
+        DistinctCount distinctCount = new DistinctCount();
+        distinctCount.open(functionContext);
+        verify(metricGroup, times(1)).gauge(any(String.class), any(Gauge.class));
+    }
+}
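Review note: the three behavioural tests pin down the accumulator contract — duplicates are ignored, nulls are ignored, and each accumulator instance is independent. Assuming a HashSet-backed accumulator, the smallest shape that satisfies them looks like the sketch below; this is illustrative, not the repository's actual class. Null filtering presumably lives in DistinctCount.accumulate rather than in the accumulator, since shouldNotAddNull goes through the UDF.

    // Sketch only: a set-backed accumulator consistent with DistinctCountTest.
    // Serializable so it survives the round-trip exercised further down.
    import java.io.Serializable;
    import java.util.HashSet;

    public class DistinctCountAccumulator implements Serializable {
        private final HashSet<String> distinctItems = new HashSet<>();

        public void add(String item) {
            distinctItems.add(item); // the set drops duplicates for free
        }

        public int count() {
            return distinctItems.size();
        }
    }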
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulatorTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulatorTest.java
new file mode 100644
index 000000000..1505b0ef3
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/aggregate/accumulator/distinctcount/DistinctCountAccumulatorTest.java
@@ -0,0 +1,39 @@
+package io.odpf.dagger.functions.udfs.aggregate.accumulator.distinctcount;
+
+import org.junit.Test;
+
+import java.io.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class DistinctCountAccumulatorTest {
+
+    @Test
+    public void shouldGiveDistinctCount() {
+        DistinctCountAccumulator accumulator = new DistinctCountAccumulator();
+        accumulator.add("First");
+        accumulator.add("Second");
+        accumulator.add("First");
+        accumulator.add("Third");
+        accumulator.add("Second");
+
+        assertEquals(3, accumulator.count());
+    }
+
+    @Test
+    public void shouldBeSerializable() throws IOException, ClassNotFoundException {
+        DistinctCountAccumulator accumulator = new DistinctCountAccumulator();
+        accumulator.add("First");
+        accumulator.add("Second");
+        accumulator.add("First");
+
+        ByteArrayOutputStream serializedAccumulatorStream = new ByteArrayOutputStream();
+        new ObjectOutputStream(serializedAccumulatorStream).writeObject(accumulator);
+
+        ObjectInputStream deserializedAccStream = new ObjectInputStream(new ByteArrayInputStream(serializedAccumulatorStream.toByteArray()));
+
+        DistinctCountAccumulator deserializedAccumulator = (DistinctCountAccumulator) deserializedAccStream.readObject();
+
+        assertEquals(accumulator.count(), deserializedAccumulator.count());
+    }
+}
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/factories/FunctionFactoryTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/factories/FunctionFactoryTest.java
new file mode 100644
index 000000000..5714f490a
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/factories/FunctionFactoryTest.java
@@ -0,0 +1,50 @@
+package io.odpf.dagger.functions.udfs.factories;
+
+import io.odpf.dagger.common.udfs.AggregateUdf;
+import io.odpf.dagger.common.udfs.ScalarUdf;
+import io.odpf.dagger.common.udfs.TableUdf;
+import io.odpf.dagger.functions.udfs.aggregate.DistinctCount;
+import io.odpf.dagger.functions.udfs.scalar.EndOfMonth;
+import io.odpf.dagger.functions.udfs.table.HistogramBucket;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.HashSet;
+
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class FunctionFactoryTest {
+
+    @Mock
+    private StreamTableEnvironment streamTableEnvironment;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+    }
+
+    @Test
+    public void shouldReturnScalarUdfs() {
+        FunctionFactory functionFactory = new FunctionFactory(streamTableEnvironment);
+        HashSet<ScalarUdf> scalarUdfs = functionFactory.getScalarUdfs();
+        Assert.assertTrue(scalarUdfs.stream().anyMatch(scalarUdf -> scalarUdf.getClass() == EndOfMonth.class));
+    }
+
+    @Test
+    public void shouldReturnTableUdfs() {
+        FunctionFactory functionFactory = new FunctionFactory(streamTableEnvironment);
+        HashSet<TableUdf> tableUdfs = functionFactory.getTableUdfs();
+        Assert.assertTrue(tableUdfs.stream().anyMatch(tableUdf -> tableUdf.getClass() == HistogramBucket.class));
+    }
+
+    @Test
+    public void shouldReturnAggregateUdfs() {
+        FunctionFactory functionFactory = new FunctionFactory(streamTableEnvironment);
+        HashSet<AggregateUdf> aggregateUdfs = functionFactory.getAggregateUdfs();
+        Assert.assertTrue(aggregateUdfs.stream().anyMatch(aggregateUdf -> aggregateUdf.getClass() == DistinctCount.class));
+    }
+
+}
\ No newline at end of file
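Review note: FunctionFactoryTest only asserts that the factory exposes one known UDF of each kind; it says nothing about how those sets reach Flink. Assuming ScalarUdf extends Flink's ScalarFunction (analogous to AggregateUdf extending AggregateFunction), a caller might register the scalar set like the sketch below — the loop and class are illustrative, not the repository's code.

    // Sketch only: feeding the factory's scalar UDFs into the Flink 1.9
    // table environment so they become callable from SQL.
    import io.odpf.dagger.common.udfs.ScalarUdf;
    import io.odpf.dagger.functions.udfs.factories.FunctionFactory;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public final class UdfRegistrar {
        private UdfRegistrar() {
        }

        public static void registerScalarUdfs(FunctionFactory factory, StreamTableEnvironment env) {
            for (ScalarUdf udf : factory.getScalarUdfs()) {
                // Each UDF is registered under its simple class name,
                // e.g. SELECT EndOfMonth(event_timestamp, 'UTC') FROM data_stream
                env.registerFunction(udf.getClass().getSimpleName(), udf);
            }
        }
    }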
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonthTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonthTest.java
new file mode 100644
index 000000000..461fb0e37
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/scalar/EndOfMonthTest.java
@@ -0,0 +1,54 @@
+package io.odpf.dagger.functions.udfs.scalar;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.functions.FunctionContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class EndOfMonthTest {
+
+    private EndOfMonth endOfMonth = new EndOfMonth();
+
+    @Mock
+    private MetricGroup metricGroup;
+
+    @Mock
+    private FunctionContext functionContext;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+        when(functionContext.getMetricGroup()).thenReturn(metricGroup);
+        when(metricGroup.addGroup("udf", "EndOfMonth")).thenReturn(metricGroup);
+    }
+
+    @Test
+    public void shouldReturnLastDateOfMonthForGivenTimestampInIST() {
+        long endOfMonthTimestamp = endOfMonth.eval(Long.parseLong("1562224758"), "Asia/Kolkata");
+        assertEquals(Long.parseLong("1564597799"), endOfMonthTimestamp);
+    }
+
+    @Test
+    public void shouldReturnLastDateOfMonthForGivenTimestampInUTC() {
+        long endOfMonthTimestamp = endOfMonth.eval(Long.parseLong("1562224758"), "UTC");
+        assertEquals(Long.parseLong("1564617599"), endOfMonthTimestamp);
+    }
+
+    @Test
+    public void shouldReturnLastDateOfMonthForGivenTimestampInWIB() {
+        long endOfMonthTimestamp = endOfMonth.eval(Long.parseLong("1562224758"), "Asia/Jakarta");
+        assertEquals(Long.parseLong("1564592399"), endOfMonthTimestamp);
+    }
+
+    @Test
+    public void shouldRegisterGauge() throws Exception {
+        endOfMonth.open(functionContext);
+        verify(metricGroup, times(1)).gauge(any(String.class), any(Gauge.class));
+    }
+}
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/table/HistogramBucketTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/table/HistogramBucketTest.java
new file mode 100644
index 000000000..8a7fbba08
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/table/HistogramBucketTest.java
@@ -0,0 +1,56 @@
+package io.odpf.dagger.functions.udfs.table;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.Collector;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class HistogramBucketTest {
+
+    @Mock
+    private FunctionContext functionContext;
+
+    @Mock
+    private MetricGroup metricGroup;
+
+    @Before
+    public void setup() {
+        initMocks(this);
+        when(functionContext.getMetricGroup()).thenReturn(metricGroup);
+        when(metricGroup.addGroup("udf", "HistogramBucket")).thenReturn(metricGroup);
+    }
+
+    @Test
+    public void testCollectCumulativeBucketsForAValue() {
+        HistogramBucket bucketUDF = new HistogramBucket();
+        Collector<Tuple1<String>> collector = mock(Collector.class);
+        bucketUDF.setCollector(collector);
+        bucketUDF.eval(10.0, "1,2,5,10,20");
+        verify(collector, times(1)).collect(new Tuple1<>("+Inf"));
+        verify(collector, times(1)).collect(new Tuple1<>("10"));
+        verify(collector, times(1)).collect(new Tuple1<>("20"));
+    }
+
+    @Test
+    public void testCollectInfBucketForAValue() {
+        HistogramBucket bucketUDF = new HistogramBucket();
+        Collector<Tuple1<String>> collector = mock(Collector.class);
+        bucketUDF.setCollector(collector);
+        bucketUDF.eval(30.0, "1,2,5,10,20");
+        verify(collector, times(1)).collect(new Tuple1<>("+Inf"));
+    }
+
+    @Test
+    public void shouldRegisterGauge() throws Exception {
+        HistogramBucket bucketUDF = new HistogramBucket();
+        bucketUDF.open(functionContext);
+        verify(metricGroup, times(1)).gauge(any(String.class), any(Gauge.class));
+    }
+}
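Review note: the two bucket tests encode Prometheus-style cumulative histogram semantics — a value is collected into every bucket whose upper bound it does not exceed, plus the "+Inf" overflow bucket. A standalone sketch of that rule, matching exactly what the tests verify (HistogramBucket's real eval may be structured differently):

    // Sketch only: cumulative bucketing as verified by HistogramBucketTest.
    // cumulativeBuckets(10.0, "1,2,5,10,20") -> ["10", "20", "+Inf"]
    // cumulativeBuckets(30.0, "1,2,5,10,20") -> ["+Inf"]
    import java.util.ArrayList;
    import java.util.List;

    final class CumulativeBuckets {
        static List<String> cumulativeBuckets(double value, String commaSeparatedBuckets) {
            List<String> out = new ArrayList<>();
            for (String bucket : commaSeparatedBuckets.split(",")) {
                if (value <= Double.parseDouble(bucket)) {
                    out.add(bucket); // the value fits this bucket and every larger one
                }
            }
            out.add("+Inf"); // every value belongs to the overflow bucket
            return out;
        }
    }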