Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dagger-core/env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true
# == Others ==
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime

# == Python Udf ==
PYTHON_UDF_ENABLE=false
PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"}
13 changes: 12 additions & 1 deletion dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.odpf.dagger.core.source.Stream;
import io.odpf.dagger.core.source.StreamsFactory;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -28,12 +30,15 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.List;

import static io.odpf.dagger.core.utils.Constants.*;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
import static org.apache.flink.table.api.Expressions.$;

/**
Expand Down Expand Up @@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
*
* @return the stream manager
*/
public StreamManager registerFunctions() {
public StreamManager registerFunctions() throws IOException {
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
pythonUdfManager.registerPythonFunctions();
}

String[] functionFactoryClasses = configuration
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
.split(",");
Expand Down
5 changes: 5 additions & 0 deletions dagger-functions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ sourceSets {
}

dependencies {

compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor 'org.projectlombok:lombok:1.18.8'

compileOnly project(path: ':dagger-common', configuration: 'minimalCommonJar')
compileOnly project(path: ':dagger-common', configuration: 'dependenciesCommonJar')
compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion
Expand All @@ -56,6 +60,7 @@ dependencies {
compileOnly group: 'org.apache.flink', name: 'flink-metrics-dropwizard', version: flinkVersion

dependenciesFunctionsJar 'com.github.davidmoten:geo:0.7.6'
dependenciesFunctionsJar 'org.apache.flink:flink-python_2.11:' + flinkVersion
dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1'
dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1'
dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,17 @@ public class Constants {
public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = "";
public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID";
public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = "";

public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false;
public static final String PYTHON_FILES_KEY = "PYTHON_FILES";
public static final String PYTHON_REQUIREMENTS_KEY = "PYTHON_REQUIREMENTS";
public static final String PYTHON_ARCHIVES_KEY = "PYTHON_ARCHIVES";
public static final String PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY = "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE";
public static final Integer PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT = 10000;
public static final String PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY = "PYTHON_FN_EXECUTION_BUNDLE_SIZE";
public static final Integer PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT = 100000;
public static final String PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY = "PYTHON_FN_EXECUTION_BUNDLE_TIME";
public static final long PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT = 1000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.odpf.dagger.functions.exceptions;

/**
* The type Python files format exception.
*/
public class PythonFilesFormatException extends RuntimeException {

/**
* Instantiates a new Python files format exception.
*
* @param message the message
*/
public PythonFilesFormatException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.odpf.dagger.functions.exceptions;

/**
* The type Python files not found exception.
*/
public class PythonFilesNotFoundException extends RuntimeException {

/**
* Instantiates a new Python files not found exception.
*
* @param message the message
*/
public PythonFilesNotFoundException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.odpf.dagger.functions.udfs.python;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import io.odpf.dagger.common.configuration.Configuration;
import lombok.Getter;

import static io.odpf.dagger.functions.common.Constants.*;

public class PythonUdfConfig {
private static final Gson GSON = new GsonBuilder()
.enableComplexMapKeySerialization()
.setPrettyPrinting()
.create();

@SerializedName(PYTHON_FILES_KEY)
@Getter
private String pythonFiles;

@SerializedName(PYTHON_REQUIREMENTS_KEY)
@Getter
private String pythonRequirements;

@SerializedName(PYTHON_ARCHIVES_KEY)
@Getter
private String pythonArchives;

@SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY)
private Integer pythonArrowBatchSize;

@SerializedName(PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY)
private Integer pythonBundleSize;

@SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY)
private Long pythonBundleTime;

public int getPythonArrowBatchSize() {
if (pythonArrowBatchSize == null) {
return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT;
}
return pythonArrowBatchSize;
}

public int getPythonBundleSize() {
if (pythonBundleSize == null) {
return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT;
}
return pythonBundleSize;
}

public long getPythonBundleTime() {
if (pythonBundleTime == null) {
return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT;
}
return pythonBundleTime;
}

public static PythonUdfConfig parse(Configuration configuration) {
String jsonString = configuration.getString(PYTHON_UDF_CONFIG, "");

return GSON.fromJson(jsonString, PythonUdfConfig.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.odpf.dagger.functions.udfs.python;

import io.odpf.dagger.functions.exceptions.PythonFilesFormatException;
import io.odpf.dagger.functions.exceptions.PythonFilesNotFoundException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class PythonUdfManager {

private StreamTableEnvironment tableEnvironment;
private PythonUdfConfig pythonUdfConfig;

public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) {
this.tableEnvironment = tableEnvironment;
this.pythonUdfConfig = pythonUdfConfig;
}

public void registerPythonFunctions() throws IOException {

String inputFiles = pythonUdfConfig.getPythonFiles();
String[] pythonFilesSource;
if (inputFiles != null) {
registerPythonConfig();
pythonFilesSource = inputFiles.split(",");
} else {
throw new PythonFilesNotFoundException("Python files not found");
}

for (String pythonFile : pythonFilesSource) {
if (pythonFile.contains(".zip")) {
ZipFile zf = new ZipFile(pythonFile);
for (Enumeration e = zf.entries(); e.hasMoreElements();) {
ZipEntry entry = (ZipEntry) e.nextElement();
String name = entry.getName();
if (name.endsWith(".py")) {
name = name.replace(".py", "").replace("/", ".");
String udfName = name.substring(name.lastIndexOf(".") + 1);
String query = "CREATE TEMPORARY FUNCTION " + udfName.toUpperCase() + " AS '" + name + "." + udfName + "' LANGUAGE PYTHON";
tableEnvironment.executeSql(query);
}
}
} else if (pythonFile.contains(".py")) {
String name = pythonFile.substring(pythonFile.lastIndexOf('/') + 1).replace(".py", "");
String query = "CREATE TEMPORARY FUNCTION " + name.toUpperCase() + " AS '" + name + "." + name + "' LANGUAGE PYTHON";
tableEnvironment.executeSql(query);
} else {
throw new PythonFilesFormatException("Python files should be in .py or .zip format");
}
}
}

private void registerPythonConfig() {
if (pythonUdfConfig.getPythonRequirements() != null) {
tableEnvironment.getConfig().getConfiguration().setString("python.requirements", pythonUdfConfig.getPythonRequirements());
}
if (pythonUdfConfig.getPythonArchives() != null) {
tableEnvironment.getConfig().getConfiguration().setString("python.archives", pythonUdfConfig.getPythonArchives());
}
tableEnvironment.getConfig().getConfiguration().setString("python.files", pythonUdfConfig.getPythonFiles());
tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.arrow.batch.size", pythonUdfConfig.getPythonArrowBatchSize());
tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize());
tableEnvironment.getConfig().getConfiguration().setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.odpf.dagger.functions.udfs.python;

import io.odpf.dagger.common.configuration.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import static io.odpf.dagger.functions.common.Constants.*;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class PythonUdfConfigTest {

@Mock
private Configuration configuration;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldParseConfig() {
String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\", \"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", \"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\" }";

when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig);
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);

Assert.assertNotNull(pythonUdfConfig);
Assert.assertEquals(pythonUdfConfig.getPythonFiles(), "/path/to/function.zip");
Assert.assertEquals(pythonUdfConfig.getPythonArchives(), "/path/to/file.txt");
Assert.assertEquals(pythonUdfConfig.getPythonRequirements(), "requirements.txt");
Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000);
Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000);
Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000);
}

@Test
public void shouldUseDefaultValueIfConfigIsNotGiven() {
String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\" }";

when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig);
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);

Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000);
Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000);
Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000);
}

@Test
public void shouldReturnNullIfPythonFilesConfigIsNotGiven() {
String pythonJsonConfig = "{\"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", \"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\"}";

when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig);
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);

Assert.assertNull(pythonUdfConfig.getPythonFiles());
Assert.assertNull(pythonUdfConfig.getPythonArchives());
Assert.assertNull(pythonUdfConfig.getPythonRequirements());
}
}
Loading