Skip to content

Commit 5017255

Browse files
dengfeigedengfeige
authored andcommitted
feat: Support to get the initial state of the Client
1 parent abf27f0 commit 5017255

8 files changed

Lines changed: 110 additions & 32 deletions

File tree

demo.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@
1010

1111
config = fp.Config(remote_uri=FEATURE_PROBE_SERVER_URL, # FeatureProbe server URL
1212
sync_mode='pooling',
13-
refresh_interval=3)
13+
refresh_interval=2,
14+
start_wait=5)
1415

1516
# Server Side SDK Key for your project and environment
1617
SDK_KEY = 'server-8ed48815ef044428826787e9a238b9c6a479f98c'
1718
with fp.Client(SDK_KEY, config) as client:
19+
if client.initialized():
20+
print("SDK successfully initialized!")
21+
else:
22+
print("SDK failed to initialize!")
23+
exit()
1824
# Create one user
1925
# "userId" is used in rules, should be filled in.
2026
user = fp.User().with_attr('userId', '00001')

featureprobe/client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import logging
1616
import time
17+
from threading import Event
1718
from typing import Any
1819

1920
from featureprobe.config import Config
@@ -44,9 +45,21 @@ def __init__(self, server_sdk_key: str, config: Config = Config()):
4445
context = Context(server_sdk_key, config)
4546
self._event_processor = config.event_processor_creator(context)
4647
self._data_repo = config.data_repository_creator(context)
48+
49+
synchronize_process_ready = Event()
4750
self._synchronizer = config.synchronizer_creator(
48-
context, self._data_repo)
51+
context, self._data_repo, synchronize_process_ready)
4952
self._synchronizer.sync()
53+
if config.start_wait > 0:
54+
Client.__logger.info("Waiting up to " +
55+
str(config.start_wait) +
56+
" seconds for FeatureProbe client to initialize...")
57+
synchronize_process_ready.wait(config.start_wait)
58+
if self._synchronizer.initialized() is True:
59+
Client.__logger.info("Started FeatureProbe Client: Successfully")
60+
else:
61+
Client.__logger.warning(
62+
"Initialization timeout exceeded for FeatureProbe Client or an error occurred")
5063

