3131import java .util .ArrayList ;
3232import java .util .Arrays ;
3333import java .util .Collections ;
34- import java .util .HashMap ;
3534import java .util .List ;
3635import java .util .Map ;
3736import java .util .UUID ;
37+ import java .util .concurrent .ConcurrentHashMap ;
3838import java .util .concurrent .atomic .AtomicBoolean ;
3939import org .apache .beam .sdk .Pipeline ;
4040import org .apache .beam .sdk .coders .BigEndianIntegerCoder ;
@@ -958,16 +958,22 @@ public OffsetRange getInitialRestriction() {
958958 }
959959
960960 /**
961- * While the finalization callback hasn't been invoked, this DoFn will keep requesting
962- * finalization, wait one second and then checkpoint upto MAX_ATTEMPTS amount of times. Once the
963- * callback has been invoked, the DoFn will output the element and stop.
961+ * While the finalization callback hasn't been invoked, this DoFn repeatedly registers a
962+ * finalization request, sleeps ~100ms, and checkpoints (via {@link ProcessContinuation#resume()})
963+ * up to {@link #MAX_ATTEMPTS} times. Once the callback runs and flips {@code WAS_FINALIZED}, the
964+ * DoFn outputs the element and stops.
965+ *
966+ * <p>Shared state must be thread-safe: {@link BundleFinalizer} callbacks can run on a different
967+ * thread than {@code @ProcessElement}; use {@link ConcurrentHashMap} for {@code WAS_FINALIZED} so
968+ * {@code computeIfAbsent} / updates are not racy (a plain {@link HashMap} can hang or corrupt
969+ * under concurrent structural access).
964970 */
965971 public static class BundleFinalizingSplittableDoFn extends DoFn <String , String > {
972+ /** Upper bound on {@link ProcessContinuation#resume()} iterations via restriction width. */
966973 private static final long MAX_ATTEMPTS = 3000 ;
967- // We use the UUID to uniquely identify this DoFn in case this test is run with
968- // other tests in the same JVM.
969- private static final Map <UUID , AtomicBoolean > WAS_FINALIZED = new HashMap ();
970- private final UUID uuid = UUID .randomUUID ();
974+
975+ private static final long FINALIZATION_CALLBACK_TIMEOUT_SECS = 300 ;
976+ private static final Map <String , AtomicBoolean > WAS_FINALIZED = new ConcurrentHashMap <>();
971977
972978 @ NewTracker
973979 public RestrictionTracker <OffsetRange , Long > newTracker (@ Restriction OffsetRange restriction ) {
@@ -987,23 +993,29 @@ public ProcessContinuation process(
987993 RestrictionTracker <OffsetRange , Long > tracker ,
988994 BundleFinalizer bundleFinalizer )
989995 throws InterruptedException {
990- if (WAS_FINALIZED .computeIfAbsent (uuid , (unused ) -> new AtomicBoolean ()).get ()) {
996+ AtomicBoolean wasFinalized =
997+ WAS_FINALIZED .computeIfAbsent (element , (unused ) -> new AtomicBoolean ());
998+ if (wasFinalized .get ()) {
991999 tracker .tryClaim (tracker .currentRestriction ().getFrom () + 1 );
9921000 receiver .output (element );
1001+ WAS_FINALIZED .remove (element );
9931002 // Claim beyond the end now that we know we have been finalized.
9941003 tracker .tryClaim (Long .MAX_VALUE );
9951004 return stop ();
9961005 }
9971006 if (tracker .tryClaim (tracker .currentRestriction ().getFrom () + 1 )) {
9981007 bundleFinalizer .afterBundleCommit (
999- Instant .now ().plus (Duration .standardSeconds (MAX_ATTEMPTS )),
1000- () -> WAS_FINALIZED . computeIfAbsent ( uuid , ( unused ) -> new AtomicBoolean ()) .set (true ));
1008+ Instant .now ().plus (Duration .standardSeconds (FINALIZATION_CALLBACK_TIMEOUT_SECS )),
1009+ () -> wasFinalized .set (true ));
10011010 // We sleep here instead of setting a resume time since the resume time doesn't need to
10021011 // be honored.
10031012 sleep (100L );
10041013 return resume ();
10051014 }
1006- return stop ();
1015+ WAS_FINALIZED .remove (element );
1016+ throw new RuntimeException (
1017+ String .format (
1018+ "Bundle finalization callback was not observed after %d checkpoints." , MAX_ATTEMPTS ));
10071019 }
10081020
10091021 @ GetInitialRestriction
@@ -1017,9 +1029,10 @@ public OffsetRange getInitialRestriction() {
10171029 public void testBundleFinalizationOccursOnBoundedSplittableDoFn () throws Exception {
10181030 @ BoundedPerElement
10191031 class BoundedBundleFinalizingSplittableDoFn extends BundleFinalizingSplittableDoFn {}
1020- PCollection <String > foo = p .apply (Create .of ("foo" ));
1032+ String element = "foo-" + UUID .randomUUID ();
1033+ PCollection <String > foo = p .apply (Create .of (element ));
10211034 PCollection <String > res = foo .apply (ParDo .of (new BoundedBundleFinalizingSplittableDoFn ()));
1022- PAssert .that (res ).containsInAnyOrder ("foo" );
1035+ PAssert .that (res ).containsInAnyOrder (element );
10231036 p .run ();
10241037 }
10251038
@@ -1028,9 +1041,10 @@ class BoundedBundleFinalizingSplittableDoFn extends BundleFinalizingSplittableDo
10281041 public void testBundleFinalizationOccursOnUnboundedSplittableDoFn () throws Exception {
10291042 @ UnboundedPerElement
10301043 class UnboundedBundleFinalizingSplittableDoFn extends BundleFinalizingSplittableDoFn {}
1031- PCollection <String > foo = p .apply (Create .of ("foo" ));
1044+ String element = "foo-" + UUID .randomUUID ();
1045+ PCollection <String > foo = p .apply (Create .of (element ));
10321046 PCollection <String > res = foo .apply (ParDo .of (new UnboundedBundleFinalizingSplittableDoFn ()));
1033- PAssert .that (res ).containsInAnyOrder ("foo" );
1047+ PAssert .that (res ).containsInAnyOrder (element );
10341048 p .run ();
10351049 }
10361050
0 commit comments