@@ -1,4 +1,5 @@
 {
   "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support",
-  "https://github.com/apache/beam/pull/34830": "testing"
+  "https://github.com/apache/beam/pull/34830": "testing",
+  "trigger-2026-04-04": "portable_runner expand_sdf opt-in"
 }
@@ -1,4 +1,5 @@
 {
   "https://github.com/apache/beam/pull/34830": "testing",
-  "https://github.com/apache/beam/issues/35429": "testing"
+  "https://github.com/apache/beam/issues/35429": "testing",
+  "trigger-2026-04-04": "portable_runner expand_sdf opt-in"
 }
3 changes: 2 additions & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Flink.json
@@ -1,3 +1,4 @@
 {
-  "modification": 2
+  "modification": 2,
+  "trigger-2026-04-04": "portable_runner expand_sdf opt-in"
 }
4 changes: 3 additions & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Spark3.json
@@ -1 +1,3 @@
-{}
+{
+  "trigger-2026-04-04": "portable_runner expand_sdf opt-in"
+}
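(Editing these JSON files under .github/trigger_files/ is Beam's mechanism for forcing the corresponding post-commit workflows to run against a PR: any change to a trigger file re-runs its suite, and the dated "trigger-2026-04-04" key records why these runs were requested.)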
@@ -332,7 +332,7 @@ def _optimize_pipeline(
     phases = []
     for phase_name in pre_optimize.split(','):
       # For now, these are all we allow.
-      if phase_name in ('pack_combiners', 'lift_combiners'):
+      if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
         phases.append(getattr(translations, phase_name))
       else:
         raise ValueError(

Contributor review comment on the new 'expand_sdf' entry: "If there are compatibility/performance worries, this one is safer (opt in)"
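Once the phase is allow-listed, opting in is just a matter of passing the experiment flag. A minimal sketch of submitting a pipeline to a portable job server with the flag set; the endpoint, environment, and input path are illustrative placeholders, not values from this PR:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Request client-side SDF expansion so the job server receives the
# expanded SDF component stages instead of a single unsplittable ParDo.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',  # placeholder job server endpoint
    '--environment_type=LOOPBACK',
    '--experiments=pre_optimize=expand_sdf',
])
with beam.Pipeline(options=options) as p:
  _ = p | beam.io.ReadFromParquet('/tmp/input/*.parquet')  # placeholder path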
@@ -460,6 +460,104 @@ def create_options(self):
    return options


class PortableRunnerOptimizationTest(unittest.TestCase):
  """Tests for PortableRunner._optimize_pipeline."""
  @staticmethod
  def _transform_urns(proto, options):
    # Run the pre-optimizer and collect the URNs of every transform in
    # the resulting pipeline proto.
    optimized = PortableRunner._optimize_pipeline(proto, options)
    return {
        t.spec.urn
        for t in optimized.components.transforms.values() if t.spec.urn
    }

  def test_custom_optimize_expand_sdf(self):
    """Verify that expand_sdf can be requested explicitly.

    See https://github.com/apache/beam/issues/24422.
    """
    from apache_beam.io import restriction_trackers
    from apache_beam.portability import common_urns

    class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
      def initial_restriction(self, element):
        return restriction_trackers.OffsetRange(0, len(element))

      def create_tracker(self, restriction):
        return restriction_trackers.OffsetRestrictionTracker(restriction)

      def restriction_size(self, element, restriction):
        return restriction.size()

    class ExpandingStringsDoFn(beam.DoFn):
      def process(
          self,
          element,
          restriction_tracker=beam.DoFn.RestrictionParam(
              ExpandStringsProvider())):
        cur = restriction_tracker.current_restriction().start
        while restriction_tracker.try_claim(cur):
          yield element[cur]
          cur += 1

    p = beam.Pipeline()
    _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn()))
    proto = p.to_runner_api()

    transform_urns = self._transform_urns(
        proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf']))

    # expand_sdf should have replaced the single SDF ParDo with its
    # component transforms.
    self.assertIn(
        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
    self.assertIn(
        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
        transform_urns)
    self.assertIn(
        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
        transform_urns)

  def test_custom_optimize_expands_bounded_read(self):
    """Verify that iobase.Read(BoundedSource) expands with explicit expand_sdf.

    This is the end-to-end scenario from
    https://github.com/apache/beam/issues/24422: Read transforms like
    ReadFromParquet use SDFs internally. Without expansion they reach the
    Spark job server as a single ParDo and execute on one partition with no
    parallelization; with explicit expand_sdf they are expanded into SDF
    component stages before submission.
    """
    from apache_beam.io import iobase
    from apache_beam.portability import common_urns

    class _FakeBoundedSource(iobase.BoundedSource):
      """Minimal stub: the source is serialized but never executed here."""
      def get_range_tracker(self, start_position, stop_position):
        return None

      def read(self, range_tracker):
        return iter([])

      def estimate_size(self):
        return 0

    p = beam.Pipeline()
    _ = p | beam.io.Read(_FakeBoundedSource())
    proto = p.to_runner_api()

    transform_urns = self._transform_urns(
        proto, PipelineOptions(['--experiments=pre_optimize=expand_sdf']))

    # The SDFBoundedSourceReader DoFn should have been expanded into
    # SDF component stages.
    self.assertIn(
        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns)
    self.assertIn(
        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
        transform_urns)
    self.assertIn(
        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
        transform_urns)
    # Reshuffle should be present to enable parallelization.
    self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
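Since the module keeps its unittest main block, the new tests can be run directly or selected with pytest -k PortableRunnerOptimizationTest (the diff does not name the test module; apache_beam/runners/portability/portable_runner_test.py would be the natural home, but that is an assumption).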