Skip to content

Commit b21b9e3

Browse files
committed
add drain docs
1 parent 7235088 commit b21b9e3

4 files changed

Lines changed: 96 additions & 3 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070

7171
## New Features / Improvements
7272

73+
* Added an indicator for aggregations and timers that fire during a pipeline drain, allowing users and sinks to recognize and appropriately handle potentially incomplete or partial data ([#36884](https://github.com/apache/beam/issues/36884)).
7374
* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go) ([#38349](https://github.com/apache/beam/issues/38349)).
7475
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7576
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to

website/www/site/content/en/blog/looping-timers.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,18 @@ public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<St
221221
public void onTimer(
222222
OnTimerContext c,
223223
@StateId("key") ValueState<String> key,
224-
@TimerId("loopingTimer") Timer loopingTimer) {
224+
@TimerId("loopingTimer") Timer loopingTimer,
225+
CausedByDrain drain) {
225226

226227
LOG.info("Timer @ {} fired", c.timestamp());
227228
c.output(KV.of(key.read(), 0));
228229

230+
// Check if drain is in progress and avoid resetting the timer
231+
if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
232+
LOG.info("Drain in progress, stopping looping timer.");
233+
return;
234+
}
235+
229236
// If we do not put in a “time to live” value, then the timer would loop forever
230237
Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
231238
if (nextTimer.isBefore(stopTimerTime)) {
@@ -347,4 +354,4 @@ support for dealing with this use case in production.
347354

348355

349356
Runner specific notes:
350-
Google Cloud Dataflow Runners Drain feature does not support looping timers (Link to matrix)
357+
Cancelling looping timers on drain is currently supported only on Dataflow, and that support is still being implemented (see [Issue #36884](https://github.com/apache/beam/issues/36884)).

website/www/site/content/en/documentation/programming-guide.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7501,6 +7501,91 @@ class BufferDoFn(DoFn):
75017501
{{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}}
75027502
{{< /highlight >}}
75037503

7504+
#### 11.5.3. Looping timers {#looping-timers}
7505+
7506+
Looping timers are a pattern where a timer sets another timer for a future time, creating a loop. This is useful for producing periodic outputs or heartbeats in the absence of data for a specific key.
7507+
7508+
When draining a pipeline, it is important to terminate these loops so that the pipeline can finish. In the Java SDK, you can use the `CausedByDrain` parameter in the `@OnTimer` method to check whether the timer firing was caused by a drain operation. **Note:** `CausedByDrain` is set only by certain runners. Check the [capability matrix](/documentation/runners/capability-matrix/) for more details.
7509+
7510+
{{< highlight java >}}
7511+
public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
7512+
@StateId("key") private final StateSpec<ValueState<String>> key = StateSpecs.value();
7513+
@TimerId("loopingTimer") private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
7514+
7515+
@ProcessElement
7516+
public void process(
7517+
@Element KV<String, Integer> element,
7518+
@StateId("key") ValueState<String> keyState,
7519+
@TimerId("loopingTimer") Timer timer, OutputReceiver<KV<String, Integer>> output) {
7520+
7521+
keyState.write(element.getKey());
7522+
// Set initial timer
7523+
timer.offset(Duration.standardMinutes(1)).setRelative();
7524+
output.output(element);
7525+
}
7526+
7527+
@OnTimer("loopingTimer")
7528+
public void onTimer(
7529+
@StateId("key") ValueState<String> keyState,
7530+
@TimerId("loopingTimer") Timer timer,
7531+
OutputReceiver<KV<String, Integer>> output,
7532+
CausedByDrain drain) {
7533+
7534+
output.output(KV.of(keyState.read(), 0));
7535+
7536+
// Cancel looping timer if drain is in progress
7537+
if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
7538+
return;
7539+
}
7540+
7541+
// Set next timer
7542+
timer.offset(Duration.standardMinutes(1)).setRelative();
7543+
}
7544+
}
7545+
{{< /highlight >}}
7546+
7547+
{{< highlight py >}}
7548+
# Python does not currently support detecting drain in OnTimer.
7549+
# The following example demonstrates a looping timer without drain support.
7550+
7551+
class LoopingTimerDoFn(DoFn):
7552+
KEY_STATE = ValueStateSpec('key', coders.StrUtf8Coder())
7553+
TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
7554+
7555+
def process(self, element, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
7556+
key_state.write(element[0])
7557+
timer.set(Timestamp.now() + Duration(seconds=60))
7558+
yield element
7559+
7560+
@on_timer(TIMER)
7561+
def on_timer(self, key_state=DoFn.StateParam(KEY_STATE), timer=DoFn.TimerParam(TIMER)):
7562+
yield (key_state.read(), 0)
7563+
# Loops forever, cannot handle drain safely if it never stops.
7564+
timer.set(Timestamp.now() + Duration(seconds=60))
7565+
{{< /highlight >}}
7566+
7567+
{{< highlight go >}}
7568+
// Go does not currently support detecting drain in OnTimer.
7569+
// The following example demonstrates a looping timer without drain support.
7570+
7571+
type LoopingTimerFn struct {
7572+
KeyState state.Value[string]
7573+
Timer timers.EventTime
7574+
}
7575+
7576+
func (fn *LoopingTimerFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
7577+
fn.KeyState.Write(sp, key)
7578+
fn.Timer.Set(tp, time.Now().Add(60*time.Second))
7579+
emit(key, value)
7580+
}
7581+
7582+
func (fn *LoopingTimerFn) OnTimer(sp state.Provider, tp timers.Provider, key string, emit func(string, int)) {
7583+
emit(key, 0)
7584+
// Loops forever, cannot handle drain safely if it never stops.
7585+
fn.Timer.Set(tp, time.Now().Add(60*time.Second))
7586+
}
7587+
{{< /highlight >}}
7588+
75047589

75057590
## 12. Splittable `DoFns` {#splittable-dofns}
75067591

website/www/site/data/capability_matrix.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1475,7 +1475,7 @@ capability-matrix:
14751475
- class: dataflow
14761476
l1: "Partially"
14771477
l2:
1478-
l3: Dataflow has a native drain operation, but it does not work in the presence of event time timer loops. Final implemention pending model support.
1478+
l3: Dataflow has a native drain operation; support for draining event time timer loops is limited to the non-portable runner.
14791479
- class: prism
14801480
l1: "No"
14811481
l2:

0 commit comments

Comments
 (0)