>> runner =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index c1568058435b..68af5ca63af9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -327,7 +327,7 @@ public void testCoderIsSerializableWithWellKnownCoderType() {
@Test
public void testDrainPropagated() throws Exception {
- WindowedValues.FullWindowedValueCoder.setMetadataSupported();
+ WindowedValues.FullWindowedValueCoder.setMetadataSupported(true);
Windmill.WorkItem.Builder workItem =
Windmill.WorkItem.newBuilder()
.setKey(SERIALIZED_KEY)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
index 4bc01bd205a9..5bb9ba7036d0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java
@@ -34,6 +34,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
@@ -121,6 +122,11 @@ public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
}
+ @Override
+ public ValueKind getValueKind() {
+ return ValueKind.INSERT;
+ }
+
@Override
public @Nullable Long getRecordOffset() {
return null;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0d892ab12d33..c01d29fbd051 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -51,6 +51,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -188,6 +189,25 @@ public abstract class WindowedContext {
*/
public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+ /**
+ * Adds the given element to the main output {@code PCollection}, with the given {@link
+ * ValueKind}.
+ *
+ * <p>Once passed to {@code outputWithKind} the element should not be modified in any way.
+ *
+ * <p>If invoked from {@link ProcessElement}, the output element will have the same windowing
+ * metadata as the input element.
+ *
+ * <p>If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to
+ * determine what windows the element should be in, throwing an exception if the {@code
+ * WindowFn} attempts to access any information about the input element.
+ *
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
+ * or {@link FinishBundle} methods.
+ */
+ public abstract void outputWithKind(OutputT output, ValueKind kind);
+
/**
* Adds the given element to the main output {@code PCollection}, with the given windowing
* metadata.
@@ -289,6 +309,17 @@ public abstract void outputWindowedValue(
public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue);
public abstract void outputWindowedValue(WindowedValue windowedValue);
+
+ /**
+ * Adds the given element to the output {@code PCollection} identified by {@code tag}, with the
+ * given {@link ValueKind}.
+ *
+ * <p>Once passed to {@code outputWithKind} the element should not be modified in any way.
+ *
+ * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from {@link StartBundle}
+ * or {@link FinishBundle} methods.
+ */
+ public abstract <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind);
}
/** Information accessible when running a {@link DoFn.ProcessElement} method. */
@@ -338,6 +369,9 @@ public abstract class ProcessContext extends WindowedContext {
@Pure
public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain();
+
+ @Pure
+ public abstract ValueKind valueKind();
}
/** Information accessible when running a {@link DoFn.OnTimer} method. */
@@ -419,6 +453,10 @@ default void outputWithTimestamp(T value, Instant timestamp) {
builder(value).setTimestamp(timestamp).output();
}
+ default void outputWithKind(T value, ValueKind valueKind) {
+ builder(value).setValueKind(valueKind).output();
+ }
+
default void outputWindowedValue(
T value,
Instant timestamp,
@@ -426,6 +464,20 @@ default void outputWindowedValue(
PaneInfo paneInfo) {
builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();
}
+
+ default void outputWindowedValue(
+ T value,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo,
+ ValueKind valueKind) {
+ builder(value)
+ .setTimestamp(timestamp)
+ .setWindows(windows)
+ .setPaneInfo(paneInfo)
+ .setValueKind(valueKind)
+ .output();
+ }
}
/** Receives tagged output for a multi-output function. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 4de2c3d2c9c0..4f4e08074daf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -56,6 +56,7 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
@@ -502,7 +503,8 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind
PaneInfo.NO_FIRING,
null,
null,
- CausedByDrain.NORMAL));
+ CausedByDrain.NORMAL,
+ ValueKind.INSERT));
}
};
}
@@ -606,6 +608,11 @@ public CausedByDrain causedByDrain() {
return element.getCausedByDrain();
}
+ @Override
+ public ValueKind valueKind() {
+ return element.getValueKind();
+ }
+
@Override
public PipelineOptions getPipelineOptions() {
return options;
@@ -621,6 +628,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}
+ @Override
+ public void outputWithKind(OutputT output, ValueKind kind) {
+ outputWithKind(mainOutputTag, output, kind);
+ }
+
@Override
public void outputWindowedValue(
OutputT output,
@@ -646,7 +658,23 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp
element.getPaneInfo(),
null,
null,
- CausedByDrain.NORMAL));
+ CausedByDrain.NORMAL,
+ ValueKind.INSERT));
+ }
+
+ @Override
+ public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
+ getMutableOutput(tag)
+ .add(
+ ValueInSingleWindow.of(
+ output,
+ element.getTimestamp(),
+ element.getWindow(),
+ element.getPaneInfo(),
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ kind));
}
@Override
@@ -666,7 +694,8 @@ public void outputWindowedValue(TupleTag tag, WindowedValue windowedVa
windowedValue.getPaneInfo(),
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
- windowedValue.causedByDrain()));
+ windowedValue.causedByDrain(),
+ windowedValue.getValueKind()));
}
}
@@ -681,7 +710,14 @@ public void outputWindowedValue(
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
- output, timestamp, w, paneInfo, null, null, CausedByDrain.NORMAL));
+ output,
+ timestamp,
+ w,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ ValueKind.INSERT));
}
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java
index 44f27824382d..5463365e4c6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java
@@ -188,6 +188,7 @@ public void processElement(
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
.setCausedByDrain(kv.getValue().getCausedByDrain())
+ .setValueKind(kv.getValue().getValueKind())
.output();
}
}));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
index 9974d29bfa0f..83fbb04ee0fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
@@ -30,6 +30,7 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.ValueKind;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -148,6 +149,7 @@ public void processElement(
BoundedWindow window,
PaneInfo paneInfo,
CausedByDrain causedByDrain,
+ ValueKind valueKind,
OutputReceiver>> r) {
r.output(
KV.of(
@@ -159,7 +161,8 @@ public void processElement(
paneInfo,
pc.currentRecordId(),
pc.currentRecordOffset(),
- causedByDrain)));
+ causedByDrain,
+ valueKind)));
}
}))
.setCoder(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 0a8d058107b8..594bdfc85039 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -190,6 +190,7 @@ public void processElement(
.setTimestamp(kv.getValue().getTimestamp())
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
+ .setValueKind(kv.getValue().getValueKind())
.output();
}
}));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 54d630d92fe4..8cc2d2e1b903 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -94,6 +94,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ValueKindParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
@@ -128,6 +129,7 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement";
public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain";
+ public static final String VALUE_KIND_PARAMETER_METHOD = "valueKind";
public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer";
public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver";
public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
@@ -1111,6 +1113,15 @@ public StackManipulation dispatch(CausedByDrainParameter p) {
CAUSED_BY_DRAIN_PARAMETER_METHOD, DoFn.class)));
}
+ @Override
+ public StackManipulation dispatch(ValueKindParameter p) {
+ return new StackManipulation.Compound(
+ pushDelegate,
+ MethodInvocation.invoke(
+ getExtraContextFactoryMethodDescription(
+ VALUE_KIND_PARAMETER_METHOD, DoFn.class)));
+ }
+
@Override
public StackManipulation dispatch(BundleFinalizerParameter p) {
return simpleExtraContextParameter(BUNDLE_FINALIZER_PARAMETER_METHOD);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index a615761292aa..940df5356d54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -43,6 +43,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -221,6 +222,9 @@ interface ArgumentProvider {
/** Provide a reference to the caused by drain. */
CausedByDrain causedByDrain(DoFn doFn);
+ /** Provide a reference to the {@link ValueKind}. */
+ ValueKind valueKind(DoFn doFn);
+
/** Provide a reference to the time domain for a timer firing. */
TimeDomain timeDomain(DoFn doFn);
@@ -335,6 +339,12 @@ public CausedByDrain causedByDrain(DoFn doFn) {
String.format("CausedByDrain unsupported in %s", getErrorContext()));
}
+ @Override
+ public ValueKind valueKind(DoFn doFn) {
+ throw new UnsupportedOperationException(
+ String.format("ValueKind unsupported in %s", getErrorContext()));
+ }
+
@Override
public String timerId(DoFn doFn) {
throw new UnsupportedOperationException(
@@ -529,6 +539,11 @@ public CausedByDrain causedByDrain(DoFn doFn) {
return delegate.causedByDrain(doFn);
}
+ @Override
+ public ValueKind valueKind(DoFn doFn) {
+ return delegate.valueKind(doFn);
+ }
+
@Override
public TimeDomain timeDomain(DoFn doFn) {
return delegate.timeDomain(doFn);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 83c2a67655bd..596179e4b623 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -345,6 +345,8 @@ public ResultT match(Cases cases) {
return cases.dispatch((BundleFinalizerParameter) this);
} else if (this instanceof CausedByDrainParameter) {
return cases.dispatch((CausedByDrainParameter) this);
+ } else if (this instanceof ValueKindParameter) {
+ return cases.dispatch((ValueKindParameter) this);
} else if (this instanceof KeyParameter) {
return cases.dispatch((KeyParameter) this);
} else {
@@ -405,6 +407,8 @@ public interface Cases {
ResultT dispatch(CausedByDrainParameter p);
+ ResultT dispatch(ValueKindParameter p);
+
ResultT dispatch(KeyParameter p);
/** A base class for a visitor with a default method for cases it is not interested in. */
@@ -507,6 +511,11 @@ public ResultT dispatch(CausedByDrainParameter p) {
return dispatchDefault(p);
}
+ @Override
+ public ResultT dispatch(ValueKindParameter p) {
+ return dispatchDefault(p);
+ }
+
@Override
public ResultT dispatch(StateParameter p) {
return dispatchDefault(p);
@@ -564,6 +573,8 @@ public ResultT dispatch(KeyParameter p) {
new AutoValue_DoFnSignature_Parameter_BundleFinalizerParameter();
private static final CausedByDrainParameter CAUSED_BY_DRAIN_PARAMETER =
new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter();
+ private static final ValueKindParameter VALUE_KIND_PARAMETER =
+ new AutoValue_DoFnSignature_Parameter_ValueKindParameter();
private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter();
@@ -592,6 +603,11 @@ public static CausedByDrainParameter causedByDrainParameter() {
return CAUSED_BY_DRAIN_PARAMETER;
}
+ /** Returns a {@link ValueKindParameter}. */
+ public static ValueKindParameter valueKindParameter() {
+ return VALUE_KIND_PARAMETER;
+ }
+
public static ElementParameter elementParameter(TypeDescriptor> elementT) {
return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
}
@@ -754,6 +770,16 @@ public abstract static class CausedByDrainParameter extends Parameter {
CausedByDrainParameter() {}
}
+ /**
+ * Descriptor for a {@link Parameter} of type {@link org.apache.beam.sdk.values.ValueKind}.
+ *
+ * <p>All such descriptors are equal.
+ */
+ @AutoValue
+ public abstract static class ValueKindParameter extends Parameter {
+ ValueKindParameter() {}
+ }
+
/**
* Descriptor for a {@link Parameter} of type {@link DoFn.Element}.
*
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 3dcf7ff1f9d0..b465d64e954a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -98,6 +98,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -141,6 +142,7 @@ private DoFnSignatures() {}
Parameter.SideInputParameter.class,
Parameter.TimerFamilyParameter.class,
Parameter.CausedByDrainParameter.class,
+ Parameter.ValueKindParameter.class,
Parameter.BundleFinalizerParameter.class);
private static final ImmutableList>
@@ -158,6 +160,7 @@ private DoFnSignatures() {}
Parameter.WatermarkEstimatorParameter.class,
Parameter.SideInputParameter.class,
Parameter.CausedByDrainParameter.class,
+ Parameter.ValueKindParameter.class,
Parameter.BundleFinalizerParameter.class);
private static final ImmutableList> ALLOWED_SETUP_PARAMETERS =
@@ -1367,6 +1370,11 @@ private static Parameter analyzeExtraParameter(
rawType.equals(CausedByDrain.class),
"CausedByDrain argument must have type org.apache.beam.sdk.values.CausedByDrain.");
return Parameter.causedByDrainParameter();
+ } else if (ValueKind.class.isAssignableFrom(rawType)) {
+ methodErrors.checkArgument(
+ rawType.equals(ValueKind.class),
+ "ValueKind argument must have type org.apache.beam.sdk.values.ValueKind.");
+ return Parameter.valueKindParameter();
} else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
String sideInputId = getSideInputId(param.getAnnotations());
paramErrors.checkArgument(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
index 5adf3d32a1a9..dbfd298d3447 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
@@ -56,6 +56,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
@@ -520,6 +521,11 @@ public CausedByDrain causedByDrain() {
return outerContext.causedByDrain();
}
+ @Override
+ public ValueKind valueKind() {
+ return outerContext.valueKind();
+ }
+
@Override
public Object sideInput(String tagId) {
PCollectionView> view = sideInputMapping.get(tagId);
@@ -549,6 +555,11 @@ public CausedByDrain causedByDrain(DoFn doFn) {
return outerContext.causedByDrain();
}
+ @Override
+ public ValueKind valueKind(DoFn doFn) {
+ return outerContext.valueKind();
+ }
+
@Override
public String timerId(DoFn doFn) {
throw new UnsupportedOperationException();
@@ -619,6 +630,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outerContext.outputWithTimestamp(output, timestamp);
}
+ @Override
+ public void outputWithKind(OutputT output, ValueKind kind) {
+ outerContext.outputWithKind(output, kind);
+ }
+
@Override
public void outputWindowedValue(
OutputT output,
@@ -638,6 +654,11 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp
outerContext.outputWithTimestamp(tag, output, timestamp);
}
+ @Override
+ public <T> void outputWithKind(TupleTag<T> tag, T output, ValueKind kind) {
+ outerContext.outputWithKind(tag, output, kind);
+ }
+
@Override
public void outputWindowedValue(
TupleTag tag,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
index 67473e567d63..27022461d361 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java
@@ -50,5 +50,7 @@ public interface OutputBuilder extends WindowedValue {
OutputBuilder setCausedByDrain(CausedByDrain causedByDrain);
+ OutputBuilder setValueKind(ValueKind valueKind);
+
void output();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 0d8b2f7515e8..8d9551374616 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -68,6 +68,8 @@ public T getValue() {
public abstract CausedByDrain getCausedByDrain();
+ public abstract ValueKind getValueKind();
+
// todo #33176 specify additional metadata in the future
public static ValueInSingleWindow of(
T value,
@@ -76,14 +78,23 @@ public static ValueInSingleWindow of(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
return new AutoValue_ValueInSingleWindow<>(
- value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
}
public static ValueInSingleWindow of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
- return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL);
+ return of(
+ value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
/** A coder for {@link ValueInSingleWindow}. */
@@ -127,6 +138,7 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream,
windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+ builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()));
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
@@ -146,6 +158,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
+ ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -153,12 +166,15 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
+ if (elementMetadata.hasValueKind()) {
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
+ }
}
T value = valueCoder.decode(inStream, context);
// todo #33176 specify additional metadata in the future
return new AutoValue_ValueInSingleWindow<>(
- value, timestamp, window, paneInfo, null, null, causedByDrain);
+ value, timestamp, window, paneInfo, null, null, causedByDrain, valueKind);
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
new file mode 100644
index 000000000000..7a190496ca2c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+/** The type of change operation represented by a Change Data Capture (CDC) record. */
+public enum ValueKind {
+ /** Indicates a new record was created in the source system. Assumed when no kind is set. */
+ INSERT,
+
+ /**
+ * Indicates the state of a record immediately before an update occurred. This is typically
+ * used to identify the previous values of modified columns or to locate the record via its
+ * primary key.
+ */
+ UPDATE_BEFORE,
+
+ /**
+ * Indicates the state of a record immediately after an update occurred. Represents the
+ * current, valid state of the record following the change.
+ */
+ UPDATE_AFTER,
+
+ /** Indicates that an existing record was removed from the source system. */
+ DELETE
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
new file mode 100644
index 000000000000..e3b65c65fc62
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
+
+/**
+ * Static helpers for converting between the SDK-level {@link ValueKind} and its Fn API proto
+ * counterpart {@link Elements.ValueKind.Enum}.
+ */
+public final class ValueKindUtil {
+  /** Not instantiable: this class only hosts static conversion helpers. */
+  private ValueKindUtil() {}
+
+  /**
+   * Converts an SDK {@link ValueKind} to the proto enum constant of the same name.
+   *
+   * @param valueKind the SDK value kind to convert
+   * @return the matching {@link Elements.ValueKind.Enum} constant
+   * @throws IllegalArgumentException if {@code valueKind} is not one of the constants mapped here
+   */
+  public static Elements.ValueKind.Enum toProto(ValueKind valueKind) {
+    switch (valueKind) {
+      case INSERT:
+        return Elements.ValueKind.Enum.INSERT;
+      case UPDATE_BEFORE:
+        return Elements.ValueKind.Enum.UPDATE_BEFORE;
+      case UPDATE_AFTER:
+        return Elements.ValueKind.Enum.UPDATE_AFTER;
+      case DELETE:
+        return Elements.ValueKind.Enum.DELETE;
+      default:
+        throw new IllegalArgumentException("Unknown ValueKind: " + valueKind);
+    }
+  }
+
+  /**
+   * Converts a proto value kind to the SDK {@link ValueKind} constant of the same name.
+   *
+   * @param proto the proto constant to convert
+   * @return the matching {@link ValueKind} constant
+   * @throws IllegalArgumentException if {@code proto} is not one of the four constants mapped here
+   */
+  public static ValueKind fromProto(Elements.ValueKind.Enum proto) {
+    switch (proto) {
+      case INSERT:
+        return ValueKind.INSERT;
+      case UPDATE_BEFORE:
+        return ValueKind.UPDATE_BEFORE;
+      case UPDATE_AFTER:
+        return ValueKind.UPDATE_AFTER;
+      case DELETE:
+        return ValueKind.DELETE;
+      default:
+        throw new IllegalArgumentException("Unknown ValueKind: " + proto);
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
index daebeb31a39c..d75926aa95c8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java
@@ -54,6 +54,10 @@ public interface WindowedValue {
CausedByDrain causedByDrain();
+ /** Returns the {@link ValueKind} associated with this WindowedValue. */
+ @Pure
+ ValueKind getValueKind();
+
/**
* A representation of each of the actual values represented by this compressed {@link
* WindowedValue}, one per window.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
index ba2720f5e39b..1524858f3b80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
@@ -81,7 +81,8 @@ public static Builder builder(WindowedValue template) {
.setPaneInfo(template.getPaneInfo())
.setRecordOffset(template.getRecordOffset())
.setRecordId(template.getRecordId())
- .setCausedByDrain(template.causedByDrain());
+ .setCausedByDrain(template.causedByDrain())
+ .setValueKind(template.getValueKind());
}
public static class Builder implements OutputBuilder {
@@ -104,6 +105,7 @@ public static class Builder implements OutputBuilder {
private @Nullable String recordId;
private @Nullable Long recordOffset;
private CausedByDrain causedByDrain = CausedByDrain.NORMAL;
+ private ValueKind valueKind = ValueKind.INSERT;
@Override
public Builder setValue(T value) {
@@ -154,6 +156,13 @@ public Builder setCausedByDrain(CausedByDrain causedByDrain) {
return this;
}
+ @Override
+ public Builder setValueKind(ValueKind valueKind) {
+ checkStateNotNull(valueKind, "ValueKind is null");
+ this.valueKind = valueKind;
+ return this;
+ }
+
public Builder setReceiver(WindowedValueReceiver receiver) {
this.receiver = receiver;
return this;
@@ -208,6 +217,12 @@ public CausedByDrain causedByDrain() {
return causedByDrain;
}
+ @Override
+ public ValueKind getValueKind() {
+ checkStateNotNull(valueKind, "ValueKind not set");
+ return valueKind;
+ }
+
@Override
public Collection> explodeWindows() {
throw new UnsupportedOperationException(
@@ -243,7 +258,8 @@ public WindowedValue build() {
getPaneInfo(),
getRecordId(),
getRecordOffset(),
- causedByDrain());
+ causedByDrain(),
+ getValueKind());
}
@Override
@@ -261,10 +277,10 @@ public String toString() {
public static WindowedValue of(
T value, Instant timestamp, Collection extends BoundedWindow> windows, PaneInfo paneInfo) {
- return of(value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL);
+ return of(
+ value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
- /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */
public static WindowedValue of(
T value,
Instant timestamp,
@@ -273,9 +289,31 @@ public static WindowedValue of(
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain) {
+ return of(
+ value,
+ timestamp,
+ windows,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ ValueKind.INSERT);
+ }
+
+ /** Returns a {@code WindowedValue} with the given value, timestamp, windows, and value kind. */
+ public static WindowedValue of(
+ T value,
+ Instant timestamp,
+ Collection extends BoundedWindow> windows,
+ PaneInfo paneInfo,
+ @Nullable String currentRecordId,
+ @Nullable Long currentRecordOffset,
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
+ checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it was null");
if (windows.size() == 1) {
return of(
value,
@@ -284,10 +322,18 @@ public static WindowedValue of(
paneInfo,
currentRecordId,
currentRecordOffset,
- causedByDrain);
+ causedByDrain,
+ valueKind);
} else {
return new TimestampedValueInMultipleWindows<>(
- value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ value,
+ timestamp,
+ windows,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
}
}
@@ -298,12 +344,21 @@ static WindowedValue createWithoutValidation(
Instant timestamp,
Collection extends BoundedWindow> windows,
PaneInfo paneInfo,
- CausedByDrain causedByDrain) {
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
if (windows.size() == 1) {
- return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain);
+ return of(
+ value,
+ timestamp,
+ windows.iterator().next(),
+ paneInfo,
+ null,
+ null,
+ causedByDrain,
+ valueKind);
} else {
return new TimestampedValueInMultipleWindows<>(
- value, timestamp, windows, paneInfo, null, null, causedByDrain);
+ value, timestamp, windows, paneInfo, null, null, causedByDrain, valueKind);
}
}
@@ -312,10 +367,10 @@ public static WindowedValue of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
- return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL);
+ return of(
+ value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
- /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */
public static WindowedValue of(
T value,
Instant timestamp,
@@ -324,19 +379,54 @@ public static WindowedValue of(
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain) {
+ return of(
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ ValueKind.INSERT);
+ }
+
+ /** Returns a {@code WindowedValue} with the given value, timestamp, window, and value kind. */
+ public static WindowedValue of(
+ T value,
+ Instant timestamp,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ @Nullable String currentRecordId,
+ @Nullable Long currentRecordOffset,
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null");
+ checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it was null");
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
return new ValueInGlobalWindow<>(
- value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind);
} else if (isGlobal) {
return new TimestampedValueInGlobalWindow<>(
- value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ value,
+ timestamp,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
} else {
return new TimestampedValueInSingleWindow<>(
- value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ value,
+ timestamp,
+ window,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
}
}
@@ -345,7 +435,8 @@ public static WindowedValue of(
* default timestamp and pane.
*/
public static WindowedValue valueInGlobalWindow(T value) {
- return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL);
+ return new ValueInGlobalWindow<>(
+ value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
/**
@@ -353,7 +444,8 @@ public static WindowedValue valueInGlobalWindow(T value) {
* default timestamp and the specified pane.
*/
public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) {
- return new ValueInGlobalWindow<>(value, paneInfo, null, null, CausedByDrain.NORMAL);
+ return new ValueInGlobalWindow<>(
+ value, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
/**
@@ -365,7 +457,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta
return valueInGlobalWindow(value);
} else {
return new TimestampedValueInGlobalWindow<>(
- value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL);
+ value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
}
@@ -379,7 +471,7 @@ public static WindowedValue timestampedValueInGlobalWindow(
return timestampedValueInGlobalWindow(value, timestamp);
} else {
return new TimestampedValueInGlobalWindow<>(
- value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL);
+ value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT);
}
}
@@ -396,7 +488,8 @@ public static WindowedValue withValue(
windowedValue.getPaneInfo(),
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
- windowedValue.causedByDrain());
+ windowedValue.causedByDrain(),
+ windowedValue.getValueKind());
}
public static boolean equals(
@@ -448,6 +541,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue
private final @Nullable String currentRecordId;
private final @Nullable Long currentRecordOffset;
private final CausedByDrain causedByDrain;
+ private final ValueKind valueKind;
@Override
public @Nullable String getRecordId() {
@@ -464,17 +558,24 @@ public CausedByDrain causedByDrain() {
return causedByDrain;
}
+ @Override
+ public ValueKind getValueKind() {
+ return valueKind;
+ }
+
protected SimpleWindowedValue(
T value,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
this.value = value;
this.paneInfo = checkNotNull(paneInfo);
this.currentRecordId = currentRecordId;
this.currentRecordOffset = currentRecordOffset;
this.causedByDrain = causedByDrain;
+ this.valueKind = checkNotNull(valueKind, "ValueKind is null");
}
@Override
@@ -523,8 +624,9 @@ public MinTimestampWindowedValue(
PaneInfo pane,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, pane, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, valueKind);
}
@Override
@@ -542,8 +644,9 @@ public ValueInGlobalWindow(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind);
}
@Override
@@ -559,7 +662,12 @@ public BoundedWindow getWindow() {
@Override
public WindowedValue withValue(NewT newValue) {
return new ValueInGlobalWindow<>(
- newValue, getPaneInfo(), getRecordId(), getRecordOffset(), causedByDrain());
+ newValue,
+ getPaneInfo(),
+ getRecordId(),
+ getRecordOffset(),
+ causedByDrain(),
+ getValueKind());
}
@Override
@@ -598,8 +706,9 @@ public TimestampedWindowedValue(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind);
this.timestamp = checkNotNull(timestamp);
}
@@ -622,8 +731,16 @@ public TimestampedValueInGlobalWindow(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(
+ value,
+ timestamp,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
}
@Override
@@ -644,7 +761,8 @@ public WindowedValue withValue(NewT newValue) {
getPaneInfo(),
getRecordId(),
getRecordOffset(),
- causedByDrain());
+ causedByDrain(),
+ getValueKind());
}
@Override
@@ -695,8 +813,16 @@ public TimestampedValueInSingleWindow(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(
+ value,
+ timestamp,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
this.window = checkNotNull(window);
}
@@ -709,7 +835,8 @@ public WindowedValue withValue(NewT newValue) {
getPaneInfo(),
getRecordId(),
getRecordOffset(),
- causedByDrain());
+ causedByDrain(),
+ getValueKind());
}
@Override
@@ -767,8 +894,16 @@ public TimestampedValueInMultipleWindows(
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
- CausedByDrain causedByDrain) {
- super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
+ CausedByDrain causedByDrain,
+ ValueKind valueKind) {
+ super(
+ value,
+ timestamp,
+ paneInfo,
+ currentRecordId,
+ currentRecordOffset,
+ causedByDrain,
+ valueKind);
this.windows = checkNotNull(windows);
}
@@ -786,7 +921,8 @@ public WindowedValue withValue(NewT newValue) {
getPaneInfo(),
getRecordId(),
getRecordOffset(),
- causedByDrain());
+ causedByDrain(),
+ getValueKind());
}
@Override
@@ -860,10 +996,10 @@ public static ParamWindowedValueCoder getParamWindowedValueCoder(Coder
/** Abstract class for {@code WindowedValue} coder. */
public abstract static class WindowedValueCoder extends StructuredCoder> {
final Coder valueCoder;
- private static boolean metadataSupported = false;
+ private static volatile boolean metadataSupported = false;
- public static void setMetadataSupported() {
- metadataSupported = true;
+ public static void setMetadataSupported(boolean isSupported) {
+ metadataSupported = isSupported;
}
public static boolean isMetadataSupported() {
@@ -941,13 +1077,12 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex
if (metadataSupported) {
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
- BeamFnApi.Elements.ElementMetadata em =
- builder
- .setDrain(
- windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
- ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
- : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
- .build();
+ builder.setDrain(
+ windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+ ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+ builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()));
+ BeamFnApi.Elements.ElementMetadata em = builder.build();
ByteArrayCoder.of().encode(em.toByteArray(), outStream);
}
@@ -966,6 +1101,7 @@ public WindowedValue decode(InputStream inStream, Context context)
Collection extends BoundedWindow> windows = windowsCoder.decode(inStream);
PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
+ ValueKind valueKind = ValueKind.INSERT;
if (isMetadataSupported() && paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
@@ -973,13 +1109,16 @@ public WindowedValue decode(InputStream inStream, Context context)
elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING)
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
+ if (elementMetadata.hasValueKind()) {
+ valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
+ }
}
T value = valueCoder.decode(inStream, context);
// Because there are some remaining (incorrect) uses of WindowedValue with no windows,
// we call this deprecated no-validation path when decoding
return WindowedValues.createWithoutValidation(
- value, timestamp, windows, paneInfo, causedByDrain);
+ value, timestamp, windows, paneInfo, causedByDrain, valueKind);
}
@Override
@@ -1109,7 +1248,18 @@ public static ParamWindowedValueCoder of(
Instant timestamp,
Collection extends BoundedWindow> windows,
PaneInfo paneInfo) {
- return new ParamWindowedValueCoder<>(valueCoder, windowCoder, timestamp, windows, paneInfo);
+ return of(valueCoder, windowCoder, timestamp, windows, paneInfo, ValueKind.INSERT);
+ }
+
+ public static ParamWindowedValueCoder of(
+ Coder valueCoder,
+ Coder extends BoundedWindow> windowCoder,
+ Instant timestamp,
+ Collection extends BoundedWindow> windows,
+ PaneInfo paneInfo,
+ ValueKind valueKind) {
+ return new ParamWindowedValueCoder<>(
+ valueCoder, windowCoder, timestamp, windows, paneInfo, valueKind);
}
/**
@@ -1124,7 +1274,8 @@ public static ParamWindowedValueCoder of(
windowCoder,
BoundedWindow.TIMESTAMP_MIN_VALUE,
GLOBAL_WINDOWS,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ ValueKind.INSERT);
}
/**
@@ -1142,15 +1293,30 @@ public static ParamWindowedValueCoder of(Coder valueCoder) {
Coder extends BoundedWindow> windowCoder,
Instant timestamp,
Collection extends BoundedWindow> windows,
- PaneInfo paneInfo) {
+ PaneInfo paneInfo,
+ ValueKind valueKind) {
super(valueCoder, windowCoder);
- this.windowedValuePrototype = WindowedValues.of(EMPTY_BYTES, timestamp, windows, paneInfo);
+ this.windowedValuePrototype =
+ WindowedValues.of(
+ EMPTY_BYTES,
+ timestamp,
+ windows,
+ paneInfo,
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ valueKind);
}
@Override
public WindowedValueCoder withValueCoder(Coder valueCoder) {
return new ParamWindowedValueCoder<>(
- valueCoder, getWindowCoder(), getTimestamp(), getWindows(), getPaneInfo());
+ valueCoder,
+ getWindowCoder(),
+ getTimestamp(),
+ getWindows(),
+ getPaneInfo(),
+ getValueKind());
}
@Override
@@ -1188,6 +1354,10 @@ public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObse
valueCoder.registerByteSizeObserver(value.getValue(), observer);
}
+ public ValueKind getValueKind() {
+ return windowedValuePrototype.getValueKind();
+ }
+
public Instant getTimestamp() {
return windowedValuePrototype.getTimestamp();
}
@@ -1207,7 +1377,14 @@ public static byte[] getPayload(ParamWindowedValueCoder> from) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
WindowedValue windowedValue =
WindowedValues.of(
- EMPTY_BYTES, from.getTimestamp(), from.getWindows(), from.getPaneInfo());
+ EMPTY_BYTES,
+ from.getTimestamp(),
+ from.getWindows(),
+ from.getPaneInfo(),
+ null,
+ null,
+ CausedByDrain.NORMAL,
+ from.getValueKind());
WindowedValues.FullWindowedValueCoder windowedValueCoder =
WindowedValues.FullWindowedValueCoder.of(ByteArrayCoder.of(), from.getWindowCoder());
try {
@@ -1235,7 +1412,8 @@ public static WindowedValues.ParamWindowedValueCoder> fromComponents(
windowCoder,
windowedValue.getTimestamp(),
windowedValue.getWindows(),
- windowedValue.getPaneInfo());
+ windowedValue.getPaneInfo(),
+ windowedValue.getValueKind());
} catch (IOException e) {
throw new RuntimeException(
"Unable to decode constant members from payload for ParamWindowedValueCoder: ", e);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java
index a2ff99905f6c..2c53fd990e1e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java
@@ -54,7 +54,7 @@ public void process(ProcessContext pc, OutputReceiver r) {
@Category(NeedsRunner.class)
@Ignore
public void testMetadataPropagationAcrossShuffleParameter() {
- WindowedValues.WindowedValueCoder.setMetadataSupported();
+ WindowedValues.WindowedValueCoder.setMetadataSupported(true);
PCollection results =
pipeline
.apply(Create.of(1))
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java
new file mode 100644
index 000000000000..62d8f6626a64
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.DoFn.TimerId;
+import org.apache.beam.sdk.transforms.DoFn.Timestamp;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.ValueKind;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueKind} support in {@link DoFn}. */
+@RunWith(JUnit4.class)
+public class ValueKindTest implements Serializable {
+
+ static {
+ System.setProperty(
+ "beamTestPipelineOptions", "[\"--runner=org.apache.beam.runners.direct.DirectRunner\"]");
+ }
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindParameterInDoFn() {
+ PCollection input = pipeline.apply(Create.of("a", "b"));
+
+ PCollection output =
+ input.apply(
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:INSERT", "b:INSERT");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testOutputWithKind() {
+ PCollection input = pipeline.apply(Create.of("a", "b"));
+
+ PCollection output =
+ input
+ .apply(
+ "SetKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(@Element String element, ProcessContext c) {
+ if ("a".equals(element)) {
+ c.outputWithKind(element, ValueKind.UPDATE_BEFORE);
+ } else {
+ c.outputWithKind(element, ValueKind.UPDATE_AFTER);
+ }
+ }
+ }))
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE", "b:UPDATE_AFTER");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDefaultValueKind() {
+ PCollection input = pipeline.apply(Create.of("a"));
+
+ PCollection output =
+ input
+ .apply(
+ "SetKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(@Element String element, ProcessContext c) {
+ c.outputWithKind(element, ValueKind.UPDATE_BEFORE);
+ }
+ }))
+ .apply(
+ "StandardOutput",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(@Element String element, ProcessContext c) {
+ c.output(element); // Should preserve input kind!
+ }
+ }))
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindPreservedInOutputWindowedValue_MainOutput() {
+ PCollection input = pipeline.apply(Create.of("a"));
+
+ PCollection output =
+ input
+ .apply(
+ "SetKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(@Element String element, ProcessContext c) {
+ c.outputWithKind(element, ValueKind.UPDATE_BEFORE);
+ }
+ }))
+ .apply(
+ "OutputWindowedValue",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, BoundedWindow window) {
+ // Should preserve input kind!
+ c.outputWindowedValue(
+ element, c.timestamp(), Collections.singleton(window), c.pane());
+ }
+ }))
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindPreservedInOutputWindowedValue_TaggedOutput() {
+ PCollection input = pipeline.apply(Create.of("a"));
+ TupleTag mainTag = new TupleTag() {};
+ TupleTag sideTag = new TupleTag() {};
+
+ PCollectionTuple outputTuple =
+ input
+ .apply(
+ "SetKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(@Element String element, ProcessContext c) {
+ c.outputWithKind(element, ValueKind.UPDATE_BEFORE);
+ }
+ }))
+ .apply(
+ "OutputWindowedValueTagged",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, BoundedWindow window) {
+ c.outputWindowedValue(
+ sideTag,
+ element,
+ c.timestamp(),
+ Collections.singleton(window),
+ c.pane());
+ }
+ })
+ .withOutputTags(mainTag, TupleTagList.of(sideTag)));
+
+ PCollection output =
+ outputTuple
+ .get(sideTag)
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindPreservedInOutputWindowedValue_Object() {
+ PCollection input = pipeline.apply(Create.of("a"));
+
+ PCollection output =
+ input
+ .apply(
+ "OutputWindowedValueObject",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, BoundedWindow window) {
+ c.outputWindowedValue(
+ WindowedValues.of(
+ element,
+ c.timestamp(),
+ Collections.singleton(window),
+ c.pane(),
+ null,
+ null,
+ org.apache.beam.sdk.values.CausedByDrain.NORMAL,
+ ValueKind.UPDATE_BEFORE));
+ }
+ }))
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindPreservedInOutputWindowedValue_TaggedObject() {
+ PCollection input = pipeline.apply(Create.of("a"));
+ TupleTag mainTag = new TupleTag() {};
+ TupleTag sideTag = new TupleTag() {};
+
+ PCollectionTuple outputTuple =
+ input.apply(
+ "OutputWindowedValueTaggedObject",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, BoundedWindow window) {
+ c.outputWindowedValue(
+ sideTag,
+ WindowedValues.of(
+ element,
+ c.timestamp(),
+ Collections.singleton(window),
+ c.pane(),
+ null,
+ null,
+ org.apache.beam.sdk.values.CausedByDrain.NORMAL,
+ ValueKind.UPDATE_BEFORE));
+ }
+ })
+ .withOutputTags(mainTag, TupleTagList.of(sideTag)));
+
+ PCollection output =
+ outputTuple
+ .get(sideTag)
+ .apply(
+ "ReadKind",
+ ParDo.of(
+ new DoFn() {
+ @ProcessElement
+ public void processElement(
+ @Element String element, ProcessContext c, ValueKind kind) {
+ c.output(element + ":" + kind);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE");
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testValueKindPreservedAcrossTags() {
+ PCollection input = pipeline.apply(Create.of("a"));
+ TupleTag mainTag = new TupleTag() {};
+ TupleTag sideTag = new TupleTag