Skip to content

Commit e9be90f

Browse files
feat: support-evaluation-tracking-api (#196)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent ccced7f commit e9be90f

File tree

9 files changed

+578
-27
lines changed

9 files changed

+578
-27
lines changed

flagsmith/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from flagsmith import webhooks
2+
from flagsmith.analytics import PipelineAnalyticsConfig
23
from flagsmith.flagsmith import Flagsmith
34
from flagsmith.version import __version__
45

5-
__all__ = ("Flagsmith", "webhooks", "__version__")
6+
__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__")

flagsmith/analytics.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1+
import atexit
12
import json
3+
import logging
4+
import threading
5+
import time
26
import typing
7+
from dataclasses import dataclass
38
from datetime import datetime
49

510
from requests_futures.sessions import FuturesSession # type: ignore
611

12+
from flagsmith.version import __version__
13+
14+
logger = logging.getLogger(__name__)
15+
716
ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"
817

918
# Used to control how often we send data(in seconds)
@@ -60,3 +69,161 @@ def track_feature(self, feature_name: str) -> None:
6069
self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
6170
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
6271
self.flush()
72+
73+
74+
@dataclass
class PipelineAnalyticsConfig:
    """Configuration for :class:`PipelineAnalyticsProcessor`."""

    # Base URL of the analytics server; the processor appends a trailing
    # slash if missing before joining the batch endpoint path.
    analytics_server_url: str
    # A flush is triggered early once the buffer reaches this many events;
    # it is also the cap applied when failed events are re-queued.
    max_buffer_items: int = 1000
    # Period of the background one-shot flush timer, in seconds.
    flush_interval_seconds: float = 10.0
79+
80+
81+
class PipelineAnalyticsProcessor:
    """
    Buffered analytics processor that sends per-evaluation and custom events
    to the Flagsmith pipeline analytics endpoint in batches.

    Evaluation events are deduplicated within each flush window. Events are
    flushed periodically via a background timer or when the buffer is full.
    """

    def __init__(
        self,
        config: PipelineAnalyticsConfig,
        environment_key: str,
    ) -> None:
        """Build the batch endpoint URL and initialise empty buffer state.

        :param config: connection and buffering settings
        :param environment_key: Flagsmith environment key sent with every batch
        """
        url = config.analytics_server_url
        # Normalise the base URL so joining the endpoint path is safe.
        if not url.endswith("/"):
            url = f"{url}/"
        self._batch_endpoint = f"{url}v1/analytics/batch"
        self._environment_key = environment_key
        self._max_buffer = config.max_buffer_items
        self._flush_interval_seconds = config.flush_interval_seconds

        # Pending events; protected by _lock together with _dedup_keys.
        self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
        # flag_key -> fingerprint of the last evaluation recorded for it.
        # Cleared on every flush, so dedup only applies within a flush window.
        self._dedup_keys: typing.Dict[str, str] = {}
        self._lock = threading.Lock()
        # One-shot timer re-armed after each periodic flush; None until start().
        self._timer: typing.Optional[threading.Timer] = None

    def record_evaluation_event(
        self,
        flag_key: str,
        enabled: bool,
        value: typing.Any,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a flag evaluation event, skipping unchanged repeats.

        NOTE(review): traits are not part of the dedup fingerprint, so an
        evaluation differing only in traits is dropped as a duplicate within
        the current flush window — confirm this is intended.
        """
        fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
        should_flush = False

        with self._lock:
            # Same identity/enabled/value as the last event recorded for this
            # flag since the previous flush: nothing new to send.
            if self._dedup_keys.get(flag_key) == fingerprint:
                return
            self._dedup_keys[flag_key] = fingerprint
            self._buffer.append(
                {
                    "event_id": flag_key,
                    "event_type": "flag_evaluation",
                    # Epoch milliseconds.
                    "evaluated_at": int(time.time() * 1000),
                    "identity_identifier": identity_identifier,
                    "enabled": enabled,
                    "value": value,
                    # Copy so later caller mutations don't alter the event.
                    "traits": dict(traits) if traits else None,
                    "metadata": {"sdk_version": __version__},
                }
            )
            if len(self._buffer) >= self._max_buffer:
                should_flush = True

        # Flush outside the lock: flush() acquires the lock itself.
        if should_flush:
            self.flush()

    def record_custom_event(
        self,
        event_name: str,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
        metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a custom event. Unlike evaluations, never deduplicated."""
        should_flush = False

        with self._lock:
            self._buffer.append(
                {
                    "event_id": event_name,
                    "event_type": "custom_event",
                    # Epoch milliseconds.
                    "evaluated_at": int(time.time() * 1000),
                    "identity_identifier": identity_identifier,
                    # Shape matches evaluation events; not applicable here.
                    "enabled": None,
                    "value": None,
                    "traits": dict(traits) if traits else None,
                    # Caller metadata, with the SDK version stamped on top.
                    "metadata": {**(metadata or {}), "sdk_version": __version__},
                }
            )
            if len(self._buffer) >= self._max_buffer:
                should_flush = True

        # Flush outside the lock: flush() acquires the lock itself.
        if should_flush:
            self.flush()

    def flush(self) -> None:
        """Send all buffered events as one asynchronous batch POST.

        The buffer and dedup state are swapped out under the lock; the
        network call happens outside the lock via ``session`` — presumably
        the module-level FuturesSession instance defined elsewhere in this
        module (FuturesSession is imported at the top of the file).
        """
        with self._lock:
            if not self._buffer:
                return
            events = self._buffer
            self._buffer = []
            self._dedup_keys.clear()

        payload = json.dumps(
            {"events": events, "environment_key": self._environment_key}
        )
        try:
            future = session.post(
                self._batch_endpoint,
                data=payload,
                timeout=3,
                headers={
                    "Content-Type": "application/json; charset=utf-8",
                    "X-Environment-Key": self._environment_key,
                    "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}",
                },
            )
        except RuntimeError:
            # Presumably raised when the session's thread pool has already
            # been shut down (e.g. during interpreter exit); the batch is
            # dropped quietly in that case.
            logger.debug("Skipping flush: thread pool already shut down")
            return
        future.add_done_callback(lambda f: self._handle_flush_result(f, events))

    def _handle_flush_result(
        self,
        future: typing.Any,
        events: typing.List[typing.Dict[str, typing.Any]],
    ) -> None:
        """Re-queue the batch at the front of the buffer if the POST failed.

        After re-queuing, the buffer is truncated to capacity, so when it
        overflows it is the newest events that get dropped.
        """
        try:
            response = future.result()
            # Treat HTTP error statuses the same as transport failures.
            response.raise_for_status()
        except Exception:
            logger.warning(
                "Failed to flush pipeline analytics, re-queuing events", exc_info=True
            )
            with self._lock:
                self._buffer = events + self._buffer
                self._buffer = self._buffer[: self._max_buffer]

    def start(self) -> None:
        """Arm the periodic flush timer and register a shutdown hook."""
        self._schedule_flush()
        atexit.register(self.stop)

    def stop(self) -> None:
        """Cancel the timer, deregister the atexit hook, and flush once more.

        NOTE(review): a timer callback that is already executing when
        cancel() is called can still re-arm itself via _timer_flush —
        confirm whether a dedicated stopped flag is needed.
        """
        atexit.unregister(self.stop)
        if self._timer is not None:
            self._timer.cancel()
        self.flush()

    def _schedule_flush(self) -> None:
        # Daemon timer so a pending flush never blocks interpreter exit.
        self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush)
        self._timer.daemon = True
        self._timer.start()

    def _timer_flush(self) -> None:
        # Timer callback: flush, then arm the next one-shot timer.
        self.flush()
        self._schedule_flush()

flagsmith/flagsmith.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88
from requests.adapters import HTTPAdapter
99
from urllib3 import Retry
1010

11-
from flagsmith.analytics import AnalyticsProcessor
11+
from flagsmith.analytics import (
12+
AnalyticsProcessor,
13+
PipelineAnalyticsConfig,
14+
PipelineAnalyticsProcessor,
15+
)
1216
from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError
1317
from flagsmith.mappers import (
1418
map_context_and_identity_data_to_context,
1519
map_environment_document_to_context,
1620
map_environment_document_to_environment_updated_at,
1721
map_segment_results_to_identity_segments,
22+
resolve_trait_values,
1823
)
1924
from flagsmith.models import DefaultFlag, Flags, Segment
2025
from flagsmith.offline_handlers import OfflineHandler
@@ -63,6 +68,7 @@ def __init__(
6368
environment_refresh_interval_seconds: typing.Union[int, float] = 60,
6469
retries: typing.Optional[Retry] = None,
6570
enable_analytics: bool = False,
71+
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None,
6672
default_flag_handler: typing.Optional[
6773
typing.Callable[[str], DefaultFlag]
6874
] = None,
@@ -108,6 +114,9 @@ def __init__(
108114
self.default_flag_handler = default_flag_handler
109115
self.enable_realtime_updates = enable_realtime_updates
110116
self._analytics_processor: typing.Optional[AnalyticsProcessor] = None
117+
self._pipeline_analytics_processor: typing.Optional[
118+
PipelineAnalyticsProcessor
119+
] = None
111120
self._evaluation_context: typing.Optional[SDKEvaluationContext] = None
112121
self._environment_updated_at: typing.Optional[datetime] = None
113122

@@ -170,10 +179,28 @@ def __init__(
170179

171180
self._initialise_local_evaluation()
172181

173-
if enable_analytics:
174-
self._analytics_processor = AnalyticsProcessor(
175-
environment_key, self.api_url, timeout=self.request_timeout_seconds
176-
)
182+
self._initialise_analytics(
183+
environment_key=environment_key,
184+
enable_analytics=enable_analytics,
185+
pipeline_analytics_config=pipeline_analytics_config,
186+
)
187+
188+
def _initialise_analytics(
189+
self,
190+
environment_key: str,
191+
enable_analytics: bool,
192+
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig],
193+
) -> None:
194+
if enable_analytics:
195+
self._analytics_processor = AnalyticsProcessor(
196+
environment_key, self.api_url, timeout=self.request_timeout_seconds
197+
)
198+
if pipeline_analytics_config:
199+
self._pipeline_analytics_processor = PipelineAnalyticsProcessor(
200+
config=pipeline_analytics_config,
201+
environment_key=environment_key,
202+
)
203+
self._pipeline_analytics_processor.start()
177204

178205
def _initialise_local_evaluation(self) -> None:
179206
# To ensure that the environment is set before allowing subsequent
@@ -290,6 +317,25 @@ def get_identity_segments(
290317

291318
return map_segment_results_to_identity_segments(evaluation_result["segments"])
292319

320+
def track_event(
321+
self,
322+
event_name: str,
323+
identity_identifier: typing.Optional[str] = None,
324+
traits: typing.Optional[TraitMapping] = None,
325+
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
326+
) -> None:
327+
if not self._pipeline_analytics_processor:
328+
raise ValueError(
329+
"Pipeline analytics is not configured. "
330+
"Provide pipeline_analytics_config to use track_event."
331+
)
332+
self._pipeline_analytics_processor.record_custom_event(
333+
event_name=event_name,
334+
identity_identifier=identity_identifier,
335+
traits=resolve_trait_values(traits),
336+
metadata=metadata,
337+
)
338+
293339
def update_environment(self) -> None:
294340
try:
295341
environment_data = self._get_json_response(
@@ -345,6 +391,7 @@ def _get_environment_flags_from_document(self) -> Flags:
345391
evaluation_result=evaluation_result,
346392
analytics_processor=self._analytics_processor,
347393
default_flag_handler=self.default_flag_handler,
394+
pipeline_analytics_processor=self._pipeline_analytics_processor,
348395
)
349396

350397
def _get_identity_flags_from_document(
@@ -368,6 +415,9 @@ def _get_identity_flags_from_document(
368415
evaluation_result=evaluation_result,
369416
analytics_processor=self._analytics_processor,
370417
default_flag_handler=self.default_flag_handler,
418+
pipeline_analytics_processor=self._pipeline_analytics_processor,
419+
identity_identifier=identifier,
420+
traits=resolve_trait_values(traits),
371421
)
372422

373423
def _get_environment_flags_from_api(self) -> Flags:
@@ -379,6 +429,7 @@ def _get_environment_flags_from_api(self) -> Flags:
379429
api_flags=json_response,
380430
analytics_processor=self._analytics_processor,
381431
default_flag_handler=self.default_flag_handler,
432+
pipeline_analytics_processor=self._pipeline_analytics_processor,
382433
)
383434
except FlagsmithAPIError:
384435
if self.offline_handler:
@@ -411,6 +462,9 @@ def _get_identity_flags_from_api(
411462
api_flags=json_response["flags"],
412463
analytics_processor=self._analytics_processor,
413464
default_flag_handler=self.default_flag_handler,
465+
pipeline_analytics_processor=self._pipeline_analytics_processor,
466+
identity_identifier=identifier,
467+
traits=resolve_trait_values(traits),
414468
)
415469
except FlagsmithAPIError:
416470
if self.offline_handler:
@@ -443,3 +497,6 @@ def __del__(self) -> None:
443497

444498
if hasattr(self, "event_stream_thread"):
445499
self.event_stream_thread.stop()
500+
501+
if self._pipeline_analytics_processor:
502+
self._pipeline_analytics_processor.stop()

flagsmith/mappers.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
StrValueSegmentCondition,
1313
)
1414
from flag_engine.result.types import SegmentResult
15-
from flag_engine.segments.types import ContextValue
1615

1716
from flagsmith.api.types import (
1817
EnvironmentModel,
@@ -26,7 +25,7 @@
2625
SDKEvaluationContext,
2726
SegmentMetadata,
2827
StreamEvent,
29-
TraitConfig,
28+
TraitMapping,
3029
)
3130
from flagsmith.utils.datetime import fromisoformat
3231

@@ -75,31 +74,27 @@ def map_environment_document_to_environment_updated_at(
7574
return updated_at.astimezone(tz=timezone.utc)
7675

7776

def resolve_trait_values(
    traits: "typing.Optional[TraitMapping]",
) -> typing.Optional[typing.Dict[str, typing.Any]]:
    """Collapse a trait mapping to plain trait values.

    Trait-config dicts are replaced by their ``"value"`` entry; bare values
    pass through unchanged. Returns ``None`` for a missing or empty mapping.
    """
    if not traits:
        return None
    resolved: typing.Dict[str, typing.Any] = {}
    for trait_key, value_or_config in traits.items():
        if isinstance(value_or_config, dict):
            resolved[trait_key] = value_or_config["value"]
        else:
            resolved[trait_key] = value_or_config
    return resolved
86+
87+
7888
def map_context_and_identity_data_to_context(
    context: SDKEvaluationContext,
    identifier: str,
    traits: typing.Optional[TraitMapping] = None,
) -> SDKEvaluationContext:
    """Return a copy of *context* with the identity (and its resolved
    traits) filled in.

    :param context: base evaluation context to extend
    :param identifier: identity identifier to set on the context
    :param traits: optional traits; trait configs are resolved to values
    """
    identity = {
        "identifier": identifier,
        # Empty/missing traits resolve to None, so fall back to {}.
        "traits": resolve_trait_values(traits) or {},
    }
    return {**context, "identity": identity}
105100

0 commit comments

Comments
 (0)