5164
def __enter__(self):
5265
return self
@@ -63,6 +76,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
6376
"""
6477
self.close()
6578

79+
def initialized(self):
80+
"""Tests whether the FeatureProbe client is ready to be used"""
81+
return self._synchronizer.initialized()
82+
6683
def flush(self):
6784
"""Manually push events"""
6885
self._event_processor.flush()

featureprobe/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ def __init__(self,
5050
event_url: str = None,
5151
remote_uri: str = 'http://127.0.0.1:4007',
5252
http_config: HttpConfig = HttpConfig(),
53-
refresh_interval: Union[timedelta, float] = timedelta(seconds=5),
53+
refresh_interval: Union[timedelta, float] = timedelta(seconds=2),
54+
start_wait: float = 5,
5455
):
5556
self._location = location
5657
self._synchronizer_creator = SyncMode(sync_mode).synchronizer_creator
@@ -59,6 +60,7 @@ def __init__(self,
5960
self._synchronizer_url = synchronizer_url
6061
self._event_url = event_url
6162
self._remote_uri = remote_uri
63+
self._start_wait = start_wait
6264
self._http_config = http_config or HttpConfig()
6365
self._refresh_interval = refresh_interval \
6466
if isinstance(refresh_interval, timedelta) \
@@ -99,3 +101,7 @@ def http_config(self):
99101
@property
100102
def refresh_interval(self):
101103
return self._refresh_interval
104+
105+
@property
106+
def start_wait(self):
107+
return self._start_wait

featureprobe/file_synchronizer.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import json
1616
import logging
17+
from threading import Event
1718
from typing import TYPE_CHECKING
1819

1920
from featureprobe.model.repository import Repository
@@ -29,27 +30,35 @@ class FileSynchronizer(Synchronizer):
2930

3031
def __init__(self,
3132
data_repository: "DataRepository",
32-
location: str):
33+
location: str,
34+
ready: "Event"):
3335
self._data_repository = data_repository
3436
self._location = location
37+
self._ready = ready
3538

3639
@classmethod
3740
def from_context(
3841
cls,
3942
context: "Context",
40-
data_repo: "DataRepository") -> "Synchronizer":
41-
return cls(data_repo, context.location)
43+
data_repo: "DataRepository",
44+
ready: "Event") -> "Synchronizer":
45+
return cls(data_repo, context.location, ready)
4246

4347
def sync(self):
4448
try:
4549
with open(self._location, 'r', encoding='utf-8') as f:
4650
repo = Repository.from_json(json.load(f))
4751
self._data_repository.refresh(repo)
52+
self._ready.set()
4853
except FileNotFoundError:
4954
# sourcery skip: replace-interpolation-with-fstring
5055
self._logger.error(
5156
'repository file resource not found in path: %s' %
5257
self._location)
5358

59+
def initialized(self):
60+
return self._ready.is_set()
61+
5462
def close(self):
63+
self._ready.clear()
5564
return

featureprobe/pooling_synchronizer.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
class PoolingSynchronizer(Synchronizer):
3333
__logger = logging.getLogger('FeatureProbe-Synchronizer')
3434

35-
def __init__(self, context: "Context", data_repo: "DataRepository"):
35+
def __init__(
36+
self,
37+
context: "Context",
38+
data_repo: "DataRepository",
39+
ready: "threading.Event"):
3640
self._refresh_interval = context.refresh_interval
3741
self._api_url = context.synchronizer_url
3842
self._data_repo = data_repo
@@ -47,13 +51,15 @@ def __init__(self, context: "Context", data_repo: "DataRepository"):
4751

4852
self._scheduler = None
4953
self._lock = threading.RLock()
54+
self._ready = ready
5055

5156
@classmethod
5257
def from_context(
5358
cls,
5459
context: "Context",
55-
data_repo: "DataRepository") -> "Synchronizer":
56-
return cls(context, data_repo)
60+
data_repo: "DataRepository",
61+
ready: "threading.Event") -> "Synchronizer":
62+
return cls(context, data_repo, ready)
5763

5864
def sync(self):
5965
PoolingSynchronizer.__logger.info(
@@ -77,6 +83,7 @@ def close(self):
7783
with self._lock:
7884
self._scheduler.shutdown()
7985
del self._scheduler
86+
self._ready.clear()
8087

8188
def _poll(self):
8289
try:
@@ -88,7 +95,13 @@ def _poll(self):
8895
self.__logger.debug('Http response body: %s' % body)
8996
repo = Repository.from_json(body)
9097
self._data_repo.refresh(repo)
98+
99+
if not self._ready.is_set():
100+
self._ready.set()
91101
except Exception as e: # noqa
92102
self.__logger.error(
93103
'Unexpected error from polling processor',
94104
exc_info=e)
105+
106+
def initialized(self):
107+
return self._ready.is_set()

featureprobe/synchronizer.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16+
from threading import Event
1617

1718
from typing import TYPE_CHECKING
1819

@@ -28,7 +29,8 @@ class Synchronizer(ABC):
2829
def from_context(
2930
cls,
3031
context: "Context",
31-
data_repo: "DataRepository") -> "Synchronizer":
32+
data_repo: "DataRepository",
33+
ready: "Event") -> "Synchronizer":
3234
pass
3335

3436
@abstractmethod
@@ -39,5 +41,9 @@ def sync(self):
3941
def close(self):
4042
pass
4143

44+
@abstractmethod
45+
def initialized(self):
46+
pass
47+
4248
def __exit__(self, exc_type, exc_val, exc_tb):
4349
self.close()

tests/polling_synchronizer_test.py

Lines changed: 0 additions & 22 deletions
This file was deleted.

tests/pooling_synchronizer_test.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,46 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
15+
from featureprobe.pooling_synchronizer import PoolingSynchronizer
16+
from featureprobe.memory_data_repository import MemoryDataRepository
17+
from featureprobe.config import Config
18+
from featureprobe.context import Context
19+
from threading import Event
20+
from requests import Session
21+
from unittest.mock import patch
22+
23+
24+
def test_init_synchronizer_failed():
25+
realy = Event()
26+
context = Context("test-sdk-key", Config())
27+
synchroizer = PoolingSynchronizer.from_context(
28+
context, MemoryDataRepository.from_context(context), realy)
29+
realy.wait(2)
30+
31+
assert not synchroizer.initialized()
32+
33+
34+
@patch.object(Session, 'get')
35+
def test_init_synchronizer_wait_for_init_success(session_get):
36+
realy = Event()
37+
context = Context("test-sdk-key", Config())
38+
session_get.return_value = MockHttpReponse(200, '{}')
39+
synchroizer = PoolingSynchronizer.from_context(
40+
context, MemoryDataRepository.from_context(context), realy)
41+
synchroizer.sync()
42+
realy.wait(5)
43+
44+
assert synchroizer.initialized()
45+
46+
47+
class MockHttpReponse:
48+
def __init__(self, status_code, json_response):
49+
self.status_code = status_code
50+
self.response = json_response
51+
52+
def raise_for_status(self):
53+
pass
54+
55+
def json(self):
56+
return self.response

0 commit comments

Comments
 (0)