Skip to content

Commit 1ee587c

Browse files
committed
[Python] Add ValidatesRunner coverage for UnboundedSource SDF wrapper
Mark EndToEndTest with it_validatesrunner and no_sickbay_batch so the Dataflow ValidatesRunner streaming suite exercises the wrapper end to end; the unbounded source requires --streaming, so it is excluded from the batch suite. Add test_unbounded_source_read to PortableRunnerTest so portable ValidatesRunner covers the wrapper. Prism and the DirectRunner execute it; Flink and Spark skip with documented runner limitations: SDF watermark tracking (BEAM-2939) and bundle finalization (#19526) on Flink, and SDF execution on portable Spark (#19468).
1 parent 99e3cd0 commit 1ee587c

2 files changed

Lines changed: 39 additions & 0 deletions

File tree

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import logging
2525
import unittest
2626

27+
import pytest
2728
from typing_extensions import override
2829

2930
import apache_beam as beam
@@ -732,6 +733,8 @@ def test_finalize_checkpoint_invoked(self):
732733
self.assertEqual(finalize_log, [1])
733734

734735

736+
@pytest.mark.it_validatesrunner
737+
@pytest.mark.no_sickbay_batch
735738
class EndToEndTest(unittest.TestCase):
736739
def test_direct_runner_emits_all_in_order(self):
737740
with TestPipeline() as p:

sdks/python/apache_beam/runners/portability/portable_runner_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import grpc
3131

3232
import apache_beam as beam
33+
from apache_beam.io.unbounded_source_test import UnboundedCountingSource
3334
from apache_beam.options.pipeline_options import DebugOptions
3435
from apache_beam.options.pipeline_options import DirectOptions
3536
from apache_beam.options.pipeline_options import PipelineOptions
@@ -47,6 +48,7 @@
4748
from apache_beam.testing.util import equal_to
4849
from apache_beam.transforms import environments
4950
from apache_beam.transforms import userstate
51+
from apache_beam.transforms import window
5052

5153
_LOGGER = logging.getLogger(__name__)
5254

@@ -209,6 +211,40 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
209211
| beam.ParDo(AddIndex()),
210212
equal_to(expected))
211213

214+
def test_unbounded_source_read(self):
215+
"""Reads a self-terminating UnboundedSource through the job service.
216+
217+
Complements the DirectRunner tests in apache_beam.io.unbounded_source_test
218+
by exercising the runner-API round trip and the EOF ``MAX_TIMESTAMP``
219+
watermark propagation that lets the downstream window fire.
220+
"""
221+
if type(self).__name__ in {
222+
'FlinkRunnerTest',
223+
'FlinkRunnerTestOptimized',
224+
'FlinkRunnerTestStreaming',
225+
'SparkRunnerTest',
226+
}:
227+
# Flink lacks the SDF watermark tracking this source relies on
228+
# (BEAM-2939; all Flink variants, including Streaming, skip
229+
# test_sdf_with_watermark_tracking) and bundle finalization
230+
# (https://github.com/apache/beam/issues/19526); portable Spark does not
231+
# execute SDFs (https://github.com/apache/beam/issues/19468).
232+
raise unittest.SkipTest(
233+
'Requires SDF watermark tracking and bundle finalization (Flink) '
234+
'and SDF execution (Spark).')
235+
with self.create_pipeline() as p:
236+
out = p | beam.io.Read(UnboundedCountingSource(5))
237+
self.assertFalse(out.is_bounded)
238+
assert_that(out, equal_to([0, 1, 2, 3, 4]), label='assert_elements')
239+
windowed = (
240+
out
241+
| beam.WindowInto(window.FixedWindows(100))
242+
| beam.Map(lambda v: ('all', v))
243+
| beam.GroupByKey()
244+
| beam.MapTuple(lambda _key, values: sorted(values)))
245+
assert_that(
246+
windowed, equal_to([[0, 1, 2, 3, 4]]), label='assert_windowed')
247+
212248
# Inherits all other tests from fn_api_runner_test.FnApiRunnerTest
213249

214250
def test_sdf_default_truncate_when_bounded(self):

0 commit comments

Comments
 (0)