Skip to content

Commit d9667e3

Browse files
committed
[Python] Add UnboundedSource ValidatesRunner test on portable runners
Add test_unbounded_source_read to PortableRunnerTest so the portable ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to end: read a self-terminating source through the job service, assert the elements and that the EOF MAX_TIMESTAMP watermark lets a downstream FixedWindows + GroupByKey fire. The embedded portable runner variants, the Flink suites, and PrismRunnerTest inherit the test. SparkRunnerTest skips it because portable Spark does not execute SDFs (#19468), matching its other SDF test skips. Bump the Dataflow, Flink and Spark Python ValidatesRunner trigger files so the PostCommit suites run on this PR. Includes the UnboundedSource SDF wrapper from #38724, which this PR is stacked on.
1 parent 8364da7 commit d9667e3

8 files changed

Lines changed: 2006 additions & 8 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 2
3+
"modification": 3
44
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support",
33
"https://github.com/apache/beam/pull/34830": "testing",
4-
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
4+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in",
5+
"https://github.com/apache/beam/pull/38892": "UnboundedSource portable VR test"
56
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"https://github.com/apache/beam/pull/34830": "testing",
33
"https://github.com/apache/beam/issues/35429": "testing",
4-
"trigger-2026-04-04": "portable_runner expand_sdf opt-in"
4+
"trigger-2026-04-04": "portable_runner expand_sdf opt-in",
5+
"https://github.com/apache/beam/pull/38892": "UnboundedSource portable VR test"
56
}

sdks/python/apache_beam/io/iobase.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,10 @@ def __init__(self, source: SourceBase) -> None:
918918
"""Initializes a Read transform.
919919
920920
Args:
921-
source: Data source to read from.
921+
source: the data source to read from. May be a ``BoundedSource``, an
922+
``UnboundedSource``, or a ``PTransform`` (which is applied directly).
923+
For any other source ``Read`` is treated as a primitive and relayed to
924+
the runner implementation.
922925
"""
923926
super().__init__()
924927
self.source = source
@@ -944,6 +947,11 @@ def expand(self, pbegin):
944947
| 'EmitSource' >>
945948
core.Map(lambda _: self.source).with_output_types(BoundedSource)
946949
| SDFBoundedSourceReader(display_data))
950+
# Local import to avoid a circular dependency.
951+
from apache_beam.io.unbounded_source import ReadFromUnboundedSource
952+
from apache_beam.io.unbounded_source import UnboundedSource
953+
if isinstance(self.source, UnboundedSource):
954+
return pbegin | ReadFromUnboundedSource(self.source)
947955
elif isinstance(self.source, ptransform.PTransform):
948956
# The Read transform can also admit a full PTransform as an input
949957
# rather than an anctual source. If the input is a PTransform, then
@@ -993,6 +1001,10 @@ def to_runner_api_parameter(
9931001
is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
9941002
if self.source.is_bounded() else
9951003
beam_runner_api_pb2.IsBounded.UNBOUNDED))
1004+
# Local import to avoid a circular dependency.
1005+
from apache_beam.io.unbounded_source import UnboundedSource
1006+
if isinstance(self.source, UnboundedSource):
1007+
return super().to_runner_api_parameter(context)
9961008
elif isinstance(self.source, ptransform.PTransform):
9971009
return self.source.to_runner_api_parameter(context)
9981010
raise NotImplementedError(

sdks/python/apache_beam/io/iobase_test.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121

2222
import unittest
2323

24-
import mock
25-
2624
import apache_beam as beam
27-
from apache_beam.io.concat_source import ConcatSource
28-
from apache_beam.io.concat_source_test import RangeSource
25+
import mock
2926
from apache_beam.io import iobase
3027
from apache_beam.io import range_trackers
28+
from apache_beam.io.concat_source import ConcatSource
29+
from apache_beam.io.concat_source_test import RangeSource
3130
from apache_beam.io.iobase import SourceBundle
3231
from apache_beam.options.pipeline_options import DebugOptions
32+
from apache_beam.portability import common_urns
33+
from apache_beam.portability import python_urns
3334
from apache_beam.testing.util import assert_that
3435
from apache_beam.testing.util import equal_to
3536

@@ -220,5 +221,45 @@ def test_sdf_wrap_range_source(self):
220221
self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3])
221222

222223

224+
class UseSdfUnboundedSourcesTests(unittest.TestCase):
225+
"""Covers the UnboundedSource branch in
226+
``iobase.Read.expand()``. Uses ``UnboundedCountingSource`` from
227+
``unbounded_source_test`` as a finite fake source (no network).
228+
"""
229+
def test_read_end_to_end_unbounded(self):
230+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
231+
with beam.Pipeline() as p:
232+
out = p | beam.io.Read(UnboundedCountingSource(5))
233+
assert_that(out, equal_to([0, 1, 2, 3, 4]))
234+
235+
def test_read_unbounded_pcollection_is_unbounded(self):
236+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
237+
p = beam.Pipeline()
238+
out = p | beam.io.Read(UnboundedCountingSource(3))
239+
self.assertFalse(out.is_bounded)
240+
241+
def test_read_unbounded_serializes_as_expanded_composite(self):
242+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
243+
p = beam.Pipeline()
244+
p | 'ReadIt' >> beam.io.Read(UnboundedCountingSource(3))
245+
246+
proto = p.to_runner_api(use_fake_coders=True)
247+
transforms = proto.components.transforms.values()
248+
deprecated_reads = [
249+
transform.unique_name for transform in transforms
250+
if transform.spec.urn == common_urns.deprecated_primitives.READ.urn
251+
]
252+
read_transforms = [
253+
transform for transform in proto.components.transforms.values()
254+
if transform.unique_name == 'ReadIt'
255+
]
256+
257+
self.assertEqual([], deprecated_reads)
258+
self.assertEqual(1, len(read_transforms))
259+
self.assertEqual(
260+
python_urns.GENERIC_COMPOSITE_TRANSFORM, read_transforms[0].spec.urn)
261+
self.assertTrue(read_transforms[0].subtransforms)
262+
263+
223264
if __name__ == '__main__':
224265
unittest.main()

0 commit comments

Comments
 (0)