Skip to content

Commit bab2374

Browse files
authored
Add Pause and Paused states (#37581)
1 parent 0699f35 commit bab2374

5 files changed

Lines changed: 896 additions & 548 deletions

File tree

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ def rank_error(msg):
156 156
state_update_callback(response.currentState)
157 157
_LOGGER.info('Job %s is in state %s', job_id, response.currentState)
158 158
last_job_state = response.currentState
159-
if str(response.currentState) != 'JOB_STATE_RUNNING':
159+
if str(response.currentState) not in ('JOB_STATE_RUNNING',
160+
'JOB_STATE_PAUSED',
161+
'JOB_STATE_PAUSING'):
160 162
# Stop checking for new messages on timeout, explanatory
161 163
# message received, success, or a terminal job state caused
162 164
# by the user that therefore doesn't require explanation.
@@ -751,6 +753,8 @@ def api_jobstate_to_pipeline_state(api_jobstate):
751 753
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
752 754
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.
753 755
RESOURCE_CLEANING_UP,
756+
values_enum.JOB_STATE_PAUSING: PipelineState.PAUSING,
757+
values_enum.JOB_STATE_PAUSED: PipelineState.PAUSED,
754 758
})
755 759

756 760
return (

sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,22 @@ def get_job_side_effect(*args, **kwargs):
195 195
result = duration_timedout_result.wait_until_finish(5000)
196 196
self.assertEqual(result, PipelineState.RUNNING)
197 197

198+
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
199+
duration_timedout_runner = MockDataflowRunner(
200+
[values_enum.JOB_STATE_PAUSING])
201+
duration_timedout_result = DataflowPipelineResult(
202+
duration_timedout_runner.job, duration_timedout_runner, options)
203+
result = duration_timedout_result.wait_until_finish(5000)
204+
self.assertEqual(result, PipelineState.PAUSING)
205+
206+
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
207+
duration_timedout_runner = MockDataflowRunner(
208+
[values_enum.JOB_STATE_PAUSED])
209+
duration_timedout_result = DataflowPipelineResult(
210+
duration_timedout_runner.job, duration_timedout_runner, options)
211+
result = duration_timedout_result.wait_until_finish(5000)
212+
self.assertEqual(result, PipelineState.PAUSED)
213+
198 214
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 1, 2, 2, 3])):
199 215
with self.assertRaisesRegex(DataflowRuntimeException,
200 216
'Dataflow pipeline failed. State: CANCELLED'):
@@ -239,6 +255,32 @@ def __init__(self, state, cancel_result):
239 255
terminal_runner.job, terminal_runner, options)
240 256
terminal_result.cancel()
241 257

258+
def test_api_jobstate_to_pipeline_state(self):
259+
values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
260+
expected_mappings = [
261+
(values_enum.JOB_STATE_UNKNOWN, PipelineState.UNKNOWN),
262+
(values_enum.JOB_STATE_STOPPED, PipelineState.STOPPED),
263+
(values_enum.JOB_STATE_RUNNING, PipelineState.RUNNING),
264+
(values_enum.JOB_STATE_DONE, PipelineState.DONE),
265+
(values_enum.JOB_STATE_FAILED, PipelineState.FAILED),
266+
(values_enum.JOB_STATE_CANCELLED, PipelineState.CANCELLED),
267+
(values_enum.JOB_STATE_UPDATED, PipelineState.UPDATED),
268+
(values_enum.JOB_STATE_DRAINING, PipelineState.DRAINING),
269+
(values_enum.JOB_STATE_DRAINED, PipelineState.DRAINED),
270+
(values_enum.JOB_STATE_PENDING, PipelineState.PENDING),
271+
(values_enum.JOB_STATE_CANCELLING, PipelineState.CANCELLING),
272+
(
273+
values_enum.JOB_STATE_RESOURCE_CLEANING_UP,
274+
PipelineState.RESOURCE_CLEANING_UP),
275+
(values_enum.JOB_STATE_PAUSING, PipelineState.PAUSING),
276+
(values_enum.JOB_STATE_PAUSED, PipelineState.PAUSED),
277+
]
278+
279+
for api_state, pipeline_state in expected_mappings:
280+
self.assertEqual(
281+
DataflowPipelineResult.api_jobstate_to_pipeline_state(api_state),
282+
pipeline_state)
283+
242 284
def test_create_runner(self):
243 285
self.assertTrue(isinstance(create_runner('DataflowRunner'), DataflowRunner))
244 286
self.assertTrue(

0 commit comments

Comments
 (0)