diff --git a/.github/workflows/python_release.yml b/.github/workflows/python_release.yml index 4679a3e95..28ef5ac57 100644 --- a/.github/workflows/python_release.yml +++ b/.github/workflows/python_release.yml @@ -10,8 +10,10 @@ jobs: - uses: actions/checkout@v2 - name: Zip Python Udf run: | - zip -r python_udfs.zip dagger-py-functions/udfs -x "*/__init__.py" - zip -r dagger-py-functions.zip python_functions/requirements.txt python_functions/data python_udfs.zip + cd dagger-py-functions + zip -r python_udfs.zip udfs -x "*/__init__.py" + zip -jr data.zip data + zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip - name: Upload Release uses: ncipollo/release-action@v1 with: diff --git a/dagger-common/build.gradle b/dagger-common/build.gradle index 842714d5f..dd04e715d 100644 --- a/dagger-common/build.gradle +++ b/dagger-common/build.gradle @@ -59,6 +59,10 @@ dependencies { compileOnly group: 'org.apache.flink', name: 'flink-table-api-java-bridge_2.11', version: flinkVersion compileOnly group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: flinkVersion + dependenciesCommonJar ('org.apache.hadoop:hadoop-client:2.8.3') { + exclude module:"commons-cli" + } + dependenciesCommonJar 'com.google.cloud.bigdataoss:gcs-connector:1.9.0-hadoop2' dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0' diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle index 4dbe083c2..6bd4278ba 100644 --- a/dagger-functions/build.gradle +++ b/dagger-functions/build.gradle @@ -64,7 +64,7 @@ dependencies { dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1' dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1' dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0' - + testImplementation project(':dagger-common').sourceSets.test.output testImplementation group: 'junit', name: 'junit', version: '4.12' testImplementation 'org.mockito:mockito-core:2.0.99-beta' diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java new file mode 100644 index 000000000..e849dc208 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.exceptions; + +/** + * The type Python files empty exception. + */ +public class PythonFilesEmptyException extends RuntimeException { + + /** + * Instantiates a new Python files empty exception. + * + * @param message the message + */ + public PythonFilesEmptyException(String message) { + super(message); + } + +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java deleted file mode 100644 index ca321ced7..000000000 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesNotFoundException.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.odpf.dagger.functions.exceptions; - -/** - * The type Python files not found exception. - */ -public class PythonFilesNotFoundException extends RuntimeException { - - /** - * Instantiates a new Python files not found exception. - * - * @param message the message - */ - public PythonFilesNotFoundException(String message) { - super(message); - } - -} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java index 3537a6bb7..2f33f75c8 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java @@ -8,6 +8,9 @@ import static io.odpf.dagger.functions.common.Constants.*; +/** + * The type Python udf config. + */ public class PythonUdfConfig { private static final Gson GSON = new GsonBuilder() .enableComplexMapKeySerialization() @@ -15,7 +18,6 @@ public class PythonUdfConfig { .create(); @SerializedName(PYTHON_FILES_KEY) - @Getter private String pythonFiles; @SerializedName(PYTHON_REQUIREMENTS_KEY) @@ -23,7 +25,6 @@ public class PythonUdfConfig { private String pythonRequirements; @SerializedName(PYTHON_ARCHIVES_KEY) - @Getter private String pythonArchives; @SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY) @@ -35,6 +36,35 @@ public class PythonUdfConfig { @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY) private Long pythonBundleTime; + /** + * Gets python files. + * + * @return the python files + */ + public String getPythonFiles() { + if (pythonFiles != null) { + return pythonFiles.replaceAll("\\s+", ""); + } + return null; + } + + /** + * Gets python archives. + * + * @return the python archives + */ + public String getPythonArchives() { + if (pythonArchives != null) { + return pythonArchives.replaceAll("\\s+", ""); + } + return null; + } + + /** + * Gets python arrow batch size. + * + * @return the python arrow batch size + */ public int getPythonArrowBatchSize() { if (pythonArrowBatchSize == null) { return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT; @@ -42,6 +72,11 @@ public int getPythonArrowBatchSize() { return pythonArrowBatchSize; } + /** + * Gets python bundle size. + * + * @return the python bundle size + */ public int getPythonBundleSize() { if (pythonBundleSize == null) { return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT; @@ -49,6 +84,11 @@ public int getPythonBundleSize() { return pythonBundleSize; } + /** + * Gets python bundle time. + * + * @return the python bundle time + */ public long getPythonBundleTime() { if (pythonBundleTime == null) { return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT; @@ -56,6 +96,12 @@ public long getPythonBundleTime() { return pythonBundleTime; } + /** + * Parse python udf config. + * + * @param configuration the configuration + * @return the python udf config + */ public static PythonUdfConfig parse(Configuration configuration) { String jsonString = configuration.getString(PYTHON_UDF_CONFIG, ""); diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java index d1f56d9b5..59fb99d2a 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java @@ -1,55 +1,51 @@ package io.odpf.dagger.functions.udfs.python; -import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; -import io.odpf.dagger.functions.exceptions.PythonFilesNotFoundException; +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; +import io.odpf.dagger.functions.udfs.python.file.type.FileType; +import io.odpf.dagger.functions.udfs.python.file.type.FileTypeFactory; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.io.IOException; -import java.util.Enumeration; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; +import java.util.ArrayList; +import java.util.List; +/** + * The type Python udf manager. + */ public class PythonUdfManager { private StreamTableEnvironment tableEnvironment; private PythonUdfConfig pythonUdfConfig; + /** + * Instantiates a new Python udf manager. + * + * @param tableEnvironment the table environment + * @param pythonUdfConfig the python udf config + */ public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) { this.tableEnvironment = tableEnvironment; this.pythonUdfConfig = pythonUdfConfig; } + /** + * Register python functions. + */ public void registerPythonFunctions() throws IOException { - String inputFiles = pythonUdfConfig.getPythonFiles(); - String[] pythonFilesSource; + String[] pythonFiles; if (inputFiles != null) { registerPythonConfig(); - pythonFilesSource = inputFiles.split(","); + pythonFiles = inputFiles.split(","); } else { - throw new PythonFilesNotFoundException("Python files not found"); + throw new PythonFilesEmptyException("Python files can not be null"); } - for (String pythonFile : pythonFilesSource) { - if (pythonFile.contains(".zip")) { - ZipFile zf = new ZipFile(pythonFile); - for (Enumeration e = zf.entries(); e.hasMoreElements();) { - ZipEntry entry = (ZipEntry) e.nextElement(); - String name = entry.getName(); - if (name.endsWith(".py")) { - name = name.replace(".py", "").replace("/", "."); - String udfName = name.substring(name.lastIndexOf(".") + 1); - String query = "CREATE TEMPORARY FUNCTION " + udfName.toUpperCase() + " AS '" + name + "." + udfName + "' LANGUAGE PYTHON"; - tableEnvironment.executeSql(query); - } - } - } else if (pythonFile.contains(".py")) { - String name = pythonFile.substring(pythonFile.lastIndexOf('/') + 1).replace(".py", ""); - String query = "CREATE TEMPORARY FUNCTION " + name.toUpperCase() + " AS '" + name + "." + name + "' LANGUAGE PYTHON"; - tableEnvironment.executeSql(query); - } else { - throw new PythonFilesFormatException("Python files should be in .py or .zip format"); - } + for (String pythonFile : pythonFiles) { + FileType fileType = FileTypeFactory.getFileType(pythonFile); + List fileNames = fileType.getFileNames(); + List sqlQueries = createQuery(fileNames); + executeSql(sqlQueries); } } @@ -65,4 +61,21 @@ private void registerPythonConfig() { tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize()); tableEnvironment.getConfig().getConfiguration().setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime()); } + + private void executeSql(List sqlQueries) { + for (String query : sqlQueries) { + tableEnvironment.executeSql(query); + } + } + + private List createQuery(List fileNames) { + List sqlQueries = new ArrayList<>(); + for (String fileName : fileNames) { + fileName = fileName.replace(".py", "").replace("/", "."); + String functionName = fileName.substring(fileName.lastIndexOf(".") + 1); + String query = "CREATE TEMPORARY FUNCTION " + functionName.toUpperCase() + " AS '" + fileName + "." + functionName + "' LANGUAGE PYTHON"; + sqlQueries.add(query); + } + return sqlQueries; + } } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java new file mode 100644 index 000000000..915f81d32 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java @@ -0,0 +1,16 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import java.io.IOException; + +/** + * The interface File source. + */ +public interface FileSource { + + /** + * Get object file byte [ ]. + * + * @return the byte [ ] + */ + byte[] getObjectFile() throws IOException; +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java new file mode 100644 index 000000000..8053f19fd --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java @@ -0,0 +1,29 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; + +/** + * The type File source factory. + */ +public class FileSourceFactory { + + /** + * Gets file source. + * + * @param pythonFile the python file + * @return the file source + */ + public static FileSource getFileSource(String pythonFile) { + if ("GS".equals(getFileSourcePrefix(pythonFile))) { + return new GcsFileSource(pythonFile); + } else { + return new LocalFileSource(pythonFile); + } + } + + private static String getFileSourcePrefix(String pythonFile) { + String[] files = pythonFile.split("://"); + return files[0].toUpperCase(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java new file mode 100644 index 000000000..e3bd69a3f --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java @@ -0,0 +1,56 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The type Gcs client. + */ +public class GcsClient { + + private Storage storage; + + /** + * Instantiates a new Gcs client. + */ + public GcsClient() { + + if (storage == null) { + storage = StorageOptions.newBuilder() + .build().getService(); + } + } + + /** + * Instantiates a new Gcs client. + * This constructor used for unit test purposes. + * + * @param storage the storage + */ + public GcsClient(Storage storage) { + this.storage = storage; + } + + /** + * Get file byte [ ]. + * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) { + List file = Arrays.asList(pythonFile.replace("gs://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + Blob blob = storage.get(BlobId.of(bucketName, objectName)); + + return blob.getContent(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java new file mode 100644 index 000000000..d7b7a5bef --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java @@ -0,0 +1,51 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + + +/** + * The type Gcs file source. + */ +public class GcsFileSource implements FileSource { + + private GcsClient gcsClient; + private String pythonFile; + + /** + * Instantiates a new Gcs file source. + * + * @param pythonFile the python file + */ + public GcsFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Gcs file source. + * This constructor used for unit test purposes. + * + * @param pythonFile the python file + * @param gcsClient the gcs client + */ + public GcsFileSource(String pythonFile, GcsClient gcsClient) { + this.pythonFile = pythonFile; + this.gcsClient = gcsClient; + } + + @Override + public byte[] getObjectFile() { + return getGcsClient().getFile(pythonFile); + } + + /** + * Gets gcs client. + * + * @return the gcs client + */ + private GcsClient getGcsClient() { + if (this.gcsClient == null) { + this.gcsClient = new GcsClient(); + } + return this.gcsClient; + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java new file mode 100644 index 000000000..3f3aff624 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java @@ -0,0 +1,29 @@ +package io.odpf.dagger.functions.udfs.python.file.source.local; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * The type Local file source. + */ +public class LocalFileSource implements FileSource { + + private String pythonFile; + + /** + * Instantiates a new Local file source. + * + * @param pythonFile the python file + */ + public LocalFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + @Override + public byte[] getObjectFile() throws IOException { + return Files.readAllBytes(Paths.get(pythonFile)); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java new file mode 100644 index 000000000..1123b42f1 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import java.io.IOException; +import java.util.List; + +/** + * The interface File type. + */ +public interface FileType { + + /** + * Gets file names. + * + * @return the file names + */ + List getFileNames() throws IOException; +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java new file mode 100644 index 000000000..83543ad19 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java @@ -0,0 +1,34 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; +import io.odpf.dagger.functions.udfs.python.file.source.FileSourceFactory; + +/** + * The type File type factory. + */ +public class FileTypeFactory { + + /** + * Gets file type. + * + * @param pythonFile the python file + * @return the file type + */ + public static FileType getFileType(String pythonFile) { + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + switch (getFileTypeFormat(pythonFile)) { + case "PY": + return new PythonFileType(pythonFile); + case "ZIP": + return new ZipFileType(fileSource); + default: + throw new PythonFilesFormatException("Python files should be in .py or .zip format"); + } + } + + private static String getFileTypeFormat(String pythonFile) { + String[] files = pythonFile.split("\\."); + return files[files.length - 1].toUpperCase(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java new file mode 100644 index 000000000..2bc993e04 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java @@ -0,0 +1,33 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; + +import java.util.Collections; +import java.util.List; + +/** + * The type Python file type. + */ +public class PythonFileType implements FileType { + + private String pythonFile; + + /** + * Instantiates a new Python file type. + * + * @param pythonFile the python file + */ + public PythonFileType(String pythonFile) { + this.pythonFile = pythonFile; + } + + @Override + public List getFileNames() { + if (pythonFile == null) { + throw new PythonFilesEmptyException("Python files can not be null"); + } + String name = pythonFile.substring(pythonFile.lastIndexOf('/') + 1); + + return Collections.singletonList(name); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java new file mode 100644 index 000000000..31fc00cd0 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java @@ -0,0 +1,47 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +/** + * The type Zip file type. + */ +public class ZipFileType implements FileType { + + private FileSource fileSource; + + public ZipFileType(FileSource fileSource) { + this.fileSource = fileSource; + } + + @Override + public List getFileNames() throws IOException { + byte[] object = fileSource.getObjectFile(); + + ZipInputStream zi = new ZipInputStream(new ByteArrayInputStream(object)); + ZipEntry zipEntry; + List entries = new ArrayList<>(); + while ((zipEntry = zi.getNextEntry()) != null) { + entries.add(zipEntry); + } + + List fileNames = new ArrayList<>(); + for (ZipEntry entry : entries) { + String name = entry.getName(); + if (isPythonFile(name)) { + fileNames.add(name); + } + } + return fileNames; + } + + private boolean isPythonFile(String fileName) { + return fileName.endsWith(".py"); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java index ae2679e7c..01be1b7af 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java @@ -59,4 +59,24 @@ public void shouldReturnNullIfPythonFilesConfigIsNotGiven() { Assert.assertNull(pythonUdfConfig.getPythonArchives()); Assert.assertNull(pythonUdfConfig.getPythonRequirements()); } + + @Test + public void shouldRemoveWhitespaceInPythonFilesConfig() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \" /path/to/function.zip, /path/to/files/test.py \"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonFiles(), "/path/to/function.zip,/path/to/files/test.py"); + } + + @Test + public void shouldRemoveWhitespaceInPythonArchivesConfig() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \" /path/to/data.zip, /path/to/files/second_data.zip \"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonArchives(), "/path/to/data.zip,/path/to/files/second_data.zip"); + } } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java index c66256b90..d7a210db6 100644 --- a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java @@ -1,7 +1,7 @@ package io.odpf.dagger.functions.udfs.python; import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; -import io.odpf.dagger.functions.exceptions.PythonFilesNotFoundException; +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -13,7 +13,6 @@ import java.io.File; import java.io.IOException; -import java.util.Objects; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -46,8 +45,6 @@ public void setup() { @Test public void shouldRegisterPythonUdfConfig() throws IOException { String pathFile = getPath("python_udf.zip"); - String sqlRegisterFirstUdf = "CREATE TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; - String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; when(tableEnvironment.getConfig()).thenReturn(tableConfig); when(tableConfig.getConfiguration()).thenReturn(configuration); @@ -67,8 +64,6 @@ public void shouldRegisterPythonUdfConfig() throws IOException { verify(configuration, times(1)).setInteger("python.fn-execution.arrow.batch.size", 10000); verify(configuration, times(1)).setInteger("python.fn-execution.bundle.size", 100000); verify(configuration, times(1)).setLong("python.fn-execution.bundle.time", 1000); - verify(tableEnvironment, times(1)).executeSql(sqlRegisterFirstUdf); - verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); } @Test @@ -163,9 +158,22 @@ public void shouldThrowExceptionIfPythonFilesNotInZipOrPyFormat() throws IOExcep } @Test - public void shouldThrowExceptionIfPythonFilesNotExist() throws IOException { - expectedEx.expect(PythonFilesNotFoundException.class); - expectedEx.expectMessage("Python files not found"); + public void shouldThrowExceptionIfPythonFilesIsEmpty() throws IOException { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(""); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + + @Test + public void shouldThrowExceptionIfPythonFilesIsNull() throws IOException { + expectedEx.expect(PythonFilesEmptyException.class); + expectedEx.expectMessage("Python files can not be null"); when(tableEnvironment.getConfig()).thenReturn(tableConfig); when(tableConfig.getConfiguration()).thenReturn(configuration); @@ -176,8 +184,7 @@ public void shouldThrowExceptionIfPythonFilesNotExist() throws IOException { private String getPath(String filename) { ClassLoader classLoader = getClass().getClassLoader(); - File file = new File(Objects.requireNonNull(classLoader.getResource(filename)).getFile()); - return file.getAbsolutePath(); + return classLoader.getResource(filename).getPath(); } } diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java new file mode 100644 index 000000000..2cd5a472f --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java @@ -0,0 +1,27 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import org.junit.Assert; +import org.junit.Test; + +public class FileSourceFactoryTest { + + @Test + public void shouldGetLocalFileSource() { + String pythonFile = "/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof LocalFileSource); + } + + @Test + public void shouldGetGcsFileSource() { + String pythonFile = "gs://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof GcsFileSource); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java new file mode 100644 index 000000000..5c0c7950e --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java @@ -0,0 +1,47 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class GcsClientTest { + + @Mock + private Storage storage; + + @Mock + private Blob blob; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() { + + String pythonFile = "gs://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = Arrays.toString("objectFile".getBytes()); + + when(storage.get(BlobId.of(bucketName, objectName))).thenReturn(blob); + when(blob.getContent()).thenReturn("objectFile".getBytes()); + + GcsClient gcsClient = new GcsClient(storage); + byte[] actualValue = gcsClient.getFile(pythonFile); + + verify(storage, times(1)).get(BlobId.of(bucketName, objectName)); + verify(blob, times(1)).getContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java new file mode 100644 index 000000000..d72d6a8f2 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java @@ -0,0 +1,38 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class GcsFileSourceTest { + + @Mock + private GcsClient gcsClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(gcsClient.getFile(pythonFile)).thenReturn(expectedObject); + GcsFileSource gcsFileSource = new GcsFileSource(pythonFile, gcsClient); + + byte[] actualObject = gcsFileSource.getObjectFile(); + + Assert.assertEquals(expectedObject, actualObject); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java new file mode 100644 index 000000000..4094e186a --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java @@ -0,0 +1,28 @@ +package io.odpf.dagger.functions.udfs.python.file.source.local; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class LocalFileSourceTest { + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + + String pythonFile = classLoader.getResource("python_udf.zip").getPath(); + + byte[] object = Files.readAllBytes(Paths.get(pythonFile)); + String stringObject = new String(object, StandardCharsets.UTF_8); + + LocalFileSource localFileSource = new LocalFileSource(pythonFile); + byte[] actualObject = localFileSource.getObjectFile(); + + String actualStringObject = new String(actualObject, StandardCharsets.UTF_8); + Assert.assertEquals(stringObject, actualStringObject); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java new file mode 100644 index 000000000..67f58cf63 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java @@ -0,0 +1,41 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class FileTypeFactoryTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void shouldGetPythonFileType() { + String pythonFile = "/path/to/file/test_udf.py"; + + FileType fileType = FileTypeFactory.getFileType(pythonFile); + + Assert.assertTrue(fileType instanceof PythonFileType); + } + + @Test + public void shouldGetZipFileType() { + String pythonFile = "/path/to/file/python_udf.zip"; + + FileType fileType = FileTypeFactory.getFileType(pythonFile); + + Assert.assertTrue(fileType instanceof ZipFileType); + } + + @Test + public void shouldThrowExceptionIfPythonFilesNotInZipOrPyFormat() { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + String pythonFile = "/path/to/file/test_file.txt"; + + FileTypeFactory.getFileType(pythonFile); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java new file mode 100644 index 000000000..7ff4e9c69 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java @@ -0,0 +1,45 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; + +public class PythonFileTypeTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void shouldGetFileNames() { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("test_udf.py").getPath(); + + PythonFileType pythonFileType = new PythonFileType(pythonFile); + List fileNames = pythonFileType.getFileNames(); + + Assert.assertEquals("[test_udf.py]", fileNames.toString()); + } + + @Test + public void shouldGetEmptyFileNamesIfPythonFilesIsEmpty() { + String pythonFile = ""; + + PythonFileType pythonFileType = new PythonFileType(pythonFile); + List fileNames = pythonFileType.getFileNames(); + + Assert.assertEquals("[]", fileNames.toString()); + } + + @Test + public void shouldThrowNullPointerExceptionIfPythonFilesIsNull() { + expectedEx.expect(PythonFilesEmptyException.class); + expectedEx.expectMessage("Python files can not be null"); + + PythonFileType pythonFileType = new PythonFileType(null); + pythonFileType.getFileNames(); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java new file mode 100644 index 000000000..99acd37ae --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java @@ -0,0 +1,72 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class ZipFileTypeTest { + + @Mock + private GcsFileSource gcsFileSource; + + @Mock + private LocalFileSource localFileSource; + + private byte[] zipInBytes; + + @Before + public void setup() throws IOException { + initMocks(this); + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(Objects.requireNonNull(classLoader.getResource("python_udf.zip")).getFile()); + zipInBytes = Files.readAllBytes(file.toPath()); + } + + @Test + public void shouldGetFileNamesFromLocalZip() throws IOException { + + when(localFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(localFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[python_udf/scalar/add.py, python_udf/vectorized/substract.py]", fileNames.toString()); + } + + @Test + public void shouldGetFileNamesFromGcsZip() throws IOException { + + when(gcsFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(gcsFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[python_udf/scalar/add.py, python_udf/vectorized/substract.py]", fileNames.toString()); + } + + @Test + public void shouldGetEmptyFileNamesIfZipFileNotContainPyFile() throws IOException { + + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(Objects.requireNonNull(classLoader.getResource("test_no_py.zip")).getFile()); + zipInBytes = Files.readAllBytes(file.toPath()); + + when(gcsFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(gcsFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[]", fileNames.toString()); + } +} diff --git a/dagger-functions/src/test/resources/test_no_py.zip b/dagger-functions/src/test/resources/test_no_py.zip new file mode 100644 index 000000000..ffcfef40f Binary files /dev/null and b/dagger-functions/src/test/resources/test_no_py.zip differ diff --git a/dagger-functions/src/test/resources/test_udf.py b/dagger-functions/src/test/resources/test_udf.py index a77f7cb85..39e64f8be 100644 --- a/dagger-functions/src/test/resources/test_udf.py +++ b/dagger-functions/src/test/resources/test_udf.py @@ -3,5 +3,5 @@ @udf(result_type=DataTypes.STRING()) -def test_function(text: str): +def test_udf(text: str): return text + "_added_text" diff --git a/dagger-py-functions/test/scalar/sample_test.py b/dagger-py-functions/test/scalar/sample_test.py deleted file mode 100644 index f3f31aec7..000000000 --- a/dagger-py-functions/test/scalar/sample_test.py +++ /dev/null @@ -1,6 +0,0 @@ -from udfs.scalar.sample import sample - - -def testSample(): - f = sample._func - assert f("input_text_") == "input_text_sample_text" diff --git a/dagger-py-functions/test/__init__.py b/dagger-py-functions/tests/__init__.py similarity index 100% rename from dagger-py-functions/test/__init__.py rename to dagger-py-functions/tests/__init__.py diff --git a/dagger-py-functions/test/scalar/__init__.py b/dagger-py-functions/tests/udfs/__init__.py similarity index 100% rename from dagger-py-functions/test/scalar/__init__.py rename to dagger-py-functions/tests/udfs/__init__.py diff --git a/dagger-py-functions/tests/udfs/scalar/__init__.py b/dagger-py-functions/tests/udfs/scalar/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dagger-py-functions/tests/udfs/scalar/multiply_test.py b/dagger-py-functions/tests/udfs/scalar/multiply_test.py new file mode 100644 index 000000000..12aced4ef --- /dev/null +++ b/dagger-py-functions/tests/udfs/scalar/multiply_test.py @@ -0,0 +1,6 @@ +from udfs.scalar.multiply import multiply + + +def testMultiply(): + value = multiply._func + assert value(5,10) == 50 diff --git a/dagger-py-functions/tests/udfs/scalar/sample_test.py b/dagger-py-functions/tests/udfs/scalar/sample_test.py new file mode 100644 index 000000000..e6c9dc7d9 --- /dev/null +++ b/dagger-py-functions/tests/udfs/scalar/sample_test.py @@ -0,0 +1,6 @@ +from udfs.scalar.sample import sample + + +def testSample(): + value = sample._func + assert value("input_text_") == "input_text_sample_text" diff --git a/dagger-py-functions/udfs/scalar/multiply.py b/dagger-py-functions/udfs/scalar/multiply.py new file mode 100644 index 000000000..4078293c5 --- /dev/null +++ b/dagger-py-functions/udfs/scalar/multiply.py @@ -0,0 +1,7 @@ +from pyflink.table import DataTypes +from pyflink.table.udf import udf + + +@udf(result_type=DataTypes.FLOAT()) +def multiply(i, j): + return i * j \ No newline at end of file diff --git a/dagger-py-functions/udfs/scalar/sample.py b/dagger-py-functions/udfs/scalar/sample.py index 6b60f2bd4..99103d4bc 100644 --- a/dagger-py-functions/udfs/scalar/sample.py +++ b/dagger-py-functions/udfs/scalar/sample.py @@ -4,6 +4,6 @@ @udf(result_type=DataTypes.STRING()) def sample(text): - f = open("data/sample_data.txt", "r") - data = f.read() + file = open("data/sample_data.txt", "r") + data = file.read() return text + data