Skip to content

Commit d5cc53a

Browse files
authored
Feat: Add Postgres Support (built-in, Airflow, dbt) (#677)
* add postgres support * remove single line returns * cleanup * fix type * fix test * fix primary key * update test
1 parent f437a16 commit d5cc53a

21 files changed

Lines changed: 584 additions & 228 deletions

File tree

docs/integrations/engines.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,43 @@ sqlmesh_airflow = SQLMeshAirflow(
142142
## DuckDB - Airflow
143143
DuckDB only works when running locally; therefore it does not support Airflow.
144144

145+
# Postgres
146+
## Postgres - Local/Built-in Scheduler
147+
| Option | Description | Type | Required |
148+
|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
149+
| `host` | The hostname of the Postgres server | string | Y |
150+
| `user` | The username to use for authentication with the Postgres server | string | Y |
151+
| `password` | The password to use for authentication with the Postgres server | string | Y |
152+
| `port` | The port number of the Postgres server | int | Y |
153+
| `database` | The name of the database instance to connect to | string | Y |
154+
| `keepalives_idle` | The number of seconds between each keepalive packet sent to the server. If set to 0, the system default will be used. (Default: `0`) | int | N |
155+
| `connect_timeout` | The number of seconds to wait for the connection to the server. (Default: `10`) | int | N |
156+
| `role` | The role to use for authentication with the Postgres server | string | N |
157+
| `sslmode`         | The SSL mode to use for the connection to the Postgres server (e.g. `disable`, `require`, `verify-full`)                              | string | N        |
158+
159+
## Postgres - Airflow
160+
**Engine Name:** `postgres`
161+
162+
The SQLMesh Postgres Operator is similar to the [PostgresOperator](https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/operators/postgres/index.html), and relies on the same [PostgresHook](https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html) implementation.
163+
164+
To enable support for this operator, the Airflow Postgres provider package should be installed on the target Airflow cluster along with SQLMesh with the Postgres extra:
165+
```
166+
pip install "apache-airflow-providers-postgres"
167+
pip install "sqlmesh[postgres]"
168+
```
169+
170+
The operator requires an [Airflow connection](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html) to determine the target Postgres database. Refer to [Postgres connection](https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html) for more details.
171+
172+
By default, the connection ID is set to `postgres_default`, but can be overridden using the `engine_operator_args` parameter to the `SQLMeshAirflow` instance as in the example below:
173+
```python linenums="1"
174+
sqlmesh_airflow = SQLMeshAirflow(
175+
"postgres",
176+
engine_operator_args={
177+
"postgres_conn_id": "<Connection ID>"
178+
},
179+
)
180+
```
181+
145182
# Redshift
146183
## Redshift - Local/Built-in Scheduler
147184
| Option | Description | Type | Required |

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ ignore_missing_imports = True
6363
[mypy-fsspec]
6464
ignore_missing_imports = True
6565

66+
[mypy-psycopg2.*]
67+
ignore_missing_imports = True
68+
6669
[autoflake]
6770
in-place = True
6871
expand-star-imports = True

setup.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@
4747
"fsspec",
4848
],
4949
extras_require={
50+
"bigquery": [
51+
"google-cloud-bigquery[pandas]",
52+
],
53+
"databricks": [
54+
"databricks-sql-connector",
55+
"databricks-cli",
56+
],
5057
"dev": [
5158
f"apache-airflow=={os.environ.get('AIRFLOW_VERSION', '2.3.3')}",
5259
"autoflake==1.7.7",
@@ -76,27 +83,23 @@
7683
"types-pytz",
7784
"types-requests==2.28.8",
7885
],
79-
"web": [
80-
"fastapi==0.95.0",
81-
"hyperscript==0.0.1",
82-
"pyarrow==11.0.0",
83-
"uvicorn==0.21.1",
84-
],
85-
"snowflake": [
86-
"snowflake-connector-python[pandas]",
87-
],
88-
"bigquery": [
89-
"google-cloud-bigquery[pandas]",
86+
"dbt": [
87+
"dbt-core",
9088
],
91-
"databricks": [
92-
"databricks-sql-connector",
93-
"databricks-cli",
89+
"postgres": [
90+
"psycopg2",
9491
],
9592
"redshift": [
9693
"redshift_connector",
9794
],
98-
"dbt": [
99-
"dbt-core",
95+
"snowflake": [
96+
"snowflake-connector-python[pandas]",
97+
],
98+
"web": [
99+
"fastapi==0.95.0",
100+
"hyperscript==0.0.1",
101+
"pyarrow==11.0.0",
102+
"uvicorn==0.21.1",
100103
],
101104
},
102105
classifiers=[

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
DatabricksSparkSessionConnectionConfig,
77
DatabricksSQLConnectionConfig,
88
DuckDBConnectionConfig,
9+
PostgresConnectionConfig,
910
RedshiftConnectionConfig,
1011
SnowflakeConnectionConfig,
1112
)

sqlmesh/core/config/connection.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,15 +516,56 @@ def _connection_factory(self) -> t.Callable:
516516
return connect
517517

518518

519+
class PostgresConnectionConfig(_ConnectionConfig):
520+
host: str
521+
user: str
522+
password: str
523+
port: int
524+
database: str
525+
keepalives_idle: int = 0
526+
connect_timeout: int = 10
527+
role: t.Optional[str] = None
528+
sslmode: t.Optional[str] = None
529+
530+
concurrent_tasks: int = 4
531+
532+
type_: Literal["postgres"] = Field(alias="type", default="postgres")
533+
534+
@property
535+
def _connection_kwargs_keys(self) -> t.Set[str]:
536+
return {
537+
"host",
538+
"user",
539+
"password",
540+
"port",
541+
"database",
542+
"keepalives_idle",
543+
"connect_timeout",
544+
"role",
545+
"sslmode",
546+
}
547+
548+
@property
549+
def _engine_adapter(self) -> t.Type[EngineAdapter]:
550+
return engine_adapter.PostgresEngineAdapter
551+
552+
@property
553+
def _connection_factory(self) -> t.Callable:
554+
from psycopg2 import connect
555+
556+
return connect
557+
558+
519559
ConnectionConfig = Annotated[
520560
t.Union[
521-
DuckDBConnectionConfig,
522-
SnowflakeConnectionConfig,
561+
BigQueryConnectionConfig,
523562
DatabricksSQLConnectionConfig,
524563
DatabricksSparkSessionConnectionConfig,
525564
DatabricksConnectionConfig,
526-
BigQueryConnectionConfig,
565+
DuckDBConnectionConfig,
566+
PostgresConnectionConfig,
527567
RedshiftConnectionConfig,
568+
SnowflakeConnectionConfig,
528569
],
529570
Field(discriminator="type_"),
530571
]

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
"mssql": EngineAdapterWithIndexSupport,
2828
}
2929

30+
DIALECT_ALIASES = {
31+
"postgresql": "postgres",
32+
}
33+
3034

3135
def create_engine_adapter(
3236
connection_factory: t.Callable[[], t.Any], dialect: str, multithreaded: bool = False
3337
) -> EngineAdapter:
3438
dialect = dialect.lower()
35-
if dialect == "postgresql":
36-
dialect = "postgres"
39+
dialect = DIALECT_ALIASES.get(dialect, dialect)
3740
if dialect == "databricks":
3841
try:
3942
from pyspark.sql import SparkSession

0 commit comments

Comments
 (0)