Skip to content

Commit 486de0f

Browse files
authored
Reenable prism as default (#35621)
* Reenable prism as default * Fix race condition * Spotless * Portable test * Fix a few new test issues * Update CHANGES
1 parent 73b1090 commit 486de0f

6 files changed

Lines changed: 31 additions & 21 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 11
3+
"modification": 12
44
}

CHANGES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464

6565
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
6666
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
67+
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
6768

6869
## I/Os
6970

@@ -81,6 +82,7 @@
8182
* Upgraded Beam vendored Calcite to 1.40.0 for Beam SQL ([#35483](https://github.com/apache/beam/issues/35483)), which
8283
improves support for BigQuery and other SQL dialects. Note: Minor behavior changes are observed such as output
8384
significant digits related to casting.
85+
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
8486

8587
## Deprecations
8688

@@ -100,7 +102,6 @@
100102

101103
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
102104
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
103-
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
104105

105106
## I/Os
106107

@@ -127,7 +128,6 @@
127128

128129
## Breaking Changes
129130

130-
* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
131131
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
132132
* Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369](https://github.com/apache/beam/pull/35369)).
133133
* SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582](https://github.com/apache/beam/pull/35582)).

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Map;
3030
import java.util.Set;
3131
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.ExecutorService;
3332
import java.util.function.Function;
3433
import javax.annotation.Nullable;
3534
import org.apache.beam.fn.harness.control.BeamFnControlClient;
@@ -64,6 +63,7 @@
6463
import org.apache.beam.sdk.options.ExperimentalOptions;
6564
import org.apache.beam.sdk.options.PipelineOptions;
6665
import org.apache.beam.sdk.options.SdkHarnessOptions;
66+
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
6767
import org.apache.beam.sdk.util.construction.CoderTranslation;
6868
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
6969
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat;
@@ -276,8 +276,8 @@ public static void main(
276276

277277
IdGenerator idGenerator = IdGenerators.decrementingLongs();
278278
ShortIdMap metricsShortIds = new ShortIdMap();
279-
ExecutorService executorService =
280-
options.as(ExecutorOptions.class).getScheduledExecutorService();
279+
UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
280+
options.as(ExecutorOptions.class).setScheduledExecutorService(executorService);
281281
CompletableFuture<Void> samplerTerminationFuture = new CompletableFuture<>();
282282
ExecutionStateSampler executionStateSampler =
283283
new ExecutionStateSampler(

sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,9 @@ def dynamic_destination_resolver(element, *side_inputs):
884884
# For now we don't care about the return value.
885885
mock_insert_copy_job.return_value = None
886886

887-
with TestPipeline('DirectRunner') as p:
887+
# Pin to FnApiRunner for now to make mocks act appropriately.
888+
# TODO(https://github.com/apache/beam/issues/34549)
889+
with TestPipeline('FnApiRunner') as p:
888890
_ = (
889891
p
890892
| beam.Create([

sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def test_default_schema_missing_embedding(self):
117117
Chunk(id="1", content=Content(text="foo"), metadata={"a": "b"}),
118118
Chunk(id="2", content=Content(text="bar"), metadata={"c": "d"})
119119
]
120-
with self.assertRaises(ValueError):
120+
with self.assertRaisesRegex(Exception, "must contain dense embedding"):
121121
with beam.Pipeline() as p:
122122
_ = (p | beam.Create(chunks) | config.create_write_transform())
123123

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ def visit_transform(self, applied_ptransform):
184184
for state in state_specs:
185185
if isinstance(state, userstate.CombiningValueStateSpec):
186186
self.supported_by_prism_runner = False
187+
if isinstance(
188+
dofn,
189+
beam.transforms.combiners._PartialGroupByKeyCombiningValues):
190+
if len(transform.side_inputs) > 0:
191+
# Prism doesn't support side input combiners (this is within spec)
192+
self.supported_by_prism_runner = False
193+
187194
# TODO(https://github.com/apache/beam/issues/33623): Prism seems to
188195
# not handle session windows correctly. Examples are:
189196
# util_test.py::ReshuffleTest::test_reshuffle_window_fn_preserved
@@ -195,21 +202,9 @@ def visit_transform(self, applied_ptransform):
195202
# Use BundleBasedDirectRunner if other runners are missing needed features.
196203
runner = BundleBasedDirectRunner()
197204

198-
# Check whether all transforms used in the pipeline are supported by the
199-
# FnApiRunner, and the pipeline was not meant to be run as streaming.
200-
if _FnApiRunnerSupportVisitor().accept(pipeline):
201-
from apache_beam.portability.api import beam_provision_api_pb2
202-
from apache_beam.runners.portability.fn_api_runner import fn_runner
203-
from apache_beam.runners.portability.portable_runner import JobServiceHandle
204-
all_options = options.get_all_options()
205-
encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
206-
provision_info = fn_runner.ExtendedProvisionInfo(
207-
beam_provision_api_pb2.ProvisionInfo(
208-
pipeline_options=encoded_options))
209-
runner = fn_runner.FnApiRunner(provision_info=provision_info)
210205
# Check whether all transforms used in the pipeline are supported by the
211206
# PrismRunner
212-
elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
207+
if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
213208
_LOGGER.info('Running pipeline with PrismRunner.')
214209
from apache_beam.runners.portability import prism_runner
215210
runner = prism_runner.PrismRunner()
@@ -233,6 +228,19 @@ def visit_transform(self, applied_ptransform):
233228
_LOGGER.info('Falling back to DirectRunner')
234229
runner = BundleBasedDirectRunner()
235230

231+
# Check whether all transforms used in the pipeline are supported by the
232+
# FnApiRunner, and the pipeline was not meant to be run as streaming.
233+
if _FnApiRunnerSupportVisitor().accept(pipeline):
234+
from apache_beam.portability.api import beam_provision_api_pb2
235+
from apache_beam.runners.portability.fn_api_runner import fn_runner
236+
from apache_beam.runners.portability.portable_runner import JobServiceHandle
237+
all_options = options.get_all_options()
238+
encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
239+
provision_info = fn_runner.ExtendedProvisionInfo(
240+
beam_provision_api_pb2.ProvisionInfo(
241+
pipeline_options=encoded_options))
242+
runner = fn_runner.FnApiRunner(provision_info=provision_info)
243+
236244
return runner.run_pipeline(pipeline, options)
237245

238246

0 commit comments

Comments
 (0)