DBHose is an Apache Airflow module for extremely fast data exchange between DBMSs using native binary formats and CSV format.
This project is in beta testing and may contain bugs. Use with caution in production environments.
Currently, data transfer is supported between the following databases:
- ClickHouse
- Greenplum
- PostgreSQL
DBHose is a tool for safe and efficient data movement between:
- Dump files
- Python iterables
- DataFrames (Pandas/Polars)
- Supported DBMS (ClickHouse, Greenplum, PostgreSQL)
The `DBHose` class includes built-in Data Quality checks and supports several data movement methods. `DBHoseOperator` provides native Airflow integration for simpler DAG development.
pip install dbhose-airflow -U --index-url https://dns-technologies.github.io/dbhose-dev-pip/simple/

from dbhose_airflow import DBHose

DBHose(
    destination_table: str,
    destination_conn: str | ConnectionConfig | DBConnector,
    source_conn: str | ConnectionConfig | DBConnector | None = None,
    dq_extra_conn: str | ConnectionConfig | DBConnector | None = None,
    *,
    source_filter: list[str] | None = None,
    staging: StagingConfig | None = None,
    move_method: MoveMethod = MoveMethod.AUTO,
    custom_move_sql: str | None = None,
    mode: DumperMode = DumperMode.DEBUG,
    dump_format: DumpFormat | None = None,
    dq: DQConfig | None = None,
)

| Parameter | Type | Description |
|---|---|---|
| `destination_table` | `str` | Fully qualified destination table name (e.g., "schema.table") |
| `destination_conn` | `str \| ConnectionConfig \| DBConnector` | Destination connection: Airflow conn_id, configuration, or DBConnector instance |

| Parameter | Type | Default | Description |
|---|---|---|---|
| `source_conn` | `str \| ConnectionConfig \| DBConnector \| None` | `None` | Source connection (if None, destination is used) |
| `dq_extra_conn` | `str \| ConnectionConfig \| DBConnector \| None` | `None` | External connection for DQ comparison table |
| `source_filter` | `list[str] \| None` | `None` | List of columns used to auto-generate filter expressions for the source table |
| `staging` | `StagingConfig \| None` | `None` | Staging table configuration |
| `move_method` | `MoveMethod` | `MoveMethod.AUTO` | Method for moving data from staging to destination |
| `custom_move_sql` | `str \| None` | `None` | Custom SQL for `MoveMethod.CUSTOM` |
| `mode` | `DumperMode` | `DumperMode.DEBUG` | Operation mode (DEBUG, TEST, PRODUCTION) |
| `dump_format` | `DumpFormat \| None` | `None` | Dump format for data transfer (auto-detected if None) |
| `dq` | `DQConfig \| None` | `None` | Data Quality check configuration |

| Method | Description | Requires Partition | Requires Filter | Uses Temp Table |
|---|---|---|---|---|
| `APPEND` | Simple INSERT - adds new rows without deleting old ones | No | No | Yes |
| `REWRITE` | TRUNCATE + INSERT - completely replaces table content | No | No | Yes |
| `DELETE` | DELETE matching rows + INSERT - for incremental updates | No | Yes | No |
| `REPLACE` | REPLACE/ATTACH PARTITION - atomic partition replacement | Yes | No | No |
| `AUTO` | Automatically selects best strategy based on table metadata | - | - | - |
| `CUSTOM` | User-provided custom SQL for data movement | No | No | No |

When move_method=MoveMethod.AUTO, DBHose automatically selects the optimal strategy:
| Condition | Selected Strategy |
|---|---|
| `source_filter` is provided | `DELETE` |
| Table has partitions AND no `source_filter` | `REPLACE` |
| No partitions AND no `source_filter` | `REWRITE` |

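
The AUTO selection rules can be restated as a small function. This is only an illustrative sketch: `resolve_auto` and its `has_partitions` argument are hypothetical names, not part of the DBHose API, and treating an empty filter list as "not provided" is an assumption.

```python
from __future__ import annotations


def resolve_auto(source_filter: list[str] | None, has_partitions: bool) -> str:
    """Return the strategy name that AUTO would pick under the documented rules."""
    # Assumption: an empty list counts as "no source_filter".
    if source_filter:
        return "DELETE"   # a filter takes priority over partitioning
    if has_partitions:
        return "REPLACE"  # partitioned table, no filter
    return "REWRITE"      # neither filter nor partitions


print(resolve_auto(["date", "region"], has_partitions=True))
print(resolve_auto(None, has_partitions=True))
print(resolve_auto(None, has_partitions=False))
```

Checking the filter first mirrors the order of the rows in the table: a provided `source_filter` selects DELETE regardless of partitioning.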
Configuration for a single database connection.
@dataclass
class ConnectionConfig:
    conn_id: str
    isolation: IsolationLevel = IsolationLevel.committed
    compression: CompressionMethod = CompressionMethod.ZSTD
    compression_level: int = CompressionLevel.ZSTD_DEFAULT
    timeout: int | None = None

| Field | Type | Default | Description |
|---|---|---|---|
| `conn_id` | `str` | required | Airflow connection ID |
| `isolation` | `IsolationLevel` | `committed` | Transaction isolation level |
| `compression` | `CompressionMethod` | `ZSTD` | Compression method |
| `compression_level` | `int` | `3` | Compression level |
| `timeout` | `int \| None` | `None` | Connection timeout in seconds |

Configuration for staging table behavior.
@dataclass
class StagingConfig:
    use_origin: bool = False
    drop_after: bool = True
    random_suffix: bool = True

| Field | Type | Default | Description |
|---|---|---|---|
| `use_origin` | `bool` | `False` | Skip staging table, write directly to destination |
| `drop_after` | `bool` | `True` | Drop staging table after operation |
| `random_suffix` | `bool` | `True` | Add random suffix to staging table name |

Configuration for Data Quality checks.
@dataclass
class DQConfig:
    disabled_checks: list[DQCheck] = field(default_factory=list)
    custom_queries: list[str] = field(default_factory=list)
    exclude_columns: list[str] = field(default_factory=list)
    filter_columns: list[str] = field(default_factory=list)
    column_mapping: dict[str, str] = field(default_factory=dict)
    comparison_object: str | None = None
    use_destination_conn: bool = False

| Field | Type | Default | Description |
|---|---|---|---|
| `disabled_checks` | `list[DQCheck]` | `[]` | List of DQ checks to skip |
| `custom_queries` | `list[str]` | `[]` | Custom DQ query paths |
| `exclude_columns` | `list[str]` | `[]` | Columns to exclude from DQ checks |
| `filter_columns` | `list[str]` | `[]` | Columns to include for filter-based checks |
| `column_mapping` | `dict[str, str]` | `{}` | Mapping between comparison table column names and destination table column names |
| `comparison_object` | `str \| None` | `None` | Table to compare against for DQ checks |
| `use_destination_conn` | `bool` | `False` | Use destination connection for comparison table |

Structure returned by DDL generation.
@dataclass
class ETLInfo:
    name: str
    ddl: str
    staging_table: str
    staging_temp: str
    staging_ddl: str
    staging_ddl_simple: str
    staging_ddl_temp: str
    table_metadata: TableMetadata

| Field | Type | Description |
|---|---|---|
| `name` | `str` | Fully qualified destination table name |
| `ddl` | `str` | Original DDL of destination table |
| `staging_table` | `str` | Full staging table name with schema |
| `staging_temp` | `str` | Temporary table name (without schema) |
| `staging_ddl` | `str` | Full DDL for staging table (MergeTree / LIKE INCLUDING ALL) |
| `staging_ddl_simple` | `str` | Simplified DDL (Log / UNLOGGED with columns only) |
| `staging_ddl_temp` | `str` | DDL for temporary table (Memory / TEMPORARY) |
| `table_metadata` | `TableMetadata` | Complete table metadata |

from enum import Enum
from typing import NamedTuple


class MoveType(NamedTuple):
    description: str
    requires_partition: bool = False
    requires_filter: bool = False
    use_temp_table: bool = False


class MoveMethod(MoveType, Enum):
    APPEND = MoveType(
        "Simple INSERT - adds new rows without deleting old ones",
        use_temp_table=True,
    )
    REWRITE = MoveType(
        "TRUNCATE + INSERT - completely replaces table content",
        use_temp_table=True,
    )
    DELETE = MoveType(
        "DELETE matching rows + INSERT - for incremental updates",
        requires_filter=True,
    )
    REPLACE = MoveType(
        "REPLACE/ATTACH PARTITION - atomic partition replacement",
        requires_partition=True,
    )
    AUTO = MoveType(
        "Automatically selects best strategy based on table metadata",
    )
    CUSTOM = MoveType(
        "User-provided custom SQL for data movement",
    )

| Check | Description | Generates Queries | Needs Comparison |
|---|---|---|---|
| `empty` | Table is not empty | No | No |
| `uniq` | No duplicate rows | No | No |
| `future` | No dates from the future | Yes | No |
| `infinity` | No infinity values | Yes | No |
| `nan` | No NaN values | Yes | No |
| `total` | Row count matches comparison | No | Yes |
| `sum` | Numeric column sums match comparison | Yes | Yes |

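
To make the meaning of a few of these checks concrete, the sketch below re-implements `empty`, `uniq`, `nan`, `infinity`, and `total` over plain Python tuples. It is only a reading of the descriptions in the table, not the library's implementation; all function names are hypothetical.

```python
import math


def check_empty(rows):
    """Pass when the table has at least one row."""
    return len(rows) > 0


def check_uniq(rows):
    """Pass when no row appears more than once."""
    return len(set(rows)) == len(rows)


def check_nan(rows):
    """Pass when no float value is NaN."""
    return not any(isinstance(v, float) and math.isnan(v) for r in rows for v in r)


def check_infinity(rows):
    """Pass when no float value is +inf or -inf."""
    return not any(isinstance(v, float) and math.isinf(v) for r in rows for v in r)


def check_total(rows, comparison_rows):
    """Pass when the row count equals that of the comparison object."""
    return len(rows) == len(comparison_rows)


clean = [(1, 2.5), (2, 3.0)]
dirty = [(1, 2.5), (1, 2.5), (2, float("inf"))]

print(check_empty(clean), check_uniq(clean), check_infinity(clean))
print(check_uniq(dirty), check_infinity(dirty), check_total(dirty, clean))
```

The two-argument `check_total` reflects the "Needs Comparison" column: such checks need a second dataset to compare against, which is what `comparison_object` in `DQConfig` names.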
`from_dbms`: Upload data from another DBMS using a SQL query or direct table export.

Parameters:
- `query` (str, optional) – SQL query for data selection
- `table` (str, optional) – Source table name for direct export

`from_file`: Upload data from a dump file.

Parameters:
- `file_path` (str | Path) – Path to the dump file

`from_iterable`: Upload data from a Python iterable.

Parameters:
- `dtype_data` (Iterable[Any]) – Iterable containing data rows

`from_frame`: Upload data from a Pandas or Polars DataFrame.

Parameters:
- `data_frame` (PdFrame | PlFrame | LfFrame) – DataFrame to upload

Native Airflow operator for DBHose ETL operations with template fields support.
from dbhose_airflow import DBHoseOperator

DBHoseOperator(
    task_id: str,
    destination_table: str,
    destination_conn: str | ConnectionConfig,
    source_type: str = "dbms",
    source_conn: str | ConnectionConfig | None = None,
    source_query: str | None = None,
    source_table: str | None = None,
    source_file: str | Path | None = None,
    source_iterable: Iterable[Any] | None = None,
    source_frame: PdFrame | PlFrame | LfFrame | None = None,
    dq_extra_conn: str | ConnectionConfig | None = None,
    source_filter: list[str] | None = None,
    staging: StagingConfig | None = None,
    move_method: MoveMethod = MoveMethod.REPLACE,
    custom_move_sql: str | None = None,
    mode: DumperMode = DumperMode.DEBUG,
    dump_format: DumpFormat | None = None,
    dq: DQConfig | None = None,
    **kwargs,
)

Template Fields: `destination_table`, `source_query`, `source_table`, `source_file`, `custom_move_sql`

from dbhose_airflow import DBHose, MoveMethod
from native_dumper import CHConnector
from pgpack_dumper import PGConnector

# Create connectors directly
pg_connector = PGConnector(
    host="localhost",
    dbname="source_db",
    user="postgres",
    password="secret",
    port=5432,
)
ch_connector = CHConnector(
    host="localhost",
    dbname="dest_db",
    user="default",
    password="secret",
    port=8123,
)
dbhose = DBHose(
    destination_table="analytics.events",
    destination_conn=ch_connector,  # Direct DBConnector
    source_conn=pg_connector,  # Direct DBConnector
    move_method=MoveMethod.AUTO,
)
dbhose.from_dbms(table="public.raw_events")

from dbhose_airflow import DBHose, MoveMethod, StagingConfig

# AUTO will select DELETE strategy because source_filter is provided
dbhose = DBHose(
    destination_table="analytics.daily_stats",
    destination_conn="clickhouse_prod",
    source_conn="postgres_stage",
    source_filter=["date", "region"],  # Triggers DELETE strategy
    move_method=MoveMethod.AUTO,
    staging=StagingConfig(random_suffix=True),
)
dbhose.from_dbms(table="daily_stats")

from dbhose_airflow import DBHose, MoveMethod

# APPEND - fast insert using temp table
dbhose_append = DBHose(
    destination_table="logs.app_events",
    destination_conn="clickhouse_prod",
    move_method=MoveMethod.APPEND,
)
dbhose_append.from_file("events_dump.zst")

# REWRITE - full table replacement
dbhose_rewrite = DBHose(
    destination_table="reference.dictionary",
    destination_conn="postgres_prod",
    move_method=MoveMethod.REWRITE,
)
dbhose_rewrite.from_iterable(dict_data)

# REPLACE - atomic partition replacement (requires partitioned table)
dbhose_replace = DBHose(
    destination_table="analytics.monthly_stats",
    destination_conn="clickhouse_prod",
    move_method=MoveMethod.REPLACE,
)
dbhose_replace.from_dbms(table="monthly_stats")

from dbhose_airflow import DBHose, MoveMethod

dbhose = DBHose(
    destination_table="public.users",
    destination_conn="postgres_prod",
    source_conn="postgres_stage",
    move_method=MoveMethod.REPLACE,
)
# Transfer entire table
dbhose.from_dbms(table="users")

from dbhose_airflow import (
    DBHose,
    DQCheck,
    DQConfig,
    MoveMethod,
    StagingConfig,
)

dbhose = DBHose(
    destination_table="analytics.events",
    destination_conn="clickhouse_prod",
    source_conn="postgres_stage",
    source_filter=["created_at", "status"],
    staging=StagingConfig(random_suffix=True, drop_after=True),
    move_method=MoveMethod.DELETE,
    dq=DQConfig(
        disabled_checks=[DQCheck.future],
        exclude_columns=["password_hash"],
    ),
)
dbhose.from_dbms(query="SELECT * FROM raw_events WHERE processed = false")

from dbhose_airflow import (
    DBHose,
    CompressionMethod,
    ConnectionConfig,
    IsolationLevel,
)

dest_conn = ConnectionConfig(
    conn_id="greenplum_prod",
    isolation=IsolationLevel.repeatable,
    compression=CompressionMethod.LZ4,
    compression_level=5,
    timeout=600,
)
dbhose = DBHose(
    destination_table="warehouse.facts",
    destination_conn=dest_conn,
)
dbhose.from_file("facts_dump.zst")

from dbhose_airflow import DBHose, StagingConfig

dbhose = DBHose(
    destination_table="public.temp_logs",
    destination_conn="postgres_prod",
    staging=StagingConfig(use_origin=True),
)
# Writes directly to destination without staging
dbhose.from_iterable(log_rows)

from dbhose_airflow import DBHose, MoveMethod

dbhose = DBHose(
    destination_table="public.metrics",
    destination_conn="postgres_prod",
    move_method=MoveMethod.CUSTOM,
    custom_move_sql="""
        MERGE INTO public.metrics AS target
        USING public.metrics_staging AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET value = source.value
        WHEN NOT MATCHED THEN INSERT (id, value) VALUES (source.id, source.value);
    """,
)
dbhose.from_dbms(table="metrics")

from datetime import datetime
from airflow import DAG
from dbhose_airflow import DBHoseOperator, MoveMethod, StagingConfig, DQConfig, DQCheck

with DAG(
    dag_id="dbhose_transfer",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["etl", "analytics"],
) as dag:
    transfer_task = DBHoseOperator(
        task_id="transfer_daily_stats",
        destination_table="analytics.daily_stats",
        destination_conn="clickhouse_prod",
        source_conn="postgres_stage",
        source_table="daily_stats",
        move_method=MoveMethod.REPLACE,
        staging=StagingConfig(random_suffix=True, drop_after=True),
        dq=DQConfig(
            disabled_checks=[DQCheck.future],
            exclude_columns=["password_hash"],
        ),
    )

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from dbhose_airflow import DBHose, MoveMethod


def transfer_data():
    # Build the transfer inside the callable so it runs at task execution time
    dbhose = DBHose(
        destination_table="analytics.daily_stats",
        destination_conn="clickhouse_conn",
        source_conn="postgres_conn",
        move_method=MoveMethod.REPLACE,
    )
    dbhose.from_dbms(table="daily_stats")


with DAG(
    dag_id="daily_stats_transfer",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["etl", "analytics"],
) as dag:
    transfer_task = PythonOperator(
        task_id="transfer_daily_stats",
        python_callable=transfer_data,
    )

Metadata for a single column.
class ColumnMeta(NamedTuple):
    name: str
    data_type: str
    nullable: bool
    has_default: bool
    default_value: str | None
    comment: str | None
    position: int
    type_oid: int | None
    type_namespace: int | None
    generated: str | None
    identity: str | None

Complete table metadata.

class TableMetadata(NamedTuple):
    name: str
    schema: str
    owner: str | None
    comment: str | None
    columns: list[ColumnMeta]
    partition_by: str | None
    order_by: list[str] | None
    primary_key: list[str] | None
    engine: str
    settings: dict[str, Any] | None
    oid: int | None

Structure returned by DDL generation.

@dataclass
class ETLInfo:
    name: str
    ddl: str
    staging_table: str
    staging_ddl: str
    table_metadata: TableMetadata

All data loading methods (`from_dbms`, `from_file`, `from_iterable`, `from_frame`) are decorated with `@etl_pipeline`, which automatically manages:
- Staging table creation (unless `use_origin=True`)
- Data loading
- Data Quality checks (if configured)
- Moving data to destination
- Staging table cleanup (unless `drop_after=False`)
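
The lifecycle listed above can be pictured as a decorator wrapping each loader. The following is a minimal sketch under assumptions: `etl_pipeline_sketch`, `FakeHose`, and the step strings are hypothetical stand-ins that only record the order of steps, and skipping the move step when `use_origin=True` as well as running cleanup in a `finally` block are guesses, since the real `@etl_pipeline` is not shown here.

```python
import functools


def etl_pipeline_sketch(load):
    """Wrap a loader with staging, DQ, move, and cleanup bookkeeping."""

    @functools.wraps(load)
    def wrapper(self, *args, **kwargs):
        if not self.use_origin:
            self.steps.append("create staging")
        try:
            load(self, *args, **kwargs)  # the actual data loading
            if self.dq_enabled:
                self.steps.append("dq checks")
            if not self.use_origin:
                # Assumption: nothing to move when writing directly to destination.
                self.steps.append("move to destination")
        finally:
            # Assumption: cleanup runs even if loading or a check fails.
            if not self.use_origin and self.drop_after:
                self.steps.append("drop staging")

    return wrapper


class FakeHose:
    """Hypothetical stand-in that records steps instead of touching a database."""

    def __init__(self, use_origin=False, drop_after=True, dq_enabled=True):
        self.use_origin = use_origin
        self.drop_after = drop_after
        self.dq_enabled = dq_enabled
        self.steps = []

    @etl_pipeline_sketch
    def from_iterable(self, rows):
        self.steps.append(f"load {len(rows)} rows")


hose = FakeHose()
hose.from_iterable([1, 2, 3])
print(hose.steps)
```

Keeping the lifecycle in one decorator means each loader only has to describe how rows reach the staging table.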
- Automatic staging tables: Data is loaded to staging before the final destination, with three DDL variants:
  - Full DDL (with indexes, partitions, engine)
  - Simple DDL (columns only, no indexes)
  - Temp DDL (temporary table, fastest for simple operations)
- Smart move strategies: Flexible strategies for data movement with automatic selection
- Direct DBConnector support: Pass `DBConnector` instances directly instead of Airflow connection IDs
- Data Quality checks: Built-in checks before the final move
- Multiple source support: Files, DataFrames, DBMS
- Native Airflow operator: `DBHoseOperator` with template fields support
- Detailed logging: All stages are logged with visual framing
- Automatic format detection: BINARY or CSV format auto-selected based on source/destination compatibility
- Cross-platform column mapping: Unified column metadata across PostgreSQL and ClickHouse
- Cython-optimized DDL generation: Fast metadata extraction and staging DDL generation

- Only ClickHouse, Greenplum, and PostgreSQL are supported
- Some edge cases with complex data types may not be fully tested
- API may change in future versions
- Documentation may be incomplete
- Apache Airflow >= 2.0
- Python >= 3.10
- native-dumper (for ClickHouse)
- pgpack-dumper (for PostgreSQL/Greenplum)
- dr-herriot (for dumps manipulations)
All exceptions are wrapped in the `DBHoseError` hierarchy:
- `DBHoseError` – Base exception
- `DBHoseValueError` – Invalid configuration or parameters
- `DBHoseTypeError` – Type mismatches
- `DBHoseNotFoundError` – Missing objects
- `DBHosePermissionError` – Permission issues
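
Because `DBHoseError` is the base exception, a caller can handle specific subclasses first and fall back to the base class. The sketch below defines local stand-in classes with the same names so that it runs without the library installed; the import path of the real classes and any additional base classes they may have are not stated in this document, and `run` is a hypothetical helper.

```python
class DBHoseError(Exception):
    """Stand-in for the library's base exception."""


class DBHoseValueError(DBHoseError):
    """Stand-in: invalid configuration or parameters."""


class DBHoseNotFoundError(DBHoseError):
    """Stand-in: missing objects."""


def run(task):
    """Run a callable and translate DBHose errors into short status strings."""
    try:
        task()
    except DBHoseValueError as exc:
        return f"bad config: {exc}"
    except DBHoseError as exc:  # catches every other subclass
        return f"dbhose failure: {exc}"
    return "ok"


def bad_config():
    raise DBHoseValueError("custom_move_sql is missing")


def missing_table():
    raise DBHoseNotFoundError("analytics.events")


print(run(bad_config))
print(run(missing_table))
print(run(lambda: None))
```

Ordering the `except` clauses from most to least specific matters: placing `DBHoseError` first would swallow the subclasses before their dedicated handlers run.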
When encountering bugs or unexpected behavior, please report them to help improve the project's stability.
MIT