Skip to content

Commit b6bc904

Browse files
authored
Merge pull request #37715: Disable combiner lifting only for count triggers
1 parent 8ee8b10 commit b6bc904

19 files changed

Lines changed: 425 additions & 103 deletions

File tree

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,19 @@
8888
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
8989
import org.apache.beam.sdk.transforms.resourcehints.ResourceHint;
9090
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
91+
import org.apache.beam.sdk.transforms.windowing.AfterAll;
92+
import org.apache.beam.sdk.transforms.windowing.AfterEach;
93+
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
94+
import org.apache.beam.sdk.transforms.windowing.AfterPane;
95+
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
96+
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
97+
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
9198
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
99+
import org.apache.beam.sdk.transforms.windowing.Never;
100+
import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
101+
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
102+
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
103+
import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
92104
import org.apache.beam.sdk.transforms.windowing.Window;
93105
import org.apache.beam.sdk.util.AppliedCombineFn;
94106
import org.apache.beam.sdk.util.CoderUtils;
@@ -134,6 +146,78 @@ public class DataflowPipelineTranslator {
134146
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
135147
private static final ObjectMapper MAPPER = new ObjectMapper();
136148

149+
/**
 * Checks to see whether the Trigger tree is compatible with combiner lifting.
 *
 * <p>Each {@code visit} overload answers for one trigger type. Leaf triggers return a fixed
 * answer; composite triggers are compatible only when every entry of {@code subTriggers()} is
 * compatible. {@link AfterPane} and {@link ReshuffleTrigger} are reported as incompatible.
 */
150+
private static class TriggerCombinerLiftingCompatibility implements TriggerVisitor<Boolean> {
151+
// Shared instance; this class declares no instance fields.
static final TriggerCombinerLiftingCompatibility INSTANCE =
152+
new TriggerCombinerLiftingCompatibility();
153+
154+
// The default trigger is reported as compatible.
@Override
155+
public Boolean visit(DefaultTrigger trigger) {
156+
return true;
157+
}
158+
159+
// A plain end-of-window watermark trigger is reported as compatible.
@Override
160+
public Boolean visit(AfterWatermark.FromEndOfWindow trigger) {
161+
return true;
162+
}
163+
164+
// Compatible only if every sub-trigger is compatible.
// NOTE(review): the sub-triggers are presumably the early/late firings - confirm.
@Override
165+
public Boolean visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger) {
166+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
167+
}
168+
169+
// The never-firing trigger is reported as compatible.
@Override
170+
public Boolean visit(Never.NeverTrigger trigger) {
171+
return true;
172+
}
173+
174+
// The reshuffle trigger is reported as incompatible.
@Override
175+
public Boolean visit(ReshuffleTrigger<?> trigger) {
176+
return false;
177+
}
178+
179+
// Processing-time triggers are reported as compatible.
@Override
180+
public Boolean visit(AfterProcessingTime trigger) {
181+
return true;
182+
}
183+
184+
// Synchronized-processing-time triggers are reported as compatible.
@Override
185+
public Boolean visit(AfterSynchronizedProcessingTime trigger) {
186+
return true;
187+
}
188+
189+
// Compatible only if every sub-trigger is compatible (allMatch stops at the first false).
@Override
190+
public Boolean visit(AfterFirst trigger) {
191+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
192+
}
193+
194+
// Compatible only if every sub-trigger is compatible.
@Override
195+
public Boolean visit(AfterAll trigger) {
196+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
197+
}
198+
199+
// Compatible only if every sub-trigger is compatible.
@Override
200+
public Boolean visit(AfterEach trigger) {
201+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
202+
}
203+
204+
// Element-count triggers are the case this visitor rejects.
@Override
205+
public Boolean visit(AfterPane trigger) {
206+
// Combiner lifting not supported for count triggers.
207+
return false;
208+
}
209+
210+
// Compatible only if every sub-trigger is compatible.
@Override
211+
public Boolean visit(Repeatedly trigger) {
212+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
213+
}
214+
215+
// Compatible only if every sub-trigger is compatible.
@Override
216+
public Boolean visit(OrFinallyTrigger trigger) {
217+
return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
218+
}
219+
}
220+
137221
private static byte[] serializeWindowingStrategy(
138222
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
139223
try {
@@ -970,8 +1054,10 @@ private <K, V> void groupByKeyHelper(
9701054
&& windowingStrategy.getWindowFn().assignsToOneWindow();
9711055
if (isStreaming) {
9721056
allowCombinerLifting &= transform.fewKeys();
973-
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
974-
allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
1057+
allowCombinerLifting &=
1058+
windowingStrategy
1059+
.getTrigger()
1060+
.accept(TriggerCombinerLiftingCompatibility.INSTANCE);
9751061
}
9761062
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
9771063
stepContext.addInput(

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow;
1919

20+
import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
2021
import static org.apache.beam.runners.dataflow.util.Structs.getString;
2122
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
2223
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -89,11 +90,13 @@
8990
import org.apache.beam.sdk.io.range.OffsetRange;
9091
import org.apache.beam.sdk.options.PipelineOptions;
9192
import org.apache.beam.sdk.options.PipelineOptionsFactory;
93+
import org.apache.beam.sdk.options.StreamingOptions;
9294
import org.apache.beam.sdk.options.ValueProvider;
9395
import org.apache.beam.sdk.state.StateSpec;
9496
import org.apache.beam.sdk.state.StateSpecs;
9597
import org.apache.beam.sdk.state.ValueState;
9698
import org.apache.beam.sdk.testing.PAssert;
99+
import org.apache.beam.sdk.transforms.Combine;
97100
import org.apache.beam.sdk.transforms.Count;
98101
import org.apache.beam.sdk.transforms.Create;
99102
import org.apache.beam.sdk.transforms.DoFn;
@@ -110,7 +113,17 @@
110113
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
111114
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions;
112115
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
116+
import org.apache.beam.sdk.transforms.windowing.AfterAll;
117+
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
118+
import org.apache.beam.sdk.transforms.windowing.AfterPane;
119+
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
120+
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
121+
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
122+
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
113123
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
124+
import org.apache.beam.sdk.transforms.windowing.Never;
125+
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
126+
import org.apache.beam.sdk.transforms.windowing.Trigger;
114127
import org.apache.beam.sdk.transforms.windowing.Window;
115128
import org.apache.beam.sdk.transforms.windowing.WindowFn;
116129
import org.apache.beam.sdk.util.DoFnInfo;
@@ -235,6 +248,131 @@ private static DataflowPipelineOptions buildPipelineOptions() throws IOException
235248
return options;
236249
}
237250

251+
// Helper: builds a streaming pipeline that windows with the given trigger and then applies a
// global Count combine, translates it, and asserts that at least one translated step has
// DISALLOW_COMBINER_LIFTING set to true.
private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws Exception {
252+
DataflowPipelineOptions options = buildPipelineOptions();
253+
options.setRunner(DataflowRunner.class);
254+
options.as(StreamingOptions.class).setStreaming(true);
255+
Pipeline p = Pipeline.create(options);
256+
257+
// NOTE(review): this traversal runs before any transform is applied to p - confirm it is needed.
p.traverseTopologically(new RecordingPipelineVisitor());
258+
SdkComponents sdkComponents = createSdkComponents(options);
259+
260+
// The created collection is marked unbounded before the trigger under test is applied.
p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()))
261+
.setIsBoundedInternal(IsBounded.UNBOUNDED)
262+
.apply("window", Window.<Integer>configure().triggering(trigger).discardingFiredPanes())
263+
.apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
264+
265+
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
// The translator is built from a separate options object that only has streaming enabled.
266+
DataflowPipelineOptions translatorOptions =
267+
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
268+
translatorOptions.setStreaming(true);
269+
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(translatorOptions);
270+
271+
JobSpecification jobSpecification =
272+
t.translate(
273+
p,
274+
pipelineProto,
275+
sdkComponents,
276+
DataflowRunner.fromOptions(options),
277+
Collections.emptyList());
278+
279+
// Scan every step; one step with the property set to true is enough.
// NOTE(review): the trailing `false` is presumably the default for steps lacking the property - confirm.
boolean foundDisable = false;
280+
for (Step step : jobSpecification.getJob().getSteps()) {
281+
if (getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
282+
foundDisable = true;
283+
}
284+
}
285+
assertTrue(foundDisable);
286+
}
287+
288+
// A count trigger wrapped in Repeatedly.forever must disable combiner lifting.
// NOTE(review): `throws IOException, Exception` - IOException looks redundant next to Exception.
@Test
289+
public void testRepeatedCountTriggerDisablesCombinerLifting() throws IOException, Exception {
290+
testTriggerCombinerLiftingDisabled(Repeatedly.forever((AfterPane.elementCountAtLeast(1))));
291+
}
292+
293+
// A count trigger used as the early firing of an end-of-window watermark trigger must disable
// combiner lifting.
@Test
294+
public void testEarlyCountTriggerDisablesCombinerLifting() throws IOException, Exception {
295+
testTriggerCombinerLiftingDisabled(
296+
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)));
297+
}
298+
299+
// A count trigger nested under AfterFirst (alongside Never) inside Repeatedly.forever must
// disable combiner lifting.
@Test
300+
public void testAfterFirstCountTriggerDisablesCombinerLifting() throws IOException, Exception {
301+
testTriggerCombinerLiftingDisabled(
302+
Repeatedly.forever(AfterFirst.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
303+
}
304+
305+
// A count trigger nested under AfterAll (alongside Never) inside Repeatedly.forever must
// disable combiner lifting.
@Test
306+
public void testAfterAllCountTriggerDisablesCombinerLifting() throws IOException, Exception {
307+
testTriggerCombinerLiftingDisabled(
308+
Repeatedly.forever(AfterAll.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
309+
}
310+
311+
// Builds one streaming pipeline with three windowed Count combines, none using a count trigger,
// translates it, and asserts that no translated step has DISALLOW_COMBINER_LIFTING set to true.
@Test
312+
public void testCombinerLiftingEnabled() throws IOException, Exception {
313+
DataflowPipelineOptions options = buildPipelineOptions();
314+
options.setRunner(DataflowRunner.class);
315+
options.as(StreamingOptions.class).setStreaming(true);
316+
Pipeline p = Pipeline.create(options);
317+
318+
// NOTE(review): this traversal runs before any transform is applied to p - confirm it is needed.
p.traverseTopologically(new RecordingPipelineVisitor());
319+
SdkComponents sdkComponents = createSdkComponents(options);
320+
321+
PCollection<Integer> input =
322+
p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()));
323+
324+
// Case 1: 1 ms fixed windows with the default trigger.
input
325+
.setIsBoundedInternal(IsBounded.UNBOUNDED)
326+
.apply(
327+
"window1",
328+
Window.<Integer>into(FixedWindows.of(Duration.millis(1)))
329+
.triggering(DefaultTrigger.of())
330+
.discardingFiredPanes())
331+
.apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
332+
333+
// Case 2: end-of-window watermark trigger with no early or late firings.
// NOTE(review): relies on the setIsBoundedInternal call above having marked `input` itself
// as unbounded - confirm.
input
334+
.apply(
335+
"window2",
336+
Window.<Integer>configure()
337+
.triggering(AfterWatermark.pastEndOfWindow())
338+
.discardingFiredPanes())
339+
.apply("count2", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
340+
341+
// Case 3: watermark trigger with processing-time early firings and
// synchronized-processing-time late firings.
input
342+
.apply(
343+
"window3",
344+
Window.<Integer>configure()
345+
.triggering(
346+
AfterWatermark.pastEndOfWindow()
347+
.withEarlyFirings(
348+
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))
349+
.withLateFirings(AfterSynchronizedProcessingTime.ofFirstElement()))
350+
.discardingFiredPanes())
351+
.apply("count3", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
352+
353+
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
// The translator is built from a separate options object that only has streaming enabled.
354+
DataflowPipelineOptions translatorOptions =
355+
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
356+
translatorOptions.setStreaming(true);
357+
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(translatorOptions);
358+
359+
JobSpecification jobSpecification =
360+
t.translate(
361+
p,
362+
pipelineProto,
363+
sdkComponents,
364+
DataflowRunner.fromOptions(options),
365+
Collections.emptyList());
366+
367+
// A single step with the property set to true fails the test.
boolean foundDisable = false;
368+
for (Step step : jobSpecification.getJob().getSteps()) {
369+
if (getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
370+
foundDisable = true;
371+
}
372+
}
373+
assertFalse(foundDisable);
374+
}
375+
238376
// Test that the transform names for Storage Write API for streaming pipelines are what we expect
239377
// them to be. This is required since the Windmill backend expects the step to contain that name.
240378
// For a more stable solution, we should use URN, but that is not currently used in the legacy

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
5858
return deadline;
5959
}
6060

61+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
62+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
63+
return visitor.visit(this);
64+
}
65+
6166
@Override
6267
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
6368
return new AfterAll(continuationTriggers);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,19 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
6767

6868
@Override
6969
public boolean mayFinish() {
70-
return subTriggers.stream().allMatch(trigger -> trigger.mayFinish());
70+
return subTriggers.stream().allMatch(Trigger::mayFinish);
7171
}
7272

7373
// Builds the continuation as Repeatedly.forever over an AfterFirst of the supplied
// continuation triggers.
@Override
7474
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
7575
return Repeatedly.forever(new AfterFirst(continuationTriggers));
7676
}
7777

