Commit a506eb3

feat(migrations): support specifying schema for migrations (#471)
## Summary

Add `MigrationConfig.default_schema` and `version_table_schema` so a migration run can target a specific schema/dataset, plus the per-adapter driver hooks that apply, validate, and log that selection. Includes related cleanup found during review: shared identifier-quoting helpers, Oracle literal-case contract, and a comprehensive docs matrix.

## Feature: migration default schema

- `MigrationConfig.default_schema`: sets the session schema for the migration run (per-dialect: `SET search_path` / `ALTER SESSION SET CURRENT_SCHEMA` / `SET schema` / default_dataset).
- `MigrationConfig.version_table_schema`: controls where the tracker lives; defaults to `default_schema` when present.
- Fail-fast validation via `driver.has_schema()` before any DDL runs; misconfigured runs raise `MigrationError` with a clear message.
- Structured logging: every transition emits a `migration.schema.*` event (`applied`, `noop`, `reset`) through `log_with_context`, with the resolved driver class in the payload.

### Adapter support matrix

| Adapter | `supports_migration_schemas` | Hook |
| --- | --- | --- |
| asyncpg, psycopg, psqlpy | yes | `SET search_path TO "<schema>"` |
| cockroach_asyncpg, cockroach_psycopg | yes | inherits PG behavior |
| adbc (postgres dialect) | yes (derived from dialect) | `SET search_path TO "<schema>"` |
| adbc (sqlite/duckdb/snowflake/...) | no | structured noop |
| oracledb | yes | `ALTER SESSION SET CURRENT_SCHEMA = "<schema>"` (literal case) |
| duckdb | yes | `SET search_path = "<schema>"` (identifier form) |
| bigquery | yes | `default_dataset` on the client |
| spanner | yes | per-session default schema |
| sqlite, aiosqlite, asyncmy, aiomysql, mysqlconnector, pymysql, mssql_python, arrow_odbc | no | structured noop |

See `docs/usage/migrations.rst` for the per-adapter contract (literal-case requirements, opt-in flags, expected SQL surface).

## Refactor: identifier-quoting helpers

Consolidated seven adapter-local duplicates into `sqlspec/utils/text.py`:

- `quote_identifier(value)`: ANSI form `"..."` with `""`-escaping. Used by sqlite, aiosqlite, postgres family (asyncpg, psycopg, psqlpy, adbc), duckdb, oracle.
- `quote_backtick_identifier(value)`: MySQL family form `` `...` `` with `` `` ``-escaping. Used by asyncmy, aiomysql, mysqlconnector, pymysql.

Removed duplicates: `_quote_sqlite_identifier`, `_quote_identifier` (psqlpy), `_quote_mysql_identifier` (×4), `_quote_duckdb_search_path`. Both helpers added to the mypyc include list.

## Bug fix: Oracle case-folding

Oracle migration hooks previously called `.upper()` and bound `UPPER(:schema_name)`, which broke mixed-case Oracle users (`CREATE USER "MixedCase"` was unreachable). Now matches the PG/DuckDB contract: the schema value is interpolated literally and bound verbatim. Callers pass the stored identifier (typically uppercase for unquoted creates, exact case for quoted creates); this is documented in `migrations.rst`.

## Other adjustments

- BigQuery: shared `SyncMigrationTracker` instead of an adapter-specific tracker; SQLGlot DDL rendering patched so tracker SQL emits `PRIMARY KEY NOT ENFORCED` and orders `DEFAULT` before `NOT NULL`. Integration tests for the full migration workflow are gated behind native BigQuery (emulator rejects column `DEFAULT` values in tracker DDL).
- ADBC: `supports_migration_schemas` is now a `@property` deriving from dialect, so a single config class lights up only when targeting a supported dialect.
- Tracker: handles dotted-form `version_table_name` correctly (strips embedded `schema.table` prefixes so the qualified version_table cannot be double-prefixed).
- Removed dead `_get_existing_columns_sql` from `oracledb/migrations.py` (replaced by `data_dictionary.get_columns(..., schema=...)` earlier in this PR).

## Docs

- `docs/usage/migrations.rst`: comprehensive adapter matrix, per-dialect contract, literal-case guidance.
- `docs/examples/patterns/migrations_with_schema.py`: runnable example.

## Tests

- Unit: per-adapter migration-schema tests (postgres family, oracle, duckdb, adbc), noop assertions for unsupported adapters, tracker idempotency, command/runner validation, `quote_identifier` escape behavior.
- Integration: full migration workflow against asyncpg, psycopg, psqlpy, oracledb, duckdb, adbc; data-dictionary coverage tests for psycopg/psqlpy/asyncpg; Spanner session-defaults test.

## Follow-up

Identifier handling for non-migration sites (data-dictionary metadata queries, per-dialect case-folding normalizers) is intentionally out of scope here and tracked separately in #477, #478, #479.
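
For readers who want a concrete picture of the two shared quoting helpers, here is a minimal sketch. It is modeled on the adapter-local functions this commit removes (`_quote_sqlite_identifier` and `_quote_mysql_identifier` in the `core.py` diffs); the actual bodies in `sqlspec/utils/text.py` are not part of this excerpt, so treat the implementation below as an assumption rather than the library's code.

```python
def quote_identifier(value: str) -> str:
    """ANSI-style quoting: wrap in double quotes, doubling any embedded quote."""
    return '"' + value.replace('"', '""') + '"'


def quote_backtick_identifier(value: str) -> str:
    """MySQL-style quoting: wrap in backticks, doubling any embedded backtick."""
    return "`" + value.replace("`", "``") + "`"


if __name__ == "__main__":
    print(quote_identifier("app_schema"))
    print(quote_identifier('odd"name'))
    print(quote_backtick_identifier("odd`name"))
```

Doubling the delimiter is what keeps a schema name containing a quote character from terminating the identifier early when it is interpolated into a statement such as `SET search_path`.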
1 parent 7baad34 commit a506eb3

51 files changed

Lines changed: 3008 additions & 123 deletions

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+from pathlib import Path
+
+__all__ = ("test_migrations_with_schema",)
+
+
+def test_migrations_with_schema(tmp_path: Path) -> None:
+    # start-example
+    from sqlspec.adapters.duckdb import DuckDBConfig
+    from sqlspec.migrations.commands import SyncMigrationCommands
+
+    migration_dir = tmp_path / "migrations"
+    db_path = tmp_path / "app.duckdb"
+
+    config = DuckDBConfig(
+        connection_config={"database": str(db_path)},
+        migration_config={
+            "script_location": str(migration_dir),
+            "version_table_name": "schema_versions",
+            "default_schema": "app_schema",
+            "version_table_schema": "admin_schema",
+        },
+    )
+
+    try:
+        with config.provide_session() as session:
+            session.execute("CREATE SCHEMA app_schema")
+            session.execute("CREATE SCHEMA admin_schema")
+
+        commands = SyncMigrationCommands(config)
+        commands.init(str(migration_dir), package=True)
+
+        (migration_dir / "0001_create_users.py").write_text(
+            '''"""Create users."""
+
+
+def up():
+    """Create an unqualified table in app_schema."""
+    return ["CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR NOT NULL)"]
+
+
+def down():
+    """Drop the unqualified table from app_schema."""
+    return ["DROP TABLE IF EXISTS users"]
+'''
+        )
+
+        commands.upgrade()
+
+        with config.provide_session() as session:
+            users_table = session.select_value(
+                """
+                SELECT table_name
+                FROM information_schema.tables
+                WHERE table_schema = ? AND table_name = ?
+                """,
+                ("app_schema", "users"),
+            )
+            tracker_table = session.select_value(
+                """
+                SELECT table_name
+                FROM information_schema.tables
+                WHERE table_schema = ? AND table_name = ?
+                """,
+                ("admin_schema", "schema_versions"),
+            )
+
+        assert users_table == "users"
+        assert tracker_table == "schema_versions"
+    finally:
+        if config.connection_instance:
+            config.close_pool()
+    # end-example

docs/usage/migrations.rst

Lines changed: 113 additions & 0 deletions
@@ -74,6 +74,119 @@ specific extension, ``migration_config["include_extensions"]`` to opt in
 explicitly by extension name, or ``migration_config["enabled"] = False`` to
 disable migrations entirely for a database config.
 
+Configuring a Default Schema
+----------------------------
+
+Use ``migration_config["default_schema"]`` when migration SQL should run
+against a pre-existing schema without qualifying every table in each migration
+file. SQLSpec validates the schema before creating the tracker table or applying
+DDL, then configures the migration session before each migration is executed.
+
+Use ``migration_config["version_table_schema"]`` when the migration tracker
+table should live somewhere different from the objects managed by migrations.
+If ``version_table_schema`` is not set, the tracker schema resolves to
+``default_schema``. If neither field is set, the tracker table is unqualified and
+uses the adapter's normal default namespace.
+
+.. code-block:: python
+
+   from sqlspec.adapters.asyncpg import AsyncpgConfig
+
+   config = AsyncpgConfig(
+       connection_config={"dsn": "postgresql://localhost/app"},
+       migration_config={
+           "script_location": "migrations/postgres",
+           "version_table_name": "schema_versions",
+           "default_schema": "app_schema",
+           "version_table_schema": "admin_schema",
+       },
+   )
+
+The operator must create the target schema before running migrations. The
+migration role also needs the database-specific privileges to create objects
+there. For PostgreSQL, that usually means ``USAGE`` and
+``CREATE`` on the target schema, plus permission to create or update the
+tracker table.
+
+Adapter support is opt-in via the ``supports_migration_schemas`` class flag on
+each config. Configuring ``default_schema`` against an adapter that does not
+opt in raises ``MigrationError`` before any DDL is issued.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 28 18 54
+
+   * - Adapter
+     - Default schema
+     - Mechanism
+   * - ``asyncpg``
+     - Supported
+     - ``SET LOCAL search_path`` (transactional) / ``SET search_path`` + ``RESET`` (non-transactional);
+       validates ``information_schema.schemata``.
+   * - ``psycopg`` (sync and async)
+     - Supported
+     - Same as ``asyncpg``.
+   * - ``psqlpy``
+     - Supported
+     - Same as ``asyncpg``.
+   * - ``cockroach_asyncpg``
+     - Supported
+     - Inherits ``asyncpg`` behavior. CockroachDB exposes the PostgreSQL
+       wire protocol and accepts ``SET search_path``.
+   * - ``cockroach_psycopg`` (sync and async)
+     - Supported
+     - Inherits ``psycopg`` behavior.
+   * - ``adbc`` (PostgreSQL dialect)
+     - Supported
+     - Same as ``asyncpg``. Detection is dialect-based on the configured
+       ADBC URI; ``supports_migration_schemas`` becomes ``True`` only when
+       the resolved dialect is PostgreSQL-compatible.
+   * - ``oracledb`` (sync and async)
+     - Supported
+     - ``ALTER SESSION SET CURRENT_SCHEMA``; validates ``ALL_USERS``. The
+       schema is matched verbatim, so callers must pass the literal stored
+       identifier — typically uppercase for users created unquoted, the
+       exact case for users created with quoted identifiers.
+   * - ``duckdb``
+     - Supported
+     - ``SET search_path``; validates ``information_schema.schemata``.
+   * - ``sqlite``, ``aiosqlite``
+     - Not supported
+     - SQLite has no schema namespace; use ``ATTACH DATABASE`` to layer
+       additional databases instead.
+   * - ``asyncmy``, ``aiomysql``, ``mysqlconnector``, ``pymysql``
+     - Not supported
+     - MySQL conflates schema and database. Select the target database in
+       the connection URL or via ``USE`` inside the migration.
+   * - ``adbc`` (non-PostgreSQL dialects, including SQL Server)
+     - Not supported
+     - ADBC does not expose a portable per-session schema setter for these
+       dialects. Configure the default schema at the user or login level in
+       the underlying database.
+   * - ``mssql_python``
+     - Not supported
+     - SQL Server resolves the default schema from the login. Set it with
+       ``ALTER USER ... WITH DEFAULT_SCHEMA = ...`` in your database.
+   * - ``bigquery``
+     - Not supported
+     - BigQuery requires fully qualified ``project.dataset.table`` references
+       for cross-dataset DDL; there is no session-scoped default dataset.
+   * - ``spanner``
+     - Not supported
+     - Cloud Spanner ties objects to a single schema per database; there is
+       no session-scoped switch.
+   * - ``arrow_odbc``
+     - Not supported
+     - ODBC connection-string semantics vary per driver. Configure the
+       default schema through the underlying DSN.
+
+Example with unqualified DDL:
+
+.. literalinclude:: /examples/patterns/migrations_with_schema.py
+   :language: python
+   :start-after: # start-example
+   :end-before: # end-example
+
 Logging and Echo Controls
 -------------------------
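
The tracker-schema fallback described in this docs hunk (``version_table_schema``, then ``default_schema``, then an unqualified table) can be sketched as below. Both function names are hypothetical and do not come from SQLSpec. The prefix-stripping step is only an assumption about what the commit message means by handling dotted-form `version_table_name`, since the real tracker code is not shown in this excerpt.

```python
from __future__ import annotations


def resolve_tracker_schema(migration_config: dict[str, str]) -> str | None:
    """Prefer version_table_schema, fall back to default_schema, else None."""
    return migration_config.get("version_table_schema") or migration_config.get("default_schema")


def qualified_tracker_name(migration_config: dict[str, str]) -> str:
    """Build the tracker table reference without double-prefixing the schema."""
    table = migration_config["version_table_name"]
    # Assumption: an embedded "schema." prefix is dropped before re-qualifying.
    table = table.rsplit(".", 1)[-1]
    schema = resolve_tracker_schema(migration_config)
    return f"{schema}.{table}" if schema else table


if __name__ == "__main__":
    cfg = {
        "version_table_name": "schema_versions",
        "default_schema": "app_schema",
        "version_table_schema": "admin_schema",
    }
    print(qualified_tracker_name(cfg))
```

With the configuration from the docs example, the tracker resolves to `admin_schema.schema_versions`, while unqualified migration DDL lands in `app_schema`.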

sqlspec/adapters/adbc/config.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,10 @@ def __init__(
224224
self._pgvector_available: bool | None = None
225225
self._paradedb_available: bool | None = None
226226

227+
dialect = resolve_dialect_from_config(self.connection_config)
228+
227229
if statement_config is None:
228-
statement_config = get_statement_config(resolve_dialect_from_config(self.connection_config))
230+
statement_config = get_statement_config(dialect)
229231

230232
statement_config, driver_features = apply_driver_features(statement_config, driver_features)
231233

@@ -241,6 +243,11 @@ def __init__(
241243
**kwargs,
242244
)
243245

246+
@property
247+
def supports_migration_schemas(self) -> bool: # type: ignore[override]
248+
"""Migration schema support is only available for PostgreSQL-backed ADBC drivers."""
249+
return is_postgres_dialect(resolve_dialect_from_config(self.connection_config))
250+
244251
def create_connection(self) -> AdbcConnection:
245252
"""Create and return a new connection using the specified driver.
246253

sqlspec/adapters/adbc/driver.py

Lines changed: 30 additions & 0 deletions
@@ -31,6 +31,7 @@
 from sqlspec.utils.logging import get_logger
 from sqlspec.utils.module_loader import ensure_pyarrow
 from sqlspec.utils.serializers import to_json
+from sqlspec.utils.text import quote_identifier
 
 if TYPE_CHECKING:
     from collections.abc import Callable
@@ -292,6 +293,35 @@ def rollback(self) -> None:
             msg = f"Failed to rollback transaction: {e}"
             raise SQLSpecError(msg) from e
 
+    def set_migration_session_schema(self, schema: str) -> None:
+        """Set the PostgreSQL search path for migration SQL when using ADBC PostgreSQL."""
+        if not self._is_postgres:
+            super().set_migration_session_schema(schema)
+            return
+        quoted_schema = quote_identifier(schema)
+        with self.with_cursor(self.connection) as cursor:
+            cursor.execute(f'SET search_path TO {quoted_schema}, "$user", public')
+
+    def set_migration_non_transactional_schema(self, schema: str) -> None:
+        """Set the PostgreSQL search path for non-transactional migration SQL."""
+        self.set_migration_session_schema(schema)
+
+    def reset_migration_session_schema(self) -> None:
+        """Reset PostgreSQL search path after non-transactional migration SQL."""
+        if not self._is_postgres:
+            super().reset_migration_session_schema()
+            return
+        with self.with_cursor(self.connection) as cursor:
+            cursor.execute("RESET search_path")
+
+    def has_schema(self, schema: str) -> bool:
+        """Return whether a PostgreSQL schema exists when using ADBC PostgreSQL."""
+        if not self._is_postgres:
+            return super().has_schema(schema)
+        with self.with_cursor(self.connection) as cursor:
+            cursor.execute("SELECT 1 FROM information_schema.schemata WHERE schema_name = $1", parameters=[schema])
+            return cursor.fetchone() is not None
+
     def with_cursor(self, connection: "AdbcConnection") -> "AdbcCursor":
         """Create context manager for cursor.

sqlspec/adapters/aiomysql/core.py

Lines changed: 4 additions & 8 deletions
@@ -21,6 +21,7 @@
     UniqueViolationError,
 )
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.text import quote_backtick_identifier
 from sqlspec.utils.type_converters import build_uuid_coercions
 from sqlspec.utils.type_guards import has_cursor_metadata, has_lastrowid, has_rowcount
 
@@ -117,23 +118,18 @@ def _bool_to_int(value: bool) -> int:
     return int(value)
 
 
-def _quote_mysql_identifier(identifier: str) -> str:
-    normalized = identifier.replace("`", "``")
-    return f"`{normalized}`"
-
-
 def format_identifier(identifier: str) -> str:
     cleaned = identifier.strip()
     if not cleaned:
         msg = "Table name must not be empty"
         raise SQLSpecError(msg)
     parts = [part for part in cleaned.split(".") if part]
-    formatted = ".".join(_quote_mysql_identifier(part) for part in parts)
-    return formatted or _quote_mysql_identifier(cleaned)
+    formatted = ".".join(quote_backtick_identifier(part) for part in parts)
+    return formatted or quote_backtick_identifier(cleaned)
 
 
 def build_insert_statement(table: str, columns: "list[str]") -> str:
-    column_clause = ", ".join(_quote_mysql_identifier(column) for column in columns)
+    column_clause = ", ".join(quote_backtick_identifier(column) for column in columns)
     placeholders = ", ".join("%s" for _ in columns)
     return f"INSERT INTO {format_identifier(table)} ({column_clause}) VALUES ({placeholders})"

sqlspec/adapters/aiosqlite/core.py

Lines changed: 4 additions & 8 deletions
@@ -18,6 +18,7 @@
     UniqueViolationError,
 )
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.text import quote_identifier
 from sqlspec.utils.type_converters import build_decimal_converter, build_time_iso_converter, build_uuid_coercions
 from sqlspec.utils.type_guards import has_rowcount, has_sqlite_error
 
@@ -58,23 +59,18 @@ def _bool_to_int(value: bool) -> int:
     return int(value)
 
 
-def _quote_sqlite_identifier(identifier: str) -> str:
-    normalized = identifier.replace('"', '""')
-    return f'"{normalized}"'
-
-
 def format_identifier(identifier: str) -> str:
     cleaned = identifier.strip()
     if not cleaned:
         msg = "Table name must not be empty"
         raise SQLSpecError(msg)
     parts = [part for part in cleaned.split(".") if part]
-    formatted = ".".join(_quote_sqlite_identifier(part) for part in parts)
-    return formatted or _quote_sqlite_identifier(cleaned)
+    formatted = ".".join(quote_identifier(part) for part in parts)
+    return formatted or quote_identifier(cleaned)
 
 
 def build_insert_statement(table: str, columns: "list[str]") -> str:
-    column_clause = ", ".join(_quote_sqlite_identifier(column) for column in columns)
+    column_clause = ", ".join(quote_identifier(column) for column in columns)
     placeholders = ", ".join("?" for _ in columns)
     return f"INSERT INTO {format_identifier(table)} ({column_clause}) VALUES ({placeholders})"

sqlspec/adapters/asyncmy/core.py

Lines changed: 4 additions & 8 deletions
@@ -21,6 +21,7 @@
     UniqueViolationError,
 )
 from sqlspec.utils.serializers import from_json, to_json
+from sqlspec.utils.text import quote_backtick_identifier
 from sqlspec.utils.type_converters import build_uuid_coercions
 from sqlspec.utils.type_guards import has_cursor_metadata, has_lastrowid, has_rowcount
 
@@ -117,23 +118,18 @@ def _bool_to_int(value: bool) -> int:
     return int(value)
 
 
-def _quote_mysql_identifier(identifier: str) -> str:
-    normalized = identifier.replace("`", "``")
-    return f"`{normalized}`"
-
-
 def format_identifier(identifier: str) -> str:
     cleaned = identifier.strip()
     if not cleaned:
         msg = "Table name must not be empty"
         raise SQLSpecError(msg)
     parts = [part for part in cleaned.split(".") if part]
-    formatted = ".".join(_quote_mysql_identifier(part) for part in parts)
-    return formatted or _quote_mysql_identifier(cleaned)
+    formatted = ".".join(quote_backtick_identifier(part) for part in parts)
+    return formatted or quote_backtick_identifier(cleaned)
 
 
 def build_insert_statement(table: str, columns: "list[str]") -> str:
-    column_clause = ", ".join(_quote_mysql_identifier(column) for column in columns)
+    column_clause = ", ".join(quote_backtick_identifier(column) for column in columns)
     placeholders = ", ".join("%s" for _ in columns)
     return f"INSERT INTO {format_identifier(table)} ({column_clause}) VALUES ({placeholders})"

sqlspec/adapters/asyncpg/config.py

Lines changed: 1 addition & 0 deletions
@@ -260,6 +260,7 @@ class AsyncpgConfig(AsyncDatabaseConfig[AsyncpgConnection, "Pool[Record]", Async
     driver_type: "ClassVar[type[AsyncpgDriver]]" = AsyncpgDriver
     connection_type: "ClassVar[type[AsyncpgConnection]]" = type(AsyncpgConnection)  # type: ignore[assignment]
     supports_transactional_ddl: "ClassVar[bool]" = True
+    supports_migration_schemas: "ClassVar[bool]" = True
     supports_native_arrow_export: "ClassVar[bool]" = True
     supports_native_arrow_import: "ClassVar[bool]" = True
     supports_native_parquet_export: "ClassVar[bool]" = True
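
The opt-in flag added here pairs with the fail-fast check described in `docs/usage/migrations.rst`. The sketch below shows one way such a gate could work, following the docs wording that an adapter without the flag raises `MigrationError` before any DDL runs. Every class and function name in it is a hypothetical stand-in, including the local `MigrationError`; the real validation lives in code not shown in this excerpt. `DialectAwareConfig` mimics the ADBC approach of overriding the class flag with a property.

```python
from typing import Callable, ClassVar, Optional


class MigrationError(Exception):
    """Local stand-in for SQLSpec's MigrationError."""


class BaseConfig:
    # Adapters opt in by overriding this flag.
    supports_migration_schemas: ClassVar[bool] = False


class PostgresLikeConfig(BaseConfig):
    supports_migration_schemas: ClassVar[bool] = True


class DialectAwareConfig(BaseConfig):
    """Derives support from the configured dialect, as the ADBC config does."""

    def __init__(self, dialect: str) -> None:
        self.dialect = dialect

    @property
    def supports_migration_schemas(self) -> bool:  # type: ignore[override]
        return self.dialect == "postgres"


def validate_default_schema(
    config: BaseConfig, default_schema: Optional[str], has_schema: Callable[[str], bool]
) -> None:
    """Raise before any DDL if the schema cannot be honored."""
    if default_schema is None:
        return
    if not config.supports_migration_schemas:
        raise MigrationError(f"{type(config).__name__} does not support default_schema")
    if not has_schema(default_schema):
        raise MigrationError(f"schema {default_schema!r} does not exist")


if __name__ == "__main__":
    validate_default_schema(PostgresLikeConfig(), "app_schema", lambda name: True)
    print("validated")
```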
