Skip to content

Commit ed66941

Browse files
authored
Revert #37631 and #38497 in the release branch (#38514)
* Revert "Adds a new coder translator for Java SchemaCoder. (#37631)"
  This reverts commit 81769cb.
* Revert "Sickbay two failed tests due to new schema coder urn. (#38497)"
  This reverts commit 3dbd7c8.
1 parent 866f800 commit ed66941

18 files changed

Lines changed: 72 additions & 392 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@
8383

8484
## Breaking Changes
8585

86-
* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
87-
- Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)).
88-
- Fixes ([#36496](https://github.com/apache/beam/issues/36496)), ([#30276](https://github.com/apache/beam/issues/30276)), ([#29245](https://github.com/apache/beam/issues/29245)).
8986
* (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275))
9087
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
9188

@@ -2439,4 +2436,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
24392436

24402437
## Highlights
24412438

2442-
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
2439+
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public Boolean visit(OrFinallyTrigger trigger) {
221221
private static byte[] serializeWindowingStrategy(
222222
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
223223
try {
224-
SdkComponents sdkComponents = SdkComponents.create(options);
224+
SdkComponents sdkComponents = SdkComponents.create();
225225

226226
String workerHarnessContainerImageURL =
227227
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,20 +1333,19 @@ public DataflowPipelineJob run(Pipeline pipeline) {
13331333
// with the SDK harness image (which implements Fn API).
13341334
//
13351335
// The same Environment is used in different and contradictory ways, depending on whether
1336-
// it is a portable or non-portable job submission.
1336+
// it is a v1 or v2 job submission.
13371337
RunnerApi.Environment defaultEnvironmentForDataflow =
13381338
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
13391339

1340-
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
1340+
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
13411341
// need the default environment.
1342-
SdkComponents portableComponents =
1343-
SdkComponents.create(
1344-
options,
1345-
defaultEnvironmentForDataflow
1346-
.toBuilder()
1347-
.addAllDependencies(getDefaultArtifacts())
1348-
.addAllCapabilities(Environments.getJavaCapabilities())
1349-
.build());
1342+
SdkComponents portableComponents = SdkComponents.create();
1343+
portableComponents.registerEnvironment(
1344+
defaultEnvironmentForDataflow
1345+
.toBuilder()
1346+
.addAllDependencies(getDefaultArtifacts())
1347+
.addAllCapabilities(Environments.getJavaCapabilities())
1348+
.build());
13501349

13511350
RunnerApi.Pipeline portablePipelineProto =
13521351
PipelineTranslation.toProto(pipeline, portableComponents, false);
@@ -1375,30 +1374,28 @@ public DataflowPipelineJob run(Pipeline pipeline) {
13751374
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
13761375

13771376
if (useUnifiedWorker(options)) {
1378-
LOG.info(
1379-
"Skipping non-portable transform replacements since job will run on portable worker.");
1377+
LOG.info("Skipping v1 transform replacements since job will run on v2.");
13801378
} else {
1381-
// Now rewrite things to be as needed for non-portable (mutates the pipeline).
1382-
// This way the job submitted is valid for portable and non-portable, simultaneously.
1379+
// Now rewrite things to be as needed for v1 (mutates the pipeline)
1380+
// This way the job submitted is valid for v1 and v2, simultaneously
13831381
replaceV1Transforms(pipeline);
13841382
}
1385-
// Capture the SdkComponents for look up during step translations.
1386-
SdkComponents dataflowNonPortableComponents =
1387-
SdkComponents.create(
1388-
options,
1389-
defaultEnvironmentForDataflow
1390-
.toBuilder()
1391-
.addAllDependencies(getDefaultArtifacts())
1392-
.addAllCapabilities(Environments.getJavaCapabilities())
1393-
.build());
1394-
// No need to perform transform upgrading for the non-portable runner proto.
1395-
RunnerApi.Pipeline dataflowNonPortablePipelineProto =
1396-
PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false);
1383+
// Capture the SdkComponents for look up during step translations
1384+
SdkComponents dataflowV1Components = SdkComponents.create();
1385+
dataflowV1Components.registerEnvironment(
1386+
defaultEnvironmentForDataflow
1387+
.toBuilder()
1388+
.addAllDependencies(getDefaultArtifacts())
1389+
.addAllCapabilities(Environments.getJavaCapabilities())
1390+
.build());
1391+
// No need to perform transform upgrading for the Runner v1 proto.
1392+
RunnerApi.Pipeline dataflowV1PipelineProto =
1393+
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);
13971394

13981395
if (LOG.isDebugEnabled()) {
13991396
LOG.debug(
1400-
"Dataflow non-portable worker pipeline proto:\n{}",
1401-
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
1397+
"Dataflow v1 pipeline proto:\n{}",
1398+
TextFormat.printer().printToString(dataflowV1PipelineProto));
14021399
}
14031400

14041401
// Set a unique client_request_id in the CreateJob request.
@@ -1418,11 +1415,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
14181415

14191416
JobSpecification jobSpecification =
14201417
translator.translate(
1421-
pipeline,
1422-
dataflowNonPortablePipelineProto,
1423-
dataflowNonPortableComponents,
1424-
this,
1425-
packages);
1418+
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);
14261419

14271420
if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
14281421
List<String> experiments =

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java

Lines changed: 14 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import com.google.api.services.dataflow.model.Job;
4848
import com.google.api.services.dataflow.model.Step;
4949
import com.google.api.services.dataflow.model.WorkerPool;
50-
import com.google.auto.value.AutoValue;
5150
import java.io.File;
5251
import java.io.IOException;
5352
import java.io.Serializable;
@@ -93,8 +92,6 @@
9392
import org.apache.beam.sdk.options.PipelineOptionsFactory;
9493
import org.apache.beam.sdk.options.StreamingOptions;
9594
import org.apache.beam.sdk.options.ValueProvider;
96-
import org.apache.beam.sdk.schemas.AutoValueSchema;
97-
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
9895
import org.apache.beam.sdk.state.StateSpec;
9996
import org.apache.beam.sdk.state.StateSpecs;
10097
import org.apache.beam.sdk.state.ValueState;
@@ -169,11 +166,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
169166
@Rule public transient ExpectedException thrown = ExpectedException.none();
170167

171168
private SdkComponents createSdkComponents(PipelineOptions options) {
169+
SdkComponents sdkComponents = SdkComponents.create();
170+
172171
String containerImageURL =
173172
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
174173
RunnerApi.Environment defaultEnvironmentForDataflow =
175174
Environments.createDockerEnvironment(containerImageURL);
176-
return SdkComponents.create(options, defaultEnvironmentForDataflow);
175+
176+
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
177+
return sdkComponents;
177178
}
178179

179180
// A Custom Mockito matcher for an initial Job that checks that all
@@ -1293,16 +1294,15 @@ public String apply(byte[] input) {
12931294
file1.deleteOnExit();
12941295
File file2 = File.createTempFile("file2-", ".txt");
12951296
file2.deleteOnExit();
1296-
SdkComponents sdkComponents =
1297-
SdkComponents.create(
1298-
options,
1299-
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
1300-
.toBuilder()
1301-
.addAllDependencies(
1302-
Environments.getArtifacts(
1303-
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
1304-
.addAllCapabilities(Environments.getJavaCapabilities())
1305-
.build());
1297+
SdkComponents sdkComponents = SdkComponents.create();
1298+
sdkComponents.registerEnvironment(
1299+
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
1300+
.toBuilder()
1301+
.addAllDependencies(
1302+
Environments.getArtifacts(
1303+
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
1304+
.addAllCapabilities(Environments.getJavaCapabilities())
1305+
.build());
13061306

13071307
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
13081308

@@ -1870,53 +1870,4 @@ public OffsetRange getInitialRange(@SuppressWarnings("unused") @Element String e
18701870
return null;
18711871
}
18721872
}
1873-
1874-
@AutoValue
1875-
@DefaultSchema(AutoValueSchema.class)
1876-
public abstract static class SimpleAutoValue {
1877-
public abstract String getString();
1878-
1879-
public abstract int getInt32();
1880-
1881-
public abstract long getInt64();
1882-
1883-
public static DataflowPipelineTranslatorTest.SimpleAutoValue of(
1884-
String string, int int32, long int64) {
1885-
return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
1886-
}
1887-
}
1888-
1889-
@Test
1890-
public void testSchemaCoderTranslation() throws Exception {
1891-
DataflowPipelineOptions options = buildPipelineOptions();
1892-
Pipeline pipeline = Pipeline.create(options);
1893-
pipeline
1894-
.apply(Impulse.create())
1895-
.apply(
1896-
MapElements.via(
1897-
new SimpleFunction<byte[], SimpleAutoValue>() {
1898-
@Override
1899-
public SimpleAutoValue apply(byte[] input) {
1900-
return SimpleAutoValue.of("foo", 5, 10L);
1901-
}
1902-
}))
1903-
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
1904-
{
1905-
SdkComponents sdkComponents = createSdkComponents(options);
1906-
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
1907-
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
1908-
assertTrue(coders.containsKey("SchemaCoder"));
1909-
assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn());
1910-
}
1911-
1912-
// Prior to version 2.74, SchemaCoders are translated as custom java coders.
1913-
{
1914-
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
1915-
SdkComponents sdkComponents = createSdkComponents(options);
1916-
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
1917-
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
1918-
assertTrue(coders.containsKey("SchemaCoder"));
1919-
assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn());
1920-
}
1921-
}
19221873
}

runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce
7878
// Add another stateful stage with a non-standard key coder
7979
Pipeline p = Pipeline.create();
8080
Coder<Void> keycoder = VoidCoder.of();
81-
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
82-
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false));
81+
assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
8382
p.apply("impulse", Impulse.create())
8483
.apply(
8584
"create",
@@ -166,8 +165,7 @@ public void onTimer() {}
166165
public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
167166
Pipeline p = Pipeline.create();
168167
Coder<Void> voidCoder = VoidCoder.of();
169-
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
170-
assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false));
168+
assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
171169
p.apply("impulse", Impulse.create())
172170
.apply(
173171
ParDo.of(

runners/portability/java/build.gradle

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = ""
214214
// TODO(https://github.com/apache/beam/issues/31231)
215215
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
216216

217-
// TODO(https://github.com/apache/beam/issues/33859): Failed with "KeyError: 'beam:coder:schema:v1'".
218-
// New schema coder urn is not yet supported in runners other than dataflow
219-
excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithShuffle'
220-
excludeTestsMatching 'org.apache.beam.sdk.transforms.PerKeyOrderingTest.testMultipleStatefulOrderingWithoutShuffle'
221-
222217
for (String test : sickbayTests) {
223218
excludeTestsMatching test
224219
}

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java

Lines changed: 4 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@
2525
import org.apache.beam.model.pipeline.v1.RunnerApi;
2626
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
2727
import org.apache.beam.sdk.coders.Coder;
28-
import org.apache.beam.sdk.options.PipelineOptions;
2928
import org.apache.beam.sdk.util.SerializableUtils;
3029
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
3130
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
3231
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
3332
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
34-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3533
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3634
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3735
import org.checkerframework.dataflow.qual.Deterministic;
@@ -64,8 +62,6 @@ private static class DefaultTranslationContext implements TranslationContext {}
6462

6563
private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;
6664

67-
private static @MonotonicNonNull List<CoderTranslatorRegistrar> coderTranslatorRegistrars;
68-
6965
private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
7066
knownTranslators;
7167

@@ -84,53 +80,6 @@ static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
8480
return knownCoderUrns;
8581
}
8682

87-
private static void initializeCoderTranslatorRegistrars() {
88-
ImmutableList.Builder<CoderTranslatorRegistrar> registrars = ImmutableList.builder();
89-
for (CoderTranslatorRegistrar coderTranslatorRegistrar :
90-
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
91-
registrars.add(coderTranslatorRegistrar);
92-
}
93-
coderTranslatorRegistrars = registrars.build();
94-
}
95-
96-
static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
97-
if (coderTranslatorRegistrars == null) {
98-
initializeCoderTranslatorRegistrars();
99-
}
100-
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
101-
if (registrar.isKnownCoder(coder, options)) {
102-
return true;
103-
}
104-
}
105-
return false;
106-
}
107-
108-
static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass) {
109-
if (coderTranslatorRegistrars == null) {
110-
initializeCoderTranslatorRegistrars();
111-
}
112-
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
113-
CoderTranslator translator = registrar.getCoderTranslator(coderClass);
114-
if (translator != null) {
115-
return translator;
116-
}
117-
}
118-
return null;
119-
}
120-
121-
static Class<? extends Coder> getCoderForUrn(String coderUrn) {
122-
if (coderTranslatorRegistrars == null) {
123-
initializeCoderTranslatorRegistrars();
124-
}
125-
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
126-
Class<? extends Coder> coder = registrar.getCoderForUrn(coderUrn);
127-
if (coder != null) {
128-
return coder;
129-
}
130-
}
131-
return null;
132-
}
133-
13483
@VisibleForTesting
13584
@Deterministic
13685
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
@@ -158,7 +107,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE
158107

