Skip to content

Commit 8e7c691

Browse files
committed
Bump dataflow client version, restore pausing/paused concept
1 parent 2b5386b commit 8e7c691

3 files changed

Lines changed: 21 additions & 3 deletions

File tree

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -759,8 +759,8 @@ def api_jobstate_to_pipeline_state(api_jobstate):
759759
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
760760
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.
761761
RESOURCE_CLEANING_UP,
762-
# values_enum.JOB_STATE_PAUSING : PipelineState.PAUSING,
763-
# values_enum.JOB_STATE_PAUSED : PipelineState.PAUSED,
762+
values_enum.JOB_STATE_PAUSING : PipelineState.PAUSING,
763+
values_enum.JOB_STATE_PAUSED : PipelineState.PAUSED,
764764
})
765765

766766
return (

sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,22 @@ def get_job_side_effect(*args, **kwargs):
197197
result = duration_timedout_result.wait_until_finish(5000)
198198
self.assertEqual(result, PipelineState.RUNNING)
199199

200+
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
201+
duration_timedout_runner = MockDataflowRunner(
202+
[values_enum.JOB_STATE_PAUSING])
203+
duration_timedout_result = DataflowPipelineResult(
204+
duration_timedout_runner.job, duration_timedout_runner, options)
205+
result = duration_timedout_result.wait_until_finish(5000)
206+
self.assertEqual(result, PipelineState.PAUSING)
207+
208+
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
209+
duration_timedout_runner = MockDataflowRunner(
210+
[values_enum.JOB_STATE_PAUSED])
211+
duration_timedout_result = DataflowPipelineResult(
212+
duration_timedout_runner.job, duration_timedout_runner, options)
213+
result = duration_timedout_result.wait_until_finish(5000)
214+
self.assertEqual(result, PipelineState.PAUSED)
215+
200216
with mock.patch('time.time', mock.MagicMock(side_effect=[1, 1, 2, 2, 3])):
201217
with self.assertRaisesRegex(DataflowRuntimeException,
202218
'Dataflow pipeline failed. State: CANCELLED'):
@@ -255,6 +271,8 @@ def test_api_jobstate_to_pipeline_state(self):
255271
(values_enum.JOB_STATE_DRAINED, PipelineState.DRAINED),
256272
(values_enum.JOB_STATE_PENDING, PipelineState.PENDING),
257273
(values_enum.JOB_STATE_CANCELLING, PipelineState.CANCELLING),
274+
(values_enum.JOB_STATE_PAUSING, PipelineState.PAUSING),
275+
(values_enum.JOB_STATE_PAUSED, PipelineState.PAUSED),
258276
(
259277
values_enum.JOB_STATE_RESOURCE_CLEANING_UP,
260278
PipelineState.RESOURCE_CLEANING_UP),

sdks/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ def get_portability_package_data():
486486
'google-cloud-datastore>=2.0.0,<3',
487487
'google-cloud-pubsub>=2.1.0,<3',
488488
'google-cloud-storage>=2.18.2,<3',
489-
'google-cloud-dataflow-client>=0.11.0,<0.12.0',
489+
'google-cloud-dataflow-client>=0.13.0,<0.14.0',
490490
# GCP packages required by tests
491491
'google-cloud-bigquery>=2.0.0,<4',
492492
'google-cloud-bigquery-storage>=2.6.3,<3',

0 commit comments

Comments
 (0)