Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
909c8d5
[WIP] Migrate to Google Cloud Dataflow Client
jrmccluskey Feb 18, 2026
8582fc9
Trigger relevant postcommits
jrmccluskey Feb 18, 2026
a2a9bdf
base image update
jrmccluskey Feb 18, 2026
6d12051
fix camel case
jrmccluskey Feb 18, 2026
b774e36
update dataflow runner + tests
jrmccluskey Feb 18, 2026
b6e9bc8
slide import to avoid triggering unit tests
jrmccluskey Feb 18, 2026
27113b3
yapf stuff
jrmccluskey Feb 18, 2026
cb7fa6b
remove extra print
jrmccluskey Feb 18, 2026
715b2b2
further spec structs, fix incorrect piplineUrl option, remove old cli…
jrmccluskey Feb 19, 2026
ed4acfd
suppress line-too-longs
jrmccluskey Feb 19, 2026
829a4e0
formatting
jrmccluskey Feb 19, 2026
5b0de6c
linting, tweak metrics tests
jrmccluskey Feb 24, 2026
2188505
Proto-specific changes to metric processing tests
jrmccluskey Feb 24, 2026
8bc8fe1
try to dump logging
jrmccluskey Feb 24, 2026
53a2244
handle more straightforward metrics values
jrmccluskey Feb 24, 2026
03062ec
add skips since the unit tests now depend on the proto library
jrmccluskey Feb 24, 2026
3bf6ed7
testing if there's a disconnect between proto behavior locally and in…
jrmccluskey Feb 25, 2026
a73f868
correct scalar access
jrmccluskey Feb 25, 2026
2b23b0d
clean up dist accesses
jrmccluskey Feb 25, 2026
b534d4e
linting, various fixes
jrmccluskey Feb 25, 2026
74c7197
fix unit test setup for direct accesses
jrmccluskey Feb 25, 2026
5035637
linting
jrmccluskey Feb 25, 2026
119f40c
more linting
jrmccluskey Feb 26, 2026
f2b12e0
re-enable histograms
jrmccluskey Mar 25, 2026
457ba18
Bump dataflow client version, restore pausing/paused concept
jrmccluskey Apr 9, 2026
6eea686
formatting
jrmccluskey Apr 9, 2026
b41b94a
fix enum selection
jrmccluskey Apr 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "38069",
"modification": 40
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
55 changes: 15 additions & 40 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _get_match(proto, filter_fn):


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STEP_LABEL = 'step'
STRUCTURED_NAME_LABELS = set(
['execution_step', 'original_name', 'output_user_name'])

Expand Down Expand Up @@ -113,7 +112,7 @@ def _translate_step_name(self, internal_name):
step = _get_match(
self._job_graph.proto.steps, lambda x: x.name == internal_name)
user_step_name = _get_match(
step.properties.additionalProperties,
step.properties.properties,
lambda x: x.key == 'user_name').value.string_value
except ValueError:
pass # Exception is handled below.
Expand All @@ -135,24 +134,22 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
step = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == STEP_LABEL).value
step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
carried_namespace = metric.name.context['namespace']
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this still need to be in a try except? If yes, can we just check for existence of the property first instead?

if carried_namespace:
namespace = carried_namespace
except ValueError:
pass

for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
for key in metric.name.context:
if key in STRUCTURED_NAME_LABELS:
labels[key] = metric.name.context[key]
# Package everything besides namespace and name the labels as well,
# including unmodified step names to assist in integration the exact
# unmodified values which come from dataflow.
Expand Down Expand Up @@ -185,10 +182,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [
prop for prop in metric.name.context.additionalProperties
if prop.key == 'tentative' and prop.value == 'true'
]
is_tentative = metric.name.context['tentative']
tentative_or_committed = 'tentative' if is_tentative else 'committed'

metric_key = self._get_metric_key(metric)
Expand All @@ -209,32 +203,13 @@ def _get_metric_value(self, metric):
return None

if metric.scalar is not None:
return metric.scalar.integer_value
# This will always be a single value if there is any data in the field.
return metric.scalar
elif metric.distribution is not None:
dist_count = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'count').value.integer_value
dist_min = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_sum = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
if dist_sum is None:
# distribution metric is not meant to use on large values, but in case
# it is, the value can overflow and become double_value, the correctness
# of the value may not be guaranteed.
_LOGGER.info(
"Distribution metric sum value seems to have "
"overflowed integer_value range, the correctness of sum or mean "
"value may not be guaranteed: %s" % metric.distribution)
dist_sum = int(
_get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.double_value)
dist_count = metric.distribution['count']
dist_min = metric.distribution['min']
dist_max = metric.distribution['max']
dist_sum = metric.distribution['sum']
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
#TODO(https://github.com/apache/beam/issues/31788) support StringSet after
Expand Down
Loading
Loading