Skip to content

Commit 85f10ef

Browse files
committed
[Python] Polish UnboundedSource SDF wrapper API
Refine the experimental Python UnboundedSource wrapper for a user-facing API: - document source, reader, and checkpoint contracts with a beam.io.Read example - keep no-data polling as a private implementation detail - simplify ReadFromUnboundedSource expansion with Create([source]) - make cleanup diagnostics visible without user-facing escalation wording - tighten tests with a splittable UnboundedCountingSource, realistic timestamps, @OverRide annotations, and real Read expansion coverage - defer the CHANGES entry until ValidatesRunner coverage is in place
1 parent c23d077 commit 85f10ef

4 files changed

Lines changed: 467 additions & 727 deletions

File tree

sdks/python/apache_beam/io/iobase.py

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -918,11 +918,10 @@ def __init__(self, source: SourceBase) -> None:
918918
"""Initializes a Read transform.
919919
920920
Args:
921-
source: Data source to read from. A ``BoundedSource`` is wrapped in the
922-
bounded SDF reader; an ``UnboundedSource`` is dispatched through
923-
:class:`apache_beam.io.unbounded_source.ReadFromUnboundedSource` with
924-
the default poll interval (users wanting a custom poll cadence must
925-
instantiate ``ReadFromUnboundedSource`` directly).
921+
source: the data source to read from. May be a ``BoundedSource``, an
922+
``UnboundedSource``, or a ``PTransform`` (which is applied directly).
923+
For any other source ``Read`` is treated as a primitive and relayed to
924+
the runner implementation.
926925
"""
927926
super().__init__()
928927
self.source = source
@@ -948,15 +947,10 @@ def expand(self, pbegin):
948947
| 'EmitSource' >>
949948
core.Map(lambda _: self.source).with_output_types(BoundedSource)
950949
| SDFBoundedSourceReader(display_data))
951-
# Lazy import to break the iobase <-> unbounded_source cycle: the
952-
# unbounded_source module imports iobase (UnboundedSource extends
953-
# SourceBase). Pattern matches the _PubSubSource lazy import below.
950+
# Local import to avoid a circular dependency.
954951
from apache_beam.io.unbounded_source import ReadFromUnboundedSource
955952
from apache_beam.io.unbounded_source import UnboundedSource
956953
if isinstance(self.source, UnboundedSource):
957-
# Delegate to the dedicated SDF PTransform; identical to the user
958-
# writing `p | ReadFromUnboundedSource(self.source)` directly. Custom
959-
# poll_interval_seconds requires using ReadFromUnboundedSource directly.
960954
return pbegin | ReadFromUnboundedSource(self.source)
961955
elif isinstance(self.source, ptransform.PTransform):
962956
# The Read transform can also admit a full PTransform as an input
@@ -999,15 +993,9 @@ def to_runner_api_parameter(
999993
timestamp_attribute=self.source.timestamp_attribute,
1000994
with_attributes=self.source.with_attributes,
1001995
id_attribute=self.source.id_label))
1002-
# Lazy import to avoid the iobase <-> unbounded_source cycle.
996+
# Local import to avoid a circular dependency.
1003997
from apache_beam.io.unbounded_source import UnboundedSource
1004998
if isinstance(self.source, (BoundedSource, UnboundedSource)):
1005-
# READ.urn covers both source flavours; the IsBounded enum distinguishes
1006-
# them. NB: today the bundle_processor.py IMPULSE_READ_TRANSFORM handler
1007-
# only consumes BOUNDED - the UNBOUNDED branch round-trips correctly
1008-
# through the protobuf graph but execution still flows through this
1009-
# composite's expanded sub-transforms (Impulse | Map | SDF-ParDo), not
1010-
# through the URN-handler. Runner-side UNBOUNDED dispatch is W2 work.
1011999
return (
10121000
common_urns.deprecated_primitives.READ.urn,
10131001
beam_runner_api_pb2.ReadPayload(

sdks/python/apache_beam/io/iobase_test.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -220,46 +220,33 @@ def test_sdf_wrap_range_source(self):
220220

221221

222222
class UseSdfUnboundedSourcesTests(unittest.TestCase):
223-
"""Mirrors UseSdfBoundedSourcesTests for the new UnboundedSource branch in
224-
iobase.Read.expand(). Uses CountingSource from unbounded_source_test as the
225-
fake finite UnboundedSource (avoids dragging the network in).
223+
"""Covers the UnboundedSource branch in
224+
``iobase.Read.expand()``. Uses ``UnboundedCountingSource`` from
225+
``unbounded_source_test`` as a finite fake source (no network).
226226
"""
227-
def test_read_dispatches_to_read_from_unbounded_source(self):
228-
from apache_beam.io.unbounded_source_test import CountingSource
229-
with mock.patch(
230-
'apache_beam.io.unbounded_source.ReadFromUnboundedSource.expand'
231-
) as mock_expand:
232-
mock_expand.side_effect = (
233-
lambda pbegin: pbegin | beam.Impulse() | beam.Map(lambda _: 'fake'))
234-
with beam.Pipeline() as p:
235-
out = p | beam.io.Read(CountingSource(3))
236-
assert_that(out, equal_to(['fake']))
237-
mock_expand.assert_called_once()
238-
239227
def test_read_end_to_end_unbounded(self):
240-
from apache_beam.io.unbounded_source_test import CountingSource
228+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
241229
with beam.Pipeline() as p:
242-
out = p | beam.io.Read(CountingSource(5))
230+
out = p | beam.io.Read(UnboundedCountingSource(5))
243231
assert_that(out, equal_to([0, 1, 2, 3, 4]))
244232

245233
def test_read_unbounded_pcollection_is_unbounded(self):
246-
from apache_beam.io.unbounded_source_test import CountingSource
234+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
247235
with beam.Pipeline() as p:
248-
out = p | beam.io.Read(CountingSource(3))
236+
out = p | beam.io.Read(UnboundedCountingSource(3))
249237
self.assertFalse(out.is_bounded)
250238

251239
def test_to_runner_api_emits_unbounded_read_payload(self):
252240
"""``Read.to_runner_api_parameter`` must serialize an UnboundedSource as
253-
``READ.urn`` with ``IsBounded.UNBOUNDED``. The runner-side handler is W2
254-
and ignores this enum today, but the wire format must round-trip
255-
consistently for pipeline persistence / cross-runner submission.
241+
``READ.urn`` with ``IsBounded.UNBOUNDED`` so the wire format round-trips
242+
consistently for pipeline persistence and cross-runner submission.
256243
"""
257-
from apache_beam.io.unbounded_source_test import CountingSource
244+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
258245
from apache_beam.portability import common_urns
259246
from apache_beam.portability.api import beam_runner_api_pb2
260247
from apache_beam.runners.pipeline_context import PipelineContext
261248

262-
read = beam.io.Read(CountingSource(5))
249+
read = beam.io.Read(UnboundedCountingSource(5))
263250
urn, payload = read.to_runner_api_parameter(PipelineContext())
264251

265252
self.assertEqual(urn, common_urns.deprecated_primitives.READ.urn)
@@ -274,12 +261,11 @@ def test_read_unbounded_round_trips_through_runner_api(self):
274261
transform must be a ``Read`` wrapping an equivalent UnboundedSource.
275262
"""
276263
from apache_beam.io.unbounded_source import UnboundedSource
277-
from apache_beam.io.unbounded_source_test import CountingSource
278-
from apache_beam.portability import common_urns
264+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
279265
from apache_beam.portability.api import beam_runner_api_pb2
280266
from apache_beam.runners.pipeline_context import PipelineContext
281267

282-
original = beam.io.Read(CountingSource(7))
268+
original = beam.io.Read(UnboundedCountingSource(7))
283269
context = PipelineContext()
284270
urn, payload = original.to_runner_api_parameter(context)
285271

@@ -290,7 +276,7 @@ def test_read_unbounded_round_trips_through_runner_api(self):
290276

291277
self.assertIsInstance(restored, iobase.Read)
292278
self.assertIsInstance(restored.source, UnboundedSource)
293-
self.assertIsInstance(restored.source, CountingSource)
279+
self.assertIsInstance(restored.source, UnboundedCountingSource)
294280
self.assertFalse(restored.source.is_bounded())
295281
# Verify the source's internal state survived pickle round-trip.
296282
self.assertEqual(restored.source._count, 7)

0 commit comments

Comments
 (0)