
Commit 234ffc6

Feat: MySQL support (#1232)
1 parent ebad041 commit 234ffc6

24 files changed

Lines changed: 447 additions & 267 deletions

docs/integrations/engines.md

Lines changed: 58 additions & 10 deletions
@@ -4,6 +4,11 @@
 ### BigQuery - Local/Built-in Scheduler
 **Engine Adapter Type**: `bigquery`
 
+**Installation**:
+```
+pip install "sqlmesh[bigquery]"
+```
+
 | Option                          | Description                                                                                                      | Type   | Required |
 |---------------------------------|------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
 | `method`                        | Connection methods. Can be `oauth`, `oauth-secrets`, `service-account`, `service-account-json`. Default: `oauth` | string | N |
@@ -71,6 +76,11 @@ sqlmesh_airflow = SQLMeshAirflow(
 ### Databricks - Local/Built-in Scheduler
 **Engine Adapter Type**: `databricks`
 
+**Installation**:
+```
+pip install "sqlmesh[databricks]"
+```
+
 If you are always running SQLMesh commands directly on a Databricks Cluster (like in a Databricks Notebook), then the only relevant configuration is `catalog`, and it is optional.
 The SparkSession provided by Databricks will be used to execute all SQLMesh commands.

@@ -170,21 +180,44 @@ sqlmesh_airflow = SQLMeshAirflow(
 ### DuckDB - Airflow
 DuckDB only works when running locally; therefore it does not support Airflow.
 
+## MySQL
+### MySQL - Local/Built-in Scheduler
+**Engine Adapter Type**: `mysql`
+
+**Installation**:
+```
+pip install "sqlmesh[mysql]"
+```
+
+| Option         | Description                                                   | Type   | Required |
+|----------------|--------------------------------------------------------------|:------:|:--------:|
+| `host`         | The hostname of the MySQL server                              | string | Y        |
+| `user`         | The username to use for authentication with the MySQL server  | string | Y        |
+| `password`     | The password to use for authentication with the MySQL server  | string | Y        |
+| `port`         | The port number of the MySQL server                           | int    | Y        |
+| `charset`      | The character set used for the connection                     | string | N        |
+| `ssl_disabled` | Whether SSL is disabled for the connection                    | bool   | N        |
+
 ## Postgres
 ### Postgres - Local/Built-in Scheduler
 **Engine Adapter Type**: `postgres`
 
-| Option | Description | Type | Required |
+**Installation**:
+```
+pip install "sqlmesh[postgres]"
+```
+
+| Option            | Description                                                                       | Type   | Required |
 |-------------------|---------------------------------------------------------------------------------|:------:|:--------:|
-| `host` | The hostname of the Postgres server | string | Y |
-| `user` | The username to use for authentication with the Postgres server | string | Y |
-| `password` | The password to use for authentication with the Postgres server | string | Y |
-| `port` | The port number of the Postgres server | int | Y |
-| `database` | The name of the database instance to connect to | string | Y |
-| `keepalives_idle` | The number of seconds between each keepalive packet sent to the server. | int | N |
-| `connect_timeout` | The number of seconds to wait for the connection to the server. (Default: `10`) | int | N |
-| `role` | The role to use for authentication with the Postgres server | string | N |
-| `sslmode` | The security of the connection to the Postgres server. | string | N |
+| `host`            | The hostname of the Postgres server                                               | string | Y        |
+| `user`            | The username to use for authentication with the Postgres server                   | string | Y        |
+| `password`        | The password to use for authentication with the Postgres server                   | string | Y        |
+| `port`            | The port number of the Postgres server                                            | int    | Y        |
+| `database`        | The name of the database instance to connect to                                   | string | Y        |
+| `keepalives_idle` | The number of seconds between each keepalive packet sent to the server.           | int    | N        |
+| `connect_timeout` | The number of seconds to wait for the connection to the server. (Default: `10`)   | int    | N        |
+| `role`            | The role to use for authentication with the Postgres server                       | string | N        |
+| `sslmode`         | The security of the connection to the Postgres server                             | string | N        |
 
 ### Postgres - Airflow
 **Engine Name:** `postgres`
@@ -213,6 +246,11 @@ sqlmesh_airflow = SQLMeshAirflow(
 ### Redshift - Local/Built-in Scheduler
 **Engine Adapter Type**: `Redshift`
 
+**Installation**:
+```
+pip install "sqlmesh[redshift]"
+```
+
 | Option | Description | Type | Required |
 |-------------------------|-------------------------------------------------------------------------------------------------------------|:------:|:--------:|
 | `user` | The username to use for authentication with the Amazon Redshift cluster | string | N |
@@ -262,6 +300,11 @@ sqlmesh_airflow = SQLMeshAirflow(
 ### GCP Postgres - Local/Built-in Scheduler
 **Engine Adapter Type**: `postgres`
 
+**Installation**:
+```
+pip install "sqlmesh[gcppostgres]"
+```
+
 | Option | Description | Type | Required |
 |---------------------------|-------------------------------------------------------------------------------------|:-------:|:--------:|
 | `instance_connection_str` | Connection name for the postgres instance | string | Y |
@@ -274,6 +317,11 @@ sqlmesh_airflow = SQLMeshAirflow(
 ### Snowflake - Local/Built-in Scheduler
 **Engine Adapter Type**: `snowflake`
 
+**Installation**:
+```
+pip install "sqlmesh[snowflake]"
+```
+
 | Option | Description | Type | Required |
 |-----------------|------------------------------------|:------:|:--------:|
 | `user` | The Snowflake username | string | N |
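For orientation (not part of the diff): the MySQL options documented above map one-to-one onto the `MySQLConnectionConfig` class this commit adds in `sqlmesh/core/config/connection.py` (shown further down), so the same connection can be built programmatically. A minimal sketch with placeholder credentials:

```python
from sqlmesh.core.config import MySQLConnectionConfig

# Placeholder values; host, user, and password are the required options
# from the table above.
connection = MySQLConnectionConfig(
    host="localhost",
    user="sqlmesh",
    password="secret",
    port=3306,
)
adapter = connection.create_engine_adapter()  # yields a MySQLEngineAdapter
```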

setup.cfg

Lines changed: 3 additions & 0 deletions
@@ -66,6 +66,9 @@ ignore_missing_imports = True
 [mypy-fsspec]
 ignore_missing_imports = True
 
+[mypy-mysql.*]
+ignore_missing_imports = True
+
 [mypy-psycopg2.*]
 ignore_missing_imports = True

setup.py

Lines changed: 4 additions & 1 deletion
@@ -46,7 +46,7 @@
         "requests",
         "rich",
         "ruamel.yaml",
-        "sqlglot~=17.8.0",
+        "sqlglot~=17.8.4",
     ],
     extras_require={
         "bigquery": [
@@ -104,6 +104,9 @@
             "langchain",
             "openai",
         ],
+        "mysql": [
+            "mysql-connector-python",
+        ],
         "postgres": [
             "psycopg2",
         ],

sqlmesh/core/audit/builtin.py

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
 SELECT
   @EACH(
     @columns,
-    c -> row_number() OVER (PARTITION BY c ORDER BY 1) AS rank_@c
+    c -> row_number() OVER (PARTITION BY c ORDER BY c) AS rank_@c
   )
 FROM @this_model
 )
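A note on why this one-character change is safe: `row_number()` is partitioned by `c`, so every row in a partition shares the same value of `c`, and both `ORDER BY 1` and `ORDER BY c` therefore order by a constant within the partition; the audit's ranking semantics are unchanged. Switching to `ORDER BY c` is presumably for portability, since some dialects, MySQL among this commit's targets, are stricter about constant or positional expressions in a window `ORDER BY`.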

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@
     DatabricksConnectionConfig,
     DuckDBConnectionConfig,
     GCPPostgresConnectionConfig,
+    MySQLConnectionConfig,
     PostgresConnectionConfig,
     RedshiftConnectionConfig,
     SnowflakeConnectionConfig,

sqlmesh/core/config/connection.py

Lines changed: 49 additions & 0 deletions
@@ -51,6 +51,11 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
         """kwargs that are for execution config only"""
         return {}
 
+    @property
+    def _cursor_kwargs(self) -> t.Optional[t.Dict[str, t.Any]]:
+        """Key-value arguments that will be passed during cursor construction."""
+        return None
+
     def create_engine_adapter(self) -> EngineAdapter:
         """Returns a new instance of the Engine Adapter."""
         return self._engine_adapter(
@@ -61,6 +66,7 @@ def create_engine_adapter(self) -> EngineAdapter:
                 }
             ),
             multithreaded=self.concurrent_tasks > 1,
+            cursor_kwargs=self._cursor_kwargs,
             **self._extra_engine_config,
         )
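`create_connection_pool` itself is not part of this diff, so as orientation only, here is a hedged sketch of how a pool might forward `cursor_kwargs` when handing out cursors (the class name and structure are invented for illustration, not SQLMesh's implementation):

```python
import typing as t


class SingleConnectionPoolSketch:
    """Illustrative only: lazily opens one connection and forwards
    cursor_kwargs (e.g. {"buffered": True} for MySQL) to .cursor()."""

    def __init__(
        self,
        connection_factory: t.Callable[[], t.Any],
        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> None:
        self._connection_factory = connection_factory
        self._cursor_kwargs = cursor_kwargs or {}
        self._connection: t.Optional[t.Any] = None

    def get_cursor(self) -> t.Any:
        # Open the connection lazily, then pass the driver-specific kwargs
        # through to DBAPI cursor creation.
        if self._connection is None:
            self._connection = self._connection_factory()
        return self._connection.cursor(**self._cursor_kwargs)
```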

@@ -621,6 +627,48 @@ def _connection_factory(self) -> t.Callable:
         return connect
 
 
+class MySQLConnectionConfig(_ConnectionConfig):
+    host: str
+    user: str
+    password: str
+    port: t.Optional[int] = None
+    charset: t.Optional[str] = None
+    ssl_disabled: t.Optional[bool] = None
+
+    concurrent_tasks: int = 4
+
+    type_: Literal["mysql"] = Field(alias="type", default="mysql")
+
+    _cursor_kwargs = {"buffered": True}
+
+    @property
+    def _connection_kwargs_keys(self) -> t.Set[str]:
+        connection_keys = {
+            "host",
+            "user",
+            "password",
+            "port",
+            "database",
+        }
+        if self.port is not None:
+            connection_keys.add("port")
+        if self.charset is not None:
+            connection_keys.add("charset")
+        if self.ssl_disabled is not None:
+            connection_keys.add("ssl_disabled")
+        return connection_keys
+
+    @property
+    def _engine_adapter(self) -> t.Type[EngineAdapter]:
+        return engine_adapter.MySQLEngineAdapter
+
+    @property
+    def _connection_factory(self) -> t.Callable:
+        from mysql.connector import connect
+
+        return connect
+
+
 class SparkConnectionConfig(_ConnectionConfig):
     """
     Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.
@@ -675,6 +723,7 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
     GCPPostgresConnectionConfig,
     DatabricksConnectionConfig,
     DuckDBConnectionConfig,
+    MySQLConnectionConfig,
     PostgresConnectionConfig,
     RedshiftConnectionConfig,
     SnowflakeConnectionConfig,
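The `_cursor_kwargs = {"buffered": True}` line above deserves a note: with mysql-connector-python, executing a new statement while a previous result set is still unread raises `InternalError: Unread result found`, whereas a buffered cursor fetches each result set eagerly. A standalone illustration (credentials are placeholders):

```python
import mysql.connector  # installed via the new "mysql" extra

# Placeholder credentials for illustration.
conn = mysql.connector.connect(host="localhost", user="sqlmesh", password="secret")
cur = conn.cursor(buffered=True)  # what _cursor_kwargs opts into

cur.execute("SELECT 1")
# Safe even though SELECT 1's rows were never fetched; an unbuffered
# cursor would raise "Unread result found" here.
cur.execute("SELECT 2")
print(cur.fetchall())
```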

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -9,6 +9,7 @@
 from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter
 from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter
 from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
+from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter
 from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter
 from sqlmesh.core.engine_adapter.redshift import RedshiftEngineAdapter
 from sqlmesh.core.engine_adapter.shared import TransactionType
@@ -23,7 +24,7 @@
     "databricks": DatabricksEngineAdapter,
     "redshift": RedshiftEngineAdapter,
     "postgres": PostgresEngineAdapter,
-    "mysql": EngineAdapterWithIndexSupport,
+    "mysql": MySQLEngineAdapter,
     "mssql": EngineAdapterWithIndexSupport,
 }

sqlmesh/core/engine_adapter/base.py

Lines changed: 12 additions & 2 deletions
@@ -95,10 +95,13 @@ def __init__(
         dialect: str = "",
         sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
         multithreaded: bool = False,
+        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
         **kwargs: t.Any,
     ):
         self.dialect = dialect.lower() or self.DIALECT
-        self._connection_pool = create_connection_pool(connection_factory, multithreaded)
+        self._connection_pool = create_connection_pool(
+            connection_factory, multithreaded, cursor_kwargs=cursor_kwargs
+        )
         self.sql_gen_kwargs = sql_gen_kwargs or {}
         self._extra_config = kwargs
 
@@ -489,7 +492,8 @@ def columns(
         self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
         describe_output = self.cursor.fetchall()
         return {
-            column_name: exp.DataType.build(column_type, dialect=self.dialect)
+            # Note: MySQL returns the column type as bytes.
+            column_name: exp.DataType.build(_decoded_str(column_type), dialect=self.dialect)
             for column_name, column_type, *_ in itertools.takewhile(
                 lambda t: not t[0].startswith("#"),
                 describe_output,
@@ -1005,3 +1009,9 @@ def _add_where_to_query(self, query: Query, where: t.Optional[exp.Expression]) -> Query:
 
 
 class EngineAdapterWithIndexSupport(EngineAdapter):
     SUPPORTS_INDEXES = True
+
+
+def _decoded_str(value: t.Union[str, bytes]) -> str:
+    if isinstance(value, bytes):
+        return value.decode("utf-8")
+    return value
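The new `_decoded_str` helper is small enough to sanity-check in isolation; reproduced here with a tiny usage demo:

```python
import typing as t


def _decoded_str(value: t.Union[str, bytes]) -> str:
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value


# mysql-connector-python returns DESCRIBE column types as bytes;
# other engines return plain strings, which pass through unchanged.
assert _decoded_str(b"varchar(100)") == "varchar(100)"
assert _decoded_str("INT") == "INT"
```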

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 2 additions & 24 deletions
@@ -4,7 +4,7 @@
 
 from sqlglot import exp
 
-from sqlmesh.core.engine_adapter.base import EngineAdapter
+from sqlmesh.core.engine_adapter.mixins import CommitOnExecuteMixin
 from sqlmesh.core.engine_adapter.shared import (
     DataObject,
     DataObjectType,
@@ -17,7 +17,7 @@
     from sqlmesh.core.engine_adapter._typing import QueryOrDF
 
 
-class BasePostgresEngineAdapter(EngineAdapter):
+class BasePostgresEngineAdapter(CommitOnExecuteMixin):
     COLUMNS_TABLE = "information_schema.columns"
     SUPPORTS_MATERIALIZED_VIEWS = True
 
@@ -96,28 +96,6 @@ def create_view(
             **create_kwargs,
         )
 
-    def execute(
-        self,
-        expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
-        ignore_unsupported_errors: bool = False,
-        quote_identifiers: bool = True,
-        **kwargs: t.Any,
-    ) -> None:
-        """
-        To make sure that inserts and updates take effect we need to commit explicitly unless the
-        statement is executed as part of an active transaction.
-
-        Reference: https://www.psycopg.org/psycopg3/docs/basic/transactions.html
-        """
-        super().execute(
-            expressions,
-            ignore_unsupported_errors=ignore_unsupported_errors,
-            quote_identifiers=quote_identifiers,
-            **kwargs,
-        )
-        if not self._connection_pool.is_transaction_active:
-            self._connection_pool.commit()
-
     def _get_data_objects(
         self, schema_name: str, catalog_name: t.Optional[str] = None
     ) -> t.List[DataObject]:
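`sqlmesh/core/engine_adapter/mixins.py` is among the 24 changed files but is not shown in this excerpt. Judging by its name and by the `execute` override deleted above, the mixin presumably carries that same commit-after-execute behavior so the Postgres and MySQL adapters can share it; a sketch under that assumption:

```python
import typing as t

from sqlglot import exp

from sqlmesh.core.engine_adapter.base import EngineAdapter


class CommitOnExecuteMixin(EngineAdapter):
    """Sketch reconstructed from the deleted override above; not the
    actual contents of mixins.py."""

    def execute(
        self,
        expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
        ignore_unsupported_errors: bool = False,
        quote_identifiers: bool = True,
        **kwargs: t.Any,
    ) -> None:
        super().execute(
            expressions,
            ignore_unsupported_errors=ignore_unsupported_errors,
            quote_identifiers=quote_identifiers,
            **kwargs,
        )
        # Commit unless an explicit transaction is active, so inserts and
        # updates take effect on drivers that do not autocommit.
        if not self._connection_pool.is_transaction_active:
            self._connection_pool.commit()
```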

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 2 additions & 0 deletions
@@ -33,13 +33,15 @@ def __init__(
         dialect: str = "",
         sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
         multithreaded: bool = False,
+        cursor_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
         **kwargs: t.Any,
     ):
         super().__init__(
             connection_factory,
             dialect,
             sql_gen_kwargs,
             multithreaded,
+            cursor_kwargs,
             **kwargs,
         )
         self._spark: t.Optional[PySparkSession] = None
