[Python] Add UnboundedSource SDF wrapper (#19137) #38724
Summary of Changes (Gemini Code Assist)
This pull request brings streaming source capabilities to the Python SDK by implementing the UnboundedSource abstraction via a Splittable DoFn wrapper. This change allows developers to define unbounded sources that are compatible with the portable Fn API, significantly improving the parity between Python and Java streaming I/O implementations.
Code Review
This pull request introduces a minimal, self-contained UnboundedSource implementation for the Python SDK, allowing Java-like unbounded source abstractions to run on the portable Fn API path via a Splittable DoFn. The feedback identifies several key improvement opportunities: refactoring _UnboundedSourceRestrictionCoder to decode the source dynamically so that the provider and DoFn can be defined as standard module-level classes (avoiding potential serialization issues with standard pickle); correcting the instantiation of RestrictionProgress to use completed_work and remaining_work instead of fraction; registering the source's coder on the pipeline-specific coder_registry rather than the global registry to prevent side effects; and improving the robustness of the dynamic tracker unwrapping logic.
| class _UnboundedSourceRestrictionCoder(Coder): | ||
| """Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple. | ||
|
|
||
| Shape: pickled source + nullable resume checkpoint (encoded with the | ||
| source's own checkpoint coder if provided, else pickle) + watermark + | ||
| done flag + nullable finalization checkpoint (same coder as resume). | ||
| """ | ||
| def __init__(self, checkpoint_mark_coder: Optional[Coder] = None): | ||
| nullable_checkpoint = NullableCoder( | ||
| checkpoint_mark_coder or _MemoizingPickleCoder()) | ||
| self._tuple_coder = TupleCoder(( | ||
| _MemoizingPickleCoder(), # source | ||
| nullable_checkpoint, # checkpoint_mark (RESUME state, may be None) | ||
| TimestampCoder(), # watermark | ||
| BooleanCoder(), # is_done | ||
| nullable_checkpoint)) # finalization_checkpoint_mark (commit hook) | ||
|
|
||
| def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes: | ||
| return self._tuple_coder.encode(( | ||
| restriction.source, | ||
| restriction.checkpoint_mark, | ||
| restriction.watermark, | ||
| restriction.is_done, | ||
| restriction.finalization_checkpoint_mark)) | ||
|
|
||
| def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction': | ||
| (source, checkpoint_mark, watermark, is_done, | ||
| finalization_checkpoint_mark) = self._tuple_coder.decode(encoded) | ||
| return _UnboundedSourceRestriction( | ||
| source=source, | ||
| checkpoint_mark=checkpoint_mark, | ||
| watermark=watermark, | ||
| is_done=is_done, | ||
| finalization_checkpoint_mark=finalization_checkpoint_mark) | ||
|
|
||
| def is_deterministic(self) -> bool: | ||
| # The source and checkpoint are pickled, which is not guaranteed | ||
| # deterministic; matches the bounded SDF restriction coder in iobase.py. | ||
| # NOTE on forward-compat: the wire format is a fixed 5-tuple. Adding a | ||
| # 6th field in a future version would break decoding of in-flight blobs | ||
| # from older workers. If/when another field is needed, switch this to a | ||
| # length-prefixed or version-tagged encoding -- out of scope for W1. | ||
| return False | ||
|
|
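The forward-compatibility note in `is_deterministic` above mentions switching to a version-tagged encoding if a sixth field is ever needed. A minimal sketch of that idea follows, using only the standard library. The names `encode_restriction`, `decode_restriction`, and `_FORMAT_VERSION` are hypothetical and are not part of the PR; the payload is pickled here purely for illustration, whereas the real coder composes Beam coders.

```python
import pickle

_FORMAT_VERSION = 1  # hypothetical tag; not defined anywhere in the PR


def encode_restriction(fields: tuple) -> bytes:
  """Prefixes the pickled fields with a one-byte format version."""
  return bytes([_FORMAT_VERSION]) + pickle.dumps(fields)


def decode_restriction(encoded: bytes) -> tuple:
  """Checks the version tag before touching the payload."""
  version = encoded[0]
  if version != 1:
    # A newer worker could add a branch here for a six-field layout
    # while still decoding in-flight version-1 blobs.
    raise ValueError(f'Unsupported restriction format version: {version}')
  return tuple(pickle.loads(encoded[1:]))
```

Because the tag is read first, an unknown layout fails with a clear error instead of a tuple-unpacking failure deep inside decoding.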
The _UnboundedSourceRestrictionCoder currently requires the checkpoint_mark_coder at initialization time. This forces the _UnboundedSourceRestrictionProvider and the _ReadFromUnboundedSourceDoFn to be defined dynamically inside expand() to close over the source-specific coder. Defining DoFns inside methods can cause serialization issues with standard pickle on some runners.
By making _UnboundedSourceRestrictionCoder dynamic (decoding the source first, then using its coder to decode the checkpoint marks), we can remove the initialization dependency and define both the provider and the DoFn as standard module-level classes.
class _UnboundedSourceRestrictionCoder(Coder):
  """Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple.

  Shape: pickled source + nullable resume checkpoint (encoded with the
  source's own checkpoint coder if provided, else pickle) + watermark +
  done flag + nullable finalization checkpoint (same coder as resume).
  """
  def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes:
    from apache_beam.coders.coders import BytesCoder
    source_coder = _MemoizingPickleCoder()
    source_bytes = source_coder.encode(restriction.source)
    checkpoint_coder = NullableCoder(
        restriction.source.get_checkpoint_mark_coder())
    checkpoint_bytes = checkpoint_coder.encode(restriction.checkpoint_mark)
    finalize_bytes = checkpoint_coder.encode(
        restriction.finalization_checkpoint_mark)
    return TupleCoder((
        BytesCoder(),
        BytesCoder(),
        TimestampCoder(),
        BooleanCoder(),
        BytesCoder()
    )).encode((
        source_bytes,
        checkpoint_bytes,
        restriction.watermark,
        restriction.is_done,
        finalize_bytes
    ))

  def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction':
    from apache_beam.coders.coders import BytesCoder
    (source_bytes, checkpoint_bytes, watermark, is_done,
     finalize_bytes) = TupleCoder((
         BytesCoder(),
         BytesCoder(),
         TimestampCoder(),
         BooleanCoder(),
         BytesCoder()
     )).decode(encoded)
    source = _MemoizingPickleCoder().decode(source_bytes)
    checkpoint_coder = NullableCoder(source.get_checkpoint_mark_coder())
    checkpoint_mark = checkpoint_coder.decode(checkpoint_bytes)
    finalization_checkpoint_mark = checkpoint_coder.decode(finalize_bytes)
    return _UnboundedSourceRestriction(
        source=source,
        checkpoint_mark=checkpoint_mark,
        watermark=watermark,
        is_done=is_done,
        finalization_checkpoint_mark=finalization_checkpoint_mark
    )

  def is_deterministic(self) -> bool:
    return False
| def current_progress(self) -> 'iobase.RestrictionProgress': | ||
| # Backlog-based progress is out of scope; report a coarse done/not-done | ||
| # fraction so the runner has a (recommended) signal. | ||
| return iobase.RestrictionProgress( | ||
| fraction=1.0 if self._restriction.is_done else 0.0) |
In Apache Beam, RestrictionProgress does not accept a fraction parameter. Its standard parameters are completed_work and remaining_work. Passing fraction directly will result in it being ignored or placed in **kwargs without being recognized by the runner.
| def current_progress(self) -> 'iobase.RestrictionProgress': | |
| # Backlog-based progress is out of scope; report a coarse done/not-done | |
| # fraction so the runner has a (recommended) signal. | |
| return iobase.RestrictionProgress( | |
| fraction=1.0 if self._restriction.is_done else 0.0) | |
| def current_progress(self) -> 'iobase.RestrictionProgress': | |
| # Backlog-based progress is out of scope; report a coarse done/not-done | |
| # signal so the runner has a (recommended) signal. | |
| if self._restriction.is_done: | |
| return iobase.RestrictionProgress(completed_work=1.0, remaining_work=0.0) | |
| return iobase.RestrictionProgress(completed_work=0.0, remaining_work=1.0) |
| if type_hint is not None: | ||
| try: | ||
| coders.registry.register_coder(type_hint, type(output_coder)) |
Modifying the global coders.registry can cause side effects across different pipelines running in the same process (e.g., in multi-tenant service environments or unit tests). It is safer to register the coder on the pipeline-specific coder_registry.
| if type_hint is not None: | |
| try: | |
| coders.registry.register_coder(type_hint, type(output_coder)) | |
| if type_hint is not None: | |
| try: | |
| pbegin.pipeline.coder_registry.register_coder(type_hint, type(output_coder)) |
Pipeline has no coder_registry attr
| finally: | ||
| current = tracker.current_restriction() | ||
| # Register finalization only when a real checkpoint was cut this | ||
| # bundle. Restriction identity (`current is not initial`) mirrors | ||
| # Java's reference-equality gate in Read.java. We read the explicit | ||
| # finalization channel, NOT ``checkpoint_mark`` (which is the | ||
| # RESUME state and may belong to the residual after a split). | ||
| finalize_mark = current.finalization_checkpoint_mark | ||
| if current is not initial and finalize_mark is not None: | ||
| bundle_finalizer.register(finalize_mark.finalize_checkpoint) | ||
| # Release the underlying reader on every exit path, including the | ||
| # exception path where a downstream yield raised between two | ||
| # try_claim calls (reader-method failures are already closed inside | ||
| # the tracker). ``RestrictionTrackerView`` does not expose the inner | ||
| # tracker, so traverse the (stable-but-private) wrapper chain. If | ||
| # the chain changes in a future Beam version we log a warning and | ||
| # let GC eventually close; never call ``close`` on an unrelated | ||
| # tracker subclass. | ||
| threadsafe = getattr(tracker, '_threadsafe_restriction_tracker', None) | ||
| inner_tracker = getattr(threadsafe, '_restriction_tracker', None) | ||
| if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker): | ||
| inner_tracker._close_reader_if_open() | ||
| elif inner_tracker is not None or threadsafe is not None: | ||
| _LOGGER.warning( | ||
| 'UnboundedSource DoFn could not reach the inner tracker via ' | ||
| '_threadsafe_restriction_tracker._restriction_tracker; reader ' | ||
| 'close on exception path skipped, relying on GC. Beam SDF ' | ||
| 'wrapper internals may have changed -- file an issue.') |
The logic to unwrap the inner tracker from tracker assumes a specific nesting structure (_threadsafe_restriction_tracker -> _restriction_tracker). If the tracker is not wrapped (e.g., in some testing or direct execution scenarios), or if the wrapper chain changes, this lookup will fail. A more robust approach is to check for the attributes dynamically and fallback gracefully.
finally:
  current = tracker.current_restriction()
  # Register finalization only when a real checkpoint was cut this
  # bundle. Restriction identity (`current is not initial`) mirrors
  # Java's reference-equality gate in Read.java. We read the explicit
  # finalization channel, NOT ``checkpoint_mark`` (which is the
  # RESUME state and may belong to the residual after a split).
  finalize_mark = current.finalization_checkpoint_mark
  if current is not initial and finalize_mark is not None:
    bundle_finalizer.register(finalize_mark.finalize_checkpoint)
  # Release the underlying reader on every exit path, including the
  # exception path where a downstream yield raised between two
  # try_claim calls (reader-method failures are already closed inside
  # the tracker). ``RestrictionTrackerView`` does not expose the inner
  # tracker, so traverse the (stable-but-private) wrapper chain. If
  # the chain changes in a future Beam version we log a warning and
  # let GC eventually close; never call ``close`` on an unrelated
  # tracker subclass.
  inner_tracker = tracker
  if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
    inner_tracker = getattr(inner_tracker, '_threadsafe_restriction_tracker')
  if hasattr(inner_tracker, '_restriction_tracker'):
    inner_tracker = getattr(inner_tracker, '_restriction_tracker')
  if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
    inner_tracker._close_reader_if_open()
  else:
    _LOGGER.warning(
        'UnboundedSource DoFn could not reach the inner tracker; reader '
        'close on exception path skipped, relying on GC. Beam SDF '
        'wrapper internals may have changed -- file an issue.')
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
dcbcdd5 to
9a71653
Compare
| @@ -918,7 +919,11 @@ def __init__(self, source: SourceBase) -> None: | |||
| """Initializes a Read transform. | |||
|
|
|||
| Args: | |||
| source: Data source to read from. | |||
| source: Data source to read from. A ``BoundedSource`` is wrapped in the | |||
| bounded SDF reader; an ``UnboundedSource`` is dispatched through | |||
The pydoc audience is Beam users, so it does not need to mention implementation details of BoundedSource or UnboundedSource (such as the bounded SDF reader).
However, it is worthwhile to mention that source can be a BoundedSource, an UnboundedSource, or a PTransform, or that Read itself can be treated as a primitive (relayed to the runner implementation).
| # limitations under the License. | ||
| # | ||
|
|
||
| """A minimal, self-contained ``UnboundedSource`` for the Python SDK. |
Again, pydoc is user-facing documentation. Background information (GSoC, Java-inspired) is less relevant. This API is also public-facing, so we don't use phrasing like "A minimal, self-contained" here.
Instead, consider a formal statement such as: UnboundedSource support is currently experimental in the Python SDK.
| ``UnboundedSourceRestriction(source, checkpoint, watermark)`` plus an explicit | ||
| ``is_done`` flag for the terminal (MAX-watermark) transition and a separate | ||
| ``finalization_checkpoint_mark`` so a done primary can carry a commit-hook | ||
| without polluting the RESUME-state ``checkpoint_mark`` (matches W1 design |
(matches W1 design ...
Not suitable for pydoc. We need a clean-up of all pydoc and comments
| finalization_checkpoint_mark=None) | ||
|
|
||
| def restriction_size(self, element, restriction) -> int: | ||
| # Backlog estimation is out of scope; report a constant non-negative size. |
Need further check if this statement is true. Nevertheless please use TODO convention e.g. TODO(https://github.com/apache/beam/issues/19137) implement backlog estimation
| holder = [None] | ||
| if not tracker.try_claim(holder): | ||
| # EOF (restriction is_done==True, watermark already set to MAX in | ||
| # the tracker). Mirrors Java Read.java:625 -- advance the |
"Mirrors Java Read.java:625": comments of this kind are not necessary and will soon be out of date as the code base evolves.
| @core.DoFn.unbounded_per_element() | ||
| def process( | ||
| self, | ||
| unused_element, |
Since the element is unused, we may not need Impulse | core.Map(lambda _: source) at all in ReadFromUnboundedSource.
|
|
||
| ## I/Os | ||
|
|
||
| * (Python) Added a Splittable-DoFn `UnboundedSource` wrapper bringing Java's `UnboundedSource` semantics (checkpoint-based pause/resume, monotonic watermark reporting, bundle finalization) to the Python SDK via the new `apache_beam.io.unbounded_source` module (`UnboundedSource`, `UnboundedReader`, `CheckpointMark`, `ReadFromUnboundedSource`). Runs on the portable Fn API path (e.g. the default DirectRunner) ([#19137](https://github.com/apache/beam/issues/19137)). |
We can defer the announcement until the milestones are completed (at least until the ValidatesRunner tests are confirmed passing).
| from apache_beam.io.unbounded_source import ( | ||
| UnboundedSource, UnboundedReader, CheckpointMark, ReadFromUnboundedSource) | ||
|
|
||
| class MySource(UnboundedSource): |
The module doc is an important entry point showing how to use these classes.
List the methods that a MySource implementation must provide here.
Since it involves create_reader(...), also add an example of an UnboundedReader here.
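As an illustration of what such a module-doc example might cover, here is a self-contained sketch built on local stand-in base classes rather than the real `apache_beam.io.unbounded_source` module. Only `create_reader`, `advance`, `get_watermark`, and `finalize_checkpoint` are named in this thread; `get_current`, `get_checkpoint_mark`, the `create_reader` signature, and the helper classes `OffsetMark` and `ListReader` are assumptions made for the sketch.

```python
from abc import ABC, abstractmethod


class CheckpointMark(ABC):
  """Local stand-in for the PR's CheckpointMark."""
  @abstractmethod
  def finalize_checkpoint(self):
    """Called after the runner has durably committed the bundle."""


class UnboundedReader(ABC):
  """Local stand-in; get_current/get_checkpoint_mark are assumed names."""
  @abstractmethod
  def advance(self):
    """Moves to the next record; returns False if none is available now."""

  @abstractmethod
  def get_current(self):
    """Returns the record that the last successful advance() moved to."""

  @abstractmethod
  def get_watermark(self):
    """Returns a lower bound on the timestamps of future records."""

  @abstractmethod
  def get_checkpoint_mark(self):
    """Returns a mark from which a new reader can resume."""


class UnboundedSource(ABC):
  """Local stand-in; the create_reader signature is assumed."""
  @abstractmethod
  def create_reader(self, options, checkpoint_mark=None):
    """Creates a reader, resuming from checkpoint_mark when given."""


class OffsetMark(CheckpointMark):
  def __init__(self, offset):
    self.offset = offset

  def finalize_checkpoint(self):
    pass  # A real source would acknowledge records before ``offset``.


class ListReader(UnboundedReader):
  """Reads integers that double as their own event timestamps."""
  def __init__(self, timestamps, offset):
    self._timestamps = timestamps
    self._next = offset

  def advance(self):
    if self._next >= len(self._timestamps):
      return False
    self._next += 1
    return True

  def get_current(self):
    return self._timestamps[self._next - 1]

  def get_watermark(self):
    return self._timestamps[self._next - 1] if self._next else float('-inf')

  def get_checkpoint_mark(self):
    return OffsetMark(self._next)


class MySource(UnboundedSource):
  def __init__(self, timestamps):
    self._timestamps = list(timestamps)

  def create_reader(self, options, checkpoint_mark=None):
    offset = checkpoint_mark.offset if checkpoint_mark else 0
    return ListReader(self._timestamps, offset)
```

A reader created from a checkpoint mark continues at the recorded offset, which is the pause/resume behaviour the wrapper relies on.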
|
|
||
| with beam.Pipeline() as p: | ||
| p | ReadFromUnboundedSource(MySource()) | beam.Map(print) | ||
| # Equivalent (since iobase.Read.expand dispatches on source type): |
I think we should just recommend beam.io.Read(MySource()) in the public doc.
I notice there is a polling configuration in the ReadFromUnboundedSource transform. However, there is no Java counterpart in BoundedSourceAsSDFWrapperFn. Is this something new, or what is the consideration here?
Instead, the Java implementation has a DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS of 10 minutes that controls the frequency of self-checkpointing.
Thanks for pointing this out. I revisited this and removed poll_interval_seconds from the public ReadFromUnboundedSource API.
The Python wrapper still needs an internal delay on the no-data path because the portable SDF implementation resumes work via tracker.defer_remainder(...); if no delay is provided, the residual is scheduled immediately. This is now just a private _DEFAULT_POLL_INTERVAL_SECONDS constant used by the wrapper, so users should not configure it and should normally use beam.io.Read(MySource()).
I also understand DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS in Java as the bundle-finalization callback deadline rather than a user-facing polling option, so I did not add a corresponding public option in Python in the latest commit.
| windows can close), checkpoint-based pause/resume (``defer_remainder``), | ||
| deterministic reader close on EOF / split / exception, and bundle finalization. | ||
|
|
||
| Out of scope for this PoC (tracked under #19137): |
"Out of scope for this PoC ..." statements not suitable in a public pydoc, see above
d294106 to
85f10ef
Compare
|
Assigning reviewers: R: @tvalentyn for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
Hi @Abacn I pushed an update addressing your latest review feedback. Summary:
Verified: Could you please take another look when you have time? |
e800e7b to
0750e11
Compare
|
Hi @Abacn, thanks for the continued review. I pushed one squashed follow-up commit after the reviewed commit for incremental review. Changes:
Verified locally:
I would appreciate another look when you have time. Thanks again. |
79add18 to
99e3cd0
Compare
…urce wrapper Bump trigger files to run PostCommit ValidatesRunner on PR apache#38724. Revert before merge.
1dc2bc6 to
99e3cd0
Compare
Add test_unbounded_source_read to PortableRunnerTest so the portable ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to end: read a self-terminating source through the job service, assert the elements and that the EOF MAX_TIMESTAMP watermark lets a downstream FixedWindows + GroupByKey fire. The embedded portable runner variants, the Flink suites, and PrismRunnerTest inherit the test. SparkRunnerTest skips it because portable Spark does not execute SDFs (apache#19468), matching its other SDF test skips. Includes the UnboundedSource SDF wrapper from apache#38724, which this PR is stacked on.
Add test_unbounded_source_read to PortableRunnerTest so the portable ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to end: read a self-terminating source through the job service, assert the elements and that the EOF MAX_TIMESTAMP watermark lets a downstream FixedWindows + GroupByKey fire. The embedded portable runner variants, the Flink suites, and PrismRunnerTest inherit the test. SparkRunnerTest skips it because portable Spark does not execute SDFs (apache#19468), matching its other SDF test skips. Bump the Dataflow, Flink and Spark Python ValidatesRunner trigger files so the PostCommit suites run on this PR. Includes the UnboundedSource SDF wrapper from apache#38724, which this PR is stacked on.
8446b17 to
a50743f
Compare
Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark`` abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into ``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)`` dispatches alongside the existing ``BoundedSource`` branch.

Loosely inspired by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the streaming-SDF template followed for the process loop / watermark / defer plumbing is ``apache_beam.transforms.periodicsequence``.

addresses apache#19137

What's added
------------

``sdks/python/apache_beam/io/unbounded_source.py``
  Public ABCs (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``, ``ReadFromUnboundedSource``) plus the SDF wrapper internals (``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``, ``_UnboundedSourceRestrictionTracker``, ``_UnboundedSourceRestrictionProvider``).

``sdks/python/apache_beam/io/unbounded_source_test.py``
  Deterministic tests covering ABC contracts, restriction coder round-trip, tracker state machine (claim / split / EOF / no-data / check_done / progress / is_bounded), finalize idempotency, fan-out via ``source.split``, source-watermark vs. record-event-time, finalize vs. resume channel separation, tracker-internal exception close on reader-method failures, DoFn generator close (unit + integration with downstream raising ``Map``), cloudpickle round-trip, circular import in three subprocess orderings, per-bundle read caps, and an end-to-end DirectRunner pipeline.

What's changed in iobase.py
---------------------------

* ``Read.expand`` gains an ``UnboundedSource`` branch (function-local lazy import to break the iobase <-> unbounded_source cycle) that delegates to ``ReadFromUnboundedSource``.
* ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` + ``ReadPayload(is_bounded=UNBOUNDED)``. The ``BoundedSource`` branch keeps its ``is_bounded()`` conditional so a source reporting unbounded still serializes as UNBOUNDED. Decode rides the existing ``PICKLED_SOURCE`` URN on ``SourceBase``.

Correctness covered
-------------------

* Data-path watermark uses ``reader.get_watermark()`` (Java ``Read.java:594`` parity), not the per-record event time. Each record is emitted before the watermark estimator advances, so a reader that reports ``MAX_TIMESTAMP`` on the same claim as its final record cannot push the output watermark past that record.
* Restriction has separate ``checkpoint_mark`` (resume) and ``finalization_checkpoint_mark`` (commit hook) channels; coder is a fixed 5-tuple. The split residual gets its own clone of the checkpoint mark so a ``finalize_checkpoint()`` that mutates the primary's mark cannot corrupt the residual's resume position.
* A continuously busy reader cannot stay in ``process()`` forever: the has-data path is capped at ``max_records_per_bundle`` (10000) or ``max_read_time_seconds`` (10), then self-checkpoints via ``defer_remainder`` so the runner commits progress and resumes. Caps are validated at the transform boundary, and the read deadline is armed on the first emitted record so reader startup is excluded. ``UnboundedReader.advance`` must not block, since the caps are enforced only between records.
* Reader is closed on every exit path -- tracker-internal close on EOF / split / reader-method exception; DoFn ``finally`` defense-in-depth for yield / downstream raise.
* ``finalize_checkpoint()`` failures are logged and swallowed to match the ``CheckpointMark`` best-effort contract; finalize is idempotent on retry.
* EOF advances the watermark estimator to ``MAX_TIMESTAMP`` so downstream event-time windows can close.
* ``UnboundedSource.split(desired_num_splits=20, options)`` is honoured; returned sub-sources are validated as ``UnboundedSource``; on split-exception the provider falls back to a single restriction.
* ``default_output_coder`` reaches the output PCollection via ``coders.registry.register_coder`` + ``element_type``.

Out of scope (tracked under apache#19137)
-----------------------------------

Listed exhaustively in the module docstring:

* Record-id-based deduplication (Java's ``ValueWithRecordId``).
* Backlog-byte reporting (``restriction_size`` is constant 1).
* Dynamic split fractions / runner-initiated work stealing.
* Source-specific checkpoint coders threaded through the restriction coder.
* Reader caching across bundles.
* Runner-side ``IsBounded.UNBOUNDED`` dispatch in ``bundle_processor.IMPULSE_READ_TRANSFORM``.
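The "logged and swallowed" and "idempotent on retry" properties of finalization described in the commit message can be pictured with a small callable wrapper. This is a sketch under assumptions, not the PR's `_FinalizeCheckpointOnce`: the class name `FinalizeOnce` is hypothetical, and marking the callback as done before invoking it (so a failed attempt is not retried) is one possible design choice.

```python
import logging
import threading

_LOGGER = logging.getLogger(__name__)


class FinalizeOnce:
  """Wraps a checkpoint mark so its finalize hook runs at most once."""
  def __init__(self, mark):
    self._mark = mark
    self._lock = threading.Lock()
    self._done = False

  def __call__(self):
    with self._lock:
      if self._done:
        return  # A retried bundle-finalization callback is a no-op.
      self._done = True
    try:
      self._mark.finalize_checkpoint()
    except Exception:  # Best-effort contract: log and swallow.
      _LOGGER.warning('finalize_checkpoint failed; ignoring', exc_info=True)
```

An instance of such a wrapper is what would be handed to the bundle finalizer in place of the bound `finalize_checkpoint` method.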
a50743f to
8133a5d
Compare
| if current is not initial and finalize_mark is not None: | ||
| bundle_finalizer.register(_FinalizeCheckpointOnce(finalize_mark)) | ||
| finally: | ||
| # Release the reader on downstream-yield errors. |
"Release the reader on downstream-yield errors."
I realize this comment might not be accurate. Here the reader close happens in the finally clause of the DoFn process, not only when an error happens. Would that mean the reader is closed at the end of every bundle (for example because the maximum number of elements was reached, or there was no data)? The reader should persist across bundles for performance reasons, and only be invalidated on exceptions, splits, DoFn teardown, etc.
cd14b54 to
e3c0005
Compare
A self-checkpoint parks the live reader in a per-DoFn _ReaderCache keyed by the residual source and checkpoint, and the resuming bundle reclaims it, so one reader stays alive across SDF bundles. The reader is closed only on EOF, an exception, idle or size eviction, or DoFn teardown.
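A rough picture of such a cache, with idle and size eviction, is sketched below in plain Python. It is not the PR's `_ReaderCache`: the class name `ReaderCache`, the `park`/`claim`/`close_all` method names, the default limits, and the requirement that the caller supply a hashable key standing in for "residual source and checkpoint" are all assumptions. Readers are only assumed to expose `close()`.

```python
import time
from collections import OrderedDict


class ReaderCache:
  """Keeps parked readers alive between bundles, with bounded lifetime."""
  def __init__(self, max_size=8, idle_seconds=60.0, clock=time.monotonic):
    self._max_size = max_size
    self._idle_seconds = idle_seconds
    self._clock = clock
    self._entries = OrderedDict()  # key -> (reader, parked_at)

  def park(self, key, reader):
    """Stores a live reader under ``key`` after a self-checkpoint."""
    self._evict_idle()
    old = self._entries.pop(key, None)
    if old is not None and old[0] is not reader:
      old[0].close()
    self._entries[key] = (reader, self._clock())
    while len(self._entries) > self._max_size:
      # Size eviction: close the reader that was parked first.
      _, (evicted, _) = self._entries.popitem(last=False)
      evicted.close()

  def claim(self, key):
    """Removes and returns the parked reader, or None on a cache miss."""
    self._evict_idle()
    entry = self._entries.pop(key, None)
    return entry[0] if entry is not None else None

  def close_all(self):
    """Closes every parked reader, e.g. on DoFn teardown."""
    while self._entries:
      _, (reader, _) = self._entries.popitem()
      reader.close()

  def _evict_idle(self):
    now = self._clock()
    stale = [
        k for k, (_, parked_at) in self._entries.items()
        if now - parked_at > self._idle_seconds
    ]
    for k in stale:
      reader, _ = self._entries.pop(k)
      reader.close()
```

On a cache miss the resuming bundle would simply create a fresh reader from the checkpoint mark, so the cache affects performance only, not correctness.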
e3c0005 to
dea202a
Compare
The Python SDK has no public `UnboundedSource` API: pipelines that need to read a custom unbounded source (Pub/Sub-like queues, CDC feeds, etc.) cannot do so in Python without dropping to the legacy non-portable `Read` primitive.

This change adds a Splittable-DoFn wrapper that brings Java's `UnboundedSource` semantics to the Python SDK:

* `UnboundedSource`, `UnboundedReader`, `CheckpointMark` and the `ReadFromUnboundedSource` PTransform. Names are Pythonic; semantics match the Java contract.
* `_UnboundedSourceRestriction` carries source / resume checkpoint / watermark / done flag / finalize checkpoint (fixed 5-tuple coder).
* `_UnboundedSourceRestrictionTracker` drives the reader, captures `reader.get_watermark()` on the data path (Java `Read.java:594` parity), closes the reader on EOF / split / reader-method exception, and advances the watermark estimator to `MAX_TIMESTAMP` on the terminal claim so downstream event-time windows can close.
* `iobase.Read.expand` dispatch: a function-local lazy-import branch routes `Read(UnboundedSource(...))` through `ReadFromUnboundedSource`, breaking the `iobase` ↔ `unbounded_source` cycle.
* `Read.to_runner_api_parameter` widens to `(BoundedSource, UnboundedSource)`, writing `READ.urn` + `IsBounded.UNBOUNDED`. Decode rides the existing `PICKLED_SOURCE` URN on `SourceBase`. Runner-side dispatch on `UNBOUNDED` in `bundle_processor.IMPULSE_READ_TRANSFORM` is deferred.
* `ReadFromUnboundedSource.expand` calls `coders.registry.register_coder` + sets `element_type` so a source's `default_output_coder` propagates to the output PCollection.
* `_UnboundedSourceRestrictionProvider.split` invokes `UnboundedSource.split(20, options)` with strict `isinstance` validation; falls back to a single restriction on split-refusal exceptions.

Tests:

* `unbounded_source_test.py`: ABC contracts, restriction coder round-trip, tracker state machine, finalize idempotency, source-watermark vs. record-timestamp regression, finalize / resume channel separation, tracker-internal close on `reader.advance` / `reader.get_watermark` exceptions, DoFn `finally` close on downstream yield exception (unit via `generator.close()` + integration with raising `Map`), cloudpickle round-trip, circular import in three subprocess orderings, e2e DirectRunner with `FixedWindows` + `GroupByKey`.
* `iobase_test.py`: `Read(UnboundedSource)` dispatch through the new `expand` branch; `Read.to_runner_api` / `from_runner_api` round-trip asserting the `IsBounded.UNBOUNDED` enum.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: