diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto index 22b19ef03289..9ff30966a3e9 100644 --- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto @@ -757,6 +757,26 @@ message Elements { } } + // The type of change operation represented by a Change Data Capture (CDC) record + message ValueKind { + enum Enum { + // Indicates a new record was created in the source system. + INSERT = 0; + + // Indicates the state of a record immediately before an update occurred. + // This is typically used to identify the previous values of modified columns + // or to locate the record via its primary key. + UPDATE_BEFORE = 1; + + // Indicates the state of a record immediately after an update occurred. + // Represents the current, valid state of the record following the change. + UPDATE_AFTER = 2; + + // Indicates that an existing record was removed from the source system. + DELETE = 3; + } + } + // Element metadata passed as part of WindowedValue to make WindowedValue // extensible and backward compatible message ElementMetadata { @@ -770,6 +790,9 @@ message Elements { // across IOs - Kafka, PubSub, http. // Example value: congo=t61rcWkgMzE optional string tracestate = 3; + // (Optional) The kind of value for CDC metadata. + // If missing or unspecified, implies INSERT for backwards compatibility. + optional ValueKind.Enum value_kind = 4; } // Represent the encoded user timer for a given instruction, transform and diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 07e4756885dd..aad35e835f80 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -202,6 +203,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return processContext.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return processContext.valueKind(); + } + @Override public RestrictionTracker restrictionTracker() { return processContext.tracker; @@ -407,6 +413,11 @@ public CausedByDrain causedByDrain() { return element.causedByDrain(); } + @Override + public ValueKind valueKind() { + return element.getValueKind(); + } + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; @@ -442,6 +453,7 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + noteOutput(); outputReceiver.output( tag, WindowedValues.of( @@ -454,6 +466,31 @@ public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) element.causedByDrain())); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + + @Override + public void outputWithKind(TupleTag tag, T value, ValueKind kind) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator) + .observeTimestamp(element.getTimestamp()); + } + outputReceiver.output( + tag, + WindowedValues.of( + value, + element.getTimestamp(), + element.getWindows(), + element.getPaneInfo(), + element.getRecordId(), + element.getRecordOffset(), + element.causedByDrain(), + kind)); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d5f09522a3b0..d3961f0defb6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -438,6 +439,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -471,6 +477,22 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo()); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + builderSupplier + .builder(output) + .setTimestamp(elem.getTimestamp()) + .setWindows(elem.getWindows()) + .setPaneInfo(elem.getPaneInfo()) + .setValueKind(kind) + .setReceiver( + wv -> { + checkTimestamp(elem.getTimestamp(), wv.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, wv); + }) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -506,6 +528,11 @@ public Instant timestamp() { return elem.getRecordOffset(); } + @Override + public ValueKind valueKind() { + return elem.getValueKind(); + } + public Collection windows() { return elem.getWindows(); } @@ -581,6 +608,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return elem.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return elem.getValueKind(); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException( @@ -862,6 +894,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return causedByDrain; } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException("ValueKind parameters are not supported."); + } + @Override public String timerId(DoFn doFn) { return timerId; @@ -1008,6 +1045,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -1030,6 +1072,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimestamp(timestamp(), timestamp); + builderSupplier + .builder(output) + .setTimestamp(timestamp) + .setWindows(Collections.singleton(window())) + .setPaneInfo(PaneInfo.NO_FIRING) + .setValueKind(kind) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -1177,6 +1232,11 @@ public TimeDomain timeDomain(DoFn doFn) { "Cannot access time domain outside of @ProcessTimer method."); } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException("ValueKind parameters are not supported."); + } + @Override public KeyT key() { return key; @@ -1279,6 +1339,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -1301,6 +1366,18 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimestamp(this.timestamp, timestamp); + builderSupplier + .builder(output) + .setTimestamp(timestamp) + .setWindows(Collections.singleton(window())) + .setPaneInfo(PaneInfo.NO_FIRING) + .setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv)) + .output(); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 6997b590e017..c65cc492f8c0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -176,6 +177,7 @@ public DirectPipelineResult run(Pipeline pipeline) { performRewrites(pipeline); MetricsEnvironment.setMetricsSupported(true); + WindowedValues.WindowedValueCoder.setMetadataSupported(true); try { DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); pipeline.traverseTopologically(graphVisitor); @@ -233,6 +235,7 @@ public DirectPipelineResult run(Pipeline pipeline) { return result; } finally { MetricsEnvironment.setMetricsSupported(false); + WindowedValues.WindowedValueCoder.setMetadataSupported(false); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index d4cd1af386e3..b1ed98f2d771 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -73,6 +73,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; @@ -1379,6 +1380,11 @@ public T getValue() { return value; } + @Override + public ValueKind getValueKind() { + return ValueKind.INSERT; + } + @Override public CausedByDrain causedByDrain() { return CausedByDrain.NORMAL; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java index 47ff5b764910..15a26530f3de 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java @@ -141,6 +141,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 8c33123be6d5..f4f177a0b26a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -154,6 +154,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; @@ -171,11 +172,17 @@ import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValues; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -191,6 +198,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -2761,4 +2769,132 @@ public void process() {} })); p.run(); } + + @Test + @Category({ValidatesRunner.class}) + public void testValueKindParameterAndOutputWithKind() { + boolean isRunnerV2 = false; + @Nullable + List experiments = + pipeline.getOptions().as(DataflowPipelineOptions.class).getExperiments(); + if (experiments != null + && (experiments.contains("use_unified_worker") || experiments.contains("use_runner_v2"))) { + isRunnerV2 = true; + } + // Skipp runner v2 because its Create uses a splittable DoFn, which contains a shuffle. + // ValueKind is not supported in Dataflow shuffle yet + assumeFalse(isRunnerV2); + + PCollection input = pipeline.apply(Create.of("a", "b", "c", "d", "e")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple tuple = + input.apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, + @Timestamp org.joda.time.Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + ProcessContext c, + MultiOutputReceiver outputReceiver) { + switch (element) { + case "a": + c.output(element); // default: INSERT + return; + case "b": + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + return; + case "c": + c.outputWindowedValue( + WindowedValues.of( + element, + timestamp, + Collections.singleton(window), + paneInfo, + null, + null, + CausedByDrain.NORMAL, + ValueKind.UPDATE_AFTER)); + return; + case "d": + c.outputWithKind(sideTag, element, ValueKind.UPDATE_AFTER); + return; + case "e": + outputReceiver.get(sideTag).outputWithKind(element, ValueKind.DELETE); + } + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection main = + tuple + .get(mainTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PCollection side = + tuple + .get(sideTag) + .apply( + "ReadKind-SideTag", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(main).containsInAnyOrder("a:INSERT", "b:UPDATE_BEFORE", "c:UPDATE_AFTER"); + PAssert.that(side).containsInAnyOrder("d:UPDATE_AFTER", "e:DELETE"); + pipeline.run(); + } + + @Test + @Ignore("enable once when element metadata is supported in shuffle") + @Category({ValidatesRunner.class}) + public void testValueKindPreservedAcrossShuffle() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reshuffle.of()) + .apply( + "ReadKind", + ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c, ValueKind kind) { + c.output(element.getValue() + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java index 0afae3ef2da7..9c5b3186749d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.construction.CoderTranslation; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.slf4j.Logger; @@ -72,6 +73,7 @@ public static void main(String[] args) throws Exception { DataflowBatchWorkerHarness.class.getSimpleName()); CoderTranslation.verifyModelCodersRegistered(); + WindowedValues.FullWindowedValueCoder.setMetadataSupported(true); JvmInitializers.runBeforeProcessing(pipelineOptions); batchHarness.run(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f5e5adab1556..78afce19f06c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1024,7 +1024,7 @@ public static void main(String[] args) throws Exception { CoderTranslation.verifyModelCodersRegistered(); if (DataflowRunner.hasExperiment(options, ELEMENT_METADATA_SUPPORTED_EXPERIMENT)) { - WindowedValues.FullWindowedValueCoder.setMetadataSupported(); + WindowedValues.FullWindowedValueCoder.setMetadataSupported(true); } LOG.debug("Creating StreamingDataflowWorker from options: {}", options); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index 2347529cf4a5..7d090addb135 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -38,6 +38,8 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.ValueKindUtil; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; @@ -139,6 +141,7 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce * drain happened upstream */ CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; + ValueKind valueKind = ValueKind.INSERT; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); @@ -146,6 +149,9 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL; + if (elementMetadata.hasValueKind()) { + valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind()); + } } if (valueCoder instanceof KvCoder) { KvCoder kvCoder = (KvCoder) valueCoder; @@ -156,7 +162,14 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce T result = (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); return WindowedValues.of( - result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream); + result, + timestampMillis, + windows, + paneInfo, + null, + null, + drainingValueFromUpstream, + valueKind); } else { notifyElementRead(data.available() + metadata.available()); return WindowedValues.of( @@ -166,7 +179,8 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce paneInfo, null, null, - drainingValueFromUpstream); + drainingValueFromUpstream, + valueKind); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index c328719bfb50..6ea4ac13274d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -42,6 +42,8 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.ValueKindUtil; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate; @@ -148,6 +150,7 @@ public Iterable timersIterable() { * drain happened upstream */ CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; + ValueKind valueKind = ValueKind.INSERT; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); @@ -155,11 +158,12 @@ public Iterable timersIterable() { elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL; + valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind()); } InputStream inputStream = message.getData().newInput(); ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); return WindowedValues.of( - value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream); + value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream, valueKind); } catch (RuntimeException | IOException e) { if (!skipUndecodableElements) { throw new RuntimeException(e); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index b07dc670e326..41d9052ce39f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ValueKindUtil; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder; import org.apache.beam.sdk.values.WindowedValue; @@ -220,7 +221,9 @@ public long add(WindowedValue data) throws IOException { ByteString id = ByteString.EMPTY; // todo #33176 specify additional metadata in the future BeamFnApi.Elements.ElementMetadata additionalMetadata = - BeamFnApi.Elements.ElementMetadata.newBuilder().build(); + BeamFnApi.Elements.ElementMetadata.newBuilder() + .setValueKind(ValueKindUtil.toProto(data.getValueKind())) + .build(); ByteString metadata = encodeMetadata( stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 2f7a5ce54fbf..f0e3c0e15e9b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -65,6 +66,11 @@ public CausedByDrain causedByDrain() { return CausedByDrain.NORMAL; } + @Override + public ValueKind getValueKind() { + return ValueKind.INSERT; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 7136538753db..dac32937c0f3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -281,7 +281,7 @@ public void testFixedWindows() throws Exception { @Test public void testFixedWindowsWithDraining() throws Exception { - WindowedValues.WindowedValueCoder.setMetadataSupported(); + WindowedValues.WindowedValueCoder.setMetadataSupported(true); TupleTag>> outputTag = new TupleTag<>(); ListOutputManager outputManager = new ListOutputManager(); DoFnRunner, KV>> runner = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index c1568058435b..68af5ca63af9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -327,7 +327,7 @@ public void testCoderIsSerializableWithWellKnownCoderType() { @Test public void testDrainPropagated() throws Exception { - WindowedValues.FullWindowedValueCoder.setMetadataSupported(); + WindowedValues.FullWindowedValueCoder.setMetadataSupported(true); Windmill.WorkItem.Builder workItem = Windmill.WorkItem.newBuilder() .setKey(SERIALIZED_KEY) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 4bc01bd205a9..5bb9ba7036d0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; @@ -121,6 +122,11 @@ public CausedByDrain causedByDrain() { return CausedByDrain.NORMAL; } + @Override + public ValueKind getValueKind() { + return ValueKind.INSERT; + } + @Override public @Nullable Long getRecordOffset() { return null; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 0d892ab12d33..c01d29fbd051 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; @@ -188,6 +189,25 @@ public abstract class WindowedContext { */ public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + /** + * Adds the given element to the main output {@code PCollection}, with the given {@link + * ValueKind}. + * + *

