-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathanalytics.py
More file actions
229 lines (195 loc) · 7.53 KB
/
analytics.py
File metadata and controls
229 lines (195 loc) · 7.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import atexit
import json
import logging
import threading
import time
import typing
from dataclasses import dataclass
from datetime import datetime
from requests_futures.sessions import FuturesSession # type: ignore
from flagsmith.version import __version__
logger = logging.getLogger(__name__)

# Path, relative to the base API URL, that receives the flag-evaluation counts.
ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"

# Used to control how often we send data(in seconds)
ANALYTICS_TIMER: typing.Final[int] = 10

# Module-level session shared by all processors: one background thread pool
# (4 workers) services every asynchronous analytics POST.
session = FuturesSession(max_workers=4)
class AnalyticsProcessor:
    """
    AnalyticsProcessor is used to track how often individual Flags are evaluated within
    the Flagsmith SDK. Docs: https://docs.flagsmith.com/advanced-use/flag-analytics.
    """

    def __init__(
        self, environment_key: str, base_api_url: str, timeout: typing.Optional[int] = 3
    ):
        """
        Initialise the AnalyticsProcessor to handle sending analytics on flag usage to
        the Flagsmith API.

        :param environment_key: environment key obtained from the Flagsmith UI
        :param base_api_url: base api url to override when using self hosted version
        :param timeout: used to tell requests to stop waiting for a response after a
            given number of seconds
        """
        self.analytics_endpoint = base_api_url + ANALYTICS_ENDPOINT
        self.environment_key = environment_key
        self._last_flushed = datetime.now()
        self.analytics_data: typing.MutableMapping[str, typing.Any] = {}
        # An explicit `timeout=None` falls back to the 3-second default.
        self.timeout = timeout or 3

    def flush(self) -> None:
        """
        Sends all the collected data to the api asynchronously and resets the timer
        """
        if not self.analytics_data:
            return

        # Fire-and-forget: the FuturesSession posts on a background thread and
        # the result is never awaited, so a failed POST loses this batch.
        session.post(
            self.analytics_endpoint,
            data=json.dumps(self.analytics_data),
            timeout=self.timeout,
            headers={
                "X-Environment-Key": self.environment_key,
                "Content-Type": "application/json",
            },
        )

        self.analytics_data.clear()
        self._last_flushed = datetime.now()

    def track_feature(self, feature_name: str) -> None:
        """Record one evaluation of *feature_name*, flushing when the flush
        interval has elapsed since the last flush."""
        self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
        # Fixed: use total_seconds() rather than .seconds — timedelta.seconds
        # excludes the .days component, so a gap of e.g. 1 day + 2 seconds
        # would read as 2 seconds and wrongly skip the flush.
        if (datetime.now() - self._last_flushed).total_seconds() > ANALYTICS_TIMER:
            self.flush()
@dataclass
class PipelineAnalyticsConfig:
    # Settings for the pipeline analytics batch endpoint and its buffering.

    # Base URL of the analytics server; the consumer appends a trailing "/"
    # if it is missing before building the batch endpoint URL.
    analytics_server_url: str
    # Flush immediately once this many events have been buffered.
    max_buffer_items: int = 1000
    # Period of the background timer that drives periodic flushes.
    flush_interval_seconds: float = 10.0
