Skip to content

Commit 56c1c28

Browse files
committed
Replace deprecated PostgresOperator
1 parent ada5378 commit 56c1c28

9 files changed

Lines changed: 20 additions & 20 deletions

flowetl/flowetl/flowetl/mixins/fixed_sql_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ def fixed_sql_operator(*, class_name: str, sql: str, is_sensor: bool = False) ->
3434
if is_sensor:
3535
from airflow.sensors.sql import SqlSensor as op_base
3636
else:
37-
from airflow.providers.postgres.operators.postgres import (
38-
PostgresOperator as op_base,
37+
from airflow.providers.common.sql.operators.sql import (
38+
SQLExecuteQueryOperator as op_base,
3939
)
4040

4141
return type(

flowetl/flowetl/flowetl/mixins/fixed_sql_with_params_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ def fixed_sql_operator_with_params(
4343
if is_sensor:
4444
from airflow.sensors.sql import SqlSensor as op_base
4545
else:
46-
from airflow.providers.postgres.operators.postgres import (
47-
PostgresOperator as op_base,
46+
from airflow.providers.common.sql.operators.sql import (
47+
SQLExecuteQueryOperator as op_base,
4848
)
4949

5050
return type(

flowetl/flowetl/flowetl/mixins/wrapping_sql_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ def wrapped_sql_operator(*, class_name: str, sql: str) -> Type:
3636
3737
"""
3838
from flowetl.mixins.table_name_macros_mixin import TableNameMacrosMixin
39-
from airflow.providers.postgres.operators.postgres import PostgresOperator
39+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
4040

4141
return type(
4242
class_name,
43-
(TableNameMacrosMixin, WrappingSQLMixin, PostgresOperator),
43+
(TableNameMacrosMixin, WrappingSQLMixin, SQLExecuteQueryOperator),
4444
dict(
4545
wrapper_sql=sql,
4646
),

flowetl/flowetl/flowetl/operators/analyze_operator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# This Source Code Form is subject to the terms of the Mozilla Public
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
4-
from airflow.providers.postgres.operators.postgres import PostgresOperator
4+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
55
from flowetl.mixins.table_name_macros_mixin import TableNameMacrosMixin
66

77

8-
class AnalyzeOperator(TableNameMacrosMixin, PostgresOperator):
8+
class AnalyzeOperator(TableNameMacrosMixin, SQLExecuteQueryOperator):
99
"""
1010
The analyze operator triggers the postgres analyze command on a table.
1111
@@ -14,7 +14,7 @@ class AnalyzeOperator(TableNameMacrosMixin, PostgresOperator):
1414
target : str
1515
jinja templated schema qualified table name.
1616
kwargs : dict
17-
Passed to airflow.operators.postgres_operator.PostgresOperator
17+
Passed to airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator
1818
"""
1919

2020
def __init__(self, *, target: str, **kwargs) -> None:

flowetl/flowetl/flowetl/operators/create_foreign_staging_table_operator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
from typing import Dict, Optional
66

7-
from airflow.providers.postgres.operators.postgres import PostgresOperator
7+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
88
from flowetl.mixins.table_name_macros_mixin import TableNameMacrosMixin
99

1010

11-
class CreateForeignStagingTableOperator(TableNameMacrosMixin, PostgresOperator):
11+
class CreateForeignStagingTableOperator(TableNameMacrosMixin, SQLExecuteQueryOperator):
1212
def __init__(
1313
self,
1414
*,
@@ -49,7 +49,7 @@ def __init__(
4949
encoding : str or None, default None
5050
String giving encoding type. Uses system locale by default.
5151
kwargs : dict
52-
Passed to airflow.operators.postgres_operator.PostgresOperator
52+
Passed to airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator
5353
"""
5454
# Using an f-string here because filename needs to be templated, which is won't be if it is a param
5555
sql = f"""

flowetl/mounts/dags/test_dag.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
catchup=True,
1111
) as dag:
1212
from airflow.operators.bash import BashOperator
13-
from airflow.providers.postgres.operators.postgres import PostgresOperator
13+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
1414

1515
flowetl_test_op = BashOperator(
1616
task_id="flowetl_install_test_op", bash_command="date"
1717
)
18-
flowdb_test_op = PostgresOperator(
18+
flowdb_test_op = SQLExecuteQueryOperator(
1919
task_id="flowdb_connect_test_op",
2020
sql="SELECT * FROM geography.geo_kinds",
2121
postgres_conn_id="flowdb",

flowetl/tests/unit/test_fixed_sql_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44

55

66
def test_fixed_sql():
7-
from airflow.providers.postgres.operators.postgres import PostgresOperator
7+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
88
from airflow.sensors.sql import SqlSensor
99
from flowetl.mixins.table_name_macros_mixin import TableNameMacrosMixin
1010
from flowetl.mixins.fixed_sql_mixin import fixed_sql_operator
1111

1212
new_type = fixed_sql_operator(class_name="DUMMY_TYPE", sql="FIXED_SQL")
1313
new_instance = new_type(task_id="DUMMY")
1414
assert new_type.fixed_sql == "FIXED_SQL"
15-
assert isinstance(new_instance, PostgresOperator)
15+
assert isinstance(new_instance, SQLExecuteQueryOperator)
1616
assert isinstance(new_instance, TableNameMacrosMixin)
1717
assert type(new_instance).__name__ == "DUMMY_TYPE"
1818
new_instance.prepare_template()

flowetl/tests/unit/test_params_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
def test_fixed_sql_with_params():
7-
from airflow.providers.postgres.operators.postgres import PostgresOperator
7+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
88
from airflow.sensors.sql import SqlSensor
99

1010
from flowetl.mixins.fixed_sql_with_params_mixin import (
@@ -18,7 +18,7 @@ def test_fixed_sql_with_params():
1818
new_instance = new_type(task_id="DUMMY", DUMMY_PARAM="DUMMY_PARAM_VALUE")
1919
assert new_type.fixed_sql == "FIXED_SQL"
2020
assert new_type.named_params == ["DUMMY_PARAM"]
21-
assert isinstance(new_instance, PostgresOperator)
21+
assert isinstance(new_instance, SQLExecuteQueryOperator)
2222
assert isinstance(new_instance, TableNameMacrosMixin)
2323
assert type(new_instance).__name__ == "DUMMY_TYPE"
2424
assert {k: v.value for k, v in new_instance.params.items()} == dict(

flowetl/tests/unit/test_wrapping_sql_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44

55

66
def test_wrapped_sql(mock_basic_dag):
7-
from airflow.providers.postgres.operators.postgres import PostgresOperator
7+
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
88
from flowetl.mixins.table_name_macros_mixin import TableNameMacrosMixin
99
from flowetl.mixins.wrapping_sql_mixin import wrapped_sql_operator
1010

1111
new_type = wrapped_sql_operator(class_name="DUMMY_TYPE", sql="WRAPS({sql})")
1212
new_instance = new_type(sql="WRAPPED", task_id="DUMMY", dag=mock_basic_dag)
1313
assert new_type.wrapper_sql == "WRAPS({sql})"
14-
assert isinstance(new_instance, PostgresOperator)
14+
assert isinstance(new_instance, SQLExecuteQueryOperator)
1515
assert isinstance(new_instance, TableNameMacrosMixin)
1616
assert type(new_instance).__name__ == "DUMMY_TYPE"
1717
new_instance.prepare_template()

0 commit comments

Comments
 (0)