feat: Enable UDF access to SQLTransformers #208
Closed
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Currently in Dagger Flink sql-query we can access all the java and python based UDF functions. But the same UDF functions are not accessible in the post-processor(SQLTransformer).
The Flink API StreamTableEnvironment instance is used to register the UDF function in method call registerFunctions() in StreamManager.java class. Since the same instance is not used to create Flink tables in SQLTransformer.java class, due to which UDFs are not accessible.
We can solve this by two approaches as below.
Approach-1:
We can introduce the DaggerContext singleton object which holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instance variables, we can use these variables throughout the application.This context object gets initialized only once in driver class KafkaProtoSQLProcessor.java.
The DaggerContext instance is made available to SQLTransformer through constructor(KafkaProtoSQLProcessor -> StreamManager -> PostProcessorFactory -> ParentPostProcessor -> TransformProcessor). With this DaggerContext we can register the Flink table in SQLTransformer.java. And can have access to the UDFs which were registered earlier.
Approach-2:
In SQLTransformer.java class we can create a new instance of StreamManager and call registerFunctions method for each SQLTransformer configuration. With this approach, if the user calls n times SqlTransformer configuration, then n times the registration of UDFs get called and n times Objects are initialized.
Here we have followed Approach-1.