diff --git a/dagger-core/env/local.properties b/dagger-core/env/local.properties index 6127bbae2..8d4372bf8 100644 --- a/dagger-core/env/local.properties +++ b/dagger-core/env/local.properties @@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true # == Others == FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime + +# == Python Udf == +PYTHON_UDF_ENABLE=false +PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"} \ No newline at end of file diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index a2ed9b426..969a0bb21 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -19,6 +19,8 @@ import io.odpf.dagger.core.source.Stream; import io.odpf.dagger.core.source.StreamsFactory; import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.functions.udfs.python.PythonUdfConfig; +import io.odpf.dagger.functions.udfs.python.PythonUdfManager; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -28,12 +30,15 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.List; import static io.odpf.dagger.core.utils.Constants.*; +import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT; +import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY; import static org.apache.flink.table.api.Expressions.$; /** @@ -138,7 +143,13 @@ private 
ApiExpression[] getApiExpressions(StreamInfo streamInfo) { * * @return the stream manager */ - public StreamManager registerFunctions() { + public StreamManager registerFunctions() throws IOException { + if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) { + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + String[] functionFactoryClasses = configuration .getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT) .split(","); diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle index 476331d14..4dbe083c2 100644 --- a/dagger-functions/build.gradle +++ b/dagger-functions/build.gradle @@ -44,6 +44,10 @@ sourceSets { } dependencies { + + compileOnly 'org.projectlombok:lombok:1.18.8' + annotationProcessor 'org.projectlombok:lombok:1.18.8' + compileOnly project(path: ':dagger-common', configuration: 'minimalCommonJar') compileOnly project(path: ':dagger-common', configuration: 'dependenciesCommonJar') compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion @@ -56,6 +60,7 @@ dependencies { compileOnly group: 'org.apache.flink', name: 'flink-metrics-dropwizard', version: flinkVersion dependenciesFunctionsJar 'com.github.davidmoten:geo:0.7.6' + dependenciesFunctionsJar 'org.apache.flink:flink-python_2.11:' + flinkVersion dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1' dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1' dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0' diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java index 485bcd275..fff13666c 100644 
--- a/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java @@ -7,4 +7,17 @@ public class Constants { public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = ""; public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID"; public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = ""; + + public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG"; + public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE"; + public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false; + public static final String PYTHON_FILES_KEY = "PYTHON_FILES"; + public static final String PYTHON_REQUIREMENTS_KEY = "PYTHON_REQUIREMENTS"; + public static final String PYTHON_ARCHIVES_KEY = "PYTHON_ARCHIVES"; + public static final String PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY = "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE"; + public static final Integer PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT = 10000; + public static final String PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY = "PYTHON_FN_EXECUTION_BUNDLE_SIZE"; + public static final Integer PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT = 100000; + public static final String PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY = "PYTHON_FN_EXECUTION_BUNDLE_TIME"; + public static final long PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT = 1000; } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java new file mode 100644 index 000000000..02771fda6 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.exceptions; + +/** + * The type Python files format exception. 
+ */ +public class PythonFilesFormatException extends RuntimeException { + + /** + * Instantiates a new Python files format exception. + * + * @param message the message + */ + public PythonFilesFormatException(String message) { + super(message); + } + +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java new file mode 100644 index 000000000..ca321ced7 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.exceptions; + +/** + * The type Python files not found exception. + */ +public class PythonFilesNotFoundException extends RuntimeException { + + /** + * Instantiates a new Python files not found exception. + * + * @param message the message + */ + public PythonFilesNotFoundException(String message) { + super(message); + } + +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java new file mode 100644 index 000000000..3537a6bb7 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java @@ -0,0 +1,64 @@ +package io.odpf.dagger.functions.udfs.python; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import io.odpf.dagger.common.configuration.Configuration; +import lombok.Getter; + +import static io.odpf.dagger.functions.common.Constants.*; + +public class PythonUdfConfig { + private static final Gson GSON = new GsonBuilder() + .enableComplexMapKeySerialization() + .setPrettyPrinting() + .create(); + + @SerializedName(PYTHON_FILES_KEY) + @Getter + private String pythonFiles; + + @SerializedName(PYTHON_REQUIREMENTS_KEY) + @Getter + 
private String pythonRequirements;
+
+    @SerializedName(PYTHON_ARCHIVES_KEY)
+    @Getter
+    private String pythonArchives;
+
+    @SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY)
+    private Integer pythonArrowBatchSize;
+
+    @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY)
+    private Integer pythonBundleSize;
+
+    @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY)
+    private Long pythonBundleTime;
+
+    public int getPythonArrowBatchSize() {
+        if (pythonArrowBatchSize == null) {
+            return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT;
+        }
+        return pythonArrowBatchSize;
+    }
+
+    public int getPythonBundleSize() {
+        if (pythonBundleSize == null) {
+            return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT;
+        }
+        return pythonBundleSize;
+    }
+
+    public long getPythonBundleTime() {
+        if (pythonBundleTime == null) {
+            return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT;
+        }
+        return pythonBundleTime;
+    }
+
+    public static PythonUdfConfig parse(Configuration configuration) {
+        String jsonString = configuration.getString(PYTHON_UDF_CONFIG, "");
+        PythonUdfConfig parsed = GSON.fromJson(jsonString, PythonUdfConfig.class); // Gson yields null for empty JSON
+        return parsed != null ? parsed : new PythonUdfConfig();
+    }
+}
diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java
new file mode 100644
index 000000000..d1f56d9b5
--- /dev/null
+++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java
@@ -0,0 +1,109 @@
+package io.odpf.dagger.functions.udfs.python;
+
+import io.odpf.dagger.functions.exceptions.PythonFilesFormatException;
+import io.odpf.dagger.functions.exceptions.PythonFilesNotFoundException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * Registers Python UDFs with a Flink table environment.
+ *
+ * <p>Every {@code .py} file, given directly or found inside a {@code .zip} archive, is registered
+ * as a temporary function whose name is the upper-cased file name and whose implementation is the
+ * Python function of the same name inside that module.
+ */
+public class PythonUdfManager {
+
+    private static final String PY_EXTENSION = ".py";
+    private static final String ZIP_EXTENSION = ".zip";
+
+    private final StreamTableEnvironment tableEnvironment;
+    private final PythonUdfConfig pythonUdfConfig;
+
+    /**
+     * Instantiates a new Python udf manager.
+     *
+     * @param tableEnvironment the table environment the functions are registered on
+     * @param pythonUdfConfig  the parsed Python UDF configuration
+     */
+    public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) {
+        this.tableEnvironment = tableEnvironment;
+        this.pythonUdfConfig = pythonUdfConfig;
+    }
+
+    /**
+     * Applies the Python configuration and registers one function per Python file.
+     *
+     * @throws IOException                  if a zip archive cannot be opened or read
+     * @throws PythonFilesNotFoundException if no Python files are configured
+     * @throws PythonFilesFormatException   if a configured file is neither {@code .py} nor {@code .zip}
+     */
+    public void registerPythonFunctions() throws IOException {
+        String inputFiles = pythonUdfConfig.getPythonFiles();
+        if (inputFiles == null) {
+            throw new PythonFilesNotFoundException("Python files not found");
+        }
+        registerPythonConfig();
+
+        for (String pythonFile : inputFiles.split(",")) {
+            // Match on the suffix: a substring match would also accept names such as "udf.pyc".
+            if (pythonFile.endsWith(ZIP_EXTENSION)) {
+                registerFunctionsFromZip(pythonFile);
+            } else if (pythonFile.endsWith(PY_EXTENSION)) {
+                String module = stripPyExtension(pythonFile.substring(pythonFile.lastIndexOf('/') + 1));
+                registerFunction(module, module);
+            } else {
+                throw new PythonFilesFormatException("Python files should be in .py or .zip format");
+            }
+        }
+    }
+
+    /** Registers every {@code .py} entry of the archive, closing the archive afterwards. */
+    private void registerFunctionsFromZip(String zipPath) throws IOException {
+        try (ZipFile zipFile = new ZipFile(zipPath)) {
+            Enumeration<? extends ZipEntry> entries = zipFile.entries();
+            while (entries.hasMoreElements()) {
+                String entryName = entries.nextElement().getName();
+                if (entryName.endsWith(PY_EXTENSION)) {
+                    // "pkg/sub/add.py" becomes module "pkg.sub.add" exposing function "add".
+                    String module = stripPyExtension(entryName).replace("/", ".");
+                    registerFunction(module, module.substring(module.lastIndexOf('.') + 1));
+                }
+            }
+        }
+    }
+
+    /** Issues the CREATE TEMPORARY FUNCTION statement for one Python function. */
+    private void registerFunction(String module, String functionName) {
+        // Locale.ROOT keeps the SQL name stable regardless of the JVM default locale.
+        String query = "CREATE TEMPORARY FUNCTION " + functionName.toUpperCase(Locale.ROOT)
+                + " AS '" + module + "." + functionName + "' LANGUAGE PYTHON";
+        tableEnvironment.executeSql(query);
+    }
+
+    /** Removes the trailing {@code .py} from a name that is known to end with it. */
+    private static String stripPyExtension(String name) {
+        return name.substring(0, name.length() - PY_EXTENSION.length());
+    }
+
+    /** Copies the Python settings from the UDF config into the Flink table configuration. */
+    private void registerPythonConfig() {
+        Configuration flinkConfig = tableEnvironment.getConfig().getConfiguration();
+        if (pythonUdfConfig.getPythonRequirements() != null) {
+            flinkConfig.setString("python.requirements", pythonUdfConfig.getPythonRequirements());
+        }
+        if (pythonUdfConfig.getPythonArchives() != null) {
+            flinkConfig.setString("python.archives", pythonUdfConfig.getPythonArchives());
+        }
+        flinkConfig.setString("python.files", pythonUdfConfig.getPythonFiles());
+        flinkConfig.setInteger("python.fn-execution.arrow.batch.size", pythonUdfConfig.getPythonArrowBatchSize());
+        flinkConfig.setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize());
+        flinkConfig.setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime());
+    }
+}
diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java
new file mode 100644
index 000000000..ae2679e7c
--- /dev/null
+++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java
@@ -0,0 +1,62 @@
+package io.odpf.dagger.functions.udfs.python;
+
+import io.odpf.dagger.common.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import static io.odpf.dagger.functions.common.Constants.*;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class PythonUdfConfigTest {
+
+    @Mock
+    private Configuration configuration;
+
+    @Before
+    public void setup() {
+
initMocks(this); + } + + @Test + public void shouldParseConfig() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\", \"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", \"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\" }"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertNotNull(pythonUdfConfig); + Assert.assertEquals(pythonUdfConfig.getPythonFiles(), "/path/to/function.zip"); + Assert.assertEquals(pythonUdfConfig.getPythonArchives(), "/path/to/file.txt"); + Assert.assertEquals(pythonUdfConfig.getPythonRequirements(), "requirements.txt"); + Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000); + } + + @Test + public void shouldUseDefaultValueIfConfigIsNotGiven() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\" }"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000); + } + + @Test + public void shouldReturnNullIfPythonFilesConfigIsNotGiven() { + String pythonJsonConfig = "{\"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", \"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, 
"")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertNull(pythonUdfConfig.getPythonFiles()); + Assert.assertNull(pythonUdfConfig.getPythonArchives()); + Assert.assertNull(pythonUdfConfig.getPythonRequirements()); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java new file mode 100644 index 000000000..c66256b90 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java @@ -0,0 +1,183 @@ +package io.odpf.dagger.functions.udfs.python; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import io.odpf.dagger.functions.exceptions.PythonFilesNotFoundException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import static org.mockito.MockitoAnnotations.initMocks; + +public class PythonUdfManagerTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Mock + private StreamTableEnvironment tableEnvironment; + + @Mock + private PythonUdfConfig pythonUdfConfig; + + @Mock + private TableConfig tableConfig; + + @Mock + private Configuration configuration; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldRegisterPythonUdfConfig() throws IOException { + String pathFile = getPath("python_udf.zip"); + String sqlRegisterFirstUdf = "CREATE 
TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; + String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + when(pythonUdfConfig.getPythonArchives()).thenReturn("/path/to/file.txt"); + when(pythonUdfConfig.getPythonRequirements()).thenReturn("requirements.txt"); + when(pythonUdfConfig.getPythonArrowBatchSize()).thenReturn(10000); + when(pythonUdfConfig.getPythonBundleSize()).thenReturn(100000); + when(pythonUdfConfig.getPythonBundleTime()).thenReturn(1000L); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(configuration, times(1)).setString("python.archives", "/path/to/file.txt"); + verify(configuration, times(1)).setString("python.requirements", "requirements.txt"); + verify(configuration, times(1)).setInteger("python.fn-execution.arrow.batch.size", 10000); + verify(configuration, times(1)).setInteger("python.fn-execution.bundle.size", 100000); + verify(configuration, times(1)).setLong("python.fn-execution.bundle.time", 1000); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterFirstUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); + } + + @Test + public void shouldNotRegisterConfigIfNotSet() throws IOException { + String pathFile = getPath("python_udf.zip"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + 
pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(configuration, times(0)).setString("python.archives", "/path/to/file.txt"); + verify(configuration, times(0)).setString("python.requirements", "requirements.txt"); + } + + @Test + public void shouldRegisterPythonUdfFromPyFile() throws IOException { + String pathFile = getPath("test_udf.py"); + String sqlRegisterUdf = "CREATE TEMPORARY FUNCTION TEST_UDF AS 'test_udf.test_udf' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterUdf); + } + + @Test + public void shouldOnlyExecutePyFormatInsideZipFile() throws IOException { + String pathFile = getPath("python_udf.zip"); + + String sqlRegisterFirstUdf = "CREATE TEMPORARY FUNCTION MULTIPLY AS 'python_udf.scalar.multiply.multiply' LANGUAGE PYTHON"; + String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; + String sqlRegisterThirdUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(tableEnvironment, times(0)).executeSql(sqlRegisterFirstUdf); + 
verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterThirdUdf); + } + + @Test + public void shouldRegisterPythonUdfFromPyAndZipFile() throws IOException { + String zipPathFile = getPath("python_udf.zip"); + String pyPathFile = getPath("test_udf.py"); + + String sqlRegisterFirstUdf = "CREATE TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; + String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; + String sqlRegisterThirdUdf = "CREATE TEMPORARY FUNCTION TEST_UDF AS 'test_udf.test_udf' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(zipPathFile + "," + pyPathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", zipPathFile + "," + pyPathFile); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterFirstUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterThirdUdf); + } + + @Test + public void shouldThrowExceptionIfPythonFilesNotInZipOrPyFormat() throws IOException { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + File file = File.createTempFile("test_file", ".txt"); + file.deleteOnExit(); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn("test_file.txt"); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + 
pythonUdfManager.registerPythonFunctions();
+    }
+
+    @Test
+    public void shouldThrowExceptionIfPythonFilesNotExist() throws IOException {
+        expectedEx.expect(PythonFilesNotFoundException.class);
+        expectedEx.expectMessage("Python files not found");
+
+        when(tableEnvironment.getConfig()).thenReturn(tableConfig);
+        when(tableConfig.getConfiguration()).thenReturn(configuration);
+
+        PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
+        pythonUdfManager.registerPythonFunctions();
+    }
+
+    private String getPath(String filename) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        File file = new File(Objects.requireNonNull(classLoader.getResource(filename)).getFile());
+
+        return file.getAbsolutePath();
+    }
+}
diff --git a/dagger-functions/src/test/resources/python_udf.zip b/dagger-functions/src/test/resources/python_udf.zip
new file mode 100644
index 000000000..d55b8d1e2
Binary files /dev/null and b/dagger-functions/src/test/resources/python_udf.zip differ
diff --git a/dagger-functions/src/test/resources/test_udf.py b/dagger-functions/src/test/resources/test_udf.py
new file mode 100644
index 000000000..a77f7cb85
--- /dev/null
+++ b/dagger-functions/src/test/resources/test_udf.py
@@ -0,0 +1,7 @@
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+
+@udf(result_type=DataTypes.STRING())
+def test_udf(text: str):
+    return text + "_added_text"
diff --git a/version.txt b/version.txt
index 53a75d673..b0032849c 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-0.2.6
+0.2.7