Skip to content

Commit 0514015

Browse files
committed
dbt-materialize: Support overriding options in dbt adapter
Purpose is to allow OIDC auth via the oidc_auth_enabled connection option variable. - Removes the initial monkey patch - Copies much of the implementation in PGConnectionManager
1 parent 6168aa9 commit 0514015

4 files changed

Lines changed: 104 additions & 41 deletions

File tree

misc/dbt-materialize/dbt/adapters/materialize/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# limitations under the License.
1616

1717
# If you bump this version, bump it in setup.py too.
18-
version = "1.9.7"
18+
version = "1.9.8"

misc/dbt-materialize/dbt/adapters/materialize/connections.py

Lines changed: 100 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# limitations under the License.
1616

1717
from dataclasses import dataclass
18-
from typing import Optional
18+
from typing import Dict, Optional
1919

2020
import dbt_common.exceptions
2121
import psycopg2
@@ -31,43 +31,27 @@
3131

3232
logger = AdapterLogger("Materialize")
3333

34-
35-
# Override the psycopg2 connect function in order to inject Materialize-specific
36-
# session parameter defaults.
37-
#
38-
# This approach is a bit hacky, but some of these session parameters *must* be
39-
# set as part of connection initiation, so we can't simply run `SET` commands
40-
# after the session is established.
41-
def connect(**kwargs):
42-
options = [
43-
# Ensure that dbt's catalog queries get routed to the
44-
# `mz_catalog_server` cluster, even if the server or role's default is
45-
# different.
46-
"--auto_route_catalog_queries=on",
47-
# dbt prints notices to stdout, which is very distracting because dbt
48-
# can establish many new connections during `dbt run`.
49-
"--welcome_message=off",
50-
# Disable warnings about the session's default database or cluster not
51-
# existing, as these get quite spammy, especially with multiple threads.
52-
#
53-
# Details: it's common for the default cluster for the role dbt is
54-
# connecting as (often `quickstart`) to be absent. For many dbt
55-
# deployments, clusters are explicitly specified on a model-by-model
56-
# basis, and there in fact is no natural "default" cluster. So warning
57-
# repeatedly that the default cluster doesn't exist isn't helpful, since
58-
# each DDL statement will specify a different, valid cluster. If a DDL
59-
# statement ever specifies an invalid cluster, dbt will still produce an
60-
# error about the invalid cluster, even with this setting enabled.
61-
"--current_object_missing_warnings=off",
62-
*(kwargs.get("options") or []),
63-
]
64-
kwargs["options"] = " ".join(options)
65-
66-
return _connect(**kwargs)
67-
68-
69-
_connect = psycopg2.connect
70-
psycopg2.connect = connect
34+
DEFAULT_SESSION_PARAMETERS = {
35+
# Ensure that dbt's catalog queries get routed to the
36+
# `mz_catalog_server` cluster, even if the server or role's default is
37+
# different.
38+
"auto_route_catalog_queries": "on",
39+
# dbt prints notices to stdout, which is very distracting because dbt
40+
# can establish many new connections during `dbt run`.
41+
"welcome_message": "off",
42+
# Disable warnings about the session's default database or cluster not
43+
# existing, as these get quite spammy, especially with multiple threads.
44+
#
45+
# Details: it's common for the default cluster for the role dbt is
46+
# connecting as (often `quickstart`) to be absent. For many dbt
47+
# deployments, clusters are explicitly specified on a model-by-model
48+
# basis, and there in fact is no natural "default" cluster. So warning
49+
# repeatedly that the default cluster doesn't exist isn't helpful, since
50+
# each DDL statement will specify a different, valid cluster. If a DDL
51+
# statement ever specifies an invalid cluster, dbt will still produce an
52+
# error about the invalid cluster, even with this setting enabled.
53+
"current_object_missing_warnings": "off",
54+
}
7155

7256

7357
@dataclass
@@ -84,6 +68,9 @@ class MaterializeCredentials(PostgresCredentials):
8468
# modified).
8569
cluster: Optional[str] = None
8670
application_name: Optional[str] = f"dbt-materialize v{__version__}"
71+
# Additional session parameters to pass via the connection options string.
72+
# User-provided options override DEFAULT_SESSION_PARAMETERS.
73+
options: Optional[Dict[str, str]] = None
8774

