Skip to content

Commit 976a86e

Browse files
committed
Fixes
1 parent 94d080d commit 976a86e

6 files changed

Lines changed: 494 additions & 137 deletions

File tree

drift/core/adaptive_sampling.py

Lines changed: 119 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import math
77
import random
8+
import threading
89
import time
910
from dataclasses import dataclass
1011
from typing import Literal
@@ -75,6 +76,7 @@ def __init__(
7576
self._config = config
7677
self._random_fn = random_fn
7778
self._now_fn = now_fn
79+
self._lock = threading.RLock()
7880

7981
self._admission_multiplier = 1.0
8082
self._state: AdaptiveSamplingState = "fixed" if config.mode == "fixed" else "healthy"
@@ -90,135 +92,140 @@ def __init__(
9092
self._recent_failure_signal = 0.0
9193

9294
def update(self, snapshot: AdaptiveSamplingHealthSnapshot) -> None:
93-
if self._config.mode != "adaptive":
94-
self._state = "fixed"
95-
self._admission_multiplier = 1.0
96-
return
97-
98-
now_s = self._now_fn()
99-
elapsed_s = 2.0 if self._last_updated_at_s == 0 else max(0.001, now_s - self._last_updated_at_s)
100-
self._last_updated_at_s = now_s
101-
102-
decay = math.exp(-(elapsed_s * 1000.0) / 30000.0)
103-
self._recent_drop_signal *= decay
104-
self._recent_failure_signal *= decay
105-
106-
dropped_delta = max(0, snapshot.dropped_span_count - self._prev_dropped_span_count)
107-
export_failure_delta = max(0, snapshot.export_failure_count - self._prev_export_failure_count)
108-
109-
self._prev_dropped_span_count = snapshot.dropped_span_count
110-
self._prev_export_failure_count = snapshot.export_failure_count
111-
112-
self._recent_drop_signal += dropped_delta
113-
self._recent_failure_signal += export_failure_delta
114-
115-
if snapshot.queue_fill_ratio is not None:
116-
queue_fill_ratio = _clamp01(snapshot.queue_fill_ratio)
117-
self._queue_fill_ewma = (
118-
queue_fill_ratio
119-
if self._queue_fill_ewma is None
120-
else (0.25 * queue_fill_ratio) + (0.75 * self._queue_fill_ewma)
95+
with self._lock:
96+
if self._config.mode != "adaptive":
97+
self._state = "fixed"
98+
self._admission_multiplier = 1.0
99+
return
100+
101+
now_s = self._now_fn()
102+
elapsed_s = 2.0 if self._last_updated_at_s == 0 else max(0.001, now_s - self._last_updated_at_s)
103+
self._last_updated_at_s = now_s
104+
105+
decay = math.exp(-(elapsed_s * 1000.0) / 30000.0)
106+
self._recent_drop_signal *= decay
107+
self._recent_failure_signal *= decay
108+
109+
dropped_delta = max(0, snapshot.dropped_span_count - self._prev_dropped_span_count)
110+
export_failure_delta = max(0, snapshot.export_failure_count - self._prev_export_failure_count)
111+
112+
self._prev_dropped_span_count = snapshot.dropped_span_count
113+
self._prev_export_failure_count = snapshot.export_failure_count
114+
115+
self._recent_drop_signal += dropped_delta
116+
self._recent_failure_signal += export_failure_delta
117+
118+
if snapshot.queue_fill_ratio is not None:
119+
queue_fill_ratio = _clamp01(snapshot.queue_fill_ratio)
120+
self._queue_fill_ewma = (
121+
queue_fill_ratio
122+
if self._queue_fill_ewma is None
123+
else (0.25 * queue_fill_ratio) + (0.75 * self._queue_fill_ewma)
124+
)
125+
126+
queue_pressure = _normalize_between(self._queue_fill_ewma, 0.20, 0.85)
127+
memory_pressure = _normalize_between(snapshot.memory_pressure_ratio, 0.80, 0.92)
128+
export_failure_pressure = _clamp01(self._recent_failure_signal / 5.0)
129+
pressure = max(queue_pressure, memory_pressure, export_failure_pressure)
130+
131+
hard_brake = (
132+
dropped_delta > 0 or snapshot.export_circuit_open or (snapshot.memory_pressure_ratio or 0.0) >= 0.92
121133
)
122134

123-
queue_pressure = _normalize_between(self._queue_fill_ewma, 0.20, 0.85)
124-
memory_pressure = _normalize_between(snapshot.memory_pressure_ratio, 0.80, 0.92)
125-
export_failure_pressure = _clamp01(self._recent_failure_signal / 5.0)
126-
pressure = max(queue_pressure, memory_pressure, export_failure_pressure)
135+
previous_state = self._state
136+
previous_multiplier = self._admission_multiplier
137+
138+
if hard_brake:
139+
self._paused_until_s = now_s + 15.0
140+
self._admission_multiplier = 0.0
141+
self._state = "critical_pause"
142+
self._last_decrease_at_s = now_s
143+
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
144+
return
145+
146+
if now_s < self._paused_until_s:
147+
self._state = "critical_pause"
148+
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
149+
return
150+
151+
min_multiplier = self._get_min_multiplier()
152+
if pressure >= 0.70:
153+
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.4)
154+
self._state = "hot"
155+
self._last_decrease_at_s = now_s
156+
elif pressure >= 0.45:
157+
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.7)
158+
self._state = "warm"
159+
self._last_decrease_at_s = now_s
160+
else:
161+
if pressure <= 0.20 and (now_s - self._last_decrease_at_s) >= 10.0:
162+
self._admission_multiplier = min(1.0, self._admission_multiplier + 0.05)
163+
self._state = "healthy"
127164

128-
hard_brake = (
129-
dropped_delta > 0 or snapshot.export_circuit_open or (snapshot.memory_pressure_ratio or 0.0) >= 0.92
130-
)
131-
132-
previous_state = self._state
133-
previous_multiplier = self._admission_multiplier
134-
135-
if hard_brake:
136-
self._paused_until_s = now_s + 15.0
137-
self._admission_multiplier = 0.0
138-
self._state = "critical_pause"
139-
self._last_decrease_at_s = now_s
140165
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
141-
return
142-
143-
if now_s < self._paused_until_s:
144-
self._state = "critical_pause"
145-
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
146-
return
147-
148-
min_multiplier = self._get_min_multiplier()
149-
if pressure >= 0.70:
150-
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.4)
151-
self._state = "hot"
152-
self._last_decrease_at_s = now_s
153-
elif pressure >= 0.45:
154-
self._admission_multiplier = max(min_multiplier, self._admission_multiplier * 0.7)
155-
self._state = "warm"
156-
self._last_decrease_at_s = now_s
157-
else:
158-
if pressure <= 0.20 and (now_s - self._last_decrease_at_s) >= 10.0:
159-
self._admission_multiplier = min(1.0, self._admission_multiplier + 0.05)
160-
self._state = "healthy"
161-
162-
self._log_transition(previous_state, previous_multiplier, pressure, snapshot)
163166

