Skip to content

Commit 02ac93e

Browse files
authored
Add DiskProvisionedIops/ThroughputMibps pipeline options for the Python SDK (#38370)
* Restore Java and Go changes for disk provisioned IOPS and throughput * Add CHANGES.md entry for disk provisioned IOPS and throughput * restore go changes * initialize options map in dataflow job to prevent nil pointer exceptions * go fmt * add testDiskProvisionedOptionsConfig unit test * Update pr id in changes.md * add disk_provisioned_iops and disk_provisioned_throughput_mibps options to pipeline configuration * add Python support to disk IOPS and throughput pipeline options in CHANGES.md * revert manual changes * ran gen_client * gen client * trigger postcommit_python * Delete R74 * undo gen_client deletion * delete change to trigger postcommit
1 parent 422f630 commit 02ac93e

6 files changed

Lines changed: 104 additions & 44 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@
7070

7171
## New Features / Improvements
7272

73-
* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go) ([#38349](https://github.com/apache/beam/issues/38349)).
74-
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
73+
* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via the `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go) or the `--disk_provisioned_iops` and `--disk_provisioned_throughput_mibps` pipeline options (Python) ([#38349](https://github.com/apache/beam/issues/38349)).
7574
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
7675
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
7776
compatible. Both coders can decode encoded bytes from the other coder

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,24 @@ def _add_argparse_args(cls, parser):
14021402
dest='disk_type',
14031403
default=None,
14041404
help=('Specifies what type of persistent disk should be used.'))
1405+
parser.add_argument(
1406+
'--disk_provisioned_iops',
1407+
type=int,
1408+
default=None,
1409+
dest='disk_provisioned_iops',
1410+
help=(
1411+
'The provisioned IOPS of the disk. If not set, the Dataflow service'
1412+
' will choose a reasonable default.'),
1413+
)
1414+
parser.add_argument(
1415+
'--disk_provisioned_throughput_mibps',
1416+
type=int,
1417+
default=None,
1418+
dest='disk_provisioned_throughput_mibps',
1419+
help=(
1420+
'The provisioned throughput of the disk in MiB/s. If not set, the'
1421+
' Dataflow service will choose a reasonable default.'),
1422+
)
14051423
parser.add_argument(
14061424
'--worker_region',
14071425
default=None,

sdks/python/apache_beam/options/pipeline_options_test.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,12 +444,18 @@ def test_worker_options(self):
444444
'abc',
445445
'--disk_type',
446446
'def',
447+
'--disk_provisioned_iops',
448+
'4000',
449+
'--disk_provisioned_throughput_mibps',
450+
'200',
447451
'--element_processing_timeout_minutes',
448452
'10',
449453
])
450454
worker_options = options.view_as(WorkerOptions)
451455
self.assertEqual(worker_options.machine_type, 'abc')
452456
self.assertEqual(worker_options.disk_type, 'def')
457+
self.assertEqual(worker_options.disk_provisioned_iops, 4000)
458+
self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
453459
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
454460

455461
options = PipelineOptions(

sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ def __init__(
208208
pool.diskSizeGb = self.worker_options.disk_size_gb
209209
if self.worker_options.disk_type:
210210
pool.diskType = self.worker_options.disk_type
211+
if self.worker_options.disk_provisioned_iops is not None:
212+
pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
213+
if self.worker_options.disk_provisioned_throughput_mibps is not None:
214+
pool.diskProvisionedThroughputMibps = (
215+
self.worker_options.disk_provisioned_throughput_mibps)
211216
if self.worker_options.zone:
212217
pool.zone = self.worker_options.zone
213218
if self.worker_options.network:

sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,23 @@ def test_number_of_worker_harness_threads(self):
644644
FAKE_PIPELINE_URL)
645645
self.assertEqual(env.proto.workerPools[0].numThreadsPerWorker, 2)
646646

647+
def test_disk_provisioning_options(self):
648+
pipeline_options = PipelineOptions([
649+
'--temp_location',
650+
'gs://any-location/temp',
651+
'--disk_provisioned_iops',
652+
'4000',
653+
'--disk_provisioned_throughput_mibps',
654+
'200'
655+
])
656+
env = apiclient.Environment([],
657+
pipeline_options,
658+
'2.0.0',
659+
FAKE_PIPELINE_URL)
660+
self.assertEqual(env.proto.workerPools[0].diskProvisionedIops, 4000)
661+
self.assertEqual(
662+
env.proto.workerPools[0].diskProvisionedThroughputMibps, 200)
663+
647664
@mock.patch(
648665
'apache_beam.runners.dataflow.internal.apiclient.'
649666
'beam_version.__version__',

sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2634,8 +2634,8 @@ class FlexTemplateRuntimeEnvironment(_messages.Message):
26342634
ipConfiguration: Configuration for VM IPs.
26352635
kmsKeyName: Name for the Cloud KMS key for the job. Key format is:
26362636
projects//locations//keyRings//cryptoKeys/
2637-
launcherMachineType: The machine type to use for launching the job. The
2638-
default is n1-standard-1.
2637+
launcherMachineType: The machine type to use for launching the job. If not
2638+
set, Dataflow will select a default machine type.
26392639
machineType: The machine type to use for the job. Defaults to the value
26402640
from the template if not specified.
26412641
maxWorkers: The maximum number of Google Compute Engine instances to be
@@ -3209,6 +3209,7 @@ class Job(_messages.Message):
32093209
attempts to create a job with the same name as an active job that
32103210
already exists, the attempt returns the existing job. The name must
32113211
match the regular expression `[a-z]([-a-z0-9]{0,1022}[a-z0-9])?`
3212+
pausable: Output only. Indicates whether the job can be paused.
32123213
pipelineDescription: Preliminary field: The format of this data may change
32133214
at any time. A description of the user pipeline and stages through which
32143215
it is executed. Created by Cloud Dataflow service. Only retrieved with
@@ -3498,22 +3499,23 @@ class AdditionalProperty(_messages.Message):
34983499
labels = _messages.MessageField('LabelsValue', 10)
34993500
location = _messages.StringField(11)
35003501
name = _messages.StringField(12)
3501-
pipelineDescription = _messages.MessageField('PipelineDescription', 13)
3502-
projectId = _messages.StringField(14)
3503-
replaceJobId = _messages.StringField(15)
3504-
replacedByJobId = _messages.StringField(16)
3505-
requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 17)
3506-
runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 18)
3507-
satisfiesPzi = _messages.BooleanField(19)
3508-
satisfiesPzs = _messages.BooleanField(20)
3509-
serviceResources = _messages.MessageField('ServiceResources', 21)
3510-
stageStates = _messages.MessageField('ExecutionStageState', 22, repeated=True)
3511-
startTime = _messages.StringField(23)
3512-
steps = _messages.MessageField('Step', 24, repeated=True)
3513-
stepsLocation = _messages.StringField(25)
3514-
tempFiles = _messages.StringField(26, repeated=True)
3515-
transformNameMapping = _messages.MessageField('TransformNameMappingValue', 27)
3516-
type = _messages.EnumField('TypeValueValuesEnum', 28)
3502+
pausable = _messages.BooleanField(13)
3503+
pipelineDescription = _messages.MessageField('PipelineDescription', 14)
3504+
projectId = _messages.StringField(15)
3505+
replaceJobId = _messages.StringField(16)
3506+
replacedByJobId = _messages.StringField(17)
3507+
requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 18)
3508+
runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 19)
3509+
satisfiesPzi = _messages.BooleanField(20)
3510+
satisfiesPzs = _messages.BooleanField(21)
3511+
serviceResources = _messages.MessageField('ServiceResources', 22)
3512+
stageStates = _messages.MessageField('ExecutionStageState', 23, repeated=True)
3513+
startTime = _messages.StringField(24)
3514+
steps = _messages.MessageField('Step', 25, repeated=True)
3515+
stepsLocation = _messages.StringField(26)
3516+
tempFiles = _messages.StringField(27, repeated=True)
3517+
transformNameMapping = _messages.MessageField('TransformNameMappingValue', 28)
3518+
type = _messages.EnumField('TypeValueValuesEnum', 29)
35173519

35183520

35193521
class JobExecutionDetails(_messages.Message):
@@ -5342,8 +5344,14 @@ class RuntimeUpdatableParams(_messages.Message):
53425344
during job creation.
53435345
53445346
Fields:
5345-
acceptableBacklogDuration: Optional. The backlog threshold duration in
5346-
seconds for autoscaling. Value must be non-negative.
5347+
acceptableBacklogDuration: Optional. Deprecated: Use `latency_tier`
5348+
instead. The backlog threshold duration in seconds for autoscaling.
5349+
Value must be non-negative.
5350+
autoscalingTier: Optional. Deprecated: Use `latency_tier` instead. The
5351+
backlog threshold tier for autoscaling. Value must be one of "low-
5352+
latency", "medium-latency", or "high-latency".
5353+
latencyTier: Optional. The backlog threshold tier for autoscaling. Value
5354+
must be one of "low-latency", "medium-latency", or "high-latency".
53475355
maxNumWorkers: The maximum number of workers to cap autoscaling at. This
53485356
field is currently only supported for Streaming Engine jobs.
53495357
minNumWorkers: The minimum number of workers to scale down to. This field
@@ -5357,9 +5365,11 @@ class RuntimeUpdatableParams(_messages.Message):
53575365
"""
53585366

53595367
acceptableBacklogDuration = _messages.StringField(1)
5360-
maxNumWorkers = _messages.IntegerField(2, variant=_messages.Variant.INT32)
5361-
minNumWorkers = _messages.IntegerField(3, variant=_messages.Variant.INT32)
5362-
workerUtilizationHint = _messages.FloatField(4)
5368+
autoscalingTier = _messages.StringField(2)
5369+
latencyTier = _messages.StringField(3)
5370+
maxNumWorkers = _messages.IntegerField(4, variant=_messages.Variant.INT32)
5371+
minNumWorkers = _messages.IntegerField(5, variant=_messages.Variant.INT32)
5372+
workerUtilizationHint = _messages.FloatField(6)
53635373

53645374

53655375
class SDKInfo(_messages.Message):
@@ -7775,6 +7785,9 @@ class WorkerPool(_messages.Message):
77757785
defaultPackageSet: The default package set to install. This allows the
77767786
service to select a default set of packages which are useful to worker
77777787
harnesses written in a particular language.
7788+
diskProvisionedIops: Optional. IOPS provisioned for the root disk for VMs.
7789+
diskProvisionedThroughputMibps: Optional. Throughput provisioned for the
7790+
root disk for VMs.
77787791
diskSizeGb: Size of root disk for VMs, in GB. If zero or unspecified, the
77797792
service will attempt to choose a reasonable default.
77807793
diskSourceImage: Fully qualified source image for disks.
@@ -7938,25 +7951,27 @@ class AdditionalProperty(_messages.Message):
79387951
autoscalingSettings = _messages.MessageField('AutoscalingSettings', 1)
79397952
dataDisks = _messages.MessageField('Disk', 2, repeated=True)
79407953
defaultPackageSet = _messages.EnumField('DefaultPackageSetValueValuesEnum', 3)
7941-
diskSizeGb = _messages.IntegerField(4, variant=_messages.Variant.INT32)
7942-
diskSourceImage = _messages.StringField(5)
7943-
diskType = _messages.StringField(6)
7944-
ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 7)
7945-
kind = _messages.StringField(8)
7946-
machineType = _messages.StringField(9)
7947-
metadata = _messages.MessageField('MetadataValue', 10)
7948-
network = _messages.StringField(11)
7949-
numThreadsPerWorker = _messages.IntegerField(12, variant=_messages.Variant.INT32)
7950-
numWorkers = _messages.IntegerField(13, variant=_messages.Variant.INT32)
7951-
onHostMaintenance = _messages.StringField(14)
7952-
packages = _messages.MessageField('Package', 15, repeated=True)
7953-
poolArgs = _messages.MessageField('PoolArgsValue', 16)
7954-
sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 17, repeated=True)
7955-
subnetwork = _messages.StringField(18)
7956-
taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 19)
7957-
teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
7958-
workerHarnessContainerImage = _messages.StringField(21)
7959-
zone = _messages.StringField(22)
7954+
diskProvisionedIops = _messages.IntegerField(4)
7955+
diskProvisionedThroughputMibps = _messages.IntegerField(5)
7956+
diskSizeGb = _messages.IntegerField(6, variant=_messages.Variant.INT32)
7957+
diskSourceImage = _messages.StringField(7)
7958+
diskType = _messages.StringField(8)
7959+
ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 9)
7960+
kind = _messages.StringField(10)
7961+
machineType = _messages.StringField(11)
7962+
metadata = _messages.MessageField('MetadataValue', 12)
7963+
network = _messages.StringField(13)
7964+
numThreadsPerWorker = _messages.IntegerField(14, variant=_messages.Variant.INT32)
7965+
numWorkers = _messages.IntegerField(15, variant=_messages.Variant.INT32)
7966+
onHostMaintenance = _messages.StringField(16)
7967+
packages = _messages.MessageField('Package', 17, repeated=True)
7968+
poolArgs = _messages.MessageField('PoolArgsValue', 18)
7969+
sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 19, repeated=True)
7970+
subnetwork = _messages.StringField(20)
7971+
taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 21)
7972+
teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 22)
7973+
workerHarnessContainerImage = _messages.StringField(23)
7974+
zone = _messages.StringField(24)
79607975

79617976

79627977
class WorkerSettings(_messages.Message):

0 commit comments

Comments
 (0)