8875
@property
8976
def type(self):
@@ -103,6 +90,7 @@ def _connection_keys(self):
10390
"search_path",
10491
"retries",
10592
"application_name",
93+
"options",
10694
)
10795

10896

@@ -111,7 +99,80 @@ class MaterializeConnectionManager(PostgresConnectionManager):
11199

112100
@classmethod
113101
def open(cls, connection):
114-
connection = super().open(connection)
102+
# Much of the `open` method setup is copied from the `PostgresConnectionManager.open` method
103+
# https://github.com/dbt-labs/dbt-adapters/blob/v1.17.3/dbt-postgres/src/dbt/adapters/postgres/connections.py#L102,
104+
# except we allow users to override options.
105+
106+
if connection.state == "open":
107+
logger.debug("Connection is already open, skipping open.")
108+
return connection
109+
110+
credentials = cls.get_credentials(connection.credentials)
111+
kwargs = {}
112+
113+
if credentials.keepalives_idle:
114+
kwargs["keepalives_idle"] = credentials.keepalives_idle
115+
116+
if credentials.sslmode:
117+
kwargs["sslmode"] = credentials.sslmode
118+
119+
if credentials.sslcert is not None:
120+
kwargs["sslcert"] = credentials.sslcert
121+
122+
if credentials.sslkey is not None:
123+
kwargs["sslkey"] = credentials.sslkey
124+
125+
if credentials.sslrootcert is not None:
126+
kwargs["sslrootcert"] = credentials.sslrootcert
127+
128+
if credentials.application_name:
129+
kwargs["application_name"] = credentials.application_name
130+
131+
options_dict = dict(DEFAULT_SESSION_PARAMETERS)
132+
if credentials.options:
133+
options_dict.update(credentials.options)
134+
135+
options_parts = [(k, v) for k, v in options_dict.items()]
136+
137+
search_path = credentials.search_path
138+
if search_path is not None and search_path != "":
139+
options_parts.append(("search_path", search_path))
140+
141+
kwargs["options"] = " ".join(
142+
[f"--{k}={v.replace(" ", "\\ ")}" for k, v in options_parts]
143+
)
144+
145+
def connect():
146+
handle = psycopg2.connect(
147+
dbname=credentials.database,
148+
user=credentials.user,
149+
host=credentials.host,
150+
password=credentials.password,
151+
port=credentials.port,
152+
connect_timeout=credentials.connect_timeout,
153+
**kwargs,
154+
)
155+
156+
if credentials.role:
157+
handle.cursor().execute(f"set role {credentials.role}")
158+
159+
return handle
160+
161+
retryable_exceptions = [
162+
psycopg2.errors.OperationalError,
163+
]
164+
165+
def exponential_backoff(attempt: int):
166+
return attempt * attempt
167+
168+
connection = cls.retry_connection(
169+
connection,
170+
connect=connect,
171+
logger=logger,
172+
retry_limit=credentials.retries,
173+
retry_timeout=exponential_backoff,
174+
retryable_exceptions=retryable_exceptions,
175+
)
115176

116177
# Prevents psycopg connection from automatically opening transactions.
117178
# More info: https://www.psycopg.org/docs/usage.html#transactions-control

misc/dbt-materialize/dbt/include/materialize/profile_template.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,5 @@ prompts:
4141
cluster:
4242
hint: 'dev cluster'
4343
default: 'quickstart'
44+
options:
45+
hint: 'overrides the PostgreSQL `options` connection parameter'

misc/dbt-materialize/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
# This adapter's minor version should match the required dbt-postgres version,
3333
# but patch versions may differ.
3434
# If you bump this version, bump it in __version__.py too.
35-
version="1.9.7",
35+
version="1.9.8",
3636
description="The Materialize adapter plugin for dbt.",
3737
long_description=(Path(__file__).parent / "README.md").open().read(),
3838
long_description_content_type="text/markdown",

0 commit comments

Comments
 (0)