Skip to content

Commit 81769cb

Browse files
authored
Adds a new coder translator for Java SchemaCoder. (#37631)
* Adds a new coder translator for Java SchemaCoder. Adds PipelineOptions to translation context so we can disable the new translator based on pipeline compatibility version. * Refactors ModelCoderRegistrars so decisions about known coders are made there instead of in the individual CoderTranslators. Removed SdkComponents from getCoderTranslator and getCoderForUrn since it wasn't needed yet. * Adds option to pass environment to SdkComponents. Otherwise, we register a default environment in the constructor, which was preventing DataflowRunner from registering its own custom environment. * Fixup DataflowPipelineTranslatorTest to also use the new SdkComponent constructor.
1 parent fba639a commit 81769cb

17 files changed

Lines changed: 387 additions & 72 deletions

File tree

CHANGES.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282

8383
## Breaking Changes
8484

85+
* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
86+
- Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)).
87+
- Fixes [#36496](https://github.com/apache/beam/issues/36496), [#30276](https://github.com/apache/beam/issues/30276), and [#29245](https://github.com/apache/beam/issues/29245).
8588
* (Python) Made Beartype the default fallback type checking tool. This can be disabled with the `--disable_beartype` pipeline option. ([#38275](https://github.com/apache/beam/issues/38275))
8689
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
8790

@@ -2435,4 +2438,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
24352438

24362439
## Highlights
24372440

2438-
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
2441+
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public Boolean visit(OrFinallyTrigger trigger) {
221221
private static byte[] serializeWindowingStrategy(
222222
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
223223
try {
224-
SdkComponents sdkComponents = SdkComponents.create();
224+
SdkComponents sdkComponents = SdkComponents.create(options);
225225

226226
String workerHarnessContainerImageURL =
227227
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,19 +1333,20 @@ public DataflowPipelineJob run(Pipeline pipeline) {
13331333
// with the SDK harness image (which implements Fn API).
13341334
//
13351335
// The same Environment is used in different and contradictory ways, depending on whether
1336-
// it is a v1 or v2 job submission.
1336+
// it is a portable or non-portable job submission.
13371337
RunnerApi.Environment defaultEnvironmentForDataflow =
13381338
Environments.createDockerEnvironment(workerHarnessContainerImageURL);
13391339

1340-
// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
1340+
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
13411341
// need the default environment.
1342-
SdkComponents portableComponents = SdkComponents.create();
1343-
portableComponents.registerEnvironment(
1344-
defaultEnvironmentForDataflow
1345-
.toBuilder()
1346-
.addAllDependencies(getDefaultArtifacts())
1347-
.addAllCapabilities(Environments.getJavaCapabilities())
1348-
.build());
1342+
SdkComponents portableComponents =
1343+
SdkComponents.create(
1344+
options,
1345+
defaultEnvironmentForDataflow
1346+
.toBuilder()
1347+
.addAllDependencies(getDefaultArtifacts())
1348+
.addAllCapabilities(Environments.getJavaCapabilities())
1349+
.build());
13491350

13501351
RunnerApi.Pipeline portablePipelineProto =
13511352
PipelineTranslation.toProto(pipeline, portableComponents, false);
@@ -1374,28 +1375,30 @@ public DataflowPipelineJob run(Pipeline pipeline) {
13741375
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
13751376

13761377
if (useUnifiedWorker(options)) {
1377-
LOG.info("Skipping v1 transform replacements since job will run on v2.");
1378+
LOG.info(
1379+
"Skipping non-portable transform replacements since job will run on portable worker.");
13781380
} else {
1379-
// Now rewrite things to be as needed for v1 (mutates the pipeline)
1380-
// This way the job submitted is valid for v1 and v2, simultaneously
1381+
// Now rewrite things to be as needed for non-portable (mutates the pipeline).
1382+
// This way the job submitted is valid for portable and non-portable, simultaneously.
13811383
replaceV1Transforms(pipeline);
13821384
}
1383-
// Capture the SdkComponents for look up during step translations
1384-
SdkComponents dataflowV1Components = SdkComponents.create();
1385-
dataflowV1Components.registerEnvironment(
1386-
defaultEnvironmentForDataflow
1387-
.toBuilder()
1388-
.addAllDependencies(getDefaultArtifacts())
1389-
.addAllCapabilities(Environments.getJavaCapabilities())
1390-
.build());
1391-
// No need to perform transform upgrading for the Runner v1 proto.
1392-
RunnerApi.Pipeline dataflowV1PipelineProto =
1393-
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);
1385+
// Capture the SdkComponents for look up during step translations.
1386+
SdkComponents dataflowNonPortableComponents =
1387+
SdkComponents.create(
1388+
options,
1389+
defaultEnvironmentForDataflow
1390+
.toBuilder()
1391+
.addAllDependencies(getDefaultArtifacts())
1392+
.addAllCapabilities(Environments.getJavaCapabilities())
1393+
.build());
1394+
// No need to perform transform upgrading for the non-portable runner proto.
1395+
RunnerApi.Pipeline dataflowNonPortablePipelineProto =
1396+
PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false);
13941397

13951398
if (LOG.isDebugEnabled()) {
13961399
LOG.debug(
1397-
"Dataflow v1 pipeline proto:\n{}",
1398-
TextFormat.printer().printToString(dataflowV1PipelineProto));
1400+
"Dataflow non-portable worker pipeline proto:\n{}",
1401+
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
13991402
}
14001403

14011404
// Set a unique client_request_id in the CreateJob request.
@@ -1415,7 +1418,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
14151418

14161419
JobSpecification jobSpecification =
14171420
translator.translate(
1418-
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);
1421+
pipeline,
1422+
dataflowNonPortablePipelineProto,
1423+
dataflowNonPortableComponents,
1424+
this,
1425+
packages);
14191426

14201427
if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
14211428
List<String> experiments =

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.api.services.dataflow.model.Job;
4848
import com.google.api.services.dataflow.model.Step;
4949
import com.google.api.services.dataflow.model.WorkerPool;
50+
import com.google.auto.value.AutoValue;
5051
import java.io.File;
5152
import java.io.IOException;
5253
import java.io.Serializable;
@@ -92,6 +93,8 @@
9293
import org.apache.beam.sdk.options.PipelineOptionsFactory;
9394
import org.apache.beam.sdk.options.StreamingOptions;
9495
import org.apache.beam.sdk.options.ValueProvider;
96+
import org.apache.beam.sdk.schemas.AutoValueSchema;
97+
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
9598
import org.apache.beam.sdk.state.StateSpec;
9699
import org.apache.beam.sdk.state.StateSpecs;
97100
import org.apache.beam.sdk.state.ValueState;
@@ -166,15 +169,11 @@ public class DataflowPipelineTranslatorTest implements Serializable {
166169
@Rule public transient ExpectedException thrown = ExpectedException.none();
167170

168171
private SdkComponents createSdkComponents(PipelineOptions options) {
169-
SdkComponents sdkComponents = SdkComponents.create();
170-
171172
String containerImageURL =
172173
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
173174
RunnerApi.Environment defaultEnvironmentForDataflow =
174175
Environments.createDockerEnvironment(containerImageURL);
175-
176-
sdkComponents.registerEnvironment(defaultEnvironmentForDataflow);
177-
return sdkComponents;
176+
return SdkComponents.create(options, defaultEnvironmentForDataflow);
178177
}
179178

180179
// A Custom Mockito matcher for an initial Job that checks that all
@@ -1294,15 +1293,16 @@ public String apply(byte[] input) {
12941293
file1.deleteOnExit();
12951294
File file2 = File.createTempFile("file2-", ".txt");
12961295
file2.deleteOnExit();
1297-
SdkComponents sdkComponents = SdkComponents.create();
1298-
sdkComponents.registerEnvironment(
1299-
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
1300-
.toBuilder()
1301-
.addAllDependencies(
1302-
Environments.getArtifacts(
1303-
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
1304-
.addAllCapabilities(Environments.getJavaCapabilities())
1305-
.build());
1296+
SdkComponents sdkComponents =
1297+
SdkComponents.create(
1298+
options,
1299+
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
1300+
.toBuilder()
1301+
.addAllDependencies(
1302+
Environments.getArtifacts(
1303+
ImmutableList.of("file1.txt=" + file1, "file2.txt=" + file2)))
1304+
.addAllCapabilities(Environments.getJavaCapabilities())
1305+
.build());
13061306

13071307
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
13081308

@@ -1870,4 +1870,53 @@ public OffsetRange getInitialRange(@SuppressWarnings("unused") @Element String e
18701870
return null;
18711871
}
18721872
}
1873+
1874+
@AutoValue
1875+
@DefaultSchema(AutoValueSchema.class)
1876+
public abstract static class SimpleAutoValue {
1877+
public abstract String getString();
1878+
1879+
public abstract int getInt32();
1880+
1881+
public abstract long getInt64();
1882+
1883+
public static DataflowPipelineTranslatorTest.SimpleAutoValue of(
1884+
String string, int int32, long int64) {
1885+
return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64);
1886+
}
1887+
}
1888+
1889+
@Test
1890+
public void testSchemaCoderTranslation() throws Exception {
1891+
DataflowPipelineOptions options = buildPipelineOptions();
1892+
Pipeline pipeline = Pipeline.create(options);
1893+
pipeline
1894+
.apply(Impulse.create())
1895+
.apply(
1896+
MapElements.via(
1897+
new SimpleFunction<byte[], SimpleAutoValue>() {
1898+
@Override
1899+
public SimpleAutoValue apply(byte[] input) {
1900+
return SimpleAutoValue.of("foo", 5, 10L);
1901+
}
1902+
}))
1903+
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
1904+
{
1905+
SdkComponents sdkComponents = createSdkComponents(options);
1906+
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
1907+
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
1908+
assertTrue(coders.containsKey("SchemaCoder"));
1909+
assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn());
1910+
}
1911+
1912+
// Prior to version 2.74, SchemaCoders are translated as custom java coders.
1913+
{
1914+
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
1915+
SdkComponents sdkComponents = createSdkComponents(options);
1916+
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
1917+
Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
1918+
assertTrue(coders.containsKey("SchemaCoder"));
1919+
assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn());
1920+
}
1921+
}
18731922
}

runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptorsTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce
7878
// Add another stateful stage with a non-standard key coder
7979
Pipeline p = Pipeline.create();
8080
Coder<Void> keycoder = VoidCoder.of();
81-
assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
81+
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
82+
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false));
8283
p.apply("impulse", Impulse.create())
8384
.apply(
8485
"create",
@@ -165,7 +166,8 @@ public void onTimer() {}
165166
public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
166167
Pipeline p = Pipeline.create();
167168
Coder<Void> voidCoder = VoidCoder.of();
168-
assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
169+
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
170+
assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false));
169171
p.apply("impulse", Impulse.create())
170172
.apply(
171173
ParDo.of(

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.apache.beam.model.pipeline.v1.RunnerApi;
2626
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
2727
import org.apache.beam.sdk.coders.Coder;
28+
import org.apache.beam.sdk.options.PipelineOptions;
2829
import org.apache.beam.sdk.util.SerializableUtils;
2930
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
3031
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
3132
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
3233
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3335
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3436
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3537
import org.checkerframework.dataflow.qual.Deterministic;
@@ -62,6 +64,8 @@ private static class DefaultTranslationContext implements TranslationContext {}
6264

6365
private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;
6466

67+
private static @MonotonicNonNull List<CoderTranslatorRegistrar> coderTranslatorRegistrars;
68+
6569
private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
6670
knownTranslators;
6771

@@ -80,6 +84,53 @@ static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
8084
return knownCoderUrns;
8185
}
8286

87+
private static void initializeCoderTranslatorRegistrars() {
88+
ImmutableList.Builder<CoderTranslatorRegistrar> registrars = ImmutableList.builder();
89+
for (CoderTranslatorRegistrar coderTranslatorRegistrar :
90+
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
91+
registrars.add(coderTranslatorRegistrar);
92+
}
93+
coderTranslatorRegistrars = registrars.build();
94+
}
95+
96+
static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
97+
if (coderTranslatorRegistrars == null) {
98+
initializeCoderTranslatorRegistrars();
99+
}
100+
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
101+
if (registrar.isKnownCoder(coder, options)) {
102+
return true;
103+
}
104+
}
105+
return false;
106+
}
107+
108+
static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass) {
109+
if (coderTranslatorRegistrars == null) {
110+
initializeCoderTranslatorRegistrars();
111+
}
112+
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
113+
CoderTranslator translator = registrar.getCoderTranslator(coderClass);
114+
if (translator != null) {
115+
return translator;
116+
}
117+
}
118+
return null;
119+
}
120+
121+
static Class<? extends Coder> getCoderForUrn(String coderUrn) {
122+
if (coderTranslatorRegistrars == null) {
123+
initializeCoderTranslatorRegistrars();
124+
}
125+
for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
126+
Class<? extends Coder> coder = registrar.getCoderForUrn(coderUrn);
127+
if (coder != null) {
128+
return coder;
129+
}
130+
}
131+
return null;
132+
}
133+
83134
@VisibleForTesting
84135
@Deterministic
85136
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
@@ -107,7 +158,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE
107158

