1515# limitations under the License.
1616
1717from dataclasses import dataclass
18- from typing import Optional
18+ from typing import Dict , Optional
1919
2020import dbt_common .exceptions
2121import psycopg2
3131
3232logger = AdapterLogger ("Materialize" )
3333
34-
35- # Override the psycopg2 connect function in order to inject Materialize-specific
36- # session parameter defaults.
37- #
38- # This approach is a bit hacky, but some of these session parameters *must* be
39- # set as part of connection initiation, so we can't simply run `SET` commands
40- # after the session is established.
41- def connect (** kwargs ):
42- options = [
43- # Ensure that dbt's catalog queries get routed to the
44- # `mz_catalog_server` cluster, even if the server or role's default is
45- # different.
46- "--auto_route_catalog_queries=on" ,
47- # dbt prints notices to stdout, which is very distracting because dbt
48- # can establish many new connections during `dbt run`.
49- "--welcome_message=off" ,
50- # Disable warnings about the session's default database or cluster not
51- # existing, as these get quite spammy, especially with multiple threads.
52- #
53- # Details: it's common for the default cluster for the role dbt is
54- # connecting as (often `quickstart`) to be absent. For many dbt
55- # deployments, clusters are explicitly specified on a model-by-model
56- # basis, and there in fact is no natural "default" cluster. So warning
57- # repeatedly that the default cluster doesn't exist isn't helpful, since
58- # each DDL statement will specify a different, valid cluster. If a DDL
59- # statement ever specifies an invalid cluster, dbt will still produce an
60- # error about the invalid cluster, even with this setting enabled.
61- "--current_object_missing_warnings=off" ,
62- * (kwargs .get ("options" ) or []),
63- ]
64- kwargs ["options" ] = " " .join (options )
65-
66- return _connect (** kwargs )
67-
68-
69- _connect = psycopg2 .connect
70- psycopg2 .connect = connect
34+ DEFAULT_SESSION_PARAMETERS = {
35+ # Ensure that dbt's catalog queries get routed to the
36+ # `mz_catalog_server` cluster, even if the server or role's default is
37+ # different.
38+ "auto_route_catalog_queries" : "on" ,
39+ # dbt prints notices to stdout, which is very distracting because dbt
40+ # can establish many new connections during `dbt run`.
41+ "welcome_message" : "off" ,
42+ # Disable warnings about the session's default database or cluster not
43+ # existing, as these get quite spammy, especially with multiple threads.
44+ #
45+ # Details: it's common for the default cluster for the role dbt is
46+ # connecting as (often `quickstart`) to be absent. For many dbt
47+ # deployments, clusters are explicitly specified on a model-by-model
48+ # basis, and there in fact is no natural "default" cluster. So warning
49+ # repeatedly that the default cluster doesn't exist isn't helpful, since
50+ # each DDL statement will specify a different, valid cluster. If a DDL
51+ # statement ever specifies an invalid cluster, dbt will still produce an
52+ # error about the invalid cluster, even with this setting enabled.
53+ "current_object_missing_warnings" : "off" ,
54+ }
7155
7256
7357@dataclass
@@ -84,6 +68,9 @@ class MaterializeCredentials(PostgresCredentials):
8468 # modified).
8569 cluster : Optional [str ] = None
8670 application_name : Optional [str ] = f"dbt-materialize v{ __version__ } "
71+ # Additional session parameters to pass via the connection options string.
72+ # User-provided options override DEFAULT_SESSION_PARAMETERS.
73+ options : Optional [Dict [str , str ]] = None
8774
8875 @property
8976 def type (self ):
@@ -103,6 +90,7 @@ def _connection_keys(self):
10390 "search_path" ,
10491 "retries" ,
10592 "application_name" ,
93+ "options" ,
10694 )
10795
10896
@@ -111,7 +99,78 @@ class MaterializeConnectionManager(PostgresConnectionManager):
11199
112100 @classmethod
113101 def open (cls , connection ):
114- connection = super ().open (connection )
102+ # Much of the `open` method setup is copied from the `PostgresConnectionManager.open` method
103+ # https://github.com/dbt-labs/dbt-adapters/blob/v1.17.3/dbt-postgres/src/dbt/adapters/postgres/connections.py#L102,
104+ # except we allow users to override options.
105+
106+ if connection .state == "open" :
107+ logger .debug ("Connection is already open, skipping open." )
108+ return connection
109+
110+ credentials = cls .get_credentials (connection .credentials )
111+ kwargs = {}
112+
113+ if credentials .keepalives_idle :
114+ kwargs ["keepalives_idle" ] = credentials .keepalives_idle
115+
116+ if credentials .sslmode :
117+ kwargs ["sslmode" ] = credentials .sslmode
118+
119+ if credentials .sslcert is not None :
120+ kwargs ["sslcert" ] = credentials .sslcert
121+
122+ if credentials .sslkey is not None :
123+ kwargs ["sslkey" ] = credentials .sslkey
124+
125+ if credentials .sslrootcert is not None :
126+ kwargs ["sslrootcert" ] = credentials .sslrootcert
127+
128+ if credentials .application_name :
129+ kwargs ["application_name" ] = credentials .application_name
130+
131+ options_dict = dict (DEFAULT_SESSION_PARAMETERS )
132+ if credentials .options :
133+ options_dict .update (credentials .options )
134+
135+ options_parts = [(k , v ) for k , v in options_dict .items ()]
136+
137+ search_path = credentials .search_path
138+ if search_path is not None and search_path != "" :
139+ options_parts .append (("search_path" , search_path .replace (" " , "\\ " )))
140+
141+ kwargs ["options" ] = " " .join ([f"--{ k } ={ v } " for k , v in options_parts ])
142+
143+ def connect ():
144+ handle = psycopg2 .connect (
145+ dbname = credentials .database ,
146+ user = credentials .user ,
147+ host = credentials .host ,
148+ password = credentials .password ,
149+ port = credentials .port ,
150+ connect_timeout = credentials .connect_timeout ,
151+ ** kwargs ,
152+ )
153+
154+ if credentials .role :
155+ handle .cursor ().execute (f"set role { credentials .role } " )
156+
157+ return handle
158+
159+ retryable_exceptions = [
160+ psycopg2 .errors .OperationalError ,
161+ ]
162+
163+ def exponential_backoff (attempt : int ):
164+ return attempt * attempt
165+
166+ connection = cls .retry_connection (
167+ connection ,
168+ connect = connect ,
169+ logger = logger ,
170+ retry_limit = credentials .retries ,
171+ retry_timeout = exponential_backoff ,
172+ retryable_exceptions = retryable_exceptions ,
173+ )
115174
116175 # Prevents psycopg connection from automatically opening transactions.
117176 # More info: https://www.psycopg.org/docs/usage.html#transactions-control
0 commit comments