Skip to content

Commit 1ae3fe7

Browse files
committed
[Python] Add UnboundedSource ValidatesRunner test on portable runners
Add test_unbounded_source_read to PortableRunnerTest so the portable ValidatesRunner suites exercise the UnboundedSource SDF wrapper end to end: read a self-terminating source through the job service, assert the elements and that the EOF MAX_TIMESTAMP watermark lets a downstream FixedWindows + GroupByKey fire. The embedded portable runner variants, the Flink suites, and PrismRunnerTest inherit the test. SparkRunnerTest skips it because portable Spark does not execute SDFs (#19468), matching its other SDF test skips.
1 parent 6e6a40c commit 1ae3fe7

1 file changed

Lines changed: 26 additions & 0 deletions

File tree

sdks/python/apache_beam/runners/portability/portable_runner_test.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import grpc
3131

3232
import apache_beam as beam
33+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
3334
from apache_beam.options.pipeline_options import DebugOptions
3435
from apache_beam.options.pipeline_options import DirectOptions
3536
from apache_beam.options.pipeline_options import PipelineOptions
@@ -47,6 +48,7 @@
4748
from apache_beam.testing.util import equal_to
4849
from apache_beam.transforms import environments
4950
from apache_beam.transforms import userstate
51+
from apache_beam.transforms import window
5052

5153
_LOGGER = logging.getLogger(__name__)
5254

@@ -209,6 +211,30 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
209211
| beam.ParDo(AddIndex()),
210212
equal_to(expected))
211213

214+
def test_unbounded_source_read(self):
215+
"""Reads a self-terminating UnboundedSource through the job service.
216+
217+
Complements the DirectRunner tests in apache_beam.io.unbounded_source_test
218+
by exercising the runner-API round trip and the EOF ``MAX_TIMESTAMP``
219+
watermark propagation that lets the downstream window fire.
220+
"""
221+
if type(self).__name__ == 'SparkRunnerTest':
222+
# Portable Spark does not execute SDFs, so SparkRunnerTest skips every
223+
# SDF test (see its test_sdf* overrides).
224+
raise unittest.SkipTest('https://github.com/apache/beam/issues/19468')
225+
with self.create_pipeline() as p:
226+
out = p | beam.io.Read(UnboundedCountingSource(5))
227+
self.assertFalse(out.is_bounded)
228+
assert_that(out, equal_to([0, 1, 2, 3, 4]), label='assert_elements')
229+
windowed = (
230+
out
231+
| beam.WindowInto(window.FixedWindows(100))
232+
| beam.Map(lambda v: ('all', v))
233+
| beam.GroupByKey()
234+
| beam.MapTuple(lambda _key, values: sorted(values)))
235+
assert_that(
236+
windowed, equal_to([[0, 1, 2, 3, 4]]), label='assert_windowed')
237+
212238
# Inherits all other tests from fn_api_runner_test.FnApiRunnerTest
213239

214240
def test_sdf_default_truncate_when_bounded(self):

0 commit comments

Comments
 (0)