Skip to content

Commit 6cb13d9

Browse files
committed
feat: add support for disk provisioned IOPS and throughput options for python sdk
1 parent 532dd85 commit 6cb13d9

5 files changed

Lines changed: 49 additions & 0 deletions

File tree

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,24 @@ def _add_argparse_args(cls, parser):
14051405
dest='disk_type',
14061406
default=None,
14071407
help=('Specifies what type of persistent disk should be used.'))
1408+
parser.add_argument(
1409+
'--disk_provisioned_iops',
1410+
type=int,
1411+
default=None,
1412+
dest='disk_provisioned_iops',
1413+
help=(
1414+
'The provisioned IOPS of the disk. If not set, the Dataflow service'
1415+
' will choose a reasonable default.'),
1416+
)
1417+
parser.add_argument(
1418+
'--disk_provisioned_throughput_mibps',
1419+
type=int,
1420+
default=None,
1421+
dest='disk_provisioned_throughput_mibps',
1422+
help=(
1423+
'The provisioned throughput of the disk in MiB/s. If not set, the'
1424+
' Dataflow service will choose a reasonable default.'),
1425+
)
14081426
parser.add_argument(
14091427
'--worker_region',
14101428
default=None,

sdks/python/apache_beam/options/pipeline_options_test.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,12 +444,18 @@ def test_worker_options(self):
444444
'abc',
445445
'--disk_type',
446446
'def',
447+
'--disk_provisioned_iops',
448+
'4000',
449+
'--disk_provisioned_throughput_mibps',
450+
'200',
447451
'--element_processing_timeout_minutes',
448452
'10',
449453
])
450454
worker_options = options.view_as(WorkerOptions)
451455
self.assertEqual(worker_options.machine_type, 'abc')
452456
self.assertEqual(worker_options.disk_type, 'def')
457+
self.assertEqual(worker_options.disk_provisioned_iops, 4000)
458+
self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
453459
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
454460

455461
options = PipelineOptions(

sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ def __init__(
207207
pool.diskSizeGb = self.worker_options.disk_size_gb
208208
if self.worker_options.disk_type:
209209
pool.diskType = self.worker_options.disk_type
210+
if self.worker_options.disk_provisioned_iops is not None:
211+
pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
212+
if self.worker_options.disk_provisioned_throughput_mibps is not None:
213+
pool.diskProvisionedThroughputMibps = (
214+
self.worker_options.disk_provisioned_throughput_mibps)
210215
if self.worker_options.zone:
211216
pool.zone = self.worker_options.zone
212217
if self.worker_options.network:

sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,24 @@ def test_number_of_worker_harness_threads(self):
609609
FAKE_PIPELINE_URL)
610610
self.assertEqual(env.proto.workerPools[0].numThreadsPerWorker, 2)
611611

612+
def test_disk_provisioning_options(self):
613+
pipeline_options = PipelineOptions([
614+
'--temp_location',
615+
'gs://any-location/temp',
616+
'--disk_provisioned_iops',
617+
'4000',
618+
'--disk_provisioned_throughput_mibps',
619+
'200'
620+
])
621+
env = apiclient.Environment([],
622+
pipeline_options,
623+
'2.0.0',
624+
FAKE_PIPELINE_URL)
625+
self.assertEqual(
626+
env.proto.workerPools[0].diskProvisionedIops, 4000)
627+
self.assertEqual(
628+
env.proto.workerPools[0].diskProvisionedThroughputMibps, 200)
629+
612630
@mock.patch(
613631
'apache_beam.runners.dataflow.internal.apiclient.'
614632
'beam_version.__version__',

sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7957,6 +7957,8 @@ class AdditionalProperty(_messages.Message):
79577957
teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
79587958
workerHarnessContainerImage = _messages.StringField(21)
79597959
zone = _messages.StringField(22)
7960+
diskProvisionedIops = _messages.IntegerField(23, variant=_messages.Variant.INT32)
7961+
diskProvisionedThroughputMibps = _messages.IntegerField(24, variant=_messages.Variant.INT32)
79607962

79617963

79627964
class WorkerSettings(_messages.Message):

0 commit comments

Comments
 (0)