Skip to content

Commit 11dbf62

Browse files
committed
Polish UnboundedSource docs, cleanup, tests
1 parent 7497cb8 commit 11dbf62

2 files changed

Lines changed: 37 additions & 33 deletions

File tree

sdks/python/apache_beam/io/unbounded_source.py

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,7 @@ def get_checkpoint_mark_coder(self):
8989
import logging
9090
from typing import Any
9191
from typing import Iterable
92-
from typing import List
9392
from typing import Optional
94-
from typing import Tuple
9593

9694
from apache_beam import coders
9795
from apache_beam.coders.coders import BooleanCoder
@@ -120,15 +118,15 @@ def get_checkpoint_mark_coder(self):
120118

121119
_LOGGER = logging.getLogger(__name__)
122120

123-
# Holder sentinel for "no data right now" (distinct from end-of-stream).
121+
# Sentinel used when a reader has no data available right now.
122+
# This is distinct from end-of-stream.
124123
_NO_DATA = object()
125124

126125
_DEFAULT_POLL_INTERVAL_SECONDS = 1.0
127126
_DEFAULT_DESIRED_NUM_SPLITS = 20
128127

129128
# ------------------------------------------------------------------------------
130-
# Public abstract base classes (NotImplementedError rather than abc.ABC, per
131-
# iobase.py style).
129+
# Public abstract base classes.
132130
# ------------------------------------------------------------------------------
133131

134132

@@ -233,9 +231,9 @@ def create_reader(
233231
def get_checkpoint_mark_coder(self) -> Coder:
234232
"""Returns the coder for this source's :class:`CheckpointMark` instances.
235233
236-
Called once at pipeline construction (graph build), NOT per-bundle. Do not
237-
perform I/O here. Subclasses MUST override; the default raises with a
238-
helpful message naming the subclass.
234+
The SDK may call this while encoding or decoding source restrictions.
235+
Implementations should be deterministic, side-effect free, and should not
236+
perform I/O.
239237
"""
240238
raise NotImplementedError(
241239
'%s must override get_checkpoint_mark_coder() to return a Coder for '
@@ -383,7 +381,7 @@ def _close_reader_if_open(self) -> None:
383381
def current_restriction(self) -> _UnboundedSourceRestriction:
384382
return self._restriction
385383

386-
def try_claim(self, out: List[Any]) -> bool:
384+
def try_claim(self, out: list[Any]) -> bool:
387385
"""Advances the reader by one record.
388386
389387
``out[0]`` receives ``(value, record_timestamp, source_watermark)`` on the
@@ -401,7 +399,7 @@ def try_claim(self, out: List[Any]) -> bool:
401399
self._close_reader_if_open()
402400
raise
403401

404-
def _try_claim_inner(self, out: List[Any]) -> bool:
402+
def _try_claim_inner(self, out: list[Any]) -> bool:
405403
if self._restriction.is_done:
406404
out[0] = _NO_DATA
407405
return False
@@ -443,7 +441,7 @@ def _try_claim_inner(self, out: List[Any]) -> bool:
443441

444442
def try_split(
445443
self, fraction_of_remainder
446-
) -> Optional[Tuple[_UnboundedSourceRestriction,
444+
) -> Optional[tuple[_UnboundedSourceRestriction,
447445
_UnboundedSourceRestriction]]:
448446
"""Cuts a checkpoint, returning (primary, residual) or None.
449447
@@ -637,28 +635,27 @@ def process(
637635
yield TimestampedValue(value, record_timestamp)
638636
finally:
639637
current = tracker.current_restriction()
640-
# Register finalization only when a checkpoint was cut this bundle (the
641-
# restriction identity changed), reading the explicit finalize channel.
642-
finalize_mark = current.finalization_checkpoint_mark
643-
if current is not initial and finalize_mark is not None:
644-
bundle_finalizer.register(finalize_mark.finalize_checkpoint)
645-
# Best-effort reader release for the downstream-yield-raised path (the
646-
# tracker already closes on EOF, split, and reader-method failures),
647-
# reached through the wrapper chain since the view hides the tracker.
648-
inner_tracker = tracker
649-
if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
650-
inner_tracker = inner_tracker._threadsafe_restriction_tracker
651-
if hasattr(inner_tracker, '_restriction_tracker'):
652-
inner_tracker = inner_tracker._restriction_tracker
653-
if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
654-
inner_tracker._close_reader_if_open()
655-
else:
656-
_LOGGER.warning(
657-
'UnboundedSource DoFn could not close a reader because the SDF '
658-
'tracker wrapper did not expose _UnboundedSourceRestrictionTracker '
659-
'(got %s). Reader resources may remain open until garbage '
660-
'collection.',
661-
type(inner_tracker).__name__)
638+
try:
639+
# Register finalization only when a checkpoint was cut this bundle.
640+
finalize_mark = current.finalization_checkpoint_mark
641+
if current is not initial and finalize_mark is not None:
642+
bundle_finalizer.register(finalize_mark.finalize_checkpoint)
643+
finally:
644+
# Best-effort reader release for the downstream-yield-raised path.
645+
inner_tracker = tracker
646+
if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
647+
inner_tracker = inner_tracker._threadsafe_restriction_tracker
648+
if hasattr(inner_tracker, '_restriction_tracker'):
649+
inner_tracker = inner_tracker._restriction_tracker
650+
if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
651+
inner_tracker._close_reader_if_open()
652+
else:
653+
_LOGGER.warning(
654+
'UnboundedSource DoFn could not close a reader because the SDF '
655+
'tracker wrapper did not expose '
656+
'_UnboundedSourceRestrictionTracker (got %s). Reader resources '
657+
'may remain open until garbage collection.',
658+
type(inner_tracker).__name__)
662659

663660

664661
def _set_watermark_if_greater(
@@ -687,6 +684,9 @@ def __init__(self, source: UnboundedSource):
687684
def expand(self, pbegin):
688685
source = self._source
689686
output_coder = source.default_output_coder()
687+
# The source is the SDF element used to derive the initial restriction.
688+
# process() reads from the restriction, so it does not use the element
689+
# directly.
690690
output = (
691691
pbegin
692692
| 'Create' >> core.Create([source])

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,17 +214,21 @@ class _PrefixStrCoder(coders.Coder):
214214
def __init__(self, prefix):
215215
self._prefix = prefix
216216

217+
@override
217218
def encode(self, value):
218219
if not value.startswith(self._prefix):
219220
raise ValueError('expected %r prefix' % self._prefix)
220221
return value[len(self._prefix):].encode('utf-8')
221222

223+
@override
222224
def decode(self, value):
223225
return self._prefix + value.decode('utf-8')
224226

227+
@override
225228
def is_deterministic(self):
226229
return True
227230

231+
@override
228232
def to_type_hint(self):
229233
return str
230234

0 commit comments

Comments
 (0)