diff --git a/flagsmith/__init__.py b/flagsmith/__init__.py index 41473e4..d36feb8 100644 --- a/flagsmith/__init__.py +++ b/flagsmith/__init__.py @@ -1,5 +1,6 @@ from flagsmith import webhooks +from flagsmith.analytics import PipelineAnalyticsConfig from flagsmith.flagsmith import Flagsmith from flagsmith.version import __version__ -__all__ = ("Flagsmith", "webhooks", "__version__") +__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__") diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index dee1ed5..f0d0bab 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -1,9 +1,18 @@ +import atexit import json +import logging +import threading +import time import typing +from dataclasses import dataclass from datetime import datetime from requests_futures.sessions import FuturesSession # type: ignore +from flagsmith.version import __version__ + +logger = logging.getLogger(__name__) + ANALYTICS_ENDPOINT: typing.Final[str] = "analytics/flags/" # Used to control how often we send data(in seconds) @@ -60,3 +69,161 @@ def track_feature(self, feature_name: str) -> None: self.analytics_data[feature_name] = self.analytics_data.get(feature_name, 0) + 1 if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER: self.flush() + + +@dataclass +class PipelineAnalyticsConfig: + analytics_server_url: str + max_buffer_items: int = 1000 + flush_interval_seconds: float = 10.0 + + +class PipelineAnalyticsProcessor: + """ + Buffered analytics processor that sends per-evaluation and custom events + to the Flagsmith pipeline analytics endpoint in batches. + + Evaluation events are deduplicated within each flush window. Events are + flushed periodically via a background timer or when the buffer is full. 
+ """ + + def __init__( + self, + config: PipelineAnalyticsConfig, + environment_key: str, + ) -> None: + url = config.analytics_server_url + if not url.endswith("/"): + url = f"{url}/" + self._batch_endpoint = f"{url}v1/analytics/batch" + self._environment_key = environment_key + self._max_buffer = config.max_buffer_items + self._flush_interval_seconds = config.flush_interval_seconds + + self._buffer: typing.List[typing.Dict[str, typing.Any]] = [] + self._dedup_keys: typing.Dict[str, str] = {} + self._lock = threading.Lock() + self._timer: typing.Optional[threading.Timer] = None + + def record_evaluation_event( + self, + flag_key: str, + enabled: bool, + value: typing.Any, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}" + should_flush = False + + with self._lock: + if self._dedup_keys.get(flag_key) == fingerprint: + return + self._dedup_keys[flag_key] = fingerprint + self._buffer.append( + { + "event_id": flag_key, + "event_type": "flag_evaluation", + "evaluated_at": int(time.time() * 1000), + "identity_identifier": identity_identifier, + "enabled": enabled, + "value": value, + "traits": dict(traits) if traits else None, + "metadata": {"sdk_version": __version__}, + } + ) + if len(self._buffer) >= self._max_buffer: + should_flush = True + + if should_flush: + self.flush() + + def record_custom_event( + self, + event_name: str, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + should_flush = False + + with self._lock: + self._buffer.append( + { + "event_id": event_name, + "event_type": "custom_event", + "evaluated_at": int(time.time() * 1000), + "identity_identifier": identity_identifier, + "enabled": None, + "value": None, + "traits": dict(traits) if traits else None, + 
"metadata": {**(metadata or {}), "sdk_version": __version__}, + } + ) + if len(self._buffer) >= self._max_buffer: + should_flush = True + + if should_flush: + self.flush() + + def flush(self) -> None: + with self._lock: + if not self._buffer: + return + events = self._buffer + self._buffer = [] + self._dedup_keys.clear() + + payload = json.dumps( + {"events": events, "environment_key": self._environment_key} + ) + try: + future = session.post( + self._batch_endpoint, + data=payload, + timeout=3, + headers={ + "Content-Type": "application/json; charset=utf-8", + "X-Environment-Key": self._environment_key, + "Flagsmith-SDK-User-Agent": f"flagsmith-python-client/{__version__}", + }, + ) + except RuntimeError: + logger.debug("Skipping flush: thread pool already shut down") + return + future.add_done_callback(lambda f: self._handle_flush_result(f, events)) + + def _handle_flush_result( + self, + future: typing.Any, + events: typing.List[typing.Dict[str, typing.Any]], + ) -> None: + try: + response = future.result() + response.raise_for_status() + except Exception: + logger.warning( + "Failed to flush pipeline analytics, re-queuing events", exc_info=True + ) + with self._lock: + self._buffer = events + self._buffer + self._buffer = self._buffer[: self._max_buffer] + + def start(self) -> None: + self._schedule_flush() + atexit.register(self.stop) + + def stop(self) -> None: + atexit.unregister(self.stop) + if self._timer is not None: + self._timer.cancel() + self.flush() + + def _schedule_flush(self) -> None: + self._timer = threading.Timer(self._flush_interval_seconds, self._timer_flush) + self._timer.daemon = True + self._timer.start() + + def _timer_flush(self) -> None: + self.flush() + self._schedule_flush() diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 15da5ec..0ff75fe 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -8,13 +8,18 @@ from requests.adapters import HTTPAdapter from urllib3 import Retry -from flagsmith.analytics 
import AnalyticsProcessor +from flagsmith.analytics import ( + AnalyticsProcessor, + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError from flagsmith.mappers import ( map_context_and_identity_data_to_context, map_environment_document_to_context, map_environment_document_to_environment_updated_at, map_segment_results_to_identity_segments, + resolve_trait_values, ) from flagsmith.models import DefaultFlag, Flags, Segment from flagsmith.offline_handlers import OfflineHandler @@ -63,6 +68,7 @@ def __init__( environment_refresh_interval_seconds: typing.Union[int, float] = 60, retries: typing.Optional[Retry] = None, enable_analytics: bool = False, + pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None, default_flag_handler: typing.Optional[ typing.Callable[[str], DefaultFlag] ] = None, @@ -108,6 +114,9 @@ def __init__( self.default_flag_handler = default_flag_handler self.enable_realtime_updates = enable_realtime_updates self._analytics_processor: typing.Optional[AnalyticsProcessor] = None + self._pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None self._evaluation_context: typing.Optional[SDKEvaluationContext] = None self._environment_updated_at: typing.Optional[datetime] = None @@ -170,10 +179,28 @@ def __init__( self._initialise_local_evaluation() - if enable_analytics: - self._analytics_processor = AnalyticsProcessor( - environment_key, self.api_url, timeout=self.request_timeout_seconds - ) + self._initialise_analytics( + environment_key=environment_key, + enable_analytics=enable_analytics, + pipeline_analytics_config=pipeline_analytics_config, + ) + + def _initialise_analytics( + self, + environment_key: str, + enable_analytics: bool, + pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig], + ) -> None: + if enable_analytics: + self._analytics_processor = AnalyticsProcessor( + environment_key, self.api_url, 
timeout=self.request_timeout_seconds + ) + if pipeline_analytics_config: + self._pipeline_analytics_processor = PipelineAnalyticsProcessor( + config=pipeline_analytics_config, + environment_key=environment_key, + ) + self._pipeline_analytics_processor.start() def _initialise_local_evaluation(self) -> None: # To ensure that the environment is set before allowing subsequent @@ -290,6 +317,25 @@ def get_identity_segments( return map_segment_results_to_identity_segments(evaluation_result["segments"]) + def track_event( + self, + event_name: str, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[TraitMapping] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + ) -> None: + if not self._pipeline_analytics_processor: + raise ValueError( + "Pipeline analytics is not configured. " + "Provide pipeline_analytics_config to use track_event." + ) + self._pipeline_analytics_processor.record_custom_event( + event_name=event_name, + identity_identifier=identity_identifier, + traits=resolve_trait_values(traits), + metadata=metadata, + ) + def update_environment(self) -> None: try: environment_data = self._get_json_response( @@ -345,6 +391,7 @@ def _get_environment_flags_from_document(self) -> Flags: evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, ) def _get_identity_flags_from_document( @@ -368,6 +415,9 @@ def _get_identity_flags_from_document( evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, + identity_identifier=identifier, + traits=resolve_trait_values(traits), ) def _get_environment_flags_from_api(self) -> Flags: @@ -379,6 +429,7 @@ def _get_environment_flags_from_api(self) -> Flags: api_flags=json_response, 
analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, ) except FlagsmithAPIError: if self.offline_handler: @@ -411,6 +462,9 @@ def _get_identity_flags_from_api( api_flags=json_response["flags"], analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, + pipeline_analytics_processor=self._pipeline_analytics_processor, + identity_identifier=identifier, + traits=resolve_trait_values(traits), ) except FlagsmithAPIError: if self.offline_handler: @@ -443,3 +497,6 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() + + if getattr(self, "_pipeline_analytics_processor", None): + self._pipeline_analytics_processor.stop() diff --git a/flagsmith/mappers.py b/flagsmith/mappers.py index 18d1acf..3ed3e3d 100644 --- a/flagsmith/mappers.py +++ b/flagsmith/mappers.py @@ -12,7 +12,6 @@ StrValueSegmentCondition, ) from flag_engine.result.types import SegmentResult -from flag_engine.segments.types import ContextValue from flagsmith.api.types import ( EnvironmentModel, @@ -26,7 +25,7 @@ SDKEvaluationContext, SegmentMetadata, StreamEvent, - TraitConfig, + TraitMapping, ) from flagsmith.utils.datetime import fromisoformat @@ -75,31 +74,27 @@ def map_environment_document_to_environment_updated_at( return updated_at.astimezone(tz=timezone.utc) +def resolve_trait_values( + traits: typing.Optional[TraitMapping], +) -> typing.Optional[typing.Dict[str, typing.Any]]: + if not traits: + return None + return { + key: (val["value"] if isinstance(val, dict) else val) + for key, val in traits.items() + } + + def map_context_and_identity_data_to_context( context: SDKEvaluationContext, identifier: str, - traits: typing.Optional[ - typing.Mapping[ - str, - typing.Union[ - ContextValue, - TraitConfig, - ], - ] - ], + traits: typing.Optional[TraitMapping] = None, ) -> SDKEvaluationContext: return { **context, "identity": {
"identifier": identifier, - "traits": { - trait_key: ( - trait_value_or_config["value"] - if isinstance(trait_value_or_config, dict) - else trait_value_or_config - ) - for trait_key, trait_value_or_config in (traits or {}).items() - }, + "traits": resolve_trait_values(traits) or {}, }, } diff --git a/flagsmith/models.py b/flagsmith/models.py index 72beb25..8d3765c 100644 --- a/flagsmith/models.py +++ b/flagsmith/models.py @@ -3,7 +3,7 @@ import typing from dataclasses import dataclass, field -from flagsmith.analytics import AnalyticsProcessor +from flagsmith.analytics import AnalyticsProcessor, PipelineAnalyticsProcessor from flagsmith.exceptions import FlagsmithFeatureDoesNotExistError from flagsmith.types import SDKEvaluationResult, SDKFlagResult @@ -57,6 +57,9 @@ class Flags: flags: typing.Dict[str, Flag] = field(default_factory=dict) default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]] = None _analytics_processor: typing.Optional[AnalyticsProcessor] = None + _pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None + _identity_identifier: typing.Optional[str] = None + _traits: typing.Optional[typing.Dict[str, typing.Any]] = None @classmethod def from_evaluation_result( @@ -64,6 +67,11 @@ def from_evaluation_result( evaluation_result: SDKEvaluationResult, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], + pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: return cls( flags={ @@ -73,6 +81,9 @@ def from_evaluation_result( }, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, + _pipeline_analytics_processor=pipeline_analytics_processor, + _identity_identifier=identity_identifier, + _traits=traits, ) @classmethod @@ -81,6 +92,11 @@ def 
from_api_flags( api_flags: typing.Sequence[typing.Mapping[str, typing.Any]], analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], + pipeline_analytics_processor: typing.Optional[ + PipelineAnalyticsProcessor + ] = None, + identity_identifier: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: flags = { flag_data["feature"]["name"]: Flag.from_api_flag(flag_data) @@ -91,6 +107,9 @@ def from_api_flags( flags=flags, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, + _pipeline_analytics_processor=pipeline_analytics_processor, + _identity_identifier=identity_identifier, + _traits=traits, ) def all_flags(self) -> typing.List[Flag]: @@ -141,6 +160,15 @@ def get_flag(self, feature_name: str) -> typing.Union[DefaultFlag, Flag]: if self._analytics_processor and hasattr(flag, "feature_name"): self._analytics_processor.track_feature(flag.feature_name) + if self._pipeline_analytics_processor and hasattr(flag, "feature_name"): + self._pipeline_analytics_processor.record_evaluation_event( + flag_key=flag.feature_name, + enabled=flag.enabled, + value=flag.value, + identity_identifier=self._identity_identifier, + traits=self._traits, + ) + return flag diff --git a/tests/conftest.py b/tests/conftest.py index ce1153c..9c58c96 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,11 @@ from pytest_mock import MockerFixture from flagsmith import Flagsmith -from flagsmith.analytics import AnalyticsProcessor +from flagsmith.analytics import ( + AnalyticsProcessor, + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) from flagsmith.api.types import EnvironmentModel from flagsmith.mappers import map_environment_document_to_context from flagsmith.types import SDKEvaluationContext @@ -26,6 +30,21 @@ def analytics_processor() -> AnalyticsProcessor: ) +@pytest.fixture() +def pipeline_analytics_config() -> 
PipelineAnalyticsConfig: + return PipelineAnalyticsConfig(analytics_server_url="http://test_analytics/") + + +@pytest.fixture() +def pipeline_analytics_processor( + pipeline_analytics_config: PipelineAnalyticsConfig, +) -> PipelineAnalyticsProcessor: + return PipelineAnalyticsProcessor( + config=pipeline_analytics_config, + environment_key="test_key", + ) + + @pytest.fixture(scope="session") def api_key() -> str: return "".join(random.sample(string.ascii_letters, 20)) diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 494e854..b0ad097 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -9,6 +9,7 @@ from responses import matchers from flagsmith import Flagsmith, __version__ +from flagsmith.analytics import PipelineAnalyticsConfig from flagsmith.api.types import EnvironmentModel from flagsmith.exceptions import ( FlagsmithAPIError, @@ -915,3 +916,92 @@ def test_flagsmith__init__expected_headers_sent( "Connection": "keep-alive", **expected_headers, } + + +def test_track_event_raises_without_config(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Pipeline analytics is not configured"): + flagsmith.track_event("purchase") + + +@responses.activate() +def test_pipeline_analytics_records_events( + mocker: MockerFixture, api_key: str, flags_json: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_eval = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_evaluation_event" + ) + mock_custom = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_custom_event" + ) + + responses.add(method="GET", url=flagsmith.environment_flags_url, body=flags_json) + flags = flagsmith.get_environment_flags() + flags.get_flag("some_feature") + + mock_eval.assert_called_once_with( + flag_key="some_feature", + enabled=True, + 
value="some-value", + identity_identifier=None, + traits=None, + ) + + flagsmith.track_event( + "purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + + mock_custom.assert_called_once_with( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + + +@responses.activate() +def test_identity_flags_records_evaluation_with_resolved_traits( + mocker: MockerFixture, api_key: str, identities_json: str +) -> None: + config = PipelineAnalyticsConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) + + mock_record = mocker.patch.object( + flagsmith._pipeline_analytics_processor, "record_evaluation_event" + ) + + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + + flags = flagsmith.get_identity_flags("user123", traits={"plan": "premium"}) + flags.get_flag("some_feature") + + mock_record.assert_called_once_with( + flag_key="some_feature", + enabled=True, + value="some-value", + identity_identifier="user123", + traits={"plan": "premium"}, + ) + + mock_record.reset_mock() + + flags = flagsmith.get_identity_flags( + "user123", + traits={"plan": {"value": "premium", "transient": True}}, + ) + flags.get_flag("some_feature") + + mock_record.assert_called_once_with( + flag_key="some_feature", + enabled=True, + value="some-value", + identity_identifier="user123", + traits={"plan": "premium"}, + ) diff --git a/tests/test_models.py b/tests/test_models.py index c992395..8c42093 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -149,3 +149,15 @@ def test_flag_from_evaluation_result_missing_metadata__raises_expected() -> None # When & Then with pytest.raises(ValueError): Flag.from_evaluation_result(flag_result) + + +def test_get_flag_without_pipeline_processor() -> None: + flags = 
Flags( + flags={ + "my_feature": Flag( + enabled=True, value="v1", feature_name="my_feature", feature_id=1 + ) + }, + ) + flag = flags.get_flag("my_feature") + assert flag.enabled is True diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py new file mode 100644 index 0000000..2a45899 --- /dev/null +++ b/tests/test_pipeline_analytics.py @@ -0,0 +1,182 @@ +import json +from concurrent.futures import Future +from unittest import mock + +import pytest + +from flagsmith.analytics import ( + PipelineAnalyticsConfig, + PipelineAnalyticsProcessor, +) + + +def test_record_evaluation_event_buffers_event( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", + enabled=True, + value="variant_a", + identity_identifier="user123", + traits={"plan": "premium"}, + ) + + assert len(pipeline_analytics_processor._buffer) == 1 + event = pipeline_analytics_processor._buffer[0] + assert event["event_id"] == "my_flag" + assert event["event_type"] == "flag_evaluation" + assert event["identity_identifier"] == "user123" + assert event["enabled"] is True + assert event["value"] == "variant_a" + assert event["traits"] == {"plan": "premium"} + assert "sdk_version" in event["metadata"] + assert isinstance(event["evaluated_at"], int) + + +@pytest.mark.parametrize( + "second_enabled, expected_count", + [ + (True, 1), # same fingerprint -> deduplicated + (False, 2), # different fingerprint -> both kept + ], +) +def test_evaluation_event_deduplication( + pipeline_analytics_processor: PipelineAnalyticsProcessor, + second_enabled: bool, + expected_count: int, +) -> None: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", + enabled=second_enabled, + value="v1", + identity_identifier="user1", + ) + + assert 
len(pipeline_analytics_processor._buffer) == expected_count + + +def test_dedup_keys_cleared_after_flush( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session"): + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.flush() + + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + + assert len(pipeline_analytics_processor._buffer) == 1 + + +def test_auto_flush_on_buffer_full() -> None: + config = PipelineAnalyticsConfig( + analytics_server_url="http://test/", max_buffer_items=5 + ) + processor = PipelineAnalyticsProcessor(config=config, environment_key="key") + + with mock.patch("flagsmith.analytics.session"): + for i in range(5): + processor.record_evaluation_event( + flag_key=f"flag_{i}", enabled=True, value=str(i) + ) + + assert len(processor._buffer) == 0 + + +def test_flush_sends_correct_http_request( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" + ) + pipeline_analytics_processor.flush() + + mock_session.post.assert_called_once() + call_kwargs = mock_session.post.call_args + assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" + + headers = call_kwargs[1]["headers"] + assert headers["X-Environment-Key"] == "test_key" + assert headers["Content-Type"] == "application/json; charset=utf-8" + assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] + + body = json.loads(call_kwargs[1]["data"]) + assert body["environment_key"] == "test_key" + assert len(body["events"]) == 1 + assert body["events"][0]["event_id"] == "my_flag" + + +def 
test_flush_noop_when_empty( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + pipeline_analytics_processor.flush() + + mock_session.post.assert_not_called() + + +def test_failed_flush_requeues_events( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + future: Future[None] = Future() + future.set_exception(Exception("connection error")) + + with mock.patch("flagsmith.analytics.session") as mock_session: + mock_session.post.return_value = future + pipeline_analytics_processor.record_evaluation_event( + flag_key="my_flag", enabled=True, value="v1" + ) + pipeline_analytics_processor.flush() + + assert len(pipeline_analytics_processor._buffer) == 1 + assert pipeline_analytics_processor._buffer[0]["event_id"] == "my_flag" + + +def test_record_custom_event( + pipeline_analytics_processor: PipelineAnalyticsProcessor, +) -> None: + pipeline_analytics_processor.record_custom_event( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + # Custom events are never deduplicated + pipeline_analytics_processor.record_custom_event( + event_name="purchase", + identity_identifier="user1", + ) + + assert len(pipeline_analytics_processor._buffer) == 2 + event = pipeline_analytics_processor._buffer[0] + assert event["event_id"] == "purchase" + assert event["event_type"] == "custom_event" + assert event["enabled"] is None + assert event["value"] is None + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["amount"] == 99 + assert "sdk_version" in event["metadata"] + + +def test_start_stop_lifecycle() -> None: + config = PipelineAnalyticsConfig( + analytics_server_url="http://test/", flush_interval_seconds=100 + ) + processor = PipelineAnalyticsProcessor(config=config, environment_key="key") + + processor.start() + assert processor._timer is not None + assert processor._timer.is_alive() + + 
with mock.patch("flagsmith.analytics.session"): + processor.record_evaluation_event(flag_key="my_flag", enabled=True, value="v1") + processor.stop() + + assert len(processor._buffer) == 0