Skip to content

Commit 1c53cb2

Browse files
authored
Merge pull request #37851 from stankiewicz/metadata_propagation
Fixes metadata propagation from doFn output to next parDo by introducing new interface and plumbing propagation
2 parents b7c87f2 + 7a5c982 commit 1c53cb2

16 files changed

Lines changed: 295 additions & 95 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,21 @@ public <T> void outputWindowedValue(
477477
element.causedByDrain()));
478478
}
479479

480+
@Override
481+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
482+
outputWindowedValue(mainOutputTag, windowedValue);
483+
}
484+
485+
@Override
486+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
487+
noteOutput();
488+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
489+
((TimestampObservingWatermarkEstimator) watermarkEstimator)
490+
.observeTimestamp(windowedValue.getTimestamp());
491+
}
492+
outputReceiver.output(tag, windowedValue);
493+
}
494+
480495
private void noteOutput() {
481496
checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()");
482497
checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()");

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,17 @@ public <T> void output(TupleTag<T> tag, T output) {
453453
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
454454
}
455455

456+
@Override
457+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
458+
outputWindowedValue(mainOutputTag, windowedValue);
459+
}
460+
461+
@Override
462+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
463+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
464+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
465+
}
466+
456467
@Override
457468
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
458469
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
@@ -1037,6 +1048,17 @@ public <T> void outputWindowedValue(
10371048
.output();
10381049
}
10391050

1051+
@Override
1052+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1053+
outputWindowedValue(mainOutputTag, windowedValue);
1054+
}
1055+
1056+
@Override
1057+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1058+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
1059+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
1060+
}
1061+
10401062
@Override
10411063
public BundleFinalizer bundleFinalizer() {
10421064
throw new UnsupportedOperationException(
@@ -1296,6 +1318,17 @@ public <T> void outputWindowedValue(
12961318
.output();
12971319
}
12981320

1321+
@Override
1322+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1323+
outputWindowedValue(mainOutputTag, windowedValue);
1324+
}
1325+
1326+
@Override
1327+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1328+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
1329+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
1330+
}
1331+
12991332
@Override
13001333
public BundleFinalizer bundleFinalizer() {
13011334
throw new UnsupportedOperationException(

sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.transforms.DoFn;
2424
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2525
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
26+
import org.apache.beam.sdk.values.CausedByDrain;
2627
import org.apache.beam.sdk.values.OutputBuilder;
2728
import org.apache.beam.sdk.values.WindowedValues;
2829
import org.joda.time.Instant;
@@ -54,6 +55,7 @@ public OutputBuilder<T> builder(T value) {
5455
.setWindow(fakeWindow)
5556
.setPaneInfo(PaneInfo.NO_FIRING)
5657
.setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE)
58+
.setCausedByDrain(CausedByDrain.NORMAL)
5759
.setReceiver(windowedValue -> records.add(windowedValue.getValue()));
5860
}
5961

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.beam.sdk.values.Row;
5252
import org.apache.beam.sdk.values.TupleTag;
5353
import org.apache.beam.sdk.values.TypeDescriptor;
54+
import org.apache.beam.sdk.values.WindowedValue;
5455
import org.apache.beam.sdk.values.WindowingStrategy;
5556
import org.checkerframework.checker.nullness.qual.Nullable;
5657
import org.checkerframework.dataflow.qual.Pure;
@@ -284,6 +285,10 @@ public abstract <T> void outputWindowedValue(
284285
Instant timestamp,
285286
Collection<? extends BoundedWindow> windows,
286287
PaneInfo paneInfo);
288+
289+
public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue);
290+
291+
public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue);
287292
}
288293

289294
/** Information accessible when running a {@link DoFn.ProcessElement} method. */

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@ public OutputBuilder<Row> builder(Row value) {
7070
rowWithMetadata -> {
7171
((DoFn<?, T>.WindowedContext) context)
7272
.outputWindowedValue(
73-
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
74-
rowWithMetadata.getTimestamp(),
75-
rowWithMetadata.getWindows(),
76-
rowWithMetadata.getPaneInfo());
73+
rowWithMetadata.withValue(
74+
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
7775
});
7876

7977
} else {
@@ -84,10 +82,8 @@ public OutputBuilder<Row> builder(Row value) {
8482
rowWithMetadata -> {
8583
context.outputWindowedValue(
8684
tag,
87-
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
88-
rowWithMetadata.getTimestamp(),
89-
rowWithMetadata.getWindows(),
90-
rowWithMetadata.getPaneInfo());
85+
rowWithMetadata.withValue(
86+
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
9187
});
9288
}
9389
}
@@ -120,19 +116,9 @@ public OutputBuilder<T> builder(T value) {
120116
@Override
121117
public void output(WindowedValue<T> windowedValue) {
122118
if (outputTag != null) {
123-
context.outputWindowedValue(
124-
outputTag,
125-
windowedValue.getValue(),
126-
windowedValue.getTimestamp(),
127-
windowedValue.getWindows(),
128-
windowedValue.getPaneInfo());
119+
context.outputWindowedValue(outputTag, windowedValue);
129120
} else {
130-
((DoFn<?, T>.WindowedContext) context)
131-
.outputWindowedValue(
132-
windowedValue.getValue(),
133-
windowedValue.getTimestamp(),
134-
windowedValue.getWindows(),
135-
windowedValue.getPaneInfo());
121+
((DoFn<?, T>.WindowedContext) context).outputWindowedValue(windowedValue);
136122
}
137123
}
138124
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,27 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
649649
CausedByDrain.NORMAL));
650650
}
651651

