Skip to content

Commit 325c9aa

Browse files
fix(duckdb): proactively refresh S3 secrets to prevent STS credential expiry
DuckDB's httpfs extension snapshots STS credentials into a secret at creation time and never re-queries the provider chain, even with `refresh: auto` configured. When using `CREDENTIAL_CHAIN` with `chain: sts` (AssumeRole), DuckDB requests the AWS minimum session duration of 900 seconds (15 minutes). Any SQLMesh plan that takes longer than 15 minutes to complete — such as large incremental backfills — fails with HTTP 400 (Bad Request) from S3 once the token expires. This was confirmed through controlled experiments (binary search narrowed the threshold to exactly ~15 minutes, and `duckdb_secrets()` showed the same `key_id` throughout a run — no refresh ever occurs). The upstream fix (duckdb/duckdb-httpfs#165) adds credential refresh at the httpfs layer but has not been merged yet. This patch adds a timer-based secret refresh mechanism to the DuckDB engine adapter as a workaround: - Before each SQL execution, checks if 12 minutes (80% of the 900s TTL) have elapsed since the last secret creation - If so, queries `duckdb_secrets()` for existing S3 secret names, drops them, and recreates from the original config — forcing a fresh STS AssumeRole call - Uses double-check locking to prevent concurrent refresh when `concurrent_tasks > 1` - Zero overhead for configs without S3 secrets (early return on null check) and minimal overhead otherwise (monotonic clock comparison on hot path) Changes: - sqlmesh/core/engine_adapter/duckdb.py: Add __init__, _execute override, and secret refresh methods to DuckDBEngineAdapter - sqlmesh/core/config/connection.py: Add _extra_engine_config to DuckDBConnectionConfig to pass secrets config to the adapter This workaround can be removed once the upstream duckdb-httpfs fix lands and we upgrade DuckDB. Refs: duckdb/duckdb-httpfs#165 Refs: duckdb/duckdb-aws#26
1 parent d8d653f commit 325c9aa

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

sqlmesh/core/config/connection.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,20 @@ class DuckDBConnectionConfig(BaseDuckDBConnectionConfig):
561561
DISPLAY_NAME: t.ClassVar[t.Literal["DuckDB"]] = "DuckDB"
562562
DISPLAY_ORDER: t.ClassVar[t.Literal[1]] = 1
563563

564+
@property
565+
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
566+
config: t.Dict[str, t.Any] = {}
567+
if self.secrets:
568+
if isinstance(self.secrets, list):
569+
secrets_items = [(secret_dict, "") for secret_dict in self.secrets]
570+
else:
571+
secrets_items = [
572+
(secret_dict, secret_name)
573+
for secret_name, secret_dict in self.secrets.items()
574+
]
575+
config["s3_secrets_config"] = secrets_items
576+
return config
577+
564578

