
VDK: Support multiple databases in the same job

Dilyan Marinov edited this page Feb 28, 2024 · 19 revisions

Overview

VDK currently supports only one database for executing SQL files and sending queries through job_input. That database is selected with VDK_DB_DEFAULT_TYPE. Ingestion has a separate setting, VDK_INGESTION_METHOD_DEFAULT. In the best case this gives us two databases: we query one and ingest into another. Querying and ingesting into the same database also works. However, if we want to query and ingest into one database and also query a second one, we are out of options.

Requirements

Users should be able to:

  1. Specify multiple database configurations
  2. Register each database connection under a user-defined name
  3. Get an ingester for the same database, under the same name, whenever a database connection is registered
  4. Fetch database connections/ingesters by name using the job_input API and then run queries/ingest data
  5. Have a default user-configured database that VDK can fall back to. If the user hasn't configured a default database and calls the managed connection/ingestion API without naming a database, this should result in an error.

Implementation

Configuration

Option 1: Nest ingestion database configurations

[vdk]
; general vdk config

[vdk.ingest]
; general ingestion config

[vdk.ingest.default]
type=oracle
use_secrets=True
user=defaultuser
host=defaulthost.com
port=1522
service_name=default_service
thick_mode=True

[vdk.ingest.oracle1]
type=oracle
use_secrets=True
user=myuser
host=localhost
port=1521
sid=free
thick_mode=True

[vdk.ingest.impala1]
type=impala
use_secrets=True
user=impala_user

[vdk.ingest.duckdb1]
; some other database config

[vdk.ingest.sqlite1]
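
Config parsing is still listed as a gap, so the following is only a minimal sketch of how the nested sections of Option 1 could be collected into named configurations. It uses Python's standard configparser; the function name parse_named_ingest_configs and the trimmed-down sample are assumptions made for illustration, not part of VDK.

```python
import configparser
from typing import Dict

# Hypothetical, shortened version of the Option 1 layout.
SAMPLE = """
[vdk]
; general vdk config

[vdk.ingest]
; general ingestion config

[vdk.ingest.default]
type = oracle
host = defaulthost.com
port = 1522

[vdk.ingest.impala1]
type = impala
user = impala_user
"""

PREFIX = "vdk.ingest."


def parse_named_ingest_configs(text: str) -> Dict[str, Dict[str, str]]:
    """Map each [vdk.ingest.<name>] section to a plain dict keyed by <name>."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    named = {}
    for section in parser.sections():
        # "[vdk.ingest]" itself lacks the trailing dot, so it is skipped here.
        if section.startswith(PREFIX):
            named[section[len(PREFIX):]] = dict(parser[section])
    return named


configs = parse_named_ingest_configs(SAMPLE)
```

Note that configparser returns every value as a string, so fields such as port and thick_mode would need explicit conversion before they reach a connection class.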

Option 2: Configure ingestion databases in separate step

00_configure_ingestion.py

def run(job_input):
    job_input.register_ingestion_db(key="default", type="oracle", host="localhost", port="1521", sid="free")
    job_input.register_ingestion_db(key="oracle_1", type="oracle", host="http://some_host_whatever", port="1522", sid="not_free")
    job_input.register_ingestion_db(key="impala_1", type="impala", host="http://impala_host", database="mydatabase") # <- pass other db-specific properties here

Ingestion API

def run(job_input):
    obj = {"id": 1, "name": "Bob"}
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="name_table"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="another_table", destination_db="oracle_1"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="storage_table", destination_db="impala_1"
    )
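
The first call above omits destination_db, so it relies on the default database from requirement 5. A minimal sketch of that fallback rule follows, assuming the default is registered under the key "default" as in the configuration options; resolve_destination is a hypothetical helper, not an existing VDK function.

```python
from typing import Iterable, Optional

# Assumption: the default database is registered under this key.
DEFAULT_DB_NAME = "default"


def resolve_destination(destination_db: Optional[str], registered: Iterable[str]) -> str:
    """Return the registered name to ingest into, or raise if none applies."""
    names = {name.lower() for name in registered}
    target = (destination_db or DEFAULT_DB_NAME).lower()
    if target not in names:
        if destination_db is None:
            # Requirement 5: no default configured and no name given is an error.
            raise LookupError("No default database is configured")
        raise LookupError(f"Unknown destination_db: {destination_db!r}")
    return target
```

For example, resolve_destination(None, ["default", "oracle_1"]) falls back to "default", while the same call against a registry without a default raises an error.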

Ingestion implementation

To pull this off, we need to support the following:

  1. Register managed connection and ingester classes for every plugin
  2. Pass different configs to the same interface, e.g. ManagedConnectionBase. We must be able to instantiate managed connection classes from a dict of configurations.
  3. Name connections and ingesters and access them by name
  4. Attach this functionality either to the config.ini files or to the job_input API.

router.py

    def add_connection_class(
        self,
        dbtype: str,
        clazz: Type[ManagedConnectionBase]
    ) -> None:
        self._supported_connection_types[dbtype.lower()] = clazz

    def get_conn_class(
        self,
        dbtype: str,
    ) -> Optional[Type[ManagedConnectionBase]]:
        # Returns None when no plugin has registered this database type
        return self._supported_connection_types.get(dbtype.lower())

    def add_open_named_connection_factory_method(
        self,
        connection_name: str,
        open_connection_func: Callable[
            [], Union[ManagedConnectionBase, PEP249Connection]
        ],
    ) -> None:
        self._named_connection_builders[connection_name] = open_connection_func

Do the same for ingestion.

ingester_router.py

    def add_ingester_class(
        self,
        dbtype: str,
        clazz: Type[IIngesterPlugin]
    ) -> None:
        self._supported_ingester_types[dbtype.lower()] = clazz

    def add_named_ingester_factory_method(
        self,
        name: str,
        ingester_plugin: IngesterPluginFactory,
    ) -> None:
        """
        Add new ingester.
        """
        self._named_ingester_builders[name.lower()] = ingester_plugin
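
To show how the two registries fit together, here is a self-contained sketch of the connection side. SketchRouter and FakeConnection are stand-ins invented for this example; only the method names are taken from the snippets above, and the real ManagedConnectionRouter may behave differently.

```python
from typing import Callable, Dict, Optional, Type


class FakeConnection:
    """Hypothetical stand-in for a ManagedConnectionBase subclass."""

    def __init__(self, host: str):
        self.host = host


class SketchRouter:
    def __init__(self) -> None:
        # Database type -> connection class, filled in by plugins.
        self._supported_connection_types: Dict[str, Type] = {}
        # User-defined name -> zero-argument factory, filled in from config.
        self._named_connection_builders: Dict[str, Callable[[], object]] = {}

    def add_connection_class(self, dbtype: str, clazz: Type) -> None:
        self._supported_connection_types[dbtype.lower()] = clazz

    def get_conn_class(self, dbtype: str) -> Optional[Type]:
        return self._supported_connection_types.get(dbtype.lower())

    def add_open_named_connection_factory_method(
        self, connection_name: str, open_connection_func: Callable[[], object]
    ) -> None:
        self._named_connection_builders[connection_name] = open_connection_func

    def open_named_connection(self, connection_name: str) -> object:
        builder = self._named_connection_builders.get(connection_name)
        if builder is None:
            raise KeyError(f"No connection registered under {connection_name!r}")
        # In this sketch every call builds a new connection object.
        return builder()


router = SketchRouter()
router.add_connection_class("Oracle", FakeConnection)
conn_class = router.get_conn_class("oracle")
router.add_open_named_connection_factory_method(
    "oracle_1", lambda: conn_class(host="localhost")
)
conn = router.open_named_connection("oracle_1")
```

The type registry answers "which class handles oracle?", while the name registry answers "how do I open oracle_1?". Keeping them separate lets several named connections share one class.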

Add an abstract class method that instantiates a ManagedConnectionBase subclass from a dictionary of configuration values:

managed_connection_base.py

    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        """
        Build a connection from keyword configuration values.
        Override this to support multiple named connections and ingestion operations.
        """
        pass
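
The pattern can be tried in isolation with the sketch below. SketchConnectionBase, FileConnection and LegacyConnection are hypothetical classes used only to illustrate it. One consequence worth weighing: because the method is abstract, a subclass that does not override it cannot be instantiated at all, which matters for plugins that have not yet adopted the new interface.

```python
from abc import ABC, abstractmethod


class SketchConnectionBase(ABC):
    """Hypothetical stand-in for ManagedConnectionBase."""

    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        """Build an instance from keyword configuration values."""


class FileConnection(SketchConnectionBase):
    def __init__(self, path: str = ":memory:", timeout: float = 5.0):
        self.path = path
        self.timeout = timeout

    @classmethod
    def _from_dict(cls, **kwargs):
        # Config files yield strings, so convert where a type is expected.
        return cls(
            path=kwargs.get("path", ":memory:"),
            timeout=float(kwargs.get("timeout", 5.0)),
        )


class LegacyConnection(SketchConnectionBase):
    """A subclass that has not been updated to override _from_dict."""


conn = FileConnection._from_dict(**{"path": "jobs.db", "timeout": "10"})

try:
    LegacyConnection()
    legacy_instantiable = True
except TypeError:
    # ABC refuses to instantiate classes with unimplemented abstract methods.
    legacy_instantiable = False
```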

Database plugins that want to support multiple databases should:

  1. Extend ManagedConnectionBase
  2. Override _connect and _from_dict
  3. Call add_connection_class and pass the ManagedConnectionBase child class
  4. Call add_ingester_class and pass the IIngesterPlugin class

The following snippets use the oracle plugin as an example, but every plugin that supports ingestion into multiple data sources should conform to this interface.

oracle_connection.py

class OracleConnection(ManagedConnectionBase):
    def __init__(
        self,
        user: str,
        password: str,
        connection_string: str = None,
        host=None,
        port=1521,
        sid: str = None,
        service_name: str = None,
        thick_mode: bool = True,
        thick_mode_lib_dir: Optional[str] = None,
    ):
        super().__init__(log)
        self._oracle_user = user
        self._oracle_password = password
        self._host = host
        self._port = port
        self._sid = sid
        self._service_name = service_name
        self._oracle_connection_string = connection_string
        self._thick_mode = thick_mode
        self._thick_mode_lib_dir = thick_mode_lib_dir

    @classmethod
    def _from_dict(cls, **kwargs):
        # Pass everything by keyword: __init__ takes connection_string as its
        # third parameter, so positional arguments would end up in the wrong slots.
        return cls(
            user=kwargs.get("user"),
            password=kwargs.get("password"),
            connection_string=kwargs.get("connection_string"),
            host=kwargs.get("host", "localhost"),
            port=kwargs.get("port", 1521),
            sid=kwargs.get("sid"),
            service_name=kwargs.get("service_name"),
            # Same default as __init__
            thick_mode=kwargs.get("thick_mode", True),
            thick_mode_lib_dir=kwargs.get("thick_mode_lib_dir"),
        )

    def _connect(self) -> Connection:
        import oracledb

        if self._thick_mode:
            if self._thick_mode_lib_dir:
                oracledb.init_oracle_client(self._thick_mode_lib_dir)
            else:
                oracledb.init_oracle_client()
        if self._oracle_connection_string:
            log.debug("Connecting to Oracle using connection string")
            params = oracledb.ConnectParams()
            params.set(user=self._oracle_user)
            params.set(password=self._oracle_password)
            params.parse_connect_string(self._oracle_connection_string)

            conn = oracledb.connect(params=params)
        else:
            log.debug("Connecting to Oracle using host,port,sid")
            params = oracledb.ConnectParams(
                user=self._oracle_user,
                password=self._oracle_password,
                host=self._host,
                port=self._port,
                sid=self._sid,
                service_name=self._service_name,
            )
            conn = oracledb.connect(params=params)
        return conn

ingest_to_oracle.py

class IngestToOracle(IIngesterPlugin):
    def __init__(self, name: str, connections: ManagedConnectionRouter):
        self.conn: PEP249Connection = connections.open_named_connection(name).connect()
        self.cursor: ManagedCursor = self.conn.cursor()

oracle_plugin.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        conf = OracleConfiguration(context.core_context.configuration)
        context.connections.add_connection_class("oracle", OracleConnection)
        context.ingester.add_ingester_class("oracle", IngestToOracle)

We then add a plugin in core that parses all the named configs and instantiates the named connections and ingesters.

named_ingester_conf.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        for conn_type, name in self._config.named_connections:
            if conn_type in context.connections.get_supported_conn_types():
                conn_class = context.connections.get_conn_class(conn_type)
                ingest_class = context.ingester.get_ingest_class(conn_type)
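                context.connections.add_open_named_connection_factory_method(
                    name,
                    # Bind the loop variables as defaults so each factory keeps
                    # its own name and class instead of the last iteration's.
                    lambda name=name, conn_class=conn_class: conn_class._from_dict(
                        **self._config.get_config_by_name(name)
                    ),
                )
                context.ingester.add_named_ingester_factory_method(
                    name,
                    lambda name=name, ingest_class=ingest_class: ingest_class(
                        name, context.connections
                    ),
                )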
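
A lambda created inside a loop looks up the loop variables when it is called, not when it is defined, which matters for factories registered this way. The sketch below uses made-up configuration data to contrast a bare lambda with one that binds the loop variable as a default argument.

```python
from typing import Callable, Dict, Tuple

Factories = Dict[str, Callable[[], str]]


def build_factories(configs: Dict[str, Dict[str, str]]) -> Tuple[Factories, Factories]:
    late: Factories = {}
    bound: Factories = {}
    for name in configs:
        # Looks up `name` at call time, after the loop has finished.
        late[name] = lambda: configs[name]["host"]
        # Captures the current value of `name` as a default argument.
        bound[name] = lambda name=name: configs[name]["host"]
    return late, bound


# Hypothetical configuration data for illustration only.
late, bound = build_factories(
    {"oracle_1": {"host": "oracle-host"}, "impala_1": {"host": "impala-host"}}
)

late_result = late["oracle_1"]()    # "impala-host": sees the last loop value
bound_result = bound["oracle_1"]()  # "oracle-host": keeps its own value
```

functools.partial is an equivalent alternative to the default-argument idiom when the factory body is a plain function call.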

Gaps

  1. SQL Files
  2. Config parsing