108159
public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
109160
throws IOException {
110-
if (getKnownCoderUrns().containsKey(coder.getClass())) {
161+
if (isKnownCoder(coder, components.getPipelineOptions())) {
111162
return toKnownCoder(coder, components);
112163
}
113164

@@ -129,7 +180,10 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)
129180

130181
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
131182
throws IOException {
132-
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
183+
CoderTranslator translator = getCoderTranslator(coder.getClass());
184+
if (translator == null) {
185+
throw new IOException("Unable to find CoderTranslator for known Coder");
186+
}
133187
List<String> componentIds = registerComponents(coder, translator, components);
134188
return RunnerApi.Coder.newBuilder()
135189
.addAllComponentCoderIds(componentIds)
@@ -186,8 +240,8 @@ private static Coder<?> fromKnownCoder(
186240
components.getComponents().getCodersOrThrow(componentId), components, context);
187241
coderComponents.add(innerCoder);
188242
}
189-
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
190-
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
243+
Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
244+
CoderTranslator<?> translator = getCoderTranslator(coderType);
191245
if (translator != null) {
192246
return translator.fromComponents(
193247
coderComponents, coder.getSpec().getPayload().toByteArray(), context);

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* additional payload, which is not currently supported. This exists as a temporary measure.
2929
*/
3030
public interface CoderTranslator<T extends Coder<?>> {
31+
3132
/** Extract all component {@link Coder coders} within a coder. */
3233
List<? extends Coder<?>> getComponents(T from);
3334

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslatorRegistrar.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import java.util.Map;
2121
import org.apache.beam.sdk.coders.Coder;
22+
import org.apache.beam.sdk.options.PipelineOptions;
23+
import org.checkerframework.checker.nullness.qual.Nullable;
2224

2325
/** A registrar of {@link Coder} URNs to the associated {@link CoderTranslator}. */
2426
@SuppressWarnings({
@@ -34,4 +36,18 @@ public interface CoderTranslatorRegistrar {
3436

3537
/** Returns a mapping of {@link Coder} class to the associated {@link CoderTranslator}. */
3638
Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators();
39+
40+
/**
41+
* Returns whether the given Coder is known to this CoderTranslatorRegistrar. If the Coder is
42+
* known, then getCoderTranslator() will return a non-null CoderTranslator.
43+
*/
44+
boolean isKnownCoder(Coder<?> coder, PipelineOptions options);
45+
46+
/** Returns the CoderTranslator to use for this Coder, or null if the Coder is not known. */
47+
@Nullable
48+
CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass);
49+
50+
/** Returns the Coder class to use for the given URN, or null if the URN is for an unknown Coder. */
51+
@Nullable
52+
Class<? extends Coder> getCoderForUrn(String coderUrn);
3753
}

0 commit comments

Comments
 (0)