Skip to content

Commit 009e37e

Browse files
committed
Pin flaky Python unit tests to FnApiRunner to avoid Prism gRPC deadline timeouts
1 parent 9fe425f commit 009e37e

File tree

4 files changed

+23
-5
lines changed

4 files changed

+23
-5
lines changed

sdks/python/apache_beam/io/gcp/bigquery_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,14 +716,16 @@ def test_query_job_exception(self, exception_type, error_message):
716716
])
717717
def test_read_export_exception(self, exception_type, error_message):
718718

719+
# Mocked BigQuery + Prism can flake in CI; use in-process DirectRunner path.
719720
with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
720721
'estimate_size') as mock_estimate,\
721722
mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get'),\
722723
mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
723724
'Insert') as mock_query_job, \
724725
mock.patch('time.sleep'), \
725726
self.assertRaises(Exception) as exc,\
726-
beam.Pipeline() as p:
727+
beam.Pipeline(options=PipelineOptions(
728+
[], direct_runner_use_prism=False)) as p:
727729

728730
mock_estimate.return_value = None
729731
mock_query_job.side_effect = exception_type(error_message)

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,15 @@ def _add_argparse_args(cls, parser):
996996
'(in seconds) at which the split requests will be sent, and '
997997
'fractions is a corresponding list of floating points to use in the '
998998
'split requests themselves.')
999+
parser.add_argument(
1000+
'--no_direct_runner_use_prism',
1001+
action='store_false',
1002+
dest='direct_runner_use_prism',
1003+
default=True,
1004+
help='SwitchingDirectRunner will not execute supported batch pipelines '
1005+
'on PrismRunner (avoids the portable/gRPC subprocess path). Prefer this '
1006+
'when Prism is slow or flaky, e.g. in constrained or highly parallel '
1007+
'test environments.')
9991008

10001009

10011010
class GoogleCloudOptions(PipelineOptions):

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,10 @@ def visit_transform(self, applied_ptransform):
205205

206206
# Check whether all transforms used in the pipeline are supported by the
207207
# PrismRunner
208-
if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
208+
direct_opts = options.view_as(DirectOptions)
209+
use_prism = getattr(direct_opts, 'direct_runner_use_prism', True)
210+
if (use_prism and
211+
_PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive)):
209212
_LOGGER.info('Running pipeline with PrismRunner.')
210213
from apache_beam.runners.portability import prism_runner
211214
runner = prism_runner.PrismRunner()

sdks/python/apache_beam/yaml/yaml_transform_unit_test.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,13 @@
5050

5151

5252
def new_pipeline():
53+
# Default DirectRunner may run batch workloads on Prism (gRPC subprocess);
54+
# disable for stable unit tests (DirectOptions.direct_runner_use_prism).
5355
return beam.Pipeline(
5456
options=beam.options.pipeline_options.PipelineOptions(
55-
pickle_library='cloudpickle'))
57+
[],
58+
pickle_library='cloudpickle',
59+
direct_runner_use_prism=False))
5660

5761

5862
@unittest.skipIf(jsonschema is None, "Yaml dependencies not installed")
@@ -652,7 +656,7 @@ def test_preprocess_windowing_custom_type(self):
652656
size: 4
653657
input: {{input: input}}
654658
output: {result['transforms'][0]['__uuid__']}
655-
config:
659+
config:
656660
error_handling: {{}}
657661
'''
658662
self.assertYaml(expected, result)
@@ -775,7 +779,7 @@ def test_preprocess_windowing_other_type_with_no_inputs(self):
775779
type: fixed
776780
size: 4
777781
output: {result['transforms'][1]["__uuid__"]}
778-
config:
782+
config:
779783
error_handling: {{}}
780784
'''
781785
self.maxDiff = 1e9

0 commit comments

Comments
 (0)