Skip to content

Commit 0750e11

Browse files
committed
Refine unbounded source review feedback
1 parent 11dbf62 commit 0750e11

2 files changed

Lines changed: 43 additions & 43 deletions

File tree

sdks/python/apache_beam/io/unbounded_source.py

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def start(self):
4848
...
4949
5050
def advance(self):
51-
# Move to the next record; ``False`` means no data right now (not EOF).
51+
# Move to the next record; ``False`` means no data is available now.
5252
...
5353
5454
def get_current(self):
@@ -164,7 +164,7 @@ def start(self) -> bool:
164164
raise NotImplementedError
165165

166166
def advance(self) -> bool:
167-
"""Advances to the next record. ``False`` == no data *now*, not EOF."""
167+
"""Advances to the next record. ``False`` means no data is available now."""
168168
raise NotImplementedError
169169

170170
def get_current(self) -> Any:
@@ -176,7 +176,7 @@ def get_current_timestamp(self) -> Timestamp:
176176
raise NotImplementedError
177177

178178
def get_watermark(self) -> Timestamp:
179-
"""A best-effort lower bound on timestamps of future records.
179+
"""An approximate lower bound on timestamps of future records.
180180
181181
Treated as monotonic by the wrapper. Return ``MAX_TIMESTAMP`` to signal that
182182
this reader has permanently finished.
@@ -222,8 +222,8 @@ def create_reader(
222222
produces the very first record of the source (or returns ``False`` if
223223
none yet).
224224
* When ``checkpoint_mark`` is not ``None``, the returned reader's
225-
``start()`` produces the FIRST record strictly AFTER the position
226-
encoded by ``checkpoint_mark``. The reader must NOT re-deliver records
225+
``start()`` produces the first record strictly after the position
226+
encoded by ``checkpoint_mark``. The reader must not re-deliver records
227227
already covered by the prior bundle.
228228
"""
229229
raise NotImplementedError
@@ -265,7 +265,7 @@ class _UnboundedSourceRestriction(object):
265265
266266
Field roles:
267267
* ``checkpoint_mark`` -- RESUME state. A reader rebuilt from this mark
268-
MUST produce the FIRST record strictly AFTER it (i.e. no re-delivery).
268+
must produce the first record strictly after it.
269269
* ``finalization_checkpoint_mark`` -- COMMIT hook. Only set on a done
270270
primary that was just cut this bundle. Registered with the runner's
271271
bundle finalizer to acknowledge upstream. Independent of
@@ -284,12 +284,11 @@ class _UnboundedSourceRestrictionCoder(Coder):
284284
285285
Stateless: at encode time the source's own
286286
:meth:`UnboundedSource.get_checkpoint_mark_coder` is looked up from the
287-
restriction; at decode time the source is decoded FIRST and its coder
287+
restriction; at decode time the source is decoded first and its coder
288288
drives the checkpoint-mark decoding. This avoids passing source-specific
289289
coder state into the coder's constructor, which in turn lets
290290
:class:`_UnboundedSourceRestrictionProvider` and
291-
:class:`_ReadFromUnboundedSourceDoFn` be module-level classes (avoiding
292-
stdlib-pickle gotchas for closure-defined DoFns on some runners).
291+
:class:`_ReadFromUnboundedSourceDoFn` be module-level classes.
293292
294293
Wire shape: source_bytes / checkpoint_bytes / watermark / done /
295294
finalization_checkpoint_bytes -- the checkpoint and finalization bytes
@@ -432,8 +431,7 @@ def _try_claim_inner(self, out: list[Any]) -> bool:
432431
self._checkpoint_taken = True
433432
out[0] = _NO_DATA
434433
return False
435-
# No data right now (not EOF): refresh the watermark so process() can
436-
# advance it before deferring, then let process() self-checkpoint.
434+
# No data is available now. Refresh the watermark before deferring.
437435
self._restriction = dataclasses.replace(
438436
self._restriction, watermark=watermark)
439437
out[0] = _NO_DATA
@@ -514,9 +512,8 @@ class _UnboundedSourceRestrictionProvider(core.RestrictionProvider):
514512
Stateless module-level singleton (see :data:`_PROVIDER`): all
515513
source-specific state (e.g. the source's checkpoint coder) is derived
516514
per-call from the restriction's ``source`` field, which lets
517-
:class:`_ReadFromUnboundedSourceDoFn` live at module level too -- avoiding
518-
stdlib-pickle gotchas for closure-defined DoFns. The provider currently
519-
passes ``None`` for the ``options`` forwarded to
515+
:class:`_ReadFromUnboundedSourceDoFn` live at module level too. The provider
516+
currently passes ``None`` for the ``options`` forwarded to
520517
:meth:`UnboundedSource.split`.
521518
"""
522519
def __init__(self):
@@ -594,9 +591,8 @@ def truncate(self, element, restriction):
594591
class _ReadFromUnboundedSourceDoFn(core.DoFn):
595592
"""SDF wrapper driving an :class:`UnboundedReader` for one restriction.
596593
597-
Module-level (not nested inside ``ReadFromUnboundedSource.expand``) so stdlib
598-
``pickle`` -- not just cloudpickle -- can serialise the DoFn. The restriction
599-
provider is the module-level :data:`_PROVIDER` singleton.
594+
Module-level so stdlib pickle and cloudpickle can serialise the DoFn. The
595+
restriction provider is the module-level :data:`_PROVIDER` singleton.
600596
"""
601597
@core.DoFn.unbounded_per_element()
602598
def process(
@@ -621,15 +617,15 @@ def process(
621617
break
622618
record = holder[0]
623619
if record is _NO_DATA:
624-
# No data right now: advance the watermark and self-checkpoint so
625-
# the runner reschedules us after a short delay.
620+
# No data is available now: advance the watermark and self-checkpoint
621+
# so the runner reschedules us after a short delay.
626622
_set_watermark_if_greater(
627623
watermark_estimator, tracker.current_restriction().watermark)
628624
tracker.defer_remainder(
629625
Duration(seconds=_DEFAULT_POLL_INTERVAL_SECONDS))
630626
break
631-
# Advance the estimator with the source watermark (third slot), not
632-
# the record's event time.
627+
# The third tuple field is the source watermark. The record timestamp
628+
# remains the output event time.
633629
value, record_timestamp, source_watermark = record
634630
_set_watermark_if_greater(watermark_estimator, source_watermark)
635631
yield TimestampedValue(value, record_timestamp)
@@ -641,7 +637,7 @@ def process(
641637
if current is not initial and finalize_mark is not None:
642638
bundle_finalizer.register(finalize_mark.finalize_checkpoint)
643639
finally:
644-
# Best-effort reader release for the downstream-yield-raised path.
640+
# Release the reader on downstream-yield errors.
645641
inner_tracker = tracker
646642
if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
647643
inner_tracker = inner_tracker._threadsafe_restriction_tracker
@@ -668,7 +664,7 @@ def _set_watermark_if_greater(
668664

669665

670666
class ReadFromUnboundedSource(PTransform):
671-
"""Reads an :class:`UnboundedSource` via a Splittable ``DoFn``.
667+
"""Reads an :class:`UnboundedSource`.
672668
673669
Most users should prefer :class:`apache_beam.io.Read`, which dispatches an
674670
``UnboundedSource`` here automatically::
@@ -692,8 +688,8 @@ def expand(self, pbegin):
692688
| 'Create' >> core.Create([source])
693689
| 'ReadUnbounded' >> core.ParDo(_ReadFromUnboundedSourceDoFn()))
694690
# Surface an element type only when the global registry already maps it to
695-
# an equivalent coder; we don't mutate ``coders.registry`` (can't register a
696-
# parameterized coder by class without leaking/losing state).
691+
# an equivalent coder. Avoid mutating ``coders.registry`` for a
692+
# parameterized coder whose instance state would be lost.
697693
try:
698694
type_hint = output_coder.to_type_hint()
699695
except NotImplementedError:

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@
5757

5858
# pylint: disable=expression-not-assigned
5959

60-
# Realistic (non-epoch) event-time base for the demo source.
60+
# Realistic event-time base away from the Unix epoch.
6161
_EVENT_TIME_BASE = Timestamp(1729987200) # 2024-10-27T00:00:00Z
6262

6363
# ------------------------------------------------------------------------------
64-
# In-memory demo source emitting integers 0..count-1 (event time
65-
# ``_EVENT_TIME_BASE + index``); self-terminates at EOF, resumes from
66-
# ``last_index + 1``, and splits into even/odd sub-sources when is_splittable.
64+
# In-memory demo source emitting integers 0..count-1 with event time
65+
# ``_EVENT_TIME_BASE + index``. It self-terminates at EOF, resumes from
66+
# ``last_index + 1``, and splits into even/odd sub-sources when configured.
6767
# ------------------------------------------------------------------------------
6868

6969

@@ -254,7 +254,7 @@ def default_output_coder(self):
254254

255255

256256
class _NoDataReader(UnboundedReader):
257-
"""Always reports 'no data right now' (watermark < MAX, so never EOF)."""
257+
"""Always reports temporary absence of data with watermark below MAX."""
258258
@override
259259
def start(self):
260260
return False
@@ -273,7 +273,7 @@ def get_current_timestamp(self):
273273

274274
@override
275275
def get_watermark(self):
276-
return Timestamp(0)
276+
return _EVENT_TIME_BASE
277277

278278
@override
279279
def get_checkpoint_mark(self):
@@ -478,7 +478,7 @@ def get_current(self):
478478

479479
@override
480480
def get_current_timestamp(self):
481-
return Timestamp(0)
481+
return _EVENT_TIME_BASE
482482

483483
@override
484484
def get_watermark(self):
@@ -550,7 +550,7 @@ def test_try_split_nonzero_declined(self):
550550
def test_no_data_returns_sentinel_without_finishing(self):
551551
tracker = _new_tracker(_NoDataSource())
552552
claimed, record = _claim(tracker)
553-
self.assertTrue(claimed) # not EOF
553+
self.assertTrue(claimed)
554554
self.assertIs(record, _NO_DATA)
555555
# A self-checkpoint is still possible (poll/resume path).
556556
self.assertIsNotNone(tracker.try_split(0))
@@ -703,11 +703,11 @@ def get_current(self):
703703

704704
@override
705705
def get_current_timestamp(self):
706-
return Timestamp(0)
706+
return _EVENT_TIME_BASE
707707

708708
@override
709709
def get_watermark(self):
710-
return Timestamp(0)
710+
return _EVENT_TIME_BASE
711711

712712
@override
713713
def get_checkpoint_mark(self):
@@ -732,9 +732,10 @@ def get_checkpoint_mark_coder(self):
732732

733733
tracker = _new_tracker(_BoomSource())
734734
_claim(tracker)
735-
# Helper must not propagate the reader's close() exception, otherwise the
736-
# DoFn's finally / split paths would mask the original error.
737-
tracker._close_reader_if_open()
735+
with self.assertLogs(_unbounded_source_module._LOGGER, 'WARNING') as logs:
736+
tracker._close_reader_if_open()
737+
self.assertTrue(
738+
any('Error closing UnboundedReader' in line for line in logs.output))
738739
self.assertIsNone(tracker._reader)
739740

740741

@@ -845,11 +846,11 @@ def get_current(self):
845846

846847
@override
847848
def get_current_timestamp(self):
848-
return Timestamp(0)
849+
return _EVENT_TIME_BASE
849850

850851
@override
851852
def get_watermark(self):
852-
return Timestamp(0)
853+
return _EVENT_TIME_BASE
853854

854855
@override
855856
def get_checkpoint_mark(self):
@@ -963,9 +964,8 @@ def test_non_source_argument_raises(self):
963964

964965

965966
class StdlibPicklabilityTest(unittest.TestCase):
966-
"""``_ReadFromUnboundedSourceDoFn`` and ``_PROVIDER`` are module-level (not
967-
nested in ``ReadFromUnboundedSource.expand``) specifically so stdlib pickle --
968-
not just cloudpickle -- can serialise them.
967+
"""``_ReadFromUnboundedSourceDoFn`` and ``_PROVIDER`` are module-level so both
968+
stdlib pickle and cloudpickle can serialise them.
969969
"""
970970
def test_module_level_dofn_round_trips_through_stdlib_pickle(self):
971971
restored = pickle.loads(
@@ -1037,15 +1037,19 @@ def test_read_expand_lazy_imports_unbounded_source(self):
10371037
import apache_beam as beam
10381038
from apache_beam import coders
10391039
import apache_beam.io.iobase as iobase
1040+
from typing_extensions import override
10401041
# Now import unbounded_source AFTER iobase, then verify Read.expand
10411042
# successfully lazy-imports ReadFromUnboundedSource:
10421043
from apache_beam.io.unbounded_source import UnboundedSource
10431044
10441045
class _S(UnboundedSource):
1046+
@override
10451047
def split(self, n, options=None):
10461048
return [self]
1049+
@override
10471050
def create_reader(self, o, cp):
10481051
return None
1052+
@override
10491053
def get_checkpoint_mark_coder(self):
10501054
return coders.PickleCoder()
10511055

0 commit comments

Comments
 (0)