Skip to content

Commit 36f5e8b

Browse files
committed
add valuekind
1 parent a615657 commit 36f5e8b

21 files changed

Lines changed: 815 additions & 56 deletions

File tree

model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,15 @@ message Elements {
757757
}
758758
}
759759

760+
message ValueKind {
761+
enum Enum {
762+
INSERT = 0;
763+
UPDATE_BEFORE = 1;
764+
UPDATE_AFTER = 2;
765+
DELETE = 3;
766+
}
767+
}
768+
760769
// Element metadata passed as part of WindowedValue to make WindowedValue
761770
// extensible and backward compatible
762771
message ElementMetadata {
@@ -770,6 +779,9 @@ message Elements {
770779
// across IOs - Kafka, PubSub, http.
771780
// Example value: congo=t61rcWkgMzE
772781
optional string tracestate = 3;
782+
// (Optional) The kind of value for CDC metadata.
783+
// If missing or unspecified, implies INSERT for backwards compatibility.
784+
optional ValueKind.Enum value_kind = 4;
773785
}
774786

775787
// Represent the encoded user timer for a given instruction, transform and

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.beam.sdk.values.PCollectionView;
5353
import org.apache.beam.sdk.values.Row;
5454
import org.apache.beam.sdk.values.TupleTag;
55+
import org.apache.beam.sdk.values.ValueKind;
5556
import org.apache.beam.sdk.values.WindowedValue;
5657
import org.apache.beam.sdk.values.WindowedValues;
5758
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -407,6 +408,11 @@ public CausedByDrain causedByDrain() {
407408
return element.causedByDrain();
408409
}
409410

411+
@Override
412+
public ValueKind valueKind() {
413+
return element.getValueKind();
414+
}
415+
410416
@Override
411417
public PipelineOptions getPipelineOptions() {
412418
return pipelineOptions;
@@ -454,6 +460,31 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
454460
element.causedByDrain()));
455461
}
456462

463+
@Override
464+
public void outputWithKind(OutputT output, ValueKind kind) {
465+
outputWithKind(mainOutputTag, output, kind);
466+
}
467+
468+
@Override
469+
public <T> void outputWithKind(TupleTag<T> tag, T value, ValueKind kind) {
470+
noteOutput();
471+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
472+
((TimestampObservingWatermarkEstimator) watermarkEstimator)
473+
.observeTimestamp(element.getTimestamp());
474+
}
475+
outputReceiver.output(
476+
tag,
477+
WindowedValues.of(
478+
value,
479+
element.getTimestamp(),
480+
element.getWindows(),
481+
element.getPaneInfo(),
482+
element.getRecordId(),
483+
element.getRecordOffset(),
484+
element.causedByDrain(),
485+
kind));
486+
}
487+
457488
@Override
458489
public <T> void outputWindowedValue(
459490
TupleTag<T> tag,

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.beam.sdk.values.PCollectionView;
6262
import org.apache.beam.sdk.values.Row;
6363
import org.apache.beam.sdk.values.TupleTag;
64+
import org.apache.beam.sdk.values.ValueKind;
6465
import org.apache.beam.sdk.values.WindowedValue;
6566
import org.apache.beam.sdk.values.WindowedValues;
6667
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -438,6 +439,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
438439
outputWithTimestamp(mainOutputTag, output, timestamp);
439440
}
440441

442+
@Override
443+
public void outputWithKind(OutputT output, ValueKind kind) {
444+
outputWithKind(mainOutputTag, output, kind);
445+
}
446+
441447
@Override
442448
public void outputWindowedValue(
443449
OutputT output,
@@ -471,6 +477,22 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
471477
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
472478
}
473479

480+
@Override
481+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
482+
builderSupplier
483+
.builder(output)
484+
.setTimestamp(elem.getTimestamp())
485+
.setWindows(elem.getWindows())
486+
.setPaneInfo(elem.getPaneInfo())
487+
.setValueKind(kind)
488+
.setReceiver(
489+
wv -> {
490+
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
491+
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
492+
})
493+
.output();
494+
}
495+
474496
@Override
475497
public <T> void outputWindowedValue(
476498
TupleTag<T> tag,
@@ -506,6 +528,11 @@ public Instant timestamp() {
506528
return elem.getRecordOffset();
507529
}
508530

531+
@Override
532+
public ValueKind valueKind() {
533+
return elem.getValueKind();
534+
}
535+
509536
public Collection<? extends BoundedWindow> windows() {
510537
return elem.getWindows();
511538
}
@@ -581,6 +608,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
581608
return elem.causedByDrain();
582609
}
583610

