Commit a50743f
[Python] Add UnboundedSource SDF wrapper (#19137)
Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark``
abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the
portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into
``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)``
dispatches alongside the existing ``BoundedSource`` branch. Loosely inspired
by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the streaming-SDF template
followed for the process loop / watermark / defer plumbing is
``apache_beam.transforms.periodicsequence``.
Addresses #19137.
What's added
------------
``sdks/python/apache_beam/io/unbounded_source.py``
Public ABCs (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``,
``ReadFromUnboundedSource``) plus the SDF wrapper internals
(``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``,
``_UnboundedSourceRestrictionTracker``,
``_UnboundedSourceRestrictionProvider``).
``sdks/python/apache_beam/io/unbounded_source_test.py``
Deterministic tests covering ABC contracts, restriction coder round-trip,
tracker state machine (claim / split / EOF / no-data / check_done /
progress / is_bounded), finalize idempotency, fan-out via ``source.split``,
source-watermark vs. record-event-time, finalize vs. resume channel
separation, tracker-internal exception close on reader-method failures,
DoFn generator close (unit + integration with downstream raising ``Map``),
cloudpickle round-trip, circular import in three subprocess orderings,
per-bundle read caps, and an end-to-end DirectRunner pipeline.
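To make the shape of the three public ABCs concrete, here is a minimal stand-alone sketch. It is not the code from ``unbounded_source.py``: only ``finalize_checkpoint``, ``advance``, ``get_watermark`` and ``split(desired_num_splits, options)`` are named in this commit message, while ``start``, ``get_current``, ``get_checkpoint_mark``, ``close`` and ``create_reader`` are assumed by analogy with the Java API. The ``Counter*`` classes and ``read_n`` are hypothetical.

```python
import abc


class CheckpointMark(abc.ABC):
    @abc.abstractmethod
    def finalize_checkpoint(self):
        """Best-effort commit hook; should be safe to call more than once."""


class UnboundedReader(abc.ABC):
    # Method names other than advance/get_watermark are assumptions.
    @abc.abstractmethod
    def start(self): ...
    @abc.abstractmethod
    def advance(self): ...
    @abc.abstractmethod
    def get_current(self): ...
    @abc.abstractmethod
    def get_watermark(self): ...
    @abc.abstractmethod
    def get_checkpoint_mark(self): ...
    @abc.abstractmethod
    def close(self): ...


class UnboundedSource(abc.ABC):
    @abc.abstractmethod
    def split(self, desired_num_splits, options=None): ...
    @abc.abstractmethod
    def create_reader(self, options, checkpoint_mark): ...


class CounterMark(CheckpointMark):
    """Hypothetical mark: remembers the next integer to emit."""
    def __init__(self, next_value):
        self.next_value = next_value
        self.finalized = False

    def finalize_checkpoint(self):
        self.finalized = True  # idempotent: repeated calls change nothing


class CounterReader(UnboundedReader):
    def __init__(self, next_value):
        self._next = next_value
        self._current = None
        self.closed = False

    def start(self):
        return self.advance()

    def advance(self):
        # Never blocks: the next integer is always available.
        self._current = self._next
        self._next += 1
        return True

    def get_current(self):
        return self._current

    def get_watermark(self):
        return float(self._current)

    def get_checkpoint_mark(self):
        return CounterMark(self._next)

    def close(self):
        self.closed = True


class CounterSource(UnboundedSource):
    def split(self, desired_num_splits, options=None):
        return [self]  # a single, unsplittable stream

    def create_reader(self, options, checkpoint_mark):
        start = checkpoint_mark.next_value if checkpoint_mark else 0
        return CounterReader(start)


def read_n(source, n, mark=None):
    """Read n records, then return them with a mark to resume from."""
    reader = source.create_reader(None, mark)
    records = []
    try:
        has_record = reader.start()
        while has_record and len(records) < n:
            records.append(reader.get_current())
            if len(records) < n:
                has_record = reader.advance()
        return records, reader.get_checkpoint_mark()
    finally:
        reader.close()


first, mark = read_n(CounterSource(), 3)
second, _ = read_n(CounterSource(), 2, mark)
```

Resuming from ``mark`` continues where the first read stopped, which is the contract the resume channel of the restriction relies on.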
What's changed in iobase.py
---------------------------
* ``Read.expand`` gains an ``UnboundedSource`` branch (function-local lazy
import to break the iobase <-> unbounded_source cycle) that delegates to
``ReadFromUnboundedSource``.
* ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to
``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` +
``ReadPayload(is_bounded=UNBOUNDED)``. The ``BoundedSource`` branch keeps
its ``is_bounded()`` conditional so a source reporting unbounded still
serializes as UNBOUNDED. Decode rides the existing ``PICKLED_SOURCE`` URN
on ``SourceBase``.
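The boundedness decision described for ``Read.to_runner_api_parameter`` can be sketched with stand-in classes. This is an illustration of the branching only, not the code in ``iobase.py``: the classes below are empty placeholders for the real ones, the strings stand in for the ``IsBounded`` enum values, and ``payload_boundedness`` is a hypothetical helper name.

```python
class SourceBase:
    """Placeholder for the real SourceBase."""


class BoundedSource(SourceBase):
    def is_bounded(self):
        return True


class UnboundedSource(SourceBase):
    """Placeholder for the new source type."""


def payload_boundedness(source):
    """Return the boundedness a ReadPayload would carry for this source."""
    if not isinstance(source, (BoundedSource, UnboundedSource)):
        raise TypeError("not a serializable source: %r" % (source,))
    if isinstance(source, UnboundedSource):
        return "UNBOUNDED"
    # BoundedSource keeps its conditional: a subclass that reports
    # is_bounded() == False still serializes as UNBOUNDED.
    return "BOUNDED" if source.is_bounded() else "UNBOUNDED"


class OddBoundedSource(BoundedSource):
    """Hypothetical BoundedSource subclass that reports unbounded."""
    def is_bounded(self):
        return False
```

With these stand-ins, ``payload_boundedness(BoundedSource())`` yields ``"BOUNDED"``, while both ``UnboundedSource()`` and ``OddBoundedSource()`` yield ``"UNBOUNDED"``.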
Correctness covered
-------------------
* Data-path watermark uses ``reader.get_watermark()`` (Java ``Read.java:594``
parity), not the per-record event time. Each record is emitted before the
watermark estimator advances, so a reader that reports ``MAX_TIMESTAMP`` on
the same claim as its final record cannot push the output watermark past
that record.
* Restriction has separate ``checkpoint_mark`` (resume) and
``finalization_checkpoint_mark`` (commit hook) channels; coder is a fixed
5-tuple. The split residual gets its own clone of the checkpoint mark so a
``finalize_checkpoint()`` that mutates the primary's mark cannot corrupt the
residual's resume position.
* A continuously busy reader cannot stay in ``process()`` forever: the
has-data path is capped at ``max_records_per_bundle`` (10000) or
``max_read_time_seconds`` (10), then self-checkpoints via
``defer_remainder`` so the runner commits progress and resumes. Caps are
validated at the transform boundary, and the read deadline is armed on the
first emitted record so reader startup is excluded. ``UnboundedReader.advance``
must not block, since the caps are enforced only between records.
* Reader is closed on every exit path -- tracker-internal close on EOF /
split / reader-method exception; DoFn ``finally`` defense-in-depth for
yield / downstream raise.
* ``finalize_checkpoint()`` failures are logged and swallowed to match the
``CheckpointMark`` best-effort contract; finalize is idempotent on retry.
* EOF advances the watermark estimator to ``MAX_TIMESTAMP`` so downstream
event-time windows can close.
* ``UnboundedSource.split(desired_num_splits=20, options)`` is honoured;
returned sub-sources are validated as ``UnboundedSource``; on
split-exception the provider falls back to a single restriction.
* ``default_output_coder`` reaches the output PCollection via
``coders.registry.register_coder`` + ``element_type``.
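The per-bundle caps, the emit-before-watermark ordering and the close-on-every-exit guarantee from the list above can be sketched as a plain loop. This is a simplified model under stated assumptions, not the wrapper's ``process()``: ``process_bundle``, ``BusyReader``, the ``emit`` / ``set_watermark`` callbacks and the injectable ``clock`` are hypothetical, the return value stands in for the ``defer_remainder`` self-checkpoint, and EOF handling is omitted.

```python
import time


class BusyReader:
    """Hypothetical reader that always has another record available."""
    def __init__(self):
        self._n = -1
        self.closed = False

    def start(self):
        return self.advance()

    def advance(self):
        self._n += 1
        return True  # never blocks, never runs dry

    def get_current(self):
        return self._n

    def get_watermark(self):
        return float(self._n)

    def close(self):
        self.closed = True


def process_bundle(reader, emit, set_watermark, max_records_per_bundle=10000,
                   max_read_time_seconds=10.0, clock=time.monotonic):
    """Read until a cap is hit or no data is available; return the reason."""
    if max_records_per_bundle <= 0 or max_read_time_seconds <= 0:
        raise ValueError("caps must be positive")
    emitted = 0
    deadline = None
    try:
        has_record = reader.start()
        while has_record:
            emit(reader.get_current())             # 1. emit the record
            set_watermark(reader.get_watermark())  # 2. then move the watermark
            emitted += 1
            if deadline is None:
                # Armed on the first emitted record: startup time is excluded.
                deadline = clock() + max_read_time_seconds
            # Caps are checked only between records, so a blocking
            # advance() would defeat them.
            if emitted >= max_records_per_bundle:
                return "max_records"
            if clock() >= deadline:
                return "max_read_time"
            has_record = reader.advance()
        return "no_data"
    finally:
        reader.close()  # runs on return and on any exception


events = []
reader = BusyReader()
reason = process_bundle(
    reader,
    emit=lambda v: events.append(("emit", v)),
    set_watermark=lambda w: events.append(("watermark", w)),
    max_records_per_bundle=3)
```

Injecting ``clock`` keeps the time cap testable without sleeping; the real transform may arm and check its deadline differently.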
Out of scope (tracked under #19137)
-----------------------------------
Listed exhaustively in the module docstring:
* Record-id-based deduplication (Java's ``ValueWithRecordId``).
* Backlog-byte reporting (``restriction_size`` is constant 1).
* Dynamic split fractions / runner-initiated work stealing.
* Source-specific checkpoint coders threaded through the restriction coder.
* Reader caching across bundles.
* Runner-side ``IsBounded.UNBOUNDED`` dispatch in
``bundle_processor.IMPULSE_READ_TRANSFORM``.
1 parent 8364da7
4 files changed
Lines changed: 2126 additions & 5 deletions