565579
class SnowflakeConnectionConfig(ConnectionConfig):
566580
"""Configuration for the Snowflake connection.

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from __future__ import annotations
22

3+
import logging
4+
import threading
5+
import time
36
import typing as t
47
from sqlglot import exp
58
from pathlib import Path
@@ -23,6 +26,8 @@
2326
from sqlmesh.core._typing import SchemaName, TableName
2427
from sqlmesh.core.engine_adapter._typing import DF
2528

29+
logger = logging.getLogger(__name__)
30+
2631

2732
@set_catalog(override_mapping={"_get_data_objects": CatalogSupport.REQUIRES_SET_CATALOG})
2833
class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin, RowDiffMixin):
@@ -38,6 +43,85 @@ class DuckDBEngineAdapter(LogicalMergeMixin, GetCurrentCatalogFromFunctionMixin,
3843
SUPPORTS_CREATE_DROP_CATALOG = True
3944
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA", "TABLE", "VIEW"]
4045

46+
# DuckDB STS AssumeRole tokens expire after 900s (15 min, the AWS minimum).
# DuckDB's `refresh: auto` does not actually refresh credentials mid-session.
# We proactively recreate secrets before expiry to avoid HTTP 400 errors.
# Rotating at 80% of the TTL leaves a margin so no in-flight statement ever
# runs with an expired token. Class-level constant, shared by all instances.
# See: https://github.com/duckdb/duckdb-httpfs/pull/165
_S3_SECRET_REFRESH_INTERVAL_S = 720  # 12 minutes (80% of 900s TTL)
51+
52+
def __init__(self, *args: t.Any, **kwargs: t.Any):
    super().__init__(*args, **kwargs)
    # The connection config forwards its S3 secret definitions through
    # `_extra_engine_config`; the base EngineAdapter.__init__ stores them in
    # self._extra_config, and with_settings() carries them forward too.
    secrets_cfg = self._extra_config.get("s3_secrets_config")
    self._s3_secrets_config: t.Optional[t.List[t.Tuple[t.Dict[str, t.Any], str]]] = secrets_cfg
    # Serializes secret rotation when concurrent_tasks > 1.
    self._s3_secret_refresh_lock = threading.Lock()
    # Secrets are created when the connection is established, so the refresh
    # timer starts now.
    self._last_secret_refresh_ts = time.monotonic()
62+
63+
def _refresh_s3_secrets(self) -> None:
    """Drop and recreate all S3 secrets to obtain fresh STS credentials.

    DuckDB's httpfs extension snapshots STS credentials at secret creation time
    and never re-queries the provider chain. When using AssumeRole with the AWS
    minimum session duration (900s / 15 min), credentials expire mid-run for any
    plan that takes longer than 15 minutes, causing HTTP 400 errors from S3.

    This method forces fresh AssumeRole calls by dropping existing S3 secrets
    and recreating them from the original config.

    Raises:
        Exception: re-raised when recreating a secret fails, since continuing
            with stale credentials would only fail later with a worse error.
    """
    cursor = self.cursor

    # Drop all existing S3-type secrets by querying their names from DuckDB.
    try:
        cursor.execute("SELECT name FROM duckdb_secrets() WHERE type = 's3'")
        existing_secrets = [row[0] for row in cursor.fetchall()]
    except Exception as e:
        # Best effort: if introspection fails, CREATE OR REPLACE below still
        # overwrites same-named secrets; log so the failure is not invisible.
        logger.debug("Could not list existing DuckDB secrets: %s", e)
        existing_secrets = []

    for secret_name in existing_secrets:
        try:
            # Quote the identifier: names come back from DuckDB and are not
            # guaranteed to be bare-identifier safe.
            quoted_name = '"' + secret_name.replace('"', '""') + '"'
            cursor.execute(f"DROP SECRET IF EXISTS {quoted_name}")
        except Exception as e:
            logger.warning("Failed to drop secret '%s' during refresh: %s", secret_name, e)

    # Recreate from config — each CREATE forces a fresh STS AssumeRole call.
    for secret_dict, secret_name in self._s3_secrets_config:  # type: ignore
        # Escape embedded single quotes so a value cannot break (or inject
        # into) the generated statement.
        secret_settings = [
            "{} '{}'".format(field, str(setting).replace("'", "''"))
            for field, setting in secret_dict.items()
        ]
        if secret_settings:
            secret_clause = ", ".join(secret_settings)
            try:
                cursor.execute(f"CREATE OR REPLACE SECRET {secret_name} ({secret_clause});")
            except Exception as e:
                logger.error("Failed to recreate secret during refresh: %s", e)
                raise

    logger.info("Refreshed DuckDB S3 secrets (STS credential rotation)")
105+
106+
def _maybe_refresh_s3_secrets(self) -> None:
107+
"""Refresh S3 secrets if approaching the STS token expiry threshold."""
108+
if not self._s3_secrets_config:
109+
return
110+
if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
111+
return
112+
113+
with self._s3_secret_refresh_lock:
114+
# Double-check after acquiring lock — another thread may have refreshed
115+
if time.monotonic() - self._last_secret_refresh_ts < self._S3_SECRET_REFRESH_INTERVAL_S:
116+
return
117+
118+
self._refresh_s3_secrets()
119+
self._last_secret_refresh_ts = time.monotonic()
120+
121+
def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None:
    # Hook point for the STS workaround: before every statement, rotate the
    # S3 secrets if they are close to their 900s expiry, then delegate to the
    # base implementation unchanged.
    self._maybe_refresh_s3_secrets()
    super()._execute(sql, track_rows_processed, **kwargs)
124+
41125
@property
def catalog_support(self) -> CatalogSupport:
    # DuckDB supports full catalog operations (see SUPPORTS_CREATE_DROP_CATALOG
    # above).
    return CatalogSupport.FULL_SUPPORT

0 commit comments

Comments
 (0)