Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
605 changes: 605 additions & 0 deletions tests/perf/data_source/dbapi_test_framework/README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions tests/perf/data_source/dbapi_test_framework/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#
263 changes: 263 additions & 0 deletions tests/perf/data_source/dbapi_test_framework/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
#
# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
#

"""
Simple configuration for DBAPI ingestion tests.

Define your test matrix here by specifying:
- dbms: which database to test
- table: existing table name to read from
- ingestion_method: 'local', 'udtf', 'local_sproc', 'udtf_sproc'
- fetch_size, max_workers: DBAPI parameters
"""

import os

# Load environment variables from .env file in the same directory if it exists and dotenv is installed
try:
from dotenv import load_dotenv

load_dotenv()
except ImportError:
pass

from db_setup_util.large_query_generation import get_large_query

# Snowflake connection parameters
SNOWFLAKE_PARAMS = {
"account": os.getenv("SNOWFLAKE_ACCOUNT"),
"user": os.getenv("SNOWFLAKE_USER"),
"password": os.getenv("SNOWFLAKE_PASSWORD"),
"database": os.getenv("SNOWFLAKE_DATABASE"),
"schema": os.getenv("SNOWFLAKE_SCHEMA"),
"warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
"role": os.getenv("SNOWFLAKE_ROLE"),
"host": os.getenv("SNOWFLAKE_HOST"),
"port": int(os.getenv("SNOWFLAKE_PORT", 443)),
"protocol": os.getenv("SNOWFLAKE_PROTOCOL", "https"),
}

# Source database connection parameters
MYSQL_PARAMS = {
"host": os.getenv("MYSQL_HOST"),
"port": int(os.getenv("MYSQL_PORT", 3306)),
"user": os.getenv("MYSQL_USERNAME"), # Connection function expects 'user'
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}

POSTGRES_PARAMS = {
"host": os.getenv("POSTGRES_HOST"),
"port": int(os.getenv("POSTGRES_PORT", 5432)),
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"database": os.getenv("POSTGRES_DBNAME"), # Connection function expects 'database'
}

MSSQL_PARAMS = {
"host": os.getenv("MSSQL_SERVER"),
"port": int(os.getenv("MSSQL_PORT", 1433)),
"user": os.getenv("MSSQL_UID"),
"password": os.getenv("MSSQL_PWD"),
"database": os.getenv("MSSQL_DATABASE", "test_db"), # Default to test_db
"driver": os.getenv("MSSQL_DRIVER", "{ODBC Driver 18 for SQL Server}"),
}

ORACLE_PARAMS = {
"host": os.getenv("ORACLEDB_HOST"),
"port": int(os.getenv("ORACLEDB_PORT", 1521)),
"user": os.getenv("ORACLEDB_USERNAME"), # Connection function expects 'user'
"password": os.getenv("ORACLEDB_PASSWORD"),
"service_name": os.getenv("ORACLEDB_SERVICE_NAME"),
}

DATABRICKS_PARAMS = {
"server_hostname": os.getenv("DATABRICKS_SERVER_HOSTNAME"),
"http_path": os.getenv("DATABRICKS_HTTP_PATH"),
"access_token": os.getenv("DATABRICKS_ACCESS_TOKEN"),
}

# DBAPI ingestion parameters
DBAPI_PARAMS = {} # CHANGE ME TO RUN THE DBAPI PARAMETERS YOU WANT
DBAPI_PARAMS_WITH_PARTITION = (
{ # CHANGE ME TO RUN THE DBAPI PARAMETERS WITH PARTITION YOU WANT
"column": "id",
"lower_bound": 1000,
"upper_bound": 9000,
"num_partitions": 10,
}
)
Comment thread
sfc-gh-aling marked this conversation as resolved.

# Cleanup configuration
# Set to False to keep target tables for debugging
CLEANUP_TARGET_TABLES = True

# Show target table info before cleanup (first row + count)
# Set to False to skip showing table info
SHOW_TARGET_TABLE_INFO = True

# Export test results to CSV file
# Set to False to skip CSV export
EXPORT_RESULTS_TO_CSV = True

# Package requirements for stored procedures by DBMS type
SPROC_PACKAGES = {
"mysql": ["pymysql"],
"postgres": ["psycopg2"],
"mssql": ["pyodbc", "msodbcsql"],
"oracle": ["oracledb"],
"databricks": ["databricks-sql-connector"],
}

# JDBC driver JAR filenames (place these in drivers/ directory)
JDBC_DRIVER_JARS = {} # CHANGE ME TO RUN THE JDBC DRIVER JARS YOU WANT
# *** EXAMPLE ***
# JDBC_DRIVER_JARS = {
# "mysql": "mysql-connector-j-9.1.0.jar",
# "postgres": "postgresql-42.7.7.jar",
# "mssql": "mssql-jdbc-12.8.1.jre11.jar",
# "oracle": "ojdbc17-23.9.0.25.07.jar",
# "databricks": "DatabricksJDBC42-2.6.40.jar",
# }


