Skip to content

Commit 6adfa4b

Browse files
authored
Add relative change detector (#3624)
* Add timestamp buffer and relative change detector * Add absolute threshold * dont alert if baseline is zero
1 parent 5794dda commit 6adfa4b

7 files changed

Lines changed: 2315 additions & 5 deletions

File tree

python/src/main/java/com/google/cloud/teleport/templates/python/BigQueryAnomalyDetection.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public interface BigQueryAnomalyDetection {
6363
description = "Detector Specification (JSON)",
6464
helpText =
6565
"JSON string defining the anomaly detector. "
66-
+ "Example: {\"type\":\"ZScore\"} or "
67-
+ "{\"type\":\"ZScore\",\"config\":{\"threshold_criterion\":{\"type\":\"FixedThreshold\","
68-
+ "\"config\":{\"cutoff\":10}}}}")
66+
+ "Statistical: {\"type\":\"ZScore\"}, {\"type\":\"IQR\"}, {\"type\":\"RobustZScore\"}. "
67+
+ "Threshold: {\"type\":\"Threshold\",\"expression\":\"value >= 100\"}. "
68+
+ "RelativeChange: {\"type\":\"RelativeChange\",\"direction\":\"decrease\","
69+
+ "\"threshold_pct\":20,\"lookback_windows\":1}.")
6970
String getDetectorSpec();
7071

7172
@TemplateParameter.Text(

python/src/main/python/bigquery-anomaly-detection/src/bqmonitor/pipeline.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@
236236
from bqmonitor.metric import ComputeMetric
237237
from bqmonitor.metric import FanoutStrategy
238238
from bqmonitor.metric import MetricSpec
239+
from bqmonitor.relative_change_detector import RelativeChangeConfig
240+
from bqmonitor.relative_change_detector import RelativeChangeDoFn
239241
from bqmonitor.safe_eval import Expr
240242
from apache_beam.ml.anomaly.base import AnomalyPrediction
241243
from apache_beam.ml.anomaly.base import AnomalyResult
@@ -250,7 +252,7 @@
250252

251253
_LOGGER = logging.getLogger(__name__)
252254

253-
_SUPPORTED_DETECTORS = ('ZScore', 'IQR', 'RobustZScore')
255+
_SUPPORTED_DETECTORS = ('ZScore', 'IQR', 'RobustZScore', 'RelativeChange')
254256

255257

256258
@dataclass(frozen=True)
@@ -516,6 +518,7 @@ def process(self, element):
516518
{'name': 'value', 'type': 'FLOAT64', 'mode': 'REQUIRED'},
517519
{'name': 'score', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
518520
{'name': 'label', 'type': 'INT64', 'mode': 'REQUIRED'},
521+
{'name': 'info', 'type': 'STRING', 'mode': 'NULLABLE'},
519522
{'name': 'key', 'type': 'STRING', 'mode': 'NULLABLE'},
520523
]
521524
}
@@ -535,6 +538,7 @@ def process(self, element):
535538
'score': float(prediction.score) if prediction.score is not None
536539
else None,
537540
'label': int(prediction.label),
541+
'info': prediction.info if prediction.info else None,
538542
}
539543
if key is not None:
540544
row['key'] = str(key)
@@ -826,6 +830,34 @@ def _parse_detector_spec(json_str):
826830
"It will receive the computed metric value as 'value'.", expr_text)
827831
return _ThresholdAlert(expr_text)
828832