Once passed to {@code outputWithKind} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}, the output element will have the same windowing + * metadata as the input element. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWithKind(OutputT output, ValueKind kind); + /** * Adds the given element to the main output {@code PCollection}, with the given windowing * metadata. @@ -289,6 +309,17 @@ public abstract void outputWindowedValue( public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue); public abstract void outputWindowedValue(WindowedValue windowedValue); + + /** + * Adds the given element to the main output {@code PCollection} with the given {@link + * ValueKind}. + * + *

Once passed to {@code outputWithKind} the element should not be modified in any way. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWithKind(TupleTag tag, T output, ValueKind kind); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -338,6 +369,9 @@ public abstract class ProcessContext extends WindowedContext { @Pure public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain(); + + @Pure + public abstract ValueKind valueKind(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ @@ -419,6 +453,10 @@ default void outputWithTimestamp(T value, Instant timestamp) { builder(value).setTimestamp(timestamp).output(); } + default void outputWithKind(T value, ValueKind valueKind) { + builder(value).setValueKind(valueKind).output(); + } + default void outputWindowedValue( T value, Instant timestamp, @@ -426,6 +464,20 @@ default void outputWindowedValue( PaneInfo paneInfo) { builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output(); } + + default void outputWindowedValue( + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + builder(value) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setValueKind(valueKind) + .output(); + } } /** Receives tagged output for a multi-output function. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 4de2c3d2c9c0..4f4e08074daf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; @@ -502,7 +503,8 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind PaneInfo.NO_FIRING, null, null, - CausedByDrain.NORMAL)); + CausedByDrain.NORMAL, + ValueKind.INSERT)); } }; } @@ -606,6 +608,11 @@ public CausedByDrain causedByDrain() { return element.getCausedByDrain(); } + @Override + public ValueKind valueKind() { + return element.getValueKind(); + } + @Override public PipelineOptions getPipelineOptions() { return options; @@ -621,6 +628,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outputWithKind(mainOutputTag, output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -646,7 +658,23 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp element.getPaneInfo(), null, null, - CausedByDrain.NORMAL)); + CausedByDrain.NORMAL, + ValueKind.INSERT)); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + output, + element.getTimestamp(), + element.getWindow(), + element.getPaneInfo(), + null, + null, + CausedByDrain.NORMAL, + kind)); } @Override @@ -666,7 +694,8 @@ public void outputWindowedValue(TupleTag tag, WindowedValue windowedVa windowedValue.getPaneInfo(), windowedValue.getRecordId(), windowedValue.getRecordOffset(), - windowedValue.causedByDrain())); + windowedValue.causedByDrain(), + windowedValue.getValueKind())); } } @@ -681,7 +710,14 @@ public void outputWindowedValue( getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, w, paneInfo, null, null, CausedByDrain.NORMAL)); + output, + timestamp, + w, + paneInfo, + null, + null, + CausedByDrain.NORMAL, + ValueKind.INSERT)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index 44f27824382d..5463365e4c6e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -188,6 +188,7 @@ public void processElement( .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) .setCausedByDrain(kv.getValue().getCausedByDrain()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index 9974d29bfa0f..83fbb04ee0fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; import org.joda.time.Duration; import org.joda.time.Instant; @@ -148,6 +149,7 @@ public void processElement( BoundedWindow window, PaneInfo paneInfo, CausedByDrain causedByDrain, + ValueKind valueKind, OutputReceiver>> r) { r.output( KV.of( @@ -159,7 +161,8 @@ public void processElement( paneInfo, pc.currentRecordId(), pc.currentRecordOffset(), - causedByDrain))); + causedByDrain, + valueKind))); } })) .setCoder( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 0a8d058107b8..594bdfc85039 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -190,6 +190,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setValueKind(kv.getValue().getValueKind()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 54d630d92fe4..8cc2d2e1b903 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -94,6 +94,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ValueKindParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; @@ -128,6 +129,7 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory { public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement"; public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp"; public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain"; + public static final String VALUE_KIND_PARAMETER_METHOD = "valueKind"; public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer"; public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver"; public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain"; @@ -1111,6 +1113,15 @@ public StackManipulation dispatch(CausedByDrainParameter p) { CAUSED_BY_DRAIN_PARAMETER_METHOD, DoFn.class))); } + @Override + public StackManipulation dispatch(ValueKindParameter p) { + return new StackManipulation.Compound( + pushDelegate, + MethodInvocation.invoke( + getExtraContextFactoryMethodDescription( + VALUE_KIND_PARAMETER_METHOD, DoFn.class))); + } + @Override public StackManipulation dispatch(BundleFinalizerParameter p) { return simpleExtraContextParameter(BUNDLE_FINALIZER_PARAMETER_METHOD); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index a615761292aa..940df5356d54 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -221,6 +222,9 @@ interface ArgumentProvider { /** Provide a reference to the caused by drain. */ CausedByDrain causedByDrain(DoFn doFn); + /** Provide a reference to the {@link ValueKind}. */ + ValueKind valueKind(DoFn doFn); + /** Provide a reference to the time domain for a timer firing. */ TimeDomain timeDomain(DoFn doFn); @@ -335,6 +339,12 @@ public CausedByDrain causedByDrain(DoFn doFn) { String.format("CausedByDrain unsupported in %s", getErrorContext())); } + @Override + public ValueKind valueKind(DoFn doFn) { + throw new UnsupportedOperationException( + String.format("ValueKind unsupported in %s", getErrorContext())); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException( @@ -529,6 +539,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return delegate.causedByDrain(doFn); } + @Override + public ValueKind valueKind(DoFn doFn) { + return delegate.valueKind(doFn); + } + @Override public TimeDomain timeDomain(DoFn doFn) { return delegate.timeDomain(doFn); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 83c2a67655bd..596179e4b623 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -345,6 +345,8 @@ public ResultT match(Cases cases) { return cases.dispatch((BundleFinalizerParameter) this); } else if (this instanceof CausedByDrainParameter) { return cases.dispatch((CausedByDrainParameter) this); + } else if (this instanceof ValueKindParameter) { + return cases.dispatch((ValueKindParameter) this); } else if (this instanceof KeyParameter) { return cases.dispatch((KeyParameter) this); } else { @@ -405,6 +407,8 @@ public interface Cases { ResultT dispatch(CausedByDrainParameter p); + ResultT dispatch(ValueKindParameter p); + ResultT dispatch(KeyParameter p); /** A base class for a visitor with a default method for cases it is not interested in. */ @@ -507,6 +511,11 @@ public ResultT dispatch(CausedByDrainParameter p) { return dispatchDefault(p); } + @Override + public ResultT dispatch(ValueKindParameter p) { + return dispatchDefault(p); + } + @Override public ResultT dispatch(StateParameter p) { return dispatchDefault(p); @@ -564,6 +573,8 @@ public ResultT dispatch(KeyParameter p) { new AutoValue_DoFnSignature_Parameter_BundleFinalizerParameter(); private static final CausedByDrainParameter CAUSED_BY_DRAIN_PARAMETER = new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter(); + private static final ValueKindParameter VALUE_KIND_PARAMETER = + new AutoValue_DoFnSignature_Parameter_ValueKindParameter(); private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER = new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter(); @@ -592,6 +603,11 @@ public static CausedByDrainParameter causedByDrainParameter() { return CAUSED_BY_DRAIN_PARAMETER; } + /** Returns a {@link ValueKindParameter}. */ + public static ValueKindParameter valueKindParameter() { + return VALUE_KIND_PARAMETER; + } + public static ElementParameter elementParameter(TypeDescriptor elementT) { return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT); } @@ -754,6 +770,16 @@ public abstract static class CausedByDrainParameter extends Parameter { CausedByDrainParameter() {} } + /** + * Descriptor for a {@link Parameter} of type {@link org.apache.beam.sdk.values.ValueKind}. + * + *

All such descriptors are equal. + */ + @AutoValue + public abstract static class ValueKindParameter extends Parameter { + ValueKindParameter() {} + } + /** * Descriptor for a {@link Parameter} of type {@link DoFn.Element}. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index 3dcf7ff1f9d0..b465d64e954a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -98,6 +98,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.TypeParameter; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -141,6 +142,7 @@ private DoFnSignatures() {} Parameter.SideInputParameter.class, Parameter.TimerFamilyParameter.class, Parameter.CausedByDrainParameter.class, + Parameter.ValueKindParameter.class, Parameter.BundleFinalizerParameter.class); private static final ImmutableList> @@ -158,6 +160,7 @@ private DoFnSignatures() {} Parameter.WatermarkEstimatorParameter.class, Parameter.SideInputParameter.class, Parameter.CausedByDrainParameter.class, + Parameter.ValueKindParameter.class, Parameter.BundleFinalizerParameter.class); private static final ImmutableList> ALLOWED_SETUP_PARAMETERS = @@ -1367,6 +1370,11 @@ private static Parameter analyzeExtraParameter( rawType.equals(CausedByDrain.class), "CausedByDrain argument must have type org.apache.beam.sdk.values.CausedByDrain."); return Parameter.causedByDrainParameter(); + } else if (ValueKind.class.isAssignableFrom(rawType)) { + methodErrors.checkArgument( + rawType.equals(ValueKind.class), + "ValueKind argument must have type org.apache.beam.sdk.values.ValueKind."); + return Parameter.valueKindParameter(); } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) { String sideInputId = getSideInputId(param.getAnnotations()); paramErrors.checkArgument( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 5adf3d32a1a9..dbfd298d3447 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; @@ -520,6 +521,11 @@ public CausedByDrain causedByDrain() { return outerContext.causedByDrain(); } + @Override + public ValueKind valueKind() { + return outerContext.valueKind(); + } + @Override public Object sideInput(String tagId) { PCollectionView view = sideInputMapping.get(tagId); @@ -549,6 +555,11 @@ public CausedByDrain causedByDrain(DoFn doFn) { return outerContext.causedByDrain(); } + @Override + public ValueKind valueKind(DoFn doFn) { + return outerContext.valueKind(); + } + @Override public String timerId(DoFn doFn) { throw new UnsupportedOperationException(); @@ -619,6 +630,11 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + outerContext.outputWithKind(output, kind); + } + @Override public void outputWindowedValue( OutputT output, @@ -638,6 +654,11 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outerContext.outputWithTimestamp(tag, output, timestamp); } + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + outerContext.outputWithKind(tag, output, kind); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java index 67473e567d63..27022461d361 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java @@ -50,5 +50,7 @@ public interface OutputBuilder extends WindowedValue { OutputBuilder setCausedByDrain(CausedByDrain causedByDrain); + OutputBuilder setValueKind(ValueKind valueKind); + void output(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 0d8b2f7515e8..8d9551374616 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -68,6 +68,8 @@ public T getValue() { public abstract CausedByDrain getCausedByDrain(); + public abstract ValueKind getValueKind(); + // todo #33176 specify additional metadata in the future public static ValueInSingleWindow of( T value, @@ -76,14 +78,23 @@ public static ValueInSingleWindow of( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { + CausedByDrain causedByDrain, + ValueKind valueKind) { return new AutoValue_ValueInSingleWindow<>( - value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, + timestamp, + window, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); } public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL); + return of( + value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } /** A coder for {@link ValueInSingleWindow}. */ @@ -127,6 +138,7 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN ? BeamFnApi.Elements.DrainMode.Enum.DRAINING : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); + builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind())); BeamFnApi.Elements.ElementMetadata metadata = builder.build(); ByteArrayCoder.of().encode(metadata.toByteArray(), outStream); } @@ -146,6 +158,7 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); CausedByDrain causedByDrain = CausedByDrain.NORMAL; + ValueKind valueKind = ValueKind.INSERT; if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) { BeamFnApi.Elements.ElementMetadata elementMetadata = BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); @@ -153,12 +166,15 @@ public ValueInSingleWindow decode(InputStream inStream, Context context) thro elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL; + if (elementMetadata.hasValueKind()) { + valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind()); + } } T value = valueCoder.decode(inStream, context); // todo #33176 specify additional metadata in the future return new AutoValue_ValueInSingleWindow<>( - value, timestamp, window, paneInfo, null, null, causedByDrain); + value, timestamp, window, paneInfo, null, null, causedByDrain, valueKind); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java new file mode 100644 index 000000000000..7a190496ca2c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKind.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +/** The type of change operation represented by a Change Data Capture (CDC) record. */ +public enum ValueKind { + /** Indicates a new record was created in the source system. */ + INSERT, + + /** + * Indicates the state of a record immediately before an update occurred. This is typically + * used to identify the previous values of modified columns or to locate the record via its + * primary key. + */ + UPDATE_BEFORE, + + /** + * Indicates the state of a record immediately after an update occurred. Represents the + * current, valid state of the record following the change. + */ + UPDATE_AFTER, + + /** Indicates that an existing record was removed from the source system. */ + DELETE +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java new file mode 100644 index 000000000000..e3b65c65fc62 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueKindUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; + +/** Utility class for converting between {@link ValueKind} and {@link Elements.ValueKind.Enum}. */ +public class ValueKindUtil { + public static Elements.ValueKind.Enum toProto(ValueKind valueKind) { + switch (valueKind) { + case INSERT: + return Elements.ValueKind.Enum.INSERT; + case UPDATE_BEFORE: + return Elements.ValueKind.Enum.UPDATE_BEFORE; + case UPDATE_AFTER: + return Elements.ValueKind.Enum.UPDATE_AFTER; + case DELETE: + return Elements.ValueKind.Enum.DELETE; + default: + throw new IllegalArgumentException("Unknown ValueKind: " + valueKind); + } + } + + public static ValueKind fromProto(Elements.ValueKind.Enum proto) { + switch (proto) { + case INSERT: + return ValueKind.INSERT; + case UPDATE_BEFORE: + return ValueKind.UPDATE_BEFORE; + case UPDATE_AFTER: + return ValueKind.UPDATE_AFTER; + case DELETE: + return ValueKind.DELETE; + default: + throw new IllegalArgumentException("Unknown ValueKind: " + proto); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index daebeb31a39c..d75926aa95c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -54,6 +54,10 @@ public interface WindowedValue { CausedByDrain causedByDrain(); + /** Returns the {@link ValueKind} associated with this WindowedValue. */ + @Pure + ValueKind getValueKind(); + /** * A representation of each of the actual values represented by this compressed {@link * WindowedValue}, one per window. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index ba2720f5e39b..1524858f3b80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -81,7 +81,8 @@ public static Builder builder(WindowedValue template) { .setPaneInfo(template.getPaneInfo()) .setRecordOffset(template.getRecordOffset()) .setRecordId(template.getRecordId()) - .setCausedByDrain(template.causedByDrain()); + .setCausedByDrain(template.causedByDrain()) + .setValueKind(template.getValueKind()); } public static class Builder implements OutputBuilder { @@ -104,6 +105,7 @@ public static class Builder implements OutputBuilder { private @Nullable String recordId; private @Nullable Long recordOffset; private CausedByDrain causedByDrain = CausedByDrain.NORMAL; + private ValueKind valueKind = ValueKind.INSERT; @Override public Builder setValue(T value) { @@ -154,6 +156,13 @@ public Builder setCausedByDrain(CausedByDrain causedByDrain) { return this; } + @Override + public Builder setValueKind(ValueKind valueKind) { + checkStateNotNull(valueKind, "ValueKind is null"); + this.valueKind = valueKind; + return this; + } + public Builder setReceiver(WindowedValueReceiver receiver) { this.receiver = receiver; return this; @@ -208,6 +217,12 @@ public CausedByDrain causedByDrain() { return causedByDrain; } + @Override + public ValueKind getValueKind() { + checkStateNotNull(valueKind, "ValueKind not set"); + return valueKind; + } + @Override public Collection> explodeWindows() { throw new UnsupportedOperationException( @@ -243,7 +258,8 @@ public WindowedValue build() { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getValueKind()); } @Override @@ -261,10 +277,10 @@ public String toString() { public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - return of(value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL); + return of( + value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } - /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( T value, Instant timestamp, @@ -273,9 +289,31 @@ public static WindowedValue of( @Nullable String currentRecordId, @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { + return of( + value, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + ValueKind.INSERT); + } + + /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ + public static WindowedValue of( + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, + CausedByDrain causedByDrain, + ValueKind valueKind) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); + checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it was null"); if (windows.size() == 1) { return of( value, @@ -284,10 +322,18 @@ public static WindowedValue of( paneInfo, currentRecordId, currentRecordOffset, - causedByDrain); + causedByDrain, + valueKind); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); } } @@ -298,12 +344,21 @@ static WindowedValue createWithoutValidation( Instant timestamp, Collection windows, PaneInfo paneInfo, - CausedByDrain causedByDrain) { + CausedByDrain causedByDrain, + ValueKind valueKind) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + null, + null, + causedByDrain, + valueKind); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, null, null, causedByDrain); + value, timestamp, windows, paneInfo, null, null, causedByDrain, valueKind); } } @@ -312,10 +367,10 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL); + return of( + value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } - /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( T value, Instant timestamp, @@ -324,19 +379,54 @@ public static WindowedValue of( @Nullable String currentRecordId, @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { + return of( + value, + timestamp, + window, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + ValueKind.INSERT); + } + + /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ + public static WindowedValue of( + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, + CausedByDrain causedByDrain, + ValueKind valueKind) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); + checkArgument(valueKind != null, "WindowedValue requires ValueKind, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return new ValueInGlobalWindow<>( - value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind); } else if (isGlobal) { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); } else { return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, + timestamp, + window, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); } } @@ -345,7 +435,8 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); + return new ValueInGlobalWindow<>( + value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } /** @@ -353,7 +444,8 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. */ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, CausedByDrain.NORMAL); + return new ValueInGlobalWindow<>( + value, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } /** @@ -365,7 +457,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta return valueInGlobalWindow(value); } else { return new TimestampedValueInGlobalWindow<>( - value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); + value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } } @@ -379,7 +471,7 @@ public static WindowedValue timestampedValueInGlobalWindow( return timestampedValueInGlobalWindow(value, timestamp); } else { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL); + value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL, ValueKind.INSERT); } } @@ -396,7 +488,8 @@ public static WindowedValue withValue( windowedValue.getPaneInfo(), windowedValue.getRecordId(), windowedValue.getRecordOffset(), - windowedValue.causedByDrain()); + windowedValue.causedByDrain(), + windowedValue.getValueKind()); } public static boolean equals( @@ -448,6 +541,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final @Nullable String currentRecordId; private final @Nullable Long currentRecordOffset; private final CausedByDrain causedByDrain; + private final ValueKind valueKind; @Override public @Nullable String getRecordId() { @@ -464,17 +558,24 @@ public CausedByDrain causedByDrain() { return causedByDrain; } + @Override + public ValueKind getValueKind() { + return valueKind; + } + protected SimpleWindowedValue( T value, PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { + CausedByDrain causedByDrain, + ValueKind valueKind) { this.value = value; this.paneInfo = checkNotNull(paneInfo); this.currentRecordId = currentRecordId; this.currentRecordOffset = currentRecordOffset; this.causedByDrain = causedByDrain; + this.valueKind = checkNotNull(valueKind, "ValueKind is null"); } @Override @@ -523,8 +624,9 @@ public MinTimestampWindowedValue( PaneInfo pane, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, pane, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, valueKind); } @Override @@ -542,8 +644,9 @@ public ValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind); } @Override @@ -559,7 +662,12 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { return new ValueInGlobalWindow<>( - newValue, getPaneInfo(), getRecordId(), getRecordOffset(), causedByDrain()); + newValue, + getPaneInfo(), + getRecordId(), + getRecordOffset(), + causedByDrain(), + getValueKind()); } @Override @@ -598,8 +706,9 @@ public TimestampedWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, valueKind); this.timestamp = checkNotNull(timestamp); } @@ -622,8 +731,16 @@ public TimestampedValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super( + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); } @Override @@ -644,7 +761,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getValueKind()); } @Override @@ -695,8 +813,16 @@ public TimestampedValueInSingleWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super( + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); this.window = checkNotNull(window); } @@ -709,7 +835,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getValueKind()); } @Override @@ -767,8 +894,16 @@ public TimestampedValueInMultipleWindows( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - CausedByDrain causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + CausedByDrain causedByDrain, + ValueKind valueKind) { + super( + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + valueKind); this.windows = checkNotNull(windows); } @@ -786,7 +921,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getValueKind()); } @Override @@ -860,10 +996,10 @@ public static ParamWindowedValueCoder getParamWindowedValueCoder(Coder /** Abstract class for {@code WindowedValue} coder. */ public abstract static class WindowedValueCoder extends StructuredCoder> { final Coder valueCoder; - private static boolean metadataSupported = false; + private static volatile boolean metadataSupported = false; - public static void setMetadataSupported() { - metadataSupported = true; + public static void setMetadataSupported(boolean isSupported) { + metadataSupported = isSupported; } public static boolean isMetadataSupported() { @@ -941,13 +1077,12 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex if (metadataSupported) { BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); - BeamFnApi.Elements.ElementMetadata em = - builder - .setDrain( - windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN - ? BeamFnApi.Elements.DrainMode.Enum.DRAINING - : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING) - .build(); + builder.setDrain( + windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN + ? BeamFnApi.Elements.DrainMode.Enum.DRAINING + : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); + builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind())); + BeamFnApi.Elements.ElementMetadata em = builder.build(); ByteArrayCoder.of().encode(em.toByteArray(), outStream); } @@ -966,6 +1101,7 @@ public WindowedValue decode(InputStream inStream, Context context) Collection windows = windowsCoder.decode(inStream); PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream); CausedByDrain causedByDrain = CausedByDrain.NORMAL; + ValueKind valueKind = ValueKind.INSERT; if (isMetadataSupported() && paneInfo.isElementMetadata()) { BeamFnApi.Elements.ElementMetadata elementMetadata = BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); @@ -973,13 +1109,16 @@ public WindowedValue decode(InputStream inStream, Context context) elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL; + if (elementMetadata.hasValueKind()) { + valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind()); + } } T value = valueCoder.decode(inStream, context); // Because there are some remaining (incorrect) uses of WindowedValue with no windows, // we call this deprecated no-validation path when decoding return WindowedValues.createWithoutValidation( - value, timestamp, windows, paneInfo, causedByDrain); + value, timestamp, windows, paneInfo, causedByDrain, valueKind); } @Override @@ -1109,7 +1248,18 @@ public static ParamWindowedValueCoder of( Instant timestamp, Collection windows, PaneInfo paneInfo) { - return new ParamWindowedValueCoder<>(valueCoder, windowCoder, timestamp, windows, paneInfo); + return of(valueCoder, windowCoder, timestamp, windows, paneInfo, ValueKind.INSERT); + } + + public static ParamWindowedValueCoder of( + Coder valueCoder, + Coder windowCoder, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + return new ParamWindowedValueCoder<>( + valueCoder, windowCoder, timestamp, windows, paneInfo, valueKind); } /** @@ -1124,7 +1274,8 @@ public static ParamWindowedValueCoder of( windowCoder, BoundedWindow.TIMESTAMP_MIN_VALUE, GLOBAL_WINDOWS, - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + ValueKind.INSERT); } /** @@ -1142,15 +1293,30 @@ public static ParamWindowedValueCoder of(Coder valueCoder) { Coder windowCoder, Instant timestamp, Collection windows, - PaneInfo paneInfo) { + PaneInfo paneInfo, + ValueKind valueKind) { super(valueCoder, windowCoder); - this.windowedValuePrototype = WindowedValues.of(EMPTY_BYTES, timestamp, windows, paneInfo); + this.windowedValuePrototype = + WindowedValues.of( + EMPTY_BYTES, + timestamp, + windows, + paneInfo, + null, + null, + CausedByDrain.NORMAL, + valueKind); } @Override public WindowedValueCoder withValueCoder(Coder valueCoder) { return new ParamWindowedValueCoder<>( - valueCoder, getWindowCoder(), getTimestamp(), getWindows(), getPaneInfo()); + valueCoder, + getWindowCoder(), + getTimestamp(), + getWindows(), + getPaneInfo(), + getValueKind()); } @Override @@ -1188,6 +1354,10 @@ public void registerByteSizeObserver(WindowedValue value, ElementByteSizeObse valueCoder.registerByteSizeObserver(value.getValue(), observer); } + public ValueKind getValueKind() { + return windowedValuePrototype.getValueKind(); + } + public Instant getTimestamp() { return windowedValuePrototype.getTimestamp(); } @@ -1207,7 +1377,14 @@ public static byte[] getPayload(ParamWindowedValueCoder from) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); WindowedValue windowedValue = WindowedValues.of( - EMPTY_BYTES, from.getTimestamp(), from.getWindows(), from.getPaneInfo()); + EMPTY_BYTES, + from.getTimestamp(), + from.getWindows(), + from.getPaneInfo(), + null, + null, + CausedByDrain.NORMAL, + from.getValueKind()); WindowedValues.FullWindowedValueCoder windowedValueCoder = WindowedValues.FullWindowedValueCoder.of(ByteArrayCoder.of(), from.getWindowCoder()); try { @@ -1235,7 +1412,8 @@ public static WindowedValues.ParamWindowedValueCoder fromComponents( windowCoder, windowedValue.getTimestamp(), windowedValue.getWindows(), - windowedValue.getPaneInfo()); + windowedValue.getPaneInfo(), + windowedValue.getValueKind()); } catch (IOException e) { throw new RuntimeException( "Unable to decode constant members from payload for ParamWindowedValueCoder: ", e); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java index a2ff99905f6c..2c53fd990e1e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -54,7 +54,7 @@ public void process(ProcessContext pc, OutputReceiver r) { @Category(NeedsRunner.class) @Ignore public void testMetadataPropagationAcrossShuffleParameter() { - WindowedValues.WindowedValueCoder.setMetadataSupported(); + WindowedValues.WindowedValueCoder.setMetadataSupported(true); PCollection results = pipeline .apply(Create.of(1)) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java new file mode 100644 index 000000000000..62d8f6626a64 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValueKindTest.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Collections; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn.StateId; +import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueKind; +import org.apache.beam.sdk.values.WindowedValues; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ValueKind} support in {@link DoFn}. */ +@RunWith(JUnit4.class) +public class ValueKindTest implements Serializable { + + static { + System.setProperty( + "beamTestPipelineOptions", "[\"--runner=org.apache.beam.runners.direct.DirectRunner\"]"); + } + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testValueKindParameterInDoFn() { + PCollection input = pipeline.apply(Create.of("a", "b")); + + PCollection output = + input.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:INSERT", "b:INSERT"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKind() { + PCollection input = pipeline.apply(Create.of("a", "b")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + if ("a".equals(element)) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } else { + c.outputWithKind(element, ValueKind.UPDATE_AFTER); + } + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE", "b:UPDATE_AFTER"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testDefaultValueKind() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "StandardOutput", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.output(element); // Should preserve input kind! + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_MainOutput() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "OutputWindowedValue", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + // Should preserve input kind! + c.outputWindowedValue( + element, c.timestamp(), Collections.singleton(window), c.pane()); + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_TaggedOutput() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "OutputWindowedValueTagged", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + sideTag, + element, + c.timestamp(), + Collections.singleton(window), + c.pane()); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_Object() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "OutputWindowedValueObject", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + WindowedValues.of( + element, + c.timestamp(), + Collections.singleton(window), + c.pane(), + null, + null, + org.apache.beam.sdk.values.CausedByDrain.NORMAL, + ValueKind.UPDATE_BEFORE)); + } + })) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInOutputWindowedValue_TaggedObject() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input.apply( + "OutputWindowedValueTaggedObject", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, BoundedWindow window) { + c.outputWindowedValue( + sideTag, + WindowedValues.of( + element, + c.timestamp(), + Collections.singleton(window), + c.pane(), + null, + null, + org.apache.beam.sdk.values.CausedByDrain.NORMAL, + ValueKind.UPDATE_BEFORE)); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedAcrossTags() { + PCollection input = pipeline.apply(Create.of("a")); + TupleTag mainTag = new TupleTag() {}; + TupleTag sideTag = new TupleTag() {}; + + PCollectionTuple outputTuple = + input.apply( + "OutputWithKindTagged", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(sideTag, element, ValueKind.UPDATE_BEFORE); + } + }) + .withOutputTags(mainTag, TupleTagList.of(sideTag))); + + PCollection output = + outputTuple + .get(sideTag) + .apply( + "ReadKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, ProcessContext c, ValueKind kind) { + c.output(element + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindInSplittableDoFn() { + PCollection input = pipeline.apply(Create.of("a")); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element String element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply( + "SplittableDoFn", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element String element, + RestrictionTracker tracker, + ProcessContext c, + ValueKind kind) { + if (tracker.tryClaim(tracker.currentRestriction().getFrom())) { + c.output(element + ":" + kind); + } + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction(@Element String element) { + return new OffsetRange(0, 1); + } + })); + + PAssert.that(output).containsInAnyOrder("a:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKindInWindowExpiration() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "a"))); + PCollection> windowedInput = + input.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); + + PCollection output = + windowedInput.apply( + "StatefulParDo", + ParDo.of( + new DoFn, String>() { + @StateId("dummy") + private final StateSpec> spec = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + // Do nothing, just let state be created + } + + @OnWindowExpiration + public void onWindowExpiration( + OutputReceiver receiver, + BoundedWindow window, + @Timestamp org.joda.time.Instant timestamp) { + receiver.outputWindowedValue( + "expired", + timestamp, + Collections.singleton(window), + PaneInfo.NO_FIRING, + ValueKind.UPDATE_BEFORE); + } + })); + + PAssert.that(output).containsInAnyOrder("expired"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testOutputWithKindInOnTimer() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "a"))); + + PCollection output = + input.apply( + "TimerParDo", + ParDo.of( + new DoFn, String>() { + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @Element KV element, + @TimerId("timer") Timer timer, + ProcessContext c) { + timer.set(c.timestamp().plus(Duration.standardSeconds(1))); + } + + @OnTimer("timer") + public void onTimer(OnTimerContext c) { + c.outputWithKind("timed_out", ValueKind.UPDATE_BEFORE); + } + })); + + PAssert.that(output).containsInAnyOrder("timed_out"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInReshuffle() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reshuffle.of()) + .apply( + "ReadKind", + ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c, ValueKind kind) { + c.output(element.getValue() + ":" + kind); + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testValueKindPreservedInGroupByKeyWithReify() { + PCollection> input = pipeline.apply(Create.of(KV.of("key", "value"))); + + PCollection output = + input + .apply( + "SetKind", + ParDo.of( + new DoFn, KV>() { + @ProcessElement + public void processElement( + @Element KV element, ProcessContext c) { + c.outputWithKind(element, ValueKind.UPDATE_BEFORE); + } + })) + .apply(Reify.windowsInValue()) + .apply(GroupByKey.create()) + .apply( + "ReadKind", + ParDo.of( + new DoFn>>, String>() { + @ProcessElement + public void processElement( + @Element KV>> element, + ProcessContext c) { + for (ValueInSingleWindow value : element.getValue()) { + c.output(value.getValue() + ":" + value.getValueKind()); + } + } + })); + + PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); + pipeline.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 97b99321e5c7..2e3bd275f715 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -80,7 +81,7 @@ public void testWindowedValueCoder() throws CoderException { @Test public void testWindowedValueWithElementMetadataCoder() throws CoderException { - WindowedValues.WindowedValueCoder.setMetadataSupported(); + WindowedValues.WindowedValueCoder.setMetadataSupported(true); Instant timestamp = new Instant(1234); WindowedValue value = WindowedValues.of( @@ -108,6 +109,33 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain()); } + @Test + public void testWindowedValueWithValueKindCoder() throws CoderException { + WindowedValues.WindowedValueCoder.setMetadataSupported(true); + Instant timestamp = new Instant(1234); + WindowedValue value = + WindowedValues.builder() + .setValue("abc") + .setTimestamp(timestamp) + .setWindows( + Arrays.asList(new IntervalWindow(timestamp, timestamp.plus(Duration.millis(1000))))) + .setPaneInfo(PaneInfo.NO_FIRING) + .setValueKind(ValueKind.UPDATE_BEFORE) + .build(); + + Coder> windowedValueCoder = + WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); + + byte[] encodedValue = CoderUtils.encodeToByteArray(windowedValueCoder, value); + WindowedValue decodedValue = + CoderUtils.decodeFromByteArray(windowedValueCoder, encodedValue); + + Assert.assertEquals(value.getValue(), decodedValue.getValue()); + Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); + Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); + Assert.assertEquals(value.getValueKind(), decodedValue.getValueKind()); + } + @Test public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() { CoderProperties.coderSerializable( diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 4d7819ce9b42..b25dbfac92d6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -108,6 +108,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.ValueKind; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.WindowedValueCoder; @@ -1736,6 +1737,13 @@ private class WindowObservingProcessBundleContext public OutputBuilder builder(OutputT value) { return WindowedValues.builder() .setValue(value) + .setTimestamp(currentElement.getTimestamp()) + .setPaneInfo(currentElement.getPaneInfo()) + .setWindows(currentElement.getWindows()) + .setRecordOffset(currentElement.getRecordOffset()) + .setRecordId(currentElement.getRecordId()) + .setCausedByDrain(currentElement.causedByDrain()) + .setValueKind(currentElement.getValueKind()) .setReceiver(windowedValue -> outputTo(mainOutputConsumer, windowedValue)); } @@ -1762,6 +1770,32 @@ public void output(TupleTag tag, T output) { output, currentElement.getTimestamp(), currentWindow, currentElement.getPaneInfo())); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + builder(output).setValueKind(kind).output(); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + outputWindowedValue( + tag, + output, + currentElement.getTimestamp(), + currentElement.getWindows(), + currentElement.getPaneInfo(), + kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo, valueKind); + } + @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all @@ -1813,6 +1847,16 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { + outputWindowedValue(tag, output, timestamp, windows, paneInfo, ValueKind.INSERT); + } + + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all // runners can provide proper timestamps. FnDataReceiver> consumer = @@ -1820,7 +1864,10 @@ public void outputWindowedValue( if (consumer == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } - outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); + outputTo( + consumer, + WindowedValues.of( + output, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL, valueKind)); } @Override @@ -1912,6 +1959,46 @@ public void output(TupleTag tag, T output) { outputTo(consumer, currentElement.withValue(output)); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + builder(output).setValueKind(kind).output(); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + builder(output) + .setTimestamp(timestamp) + .setWindows(windows) + .setPaneInfo(paneInfo) + .setValueKind(valueKind) + .output(); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentElement.getTimestamp(), + currentElement.getWindows(), + currentElement.getPaneInfo(), + null, + null, + CausedByDrain.NORMAL, + kind)); + } + @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { builder(output).setValue(output).setTimestamp(timestamp).output(); @@ -2270,10 +2357,20 @@ public CausedByDrain causedByDrain() { return currentElement.causedByDrain(); } + @Override + public ValueKind valueKind() { + return currentElement.getValueKind(); + } + @Override public CausedByDrain causedByDrain(DoFn doFn) { return currentElement.causedByDrain(); } + + @Override + public ValueKind valueKind(DoFn doFn) { + return currentElement.getValueKind(); + } } /** @@ -2312,6 +2409,42 @@ public void outputWindowedValue( OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + OutputReceiver.super.outputWithKind(output, kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo, valueKind); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkOnWindowExpirationTimestamp(currentTimer.getHoldTimestamp()); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentTimer.getHoldTimestamp(), + currentWindow, + currentTimer.getPaneInfo(), + null, + null, + currentTimer.causedByDrain(), + kind)); + } + @Override public BoundedWindow window() { return currentWindow; @@ -2641,6 +2774,42 @@ public void outputWindowedValue( OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo); } + @Override + public void outputWithKind(OutputT output, ValueKind kind) { + OutputReceiver.super.outputWithKind(output, kind); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo, + ValueKind valueKind) { + OutputReceiver.super.outputWindowedValue(output, timestamp, windows, paneInfo, valueKind); + } + + @Override + public void outputWithKind(TupleTag tag, T output, ValueKind kind) { + checkTimerTimestamp(currentTimer.getHoldTimestamp()); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumer, + WindowedValues.of( + output, + currentTimer.getHoldTimestamp(), + currentWindow, + currentTimer.getPaneInfo(), + null, + null, + causedByDrain, + kind)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index d0724432f3c9..18c9f4834a78 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -66,6 +66,7 @@ import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -296,6 +297,7 @@ public static void main( // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); CoderTranslation.verifyModelCodersRegistered(); + WindowedValues.FullWindowedValueCoder.setMetadataSupported(true); EnumMap< BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction>