Skip to content

Commit cfe840f

Browse files
authored
Introduced retries to the TableauSensor (#52770)
* Introduced retries to the TableauSensor to account for transient errors * Renamed param retries_on_failure to max_status_retries to ensure distinction to airflow retries
1 parent 5c7f037 commit cfe840f

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

providers/tableau/src/airflow/providers/tableau/sensors/tableau.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class TableauJobStatusSensor(BaseSensorOperator):
4040
:param site_id: The id of the site where the workbook belongs to.
4141
:param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
4242
containing the credentials to authenticate to the Tableau Server.
43+
:param max_status_retries: How often to rerun get_job_status in case
44+
of an error or cancellation, to account for transient errors.
4345
"""
4446

4547
template_fields: Sequence[str] = ("job_id",)
@@ -50,17 +52,23 @@ def __init__(
5052
job_id: str,
5153
site_id: str | None = None,
5254
tableau_conn_id: str = "tableau_default",
55+
max_status_retries: int = 0,
5356
**kwargs,
5457
) -> None:
5558
super().__init__(**kwargs)
5659
self.tableau_conn_id = tableau_conn_id
5760
self.job_id = job_id
5861
self.site_id = site_id
62+
self.max_status_retries = max_status_retries
63+
self._retry_attempts = 0
5964

6065
def poke(self, context: Context) -> bool:
6166
"""
6267
Pokes until the job has successfully finished.
6368
69+
When max_status_retries is set, the Sensor will retry
70+
in case of a TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED before raising an Error.
71+
6472
:param context: The task context during execution.
6573
:return: True if it succeeded and False if not.
6674
"""
@@ -69,6 +77,10 @@ def poke(self, context: Context) -> bool:
6977
self.log.info("Current finishCode is %s (%s)", finish_code.name, finish_code.value)
7078

7179
if finish_code in (TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED):
80+
if self._retry_attempts < self.max_status_retries:
81+
self.log.info("Retrying to get the job status")
82+
self._retry_attempts += 1
83+
return False
7284
message = "The Tableau Refresh Workbook Job failed!"
7385
raise TableauJobFailedException(message)
7486

providers/tableau/tests/unit/tableau/sensors/test_tableau.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from airflow.providers.common.compat.sdk import AirflowException
2424
from airflow.providers.tableau.sensors.tableau import (
25+
TableauJobFailedException,
2526
TableauJobFinishCode,
2627
TableauJobStatusSensor,
2728
)
@@ -68,3 +69,35 @@ def test_poke_failed(self, mock_tableau_hook, finish_code):
6869
with pytest.raises(AirflowException):
6970
sensor.poke({})
7071
mock_tableau_hook.get_job_status.assert_called_once_with(job_id=sensor.job_id)
72+
73+
@patch("airflow.providers.tableau.sensors.tableau.TableauHook")
74+
def test_poke_succeeds_on_last_try(self, mock_tableau_hook_class):
75+
mock_tableau_hook = Mock()
76+
mock_tableau_hook.get_job_status.side_effect = [
77+
TableauJobFinishCode.ERROR,
78+
TableauJobFinishCode.CANCELED,
79+
TableauJobFinishCode.SUCCESS,
80+
]
81+
mock_tableau_hook_class.return_value.__enter__.return_value = mock_tableau_hook
82+
sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2)
83+
84+
assert not sensor.poke({})
85+
assert not sensor.poke({})
86+
assert sensor.poke({})
87+
assert mock_tableau_hook.get_job_status.call_count == 3
88+
89+
@patch("airflow.providers.tableau.sensors.tableau.TableauHook")
90+
def test_poke_failed_on_last_try(self, mock_tableau_hook_class):
91+
mock_tableau_hook = Mock()
92+
mock_tableau_hook.get_job_status.side_effect = [
93+
TableauJobFinishCode.ERROR,
94+
TableauJobFinishCode.CANCELED,
95+
TableauJobFinishCode.ERROR,
96+
]
97+
mock_tableau_hook_class.return_value.__enter__.return_value = mock_tableau_hook
98+
sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2, poke_interval=10.0)
99+
assert not sensor.poke({})
100+
assert not sensor.poke({})
101+
with pytest.raises(TableauJobFailedException, match="The Tableau Refresh Workbook Job failed!"):
102+
sensor.poke({})
103+
assert mock_tableau_hook.get_job_status.call_count == 3

0 commit comments

Comments
 (0)