833+
if detector_type == 'RelativeChange':
834+
config = d.get('config', {})
835+
direction = d.get('direction', config.get('direction'))
836+
if direction is None:
837+
raise ValueError(
838+
"RelativeChange detector requires 'direction' "
839+
"(one of: increase, decrease, both).")
840+
lookback_windows = d.get('lookback_windows',
841+
config.get('lookback_windows'))
842+
if lookback_windows is None:
843+
raise ValueError(
844+
"RelativeChange detector requires 'lookback_windows' "
845+
"(number of prior windows to compare against).")
846+
threshold_pct = d.get('threshold_pct',
847+
config.get('threshold_pct'))
848+
absolute_threshold = d.get('absolute_threshold',
849+
config.get('absolute_threshold'))
850+
if threshold_pct is None and absolute_threshold is None:
851+
raise ValueError(
852+
"RelativeChange detector requires at least one of "
853+
"'threshold_pct' or 'absolute_threshold'.")
854+
return RelativeChangeConfig(
855+
direction=direction,
856+
lookback_windows=lookback_windows,
857+
threshold_pct=threshold_pct,
858+
absolute_threshold=absolute_threshold,
859+
)
860+
829861
if detector_type not in _SUPPORTED_DETECTORS:
830862
raise ValueError(
831863
f"Unknown detector type '{detector_type}'. "
@@ -1121,7 +1153,17 @@ def _add_offset_key(element, _wd=_window_duration, _keyed=has_group_by):
11211153
global_metrics = (
11221154
global_metrics | 'AddOffsetKey' >> beam.Map(_add_offset_key))
11231155

1124-
if isinstance(detector, _ThresholdAlert):
1156+
if isinstance(detector, RelativeChangeConfig):
1157+
anomalies = (
1158+
global_metrics
1159+
| 'DetectAnomalies' >> beam.ParDo(
1160+
RelativeChangeDoFn(
1161+
direction=detector.direction,
1162+
threshold_pct=detector.threshold_pct,
1163+
absolute_threshold=detector.absolute_threshold,
1164+
lookback_windows=detector.lookback_windows)))
1165+
1166+
elif isinstance(detector, _ThresholdAlert):
11251167
anomalies = global_metrics | 'DetectAnomalies' >> beam.ParDo(detector)
11261168
else:
11271169
global_metrics = (
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
#
2+
# Copyright (C) 2026 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
# use this file except in compliance with the License. You may obtain a copy of
6+
# the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations under
14+
# the License.
15+
#
16+
17+
"""Relative change anomaly detector.
18+
19+
Compares the current metric window value against the mean of the last N
20+
windows and alerts if the change exceeds a percentage threshold, an
21+
absolute threshold, or both. Either threshold triggers the alert.
22+
23+
Example detector specs::
24+
25+
{"type": "RelativeChange", "direction": "decrease", "threshold_pct": 20}
26+
{"type": "RelativeChange", "direction": "increase", "threshold_pct": 500,
27+
"absolute_threshold": 1.0}
28+
{"type": "RelativeChange", "direction": "both", "absolute_threshold": 5.0,
29+
"lookback_windows": 3}
30+
31+
Pipeline integration::
32+
33+
(key, beam.Row) -> RelativeChangeDoFn -> (key, AnomalyResult)
34+
(stateful, per-key)
35+
36+
The DoFn buffers elements using ``TimestampBufferDoFnBag`` and event-time
37+
timers to handle out-of-order arrival (backlog, backfill, polling gaps).
38+
Elements are only processed once the watermark guarantees completeness.
39+
40+
The baseline mean is tracked incrementally using the same algorithm as
41+
``apache_beam.ml.anomaly.univariate.mean.IncSlidingMeanTracker``:
42+
on each push, the mean is updated via ``delta / n`` with O(1)
43+
complexity. The tracker state (mean, n, deque of last N values) is
44+
persisted in the base class's ``EXTRA_STATE`` across timer firings.
45+
"""
46+
47+
import dataclasses
48+
import logging
49+
import math
50+
from collections import deque
51+
from typing import Optional
52+
53+
from apache_beam.ml.anomaly.base import AnomalyPrediction
54+
from apache_beam.ml.anomaly.base import AnomalyResult
55+
56+
from bqmonitor.timestamp_buffer import TimestampBufferDoFnBag
57+
58+
_LOGGER = logging.getLogger(__name__)
59+
60+
_VALID_DIRECTIONS = ('decrease', 'increase', 'both')
61+
62+
63+
@dataclasses.dataclass(frozen=True)
64+
class _RelativeChangeConfig:
65+
"""Configuration for the RelativeChange detector.
66+
67+
At least one of ``threshold_pct`` or ``absolute_threshold`` must be
68+
provided. If both are provided, either one triggers the alert.
69+
"""
70+
direction: str
71+
lookback_windows: int
72+
threshold_pct: Optional[float] = None
73+
absolute_threshold: Optional[float] = None
74+
75+
def __post_init__(self):
76+
if self.direction not in _VALID_DIRECTIONS:
77+
raise ValueError(
78+
f"direction must be one of {_VALID_DIRECTIONS}, "
79+
f"got '{self.direction}'")
80+
if self.threshold_pct is None and self.absolute_threshold is None:
81+
raise ValueError(
82+
"At least one of 'threshold_pct' or 'absolute_threshold' "
83+
"must be provided.")
84+
if self.threshold_pct is not None and self.threshold_pct < 0:
85+
raise ValueError(
86+
f'threshold_pct must be >= 0, got {self.threshold_pct}')
87+
if (self.absolute_threshold is not None
88+
and self.absolute_threshold < 0):
89+
raise ValueError(
90+
f'absolute_threshold must be >= 0, '
91+
f'got {self.absolute_threshold}')
92+
if self.lookback_windows < 1:
93+
raise ValueError(
94+
f'lookback_windows must be >= 1, got {self.lookback_windows}')
95+
96+
97+
# Backward-compatible alias used by pipeline.py.
98+
RelativeChangeConfig = _RelativeChangeConfig
99+
100+
101+
class IncSlidingMeanTracker:
102+
"""Incremental sliding window mean tracker.
103+
104+
Uses the same algorithm as
105+
``apache_beam.ml.anomaly.univariate.mean.IncSlidingMeanTracker``:
106+
maintains a running mean updated via ``delta / n`` on each push,
107+
with O(1) amortized cost. When the window is full, the oldest value
108+
is evicted and the mean is adjusted.
109+
110+
This class is serializable so it can be stored in Beam
111+
ReadModifyWriteState.
112+
"""
113+
114+
def __init__(self, window_size):
115+
self._window_size = window_size
116+
self._queue = deque(maxlen=window_size)
117+
self._mean = 0.0
118+
self._n = 0
119+
120+
def push(self, x):
121+
"""Push a new value, evicting the oldest if the window is full."""
122+
delta = x - self._mean
123+
124+
if len(self._queue) >= self._window_size:
125+
old_x = self._queue.popleft()
126+
self._n -= 1
127+
delta += (self._mean - old_x)
128+
129+
self._queue.append(x)
130+
self._n += 1
131+
132+
if self._n > 0:
133+
self._mean += delta / self._n
134+
else:
135+
self._mean = 0.0
136+
137+
def get(self):
138+
"""Return the current mean, or NaN if empty."""
139+
if self._n < 1:
140+
return float('nan')
141+
return self._mean
142+
143+
@property
144+
def count(self):
145+
return self._n
146+
147+
148+
def _compute_pct_change(current, baseline):
149+
"""Compute percentage change from baseline to current.
150+
151+
Returns (pct_change, is_valid) where is_valid is False when
152+
baseline is zero (pct change is mathematically undefined).
153+
When baseline is zero, use ``absolute_threshold`` to alert.
154+
"""
155+
if baseline == 0:
156+
return (0.0, False)
157+
return ((current - baseline) / abs(baseline) * 100.0, True)
158+
159+
160+
def _check_alert(current, baseline, pct_change, pct_valid,
161+
direction, threshold_pct, absolute_threshold):
162+
"""Check if the change triggers an alert.
163+
164+
Triggers if either the pct threshold or absolute threshold is met.
165+
pct threshold is skipped when pct_valid is False (baseline is zero).
166+
"""
167+
delta = current - baseline
168+
if direction == 'decrease':
169+
pct_hit = (pct_valid and threshold_pct is not None
170+
and pct_change <= -threshold_pct)
171+
abs_hit = (absolute_threshold is not None
172+
and -delta >= absolute_threshold)
173+
elif direction == 'increase':
174+
pct_hit = (pct_valid and threshold_pct is not None
175+
and pct_change >= threshold_pct)
176+
abs_hit = (absolute_threshold is not None
177+
and delta >= absolute_threshold)
178+
else: # both
179+
pct_hit = (pct_valid and threshold_pct is not None
180+
and abs(pct_change) >= threshold_pct)
181+
abs_hit = (absolute_threshold is not None
182+
and abs(delta) >= absolute_threshold)
183+
return pct_hit or abs_hit
184+
185+
186+
class RelativeChangeDoFn(TimestampBufferDoFnBag):
187+
"""Stateful DoFn that detects relative changes between windows.
188+
189+
Subclasses ``TimestampBufferDoFnBag`` for the buffer/timer/trim
190+
machinery. Uses the base class's ``EXTRA_STATE`` to persist the
191+
mean tracker and total-pushed counter across timer firings.
192+
193+
Outputs:
194+
(key, AnomalyResult)
195+
"""
196+
197+
def __init__(self, direction='decrease', threshold_pct=None,
198+
absolute_threshold=None, lookback_windows=1,
199+
batch_interval_sec=5):
200+
if threshold_pct is None and absolute_threshold is None:
201+
raise ValueError(
202+
"At least one of 'threshold_pct' or 'absolute_threshold' "
203+
"must be provided.")
204+
super().__init__(
205+
context_size=0,
206+
batch_interval_sec=batch_interval_sec)
207+
self._direction = direction
208+
self._threshold_pct = threshold_pct
209+
self._absolute_threshold = absolute_threshold
210+
self._lookback_windows = lookback_windows
211+
212+
def process_element(self, key, element_ts, value, context, **extra):
213+
es = extra['extra_state']
214+
state = es.read() or {}
215+
tracker = state.get('tracker') or IncSlidingMeanTracker(
216+
self._lookback_windows)
217+
total_pushed = state.get('total_pushed', 0)
218+
219+
row = value
220+
current_value = row.value
221+
222+
if total_pushed < self._lookback_windows:
223+
prediction = AnomalyPrediction(
224+
model_id='RelativeChange',
225+
score=None,
226+
label=-2,
227+
info=(f'warmup: {total_pushed}'
228+
f'/{self._lookback_windows}'))
229+
result = AnomalyResult(example=row, predictions=[prediction])
230+
yield (key, result)
231+
else:
232+
baseline = tracker.get()
233+
234+
pct_change, is_valid = _compute_pct_change(
235+
current_value, baseline)
236+
237+
is_alert = _check_alert(
238+
current_value, baseline, pct_change, is_valid,
239+
self._direction, self._threshold_pct,
240+
self._absolute_threshold)
241+
242+
info = (f'baseline={baseline:.4f} '
243+
f'current={current_value:.4f} '
244+
f'pct_change={pct_change:.2f}% '
245+
f'abs_delta={current_value - baseline:.4f}')
246+
247+
prediction = AnomalyPrediction(
248+
model_id='RelativeChange',
249+
score=pct_change if not math.isinf(pct_change) else None,
250+
label=1 if is_alert else 0,
251+
info=info)
252+
result = AnomalyResult(example=row, predictions=[prediction])
253+
yield (key, result)
254+
255+
# Push current value into the tracker AFTER scoring, so it
256+
# becomes part of the baseline for future elements.
257+
tracker.push(current_value)
258+
total_pushed += 1
259+
260+
state['tracker'] = tracker
261+
state['total_pushed'] = total_pushed
262+
es.write(state)

0 commit comments

Comments
 (0)