VDK: Support multiple databases in the same job
VDK currently supports only one database for executing SQL files and sending queries with job_input. It is configured using VDK_DB_DEFAULT_TYPE. Ingestion has a separate setting, VDK_INGESTION_METHOD_DEFAULT. In the best case this gives us two databases: we query one and ingest into another. Querying and ingesting into the same database also works. However, if we want to query and ingest into one database and also query a second one, we are out of options.
Users should be able to:
- Specify multiple database configurations
- Each database connection is registered under a user-defined name
- Registering a database connection also registers an ingestor for the same database under the same name
- Fetch database connections/ingestors by name using the job_input api and then run queries/ingest data
- Have a default user-configured database that vdk can fall back to. If the user hasn't configured a default database and calls the managed connection/ingestion API, this should result in an error.
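The lookup behaviour described in the list above can be sketched as a small name-to-factory registry. This is only an illustration under stated assumptions: `NamedRegistry`, `DEFAULT_NAME`, and the string stand-ins for connections are hypothetical and are not part of the VDK API.

```python
from typing import Callable, Dict, Optional

# Hypothetical reserved name for the fallback database (an assumption).
DEFAULT_NAME = "default"


class NamedRegistry:
    """Toy registry mapping user-defined names to connection factories."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, name: str, factory: Callable[[], object]) -> None:
        # Names are stored lowercased so lookups are case-insensitive.
        self._factories[name.lower()] = factory

    def open(self, name: Optional[str] = None) -> object:
        # No name given: fall back to the user-configured default.
        key = (name or DEFAULT_NAME).lower()
        if key not in self._factories:
            # A missing default is an error, not a silent fallback.
            raise LookupError(f"No database registered under the name '{key}'")
        return self._factories[key]()


registry = NamedRegistry()
registry.register("oracle_1", lambda: "connection to oracle_1")
print(registry.open("oracle_1"))

try:
    registry.open()  # no default registered yet
except LookupError as error:
    print(error)
```

Storing factories rather than open connections means nothing is connected until a step actually asks for that name.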
Option 1: Nest ingestion database configurations
[vdk]
; general vdk config
[vdk.ingest]
; general ingestion config
[vdk.ingest.default]
type=oracle
use_secrets=True
user=defaultuser
host=defaulthost.com
port=1522
service_name=default_service
thick_mode=True
[vdk.ingest.oracle1]
type=oracle
use_secrets=True
user=myuser
host=localhost
port=1521
sid=free
thick_mode=True
[vdk.ingest.impala1]
type=impala
use_secrets=True
user=impala_user
[vdk.ingest.duckdb1]
; some other database config
[vdk.ingest.sqlite1]

Option 2: Configure ingestion databases in separate step
00_configure_ingestion.py
def run(job_input):
    job_input.register_ingestion_db(key="default", type="oracle", host="localhost", port="1521", sid="free")
    job_input.register_ingestion_db(key="oracle_1", type="oracle", host="http://some_host_whatever", port="1522", sid="not_free")
    job_input.register_ingestion_db(key="impala_1", type="impala", host="http://impala_host", database="mydatabase")  # <- pass other db-specific properties here

A later step can then ingest by name. When destination_db is omitted, the database registered as "default" is used:
def run(job_input):
    obj = {"id": 1, "name": "Bob"}
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="name_table"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="another_table", destination_db="oracle_1"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="storage_table", destination_db="impala_1"
    )

To pull this off, we should support the following:
- Register managed connection and ingester classes for every plugin
- Pass different configs to the same interface, e.g. ManagedConnectionBase. We must be able to instantiate managed connection classes from a dict of configurations.
- Name connections and ingesters and access them by name
- Attach this functionality either to the config.ini files or to the job_input API.
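For the config.ini route (Option 1), the named sections could be collected into one dictionary per database, which is the shape a dict-based constructor would need. The following is a minimal sketch using Python's standard `configparser`. The helper `named_db_configs`, the `PREFIX` constant, and the trimmed sample config are assumptions made for illustration, not existing VDK code.

```python
import configparser
from typing import Dict

# Trimmed sample in the style of Option 1 (illustrative values only).
SAMPLE = """
[vdk]
[vdk.ingest]
[vdk.ingest.default]
type=oracle
host=defaulthost.com
port=1522
[vdk.ingest.impala1]
type=impala
user=impala_user
"""

PREFIX = "vdk.ingest."  # sections below this prefix are named databases


def named_db_configs(text: str) -> Dict[str, Dict[str, str]]:
    """Return {name: {option: value}} for every [vdk.ingest.<name>] section."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    configs: Dict[str, Dict[str, str]] = {}
    for section in parser.sections():
        if section.startswith(PREFIX):
            name = section[len(PREFIX):]
            configs[name] = dict(parser[section])
    return configs


for name, options in named_db_configs(SAMPLE).items():
    print(name, options)
```

Note that `configparser` returns every value as a string, so options such as port or thick_mode would still need converting before they reach a connection class.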
router.py
def add_connection_class(
    self,
    dbtype: str,
    clazz: Type[ManagedConnectionBase],
) -> None:
    self._supported_connection_types[dbtype.lower()] = clazz

def get_conn_class(
    self,
    dbtype: str,
) -> Optional[Type[ManagedConnectionBase]]:
    # Returns None when no plugin has registered this database type
    return self._supported_connection_types.get(dbtype.lower(), None)

def add_open_named_connection_factory_method(
    self,
    connection_name: str,
    open_connection_func: Callable[
        [], Union[ManagedConnectionBase, PEP249Connection]
    ],
) -> None:
    self._named_connection_builders[connection_name] = open_connection_func

Do the same for ingestion.
ingester_router.py
def add_ingester_class(
    self,
    dbtype: str,
    clazz: Type[IIngesterPlugin],
) -> None:
    self._supported_ingester_types[dbtype.lower()] = clazz

def add_named_ingester_factory_method(
    self,
    name: str,
    ingester_plugin: IngesterPluginFactory,
) -> None:
    """
    Add new ingester.
    """
    self._named_ingester_builders[name.lower()] = ingester_plugin

Add an abstract class method that instantiates ManagedConnectionBase from a dictionary.
managed_connection_base.py
@classmethod
@abstractmethod
def _from_dict(cls, **kwargs):
    """
    Override this if you want to support multiple connections and ingestion operations.
    """
    pass

If database plugins want to support multiple databases, they should:
- Extend ManagedConnectionBase
- Override _connect and _from_dict
- Call add_connection_class and pass the ManagedConnectionBase child class
- Call add_ingester_class and pass the IIngesterPlugin class
The following snippets use the oracle plugin as an example, but every plugin that supports ingestion into multiple data sources should conform to this interface.
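Stripped of VDK specifics, the contract listed above might look like the toy below. It is a sketch under loud assumptions: `ConnectionBase` is a stand-in written for this example and is not the real ManagedConnectionBase, `SqliteConnection` and the `supported` dict are hypothetical, and sqlite3 is used only because it ships with Python.

```python
import sqlite3
from abc import ABC, abstractmethod


class ConnectionBase(ABC):
    """Stand-in for ManagedConnectionBase (assumption, not the VDK class)."""

    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        """Build an instance from a dictionary of configuration values."""

    @abstractmethod
    def _connect(self):
        """Open and return a PEP 249 connection."""


class SqliteConnection(ConnectionBase):
    def __init__(self, database: str = ":memory:"):
        self._database = database

    @classmethod
    def _from_dict(cls, **kwargs):
        # Pass by keyword so the call cannot drift out of sync
        # with the constructor's parameter order.
        return cls(database=kwargs.get("database", ":memory:"))

    def _connect(self):
        return sqlite3.connect(self._database)


# Plays the role of add_connection_class: database type -> class.
supported = {"sqlite": SqliteConnection}

config = {"database": ":memory:"}
conn = supported["sqlite"]._from_dict(**config)._connect()
print(conn.execute("SELECT 1").fetchone())
```

The core only needs the type name and a plain dictionary. Everything database-specific stays inside the plugin's `_from_dict` and `_connect`.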
oracle_connection.py
class OracleConnection(ManagedConnectionBase):
    def __init__(
        self,
        user: str,
        password: str,
        connection_string: str = None,
        host=None,
        port=1521,
        sid: str = None,
        service_name: str = None,
        thick_mode: bool = True,
        thick_mode_lib_dir: Optional[str] = None,
    ):
        super().__init__(log)
        self._oracle_user = user
        self._oracle_password = password
        self._host = host
        self._port = port
        self._sid = sid
        self._service_name = service_name
        self._oracle_connection_string = connection_string
        self._thick_mode = thick_mode
        self._thick_mode_lib_dir = thick_mode_lib_dir
    @classmethod
    def _from_dict(cls, **kwargs):
        # Pass everything by keyword: __init__ takes connection_string as its
        # third parameter, so a positional call in a different order would
        # assign host, port and sid to the wrong arguments.
        return cls(
            user=kwargs.get("user", None),
            password=kwargs.get("password", None),
            connection_string=kwargs.get("connection_string", None),
            host=kwargs.get("host", "localhost"),
            port=kwargs.get("port", 1521),
            sid=kwargs.get("sid", None),
            service_name=kwargs.get("service_name", None),
            # Default matches the constructor's thick_mode=True
            thick_mode=kwargs.get("thick_mode", True),
            thick_mode_lib_dir=kwargs.get("thick_mode_lib_dir", None),
        )
    def _connect(self) -> Connection:
        import oracledb

        if self._thick_mode:
            if self._thick_mode_lib_dir:
                oracledb.init_oracle_client(self._thick_mode_lib_dir)
            else:
                oracledb.init_oracle_client()
        if self._oracle_connection_string:
            log.debug("Connecting to Oracle using connection string")
            params = oracledb.ConnectParams()
            params.set(user=self._oracle_user)
            params.set(password=self._oracle_password)
            params.parse_connect_string(self._oracle_connection_string)
            conn = oracledb.connect(params=params)
        else:
            log.debug("Connecting to Oracle using host,port,sid")
            params = oracledb.ConnectParams(
                user=self._oracle_user,
                password=self._oracle_password,
                host=self._host,
                port=self._port,
                sid=self._sid,
                service_name=self._service_name,
            )
            conn = oracledb.connect(params=params)
        return conn

ingest_to_oracle.py
class IngestToOracle(IIngesterPlugin):
    def __init__(self, name: str, connections: ManagedConnectionRouter):
        self.conn: PEP249Connection = connections.open_named_connection(name).connect()
        self.cursor: ManagedCursor = self.conn.cursor()

oracle_plugin.py
@hookimpl(trylast=True)
def initialize_job(self, context: JobContext):
    conf = OracleConfiguration(context.core_context.configuration)
    context.connections.add_connection_class("oracle", OracleConnection)
    context.ingester.add_ingester_class("oracle", IngestToOracle)

We then add a plugin in core that parses all the named configs and instantiates the named connections and ingesters.
named_ingester_conf.py
@hookimpl(trylast=True)
def initialize_job(self, context: JobContext):
    for conn_type, name in self._config.named_connections:
        if conn_type in context.connections.get_supported_conn_types():
            conn_class = context.connections.get_conn_class(conn_type)
            ingest_class = context.ingester.get_ingest_class(conn_type)
            # Bind the loop variables as default arguments. A bare lambda would
            # look them up only when called and see the last iteration's values.
            context.connections.add_open_named_connection_factory_method(
                name,
                lambda name=name, conn_class=conn_class: conn_class._from_dict(
                    **self._config.get_config_by_name(name)
                ),
            )
            context.ingester.add_named_ingester_factory_method(
                name,
                lambda name=name, ingest_class=ingest_class: ingest_class(
                    name, context.connections
                ),
            )

- SQL Files
- Config parsing