Skip to content

Commit a3cb642

Browse files
authored
Merge pull request #38878: Fix OnWindowExpirationContext.
* foo * fix compilation * fix compilation
1 parent ea2fde7 commit a3cb642

9 files changed

Lines changed: 89 additions & 3 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,13 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
649649
"Cannot access OnTimerContext outside of @OnTimer methods.");
650650
}
651651

652+
@Override
653+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
654+
DoFn<InputT, OutputT> doFn) {
655+
throw new UnsupportedOperationException(
656+
"Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
657+
}
658+
652659
@Override
653660
public RestrictionTracker<?, ?> restrictionTracker() {
654661
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
@@ -958,6 +965,13 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
958965
return this;
959966
}
960967

968+
@Override
969+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
970+
DoFn<InputT, OutputT> doFn) {
971+
throw new UnsupportedOperationException(
972+
"Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
973+
}
974+
961975
@Override
962976
public RestrictionTracker<?, ?> restrictionTracker() {
963977
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");
@@ -1299,6 +1313,12 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
12991313
throw new UnsupportedOperationException("OnTimerContext parameters are not supported.");
13001314
}
13011315

1316+
@Override
1317+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
1318+
DoFn<InputT, OutputT> doFn) {
1319+
return this;
1320+
}
1321+
13021322
@Override
13031323
public RestrictionTracker<?, ?> restrictionTracker() {
13041324
throw new UnsupportedOperationException("RestrictionTracker parameters are not supported.");

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
142142
public static final String OUTPUT_PARAMETER_METHOD = "outputReceiver";
143143
public static final String TAGGED_OUTPUT_PARAMETER_METHOD = "taggedOutputReceiver";
144144
public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
145+
public static final String ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD =
146+
"onWindowExpirationContext";
145147
public static final String WINDOW_PARAMETER_METHOD = "window";
146148
public static final String PANE_INFO_PARAMETER_METHOD = "paneInfo";
147149
public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
@@ -1170,6 +1172,16 @@ public StackManipulation dispatch(OnTimerContextParameter p) {
11701172
ON_TIMER_CONTEXT_PARAMETER_METHOD, DoFn.class)));
11711173
}
11721174

1175+
@Override
1176+
public StackManipulation dispatch(
1177+
DoFnSignature.Parameter.OnWindowExpirationContextParameter p) {
1178+
return new StackManipulation.Compound(
1179+
pushDelegate,
1180+
MethodInvocation.invoke(
1181+
getExtraContextFactoryMethodDescription(
1182+
ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER_METHOD, DoFn.class)));
1183+
}
1184+
11731185
@Override
11741186
public StackManipulation dispatch(WindowParameter p) {
11751187
return new StackManipulation.Compound(

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ interface ArgumentProvider<InputT, OutputT> {
185185
/** Provide a {@link DoFn.OnTimerContext} to use with the given {@link DoFn}. */
186186
DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn);
187187

188+
/** Provide a {@link DoFn.OnWindowExpirationContext} to use with the given {@link DoFn}. */
189+
DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
190+
DoFn<InputT, OutputT> doFn);
191+
188192
/** Provide a reference to the input element. */
189193
InputT element(DoFn<InputT, OutputT> doFn);
190194

@@ -447,6 +451,13 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
447451
String.format("OnTimerContext unsupported in %s", getErrorContext()));
448452
}
449453

