Skip to content

Commit f258f0c

Browse files
authored
Merge pull request #34883: Refactor: separate PairWithRestriction from FnApiDoFnRunner
2 parents 0e21278 + 41d7021 commit f258f0c

7 files changed

Lines changed: 1243 additions & 620 deletions

File tree

sdks/java/harness/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ applyJavaNature(
4545
'AssignWindowsRunner': 'https://github.com/typetools/checker-framework/issues/3794',
4646
'WindowMergingFnRunner': 'https://github.com/typetools/checker-framework/issues/3794',
4747
'FnApiDoFnRunner': 'https://github.com/typetools/checker-framework/issues/5436',
48+
'SplittablePairWithRestrictionDoFnRunner': 'https://github.com/typetools/checker-framework/issues/5436',
4849
],
4950
automaticModuleName: 'org.apache.beam.fn.harness',
5051
testShadowJar: true,

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
149149
Factory factory = new Factory();
150150
return ImmutableMap.<String, PTransformRunnerFactory>builder()
151151
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, factory)
152-
.put(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN, factory)
153152
.put(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, factory)
154153
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
155154
.put(
@@ -375,7 +374,6 @@ public final void addRunnerForPTransform(Context context) {
375374
case PTransformTranslation.PAR_DO_TRANSFORM_URN:
376375
mainOutputTag = (TupleTag) ParDoTranslation.getMainOutputTag(parDoPayload);
377376
break;
378-
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
379377
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
380378
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
381379
mainOutputTag =
@@ -498,8 +496,6 @@ public final void addRunnerForPTransform(Context context) {
498496
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
499497
addStartFunction.accept(this::startBundle);
500498
break;
501-
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
502-
// startBundle should not be invoked
503499
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
504500
// startBundle should not be invoked
505501
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
@@ -525,18 +521,6 @@ public final void addRunnerForPTransform(Context context) {
525521
this.processContext = new NonWindowObservingProcessBundleContext();
526522
}
527523
break;
528-
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
529-
if (doFnSignature.getInitialRestriction().observesWindow()
530-
|| (doFnSignature.getInitialWatermarkEstimatorState() != null
531-
&& doFnSignature.getInitialWatermarkEstimatorState().observesWindow())
532-
|| !sideInputMapping.isEmpty()) {
533-
mainInputConsumer = this::processElementForWindowObservingPairWithRestriction;
534-
this.processContext = new WindowObservingProcessBundleContext();
535-
} else {
536-
mainInputConsumer = this::processElementForPairWithRestriction;
537-
this.processContext = new NonWindowObservingProcessBundleContext();
538-
}
539-
break;
540524
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
541525
if ((doFnSignature.splitRestriction() != null
542526
&& doFnSignature.splitRestriction().observesWindow())
@@ -667,8 +651,6 @@ public void accept(WindowedValue input) throws Exception {
667651
case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
668652
addFinishFunction.accept(this::finishBundle);
669653
break;
670-
case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
671-
// finishBundle should not be invoked
672654
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
673655
// finishBundle should not be invoked
674656
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
@@ -815,57 +797,6 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
815797
}
816798
}
817799

818-
private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
819-
currentElement = elem;
820-
try {
821-
currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
822-
outputTo(
823-
mainOutputConsumer,
824-
(WindowedValue)
825-
elem.withValue(
826-
KV.of(
827-
elem.getValue(),
828-
KV.of(
829-
currentRestriction,
830-
doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
831-
} finally {
832-
currentElement = null;
833-
currentRestriction = null;
834-
}
835-
836-
this.stateAccessor.finalizeState();
837-
}
838-
839-
private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) {
840-
currentElement = elem;
841-
try {
842-
Iterator<BoundedWindow> windowIterator =
843-
(Iterator<BoundedWindow>) elem.getWindows().iterator();
844-
while (windowIterator.hasNext()) {
845-
currentWindow = windowIterator.next();
846-
currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
847-
outputTo(
848-
mainOutputConsumer,
849-
(WindowedValue)
850-
WindowedValue.of(
851-
KV.of(
852-
elem.getValue(),
853-
KV.of(
854-
currentRestriction,
855-
doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext))),
856-
currentElement.getTimestamp(),
857-
currentWindow,
858-
currentElement.getPane()));
859-
}
860-
} finally {
861-
currentElement = null;
862-
currentWindow = null;
863-
currentRestriction = null;
864-
}
865-
866-
this.stateAccessor.finalizeState();
867-
}
868-
869800
private void processElementForSplitRestriction(
870801
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
871802
currentElement = elem.withValue(elem.getValue().getKey());

0 commit comments

Comments
 (0)