dns-technologies/dbhose_airflow

DBHose for Apache Airflow

                                                                 (                )
 (  (                                                 )          )\ )     (    ( /(
 )\))(   '   (    (                   )       (     ( /(        (()/(   ( )\   )\())               (
((_)()\ )   ))\   )\    (     (      (       ))\    )\())   (    /(_))  )((_) ((_)\    (    (     ))\
_(())\_)() /((_) (( )   )\    )\     )\  '  /((_)  (_))/    )\   (_))_  ((_)_   _((_)   )\   )\   /((_)
\ \((_)/ /(_))   | |   ((_)  ((_)  _((_))  (_))    | |_    ((_)   |   \  | _ ) | || |  ((_) ((_) (_))
 \ \/\/ / / -_)  | |  / _|  / _ \ | ' ' |  / -_)   |  _|  / _ \   | |) | | _ \ | __ | / _ \ (_-< / -_)
  \_/\_/  \___|  |_|  \__|  \___/ |_|_|_|  \___|    \__|  \___/   |___/  |___/ |_||_| \___/ /__/ \___|

DBHose is an Apache Airflow module for fast data exchange between DBMSs, using native binary formats or CSV.

Documentation

⚠️ Project Status

This project is in beta testing and may contain bugs. Use with caution in production environments.

Supported DBMS

Currently, data transfer is supported between the following databases:

  • ClickHouse
  • Greenplum
  • PostgreSQL

Description

DBHose is a tool for safe and efficient data movement between:

  • Dump files
  • Python iterables
  • DataFrames (Pandas/Polars)
  • Supported DBMS (ClickHouse, Greenplum, PostgreSQL)

The DBHose class includes built-in Data Quality checks and supports several data movement methods. DBHoseOperator provides native Airflow integration for simpler DAG development.

Installation

pip install dbhose-airflow -U --index-url https://dns-technologies.github.io/dbhose-dev-pip/simple/

Initialization

from dbhose_airflow import DBHose

DBHose(
    destination_table: str,
    destination_conn: str | ConnectionConfig | DBConnector,
    source_conn: str | ConnectionConfig | DBConnector | None = None,
    dq_extra_conn: str | ConnectionConfig | DBConnector | None = None,
    *,
    source_filter: list[str] | None = None,
    staging: StagingConfig | None = None,
    move_method: MoveMethod = MoveMethod.AUTO,
    custom_move_sql: str | None = None,
    mode: DumperMode = DumperMode.DEBUG,
    dump_format: DumpFormat | None = None,
    dq: DQConfig | None = None,
)

Parameters

Required Parameters

  • destination_table (str) – Fully qualified destination table name (e.g., "schema.table")
  • destination_conn (str | ConnectionConfig | DBConnector) – Destination connection: an Airflow conn_id, a ConnectionConfig, or a DBConnector instance

Optional Parameters

  • source_conn (str | ConnectionConfig | DBConnector | None, default None) – Source connection; if None, the destination connection is used
  • dq_extra_conn (str | ConnectionConfig | DBConnector | None, default None) – External connection for the DQ comparison table
  • source_filter (list[str] | None, default None) – Columns from which filter expressions are generated automatically; under MoveMethod.AUTO, providing it selects DELETE
  • staging (StagingConfig | None, default None) – Staging table configuration
  • move_method (MoveMethod, default MoveMethod.AUTO) – Method for moving data from staging to destination
  • custom_move_sql (str | None, default None) – Custom SQL, used when move_method=MoveMethod.CUSTOM
  • mode (DumperMode, default DumperMode.DEBUG) – Operation mode (DEBUG, TEST, PRODUCTION)
  • dump_format (DumpFormat | None, default None) – Dump format for data transfer (auto-detected if None)
  • dq (DQConfig | None, default None) – Data Quality check configuration

Move Methods

Method   Requires partition  Requires filter  Uses temp table  Description
APPEND   No                  No               Yes              Simple INSERT - adds new rows without deleting old ones
REWRITE  No                  No               Yes              TRUNCATE + INSERT - completely replaces table content
DELETE   No                  Yes              No               DELETE matching rows + INSERT - for incremental updates
REPLACE  Yes                 No               No               REPLACE/ATTACH PARTITION - atomic partition replacement
AUTO     -                   -                -                Automatically selects the best strategy based on table metadata
CUSTOM   No                  No               No               User-provided custom SQL for data movement
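The sketch below shows what each row means in SQL terms by listing the statements a move method would issue when copying from a staging table. It is a hypothetical helper (`plan_move`) that uses PostgreSQL-flavored SQL. It is not the SQL DBHose actually generates. REPLACE is left out because partition swapping is dialect-specific, and AUTO must first resolve to a concrete method.

```python
from __future__ import annotations


def plan_move(
    method: str,
    destination: str,
    staging: str,
    source_filter: list[str] | None = None,
    custom_sql: str | None = None,
) -> list[str]:
    """Return the SQL statements a move method implies (hypothetical sketch).

    Identifiers are interpolated as-is, so inputs must be trusted names.
    """
    insert = f"INSERT INTO {destination} SELECT * FROM {staging}"
    if method == "APPEND":
        # Keep existing rows and add the staged ones.
        return [insert]
    if method == "REWRITE":
        # Empty the destination, then reload it completely.
        return [f"TRUNCATE {destination}", insert]
    if method == "DELETE":
        if not source_filter:
            raise ValueError("DELETE requires source_filter")
        cols = ", ".join(source_filter)
        # Remove destination rows whose filter-column values appear in staging.
        delete = (
            f"DELETE FROM {destination} WHERE ({cols}) IN "
            f"(SELECT DISTINCT {cols} FROM {staging})"
        )
        return [delete, insert]
    if method == "CUSTOM":
        if not custom_sql:
            raise ValueError("CUSTOM requires custom_move_sql")
        return [custom_sql]
    raise ValueError(f"no generic plan for {method}")
```

For example, `plan_move("DELETE", "analytics.daily_stats", "analytics.daily_stats_stg", ["date", "region"])` yields a DELETE keyed on `(date, region)`, followed by the INSERT.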

Move Strategy Auto-Selection

When move_method=MoveMethod.AUTO, DBHose selects the strategy from source_filter and the destination table's partitioning:

Condition                                  Selected strategy
source_filter is provided                  DELETE
Table has partitions AND no source_filter  REPLACE
No partitions AND no source_filter         REWRITE
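The selection rules in the table reduce to a short decision function. This sketch follows the documented rules only and is not the library's code. `select_auto_strategy` is a hypothetical name, and treating an empty source_filter list the same as None is an assumption.

```python
from __future__ import annotations


def select_auto_strategy(has_partitions: bool, source_filter: list[str] | None) -> str:
    """Resolve MoveMethod.AUTO following the documented rules (sketch)."""
    # A filter means an incremental reload: delete matching rows, then insert.
    if source_filter:
        return "DELETE"
    # A partitioned table without a filter: swap partitions atomically.
    if has_partitions:
        return "REPLACE"
    # Otherwise replace the whole table content.
    return "REWRITE"
```

The rule order matters: source_filter is checked first, so a partitioned table with a filter still resolves to DELETE.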

Configuration Classes

ConnectionConfig

Configuration for a single database connection.

@dataclass
class ConnectionConfig:
    conn_id: str
    isolation: IsolationLevel = IsolationLevel.committed
    compression: CompressionMethod = CompressionMethod.ZSTD
    compression_level: int = CompressionLevel.ZSTD_DEFAULT
    timeout: int | None = None

  • conn_id (str, required) – Airflow connection ID
  • isolation (IsolationLevel, default IsolationLevel.committed) – Transaction isolation level
  • compression (CompressionMethod, default CompressionMethod.ZSTD) – Compression method
  • compression_level (int, default 3, i.e. CompressionLevel.ZSTD_DEFAULT) – Compression level
  • timeout (int | None, default None) – Connection timeout in seconds

StagingConfig

Configuration for staging table behavior.

@dataclass
class StagingConfig:
    use_origin: bool = False
    drop_after: bool = True
    random_suffix: bool = True

  • use_origin (bool, default False) – Skip the staging table and write directly to the destination
  • drop_after (bool, default True) – Drop the staging table after the operation
  • random_suffix (bool, default True) – Add a random suffix to the staging table name

DQConfig

Configuration for Data Quality checks.

@dataclass
class DQConfig:
    disabled_checks: list[DQCheck] = field(default_factory=list)
    custom_queries: list[str] = field(default_factory=list)
    exclude_columns: list[str] = field(default_factory=list)
    filter_columns: list[str] = field(default_factory=list)
    column_mapping: dict[str, str] = field(default_factory=dict)
    comparison_object: str | None = None
    use_destination_conn: bool = False

  • disabled_checks (list[DQCheck], default []) – DQ checks to skip
  • custom_queries (list[str], default []) – Paths to custom DQ queries
  • exclude_columns (list[str], default []) – Columns excluded from DQ checks
  • filter_columns (list[str], default []) – Columns used by filter-based checks
  • column_mapping (dict[str, str], default {}) – Maps comparison table column names to the matching destination table column names
  • comparison_object (str | None, default None) – Table to compare against in DQ checks
  • use_destination_conn (bool, default False) – Access the comparison table through the destination connection
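The sketch below shows how column_mapping and exclude_columns could interact in a comparison check such as `sum`, using precomputed per-column sums. It assumes that mapping keys are comparison table column names and values are destination column names. `compare_sums` and its `tolerance` parameter are hypothetical. The real checks run as SQL.

```python
from __future__ import annotations


def compare_sums(
    destination: dict[str, float],
    comparison: dict[str, float],
    column_mapping: dict[str, str] | None = None,
    exclude_columns: list[str] | None = None,
    tolerance: float = 1e-9,
) -> dict[str, tuple[float, float | None]]:
    """Return destination columns whose sums differ from (or are missing in) the comparison."""
    mapping = column_mapping or {}
    excluded = set(exclude_columns or [])
    # Rename comparison columns to destination names (assumed mapping direction).
    renamed = {mapping.get(name, name): value for name, value in comparison.items()}
    mismatches: dict[str, tuple[float, float | None]] = {}
    for column, dest_sum in destination.items():
        if column in excluded:
            continue  # excluded columns are never compared
        other = renamed.get(column)
        if other is None or abs(dest_sum - other) > tolerance:
            mismatches[column] = (dest_sum, other)
    return mismatches
```

An empty result means every non-excluded destination column matched its (possibly renamed) counterpart.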

Data Structures

ETLInfo

Structure returned by DDL generation.

@dataclass
class ETLInfo:
    name: str
    ddl: str
    staging_table: str
    staging_temp: str
    staging_ddl: str
    staging_ddl_simple: str
    staging_ddl_temp: str
    table_metadata: TableMetadata

  • name (str) – Fully qualified destination table name
  • ddl (str) – Original DDL of the destination table
  • staging_table (str) – Full staging table name, including schema
  • staging_temp (str) – Temporary table name (without schema)
  • staging_ddl (str) – Full DDL for the staging table (MergeTree / LIKE INCLUDING ALL)
  • staging_ddl_simple (str) – Simplified DDL (Log / UNLOGGED, columns only)
  • staging_ddl_temp (str) – DDL for the temporary table (Memory / TEMPORARY)
  • table_metadata (TableMetadata) – Complete table metadata

MoveMethod

class MoveType(NamedTuple):
    description: str
    requires_partition: bool = False
    requires_filter: bool = False
    use_temp_table: bool = False


class MoveMethod(MoveType, Enum):
    APPEND = MoveType(
        "Simple INSERT - adds new rows without deleting old ones",
        use_temp_table=True,
    )
    REWRITE = MoveType(
        "TRUNCATE + INSERT - completely replaces table content",
        use_temp_table=True,
    )
    DELETE = MoveType(
        "DELETE matching rows + INSERT - for incremental updates",
        requires_filter=True,
    )
    REPLACE = MoveType(
        "REPLACE/ATTACH PARTITION - atomic partition replacement",
        requires_partition=True,
    )
    AUTO = MoveType(
        "Automatically selects best strategy based on table metadata",
    )
    CUSTOM = MoveType(
        "User-provided custom SQL for data movement",
    )
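Each member carries its requirement flags, so a configuration can be validated before any data moves. The sketch repeats the definitions above so it runs standalone. `check_requirements` is a hypothetical pre-flight helper and is not part of the documented API.

```python
from __future__ import annotations

from enum import Enum
from typing import NamedTuple


class MoveType(NamedTuple):
    description: str
    requires_partition: bool = False
    requires_filter: bool = False
    use_temp_table: bool = False


class MoveMethod(MoveType, Enum):
    APPEND = MoveType("Simple INSERT - adds new rows without deleting old ones", use_temp_table=True)
    REWRITE = MoveType("TRUNCATE + INSERT - completely replaces table content", use_temp_table=True)
    DELETE = MoveType("DELETE matching rows + INSERT - for incremental updates", requires_filter=True)
    REPLACE = MoveType("REPLACE/ATTACH PARTITION - atomic partition replacement", requires_partition=True)
    AUTO = MoveType("Automatically selects best strategy based on table metadata")
    CUSTOM = MoveType("User-provided custom SQL for data movement")


def check_requirements(
    method: MoveMethod, has_partitions: bool, source_filter: list[str] | None
) -> None:
    """Raise ValueError if the destination cannot satisfy the method's flags."""
    if method.requires_partition and not has_partitions:
        raise ValueError(f"{method.name} needs a partitioned destination table")
    if method.requires_filter and not source_filter:
        raise ValueError(f"{method.name} needs source_filter")
```

Because MoveMethod mixes in the NamedTuple, flags such as `MoveMethod.DELETE.requires_filter` read directly from the member.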

DQ Checks

Check     Generates queries  Needs comparison  Description
empty     No                 No                Table is not empty
uniq      No                 No                No duplicate rows
future    Yes                No                No dates from the future
infinity  Yes                No                No infinity values
nan       Yes                No                No NaN values
total     No                 Yes               Row count matches the comparison table
sum       Yes                Yes               Numeric column sums match the comparison table
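The checks that need no comparison table can be illustrated on in-memory rows. The sketch below is a hypothetical `run_basic_checks` helper and not DBHose's implementation, which runs against the database and, per the table, generates queries for some checks. It assumes float values are the candidates for the infinity and NaN checks, and date or datetime values for the future check.

```python
from __future__ import annotations

import math
from datetime import date, datetime


def run_basic_checks(rows: list[dict], today: date) -> dict[str, bool]:
    """Evaluate the comparison-free checks on rows (True means the check passed)."""
    floats = [v for row in rows for v in row.values() if isinstance(v, float)]
    # datetime is a subclass of date, so normalize it to a plain date first.
    dates = [
        v.date() if isinstance(v, datetime) else v
        for row in rows
        for v in row.values()
        if isinstance(v, date)
    ]
    # A row's identity is its sorted (column, value) pairs.
    keys = [tuple(sorted(row.items())) for row in rows]
    return {
        "empty": len(rows) > 0,
        "uniq": len(keys) == len(set(keys)),
        "future": all(d <= today for d in dates),
        "infinity": not any(math.isinf(v) for v in floats),
        "nan": not any(math.isnan(v) for v in floats),
    }
```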

Public Methods

DBHose Class

from_dbms(query: str | None = None, table: str | None = None) -> None

Upload data from another DBMS using an SQL query or a direct table export.

Parameters:

  • query (str, optional) – SQL query for data selection
  • table (str, optional) – Source table name for direct export

from_file(file_path: str | Path) -> None

Upload data from a dump file.

Parameters:

  • file_path (str | Path) – Path to the dump file

from_iterable(dtype_data: Iterable[Any]) -> None

Upload data from a Python iterable.

Parameters:

  • dtype_data (Iterable[Any]) – Iterable containing data rows

from_frame(data_frame: PdFrame | PlFrame | LfFrame) -> None

Upload data from a Pandas or Polars DataFrame.

Parameters:

  • data_frame (PdFrame | PlFrame | LfFrame) – DataFrame to upload

DBHoseOperator Class

Native Airflow operator for DBHose ETL operations with template fields support.

from dbhose_airflow import DBHoseOperator

DBHoseOperator(
    task_id: str,
    destination_table: str,
    destination_conn: str | ConnectionConfig,
    source_type: str = "dbms",
    source_conn: str | ConnectionConfig | None = None,
    source_query: str | None = None,
    source_table: str | None = None,
    source_file: str | Path | None = None,
    source_iterable: Iterable[Any] | None = None,
    source_frame: PdFrame | PlFrame | LfFrame | None = None,
    dq_extra_conn: str | ConnectionConfig | None = None,
    source_filter: list[str] | None = None,
    staging: StagingConfig | None = None,
    move_method: MoveMethod = MoveMethod.REPLACE,
    custom_move_sql: str | None = None,
    mode: DumperMode = DumperMode.DEBUG,
    dump_format: DumpFormat | None = None,
    dq: DQConfig | None = None,
    **kwargs,
)

Template Fields: destination_table, source_query, source_table, source_file, custom_move_sql

Usage Examples

Using DBConnector Directly

from dbhose_airflow import DBHose, MoveMethod
from native_dumper import CHConnector
from pgpack_dumper import PGConnector

# Create connectors directly
pg_connector = PGConnector(
    host="localhost",
    dbname="source_db",
    user="postgres",
    password="secret",
    port=5432,
)

ch_connector = CHConnector(
    host="localhost",
    dbname="dest_db",
    user="default",
    password="secret",
    port=8123,
)

dbhose = DBHose(
    destination_table="analytics.events",
    destination_conn=ch_connector,  # Direct DBConnector
    source_conn=pg_connector,       # Direct DBConnector
    move_method=MoveMethod.AUTO,
)

dbhose.from_dbms(table="public.raw_events")

Auto Strategy Selection

from dbhose_airflow import DBHose, MoveMethod, StagingConfig

# AUTO will select DELETE strategy because source_filter is provided
dbhose = DBHose(
    destination_table="analytics.daily_stats",
    destination_conn="clickhouse_prod",
    source_conn="postgres_stage",
    source_filter=["date", "region"],  # Triggers DELETE strategy
    move_method=MoveMethod.AUTO,
    staging=StagingConfig(random_suffix=True),
)

dbhose.from_dbms(table="daily_stats")

Using Different Move Strategies

from dbhose_airflow import DBHose, MoveMethod

# APPEND - fast insert using temp table
dbhose_append = DBHose(
    destination_table="logs.app_events",
    destination_conn="clickhouse_prod",
    move_method=MoveMethod.APPEND,
)
dbhose_append.from_file("events_dump.zst")

# REWRITE - full table replacement
dbhose_rewrite = DBHose(
    destination_table="reference.dictionary",
    destination_conn="postgres_prod",
    move_method=MoveMethod.REWRITE,
)
dbhose_rewrite.from_iterable(dict_data)  # dict_data: an iterable of rows prepared earlier

# REPLACE - atomic partition replacement (requires partitioned table)
dbhose_replace = DBHose(
    destination_table="analytics.monthly_stats",
    destination_conn="clickhouse_prod",
    move_method=MoveMethod.REPLACE,
)
dbhose_replace.from_dbms(table="monthly_stats")

Basic Transfer Between Databases

from dbhose_airflow import DBHose, MoveMethod

dbhose = DBHose(
    destination_table="public.users",
    destination_conn="postgres_prod",
    source_conn="postgres_stage",
    move_method=MoveMethod.REPLACE,  # REPLACE requires a partitioned destination table
)

# Transfer entire table
dbhose.from_dbms(table="users")

Transfer with Filter and DQ Checks

from dbhose_airflow import (
    DBHose,
    DQCheck,
    DQConfig,
    MoveMethod,
    StagingConfig,
)

dbhose = DBHose(
    destination_table="analytics.events",
    destination_conn="clickhouse_prod",
    source_conn="postgres_stage",
    source_filter=["created_at", "status"],
    staging=StagingConfig(random_suffix=True, drop_after=True),
    move_method=MoveMethod.DELETE,
    dq=DQConfig(
        disabled_checks=[DQCheck.future],
        exclude_columns=["password_hash"],
    ),
)

dbhose.from_dbms(query="SELECT * FROM raw_events WHERE processed = false")

Using Custom Connection Configuration

from dbhose_airflow import (
    DBHose,
    CompressionMethod,
    ConnectionConfig,
    IsolationLevel,
)

dest_conn = ConnectionConfig(
    conn_id="greenplum_prod",
    isolation=IsolationLevel.repeatable,
    compression=CompressionMethod.LZ4,
    compression_level=5,
    timeout=600,
)

dbhose = DBHose(
    destination_table="warehouse.facts",
    destination_conn=dest_conn,
)

dbhose.from_file("facts_dump.zst")

Skip Staging Table

from dbhose_airflow import DBHose, StagingConfig

dbhose = DBHose(
    destination_table="public.temp_logs",
    destination_conn="postgres_prod",
    staging=StagingConfig(use_origin=True),
)

# Writes directly to destination without staging
dbhose.from_iterable(log_rows)  # log_rows: an iterable of rows prepared earlier

Custom Move SQL

from dbhose_airflow import DBHose, MoveMethod

dbhose = DBHose(
    destination_table="public.metrics",
    destination_conn="postgres_prod",
    move_method=MoveMethod.CUSTOM,
    custom_move_sql="""
        MERGE INTO public.metrics AS target
        USING public.metrics_staging AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET value = source.value
        WHEN NOT MATCHED THEN INSERT (id, value) VALUES (source.id, source.value);
    """,
)

dbhose.from_dbms(table="metrics")

Airflow DAG with DBHoseOperator

from datetime import datetime
from airflow import DAG
from dbhose_airflow import DBHoseOperator, MoveMethod, StagingConfig, DQConfig, DQCheck

with DAG(
    dag_id="dbhose_transfer",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # "schedule" requires Airflow >= 2.4; older 2.x releases use schedule_interval
    catchup=False,
    tags=["etl", "analytics"],
) as dag:
    
    transfer_task = DBHoseOperator(
        task_id="transfer_daily_stats",
        destination_table="analytics.daily_stats",
        destination_conn="clickhouse_prod",
        source_conn="postgres_stage",
        source_table="daily_stats",
        move_method=MoveMethod.REPLACE,
        staging=StagingConfig(random_suffix=True, drop_after=True),
        dq=DQConfig(
            disabled_checks=[DQCheck.future],
            exclude_columns=["password_hash"],
        ),
    )

Airflow DAG with PythonOperator (Legacy)

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from dbhose_airflow import DBHose, MoveMethod


def transfer_data():
    dbhose = DBHose(
        destination_table="analytics.daily_stats",
        destination_conn="clickhouse_conn",
        source_conn="postgres_conn",
        move_method=MoveMethod.REPLACE,
    )
    dbhose.from_dbms(table="daily_stats")


with DAG(
    dag_id="daily_stats_transfer",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # "schedule" requires Airflow >= 2.4; older 2.x releases use schedule_interval
    catchup=False,
    tags=["etl", "analytics"],
) as dag:
    
    transfer_task = PythonOperator(
        task_id="transfer_daily_stats",
        python_callable=transfer_data,
    )

Data Structures

ColumnMeta

Metadata for a single column.

class ColumnMeta(NamedTuple):
    name: str
    data_type: str
    nullable: bool
    has_default: bool
    default_value: str | None
    comment: str | None
    position: int
    type_oid: int | None
    type_namespace: int | None
    generated: str | None
    identity: str | None

TableMetadata

Complete table metadata.

class TableMetadata(NamedTuple):
    name: str
    schema: str
    owner: str | None
    comment: str | None
    columns: list[ColumnMeta]
    partition_by: str | None
    order_by: list[str] | None
    primary_key: list[str] | None
    engine: str
    settings: dict[str, Any] | None
    oid: int | None
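These structures carry enough information to derive the columns-only staging DDL described for ETLInfo.staging_ddl_simple. The sketch below assumes PostgreSQL, an UNLOGGED table and double-quoted column names. `simple_staging_ddl` is hypothetical and deliberately omits defaults, generated columns and keys.

```python
from __future__ import annotations

from typing import Any, NamedTuple


class ColumnMeta(NamedTuple):
    name: str
    data_type: str
    nullable: bool
    has_default: bool
    default_value: str | None
    comment: str | None
    position: int
    type_oid: int | None
    type_namespace: int | None
    generated: str | None
    identity: str | None


class TableMetadata(NamedTuple):
    name: str
    schema: str
    owner: str | None
    comment: str | None
    columns: list[ColumnMeta]
    partition_by: str | None
    order_by: list[str] | None
    primary_key: list[str] | None
    engine: str
    settings: dict[str, Any] | None
    oid: int | None


def simple_staging_ddl(meta: TableMetadata, staging_name: str) -> str:
    """Columns-only staging DDL: keeps type and NOT NULL, drops everything else."""
    ordered = sorted(meta.columns, key=lambda c: c.position)
    lines = [
        f'    "{c.name}" {c.data_type}' + ("" if c.nullable else " NOT NULL")
        for c in ordered
    ]
    return f"CREATE UNLOGGED TABLE {staging_name} (\n" + ",\n".join(lines) + "\n)"
```

Sorting by `position` keeps the staging column order identical to the destination table's, which keeps a positional `INSERT ... SELECT *` valid.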

ETLInfo

ETLInfo, the structure returned by DDL generation, is documented with all of its fields (including staging_temp, staging_ddl_simple and staging_ddl_temp) under Data Structures above.

ETL Pipeline Decorator

All data loading methods (from_dbms, from_file, from_iterable, from_frame) are decorated with @etl_pipeline, which automatically manages:

  1. Staging table creation (unless use_origin=True)
  2. Data loading
  3. Data Quality checks (if configured)
  4. Moving data to destination
  5. Staging table cleanup (unless drop_after=False)
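The five steps can be sketched as a plain Python decorator. Everything here is hypothetical. `RecordingHose` and its `create_staging`, `run_dq`, `move` and `drop` methods stand in for real SQL. Passing the target table into the loader is an assumed wiring, not the documented signature. The sketch also assumes that DQ checks run against the destination when use_origin=True.

```python
from __future__ import annotations

import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class StagingConfig:
    use_origin: bool = False
    drop_after: bool = True
    random_suffix: bool = True


def etl_pipeline(load: Callable[..., Any]) -> Callable[..., Any]:
    """Run the five lifecycle steps around a loader (hypothetical sketch)."""

    @functools.wraps(load)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        staging = self.staging or StagingConfig()
        # 1. Create a staging table unless writing straight to the destination.
        target = self.destination_table if staging.use_origin else self.create_staging()
        # 2. Load data into whichever table was chosen.
        result = load(self, target, *args, **kwargs)
        # 3. Run DQ checks only when a DQ config is present.
        if self.dq is not None:
            self.run_dq(target)
        if not staging.use_origin:
            # 4. Move staged rows into the destination.
            self.move(target)
            # 5. Drop the staging table unless drop_after=False.
            if staging.drop_after:
                self.drop(target)
        return result

    return wrapper


class RecordingHose:
    """Hypothetical stand-in that records each step instead of running SQL."""

    def __init__(self, staging: StagingConfig | None = None, dq: Any = None):
        self.staging = staging
        self.dq = dq
        self.destination_table = "public.events"
        self.steps: list[str] = []

    def create_staging(self) -> str:
        self.steps.append("create")
        return "public.events_stg"

    def run_dq(self, table: str) -> None:
        self.steps.append(f"dq {table}")

    def move(self, table: str) -> None:
        self.steps.append(f"move {table}")

    def drop(self, table: str) -> None:
        self.steps.append(f"drop {table}")

    @etl_pipeline
    def from_iterable(self, target: str, rows: list[Any]) -> int:
        self.steps.append(f"load {target}")
        return len(rows)
```

Recording the steps instead of executing them makes the control flow easy to check. With the defaults, every step except DQ runs, and DQ runs as well when configured. With use_origin=True, only the load runs, plus DQ if configured.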

Features

  • Automatic staging tables: data is loaded into a staging table before the final destination, using one of three DDL variants:
    • Full DDL (with indexes, partitions, engine)
    • Simple DDL (columns only, no indexes)
    • Temp DDL (temporary table, fastest for simple operations)
  • Smart move strategies: flexible data movement strategies with automatic selection
  • Direct DBConnector support: pass DBConnector instances instead of Airflow connection IDs
  • Data Quality checks: built-in checks before the final move
  • Multiple source support: files, Python iterables, DataFrames, DBMS
  • Native Airflow operator: DBHoseOperator with template fields support
  • Detailed logging: all stages are logged with visual framing
  • Automatic format detection: BINARY or CSV is selected automatically based on source/destination compatibility
  • Cross-platform column mapping: unified column metadata across PostgreSQL and ClickHouse
  • Cython-optimized DDL generation: fast metadata extraction and staging DDL generation

Beta Version Limitations

  • Only ClickHouse, Greenplum, and PostgreSQL are supported
  • Some edge cases with complex data types may not be fully tested
  • API may change in future versions
  • Documentation may be incomplete

Requirements

  • Apache Airflow >= 2.0
  • Python >= 3.10
  • native-dumper (for ClickHouse)
  • pgpack-dumper (for PostgreSQL/Greenplum)
  • dr-herriot (for dump manipulation)

Error Handling

All exceptions are wrapped in the DBHoseError hierarchy:

  • DBHoseError – Base exception
  • DBHoseValueError – Invalid configuration or parameters
  • DBHoseTypeError – Type mismatches
  • DBHoseNotFoundError – Missing objects
  • DBHosePermissionError – Permission issues
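Every error derives from DBHoseError, so callers can handle specific cases first and fall back to the base class. The sketch defines the documented names locally so it runs without the package. The import path for the real classes is not documented here, and `run_transfer` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Callable


class DBHoseError(Exception):
    """Base exception (local mirror of the documented hierarchy)."""


class DBHoseValueError(DBHoseError): ...
class DBHoseTypeError(DBHoseError): ...
class DBHoseNotFoundError(DBHoseError): ...
class DBHosePermissionError(DBHoseError): ...


def run_transfer(step: Callable[[], object]) -> str:
    """Run a callable and classify any DBHose failure."""
    try:
        step()
    except (DBHoseNotFoundError, DBHosePermissionError) as exc:
        # Environment problems: a missing object or missing grants.
        return f"environment: {exc}"
    except DBHoseError as exc:
        # Any other DBHose failure, e.g. invalid configuration.
        return f"dbhose: {exc}"
    return "ok"
```

The more specific `except` clause comes first. Otherwise the base-class handler would catch everything.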

Reporting Issues

When encountering bugs or unexpected behavior, please report them to help improve the project's stability.

License

MIT
