Skip to content

[Python] Add UnboundedSource SDF wrapper (#19137)#38724

Open
Eliaaazzz wants to merge 2 commits into
apache:masterfrom
Eliaaazzz:gsoc-week1-minimal-poc
Open

[Python] Add UnboundedSource SDF wrapper (#19137)#38724
Eliaaazzz wants to merge 2 commits into
apache:masterfrom
Eliaaazzz:gsoc-week1-minimal-poc

Conversation

@Eliaaazzz

@Eliaaazzz Eliaaazzz commented May 28, 2026

Copy link
Copy Markdown
Contributor

The Python SDK has no public UnboundedSource API: pipelines that need to read a custom unbounded source (Pub/Sub-like queues, CDC feeds, etc.) cannot do so in Python without dropping to the legacy non-portable Read primitive.

This change adds a Splittable-DoFn wrapper that brings Java's UnboundedSource semantics to the Python SDK:

  • Public ABCs — new UnboundedSource, UnboundedReader, CheckpointMark and the ReadFromUnboundedSource PTransform. Names are Pythonic; semantics match the Java contract.
  • SDF wrapper_UnboundedSourceRestriction carries source / resume checkpoint / watermark / done flag / finalize checkpoint (fixed 5-tuple coder). _UnboundedSourceRestrictionTracker drives the reader, captures reader.get_watermark() on the data path (Java Read.java:594 parity), closes the reader on EOF / split / reader-method exception, and advances the watermark estimator to MAX_TIMESTAMP on the terminal claim so downstream event-time windows can close.
  • iobase.Read.expand dispatch — a function-local lazy-import branch routes Read(UnboundedSource(...)) through ReadFromUnboundedSource, breaking the iobaseunbounded_source cycle.
  • Runner-APIRead.to_runner_api_parameter widens to (BoundedSource, UnboundedSource), writing READ.urn + IsBounded.UNBOUNDED. Decode rides the existing PICKLED_SOURCE URN on SourceBase. Runner-side dispatch on UNBOUNDED in bundle_processor.IMPULSE_READ_TRANSFORM is deferred.
  • Output coderReadFromUnboundedSource.expand calls coders.registry.register_coder + sets element_type so a source's default_output_coder propagates to the output PCollection.
  • Initial fan-out_UnboundedSourceRestrictionProvider.split invokes UnboundedSource.split(20, options) with strict isinstance validation; falls back to a single restriction on split-refusal exceptions.

Tests:

  • unbounded_source_test.py — ABC contracts, restriction coder round-trip, tracker state machine, finalize idempotency, source-watermark vs. record-timestamp regression, finalize / resume channel separation, tracker-internal close on reader.advance / reader.get_watermark exceptions, DoFn finally close on downstream yield exception (unit via generator.close() + integration with raising Map), cloudpickle round-trip, circular import in three subprocess orderings, e2e DirectRunner with FixedWindows + GroupByKey.
  • iobase_test.pyRead(UnboundedSource) dispatch through the new expand branch; Read.to_runner_api / from_runner_api round-trip asserting the IsBounded.UNBOUNDED enum.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: `addresses [BEAM-121] Add DisplayData for IO transforms #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead.
  • Update `CHANGES.md` with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request brings streaming source capabilities to the Python SDK by implementing the UnboundedSource abstraction via a Splittable DoFn wrapper. This change allows developers to define unbounded sources that are compatible with the portable Fn API, significantly improving the parity between Python and Java streaming I/O implementations.

Highlights

  • UnboundedSource Abstractions: Introduced new abstract base classes including UnboundedSource, UnboundedReader, and CheckpointMark to the Python SDK, mirroring Java's streaming I/O model.
  • SDF Wrapper Implementation: Implemented a Splittable DoFn wrapper that enables unbounded sources to run on portable runners like DirectRunner and FnApiRunner.
  • Integration with iobase: Updated iobase.Read.expand to automatically dispatch UnboundedSource instances, ensuring seamless integration with existing pipeline construction patterns.
  • Testing and Validation: Added a comprehensive suite of unit and integration tests covering source contracts, restriction tracker state machines, and runner API round-trip compatibility.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a minimal, self-contained UnboundedSource implementation for the Python SDK, allowing Java-like unbounded source abstractions to run on the portable Fn API path via a Splittable DoFn. The feedback identifies several key improvement opportunities: refactoring _UnboundedSourceRestrictionCoder to decode the source dynamically so that the provider and DoFn can be defined as standard module-level classes (avoiding potential serialization issues with standard pickle); correcting the instantiation of RestrictionProgress to use completed_work and remaining_work instead of fraction; registering the source's coder on the pipeline-specific coder_registry rather than the global registry to prevent side effects; and improving the robustness of the dynamic tracker unwrapping logic.

Comment on lines +288 to +331
class _UnboundedSourceRestrictionCoder(Coder):
"""Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple.

Shape: pickled source + nullable resume checkpoint (encoded with the
source's own checkpoint coder if provided, else pickle) + watermark +
done flag + nullable finalization checkpoint (same coder as resume).
"""
def __init__(self, checkpoint_mark_coder: Optional[Coder] = None):
nullable_checkpoint = NullableCoder(
checkpoint_mark_coder or _MemoizingPickleCoder())
self._tuple_coder = TupleCoder((
_MemoizingPickleCoder(), # source
nullable_checkpoint, # checkpoint_mark (RESUME state, may be None)
TimestampCoder(), # watermark
BooleanCoder(), # is_done
nullable_checkpoint)) # finalization_checkpoint_mark (commit hook)

def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes:
return self._tuple_coder.encode((
restriction.source,
restriction.checkpoint_mark,
restriction.watermark,
restriction.is_done,
restriction.finalization_checkpoint_mark))

def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction':
(source, checkpoint_mark, watermark, is_done,
finalization_checkpoint_mark) = self._tuple_coder.decode(encoded)
return _UnboundedSourceRestriction(
source=source,
checkpoint_mark=checkpoint_mark,
watermark=watermark,
is_done=is_done,
finalization_checkpoint_mark=finalization_checkpoint_mark)

def is_deterministic(self) -> bool:
# The source and checkpoint are pickled, which is not guaranteed
# deterministic; matches the bounded SDF restriction coder in iobase.py.
# NOTE on forward-compat: the wire format is a fixed 5-tuple. Adding a
# 6th field in a future version would break decoding of in-flight blobs
# from older workers. If/when another field is needed, switch this to a
# length-prefixed or version-tagged encoding -- out of scope for W1.
return False

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _UnboundedSourceRestrictionCoder currently requires the checkpoint_mark_coder at initialization time. This forces the _UnboundedSourceRestrictionProvider and the _ReadFromUnboundedSourceDoFn to be defined dynamically inside expand() to close over the source-specific coder. Defining DoFns inside methods can cause serialization issues with standard pickle on some runners.

By making _UnboundedSourceRestrictionCoder dynamic (decoding the source first, then using its coder to decode the checkpoint marks), we can remove the initialization dependency and define both the provider and the DoFn as standard module-level classes.

class _UnboundedSourceRestrictionCoder(Coder):
  """Encodes :class:`_UnboundedSourceRestriction` as a fixed 5-tuple.

  Shape: pickled source + nullable resume checkpoint (encoded with the
  source's own checkpoint coder if provided, else pickle) + watermark +
  done flag + nullable finalization checkpoint (same coder as resume).
  """
  def encode(self, restriction: '_UnboundedSourceRestriction') -> bytes:
    from apache_beam.coders.coders import BytesCoder
    source_coder = _MemoizingPickleCoder()
    source_bytes = source_coder.encode(restriction.source)
    
    checkpoint_coder = NullableCoder(restriction.source.get_checkpoint_mark_coder())
    checkpoint_bytes = checkpoint_coder.encode(restriction.checkpoint_mark)
    finalize_bytes = checkpoint_coder.encode(restriction.finalization_checkpoint_mark)
    
    return TupleCoder((
        BytesCoder(),
        BytesCoder(),
        TimestampCoder(),
        BooleanCoder(),
        BytesCoder()
    )).encode((
        source_bytes,
        checkpoint_bytes,
        restriction.watermark,
        restriction.is_done,
        finalize_bytes
    ))

  def decode(self, encoded: bytes) -> '_UnboundedSourceRestriction':
    from apache_beam.coders.coders import BytesCoder
    source_bytes, checkpoint_bytes, watermark, is_done, finalize_bytes = TupleCoder((
        BytesCoder(),
        BytesCoder(),
        TimestampCoder(),
        BooleanCoder(),
        BytesCoder()
    )).decode(encoded)
    
    source = _MemoizingPickleCoder().decode(source_bytes)
    checkpoint_coder = NullableCoder(source.get_checkpoint_mark_coder())
    
    checkpoint_mark = checkpoint_coder.decode(checkpoint_bytes)
    finalization_checkpoint_mark = checkpoint_coder.decode(finalize_bytes)
    
    return _UnboundedSourceRestriction(
        source=source,
        checkpoint_mark=checkpoint_mark,
        watermark=watermark,
        is_done=is_done,
        finalization_checkpoint_mark=finalization_checkpoint_mark
    )

  def is_deterministic(self) -> bool:
    return False

Comment on lines +525 to +529
def current_progress(self) -> 'iobase.RestrictionProgress':
# Backlog-based progress is out of scope; report a coarse done/not-done
# fraction so the runner has a (recommended) signal.
return iobase.RestrictionProgress(
fraction=1.0 if self._restriction.is_done else 0.0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In Apache Beam, RestrictionProgress does not accept a fraction parameter. Its standard parameters are completed_work and remaining_work. Passing fraction directly will result in it being ignored or placed in **kwargs without being recognized by the runner.

Suggested change
def current_progress(self) -> 'iobase.RestrictionProgress':
# Backlog-based progress is out of scope; report a coarse done/not-done
# fraction so the runner has a (recommended) signal.
return iobase.RestrictionProgress(
fraction=1.0 if self._restriction.is_done else 0.0)
def current_progress(self) -> 'iobase.RestrictionProgress':
# Backlog-based progress is out of scope; report a coarse done/not-done
# signal so the runner has a (recommended) signal.
if self._restriction.is_done:
return iobase.RestrictionProgress(completed_work=1.0, remaining_work=0.0)
return iobase.RestrictionProgress(completed_work=0.0, remaining_work=1.0)

Comment on lines +758 to +760
if type_hint is not None:
try:
coders.registry.register_coder(type_hint, type(output_coder))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Modifying the global coders.registry can cause side effects across different pipelines running in the same process (e.g., in multi-tenant service environments or unit tests). It is safer to register the coder on the pipeline-specific coder_registry.

Suggested change
if type_hint is not None:
try:
coders.registry.register_coder(type_hint, type(output_coder))
if type_hint is not None:
try:
pbegin.pipeline.coder_registry.register_coder(type_hint, type(output_coder))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline has no coder_registry attr

Comment on lines +713 to +740
finally:
current = tracker.current_restriction()
# Register finalization only when a real checkpoint was cut this
# bundle. Restriction identity (`current is not initial`) mirrors
# Java's reference-equality gate in Read.java. We read the explicit
# finalization channel, NOT ``checkpoint_mark`` (which is the
# RESUME state and may belong to the residual after a split).
finalize_mark = current.finalization_checkpoint_mark
if current is not initial and finalize_mark is not None:
bundle_finalizer.register(finalize_mark.finalize_checkpoint)
# Release the underlying reader on every exit path, including the
# exception path where a downstream yield raised between two
# try_claim calls (reader-method failures are already closed inside
# the tracker). ``RestrictionTrackerView`` does not expose the inner
# tracker, so traverse the (stable-but-private) wrapper chain. If
# the chain changes in a future Beam version we log a warning and
# let GC eventually close; never call ``close`` on an unrelated
# tracker subclass.
threadsafe = getattr(tracker, '_threadsafe_restriction_tracker', None)
inner_tracker = getattr(threadsafe, '_restriction_tracker', None)
if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
inner_tracker._close_reader_if_open()
elif inner_tracker is not None or threadsafe is not None:
_LOGGER.warning(
'UnboundedSource DoFn could not reach the inner tracker via '
'_threadsafe_restriction_tracker._restriction_tracker; reader '
'close on exception path skipped, relying on GC. Beam SDF '
'wrapper internals may have changed -- file an issue.')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to unwrap the inner tracker from tracker assumes a specific nesting structure (_threadsafe_restriction_tracker -> _restriction_tracker). If the tracker is not wrapped (e.g., in some testing or direct execution scenarios), or if the wrapper chain changes, this lookup will fail. A more robust approach is to check for the attributes dynamically and fallback gracefully.

        finally:
          current = tracker.current_restriction()
          # Register finalization only when a real checkpoint was cut this
          # bundle. Restriction identity (`current is not initial`) mirrors
          # Java's reference-equality gate in Read.java. We read the explicit
          # finalization channel, NOT ``checkpoint_mark`` (which is the
          # RESUME state and may belong to the residual after a split).
          finalize_mark = current.finalization_checkpoint_mark
          if current is not initial and finalize_mark is not None:
            bundle_finalizer.register(finalize_mark.finalize_checkpoint)
          # Release the underlying reader on every exit path, including the
          # exception path where a downstream yield raised between two
          # try_claim calls (reader-method failures are already closed inside
          # the tracker). ``RestrictionTrackerView`` does not expose the inner
          # tracker, so traverse the (stable-but-private) wrapper chain. If
          # the chain changes in a future Beam version we log a warning and
          # let GC eventually close; never call ``close`` on an unrelated
          # tracker subclass.
          inner_tracker = tracker
          if hasattr(inner_tracker, '_threadsafe_restriction_tracker'):
            inner_tracker = getattr(inner_tracker, '_threadsafe_restriction_tracker')
          if hasattr(inner_tracker, '_restriction_tracker'):
            inner_tracker = getattr(inner_tracker, '_restriction_tracker')
          if isinstance(inner_tracker, _UnboundedSourceRestrictionTracker):
            inner_tracker._close_reader_if_open()
          else:
            _LOGGER.warning(
                'UnboundedSource DoFn could not reach the inner tracker; reader '
                'close on exception path skipped, relying on GC. Beam SDF '
                'wrapper internals may have changed -- file an issue.')

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from dcbcdd5 to 9a71653 Compare May 28, 2026 14:03
Comment thread sdks/python/apache_beam/io/iobase.py Outdated
@@ -918,7 +919,11 @@ def __init__(self, source: SourceBase) -> None:
"""Initializes a Read transform.

Args:
source: Data source to read from.
source: Data source to read from. A ``BoundedSource`` is wrapped in the
bounded SDF reader; an ``UnboundedSource`` is dispatched through

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pydoc audiences are Beam users, not necessarily mentioning the implement details for BoundedSource or UnboundedSource (like bounded SDF reader etc)

However it's worthwhile to mention source can be a BoundedSource, UnboundedSource, PTransform, or treat Read itself as a primitive (relay to runner implementation)

# limitations under the License.
#

"""A minimal, self-contained ``UnboundedSource`` for the Python SDK.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, pydoc is user faced documentation. Information regarding backgrounds (GSoC, Java inspired) are less relevant. Also this API is publicly faced. We don't use phrasing like "A minimal, self-contained" here

Instead, consider formal statement like UnboundedSource support is currently experimental in Python SDK.

``UnboundedSourceRestriction(source, checkpoint, watermark)`` plus an explicit
``is_done`` flag for the terminal (MAX-watermark) transition and a separate
``finalization_checkpoint_mark`` so a done primary can carry a commit-hook
without polluting the RESUME-state ``checkpoint_mark`` (matches W1 design

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(matches W1 design ...

Not suitable for pydoc. We need a clean-up of all pydoc and comments

finalization_checkpoint_mark=None)

def restriction_size(self, element, restriction) -> int:
# Backlog estimation is out of scope; report a constant non-negative size.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need further check if this statement is true. Nevertheless please use TODO convention e.g. TODO(https://github.com/apache/beam/issues/19137) implement backlog estimation

holder = [None]
if not tracker.try_claim(holder):
# EOF (restriction is_done==True, watermark already set to MAX in
# the tracker). Mirrors Java Read.java:625 -- advance the

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors Java Read.java:625 <-- these kind of comments are not necessary and will soon out-of-date as code base evolves.

@core.DoFn.unbounded_per_element()
def process(
self,
unused_element,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the element is unused. We may not need to have Impulse | core.Map(lambda _: source) at all in ReadFromUnboundedSource

Comment thread CHANGES.md Outdated

## I/Os

* (Python) Added a Splittable-DoFn `UnboundedSource` wrapper bringing Java's `UnboundedSource` semantics (checkpoint-based pause/resume, monotonic watermark reporting, bundle finalization) to the Python SDK via the new `apache_beam.io.unbounded_source` module (`UnboundedSource`, `UnboundedReader`, `CheckpointMark`, `ReadFromUnboundedSource`). Runs on the portable Fn API path (e.g. the default DirectRunner) ([#19137](https://github.com/apache/beam/issues/19137)).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can defer the announcement after milestones completed (at least ValidatesRunner tests confirmed passing)

from apache_beam.io.unbounded_source import (
UnboundedSource, UnboundedReader, CheckpointMark, ReadFromUnboundedSource)

class MySource(UnboundedSource):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module doc is an important entry point showing how to use these classes.

List the necessarily methods for MySource implementation here.

As it involves create_reader(...), also add examples of UnboundedReader here


with beam.Pipeline() as p:
p | ReadFromUnboundedSource(MySource()) | beam.Map(print)
# Equivalent (since iobase.Read.expand dispatches on source type):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just recommend beam.io.Read(MySource()) in public doc.

I notice there is a polling configuration in ReadFromUnboundedSource transform. However there is no Java correspondence in BoundedSourceAsSDFWrapperFn. Is this something new or what is the consideration here?

Instead, Java implementation has a DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS of 10 minutes controls the frequency of self-checkpointing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. I revisited this and removed poll_interval_seconds from the public
ReadFromUnboundedSource API.

The Python wrapper still needs an internal delay on the no-data path because the portable SDF implementation
resumes work via tracker.defer_remainder(...); if no delay is provided, the residual is scheduled immediately. This is now just a private _DEFAULT_POLL_INTERVAL_SECONDS constant used by the wrapper, so users should not
configure it and should normally use beam.io.Read(MySource()).

I also understand DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS in Java as the bundle-finalization callback deadline rather than a user-facing polling option, so I did not add a corresponding public option in Python in the lastest commit.

windows can close), checkpoint-based pause/resume (``defer_remainder``),
deterministic reader close on EOF / split / exception, and bundle finalization.

Out of scope for this PoC (tracked under #19137):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Out of scope for this PoC ..." statements not suitable in a public pydoc, see above

@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch 2 times, most recently from d294106 to 85f10ef Compare June 1, 2026 14:08
@github-actions

github-actions Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Eliaaazzz

Eliaaazzz commented Jun 2, 2026

Copy link
Copy Markdown
Contributor Author

Hi @Abacn I pushed an update addressing your latest review feedback.

Summary:

  • cleaned user-facing pydoc and private comments
  • kept ReadFromUnboundedSource docs focused on beam.io.Read
  • added warning coverage for close errors while preserving processing failures
  • updated UnboundedCountingSource tests with realistic timestamps, @override usage, and optional even/odd splitting
  • kept CHANGES.md unchanged

Verified:
python -m pytest -q sdks/python/apache_beam/io/unbounded_source_test.py sdks/python/apache_beam/io/iobase_test.py -q

Could you please take another look when you have time?

@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from e800e7b to 0750e11 Compare June 2, 2026 13:31
Comment thread sdks/python/apache_beam/io/iobase_test.py Outdated
Comment thread sdks/python/apache_beam/io/iobase_test.py Outdated
Comment thread sdks/python/apache_beam/io/unbounded_source.py Outdated
Comment thread sdks/python/apache_beam/io/unbounded_source_test.py Outdated
Comment thread sdks/python/apache_beam/io/unbounded_source_test.py
Comment thread sdks/python/apache_beam/io/iobase.py Outdated
@Eliaaazzz

Eliaaazzz commented Jun 4, 2026

Copy link
Copy Markdown
Contributor Author

Hi @Abacn, thanks for the continued review.

I pushed one squashed follow-up commit after the reviewed commit for incremental review.

Changes:

  • made the unbounded Read boundedness test graph-only
  • moved portability imports to module scope
  • removed new-file pytype skips
  • implemented an at-most-once guard for checkpoint finalization and added regression coverage
  • removed the generic pickle/subprocess/syntax tests
  • kept UnboundedSource Read on the expanded composite runner API path, with a regression test for avoiding deprecated READ serialization

Verified locally:

  • python -m pytest -q sdks/python/apache_beam/io/unbounded_source_test.py sdks/python/apache_beam/io/iobase_test.py -q
  • ./gradlew :pythonFormatterPreCommit --console=plain
  • ./gradlew :pythonLintPreCommit --console=plain

I would appreciate another look when you have time. Thanks again.

@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch 3 times, most recently from 79add18 to 99e3cd0 Compare June 8, 2026 13:58
Eliaaazzz added a commit to Eliaaazzz/beam that referenced this pull request Jun 10, 2026
…urce wrapper

Bump trigger files to run PostCommit ValidatesRunner on PR apache#38724. Revert before merge.
@github-actions github-actions Bot added runners and removed runners labels Jun 10, 2026
@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from 1dc2bc6 to 99e3cd0 Compare June 10, 2026 12:21
Eliaaazzz added a commit to Eliaaazzz/beam that referenced this pull request Jun 10, 2026
Add test_unbounded_source_read to PortableRunnerTest so the portable
ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to
end: read a self-terminating source through the job service, assert the
elements and that the EOF MAX_TIMESTAMP watermark lets a downstream
FixedWindows + GroupByKey fire.

The embedded portable runner variants, the Flink suites, and
PrismRunnerTest inherit the test. SparkRunnerTest skips it because
portable Spark does not execute SDFs (apache#19468), matching its other SDF
test skips.

Includes the UnboundedSource SDF wrapper from apache#38724, which this PR is
stacked on.
Eliaaazzz added a commit to Eliaaazzz/beam that referenced this pull request Jun 12, 2026
Add test_unbounded_source_read to PortableRunnerTest so the portable
ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to
end: read a self-terminating source through the job service, assert the
elements and that the EOF MAX_TIMESTAMP watermark lets a downstream
FixedWindows + GroupByKey fire.

The embedded portable runner variants, the Flink suites, and
PrismRunnerTest inherit the test. SparkRunnerTest skips it because
portable Spark does not execute SDFs (apache#19468), matching its other SDF
test skips.

Bump the Dataflow, Flink and Spark Python ValidatesRunner trigger files
so the PostCommit suites run on this PR.

Includes the UnboundedSource SDF wrapper from apache#38724, which this PR is
stacked on.
@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from 8446b17 to a50743f Compare June 14, 2026 11:49
Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark``
abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the
portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into
``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)``
dispatches alongside the existing ``BoundedSource`` branch. Loosely inspired
by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the streaming-SDF template
followed for the process loop / watermark / defer plumbing is
``apache_beam.transforms.periodicsequence``.

addresses apache#19137

What's added
------------

``sdks/python/apache_beam/io/unbounded_source.py``

  Public ABCs (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``,
  ``ReadFromUnboundedSource``) plus the SDF wrapper internals
  (``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``,
  ``_UnboundedSourceRestrictionTracker``,
  ``_UnboundedSourceRestrictionProvider``).

``sdks/python/apache_beam/io/unbounded_source_test.py``

  Deterministic tests covering ABC contracts, restriction coder round-trip,
  tracker state machine (claim / split / EOF / no-data / check_done /
  progress / is_bounded), finalize idempotency, fan-out via ``source.split``,
  source-watermark vs. record-event-time, finalize vs. resume channel
  separation, tracker-internal exception close on reader-method failures,
  DoFn generator close (unit + integration with downstream raising ``Map``),
  cloudpickle round-trip, circular import in three subprocess orderings,
  per-bundle read caps, and an end-to-end DirectRunner pipeline.

What's changed in iobase.py
---------------------------

* ``Read.expand`` gains an ``UnboundedSource`` branch (function-local lazy
  import to break the iobase <-> unbounded_source cycle) that delegates to
  ``ReadFromUnboundedSource``.
* ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to
  ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` +
  ``ReadPayload(is_bounded=UNBOUNDED)``. The ``BoundedSource`` branch keeps
  its ``is_bounded()`` conditional so a source reporting unbounded still
  serializes as UNBOUNDED. Decode rides the existing ``PICKLED_SOURCE`` URN
  on ``SourceBase``.

Correctness covered
-------------------

* Data-path watermark uses ``reader.get_watermark()`` (Java ``Read.java:594``
  parity), not the per-record event time. Each record is emitted before the
  watermark estimator advances, so a reader that reports ``MAX_TIMESTAMP`` on
  the same claim as its final record cannot push the output watermark past
  that record.
* Restriction has separate ``checkpoint_mark`` (resume) and
  ``finalization_checkpoint_mark`` (commit hook) channels; coder is a fixed
  5-tuple. The split residual gets its own clone of the checkpoint mark so a
  ``finalize_checkpoint()`` that mutates the primary's mark cannot corrupt the
  residual's resume position.
* A continuously busy reader cannot stay in ``process()`` forever: the
  has-data path is capped at ``max_records_per_bundle`` (10000) or
  ``max_read_time_seconds`` (10), then self-checkpoints via
  ``defer_remainder`` so the runner commits progress and resumes. Caps are
  validated at the transform boundary, and the read deadline is armed on the
  first emitted record so reader startup is excluded. ``UnboundedReader.advance``
  must not block, since the caps are enforced only between records.
* Reader is closed on every exit path -- tracker-internal close on EOF /
  split / reader-method exception; DoFn ``finally`` defense-in-depth for
  yield / downstream raise.
* ``finalize_checkpoint()`` failures are logged and swallowed to match the
  ``CheckpointMark`` best-effort contract; finalize is idempotent on retry.
* EOF advances the watermark estimator to ``MAX_TIMESTAMP`` so downstream
  event-time windows can close.
* ``UnboundedSource.split(desired_num_splits=20, options)`` is honoured;
  returned sub-sources are validated as ``UnboundedSource``; on
  split-exception the provider falls back to a single restriction.
* ``default_output_coder`` reaches the output PCollection via
  ``coders.registry.register_coder`` + ``element_type``.

Out of scope (tracked under apache#19137)
-----------------------------------

Listed exhaustively in the module docstring:

* Record-id-based deduplication (Java's ``ValueWithRecordId``).
* Backlog-byte reporting (``restriction_size`` is constant 1).
* Dynamic split fractions / runner-initiated work stealing.
* Source-specific checkpoint coders threaded through the restriction coder.
* Reader caching across bundles.
* Runner-side ``IsBounded.UNBOUNDED`` dispatch in
  ``bundle_processor.IMPULSE_READ_TRANSFORM``.
@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from a50743f to 8133a5d Compare June 16, 2026 04:54
if current is not initial and finalize_mark is not None:
bundle_finalizer.register(_FinalizeCheckpointOnce(finalize_mark))
finally:
# Release the reader on downstream-yield errors.

@Abacn Abacn Jun 16, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release the reader on downstream-yield errors.

Realizing this comment might not be accurate. Here close reader happens in finally clause of the DoFn process, not only when error happens. Would it mean reader close happens on every bundle finalization (due to reached max num of elements; no data). Reader should be persistent across bundles for performance reason, and only gets invalidated when there are exceptions / split / DoFn teardown etc

@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from cd14b54 to e3c0005 Compare June 17, 2026 02:35
A self-checkpoint parks the live reader in a per-DoFn _ReaderCache keyed by the
residual source and checkpoint, and the resuming bundle reclaims it, so one
reader stays alive across SDF bundles. The reader is closed only on EOF, an
exception, idle or size eviction, or DoFn teardown.
@Eliaaazzz Eliaaazzz force-pushed the gsoc-week1-minimal-poc branch from e3c0005 to dea202a Compare June 17, 2026 02:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants