Skip to content

Commit 2b5386b

Browse files
committed
re-enable histograms
1 parent eb59b5f commit 2b5386b

3 files changed

Lines changed: 34 additions & 22 deletions

File tree

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,8 @@ def api_jobstate_to_pipeline_state(api_jobstate):
759759
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
760760
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.
761761
RESOURCE_CLEANING_UP,
762+
# values_enum.JOB_STATE_PAUSING : PipelineState.PAUSING,
763+
# values_enum.JOB_STATE_PAUSED : PipelineState.PAUSED,
762764
})
763765

764766
return (

sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
from apache_beam.runners.internal import names as shared_names
7070
from apache_beam.runners.pipeline_utils import validate_pipeline_graph
7171
from apache_beam.runners.portability.stager import Stager
72+
from apache_beam.transforms import DataflowDistributionCounter
7273
from apache_beam.transforms import cy_combiners
7374
from apache_beam.transforms.display import DisplayData
7475
from apache_beam.transforms.environments import is_apache_beam_container
@@ -1109,6 +1110,12 @@ def get_sdk_package_name():
11091110
return shared_names.BEAM_PACKAGE_NAME
11101111

11111112

1113+
class Histogram(object):
1114+
def __init__(self):
1115+
self.firstBucketOffset = None
1116+
self.bucketCounts = None
1117+
1118+
11121119
class DataflowJobAlreadyExistsError(retry.PermanentException):
11131120
"""A non-retryable exception that a job with the given name already exists."""
11141121
# Inherits retry.PermanentException to avoid retry in
@@ -1133,9 +1140,11 @@ def translate_distribution(
11331140
dist_update_proto.update({"count": distribution_update.count})
11341141
dist_update_proto.update({"sum": distribution_update.sum})
11351142
# DataflowDistributionCounter needs to translate histogram
1136-
# if isinstance(distribution_update, DataflowDistributionCounter):
1137-
# dist_update_proto.histogram = dataflow.Histogram()
1138-
# distribution_update.translate_to_histogram(dist_update_proto.histogram)
1143+
if isinstance(distribution_update, DataflowDistributionCounter):
1144+
histogram = Histogram()
1145+
distribution_update.translate_to_histogram(histogram)
1146+
dist_update_proto.update({"firstBucketOffset": histogram.firstBucketOffset})
1147+
dist_update_proto.update({"bucketCounts": histogram.bucketCounts})
11391148
metric_update_proto.distribution = dist_update_proto
11401149

11411150

sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from apache_beam.portability.api import beam_runner_api_pb2
4040
from apache_beam.runners.dataflow.internal import names
4141
from apache_beam.transforms import Create
42+
from apache_beam.transforms import DataflowDistributionCounter
4243
from apache_beam.transforms import DoFn
4344
from apache_beam.transforms import ParDo
4445
from apache_beam.transforms.environments import DockerEnvironment
@@ -492,25 +493,25 @@ def test_translate_distribution_using_distribution_data(self):
492493
metric_update.distribution['count'], distribution_update.count)
493494

494495

495-
# def test_translate_distribution_using_dataflow_distribution_counter(self):
496-
# counter_update = DataflowDistributionCounter()
497-
# counter_update.add_input(1)
498-
# counter_update.add_input(3)
499-
# metric_proto = dataflow.MetricUpdate()
500-
# apiclient.translate_distribution(counter_update, metric_proto)
501-
# histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None)
502-
# counter_update.translate_to_histogram(histogram)
503-
# self.assertEqual(metric_proto.distribution['min'], counter_update.min)
504-
# self.assertEqual(metric_proto.distribution['max'], counter_update.max)
505-
# self.assertEqual(metric_proto.distribution['sum'], counter_update.sum)
506-
# self.assertEqual(
507-
# metric_proto.distribution['count'], counter_update.count)
508-
# self.assertEqual(
509-
# metric_proto.distribution.histogram.bucketCounts,
510-
# histogram.bucketCounts)
511-
# self.assertEqual(
512-
# metric_proto.distribution.histogram.firstBucketOffset,
513-
# histogram.firstBucketOffset)
496+
def test_translate_distribution_using_dataflow_distribution_counter(self):
497+
counter_update = DataflowDistributionCounter()
498+
counter_update.add_input(1)
499+
counter_update.add_input(3)
500+
metric_proto = dataflow.MetricUpdate()
501+
apiclient.translate_distribution(counter_update, metric_proto)
502+
histogram = mock.Mock(firstBucketOffset=None, bucketCounts=None)
503+
counter_update.translate_to_histogram(histogram)
504+
self.assertEqual(metric_proto.distribution['min'], counter_update.min)
505+
self.assertEqual(metric_proto.distribution['max'], counter_update.max)
506+
self.assertEqual(metric_proto.distribution['sum'], counter_update.sum)
507+
self.assertEqual(
508+
metric_proto.distribution['count'], counter_update.count)
509+
self.assertEqual(
510+
metric_proto.distribution['firstBucketOffset'],
511+
histogram.firstBucketOffset)
512+
self.assertEqual(
513+
metric_proto.distribution['bucketCounts'],
514+
histogram.bucketCounts)
514515

515516
def test_translate_means(self):
516517
metric_update = dataflow.MetricUpdate()

0 commit comments

Comments (0)