@@ -7509,29 +7509,27 @@ When draining a pipeline, it is important to terminate these loops to allow the
75097509
75107510{{< highlight java >}}
75117511public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
7512- @StateId ("key") private final StateSpec<ValueState<String >> key = StateSpecs.value();
75137512 @TimerId ("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
75147513
75157514 @ProcessElement
75167515 public void process(
75177516 @Element KV<String, Integer> element,
7518- @StateId("key ") ValueState<String> keyState ,
7519- @TimerId("loopingTimer") Timer timer ) {
7517+ @TimerId("loopingTimer ") Timer timer ,
7518+ OutputReceiver<KV<String, Integer>> output ) {
75207519
7521- keyState.write(element.getKey());
75227520 // Set initial timer
75237521 timer.offset(Duration.standardMinutes(1)).setRelative();
75247522 output.output(element);
75257523 }
75267524
75277525 @OnTimer("loopingTimer")
75287526 public void onTimer(
7529- @StateId("key") ValueState< String> keyState ,
7527+ @Key String key ,
75307528 @TimerId("loopingTimer") Timer timer,
75317529 OutputReceiver<KV<String, Integer>> output,
75327530 CausedByDrain drain) {
75337531
7534- output.output(KV.of(keyState.read() , 0));
7532+ output.output(KV.of(key , 0));
75357533
75367534 // Cancel looping timer if drain is in progress
75377535 if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
@@ -7546,43 +7544,43 @@ public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<St
75467544
75477545{{< highlight py >}}
75487546# Python does not currently support detecting drain in OnTimer.
7549- # The following example demonstrates a looping timer without drain support.
7547+ # The following example demonstrates a looping timer without drain support,
7548+ # using event time.
75507549
75517550class LoopingTimerDoFn(DoFn):
7552- KEY_STATE = ValueStateSpec('key', coders.StrUtf8Coder())
75537551 TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
75547552
7555- def process(self, element, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
7556- key_state.write(element[ 0] )
7557- timer.set(Timestamp.now() + Duration(seconds=60))
7553+ def process(self, element, ts=DoFn.TimestampParam, timer=DoFn.TimerParam(TIMER)):
7554+ timer.set(ts + Duration(seconds=60))
75587555 yield element
75597556
75607557 @on_timer(TIMER)
7561- def on_timer(self, key_state =DoFn.StateParam(KEY_STATE) , timer=DoFn.TimerParam(TIMER)):
7562- yield (key_state.read() , 0)
7558+ def on_timer(self, key =DoFn.KeyParam, timestamp=DoFn.TimestampParam , timer=DoFn.TimerParam(TIMER)):
7559+ yield (key , 0)
75637560 # Loops forever, cannot handle drain safely if it never stops.
7564- timer.set(Timestamp.now() + Duration(seconds=60))
7561+ timer.set(timestamp + Duration(seconds=60))
75657562{{< /highlight >}}
75667563
75677564{{< highlight go >}}
75687565// Go does not currently support detecting drain in OnTimer.
7569- // The following example demonstrates a looping timer without drain support.
7566+ // The following example demonstrates a looping timer without drain support,
7567+ // using event time.
75707568
75717569type LoopingTimerFn struct {
7572- KeyState state.Value[ string]
7573- Timer timers.EventTime
7570+ Timer timers.EventTime
75747571}
75757572
7576- func (fn * LoopingTimerFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
7577- fn.KeyState.Write(sp, key )
7578- fn.Timer.Set(tp, time.Now().Add(60 * time.Second) )
7573+ func (fn * LoopingTimerFn) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
7574+ nextTime := et.ToTime().Add(60 * time.Second )
7575+ fn.Timer.Set(tp, nextTime )
75797576 emit(key, value)
75807577}
75817578
7582- func (fn * LoopingTimerFn) OnTimer(sp state.Provider, tp timers.Provider, key string, emit func(string, int)) {
7579+ func (fn * LoopingTimerFn) OnTimer(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context , emit func(string, int)) {
75837580 emit(key, 0)
75847581 // Loops forever, cannot handle drain safely if it never stops.
7585- fn.Timer.Set(tp, time.Now().Add(60* time.Second))
7582+ nextTime := et.ToTime().Add(60 * time.Second)
7583+ fn.Timer.Set(tp, nextTime)
75867584}
75877585{{< /highlight >}}
75887586
0 commit comments