611+
@Override
612+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
613+
return elem.getValueKind();
614+
}
615+
584616
@Override
585617
public String timerId(DoFn<InputT, OutputT> doFn) {
586618
throw new UnsupportedOperationException(
@@ -862,6 +894,11 @@ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
862894
return causedByDrain;
863895
}
864896

897+
@Override
898+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
899+
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
900+
}
901+
865902
@Override
866903
public String timerId(DoFn<InputT, OutputT> doFn) {
867904
return timerId;
@@ -1008,6 +1045,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
10081045
outputWithTimestamp(mainOutputTag, output, timestamp);
10091046
}
10101047

1048+
@Override
1049+
public void outputWithKind(OutputT output, ValueKind kind) {
1050+
outputWithKind(mainOutputTag, output, kind);
1051+
}
1052+
10111053
@Override
10121054
public void outputWindowedValue(
10131055
OutputT output,
@@ -1030,6 +1072,19 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
10301072
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
10311073
}
10321074

1075+
@Override
1076+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
1077+
checkTimestamp(timestamp(), timestamp);
1078+
builderSupplier
1079+
.builder(output)
1080+
.setTimestamp(timestamp)
1081+
.setWindows(Collections.singleton(window()))
1082+
.setPaneInfo(PaneInfo.NO_FIRING)
1083+
.setValueKind(kind)
1084+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1085+
.output();
1086+
}
1087+
10331088
@Override
10341089
public <T> void outputWindowedValue(
10351090
TupleTag<T> tag,
@@ -1177,6 +1232,11 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
11771232
"Cannot access time domain outside of @ProcessTimer method.");
11781233
}
11791234

1235+
@Override
1236+
public ValueKind valueKind(DoFn<InputT, OutputT> doFn) {
1237+
throw new UnsupportedOperationException("ValueKind parameters are not supported.");
1238+
}
1239+
11801240
@Override
11811241
public KeyT key() {
11821242
return key;
@@ -1279,6 +1339,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
12791339
outputWithTimestamp(mainOutputTag, output, timestamp);
12801340
}
12811341

1342+
@Override
1343+
public void outputWithKind(OutputT output, ValueKind kind) {
1344+
outputWithKind(mainOutputTag, output, kind);
1345+
}
1346+
12821347
@Override
12831348
public void outputWindowedValue(
12841349
OutputT output,
@@ -1301,6 +1366,18 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
13011366
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
13021367
}
13031368

