diff --git a/misc/dbt-materialize/CHANGELOG.md b/misc/dbt-materialize/CHANGELOG.md index a65bf63e1b25d..f39dd88cdcfa3 100644 --- a/misc/dbt-materialize/CHANGELOG.md +++ b/misc/dbt-materialize/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +* Support overriding the `options` PostgreSQL connection parameter + ## 1.9.7 - 2026-03-16 * Reduce catalog server load during diff --git a/misc/dbt-materialize/dbt/adapters/materialize/connections.py b/misc/dbt-materialize/dbt/adapters/materialize/connections.py index 314863d417e23..7f47ee82cdb08 100644 --- a/misc/dbt-materialize/dbt/adapters/materialize/connections.py +++ b/misc/dbt-materialize/dbt/adapters/materialize/connections.py @@ -15,7 +15,7 @@ # limitations under the License. from dataclasses import dataclass -from typing import Optional +from typing import Dict, Optional import dbt_common.exceptions import psycopg2 @@ -31,43 +31,27 @@ logger = AdapterLogger("Materialize") - -# Override the psycopg2 connect function in order to inject Materialize-specific -# session parameter defaults. -# -# This approach is a bit hacky, but some of these session parameters *must* be -# set as part of connection initiation, so we can't simply run `SET` commands -# after the session is established. -def connect(**kwargs): - options = [ - # Ensure that dbt's catalog queries get routed to the - # `mz_catalog_server` cluster, even if the server or role's default is - # different. - "--auto_route_catalog_queries=on", - # dbt prints notices to stdout, which is very distracting because dbt - # can establish many new connections during `dbt run`. - "--welcome_message=off", - # Disable warnings about the session's default database or cluster not - # existing, as these get quite spammy, especially with multiple threads. - # - # Details: it's common for the default cluster for the role dbt is - # connecting as (often `quickstart`) to be absent. For many dbt - # deployments, clusters are explicitly specified on a model-by-model - # basis, and there in fact is no natural "default" cluster. So warning - # repeatedly that the default cluster doesn't exist isn't helpful, since - # each DDL statement will specify a different, valid cluster. If a DDL - # statement ever specifies an invalid cluster, dbt will still produce an - # error about the invalid cluster, even with this setting enabled. - "--current_object_missing_warnings=off", - *(kwargs.get("options") or []), - ] - kwargs["options"] = " ".join(options) - - return _connect(**kwargs) - - -_connect = psycopg2.connect -psycopg2.connect = connect +DEFAULT_SESSION_PARAMETERS = { + # Ensure that dbt's catalog queries get routed to the + # `mz_catalog_server` cluster, even if the server or role's default is + # different. + "auto_route_catalog_queries": "on", + # dbt prints notices to stdout, which is very distracting because dbt + # can establish many new connections during `dbt run`. + "welcome_message": "off", + # Disable warnings about the session's default database or cluster not + # existing, as these get quite spammy, especially with multiple threads. + # + # Details: it's common for the default cluster for the role dbt is + # connecting as (often `quickstart`) to be absent. For many dbt + # deployments, clusters are explicitly specified on a model-by-model + # basis, and there in fact is no natural "default" cluster. So warning + # repeatedly that the default cluster doesn't exist isn't helpful, since + # each DDL statement will specify a different, valid cluster. If a DDL + # statement ever specifies an invalid cluster, dbt will still produce an + # error about the invalid cluster, even with this setting enabled. + "current_object_missing_warnings": "off", +} @dataclass @@ -84,6 +68,9 @@ class MaterializeCredentials(PostgresCredentials): # modified). cluster: Optional[str] = None application_name: Optional[str] = f"dbt-materialize v{__version__}" + # Additional session parameters to pass via the connection options string. + # User-provided options override DEFAULT_SESSION_PARAMETERS. + options: Optional[Dict[str, str]] = None @property def type(self): @@ -103,6 +90,7 @@ def _connection_keys(self): "search_path", "retries", "application_name", + "options", ) @@ -111,7 +99,78 @@ class MaterializeConnectionManager(PostgresConnectionManager): @classmethod def open(cls, connection): - connection = super().open(connection) + # Much of the `open` method setup is copied from the `PostgresConnectionManager.open` method + # https://github.com/dbt-labs/dbt-adapters/blob/v1.17.3/dbt-postgres/src/dbt/adapters/postgres/connections.py#L102, + # except we allow users to override options. + + if connection.state == "open": + logger.debug("Connection is already open, skipping open.") + return connection + + credentials = cls.get_credentials(connection.credentials) + kwargs = {} + + if credentials.keepalives_idle: + kwargs["keepalives_idle"] = credentials.keepalives_idle + + if credentials.sslmode: + kwargs["sslmode"] = credentials.sslmode + + if credentials.sslcert is not None: + kwargs["sslcert"] = credentials.sslcert + + if credentials.sslkey is not None: + kwargs["sslkey"] = credentials.sslkey + + if credentials.sslrootcert is not None: + kwargs["sslrootcert"] = credentials.sslrootcert + + if credentials.application_name: + kwargs["application_name"] = credentials.application_name + + options_dict = dict(DEFAULT_SESSION_PARAMETERS) + if credentials.options: + options_dict.update(credentials.options) + + options_parts = [(k, v) for k, v in options_dict.items()] + + search_path = credentials.search_path + if search_path is not None and search_path != "": + options_parts.append(("search_path", search_path)) + + kwargs["options"] = " ".join([f"--{k}={v}" for k, v in options_parts]) + + def connect(): + handle = psycopg2.connect( + dbname=credentials.database, + user=credentials.user, + host=credentials.host, + password=credentials.password, + port=credentials.port, + connect_timeout=credentials.connect_timeout, + **kwargs, + ) + + if credentials.role: + handle.cursor().execute(f"set role {credentials.role}") + + return handle + + retryable_exceptions = [ + psycopg2.errors.OperationalError, + ] + + def exponential_backoff(attempt: int): + return attempt * attempt + + connection = cls.retry_connection( + connection, + connect=connect, + logger=logger, + retry_limit=credentials.retries, + retry_timeout=exponential_backoff, + retryable_exceptions=retryable_exceptions, + ) # Prevents psycopg connection from automatically opening transactions. # More info: https://www.psycopg.org/docs/usage.html#transactions-control diff --git a/misc/dbt-materialize/dbt/include/materialize/profile_template.yml b/misc/dbt-materialize/dbt/include/materialize/profile_template.yml index 6a69522923dff..97bc1295662a5 100644 --- a/misc/dbt-materialize/dbt/include/materialize/profile_template.yml +++ b/misc/dbt-materialize/dbt/include/materialize/profile_template.yml @@ -41,3 +41,5 @@ prompts: cluster: hint: 'dev cluster' default: 'quickstart' + options: + hint: 'overrides the PostgreSQL `options` connection parameter' diff --git a/misc/dbt-materialize/tests/adapter/test_connection_options.py b/misc/dbt-materialize/tests/adapter/test_connection_options.py new file mode 100644 index 0000000000000..8b537caae0ec8 --- /dev/null +++ b/misc/dbt-materialize/tests/adapter/test_connection_options.py @@ -0,0 +1,48 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License in the LICENSE file at the +# root of this repository, or online at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + + +class TestConnectionOptionsOverride: + """Verify that `options` in the dbt profile reaches the server.""" + + @pytest.fixture(scope="class") + def dbt_profile_target(self): + return { + "type": "materialize", + "threads": 1, + "host": "{{ env_var('DBT_HOST', 'localhost') }}", + "user": "materialize", + "pass": "password", + "database": "materialize", + "port": "{{ env_var('DBT_PORT', 6875) }}", + "options": { + "welcome_message": "on", + "auto_route_catalog_queries": "off", + }, + } + + def test_options_override(self, project): + # Override these session variables opposite to their default values + result = project.run_sql( + "SELECT current_setting('welcome_message')", fetch="one" + ) + assert result[0] == "on" + + result = project.run_sql( + "SELECT current_setting('auto_route_catalog_queries')", fetch="one" + ) + assert result[0] == "off"