Comprehensive framework for testing DBAPI ingestion performance across different databases and ingestion methods.
dbapi_test_framework/
├── config.py # Configuration (loads from .env, test matrix)
├── connections.py # Connection factories for each DBMS
├── runner.py # Test runner with 7 ingestion methods
├── main.py # Entry point
├── .env # Environment variables (credentials - not in git)
└── db_setup_util/ # Database setup scripts
    ├── common_schema.py # Unified schema + data generator
    ├── base_setup.py # Base setup class
    ├── mysql_setup.py # MySQL table setup
    ├── postgres_setup.py # PostgreSQL table setup
    ├── oracle_setup.py # Oracle table setup
    ├── mssql_setup.py # SQL Server table setup
    ├── databricks_setup.py # Databricks table setup
    └── setup_all.py # Run all setups at once
Create a .env file in the dbapi_test_framework/ directory:
# Snowflake
SNOWFLAKE_ACCOUNT=your-account
SNOWFLAKE_USER=your-user
SNOWFLAKE_PASSWORD=your-password
SNOWFLAKE_DATABASE=your-database
SNOWFLAKE_SCHEMA=your-schema
SNOWFLAKE_WAREHOUSE=your-warehouse
SNOWFLAKE_ROLE=your-role
SNOWFLAKE_HOST=your-account.snowflakecomputing.com
SNOWFLAKE_PORT=443
SNOWFLAKE_PROTOCOL=https
# MySQL
MYSQL_HOST=your-mysql-host
MYSQL_PORT=3306
MYSQL_USERNAME=your-user
MYSQL_PASSWORD=your-password
MYSQL_DATABASE=your-database
# PostgreSQL
POSTGRES_HOST=your-postgres-host
POSTGRES_PORT=5432
POSTGRES_USER=your-user
POSTGRES_PASSWORD=your-password
POSTGRES_DBNAME=your-database
# SQL Server
MSSQL_SERVER=your-sqlserver-host
MSSQL_PORT=1433
MSSQL_UID=your-user
MSSQL_PWD=your-password
MSSQL_DATABASE=test_db
MSSQL_DRIVER={ODBC Driver 18 for SQL Server}
# Oracle
ORACLEDB_HOST=your-oracle-host
ORACLEDB_PORT=1521
ORACLEDB_USERNAME=your-user
ORACLEDB_PASSWORD=your-password
ORACLEDB_SERVICE_NAME=your-service
# Databricks
DATABRICKS_SERVER_HOSTNAME=your-workspace.databricks.net
DATABRICKS_HTTP_PATH=sql/protocolv1/o/...
DATABRICKS_ACCESS_TOKEN=your-token

Create test tables with identical data across all databases:
cd db_setup_util
# Setup individual database
python3 mysql_setup.py
python3 postgres_setup.py
python3 oracle_setup.py
python3 mssql_setup.py
python3 databricks_setup.py
# Or setup all at once
python3 setup_all.py

This creates DBAPI_TEST_TABLE with 10,000 rows of deterministic test data in each database.
# From the dbapi_test_framework directory
cd tests/perf/data_source/dbapi_test_framework
# Run a single test
python3 main.py
# Run full test matrix
python3 main.py --matrix

Or as a module from the project root:
python3 -m tests.perf.data_source.dbapi_test_framework.main
python3 -m tests.perf.data_source.dbapi_test_framework.main --matrix

The framework tests 7 different ingestion approaches:
- local - Local ingestion using session.read.dbapi()
  - Data fetched locally and uploaded to Snowflake
  - No external access integration needed
- udtf - UDTF ingestion using session.read.dbapi(udtf_configs=...)
  - Data fetched via UDTF running on Snowflake
  - Requires external access integration
- local_sproc - Local ingestion inside a stored procedure
  - Local ingestion logic runs inside a Snowflake stored procedure
  - Requires external access integration + packages
- udtf_sproc - UDTF ingestion inside a stored procedure
  - UDTF ingestion logic runs inside a Snowflake stored procedure
  - Requires external access integration + packages
- jdbc - JDBC ingestion using session.read.jdbc()
  - Data fetched via JDBC UDTF running on Snowflake
  - Requires JDBC driver JAR, Snowflake secret, and external access integration
  - Only supports UDTF-based ingestion (no local mode)
  - Driver JARs are automatically uploaded to stage on first run
- jdbc_sproc - JDBC ingestion inside a stored procedure
  - JDBC ingestion logic runs inside a Snowflake stored procedure
  - Requires JDBC driver JAR, Snowflake secret, and external access integration
  - Driver JARs are automatically uploaded to stage on first run
- pyspark - PySpark JDBC ingestion
  - Data fetched via PySpark JDBC running on local Spark session
  - Written to Snowflake using Snowflake-Spark connector
  - Requires PySpark, JDBC driver JARs in drivers/ directory, and Snowflake-Spark connector
  - Uses plain credentials from .env (not Snowflake secrets)
  - Runs on local machine (not on Snowflake servers)
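The prerequisites listed above can be condensed into a lookup table, which makes it easy to see which methods need which setup before a run. The following is only a sketch: the `MethodRequirements` class and `REQUIREMENTS` dictionary are hypothetical names that are not part of the framework, and the values are transcribed from the list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MethodRequirements:
    """Hypothetical summary of what one ingestion method needs."""
    runs_on: str                    # "local" or "snowflake"
    external_access: bool = False   # external access integration
    sproc_packages: bool = False    # packages declared for the stored procedure
    jdbc_driver_jar: bool = False   # driver JAR in drivers/
    snowflake_secret: bool = False  # credentials stored as a Snowflake secret


# Values transcribed from the method list; the structure is an assumption.
REQUIREMENTS = {
    "local": MethodRequirements(runs_on="local"),
    "udtf": MethodRequirements(runs_on="snowflake", external_access=True),
    "local_sproc": MethodRequirements(
        runs_on="snowflake", external_access=True, sproc_packages=True
    ),
    "udtf_sproc": MethodRequirements(
        runs_on="snowflake", external_access=True, sproc_packages=True
    ),
    "jdbc": MethodRequirements(
        runs_on="snowflake", external_access=True,
        jdbc_driver_jar=True, snowflake_secret=True,
    ),
    "jdbc_sproc": MethodRequirements(
        runs_on="snowflake", external_access=True,
        jdbc_driver_jar=True, snowflake_secret=True,
    ),
    "pyspark": MethodRequirements(runs_on="local", jdbc_driver_jar=True),
}


def methods_needing_external_access():
    """Return the method names that require an external access integration."""
    return sorted(name for name, req in REQUIREMENTS.items() if req.external_access)
```

Calling `methods_needing_external_access()` lists every method except `local` and `pyspark`, the two that run on the local machine.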
All databases support DBAPI, JDBC, and PySpark methods:
- MySQL (DBAPI: pymysql, JDBC: mysql-connector-j)
- PostgreSQL (DBAPI: psycopg2, JDBC: postgresql)
- MS SQL Server (DBAPI: pyodbc, JDBC: mssql-jdbc)
- Oracle (DBAPI: oracledb, JDBC: ojdbc)
- Databricks (DBAPI: databricks-sql-connector, JDBC: DatabricksJDBC42)
Note: PySpark method uses the same JDBC drivers as the JDBC methods.
The test matrix in config.py supports two source types:
# Test config format
{
"dbms": "mysql",
"source": {
"type": "table|query", # Type: table or query
"value": "DBAPI_TEST_TABLE" # Table name or SQL query
},
"ingestion_method": "local|udtf|local_sproc|udtf_sproc|jdbc|jdbc_sproc|pyspark"
}

Use list comprehensions for compact configuration:
_DBMS_LIST = ["mysql", "postgres", "mssql", "oracle", "databricks"]
_METHODS = ["local", "udtf"]
TEST_MATRIX = [
# Table-based tests
*[
{
"dbms": dbms,
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": method,
}
for dbms in _DBMS_LIST
for method in _METHODS
],
# Query-based tests
*[
{
"dbms": dbms,
"source": {"type": "query", "value": "SELECT * FROM DBAPI_TEST_TABLE"},
"ingestion_method": method,
}
for dbms in _DBMS_LIST
for method in _METHODS
],
]

This generates 20 tests (5 DBMS × 2 methods × 2 source types).
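The same matrix can also be written with `itertools.product`, which makes the 5 × 2 × 2 arithmetic explicit and easy to check. This is a sketch of an alternative formulation, not the code in config.py; `build_matrix` and `SOURCES` are hypothetical names.

```python
from itertools import product

DBMS_LIST = ["mysql", "postgres", "mssql", "oracle", "databricks"]
METHODS = ["local", "udtf"]
SOURCES = [
    {"type": "table", "value": "DBAPI_TEST_TABLE"},
    {"type": "query", "value": "SELECT * FROM DBAPI_TEST_TABLE"},
]


def build_matrix(dbms_list, methods, sources):
    """Return one test config per (source, dbms, method) combination."""
    return [
        # Copy the source dict so entries do not share mutable state.
        {"dbms": dbms, "source": dict(source), "ingestion_method": method}
        for source, dbms, method in product(sources, dbms_list, methods)
    ]


matrix = build_matrix(DBMS_LIST, METHODS, SOURCES)
print(len(matrix))  # 5 DBMS x 2 methods x 2 source types = 20
```

Adding a third method to `METHODS` would grow the matrix to 30 entries, so the size of a run is worth checking before starting it.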
External access integrations are required for UDTF and stored procedure methods:
UDTF_CONFIGS = {
"mysql": {
"external_access_integration": "snowpark_dbapi_mysql_test_integration",
},
"postgres": {
"external_access_integration": "snowpark_dbapi_postgres_test_integration",
},
"mssql": {
"external_access_integration": "snowpark_dbapi_sql_server_test_integration",
},
"oracle": {
"external_access_integration": "snowpark_dbapi_oracledb_test_integration",
},
"databricks": {
"external_access_integration": "snowpark_dbapi_databricks_test_integration",
},
}

Stored procedure packages are automatically configured per DBMS:
SPROC_PACKAGES = {
"mysql": ["pymysql"],
"postgres": ["psycopg2"],
"mssql": ["pyodbc", "msodbcsql"],
"oracle": ["oracledb"],
"databricks": ["databricks-sql-connector"],
}

# Show table info (row count + first row) before cleanup
SHOW_TARGET_TABLE_INFO = True # Default
# Cleanup target tables after tests
CLEANUP_TARGET_TABLES = False # Set to True in production
# DBAPI parameters (optional)
DBAPI_PARAMS = {
"fetch_size": 10000,
"max_workers": 4,
}

{
"dbms": "mysql",
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": "local"
}

{
"dbms": "postgres",
"source": {
"type": "query",
"value": "SELECT id, varchar_col FROM DBAPI_TEST_TABLE WHERE id < 5000"
},
"ingestion_method": "udtf"
}

{
"dbms": "oracle",
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": "udtf_sproc" # Runs inside Snowflake stored procedure
}

{
"dbms": "mysql",
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": "jdbc" # Uses JDBC driver instead of Python DBAPI
}

{
"dbms": "postgres",
"source": {
"type": "query",
"value": "SELECT * FROM DBAPI_TEST_TABLE WHERE id BETWEEN 1 AND 1000"
},
"ingestion_method": "jdbc_sproc" # JDBC inside stored procedure
}

{
"dbms": "mysql",
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": "pyspark", # PySpark JDBC on local Spark session
"dbapi_params": {
"fetchsize": 10000,
"numPartitions": 10,
"partitionColumn": "id",
"lowerBound": 0,
"upperBound": 100000
}
}

############################################################
TEST: MYSQL - LOCAL
Source Type: TABLE
Source Value: DBAPI_TEST_TABLE
############################################################
============================================================
Running: LOCAL INGESTION
============================================================
✓ Completed in 12.45 seconds
============================================================
TARGET TABLE INFO
============================================================
Row count: 10000
First row:
---------------------------------------------------------
|"ID" |"INT_COL" |"BIGINT_COL" |"VARCHAR_COL" |... |
---------------------------------------------------------
|1 |83811 |478163328 |varchar_0_... |... |
---------------------------------------------------------
✓ Cleaned up target table: TEST_MYSQL_LOCAL_1234567890
Test completed: success
================================================================================
TEST SUMMARY
================================================================================
Status DBMS Method Source Value Time
--------------------------------------------------------------------------------
✓ mysql local table DBAPI_TEST_TABLE 12.34s
✓ mysql udtf table DBAPI_TEST_TABLE 15.67s
✓ postgres local table DBAPI_TEST_TABLE 11.23s
✓ postgres udtf table DBAPI_TEST_TABLE 14.56s
✓ mysql local query SELECT * FROM DBAPI... 10.89s
✓ mysql udtf query SELECT * FROM DBAPI... 13.45s
--------------------------------------------------------------------------------
Total: 6 | Success: 6 | Failed: 0
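The totals line at the bottom of the summary can be derived from a list of per-test results. The sketch below assumes each result is a dictionary with a `status` key whose value is `"success"` for passing tests (matching the "Test completed: success" line in the sample output); the function name `summarize` and the result shape are hypothetical, not taken from runner.py.

```python
def summarize(results):
    """Build the 'Total | Success | Failed' line from per-test results.

    Each result is assumed to be a dict with a "status" key; anything
    other than "success" is counted as a failure.
    """
    total = len(results)
    success = sum(1 for result in results if result["status"] == "success")
    return f"Total: {total} | Success: {success} | Failed: {total - success}"


results = [{"dbms": "mysql", "status": "success"} for _ in range(6)]
print(summarize(results))  # Total: 6 | Success: 6 | Failed: 0
```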
pip install python-dotenv # For .env support
pip install pymysql # MySQL
pip install psycopg2-binary # PostgreSQL
pip install pyodbc # SQL Server
pip install oracledb # Oracle
pip install databricks-sql-connector # Databricks
pip install pyspark # For PySpark ingestion method

Note for PySpark: You also need the Snowflake-Spark connector and Snowflake JDBC driver:
- Snowflake Spark Connector: spark-snowflake_2.13-3.1.0.jar (Scala 2.13, recommended)
  - Download: https://mvnrepository.com/artifact/net.snowflake/spark-snowflake_2.13/3.1.0
  - Important: Use version 3.1.0; versions 3.1.1 and later have issues with Oracle BLOB types
- Snowflake JDBC: snowflake-jdbc-3.19.0.jar or later
- Place both JARs in the drivers/ directory
See drivers/jdbc_drivers_readme.md for more details and known issues.
For UDTF and stored procedure methods, create external access integrations:
-- Example for MySQL
CREATE OR REPLACE NETWORK RULE snowpark_dbapi_mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('your-mysql-host:3306');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION snowpark_dbapi_mysql_test_integration
ALLOWED_NETWORK_RULES = (snowpark_dbapi_mysql_network_rule)
ENABLED = TRUE;

Repeat for each DBMS you want to test with UDTF/stored procedure methods.
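Repeating the two statements by hand for five databases is error-prone, so one option is to generate them. The sketch below takes the integration names from UDTF_CONFIGS; the network rule names for databases other than MySQL are an assumption that follows the MySQL naming pattern, and `external_access_sql` is a hypothetical helper.

```python
# Integration names as listed in UDTF_CONFIGS.
INTEGRATIONS = {
    "mysql": "snowpark_dbapi_mysql_test_integration",
    "postgres": "snowpark_dbapi_postgres_test_integration",
    "mssql": "snowpark_dbapi_sql_server_test_integration",
    "oracle": "snowpark_dbapi_oracledb_test_integration",
    "databricks": "snowpark_dbapi_databricks_test_integration",
}


def external_access_sql(dbms, host, port):
    """Return the network rule + integration statements for one DBMS."""
    integration = INTEGRATIONS[dbms]
    # Assumed naming: swap the integration suffix for "_network_rule",
    # which reproduces the MySQL rule name used in the example.
    rule = integration.replace("_test_integration", "_network_rule")
    return (
        f"CREATE OR REPLACE NETWORK RULE {rule}\n"
        f"  MODE = EGRESS\n"
        f"  TYPE = HOST_PORT\n"
        f"  VALUE_LIST = ('{host}:{port}');\n"
        f"CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION {integration}\n"
        f"  ALLOWED_NETWORK_RULES = ({rule})\n"
        f"  ENABLED = TRUE;"
    )


print(external_access_sql("mysql", "your-mysql-host", 3306))
```

Review the generated SQL before running it, since `CREATE OR REPLACE` overwrites any existing object with the same name.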
Download the required JDBC driver JARs and place them in the drivers/ directory:
cd tests/perf/data_source/dbapi_test_framework
mkdir -p drivers
# Download drivers from official sources (see drivers/README.md)

See drivers/README.md for download links and specific versions.
JDBC methods require Snowflake secrets to store database credentials:
-- Example for MySQL
CREATE OR REPLACE SECRET snowpark_dbapi_mysql_test_cred
TYPE = USERNAME_PASSWORD
USERNAME = 'your_mysql_user'
PASSWORD = 'your_mysql_password';
-- Grant usage to the role
GRANT USAGE ON SECRET snowpark_dbapi_mysql_test_cred TO ROLE your_role;

Repeat for each database you want to test with JDBC.
Secret naming convention:
- MySQL: ADMIN.PUBLIC.SNOWPARK_DBAPI_MYSQL_TEST_CRED
- PostgreSQL: ADMIN.PUBLIC.SNOWPARK_DBAPI_POSTGRES_TEST_CRED
- SQL Server: ADMIN.PUBLIC.SNOWPARK_DBAPI_SQL_SERVER_TEST_CRED
- Oracle: ADMIN.PUBLIC.SNOWPARK_DBAPI_ORACLEDB_TEST_CRED
- Databricks: ADMIN.PUBLIC.SNOWPARK_DBAPI_DATABRICKS_TEST_CRED
You can customize secret names via environment variables (e.g., MYSQL_SECRET).
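The override behaviour described above amounts to "use the environment variable if set, otherwise fall back to the default name". A minimal sketch follows; only `MYSQL_SECRET` is named in this document, so the `<DBMS>_SECRET` pattern for the other databases and the helper `resolve_secret_name` are assumptions.

```python
import os

# Default secret names from the naming convention list.
DEFAULT_SECRETS = {
    "mysql": "ADMIN.PUBLIC.SNOWPARK_DBAPI_MYSQL_TEST_CRED",
    "postgres": "ADMIN.PUBLIC.SNOWPARK_DBAPI_POSTGRES_TEST_CRED",
    "mssql": "ADMIN.PUBLIC.SNOWPARK_DBAPI_SQL_SERVER_TEST_CRED",
    "oracle": "ADMIN.PUBLIC.SNOWPARK_DBAPI_ORACLEDB_TEST_CRED",
    "databricks": "ADMIN.PUBLIC.SNOWPARK_DBAPI_DATABRICKS_TEST_CRED",
}


def resolve_secret_name(dbms, env=None):
    """Return the secret name for a DBMS, honouring an env override.

    The variable name "<DBMS>_SECRET" is an assumed pattern generalised
    from the MYSQL_SECRET example.
    """
    env = os.environ if env is None else env
    override = env.get(f"{dbms.upper()}_SECRET")
    return override or DEFAULT_SECRETS[dbms]
```

For example, `resolve_secret_name("mysql", {"MYSQL_SECRET": "MY_DB.MY_SCHEMA.MY_CRED"})` returns the override, while an empty mapping yields the default.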
The JDBC methods use the same external access integrations as the DBAPI UDTF methods. No additional setup is required if you already have UDTF working.
When you run JDBC tests:
- The framework automatically checks if the driver is on the stage
- If not found, it uploads from your local drivers/ directory
- Subsequent runs skip the upload (fast)
- Each test only uploads the specific driver it needs
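The check-then-upload behaviour above can be sketched as a small function. To keep the example independent of any Snowflake API, the stage is represented by two injected callables; `ensure_driver_on_stage`, `list_stage`, and `upload` are hypothetical names, and the real framework may implement this differently.

```python
def ensure_driver_on_stage(jar_name, list_stage, upload):
    """Upload a driver JAR only if the stage does not already hold it.

    list_stage: callable returning the file names currently on the stage.
    upload: callable that uploads one JAR from the local drivers/ directory.
    Returns True if an upload happened, False if it was skipped.
    """
    if jar_name in list_stage():
        return False  # already staged, so later runs stay fast
    upload(jar_name)
    return True


# In-memory stand-in for a stage, for illustration only.
staged = set()
first = ensure_driver_on_stage("mysql-driver.jar", lambda: staged, staged.add)
second = ensure_driver_on_stage("mysql-driver.jar", lambda: staged, staged.add)
print(first, second)  # True False
```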
PySpark uses the same JDBC drivers as the JDBC methods. Ensure drivers are in the drivers/ directory (see JDBC Setup above).
Download the Snowflake-Spark connector JAR:
- Maven Central: net.snowflake:spark-snowflake_2.12 or spark-snowflake_2.13
- Place in drivers/ directory alongside JDBC drivers
- Or configure spark.jars in config.PYSPARK_SESSION_CONFIG
Customize PySpark session settings in config.py:
PYSPARK_SESSION_CONFIG = {
"spark.master": "local[*]",
"spark.driver.extraClassPath": "./drivers/*",
# Optional: Tune for your machine
"spark.sql.shuffle.partitions": 16,
"spark.default.parallelism": 16,
"spark.executor.cores": 8,
"spark.executor.memory": "16g",
}

PySpark uses plain credentials from .env (not Snowflake secrets):
- Reads directly from MYSQL_USERNAME, MYSQL_PASSWORD, etc.
- No external access integration required
- Runs entirely on local machine
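To illustrate how plain .env credentials could feed a Spark JDBC read, the sketch below builds the standard `url`, `dbtable`, `user`, and `password` options from the environment variable names listed in the .env section. The helper `jdbc_options` is hypothetical and covers only MySQL and PostgreSQL; the framework's actual option handling may differ.

```python
def jdbc_options(dbms, env, table):
    """Build Spark JDBC reader options from plain .env-style credentials."""
    if dbms == "mysql":
        url = f"jdbc:mysql://{env['MYSQL_HOST']}:{env['MYSQL_PORT']}/{env['MYSQL_DATABASE']}"
        user, password = env["MYSQL_USERNAME"], env["MYSQL_PASSWORD"]
    elif dbms == "postgres":
        url = (
            f"jdbc:postgresql://{env['POSTGRES_HOST']}:{env['POSTGRES_PORT']}"
            f"/{env['POSTGRES_DBNAME']}"
        )
        user, password = env["POSTGRES_USER"], env["POSTGRES_PASSWORD"]
    else:
        raise ValueError(f"no JDBC URL template in this sketch for {dbms!r}")
    return {"url": url, "dbtable": table, "user": user, "password": password}


example_env = {
    "MYSQL_HOST": "your-mysql-host",
    "MYSQL_PORT": "3306",
    "MYSQL_DATABASE": "your-database",
    "MYSQL_USERNAME": "your-user",
    "MYSQL_PASSWORD": "your-password",
}
opts = jdbc_options("mysql", example_env, "DBAPI_TEST_TABLE")
print(opts["url"])  # jdbc:mysql://your-mysql-host:3306/your-database
```

With a Spark session available, these options would typically be passed as `spark.read.format("jdbc").options(**opts).load()`.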
The db_setup_util/ directory contains scripts to create identical test tables across all databases.
- Same table name: DBAPI_TEST_TABLE
- Same 15 columns with compatible types
- Deterministic data (row #N is identical across all DBMS)
- 10,000 rows by default (configurable)
See db_setup_util/README.md for detailed documentation.
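"Deterministic" here means that row N depends only on N, never on the database, the run, or the clock. One way to get that property is to derive every value from a hash of the row id, as sketched below. This is not the generator in common_schema.py: `generate_row` is hypothetical, it covers only four of the 15 columns, and its values will not match the real table.

```python
import hashlib


def _stable_int(row_id, column, modulus):
    """Derive a repeatable integer from (column, row_id) via SHA-256."""
    digest = hashlib.sha256(f"{column}:{row_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % modulus


def generate_row(row_id):
    """Return the same values for a given row_id on every call and machine."""
    return {
        "id": row_id,
        "int_col": _stable_int(row_id, "int_col", 100_000),
        "bigint_col": _stable_int(row_id, "bigint_col", 10**12),
        "varchar_col": f"varchar_{row_id}",
    }


rows = [generate_row(n) for n in range(1, 10_001)]  # 10,000 rows by default
```

Because no random seed or timestamp is involved, each setup script can regenerate the rows independently and still load identical data.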
- Verify .env file exists and has correct credentials
- Check that python-dotenv is installed
- Test connection to source databases independently
- Ensure Snowflake user has CREATE TABLE privileges
- Ensure source database users have SELECT privileges
- For stored procedures, ensure external access integrations are granted
- Install all required Python packages
- For SQL Server, ensure ODBC driver is installed (msodbcsql or ODBC Driver 18 for SQL Server)
- Check package versions are compatible
- Run the setup scripts in db_setup_util/ first
- Verify table name matches config (DBAPI_TEST_TABLE by default)
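Some of these checks can be automated before a run. The sketch below covers two of them, the presence of the .env file and whether the required Python modules can be imported; `preflight` and `missing_modules` are hypothetical helpers, and the module names passed in are import names (for example `dotenv` for the python-dotenv package).

```python
import importlib.util
from pathlib import Path


def missing_modules(names):
    """Return the top-level module names that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def preflight(env_path, modules):
    """Collect human-readable problems; an empty list means all checks passed."""
    problems = []
    if not Path(env_path).is_file():
        problems.append(f".env not found at {env_path}")
    problems.extend(f"module not installed: {name}" for name in missing_modules(modules))
    return problems


for problem in preflight(".env", ["dotenv", "pymysql", "psycopg2", "pyodbc", "oracledb"]):
    print(problem)
```

Database connectivity and privileges still need to be verified against the live systems; this only catches the local setup problems.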
DBAPI_PARAMS = {
"fetch_size": 5000,
"max_workers": 8,
}

TEST_MATRIX = [
# Table with local method
{"dbms": "mysql", "source": {"type": "table", "value": "DBAPI_TEST_TABLE"}, "ingestion_method": "local"},
# Query with UDTF
{"dbms": "postgres", "source": {"type": "query", "value": "SELECT * FROM DBAPI_TEST_TABLE WHERE id < 5000"}, "ingestion_method": "udtf"},
# Stored procedure
{"dbms": "oracle", "source": {"type": "table", "value": "DBAPI_TEST_TABLE"}, "ingestion_method": "local_sproc"},
]

# In config.py
CLEANUP_TARGET_TABLES = False # Tables persist after tests
SHOW_TARGET_TABLE_INFO = True # Show row count + first row

- Lazy evaluation: Connection parameters are loaded from .env via config.py
- Clean imports: Uses try/except pattern for both direct and module execution
- Extensible: Easy to add new DBMS or ingestion methods
- DRY principle: Common logic in base classes, DBMS-specific logic isolated
- Configurable: Runtime behavior controlled via config variables
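As an illustration of the lazy-evaluation point, connection parameters can be read inside a function rather than at import time, so a missing variable only matters for the database actually being tested. This is a sketch under assumptions: `mysql_params` is a hypothetical helper, not the code in config.py, and it presumes the .env file has already been loaded into the process environment (for instance with python-dotenv's `load_dotenv()`).

```python
import os


def mysql_params(env=None):
    """Read MySQL connection parameters at call time, not import time.

    Variable names follow the .env template; the returned keys match the
    keyword arguments accepted by pymysql.connect().
    """
    env = os.environ if env is None else env
    return {
        "host": env["MYSQL_HOST"],
        "port": int(env.get("MYSQL_PORT", "3306")),
        "user": env["MYSQL_USERNAME"],
        "password": env["MYSQL_PASSWORD"],
        "database": env["MYSQL_DATABASE"],
    }


sample = mysql_params({
    "MYSQL_HOST": "your-mysql-host",
    "MYSQL_USERNAME": "your-user",
    "MYSQL_PASSWORD": "your-password",
    "MYSQL_DATABASE": "your-database",
})
print(sample["port"])  # 3306 (default applied because MYSQL_PORT was not set)
```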