164167
def get_decision(self, *, is_pre_app_start: bool) -> RootSamplingDecision:
165-
if is_pre_app_start:
166-
return RootSamplingDecision(
167-
should_record=True,
168-
reason="pre_app_start",
169-
mode=self._config.mode,
170-
state=self._state,
171-
base_rate=self._config.base_rate,
172-
min_rate=self._config.min_rate,
173-
effective_rate=1.0,
174-
admission_multiplier=1.0,
168+
with self._lock:
169+
if is_pre_app_start:
170+
return RootSamplingDecision(
171+
should_record=True,
172+
reason="pre_app_start",
173+
mode=self._config.mode,
174+
state=self._state,
175+
base_rate=self._config.base_rate,
176+
min_rate=self._config.min_rate,
177+
effective_rate=1.0,
178+
admission_multiplier=1.0,
179+
)
180+
181+
effective_rate = (
182+
self.get_effective_sampling_rate()
183+
if self._config.mode == "adaptive"
184+
else _clamp01(self._config.base_rate)
175185
)
176186

177-
effective_rate = (
178-
self.get_effective_sampling_rate() if self._config.mode == "adaptive" else _clamp01(self._config.base_rate)
179-
)
180-
181-
if effective_rate <= 0.0:
187+
if effective_rate <= 0.0:
188+
return RootSamplingDecision(
189+
should_record=False,
190+
reason="critical_pause" if self._state == "critical_pause" else "not_sampled",
191+
mode=self._config.mode,
192+
state=self._state,
193+
base_rate=self._config.base_rate,
194+
min_rate=self._config.min_rate,
195+
effective_rate=effective_rate,
196+
admission_multiplier=self._admission_multiplier,
197+
)
198+
199+
should_record = self._random_fn() < effective_rate
182200
return RootSamplingDecision(
183-
should_record=False,
184-
reason="critical_pause" if self._state == "critical_pause" else "not_sampled",
201+
should_record=should_record,
202+
reason=(
203+
"sampled"
204+
if should_record
205+
else "load_shed"
206+
if self._config.mode == "adaptive" and effective_rate < self._config.base_rate
207+
else "not_sampled"
208+
),
185209
mode=self._config.mode,
186210
state=self._state,
187211
base_rate=self._config.base_rate,
188212
min_rate=self._config.min_rate,
189213
effective_rate=effective_rate,
190-
admission_multiplier=self._admission_multiplier,
214+
admission_multiplier=self._admission_multiplier if self._config.mode == "adaptive" else 1.0,
191215
)
192216

193-
should_record = self._random_fn() < effective_rate
194-
return RootSamplingDecision(
195-
should_record=should_record,
196-
reason=(
197-
"sampled"
198-
if should_record
199-
else "load_shed"
200-
if self._config.mode == "adaptive" and effective_rate < self._config.base_rate
201-
else "not_sampled"
202-
),
203-
mode=self._config.mode,
204-
state=self._state,
205-
base_rate=self._config.base_rate,
206-
min_rate=self._config.min_rate,
207-
effective_rate=effective_rate,
208-
admission_multiplier=self._admission_multiplier if self._config.mode == "adaptive" else 1.0,
209-
)
210-
211217
def get_effective_sampling_rate(self) -> float:
212-
if self._config.mode != "adaptive":
213-
return _clamp01(self._config.base_rate)
214-
if self._state == "critical_pause" and self._now_fn() < self._paused_until_s:
215-
return 0.0
216-
effective_rate = self._config.base_rate * self._admission_multiplier
217-
return _clamp(
218-
effective_rate,
219-
min(self._config.base_rate, self._config.min_rate),
220-
self._config.base_rate,
221-
)
218+
with self._lock:
219+
if self._config.mode != "adaptive":
220+
return _clamp01(self._config.base_rate)
221+
if self._state == "critical_pause" and self._now_fn() < self._paused_until_s:
222+
return 0.0
223+
effective_rate = self._config.base_rate * self._admission_multiplier
224+
return _clamp(
225+
effective_rate,
226+
min(self._config.base_rate, self._config.min_rate),
227+
self._config.base_rate,
228+
)
222229

223230
def _get_min_multiplier(self) -> float:
224231
if self._config.base_rate <= 0.0 or self._config.min_rate <= 0.0:

drift/core/drift_sdk.py

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,14 @@ def _determine_sampling_config(self, init_param: float | None) -> ResolvedSampli
352352
config_sampling.mode,
353353
)
354354