1369+
@Override
1370+
public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
1371+
checkTimestamp(this.timestamp, timestamp);
1372+
builderSupplier
1373+
.builder(output)
1374+
.setTimestamp(timestamp)
1375+
.setWindows(Collections.singleton(window()))
1376+
.setPaneInfo(PaneInfo.NO_FIRING)
1377+
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
1378+
.output();
1379+
}
1380+
13041381
@Override
13051382
public <T> void outputWindowedValue(
13061383
TupleTag<T> tag,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3939
import org.apache.beam.sdk.values.CausedByDrain;
4040
import org.apache.beam.sdk.values.KV;
41+
import org.apache.beam.sdk.values.ValueKind;
42+
import org.apache.beam.sdk.values.ValueKindUtil;
4143
import org.apache.beam.sdk.values.WindowedValue;
4244
import org.apache.beam.sdk.values.WindowedValues;
4345
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -139,13 +141,17 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
139141
* drain happened upstream
140142
*/
141143
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
144+
ValueKind valueKind = ValueKind.INSERT;
142145
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
143146
BeamFnApi.Elements.ElementMetadata elementMetadata =
144147
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
145148
drainingValueFromUpstream =
146149
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
147150
? CausedByDrain.CAUSED_BY_DRAIN
148151
: CausedByDrain.NORMAL;
152+
if (elementMetadata.hasValueKind()) {
153+
valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
154+
}
149155
}
150156
if (valueCoder instanceof KvCoder) {
151157
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -156,7 +162,14 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
156162
T result =
157163
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
158164
return WindowedValues.of(
159-
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
165+
result,
166+
timestampMillis,
167+
windows,
168+
paneInfo,
169+
null,
170+
null,
171+
drainingValueFromUpstream,
172+
valueKind);
160173
} else {
161174
notifyElementRead(data.available() + metadata.available());
162175
return WindowedValues.of(
@@ -166,7 +179,8 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
166179
paneInfo,
167180
null,
168181
null,
169-
drainingValueFromUpstream);
182+
drainingValueFromUpstream,
183+
valueKind);
170184
}
171185
}
172186

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
4141
import org.apache.beam.sdk.util.ByteStringOutputStream;
4242
import org.apache.beam.sdk.values.KV;
43+
import org.apache.beam.sdk.values.ValueKindUtil;
4344
import org.apache.beam.sdk.values.ValueWithRecordId;
4445
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
4546
import org.apache.beam.sdk.values.WindowedValue;
@@ -220,7 +221,9 @@ public long add(WindowedValue<T> data) throws IOException {
220221
ByteString id = ByteString.EMPTY;
221222
// todo #33176 specify additional metadata in the future
222223
BeamFnApi.Elements.ElementMetadata additionalMetadata =
223-
BeamFnApi.Elements.ElementMetadata.newBuilder().build();
224+
BeamFnApi.Elements.ElementMetadata.newBuilder()
225+
.setValueKind(ValueKindUtil.toProto(data.getValueKind()))
226+
.build();
224227
ByteString metadata =
225228
encodeMetadata(
226229
stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.beam.sdk.values.Row;
5252
import org.apache.beam.sdk.values.TupleTag;
5353
import org.apache.beam.sdk.values.TypeDescriptor;
54+
import org.apache.beam.sdk.values.ValueKind;
5455
import org.apache.beam.sdk.values.WindowedValue;
5556
import org.apache.beam.sdk.values.WindowingStrategy;
5657
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -188,6 +189,25 @@ public abstract class WindowedContext {
188189
*/
189190
public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
190191

192+
/**
193+
* Adds the given element to the main output {@code PCollection}, with the given {@link
194+
* ValueKind}.
195+
*
196+
* <p>Once passed to {@code outputWithKind} the element should not be modified in any way.
197+
*
198+
* <p>If invoked from {@link ProcessElement}, the output element will have the same windowing
199+
* metadata as the input element.
200+
*
201+
* <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
202+
* {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
203+
* determine what windows the element should be in, throwing an exception if the {@code
204+
* WindowFn} attempts to access any information about the input element.
205+
*
206+
* <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
207+
* or {@link FinishBundle} methods.
208+
*/
209+
public abstract void outputWithKind(OutputT output, ValueKind kind);
210+
191211
/**
192212
* Adds the given element to the main output {@code PCollection}, with the given windowing
193213
* metadata.
@@ -289,6 +309,17 @@ public abstract <T> void outputWindowedValue(
289309
public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue);
290310

291311
public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue);
312+
313+
/**
314+
* Adds the given element to the main output {@code PCollection} with the given {@link
315+
* ValueKind}.
316+
*
317+
* <p>Once passed to {@code outputWithKind} the element should not be modified in any way.
318+
*
319+
* <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
320+
* or {@link FinishBundle} methods.
321+
*/
322+
public abstract <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind);
292323
}
293324

294325
/** Information accessible when running a {@link DoFn.ProcessElement} method. */
@@ -338,6 +369,9 @@ public abstract class ProcessContext extends WindowedContext {
338369

339370
@Pure
340371
public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain();
372+
373+
@Pure
374+
public abstract ValueKind valueKind();
341375
}
342376

343377
/** Information accessible when running a {@link DoFn.OnTimer} method. */
@@ -419,13 +453,31 @@ default void outputWithTimestamp(T value, Instant timestamp) {
419453
builder(value).setTimestamp(timestamp).output();
420454
}
421455

456+
default void outputWithKind(T value, ValueKind valueKind) {
457+
builder(value).setValueKind(valueKind).output();
458+
}
459+
422460
default void outputWindowedValue(
423461
T value,
424462
Instant timestamp,
425463
Collection<? extends BoundedWindow> windows,
426464
PaneInfo paneInfo) {
427465
builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();
428466
}
467+
468+
default void outputWindowedValue(
469+
T value,
470+
Instant timestamp,
471+
Collection<? extends BoundedWindow> windows,
472+
PaneInfo paneInfo,
473+
ValueKind valueKind) {
474+
builder(value)
475+
.setTimestamp(timestamp)
476+
.setWindows(windows)
477+
.setPaneInfo(paneInfo)
478+
.setValueKind(valueKind)
479+
.output();
480+
}
429481
}
430482

431483
/** Receives tagged output for a multi-output function. */

0 commit comments

Comments
 (0)