# Snowflake secrets containing DB credentials (format: schema.secret_name)
# Users should create these secrets with USERNAME_PASSWORD_SECRET type
JDBC_SECRETS = {
"mysql": os.getenv("MYSQL_SECRET", "ADMIN.PUBLIC.SNOWPARK_DBAPI_MYSQL_TEST_CRED"),
"postgres": os.getenv(
"POSTGRES_SECRET", "ADMIN.PUBLIC.SNOWPARK_DBAPI_POSTGRES_TEST_CRED"
),
"mssql": os.getenv(
"MSSQL_SECRET", "ADMIN.PUBLIC.SNOWPARK_DBAPI_SQL_SERVER_TEST_CRED"
),
"oracle": os.getenv(
"ORACLE_SECRET", "ADMIN.PUBLIC.SNOWPARK_DBAPI_ORACLEDB_TEST_CRED"
),
"databricks": os.getenv(
"DATABRICKS_SECRET", "ADMIN.PUBLIC.SNOWPARK_DBAPI_DATABRICKS_TEST_CRED"
),
}

# JDBC connection properties (optional, per-database settings)
JDBC_PROPERTIES = {
"mysql": {"useSSL": "false"},
"postgres": {"ssl": "false"},
"mssql": {"trustServerCertificate": "true"},
"oracle": {},
"databricks": {},
}

# JDBC driver class names (for PySpark and other JDBC-based methods)
JDBC_DRIVER_CLASSES = {
"mysql": "com.mysql.cj.jdbc.Driver",
"postgres": "org.postgresql.Driver",
"mssql": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"oracle": "oracle.jdbc.driver.OracleDriver",
"databricks": "com.databricks.client.jdbc.Driver",
}

# PySpark session configuration
# Note: PySpark uses plain credentials from .env (not Snowflake secrets)
# and runs on a local Spark session
PYSPARK_SESSION_CONFIG = {
"spark.master": "local[*]", # Use all available cores
"spark.driver.extraClassPath": str(
os.path.join(os.path.dirname(__file__), "drivers", "*")
), # Path to JDBC JARs
# Optional: Uncomment and adjust for parallelism optimization
# "spark.sql.shuffle.partitions": 16,
# "spark.default.parallelism": 16,
# "spark.executor.cores": 8,
# "spark.executor.memory": "16g",
}

# UDTF configuration (for udtf and udtf_sproc methods)
# Each DBMS needs its own external access integration
# Names match the existing test integrations in tests/resources/test_data_source_dir/
UDTF_CONFIGS = {
"mysql": {
"external_access_integration": "snowpark_dbapi_mysql_test_integration",
},
"postgres": {
"external_access_integration": "snowpark_dbapi_postgres_test_integration",
},
"mssql": {
"external_access_integration": "snowpark_dbapi_sql_server_test_integration",
},
"oracle": {
"external_access_integration": "snowpark_dbapi_oracledb_test_integration",
},
"databricks": {
"external_access_integration": "snowpark_dbapi_databricks_test_integration",
},
}

# Test matrix - define which tests to run
# Each test config format:
# {
# "dbms": "mysql",
# "source": {"type": "table|query", "value": "..."},
# "ingestion_method": "local|udtf|local_sproc|udtf_sproc|jdbc|jdbc_sproc|pyspark",
# "dbapi_params": {...},
# }
DBMS_LIST = [
"mysql",
"postgres",
"mssql",
"oracle",
"databricks",
] # CHANGE ME TO RUN THE DBMS YOU WANT, full list: mysql, postgres, mssql, oracle, databricks
METHODS = [
"local",
"udtf",
] # CHANGE ME TO RUN THE METHODS YOU WANT, full list: local, udtf, local_sproc, udtf_sproc, jdbc, jdbc_sproc, pyspark

# Generate test matrix: table-based and query-based tests
TEST_MATRIX = [
# Table-based tests
*[
{
"dbms": dbms,
"source": {"type": "table", "value": "DBAPI_TEST_TABLE"},
"ingestion_method": method,
}
for dbms in DBMS_LIST
for method in METHODS
],
# Query-based tests
*[
{
"dbms": dbms,
"source": {"type": "query", "value": "SELECT * FROM DBAPI_TEST_TABLE"},
"ingestion_method": method,
}
for dbms in DBMS_LIST
for method in METHODS
],
]

# large query test matrix
TEST_MATRIX_LARGE_QUERY = [
{
"dbms": dbms,
"source": {
"type": "query",
"value": get_large_query(dbms, "100k"),
}, # other options: "1m", "10m", "100m", "1b", "10b"
"ingestion_method": method,
"dbapi_params": DBAPI_PARAMS_WITH_PARTITION, # CHANGE ME TO RUN THE DBAPI PARAMETERS WITH PARTITION YOU WANT
}
for dbms in DBMS_LIST
for method in METHODS
]


# Simple single test config (used by main.py if TEST_MATRIX is not used)
SINGLE_TEST_CONFIG = {
"dbms": "databricks",
"source": {"type": "query", "value": "SELECT * FROM DBAPI_TEST_TABLE"},
"ingestion_method": "udtf",
"dbapi_params": DBAPI_PARAMS, # CHANGE ME TO SETTINGS YOU WANT
}
Loading
Loading