Overview

In order to support multiple databases, we should be able to easily instantiate multiple database connection objects and register them under a name. For example, we can have three oracle connections named oracle1, oracle2 and foobar. We should also be able to fetch a connection by name and execute queries with it.
To achieve this, we need
- A way to instantiate connections from simple dict configs
- A way to register connection classes with vdk-core
- A built-in plugin that initializes all the named connections based on the available configurations (whatever they may be; the actual configurations are out of scope) and stores them in memory.
- A way to pass a name to job_input.get_managed_connection() and get the correct connection object.
Pseudocode
managed_connection_base.py
@classmethod
@abstractmethod
def _from_dict(cls, **kwargs):
    """
    Alternate constructor that receives the connection settings as keyword arguments.

    Override this if you want to support multiple connections and ingestion operations.
    """
router.py
def add_connection_class(
    self, dbtype: str, clazz: Type[ManagedConnectionBase]
) -> None:
    """
    Register a connection class for a database type.

    The type name is stored lower-cased. Registering the same type again
    replaces the earlier class.

    :param dbtype: name of the database type, e.g. "oracle"
    :param clazz: connection class to associate with that type
    """
    normalized_type = dbtype.lower()
    self._supported_connection_types[normalized_type] = clazz
def get_conn_class(
    self,
    dbtype: str,
) -> Union[Type[ManagedConnectionBase], None]:
    """
    Look up the connection class registered for a database type.

    The lookup lower-cases the type name, matching how
    add_connection_class stores it.

    :param dbtype: name of the database type, e.g. "oracle"
    :return: the registered connection class, or None when no class is
        registered for that type (callers must handle the None case)
    """
    return self._supported_connection_types.get(dbtype.lower())
def add_open_named_connection_factory_method(
    self,
    connection_name: str,
    open_connection_func: Callable[
        [], Union[ManagedConnectionBase, PEP249Connection]
    ],
) -> None:
    """
    Register the factory used to open the connection with the given name.

    Registering the same name again replaces the earlier factory.

    :param connection_name: name the connection is registered under
    :param open_connection_func: zero-argument callable that returns the
        connection object
    """
    builders = self._named_connection_builders
    builders[connection_name] = open_connection_func
oracle_connection.py
class OracleConnection(ManagedConnectionBase):
    """
    Managed connection to an Oracle database.

    The target database is described either by a connection string or by
    separate host/port/sid/service_name values. When a connection string is
    set, it is used and the separate values are ignored.
    """

    def __init__(
        self,
        user: str,
        password: str,
        connection_string: Optional[str] = None,
        host=None,
        port=1521,
        sid: Optional[str] = None,
        service_name: Optional[str] = None,
        thick_mode: bool = True,
        thick_mode_lib_dir: Optional[str] = None,
    ):
        """
        Store the connection settings; no connection is opened here.

        :param user: database user name
        :param password: password of that user
        :param connection_string: full connection string; takes precedence
            over host/port/sid/service_name when set
        :param host: database host
        :param port: database port
        :param sid: database SID
        :param service_name: database service name
        :param thick_mode: when true, the Oracle client library is
            initialized before connecting
        :param thick_mode_lib_dir: directory passed to the client library
            initialization; the library's own lookup is used when unset
        """
        super().__init__(log)
        self._oracle_user = user
        self._oracle_password = password
        self._host = host
        self._port = port
        self._sid = sid
        self._service_name = service_name
        self._oracle_connection_string = connection_string
        self._thick_mode = thick_mode
        self._thick_mode_lib_dir = thick_mode_lib_dir

    @classmethod
    def _from_dict(cls, **kwargs):
        """
        Build a connection from keyword configuration values.

        Recognized keys: user, password, connection_string, host, port, sid,
        service_name, thick_mode, thick_mode_lib_dir. Missing keys fall back
        to None, except host ("localhost"), port (1521) and thick_mode (True).

        :return: a new instance of this class
        """
        # Pass everything by keyword: the constructor takes connection_string
        # before host, so a positional call in "host, port, sid, ..." order
        # would silently assign the values to the wrong parameters.
        return cls(
            user=kwargs.get("user"),
            password=kwargs.get("password"),
            connection_string=kwargs.get("connection_string"),
            host=kwargs.get("host", "localhost"),
            port=kwargs.get("port", 1521),
            sid=kwargs.get("sid"),
            service_name=kwargs.get("service_name"),
            # Same default as the constructor, so that omitting the key does
            # not switch thick mode off.
            thick_mode=kwargs.get("thick_mode", True),
            thick_mode_lib_dir=kwargs.get("thick_mode_lib_dir"),
        )

    def _connect(self) -> Connection:
        """
        Open a new connection using the stored settings.

        :return: the connection object returned by oracledb.connect
        """
        # Imported here rather than at module level, as in the original.
        import oracledb

        if self._thick_mode:
            if self._thick_mode_lib_dir:
                oracledb.init_oracle_client(self._thick_mode_lib_dir)
            else:
                oracledb.init_oracle_client()

        if self._oracle_connection_string:
            log.debug("Connecting to Oracle using connection string")
            params = oracledb.ConnectParams()
            params.set(user=self._oracle_user)
            params.set(password=self._oracle_password)
            params.parse_connect_string(self._oracle_connection_string)
        else:
            log.debug("Connecting to Oracle using host,port,sid")
            params = oracledb.ConnectParams(
                user=self._oracle_user,
                password=self._oracle_password,
                host=self._host,
                port=self._port,
                sid=self._sid,
                service_name=self._service_name,
            )
        return oracledb.connect(params=params)