class PipelineAnalyticsProcessor:
    """
    Buffered analytics processor that sends per-evaluation and custom events
    to the Flagsmith pipeline analytics endpoint in batches.

    Evaluation events are deduplicated within each flush window. Events are
    flushed periodically via a background timer or when the buffer is full.
    """

    def __init__(
        self,
        config: PipelineAnalyticsConfig,
        environment_key: str,
    ) -> None:
        """
        :param config: endpoint URL and buffering/flush settings
        :param environment_key: environment key obtained from the Flagsmith UI
        """
        url = config.analytics_server_url
        if not url.endswith("/"):
            url = f"{url}/"
        self._batch_endpoint = f"{url}v1/analytics/batch"
        self._environment_key = environment_key
        self._max_buffer = config.max_buffer_items
        self._flush_interval_seconds = config.flush_interval_seconds
        self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
        # flag_key -> fingerprint of the last buffered evaluation; used to
        # drop repeat evaluations with identical results within one window.
        self._dedup_keys: typing.Dict[str, str] = {}
        self._lock = threading.Lock()
        self._timer: typing.Optional[threading.Timer] = None
        # Set by stop(); checked in _schedule_flush() so that a timer callback
        # racing with stop() cannot re-arm the timer after shutdown.
        self._stopped = False

    @staticmethod
    def _make_event(
        *,
        event_id: str,
        event_type: str,
        identity_identifier: typing.Optional[str],
        enabled: typing.Optional[bool],
        value: typing.Any,
        traits: typing.Optional[typing.Dict[str, typing.Any]],
        metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> typing.Dict[str, typing.Any]:
        """Build one wire-format event dict (timestamp in epoch milliseconds)."""
        return {
            "event_id": event_id,
            "event_type": event_type,
            "evaluated_at": int(time.time() * 1000),
            "identity_identifier": identity_identifier,
            "enabled": enabled,
            "value": value,
            # Copy caller-supplied traits so later mutation can't alter the event.
            "traits": dict(traits) if traits else None,
            "metadata": {**(metadata or {}), "sdk_version": __version__},
        }

    def _append_locked(self, event: typing.Dict[str, typing.Any]) -> bool:
        """Append *event* to the buffer (caller must hold self._lock); return
        True when the buffer has reached capacity and should be flushed."""
        self._buffer.append(event)
        return len(self._buffer) >= self._max_buffer

    def record_evaluation_event(
        self,
        flag_key: str,
        enabled: bool,
        value: typing.Any,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a flag-evaluation event, skipping it when an identical
        evaluation (same identity/enabled/value) is already buffered for
        *flag_key* in the current flush window."""
        fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
        with self._lock:
            if self._dedup_keys.get(flag_key) == fingerprint:
                return
            self._dedup_keys[flag_key] = fingerprint
            should_flush = self._append_locked(
                self._make_event(
                    event_id=flag_key,
                    event_type="flag_evaluation",
                    identity_identifier=identity_identifier,
                    enabled=enabled,
                    value=value,
                    traits=traits,
                )
            )
        # Flush outside the lock: flush() re-acquires it.
        if should_flush:
            self.flush()

    def record_custom_event(
        self,
        event_name: str,
        identity_identifier: typing.Optional[str] = None,
        traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
        metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
    ) -> None:
        """Buffer a custom (non-evaluation) event; never deduplicated."""
        with self._lock:
            should_flush = self._append_locked(
                self._make_event(
                    event_id=event_name,
                    event_type="custom_event",
                    identity_identifier=identity_identifier,
                    enabled=None,
                    value=None,
                    traits=traits,
                    metadata=metadata,
                )
            )
        if should_flush:
            self.flush()

    def flush(self) -> None:
        """Drain the buffer and POST it asynchronously; a failed batch is
        re-queued by the completion callback."""
        with self._lock:
            if not self._buffer:
                return
            events = self._buffer
            self._buffer = []
            self._dedup_keys.clear()

        # Serialize outside the lock — *events* is now owned by this call.
        payload = json.dumps(
            {"events": events, "environment_key": self._environment_key}
        )
        try:
            future = session.post(
                self._batch_endpoint,
                data=payload,
                timeout=3,
                headers={
                    "Content-Type": "application/json; charset=utf-8",
                    "X-Environment-Key": self._environment_key,
                    "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}",
                },
            )
        except RuntimeError:
            # The shared executor rejects new work once shut down (e.g. during
            # interpreter exit); drop this batch rather than crash.
            logger.debug("Skipping flush: thread pool already shut down")
            return
        future.add_done_callback(lambda f: self._handle_flush_result(f, events))

    def _handle_flush_result(
        self,
        future: typing.Any,
        events: typing.List[typing.Dict[str, typing.Any]],
    ) -> None:
        """Completion callback: on HTTP/network failure, put *events* back at
        the front of the buffer (truncated to capacity, so the newest events
        are dropped first when over capacity)."""
        try:
            response = future.result()
            response.raise_for_status()
        except Exception:
            logger.warning(
                "Failed to flush pipeline analytics, re-queuing events", exc_info=True
            )
            with self._lock:
                self._buffer = events + self._buffer
                self._buffer = self._buffer[: self._max_buffer]

    def start(self) -> None:
        """Begin periodic background flushing; also flushes at interpreter exit."""
        self._stopped = False
        self._schedule_flush()
        atexit.register(self.stop)

    def stop(self) -> None:
        """Cancel the background timer and synchronously flush what remains."""
        self._stopped = True
        atexit.unregister(self.stop)
        if self._timer is not None:
            self._timer.cancel()
        self.flush()

    def _schedule_flush(self) -> None:
        # Fixed race: if _timer_flush() is mid-run when stop() cancels the
        # timer, it would otherwise re-arm a fresh timer after shutdown.
        if self._stopped:
            return
        self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush)
        # Daemon timer so a pending flush never blocks interpreter exit.
        self._timer.daemon = True
        self._timer.start()

    def _timer_flush(self) -> None:
        # Flush, then re-arm — Timer fires only once per start().
        self.flush()
        self._schedule_flush()