Skip to content

Commit c23d077

Browse files
committed
test: make downstream-raise reader-close test deterministic
The integration test asserted that reader.close() runs within 5s after a downstream Map raises mid-bundle. On that path the SDF DoFn generator is abandoned by the harness (no throw/close), so its finally -- and thus reader.close() -- runs only when the generator is garbage-collected. That frame is pinned by the in-flight exception traceback, and under a loopback/subprocess SDK worker the reader may live in another process whose GC the test cannot drive, so a bounded close-time assertion is inherently racy (failed on Linux CI and locally). Drop the racy close assertion from the integration test; it now asserts only the deterministic guarantee -- that the downstream exception propagates. Authoritative reader-close-on-drop coverage stays in the deterministic test_dofn_finally_closes_reader_on_generator_close, which drives the generator directly and calls generator.close() to force the GeneratorExit/finally path. Renamed the integration test accordingly and documented why close is not asserted there.
1 parent 8871dc5 commit c23d077

1 file changed

Lines changed: 32 additions & 32 deletions

File tree

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,12 +1010,22 @@ class DoFnReaderCloseOnDownstreamRaiseTest(unittest.TestCase):
10101010
the generator (no explicit ``throw``); the generator's ``finally`` runs
10111011
when the generator is closed (``GeneratorExit``) or garbage collected.
10121012
1013-
We exercise that path two ways:
1014-
1. Unit-level: simulate the harness drop with ``generator.close()``
1015-
(raises ``GeneratorExit`` at the active yield, running ``finally``).
1016-
2. Integration: run a real pipeline with a downstream ``Map`` that
1017-
raises, and confirm the reader was closed before the pipeline
1018-
surfaced the error.
1013+
Reader-close coverage is split by what can be asserted deterministically:
1014+
1. Unit-level (``test_dofn_finally_closes_reader_on_generator_close``):
1015+
drive the generator directly and call ``generator.close()`` to force
1016+
the ``GeneratorExit``/``finally`` path. This is synchronous and
1017+
deterministic, and is the authoritative guarantee that the DoFn's
1018+
``finally`` closes the reader.
1019+
2. Integration (``test_downstream_raise_surfaces_through_pipeline``):
1020+
run a real pipeline with a raising downstream ``Map`` and assert the
1021+
error propagates. We deliberately do NOT assert prompt reader-close
1022+
here: on this path the generator's ``finally`` runs only when the
1023+
abandoned generator is GARBAGE-COLLECTED, which is not deterministic
1024+
(the frame is pinned by the in-flight exception's traceback), and under
1025+
a loopback / subprocess SDK worker the reader may live in a different
1026+
process whose GC the test process cannot drive. Asserting a bounded
1027+
close time there is inherently racy -- it is the unit test above that
1028+
pins down close correctness.
10191029
"""
10201030
def test_dofn_finally_closes_reader_on_generator_close(self):
10211031
marker = _new_marker_path('.gen_close.log')
@@ -1050,34 +1060,24 @@ def test_dofn_finally_closes_reader_on_generator_close(self):
10501060
if os.path.exists(marker):
10511061
os.unlink(marker)
10521062

1053-
def test_dofn_finally_closes_reader_via_integration_pipeline(self):
1054-
"""End-to-end harness coverage: a real pipeline with a downstream
1055-
``Map`` that raises must surface the exception AND must have closed
1056-
the reader. This complements the unit-level ``generator.close`` test
1057-
above by exercising the actual SDK harness output-handler path
1058-
(``common._OutputHandler.handle_process_outputs``).
1063+
def test_downstream_raise_surfaces_through_pipeline(self):
1064+
"""End-to-end harness smoke test: a real pipeline with a downstream
1065+
``Map`` that raises mid-bundle must surface the exception (errors on the
1066+
SDF read path are not swallowed). Reader-close on this path is GC-deferred
1067+
and therefore not asserted here -- see the class docstring and the
1068+
deterministic ``test_dofn_finally_closes_reader_on_generator_close``.
10591069
"""
1060-
marker = _new_marker_path('.integration_close.log')
1070+
raised = False
10611071
try:
1062-
raised = False
1063-
try:
1064-
with beam.Pipeline() as p:
1065-
_ = (
1066-
p
1067-
| ReadFromUnboundedSource(_MarkerCloseSource(marker))
1068-
| 'BoomMap' >> beam.Map(_downstream_boom))
1069-
except Exception: # pylint: disable=broad-except
1070-
raised = True
1071-
self.assertTrue(
1072-
raised, 'pipeline did not surface the downstream Map exception')
1073-
self.assertTrue(
1074-
_wait_for_marker(marker),
1075-
'reader leaked across the integration pipeline -- the SDK '
1076-
'harness path that drops the DoFn generator on downstream '
1077-
'failure did not trigger our finally close.')
1078-
finally:
1079-
if os.path.exists(marker):
1080-
os.unlink(marker)
1072+
with beam.Pipeline() as p:
1073+
_ = (
1074+
p
1075+
| ReadFromUnboundedSource(_MarkerCloseSource())
1076+
| 'BoomMap' >> beam.Map(_downstream_boom))
1077+
except Exception: # pylint: disable=broad-except
1078+
raised = True
1079+
self.assertTrue(
1080+
raised, 'pipeline did not surface the downstream Map exception')
10811081

10821082

10831083
# ------------------------------------------------------------------------------

0 commit comments

Comments
 (0)