Skip to content

Commit c01ceea

Browse files
Anuragp22Abacn
andauthored
[Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments (#38749)
* [Python] Honor disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments Mirrors the Java SDK Metrics.MetricsFlag behavior so high-throughput Python pipelines can opt out of user metric kinds that add pressure to metric backends. Adds a process-wide MetricsFlag singleton initialized once at worker harness startup from pipeline experiments. When a flag is set, the corresponding Delegating* class short-circuits its update path so no MetricCell is touched and no monitoring info is emitted. For #38746. * [Python] Add unit tests for MetricsFlag Ports Java MetricsTest.testMetricsFlag and adds three smoke tests confirming the disabled Counter / StringSet / BoundedTrie short-circuit without raising when called on a no-current-tracker path. For #38746. * [Python] Note disable*Metrics support in CHANGES.md For #38746. * [Python] Use public class attrs on MetricsFlag for hot-path reads Drops the classmethod getter wrapper around each disabled flag and exposes counter_disabled / string_set_disabled / bounded_trie_disabled directly as public class attributes. The gate runs on every metric update, so swapping a descriptor lookup + function call for a single attribute load matters in exactly the high-throughput pipelines these experiments are designed for. Idiomatic Python; java parity is about behavior, not internal API shape. Addresses review feedback. For #38746. * [Python] Address PR review: simplify MetricsFlag pydoc and reset in finally * [Python] Address PR review: assert no MetricCell created when disabled * [Python] Initialize MetricsFlag from Pipeline so in-process runners honor disable* experiments The previous init only ran in the gRPC SDK harness (sdk_worker_main), so DirectRunner and other in-process runners silently ignored the disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments. Initializing in Pipeline.__init__ matches the pattern of FileSystems.set_options at the same call site and mirrors the Java init point (PipelineRunner.run). --------- Co-authored-by: Yi Hu <yathu@google.com>
1 parent dd3a454 commit c01ceea

5 files changed

Lines changed: 161 additions & 4 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969

7070
## New Features / Improvements
7171

72+
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7273
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).
7374

7475
## Breaking Changes

sdks/python/apache_beam/metrics/metric.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,51 @@
4646
from apache_beam.metrics.metricbase import Histogram
4747
from apache_beam.metrics.metricbase import MetricName
4848
from apache_beam.metrics.metricbase import StringSet
49+
from apache_beam.options.pipeline_options import DebugOptions
4950

5051
if TYPE_CHECKING:
5152
from apache_beam.internal.metrics.metric import MetricLogger
5253
from apache_beam.metrics.execution import MetricKey
5354
from apache_beam.metrics.metricbase import Metric
55+
from apache_beam.options.pipeline_options import PipelineOptions
5456
from apache_beam.utils.histogram import BucketType
5557

5658
__all__ = ['Metrics', 'MetricsFilter', 'Lineage']
5759

5860
_LOGGER = logging.getLogger(__name__)
5961

6062

63+
class MetricsFlag(object):
64+
"""Process-wide flags controlling which user metric kinds are emitted."""
65+
counter_disabled = False
66+
string_set_disabled = False
67+
bounded_trie_disabled = False
68+
_initialized = False
69+
70+
@classmethod
71+
def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
72+
if cls._initialized:
73+
return
74+
debug_options = options.view_as(DebugOptions)
75+
if debug_options.lookup_experiment('disableCounterMetrics'):
76+
cls.counter_disabled = True
77+
_LOGGER.info('Counter metrics are disabled.')
78+
if debug_options.lookup_experiment('disableStringSetMetrics'):
79+
cls.string_set_disabled = True
80+
_LOGGER.info('StringSet metrics are disabled.')
81+
if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
82+
cls.bounded_trie_disabled = True
83+
_LOGGER.info('BoundedTrie metrics are disabled.')
84+
cls._initialized = True
85+
86+
@classmethod
87+
def reset(cls) -> None:
88+
cls.counter_disabled = False
89+
cls.string_set_disabled = False
90+
cls.bounded_trie_disabled = False
91+
cls._initialized = False
92+
93+
6194
class Metrics(object):
6295
"""Lets users create/access metric objects during pipeline execution."""
6396
@staticmethod
@@ -204,12 +237,17 @@ class DelegatingCounter(Counter):
204237
def __init__(
205238
self, metric_name: MetricName, process_wide: bool = False) -> None:
206239
super().__init__(metric_name)
207-
self.inc = MetricUpdater( # type: ignore[method-assign]
240+
self._updater = MetricUpdater(
208241
cells.CounterCell,
209242
metric_name,
210243
default_value=1,
211244
process_wide=process_wide)
212245

246+
def inc(self, n: int = 1) -> None:
247+
if MetricsFlag.counter_disabled:
248+
return
249+
self._updater(n)
250+
213251
class DelegatingDistribution(Distribution):
214252
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
215253
def __init__(
@@ -231,13 +269,23 @@ class DelegatingStringSet(StringSet):
231269
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
232270
def __init__(self, metric_name: MetricName) -> None:
233271
super().__init__(metric_name)
234-
self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign]
272+
self._updater = MetricUpdater(cells.StringSetCell, metric_name)
273+
274+
def add(self, value: str) -> None:
275+
if MetricsFlag.string_set_disabled:
276+
return
277+
self._updater(value)
235278

236279
class DelegatingBoundedTrie(BoundedTrie):
237-
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
280+
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
238281
def __init__(self, metric_name: MetricName) -> None:
239282
super().__init__(metric_name)
240-
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
283+
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
284+
285+
def add(self, value) -> None:
286+
if MetricsFlag.bounded_trie_disabled:
287+
return
288+
self._updater(value)
241289

242290

243291
class MetricResults(object):

sdks/python/apache_beam/metrics/metric_test.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
from apache_beam.metrics.metric import MetricResults
3333
from apache_beam.metrics.metric import Metrics
3434
from apache_beam.metrics.metric import MetricsFilter
35+
from apache_beam.metrics.metric import MetricsFlag
3536
from apache_beam.metrics.metricbase import MetricName
37+
from apache_beam.options.pipeline_options import PipelineOptions
3638
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
3739
from apache_beam.runners.worker import statesampler
3840
from apache_beam.testing.metric_result_matchers import DistributionMatcher
@@ -121,6 +123,108 @@ def test_get_namespace_error(self):
121123
with self.assertRaises(ValueError):
122124
Metrics.get_namespace(object())
123125

126+
def test_metrics_flag(self):
127+
MetricsFlag.reset()
128+
try:
129+
self.assertFalse(MetricsFlag.counter_disabled)
130+
self.assertFalse(MetricsFlag.string_set_disabled)
131+
self.assertFalse(MetricsFlag.bounded_trie_disabled)
132+
133+
options = PipelineOptions(['--experiments=disableCounterMetrics'])
134+
MetricsFlag.set_default_pipeline_options(options)
135+
self.assertTrue(MetricsFlag.counter_disabled)
136+
self.assertFalse(MetricsFlag.string_set_disabled)
137+
self.assertFalse(MetricsFlag.bounded_trie_disabled)
138+
139+
MetricsFlag.reset()
140+
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
141+
MetricsFlag.set_default_pipeline_options(options)
142+
self.assertFalse(MetricsFlag.counter_disabled)
143+
self.assertTrue(MetricsFlag.string_set_disabled)
144+
self.assertFalse(MetricsFlag.bounded_trie_disabled)
145+
146+
MetricsFlag.reset()
147+
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
148+
MetricsFlag.set_default_pipeline_options(options)
149+
self.assertFalse(MetricsFlag.counter_disabled)
150+
self.assertFalse(MetricsFlag.string_set_disabled)
151+
self.assertTrue(MetricsFlag.bounded_trie_disabled)
152+
153+
MetricsFlag.reset()
154+
options = PipelineOptions([
155+
'--experiments=disableCounterMetrics',
156+
'--experiments=disableStringSetMetrics',
157+
'--experiments=disableBoundedTrieMetrics',
158+
])
159+
MetricsFlag.set_default_pipeline_options(options)
160+
self.assertTrue(MetricsFlag.counter_disabled)
161+
self.assertTrue(MetricsFlag.string_set_disabled)
162+
self.assertTrue(MetricsFlag.bounded_trie_disabled)
163+
finally:
164+
MetricsFlag.reset()
165+
166+
def test_disabled_counter_is_noop(self):
167+
sampler = statesampler.StateSampler('', counters.CounterFactory())
168+
statesampler.set_current_tracker(sampler)
169+
state = sampler.scoped_state(
170+
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
171+
MetricsFlag.reset()
172+
try:
173+
sampler.start()
174+
with state:
175+
container = MetricsEnvironment.current_container()
176+
Metrics.counter('ns', 'baseline').inc()
177+
self.assertEqual(len(container.metrics), 1)
178+
options = PipelineOptions(['--experiments=disableCounterMetrics'])
179+
MetricsFlag.set_default_pipeline_options(options)
180+
Metrics.counter('ns', 'after_disable').inc()
181+
Metrics.counter('ns', 'after_disable').inc(5)
182+
Metrics.counter('ns', 'after_disable').dec()
183+
self.assertEqual(len(container.metrics), 1)
184+
finally:
185+
sampler.stop()
186+
MetricsFlag.reset()
187+
188+
def test_disabled_string_set_is_noop(self):
189+
sampler = statesampler.StateSampler('', counters.CounterFactory())
190+
statesampler.set_current_tracker(sampler)
191+
state = sampler.scoped_state(
192+
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
193+
MetricsFlag.reset()
194+
try:
195+
sampler.start()
196+
with state:
197+
container = MetricsEnvironment.current_container()
198+
Metrics.string_set('ns', 'baseline').add('seed')
199+
self.assertEqual(len(container.metrics), 1)
200+
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
201+
MetricsFlag.set_default_pipeline_options(options)
202+
Metrics.string_set('ns', 'after_disable').add('value')
203+
self.assertEqual(len(container.metrics), 1)
204+
finally:
205+
sampler.stop()
206+
MetricsFlag.reset()
207+
208+
def test_disabled_bounded_trie_is_noop(self):
209+
sampler = statesampler.StateSampler('', counters.CounterFactory())
210+
statesampler.set_current_tracker(sampler)
211+
state = sampler.scoped_state(
212+
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
213+
MetricsFlag.reset()
214+
try:
215+
sampler.start()
216+
with state:
217+
container = MetricsEnvironment.current_container()
218+
Metrics.bounded_trie('ns', 'baseline').add(['a'])
219+
self.assertEqual(len(container.metrics), 1)
220+
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
221+
MetricsFlag.set_default_pipeline_options(options)
222+
Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
223+
self.assertEqual(len(container.metrics), 1)
224+
finally:
225+
sampler.stop()
226+
MetricsFlag.reset()
227+
124228
def test_counter_empty_name(self):
125229
with self.assertRaises(ValueError):
126230
Metrics.counter("namespace", "")

sdks/python/apache_beam/pipeline.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
from apache_beam.coders import typecoders
7474
from apache_beam.internal import pickler
7575
from apache_beam.io.filesystems import FileSystems
76+
from apache_beam.metrics.metric import MetricsFlag
7677
from apache_beam.options.pipeline_options import CrossLanguageOptions
7778
from apache_beam.options.pipeline_options import DebugOptions
7879
from apache_beam.options.pipeline_options import PipelineOptions
@@ -192,6 +193,7 @@ def __init__(
192193
self._options = PipelineOptions([])
193194

194195
FileSystems.set_options(self._options)
196+
MetricsFlag.set_default_pipeline_options(self._options)
195197

196198
if runner is None:
197199
runner = self._options.view_as(StandardOptions).runner

sdks/python/apache_beam/runners/worker/sdk_worker_main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
from apache_beam.internal import pickler
3535
from apache_beam.io import filesystems
36+
from apache_beam.metrics import metric
3637
from apache_beam.options.pipeline_options import DebugOptions
3738
from apache_beam.options.pipeline_options import GoogleCloudOptions
3839
from apache_beam.options.pipeline_options import PipelineOptions
@@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
123124
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
124125
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
125126
filesystems.FileSystems.set_options(sdk_pipeline_options)
127+
metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
126128
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
127129
pickler.set_library(pickle_library)
128130

0 commit comments

Comments
 (0)