454+
@Override
455+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
456+
DoFn<InputT, OutputT> doFn) {
457+
throw new UnsupportedOperationException(
458+
String.format("OnWindowExpirationContext unsupported in %s", getErrorContext()));
459+
}
460+
450461
@Override
451462
public State state(String stateId, boolean alwaysFetched) {
452463
throw new UnsupportedOperationException(
@@ -538,6 +549,12 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
538549
return delegate.onTimerContext(doFn);
539550
}
540551

552+
@Override
553+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
554+
DoFn<InputT, OutputT> doFn) {
555+
return delegate.onWindowExpirationContext(doFn);
556+
}
557+
541558
@Override
542559
public InputT element(DoFn<InputT, OutputT> doFn) {
543560
return delegate.element(doFn);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ public <ResultT> ResultT match(Cases<ResultT> cases) {
305305
return cases.dispatch((ProcessContextParameter) this);
306306
} else if (this instanceof OnTimerContextParameter) {
307307
return cases.dispatch((OnTimerContextParameter) this);
308+
} else if (this instanceof OnWindowExpirationContextParameter) {
309+
return cases.dispatch((OnWindowExpirationContextParameter) this);
308310
} else if (this instanceof WindowParameter) {
309311
return cases.dispatch((WindowParameter) this);
310312
} else if (this instanceof PaneInfoParameter) {
@@ -391,6 +393,8 @@ public interface Cases<ResultT> {
391393

392394
ResultT dispatch(OnTimerContextParameter p);
393395

396+
ResultT dispatch(OnWindowExpirationContextParameter p);
397+
394398
ResultT dispatch(WindowParameter p);
395399

396400
ResultT dispatch(PaneInfoParameter p);
@@ -498,6 +502,11 @@ public ResultT dispatch(OnTimerContextParameter p) {
498502
return dispatchDefault(p);
499503
}
500504

505+
@Override
506+
public ResultT dispatch(OnWindowExpirationContextParameter p) {
507+
return dispatchDefault(p);
508+
}
509+
501510
@Override
502511
public ResultT dispatch(WindowParameter p) {
503512
return dispatchDefault(p);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ private DoFnSignatures() {}
229229
Parameter.StateParameter.class,
230230
Parameter.TimestampParameter.class,
231231
Parameter.KeyParameter.class,
232-
Parameter.SideInputParameter.class);
232+
Parameter.SideInputParameter.class,
233+
Parameter.OnWindowExpirationContextParameter.class);
233234

234235
private static final Collection<Class<? extends Parameter>>
235236
ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS =

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,12 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
506506
throw new IllegalStateException();
507507
}
508508

509+
@Override
510+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
511+
DoFn<InputT, OutputT> doFn) {
512+
throw new IllegalStateException();
513+
}
514+
509515
@Override
510516
public InputT element(DoFn<InputT, OutputT> doFn) {
511517
return element;

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7335,11 +7335,15 @@ public void onTimer(
73357335
@OnWindowExpiration
73367336
public void onWindowExpiration(
73377337
@AlwaysFetched @StateId(stateId) ValueState<Integer> state,
7338+
BoundedWindow window,
73387339
@Key String key,
7340+
OnWindowExpirationContext context,
73397341
OutputReceiver<Integer> r) {
73407342
Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
73417343
// verify state
73427344
assertEquals(1, (int) currentValue);
7345+
Preconditions.checkNotNull(context);
7346+
assertEquals(window, context.window());
73437347
// To check output is received from OnWindowExpiration
73447348
r.output(currentValue);
73457349
}

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,16 +1422,20 @@ public void bar(
14221422
@StateId("foo") ValueState<Integer> s,
14231423
PipelineOptions p,
14241424
OutputReceiver<String> o,
1425-
MultiOutputReceiver m) {}
1425+
MultiOutputReceiver m,
1426+
OnWindowExpirationContext c) {}
14261427
}.getClass());
14271428

14281429
List<Parameter> params = sig.onWindowExpiration().extraParameters();
1429-
assertThat(params.size(), equalTo(5));
1430+
assertThat(params.size(), equalTo(6));
14301431
assertThat(params.get(0), instanceOf(WindowParameter.class));
14311432
assertThat(params.get(1), instanceOf(StateParameter.class));
14321433
assertThat(params.get(2), instanceOf(PipelineOptionsParameter.class));
14331434
assertThat(params.get(3), instanceOf(OutputReceiverParameter.class));
14341435
assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class));
1436+
assertThat(
1437+
params.get(5),
1438+
instanceOf(DoFnSignature.Parameter.OnWindowExpirationContextParameter.class));
14351439
}
14361440

14371441
private interface FeatureTest {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2250,6 +2250,13 @@ public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
22502250
"Cannot access OnTimerContext outside of @OnTimer methods.");
22512251
}
22522252

2253+
@Override
2254+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
2255+
DoFn<InputT, OutputT> doFn) {
2256+
throw new UnsupportedOperationException(
2257+
"Cannot access OnWindowExpirationContext outside of @OnWindowExpiration methods.");
2258+
}
2259+
22532260
@Override
22542261
public RestrictionTracker<?, ?> restrictionTracker() {
22552262
return currentTracker;
@@ -2469,6 +2476,12 @@ private void checkOnWindowExpirationTimestamp(Instant timestamp) {
24692476
private final OnWindowExpirationContext.Context context =
24702477
new OnWindowExpirationContext.Context();
24712478

2479+
@Override
2480+
public DoFn<InputT, OutputT>.OnWindowExpirationContext onWindowExpirationContext(
2481+
DoFn<InputT, OutputT> doFn) {
2482+
return context;
2483+
}
2484+
24722485
@Override
24732486
public BoundedWindow window() {
24742487
return currentWindow;

0 commit comments

Comments
 (0)