3 changes: 2 additions & 1 deletion flagsmith/__init__.py
@@ -1,5 +1,6 @@
from flagsmith import webhooks
from flagsmith.analytics import PipelineAnalyticsConfig
from flagsmith.flagsmith import Flagsmith
from flagsmith.version import __version__

__all__ = ("Flagsmith", "webhooks", "__version__")
__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__")
156 changes: 156 additions & 0 deletions flagsmith/analytics.py
@@ -1,9 +1,17 @@
import json
import logging
import threading
import time
import typing
from dataclasses import dataclass
from datetime import datetime

from requests_futures.sessions import FuturesSession # type: ignore

from flagsmith.version import __version__

logger = logging.getLogger(__name__)

ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/"

# Used to control how often we send data (in seconds)
@@ -60,3 +68,151 @@ def track_feature(self, feature_name: str) -> None:
self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
self.flush()


@dataclass
class PipelineAnalyticsConfig:
analytics_server_url: str
max_buffer: int = 1000
flush_interval_seconds: float = 10.0


class PipelineAnalyticsProcessor:
def __init__(
self,
config: PipelineAnalyticsConfig,
environment_key: str,
) -> None:
url = config.analytics_server_url
if not url.endswith("/"):
url = f"{url}/"
self._batch_endpoint = f"{url}v1/analytics/batch"
self._environment_key = environment_key
self._max_buffer = config.max_buffer
self._flush_interval_seconds = config.flush_interval_seconds

self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
self._dedup_keys: typing.Dict[str, str] = {}
self._lock = threading.Lock()
self._timer: typing.Optional[threading.Timer] = None

def record_evaluation_event(
self,
flag_key: str,
enabled: bool,
value: typing.Any,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
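        # Drop consecutive duplicates: if the most recent buffered evaluation
        # of this flag carried the same identity/enabled/value fingerprint,
        # skip it. flush() clears _dedup_keys, so each flush window starts fresh.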
fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
should_flush = False

with self._lock:
if self._dedup_keys.get(flag_key) == fingerprint:
return
self._dedup_keys[flag_key] = fingerprint
self._buffer.append(
{
"event_id": flag_key,
"event_type": "flag_evaluation",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": enabled,
"value": value,
"traits": dict(traits) if traits else None,
"metadata": {"sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()

def record_custom_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
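        # Custom events reuse the evaluation event shape; enabled and value
        # are None, and metadata always carries the SDK version.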
should_flush = False

with self._lock:
self._buffer.append(
{
"event_id": event_name,
"event_type": "custom_event",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": None,
"value": None,
"traits": dict(traits) if traits else None,
"metadata": {**(metadata or {}), "sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()

def flush(self) -> None:
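        # Swap the buffer for a fresh list and hand the batch to the
        # background session; a failed send re-queues the events.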
with self._lock:
if not self._buffer:
return
events = self._buffer
self._buffer = []
self._dedup_keys.clear()

payload = json.dumps(
{"events": events, "environment_key": self._environment_key}
)
try:
future = session.post(
self._batch_endpoint,
data=payload,
timeout=3,
headers={
"Content-Type": "application/json; charset=utf-8",
"X-Environment-Key": self._environment_key,
"Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}",
},
)
except RuntimeError:
logger.debug("Skipping flush: thread pool already shut down")
return
future.add_done_callback(lambda f: self._handle_flush_result(f, events))

def _handle_flush_result(
self,
future: typing.Any,
events: typing.List[typing.Dict[str, typing.Any]],
) -> None:
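        # On failure, push the unsent events back to the front of the buffer,
        # capped at max_buffer so memory use stays bounded.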
try:
response = future.result()
response.raise_for_status()
except Exception:
logger.warning(
"Failed to flush pipeline analytics, re-queuing events", exc_info=True
)
with self._lock:
self._buffer = events + self._buffer
self._buffer = self._buffer[: self._max_buffer]

def start(self) -> None:
self._schedule_flush()

def stop(self) -> None:
if self._timer is not None:
self._timer.cancel()
self.flush()

def _schedule_flush(self) -> None:
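        # Daemon timer: periodic background flushes never keep the
        # interpreter alive at shutdown.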
self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush)
self._timer.daemon = True
self._timer.start()

def _timer_flush(self) -> None:
self.flush()
self._schedule_flush()
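For reference, a minimal sketch of driving the processor directly; the server URL and environment key below are placeholders, and in normal use the Flagsmith client constructs and starts the processor itself (see flagsmith.py below):

    from flagsmith.analytics import PipelineAnalyticsConfig, PipelineAnalyticsProcessor

    processor = PipelineAnalyticsProcessor(
        config=PipelineAnalyticsConfig(
            analytics_server_url="https://analytics.example.com",  # placeholder
            max_buffer=500,
            flush_interval_seconds=5.0,
        ),
        environment_key="<environment key>",  # placeholder
    )
    processor.start()  # schedules the periodic background flush
    processor.record_evaluation_event(flag_key="new_checkout", enabled=True, value=None)
    processor.stop()  # cancels the timer and flushes remaining buffered events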
70 changes: 65 additions & 5 deletions flagsmith/flagsmith.py
@@ -8,13 +8,18 @@
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from flagsmith.analytics import AnalyticsProcessor
from flagsmith.analytics import (
AnalyticsProcessor,
PipelineAnalyticsConfig,
PipelineAnalyticsProcessor,
)
from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError
from flagsmith.mappers import (
map_context_and_identity_data_to_context,
map_environment_document_to_context,
map_environment_document_to_environment_updated_at,
map_segment_results_to_identity_segments,
resolve_trait_values,
)
from flagsmith.models import DefaultFlag, Flags, Segment
from flagsmith.offline_handlers import OfflineHandler
@@ -63,6 +68,7 @@ def __init__(
environment_refresh_interval_seconds: typing.Union[int, float] = 60,
retries: typing.Optional[Retry] = None,
enable_analytics: bool = False,
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None,
default_flag_handler: typing.Optional[
typing.Callable[[str], DefaultFlag]
] = None,
@@ -108,6 +114,9 @@ def __init__(
self.default_flag_handler = default_flag_handler
self.enable_realtime_updates = enable_realtime_updates
self._analytics_processor: typing.Optional[AnalyticsProcessor] = None
self._pipeline_analytics_processor: typing.Optional[
PipelineAnalyticsProcessor
] = None
self._evaluation_context: typing.Optional[SDKEvaluationContext] = None
self._environment_updated_at: typing.Optional[datetime] = None

@@ -170,10 +179,28 @@ def __init__(

self._initialise_local_evaluation()

if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)
self._initialise_analytics(
environment_key=environment_key,
enable_analytics=enable_analytics,
pipeline_analytics_config=pipeline_analytics_config,
)

def _initialise_analytics(
self,
environment_key: str,
enable_analytics: bool,
pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig],
) -> None:
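        # The legacy flag analytics and the pipeline analytics are
        # independent; either, both, or neither may be enabled.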
if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)
if pipeline_analytics_config:
self._pipeline_analytics_processor = PipelineAnalyticsProcessor(
config=pipeline_analytics_config,
environment_key=environment_key,
)
self._pipeline_analytics_processor.start()

def _initialise_local_evaluation(self) -> None:
# To ensure that the environment is set before allowing subsequent
@@ -290,6 +317,25 @@ def get_identity_segments(

return map_segment_results_to_identity_segments(evaluation_result["segments"])

def track_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
traits: typing.Optional[TraitMapping] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
) -> None:
if not self._pipeline_analytics_processor:
raise ValueError(
"Pipeline analytics is not configured. "
"Provide pipeline_analytics_config to use track_event."
)
self._pipeline_analytics_processor.record_custom_event(
event_name=event_name,
identity_identifier=identity_identifier,
traits=resolve_trait_values(traits),
metadata=metadata,
)

def update_environment(self) -> None:
try:
environment_data = self._get_json_response(
@@ -345,6 +391,7 @@ def _get_environment_flags_from_document(self) -> Flags:
evaluation_result=evaluation_result,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
)

def _get_identity_flags_from_document(
@@ -368,6 +415,9 @@ def _get_identity_flags_from_document(
evaluation_result=evaluation_result,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
identity_identifier=identifier,
traits=resolve_trait_values(traits),
)

def _get_environment_flags_from_api(self) -> Flags:
@@ -379,6 +429,7 @@ def _get_environment_flags_from_api(self) -> Flags:
api_flags=json_response,
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
)
except FlagsmithAPIError:
if self.offline_handler:
@@ -411,6 +462,9 @@ def _get_identity_flags_from_api(
api_flags=json_response["flags"],
analytics_processor=self._analytics_processor,
default_flag_handler=self.default_flag_handler,
pipeline_analytics_processor=self._pipeline_analytics_processor,
identity_identifier=identifier,
traits=resolve_trait_values(traits),
)
except FlagsmithAPIError:
if self.offline_handler:
@@ -443,3 +497,9 @@ def __del__(self) -> None:

if hasattr(self, "event_stream_thread"):
self.event_stream_thread.stop()

if (
hasattr(self, "_pipeline_analytics_processor")
and self._pipeline_analytics_processor
):
self._pipeline_analytics_processor.stop()
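Taken together, the client-side wiring might look like this sketch (the key and URL are placeholders); note that track_event raises ValueError unless pipeline_analytics_config was supplied:

    from flagsmith import Flagsmith, PipelineAnalyticsConfig

    client = Flagsmith(
        environment_key="<server-side environment key>",  # placeholder
        pipeline_analytics_config=PipelineAnalyticsConfig(
            analytics_server_url="https://analytics.example.com",  # placeholder
        ),
    )
    # Flag evaluations are reported through the processor passed into Flags;
    # custom events are sent explicitly:
    client.track_event(
        "checkout_completed",
        identity_identifier="user_123",
        traits={"plan": "pro"},
        metadata={"order_total": 42.5},
    )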
33 changes: 14 additions & 19 deletions flagsmith/mappers.py
@@ -12,7 +12,6 @@
StrValueSegmentCondition,
)
from flag_engine.result.types import SegmentResult
from flag_engine.segments.types import ContextValue

from flagsmith.api.types import (
EnvironmentModel,
@@ -26,7 +25,7 @@
SDKEvaluationContext,
SegmentMetadata,
StreamEvent,
TraitConfig,
TraitMapping,
)
from flagsmith.utils.datetime import fromisoformat

@@ -75,31 +74,27 @@ def map_environment_document_to_environment_updated_at(
return updated_at.astimezone(tz=timezone.utc)


def resolve_trait_values(
traits: typing.Optional[TraitMapping],
) -> typing.Optional[typing.Dict[str, typing.Any]]:
if not traits:
return None
return {
key: (val["value"] if isinstance(val, dict) else val)
for key, val in traits.items()
}


def map_context_and_identity_data_to_context(
context: SDKEvaluationContext,
identifier: str,
traits: typing.Optional[
typing.Mapping[
str,
typing.Union[
ContextValue,
TraitConfig,
],
]
],
traits: typing.Optional[TraitMapping] = None,
) -> SDKEvaluationContext:
return {
**context,
"identity": {
"identifier": identifier,
"traits": {
trait_key: (
trait_value_or_config["value"]
if isinstance(trait_value_or_config, dict)
else trait_value_or_config
)
for trait_key, trait_value_or_config in (traits or {}).items()
},
"traits": resolve_trait_values(traits) or {},
},
}
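resolve_trait_values accepts plain trait values as well as TraitConfig-style dicts carrying a "value" key and flattens the latter (the exact TraitConfig fields are defined in the SDK's types module). For illustration:

    resolve_trait_values({"age": 42, "plan": {"value": "pro"}})
    # -> {"age": 42, "plan": "pro"}
    resolve_trait_values(None)
    # -> None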