355-
base_rate = 1.0
355+
base_rate: float | None = None
356356
if init_param is not None:
357357
validated = validate_sampling_rate(init_param, "init params")
358358
if validated is not None:
359359
logger.debug(f"Using sampling rate from init params: {validated}")
360360
base_rate = validated
361-
else:
361+
362+
if base_rate is None:
362363
env_rate = os.environ.get("TUSK_SAMPLING_RATE")
363364
if env_rate is not None:
364365
try:
@@ -369,20 +370,22 @@ def _determine_sampling_config(self, init_param: float | None) -> ResolvedSampli
369370
base_rate = validated
370371
except ValueError:
371372
logger.warning(f"Invalid TUSK_SAMPLING_RATE env var: {env_rate}")
372-
elif config_sampling and config_sampling.base_rate is not None:
373-
validated = validate_sampling_rate(
374-
config_sampling.base_rate, "config file recording.sampling.base_rate"
375-
)
376-
if validated is not None:
377-
base_rate = validated
378-
elif recording_config and recording_config.sampling_rate is not None:
379-
validated = validate_sampling_rate(
380-
recording_config.sampling_rate, "config file recording.sampling_rate"
381-
)
382-
if validated is not None:
383-
base_rate = validated
384-
else:
385-
logger.debug("Using default sampling rate: 1.0")
373+
374+
if base_rate is None and config_sampling and config_sampling.base_rate is not None:
375+
validated = validate_sampling_rate(config_sampling.base_rate, "config file recording.sampling.base_rate")
376+
if validated is not None:
377+
logger.debug(f"Using sampling rate from config file recording.sampling.base_rate: {validated}")
378+
base_rate = validated
379+
380+
if base_rate is None and recording_config and recording_config.sampling_rate is not None:
381+
validated = validate_sampling_rate(recording_config.sampling_rate, "config file recording.sampling_rate")
382+
if validated is not None:
383+
logger.debug(f"Using sampling rate from config file recording.sampling_rate: {validated}")
384+
base_rate = validated
385+
386+
if base_rate is None:
387+
logger.debug("Using default sampling rate: 1.0")
388+
base_rate = 1.0
386389

