1+ import json
12import logging
23import typing
3- from json import JSONDecodeError
4+ from datetime import datetime
45
6+ import pytz
57import requests
68from flag_engine import engine
79from flag_engine .environments .models import EnvironmentModel
1517from flagsmith .models import DefaultFlag , Flags , Segment
1618from flagsmith .offline_handlers import BaseOfflineHandler
1719from flagsmith .polling_manager import EnvironmentDataPollingManager
20+ from flagsmith .streaming_manager import EventStreamManager , StreamEvent
1821from flagsmith .utils .identities import generate_identities_data
1922
2023logger = logging .getLogger (__name__ )
2124
2225DEFAULT_API_URL = "https://edge.api.flagsmith.com/api/v1/"
26+ DEFAULT_REALTIME_API_URL = "https://realtime.flagsmith.com/"
2327
2428
2529class Flagsmith :
@@ -41,6 +45,7 @@ def __init__(
4145 self ,
4246 environment_key : str = None ,
4347 api_url : str = None ,
48+ realtime_api_url : typing .Optional [str ] = None ,
4449 custom_headers : typing .Dict [str , typing .Any ] = None ,
4550 request_timeout_seconds : int = None ,
4651 enable_local_evaluation : bool = False ,
@@ -51,11 +56,13 @@ def __init__(
5156 proxies : typing .Dict [str , str ] = None ,
5257 offline_mode : bool = False ,
5358 offline_handler : BaseOfflineHandler = None ,
59+ enable_realtime_updates : bool = False ,
5460 ):
5561 """
5662 :param environment_key: The environment key obtained from Flagsmith interface.
5763 Required unless offline_mode is True.
5864 :param api_url: Override the URL of the Flagsmith API to communicate with
65+ :param realtime_api_url: Override the URL of the Flagsmith real-time API
5966 :param custom_headers: Additional headers to add to requests made to the
6067 Flagsmith API
6168 :param request_timeout_seconds: Number of seconds to wait for a request to
@@ -76,12 +83,15 @@ def __init__(
7683 :param offline_handler: provide a handler for offline logic. Used to get environment
7784 document from another source when in offline_mode. Works in place of
7885 default_flag_handler if offline_mode is not set and using remote evaluation.
86+ :param enable_realtime_updates: Use real-time functionality via SSE as opposed to polling the API
7987 """
8088
8189 self .offline_mode = offline_mode
8290 self .enable_local_evaluation = enable_local_evaluation
91+ self .environment_refresh_interval_seconds = environment_refresh_interval_seconds
8392 self .offline_handler = offline_handler
8493 self .default_flag_handler = default_flag_handler
94+ self .enable_realtime_updates = enable_realtime_updates
8595 self ._analytics_processor = None
8696 self ._environment = None
8797
@@ -93,6 +103,11 @@ def __init__(
93103 "Cannot use both default_flag_handler and offline_handler."
94104 )
95105
106+ if enable_realtime_updates and not enable_local_evaluation :
107+ raise ValueError (
108+ "Can only use realtime updates when running in local evaluation mode."
109+ )
110+
96111 if self .offline_handler :
97112 self ._environment = self .offline_handler .get_environment ()
98113
@@ -110,6 +125,13 @@ def __init__(
110125 api_url = api_url or DEFAULT_API_URL
111126 self .api_url = api_url if api_url .endswith ("/" ) else f"{ api_url } /"
112127
128+ realtime_api_url = realtime_api_url or DEFAULT_REALTIME_API_URL
129+ self .realtime_api_url = (
130+ realtime_api_url
131+ if realtime_api_url .endswith ("/" )
132+ else f"{ realtime_api_url } /"
133+ )
134+
113135 self .request_timeout_seconds = request_timeout_seconds
114136 self .session .mount (self .api_url , HTTPAdapter (max_retries = retries ))
115137
@@ -124,20 +146,60 @@ def __init__(
124146 "in the environment settings page."
125147 )
126148
127- self .environment_data_polling_manager_thread = (
128- EnvironmentDataPollingManager (
129- main = self ,
130- refresh_interval_seconds = environment_refresh_interval_seconds ,
131- daemon = True , # noqa
132- )
133- )
134- self .environment_data_polling_manager_thread .start ()
149+ self ._initialise_local_evaluation ()
135150
136151 if enable_analytics :
137152 self ._analytics_processor = AnalyticsProcessor (
138153 environment_key , self .api_url , timeout = self .request_timeout_seconds
139154 )
140155
156+ def _initialise_local_evaluation (self ) -> None :
157+ if self .enable_realtime_updates :
158+ self .update_environment ()
159+ stream_url = f"{ self .realtime_api_url } sse/environments/{ self ._environment .api_key } /stream"
160+
161+ self .event_stream_thread = EventStreamManager (
162+ stream_url = stream_url ,
163+ on_event = self .handle_stream_event ,
164+ daemon = True ,
165+ )
166+
167+ self .event_stream_thread .start ()
168+
169+ else :
170+ self .environment_data_polling_manager_thread = (
171+ EnvironmentDataPollingManager (
172+ main = self ,
173+ refresh_interval_seconds = self .environment_refresh_interval_seconds ,
174+ daemon = True ,
175+ )
176+ )
177+
178+ self .environment_data_polling_manager_thread .start ()
179+
180+ def handle_stream_event (self , event : StreamEvent ) -> None :
181+ try :
182+ event_data = json .loads (event .data )
183+ except json .JSONDecodeError as e :
184+ raise FlagsmithAPIError ("Unable to get valid json from event data." ) from e
185+
186+ try :
187+ stream_updated_at = datetime .fromtimestamp (event_data .get ("updated_at" ))
188+ except TypeError as e :
189+ raise FlagsmithAPIError (
190+ "Unable to get valid timestamp from event data."
191+ ) from e
192+
193+ if stream_updated_at .tzinfo is None :
194+ stream_updated_at = pytz .utc .localize (stream_updated_at )
195+
196+ environment_updated_at = self ._environment .updated_at
197+ if environment_updated_at .tzinfo is None :
198+ environment_updated_at = pytz .utc .localize (environment_updated_at )
199+
200+ if stream_updated_at > environment_updated_at :
201+ self .update_environment ()
202+
141203 def get_environment_flags (self ) -> Flags :
142204 """
143205 Get all the default for flags for the current environment.
@@ -267,7 +329,7 @@ def _get_json_response(self, url: str, method: str, body: dict = None):
267329 response .status_code ,
268330 )
269331 return response .json ()
270- except (requests .ConnectionError , JSONDecodeError ) as e :
332+ except (requests .ConnectionError , json . JSONDecodeError ) as e :
271333 raise FlagsmithAPIError (
272334 "Unable to get valid response from Flagsmith API."
273335 ) from e
@@ -291,3 +353,6 @@ def _build_identity_model(self, identifier: str, **traits):
291353 def __del__ (self ):
292354 if hasattr (self , "environment_data_polling_manager_thread" ):
293355 self .environment_data_polling_manager_thread .stop ()
356+
357+ if hasattr (self , "event_stream_thread" ):
358+ self .event_stream_thread .stop ()
0 commit comments