78+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
79+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
80+
return visitor.visit(this);
81+
}
82+
7883
@Override
7984
public String toString() {
8085
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
5858
return deadline;
5959
}
6060

61+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
62+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
63+
return visitor.visit(this);
64+
}
65+
6166
@Override
6267
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
6368
return new AfterFirst(continuationTriggers);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
5959
return BoundedWindow.TIMESTAMP_MAX_VALUE;
6060
}
6161

62+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
63+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
64+
return visitor.visit(this);
65+
}
66+
6267
@Override
6368
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
6469
return AfterPane.elementCountAtLeast(1);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
107107
return BoundedWindow.TIMESTAMP_MAX_VALUE;
108108
}
109109

110+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
111+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
112+
return visitor.visit(this);
113+
}
114+
110115
@Override
111116
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
112117
return AfterSynchronizedProcessingTime.ofFirstElement();

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
4949
return BoundedWindow.TIMESTAMP_MAX_VALUE;
5050
}
5151

52+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
@Override
53+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
54+
return visitor.visit(this);
55+
}
56+
5257
@Override
5358
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
5459
return this;

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) {
101101
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
102102
}
103103

104+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
// NOTE(review): the enclosing class is presumably AfterWatermarkEarlyAndLate - confirm.
@Override
105+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
106+
return visitor.visit(this);
107+
}
108+
104109
@Override
105110
public Trigger getContinuationTrigger() {
106111
return new AfterWatermarkEarlyAndLate(
@@ -177,6 +182,11 @@ protected FromEndOfWindow getContinuationTrigger(List<Trigger> continuationTrigg
177182
return this;
178183
}
179184

185+
// Visitor entry point: passes this trigger to the visitor and returns whatever visit() yields.
// NOTE(review): the enclosing class is presumably FromEndOfWindow - confirm.
@Override
186+
public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
187+
return visitor.visit(this);
188+
}
189+
180190
@Override
181191
public String toString() {
182192
return TO_STRING;

0 commit comments

Comments
 (0)