
Conversation

@Shreyansh228
Contributor

Currently, in a Dagger Flink SQL query we can access all the Java- and Python-based UDFs. However, the same UDFs are not accessible in the post-processor (SQLTransformer).

The Flink StreamTableEnvironment instance is used to register the UDFs in the registerFunctions() method of the StreamManager.java class. Since that same instance is not used to create Flink tables in the SQLTransformer.java class, the UDFs are not accessible there.

We can solve this in two ways:

Approach-1:
We can introduce a DaggerContext singleton that holds the StreamExecutionEnvironment, StreamTableEnvironment and Configuration instances, which can then be used throughout the application. This context object is initialized only once, in the driver class KafkaProtoSQLProcessor.java.

The DaggerContext can be obtained through a static method call in the Transformer.java interface. With this DaggerContext we can register the Flink table in SQLTransformer.java and gain access to the UDFs that were registered earlier.
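A minimal sketch of the proposed DaggerContext singleton. The Flink types (StreamExecutionEnvironment, StreamTableEnvironment, Configuration) are replaced here by empty stand-in classes so the sketch is self-contained; the real Dagger code would hold the actual Flink instances, and the method names init/getInstance are assumptions for illustration.

```java
// Stand-ins for the Flink classes, so this sketch compiles on its own.
class StreamExecutionEnvironment { }
class StreamTableEnvironment { }
class Configuration { }

final class DaggerContext {
    private static DaggerContext instance;

    private final StreamExecutionEnvironment executionEnvironment;
    private final StreamTableEnvironment tableEnvironment;
    private final Configuration configuration;

    private DaggerContext(StreamExecutionEnvironment env,
                          StreamTableEnvironment tableEnv,
                          Configuration config) {
        this.executionEnvironment = env;
        this.tableEnvironment = tableEnv;
        this.configuration = config;
    }

    // Called exactly once from the driver class (KafkaProtoSQLProcessor).
    static DaggerContext init(StreamExecutionEnvironment env,
                              StreamTableEnvironment tableEnv,
                              Configuration config) {
        if (instance == null) {
            instance = new DaggerContext(env, tableEnv, config);
        }
        return instance;
    }

    // Static accessor reachable from the Transformer interface, so
    // SQLTransformer can reuse the same StreamTableEnvironment (and
    // therefore the UDFs already registered on it).
    static DaggerContext getInstance() {
        if (instance == null) {
            throw new IllegalStateException("DaggerContext not initialized");
        }
        return instance;
    }

    StreamTableEnvironment getTableEnvironment() {
        return tableEnvironment;
    }
}
```

Because every caller goes through getInstance(), tables created in SQLTransformer and UDFs registered in StreamManager end up on the same StreamTableEnvironment.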

Approach-2:
In the SQLTransformer.java class we can create a new StreamManager instance and call its registerFunctions method for each SQLTransformer configuration. With this approach, if the user configures SQLTransformer n times, UDF registration is invoked n times and n objects are initialized.
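The cost of Approach-2 can be illustrated with a small sketch. StreamManager here is a minimal stand-in, not the real Dagger class; a static counter stands in for the (expensive) re-registration of every UDF.

```java
// Stand-in for the real StreamManager: counts how often the full UDF
// registration would run.
class StreamManager {
    static int registrationCalls = 0;

    void registerFunctions() {
        registrationCalls++; // real code would re-register every Java/Python UDF here
    }
}

class ApproachTwoSketch {
    // Simulate n SQLTransformer configurations under Approach-2: each one
    // constructs its own StreamManager and re-registers all UDFs.
    static int simulate(int n) {
        for (int i = 0; i < n; i++) {
            new StreamManager().registerFunctions();
        }
        return StreamManager.registrationCalls;
    }
}
```

With n configurations, registration runs n times and n StreamManager objects are created, which is the overhead that motivates Approach-1.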

Here we have followed Approach-1.

@Meghajit added the enhancement (New feature or request) label on Oct 12, 2022
KafkaProtoSQLProcessor -> StreamManager -> PostProcessorFactory -> ParentPostProcessor -> TransformProcessor and refactoring related code
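The refactor named above threads the single DaggerContext down the chain. The following is a hedged sketch under the assumption that each class receives the context through its constructor or factory method; all class shapes are simplified stand-ins for the real Dagger classes.

```java
// Stand-in for the real DaggerContext (which holds the Flink environments).
class DaggerContext { }

// Each stage of the chain stores the context it was handed.
class StreamManager {
    final DaggerContext daggerContext;
    StreamManager(DaggerContext daggerContext) { this.daggerContext = daggerContext; }
}

class TransformProcessor {
    final DaggerContext daggerContext;
    TransformProcessor(DaggerContext daggerContext) { this.daggerContext = daggerContext; }
}

class PostProcessorFactory {
    // Every processor built here receives the same context instance, so
    // SQLTransformer sees the StreamTableEnvironment with the UDFs on it.
    static TransformProcessor getTransformProcessor(DaggerContext ctx) {
        return new TransformProcessor(ctx);
    }
}
```

The key invariant is that one context instance, created in the driver, reaches every stage unchanged.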
import io.odpf.dagger.common.core.DaggerContextTestBase;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
//import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
Member


Any reason this import has been commented out?
I can see it's still used in Line 50 below.

Contributor Author


This has been uncommented and handled.

@vercel

vercel bot commented Oct 27, 2022

The latest updates on your projects.

Name Status Preview Updated
dagger ✅ Ready (Inspect) Visit Preview Oct 27, 2022 at 3:18AM (UTC)