652+
@Override
653+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
654+
outputWindowedValue(mainOutputTag, windowedValue);
655+
}
656+
657+
@Override
658+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
659+
for (BoundedWindow w : windowedValue.getWindows()) {
660+
getMutableOutput(tag)
661+
.add(
662+
ValueInSingleWindow.of(
663+
windowedValue.getValue(),
664+
windowedValue.getTimestamp(),
665+
w,
666+
windowedValue.getPaneInfo(),
667+
windowedValue.getRecordId(),
668+
windowedValue.getRecordOffset(),
669+
windowedValue.causedByDrain()));
670+
}
671+
}
672+
652673
@Override
653674
public <T> void outputWindowedValue(
654675
TupleTag<T> tag,

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ public void processElement(
187187
.setTimestamp(kv.getValue().getTimestamp())
188188
.setWindow(kv.getValue().getWindow())
189189
.setPaneInfo(kv.getValue().getPaneInfo())
190+
.setCausedByDrain(kv.getValue().getCausedByDrain())
190191
.output();
191192
}
192193
}));

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.beam.sdk.coders.VoidCoder;
2323
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2424
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
25+
import org.apache.beam.sdk.values.CausedByDrain;
2526
import org.apache.beam.sdk.values.KV;
2627
import org.apache.beam.sdk.values.PBegin;
2728
import org.apache.beam.sdk.values.PCollection;
@@ -141,16 +142,24 @@ public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> i
141142
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
142143
@ProcessElement
143144
public void processElement(
145+
ProcessContext pc,
144146
@Element KV<K, V> element,
145147
@DoFn.Timestamp Instant timestamp,
146148
BoundedWindow window,
147149
PaneInfo paneInfo,
150+
CausedByDrain causedByDrain,
148151
OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
149152
r.output(
150153
KV.of(
151154
element.getKey(),
152155
ValueInSingleWindow.of(
153-
element.getValue(), timestamp, window, paneInfo)));
156+
element.getValue(),
157+
timestamp,
158+
window,
159+
paneInfo,
160+
pc.currentRecordId(),
161+
pc.currentRecordOffset(),
162+
causedByDrain)));
154163
}
155164
}))
156165
.setCoder(

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.beam.sdk.values.PCollectionView;
5757
import org.apache.beam.sdk.values.Row;
5858
import org.apache.beam.sdk.values.TupleTag;
59+
import org.apache.beam.sdk.values.WindowedValue;
5960
import org.apache.beam.sdk.values.WindowedValues;
6061
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
6162
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -560,13 +561,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
560561
public OutputBuilder<OutputT> builder(OutputT value) {
561562
return outputBuilderSupplier
562563
.builder(value)
563-
.setReceiver(
564-
windowedValue ->
565-
outerContext.outputWindowedValue(
566-
windowedValue.getValue(),
567-
windowedValue.getTimestamp(),
568-
windowedValue.getWindows(),
569-
windowedValue.getPaneInfo()));
564+
.setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue));
570565
}
571566
};
572567
}
@@ -582,13 +577,7 @@ public OutputBuilder<T> builder(T value) {
582577
return outputBuilderSupplier
583578
.builder(value)
584579
.setReceiver(
585-
windowedValue ->
586-
outerContext.outputWindowedValue(
587-
tag,
588-
windowedValue.getValue(),
589-
windowedValue.getTimestamp(),
590-
windowedValue.getWindows(),
591-
windowedValue.getPaneInfo()));
580+
windowedValue -> outerContext.outputWindowedValue(tag, windowedValue));
592581
}
593582
};
594583
}
@@ -659,6 +648,16 @@ public <T> void outputWindowedValue(
659648
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
660649
}
661650

651+
@Override
652+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
653+
outerContext.outputWindowedValue(windowedValue);
654+
}
655+
656+
@Override
657+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
658+
outerContext.outputWindowedValue(tag, windowedValue);
659+
}
660+
662661
@Override
663662
public InputT element() {
664663
return element;

0 commit comments

Comments
 (0)