Skip to content

Commit 94d080d

Browse files
committed
Commit
1 parent fd3691e commit 94d080d

16 files changed

Lines changed: 987 additions & 43 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ __marimo__/
220220
**/.tusk/traces/
221221
**/.tusk/logs/
222222

223+
# Bug tracking
224+
**/BUG_TRACKING.md
225+
223226
# macOS
224227
.DS_Store
225228

drift/core/adaptive_sampling.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
"""Adaptive sampling controller for inbound root-request admission."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
import math
7+
import random
8+
import time
9+
from dataclasses import dataclass
10+
from typing import Literal
11+
12+
logger = logging.getLogger(__name__)
13+
14+
SamplingMode = Literal["fixed", "adaptive"]
15+
AdaptiveSamplingState = Literal["fixed", "healthy", "warm", "hot", "critical_pause"]
16+
RootSamplingDecisionReason = Literal[
17+
"pre_app_start",
18+
"sampled",
19+
"not_sampled",
20+
"load_shed",
21+
"critical_pause",
22+
]
23+
24+
25+
@dataclass
26+
class ResolvedSamplingConfig:
27+
mode: SamplingMode
28+
base_rate: float
29+
min_rate: float
30+
31+
32+
@dataclass
33+
class AdaptiveSamplingHealthSnapshot:
34+
queue_fill_ratio: float | None = None
35+
dropped_span_count: int = 0
36+
export_failure_count: int = 0
37+
export_circuit_open: bool = False
38+
memory_pressure_ratio: float | None = None
39+
40+
41+
@dataclass
42+
class RootSamplingDecision:
43+
should_record: bool
44+
reason: RootSamplingDecisionReason
45+
mode: SamplingMode
46+
state: AdaptiveSamplingState
47+
base_rate: float
48+
min_rate: float
49+
effective_rate: float
50+
admission_multiplier: float
51+
52+
53+
def _clamp(value: float, min_value: float, max_value: float) -> float:
54+
return min(max_value, max(min_value, value))
55+
56+
57+
def _clamp01(value: float) -> float:
58+
return _clamp(value, 0.0, 1.0)
59+
60+
61+
def _normalize_between(value: float | None, zero_point: float, one_point: float) -> float:
62+
if value is None or one_point <= zero_point:
63+
return 0.0
64+
return _clamp01((value - zero_point) / (one_point - zero_point))
65+
66+
67+
class AdaptiveSamplingController:
68+
def __init__(
69+
self,
70+
config: ResolvedSamplingConfig,
71+
*,
72+
random_fn=random.random,
73+
now_fn=time.monotonic,
74+
) -> None:
75+
self._config = config
76+
self._random_fn = random_fn
77+
self._now_fn = now_fn
78+
79+
self._admission_multiplier = 1.0
80+
self._state: AdaptiveSamplingState = "fixed" if config.mode == "fixed" else "healthy"
81+
self._paused_until_s = 0.0
82+
self._last_updated_at_s = 0.0
83+
self._last_decrease_at_s = 0.0
84+
85+
self._prev_dropped_span_count = 0
86+
self._prev_export_failure_count = 0
87+
88+
self._queue_fill_ewma: float | None = None
89+
self._recent_drop_signal = 0.0
90+
self._recent_failure_signal = 0.0
91+
92+
def update(self, snapshot: AdaptiveSamplingHealthSnapshot) -> None:
93+
if self._config.mode != "adaptive":
94+
self._state = "fixed"
95+
self._admission_multiplier = 1.0
96+
return
97+
98+
now_s = self._now_fn()
99+
elapsed_s = 2.0 if self._last_updated_at_s == 0 else max(0.001, now_s - self._last_updated_at_s)
100+
self._last_updated_at_s = now_s
101+
102+
decay = math.exp(-(elapsed_s * 1000.0) / 30000.0)
103+
self._recent_drop_signal *= decay
104+
self._recent_failure_signal *= decay
105+
106+
dropped_delta = max(0, snapshot.dropped_span_count - self._prev_dropped_span_count)
107+
export_failure_delta = max(0, snapshot.export_failure_count - self._prev_export_failure_count)
108+
109+
self._prev_dropped_span_count = snapshot.dropped_span_count
110+
self._prev_export_failure_count = snapshot.export_failure_count
111+
112+
self._recent_drop_signal += dropped_delta
113+
self._recent_failure_signal += export_failure_delta
114+
115+
if snapshot.queue_fill_ratio is not None:
116+
queue_fill_ratio = _clamp01(snapshot.queue_fill_ratio)
117+
self._queue_fill_ewma = (
118+
queue_fill_ratio
119+
if self._queue_fill_ewma is None
120+
else (0.25 * queue_fill_ratio) + (0.75 * self._queue_fill_ewma)
121+
)
122+
123+
queue_pressure = _normalize_between(self._queue_fill_ewma, 0.20, 0.85)
124+
memory_pressure = _normalize_between(snapshot.memory_pressure_ratio, 0.80, 0.92)
125+
export_failure_pressure = _clamp01(self._recent_failure_signal / 5.0)
126+
pressure = max(queue_pressure, memory_pressure, export_failure_pressure)
127+
128+
hard_brake = (
129+
dropped_delta > 0 or snapshot.export_circuit_open or (snapshot.memory_pressure_ratio or 0.0) >= 0.92
130+
)
131+
132+
previous_state = self._state
133+
previous_multiplier = self._admission_multiplier
134+
135+
if hard_brake:
136+
self._paused_until_s = now_s + 15.0
137+
self._admission_multiplier = 0.0
138+
self._state = "critical_pause"
139+
self._last_decrease_at_s = now_s
140+
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
141+
return
142+
143+
if now_s < self._paused_until_s:
144+
self._state = "critical_pause"
145+
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
146+
return
147+
148+
min_multiplier = self._get_min_multiplier()
149+
if pressure >= 0.70:
150+
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.4)
151+
self._state = "hot"
152+
self._last_decrease_at_s = now_s
153+
elif pressure >= 0.45:
154+
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.7)
155+
self._state = "warm"
156+
self._last_decrease_at_s = now_s
157+
else:
158+
if pressure <= 0.20 and (now_s - self._last_decrease_at_s) >= 10.0:
159+
self._admission_multiplier = min(1.0, self._admission_multiplier + 0.05)
160+
self._state = "healthy"
161+
162+
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
163+
164+
def get_decision(self, *, is_pre_app_start: bool) -> RootSamplingDecision:
165+
if is_pre_app_start:
166+
return RootSamplingDecision(
167+
should_record=True,
168+
reason="pre_app_start",
169+
mode=self._config.mode,
170+
state=self._state,
171+
base_rate=self._config.base_rate,
172+
min_rate=self._config.min_rate,
173+
effective_rate=1.0,
174+
admission_multiplier=1.0,
175+
)
176+
177+
effective_rate = (
178+
self.get_effective_sampling_rate() if self._config.mode == "adaptive" else _clamp01(self._config.base_rate)
179+
)
180+
181+
if effective_rate <= 0.0:
182+
return RootSamplingDecision(
183+
should_record=False,
184+
reason="critical_pause" if self._state == "critical_pause" else "not_sampled",
185+
mode=self._config.mode,
186+
state=self._state,
187+
base_rate=self._config.base_rate,
188+
min_rate=self._config.min_rate,
189+
effective_rate=effective_rate,
190+
admission_multiplier=self._admission_multiplier,
191+
)
192+
193+
should_record = self._random_fn() < effective_rate
194+
return RootSamplingDecision(
195+
should_record=should_record,
196+
reason=(
197+
"sampled"
198+
if should_record
199+
else "load_shed"
200+
if self._config.mode == "adaptive" and effective_rate < self._config.base_rate
201+
else "not_sampled"
202+
),
203+
mode=self._config.mode,
204+
state=self._state,
205+
base_rate=self._config.base_rate,
206+
min_rate=self._config.min_rate,
207+
effective_rate=effective_rate,
208+
admission_multiplier=self._admission_multiplier if self._config.mode == "adaptive" else 1.0,
209+
)
210+
211+
def get_effective_sampling_rate(self) -> float:
212+
if self._config.mode != "adaptive":
213+
return _clamp01(self._config.base_rate)
214+
if self._state == "critical_pause" and self._now_fn() < self._paused_until_s:
215+
return 0.0
216+
effective_rate = self._config.base_rate * self._admission_multiplier
217+
return _clamp(
218+
effective_rate,
219+
min(self._config.base_rate, self._config.min_rate),
220+
self._config.base_rate,
221+
)
222+
223+
def _get_min_multiplier(self) -> float:
224+
if self._config.base_rate <= 0.0 or self._config.min_rate <= 0.0:
225+
return 0.0
226+
return _clamp01(self._config.min_rate / self._config.base_rate)
227+
228+
def _log_transition(
229+
self,
230+
previous_state: AdaptiveSamplingState,
231+
previous_multiplier: float,
232+
pressure: float,
233+
snapshot: AdaptiveSamplingHealthSnapshot,
234+
) -> None:
235+
if previous_state == self._state and abs(previous_multiplier - self._admission_multiplier) < 0.05:
236+
return
237+
238+
logger.info(
239+
"Adaptive sampling updated (state=%s, multiplier=%.2f, effective_rate=%.4f, pressure=%.2f, queue_fill=%s, memory_pressure_ratio=%s, export_circuit_open=%s).",
240+
self._state,
241+
self._admission_multiplier,
242+
self.get_effective_sampling_rate(),
243+
pressure,
244+
f"{self._queue_fill_ewma:.2f}" if self._queue_fill_ewma is not None else "n/a",
245+
snapshot.memory_pressure_ratio if snapshot.memory_pressure_ratio is not None else "n/a",
246+
snapshot.export_circuit_open,
247+
)

drift/core/batch_processor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,8 @@ def queue_size(self) -> int:
244244
def dropped_span_count(self) -> int:
245245
"""Get the number of dropped spans."""
246246
return self._dropped_spans
247+
248+
@property
249+
def max_queue_size(self) -> int:
250+
"""Get the configured maximum queue size."""
251+
return self._config.max_queue_size

drift/core/config.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,21 @@ class ComparisonConfig:
6666
ignore_fields: list[str] = field(default_factory=list)
6767

6868

69+
@dataclass
70+
class SamplingConfig:
71+
"""Configuration for fixed vs adaptive sampling."""
72+
73+
mode: str | None = None
74+
base_rate: float | None = None
75+
min_rate: float | None = None
76+
77+
6978
@dataclass
7079
class RecordingConfig:
7180
"""Configuration for recording behavior."""
7281

7382
sampling_rate: float | None = None
83+
sampling: SamplingConfig | None = None
7484
export_spans: bool | None = None
7585
enable_env_var_recording: bool | None = None
7686
enable_analytics: bool | None = None
@@ -144,8 +154,42 @@ def _parse_recording_config(data: dict[str, Any]) -> RecordingConfig:
144154
)
145155
sampling_rate = None
146156

157+
sampling = None
158+
raw_sampling = data.get("sampling")
159+
if isinstance(raw_sampling, dict):
160+
base_rate = raw_sampling.get("base_rate")
161+
if base_rate is not None and not isinstance(base_rate, (int, float)):
162+
logger.warning(
163+
f"Invalid 'sampling.base_rate' in config: expected number, got {type(base_rate).__name__}. "
164+
"This value will be ignored."
165+
)
166+
base_rate = None
167+
168+
min_rate = raw_sampling.get("min_rate")
169+
if min_rate is not None and not isinstance(min_rate, (int, float)):
170+
logger.warning(
171+
f"Invalid 'sampling.min_rate' in config: expected number, got {type(min_rate).__name__}. "
172+
"This value will be ignored."
173+
)
174+
min_rate = None
175+
176+
mode = raw_sampling.get("mode")
177+
if mode is not None and not isinstance(mode, str):
178+
logger.warning(
179+
f"Invalid 'sampling.mode' in config: expected string, got {type(mode).__name__}. "
180+
"This value will be ignored."
181+
)
182+
mode = None
183+
184+
sampling = SamplingConfig(
185+
mode=mode,
186+
base_rate=float(base_rate) if base_rate is not None else None,
187+
min_rate=float(min_rate) if min_rate is not None else None,
188+
)
189+
147190
return RecordingConfig(
148191
sampling_rate=sampling_rate,
192+
sampling=sampling,
149193
export_spans=data.get("export_spans"),
150194
enable_env_var_recording=data.get("enable_env_var_recording"),
151195
enable_analytics=data.get("enable_analytics"),

0 commit comments

Comments
 (0)