387390
min_rate = 0.0
388391
if mode == "adaptive":
@@ -423,11 +426,17 @@ def _start_adaptive_sampling_control_loop(self) -> None:
423426
name="drift-adaptive-sampling",
424427
)
425428
self._adaptive_sampling_thread.start()
426-
self._update_adaptive_sampling_health()
429+
self._safe_update_adaptive_sampling_health()
427430

428431
def _adaptive_sampling_loop(self) -> None:
429432
while not self._adaptive_sampling_stop_event.wait(timeout=2.0):
433+
self._safe_update_adaptive_sampling_health()
434+
435+
def _safe_update_adaptive_sampling_health(self) -> None:
436+
try:
430437
self._update_adaptive_sampling_health()
438+
except Exception:
439+
logger.error("Adaptive sampling health update failed; keeping previous controller state.", exc_info=True)
431440

432441
def _update_adaptive_sampling_health(self) -> None:
433442
if self._adaptive_sampling_controller is None:
@@ -484,14 +493,52 @@ def _get_memory_pressure_ratio(self) -> float | None:
484493
if cgroup_v1_current is not None:
485494
return cgroup_v1_current / self._effective_memory_limit_bytes
486495

496+
current_rss_bytes = self._read_current_rss_bytes()
497+
if current_rss_bytes is not None:
498+
return current_rss_bytes / self._effective_memory_limit_bytes
499+
500+
return None
501+
502+
@staticmethod
503+
def _parse_proc_status_rss_bytes(raw_status: str) -> int | None:
504+
for line in raw_status.splitlines():
505+
if not line.startswith("VmRSS:"):
506+
continue
507+
508+
parts = line.split()
509+
if len(parts) < 3 or parts[2].lower() != "kb":
510+
return None
511+
512+
return int(parts[1]) * 1024
513+
514+
return None
515+
516+
@staticmethod
517+
def _parse_proc_statm_rss_bytes(raw_statm: str, page_size: int) -> int | None:
518+
fields = raw_statm.split()
519+
if len(fields) < 2:
520+
return None
521+
522+
return int(fields[1]) * page_size
523+
524+
def _read_current_rss_bytes(self) -> int | None:
487525
try:
488-
import resource
526+
proc_status_path = Path("/proc/self/status")
527+
if proc_status_path.exists():
528+
parsed = self._parse_proc_status_rss_bytes(proc_status_path.read_text())
529+
if parsed is not None:
530+
return parsed
531+
except Exception:
532+
pass
489533

490-
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
491-
rss_bytes = rss if platform.system() == "Darwin" else rss * 1024
492-
return rss_bytes / self._effective_memory_limit_bytes
534+
try:
535+
proc_statm_path = Path("/proc/self/statm")
536+
if proc_statm_path.exists():
537+
return self._parse_proc_statm_rss_bytes(proc_statm_path.read_text(), int(os.sysconf("SC_PAGE_SIZE")))
493538
except Exception:
494-
return None
539+
pass
540+
541+
return None
495542

496543
def _read_numeric_control_file(self, path: str) -> int | None:
497544
try:

0 commit comments

Comments
 (0)