oracle_plugin.py
@hookimpl(trylast=True)
def initialize_job(self, context: JobContext):
    """
    Register OracleConnection as the connection class for the "oracle" type.

    :param context: job context whose connections object receives the
        registration
    """
    core_configuration = context.core_context.configuration
    # NOTE(review): conf is built but not read in this hook; kept because
    # constructing it may matter (e.g. validation) - confirm before removing.
    conf = OracleConfiguration(core_configuration)
    connections = context.connections
    connections.add_connection_class("oracle", OracleConnection)
built_in_connection_initializer.py
Creates the factory methods for all named connections, based on the connection classes that were previously registered. For now, it can do nothing until we add a configuration API.
@hookimpl(trylast=True)
def initialize_job(self, context: JobContext):
    """
    Register a factory for every configured named connection.

    Each (connection type, name) pair from the configuration is matched
    against the connection classes registered so far. Pairs whose type has
    no registered class are skipped.

    :param context: job context that holds the connection router
    """
    for conn_type, name in self._config.named_connections:
        # get_conn_class returns None for an unregistered type and normalizes
        # case the same way registration does, so it doubles as the
        # "is this type supported" check.
        conn_class = context.connections.get_conn_class(conn_type)
        if conn_class is None:
            continue

        # NOTE(review): looked up but not used yet - presumably a placeholder
        # for registering named ingestion targets; confirm.
        ingest_class = context.ingester.get_ingest_class(conn_type)

        # Bind the loop variables as defaults. A plain closure would look
        # them up when the factory is called, so every factory would open the
        # last connection of the loop.
        def open_connection(conn_class=conn_class, name=name):
            # _from_dict takes keyword arguments, so the config is unpacked.
            return conn_class._from_dict(**self._config.get_config_by_name(name))

        context.connections.add_open_named_connection_factory_method(
            name,
            open_connection,
        )
10_new_step.py
def run(job_input):
    """Example step: fetch the managed connection registered as "oracle1"."""
    connection_name = "oracle1"
    job_input.get_managed_connection(connection_name)
Acceptance criteria
- The above setup is created and tested
Overview
In order to support multiple databases, we should be able to easily instantiate multiple database connection objects and register them under a name. For example, we can have three oracle connections named
oracle1, oracle2 and foobar. We should also be able to fetch a connection by name and execute queries with it. To achieve this, we need
job_input.get_managed_connection() and get the correct connection object. Pseudocode
managed_connection_base.py
router.py
oracle_connection.py
oracle_plugin.py
built_in_connection_initializer.py
Creates the factory methods for all named connections, based on the connection classes that were previously registered. For now, it can do nothing until we add a configuration API.
10_new_step.py
Acceptance criteria