Skip to content

Commit 8871dc5

Browse files
committed
docs: document per-bundle read-bound gap, fix stale docstrings
- unbounded_source.py: add the missing per-bundle read-bound limitation to the out-of-scope list (process() reads until no-data/EOF; a saturated source keeps one bundle running without committing watermark/checkpoint, unlike Java's OutputAndTimeBoundedSplittableProcessElementInvoker). - unbounded_source_test.py: correct two stale docstrings/messages that still described the DoFn as defined inside ReadFromUnboundedSource.expand; it is now the module-level _ReadFromUnboundedSourceDoFn with a module-level _PROVIDER.
1 parent 8a4328a commit 8871dc5

2 files changed

Lines changed: 18 additions & 6 deletions

File tree

sdks/python/apache_beam/io/unbounded_source.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ class MySource(UnboundedSource):
4444
deterministic reader close on EOF / split / exception, and bundle finalization.
4545
4646
Out of scope for this PoC (tracked under #19137):
47+
* Per-bundle read bound. ``process()`` reads every record the reader makes
48+
available and only yields control on a no-data poll (self-checkpoint) or
49+
EOF. A genuinely saturated source that always has data will therefore
50+
keep one bundle running indefinitely -- the watermark advances on the
51+
in-bundle estimator but is not committed downstream, and no checkpoint /
52+
finalize is cut, until the source idles. Java bounds each SDF call via
53+
``OutputAndTimeBoundedSplittableProcessElementInvoker`` (a max element
54+
count / wall-clock per invocation); the equivalent forced-defer is W2
55+
work. Self-terminating finite sources (and any source that periodically
56+
returns no-data) are unaffected.
4757
* Record-id-based deduplication (Java's ``ValueWithRecordId``).
4858
* Backlog-byte reporting (``restriction_size`` is a constant 1; per-restriction
4959
progress is binary 0.0 / 1.0).

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -889,11 +889,12 @@ def test_poll_interval_must_be_positive(self):
889889

890890

891891
class CloudpicklePicklabilityTest(unittest.TestCase):
892-
"""The DoFn class is defined inside ``ReadFromUnboundedSource.expand`` so it
893-
can close over the source-specific provider. Beam's default pickler is
894-
cloudpickle; stdlib pickle would fail on a closure-defined class. This is a
895-
regression guard for cross-runner portability (Dataflow / Flink portable
896-
workers also use cloudpickle).
892+
"""``_ReadFromUnboundedSourceDoFn`` and ``_PROVIDER`` are module-level (not
893+
nested in ``ReadFromUnboundedSource.expand``) specifically so stdlib pickle --
894+
not just cloudpickle -- can serialise them. Beam's default pickler is
895+
cloudpickle, but some runners fall back to stdlib pickle, which fails on
896+
closure-defined classes. This is a regression guard for cross-runner
897+
portability (Dataflow / Flink portable workers also use cloudpickle).
897898
"""
898899
def test_transform_round_trips_through_cloudpickle(self):
899900
from apache_beam.internal import pickler
@@ -1043,7 +1044,8 @@ def test_dofn_finally_closes_reader_on_generator_close(self):
10431044
_wait_for_marker(marker),
10441045
'DoFn finally did not invoke reader.close() when the generator '
10451046
'was closed (GeneratorExit) -- reader leaked. Private-chain '
1046-
'close in unbounded_source.py:expand finally may be broken.')
1047+
'close in _ReadFromUnboundedSourceDoFn.process finally may be '
1048+
'broken.')
10471049
finally:
10481050
if os.path.exists(marker):
10491051
os.unlink(marker)

0 commit comments

Comments
 (0)