159108
public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
160109
throws IOException {
161-
if (isKnownCoder(coder, components.getPipelineOptions())) {
110+
if (getKnownCoderUrns().containsKey(coder.getClass())) {
162111
return toKnownCoder(coder, components);
163112
}
164113

@@ -180,10 +129,7 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)
180129

181130
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
182131
throws IOException {
183-
CoderTranslator translator = getCoderTranslator(coder.getClass());
184-
if (translator == null) {
185-
throw new IOException("Unable to find CoderTranslator for known Coder");
186-
}
132+
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
187133
List<String> componentIds = registerComponents(coder, translator, components);
188134
return RunnerApi.Coder.newBuilder()
189135
.addAllComponentCoderIds(componentIds)
@@ -240,8 +186,8 @@ private static Coder<?> fromKnownCoder(
240186
components.getComponents().getCodersOrThrow(componentId), components, context);
241187
coderComponents.add(innerCoder);
242188
}
243-
Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
244-
CoderTranslator<?> translator = getCoderTranslator(coderType);
189+
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
190+
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
245191
if (translator != null) {
246192
return translator.fromComponents(
247193
coderComponents, coder.getSpec().getPayload().toByteArray(), context);

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
* additional payload, which is not currently supported. This exists as a temporary measure.
2929
*/
3030
public interface CoderTranslator<T extends Coder<?>> {
31-
3231
/** Extract all component {@link Coder coders} within a coder. */
3332
List<? extends Coder<?>> getComponents(T from);
3433

0 commit comments

Comments
 (0)