forked from Flagsmith/flagsmith-openfeature-provider-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprovider.py
More file actions
141 lines (123 loc) · 4.92 KB
/
provider.py
File metadata and controls
141 lines (123 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import json
import typing
from json import JSONDecodeError
from flagsmith.exceptions import FlagsmithClientError
from flagsmith.flagsmith import Flagsmith
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import (
ErrorCode,
FlagNotFoundError,
ParseError,
TypeMismatchError,
)
from openfeature.flag_evaluation import FlagResolutionDetails, FlagType
from openfeature.provider import Metadata, AbstractProvider
from openfeature_flagsmith.exceptions import FlagsmithProviderError
# Python type that a flag value must be an instance of for each primitive
# OpenFeature flag type. FlagType.OBJECT is deliberately absent: lookups for
# it return None, and object values are handled separately (JSON-decoded
# from a string) by the provider.
_BASIC_FLAG_TYPE_MAPPINGS: typing.Dict[FlagType, type] = {
    FlagType.BOOLEAN: bool,
    FlagType.INTEGER: int,
    FlagType.FLOAT: float,
    FlagType.STRING: str,
}
class FlagsmithProvider(AbstractProvider):
    """OpenFeature provider that resolves flags through a Flagsmith client.

    Resolution failures are reported by raising exceptions rather than by
    returning the caller-supplied default value.
    """

    def __init__(
        self,
        client: Flagsmith,
        use_boolean_config_value: bool = False,
        return_value_for_disabled_flags: bool = False,
        use_flagsmith_defaults: bool = False,
    ):
        """Store the client and the resolution options.

        Args:
            client: Flagsmith client used to fetch environment or identity
                flags.
            use_boolean_config_value: When False, boolean resolution returns
                the flag's ``enabled`` state. When True, boolean resolution
                goes through the same value/type checks as the other types.
            return_value_for_disabled_flags: When False, resolving the value
                of a flag that is not enabled raises an error. When True,
                the value is resolved regardless of ``enabled``.
            use_flagsmith_defaults: When False, a flag whose ``is_default``
                attribute is truthy is treated as not found.
        """
        self._client = client
        self.return_value_for_disabled_flags = return_value_for_disabled_flags
        self.use_flagsmith_defaults = use_flagsmith_defaults
        self.use_boolean_config_value = use_boolean_config_value

    def get_metadata(self) -> Metadata:
        """Return the provider metadata (name ``"FlagsmithProvider"``)."""
        return Metadata(name="FlagsmithProvider")

    def resolve_boolean_details(
        self,
        flag_key: str,
        default_value: bool,
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails[bool]:
        """Resolve ``flag_key`` as a boolean; see ``_resolve`` for errors."""
        return self._resolve(
            flag_key, FlagType.BOOLEAN, default_value, evaluation_context
        )

    def resolve_string_details(
        self,
        flag_key: str,
        default_value: str,
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails[str]:
        """Resolve ``flag_key`` as a string; see ``_resolve`` for errors."""
        return self._resolve(
            flag_key, FlagType.STRING, default_value, evaluation_context
        )

    def resolve_integer_details(
        self,
        flag_key: str,
        default_value: int,
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails[int]:
        """Resolve ``flag_key`` as an integer; see ``_resolve`` for errors."""
        return self._resolve(
            flag_key, FlagType.INTEGER, default_value, evaluation_context
        )

    def resolve_float_details(
        self,
        flag_key: str,
        default_value: float,
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails[float]:
        """Resolve ``flag_key`` as a float; see ``_resolve`` for errors."""
        return self._resolve(
            flag_key, FlagType.FLOAT, default_value, evaluation_context
        )

    def resolve_object_details(
        self,
        flag_key: str,
        default_value: typing.Union[dict, list],
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails[typing.Union[dict, list]]:
        """Resolve ``flag_key`` as a JSON-decoded object.

        See ``_resolve`` for the errors that can be raised.
        """
        return self._resolve(
            flag_key, FlagType.OBJECT, default_value, evaluation_context
        )

    def _resolve(
        self,
        flag_key: str,
        flag_type: FlagType,
        default_value: typing.Any,
        evaluation_context: typing.Optional[EvaluationContext] = None,
    ) -> FlagResolutionDetails:
        """Fetch ``flag_key`` from Flagsmith and validate it as ``flag_type``.

        Args:
            flag_key: Name of the flag to look up.
            flag_type: OpenFeature type the caller expects.
            default_value: Not used here; every failure is raised instead.
                NOTE(review): presumably the OpenFeature client applies the
                default when the provider raises - confirm against the SDK.
            evaluation_context: Optional context; its targeting key (if any)
                selects identity flags instead of environment flags.

        Returns:
            Resolution details wrapping the flag's enabled state (boolean
            flags without ``use_boolean_config_value``) or its value.

        Raises:
            FlagsmithProviderError: The Flagsmith client failed, or the flag
                is disabled and ``return_value_for_disabled_flags`` is False.
            FlagNotFoundError: The flag is a default flag and
                ``use_flagsmith_defaults`` is False.
            ParseError: An object flag's string value is not valid JSON.
            TypeMismatchError: The flag's value is not of the requested type.
        """
        try:
            flag = self._get_flags(evaluation_context).get_flag(flag_key)
        except FlagsmithClientError as e:
            raise FlagsmithProviderError(
                error_code=ErrorCode.GENERAL,
                error_message="An error occurred retrieving flags from Flagsmith client.",
            ) from e

        if flag.is_default and not self.use_flagsmith_defaults:
            raise FlagNotFoundError(
                error_message=f"Flag '{flag_key}' was not found."
            )

        # By default a boolean flag is just the flag's on/off state; its
        # configured value is only consulted when explicitly requested.
        if flag_type == FlagType.BOOLEAN and not self.use_boolean_config_value:
            return FlagResolutionDetails(value=flag.enabled)

        if not (self.return_value_for_disabled_flags or flag.enabled):
            raise FlagsmithProviderError(
                error_code=ErrorCode.GENERAL,
                error_message=f"Flag '{flag_key}' is not enabled.",
            )

        value = flag.value
        required_type = _BASIC_FLAG_TYPE_MAPPINGS.get(flag_type)
        if required_type is not None:
            # bool is a subclass of int, so a plain isinstance() check would
            # let True/False through as an integer value. Only accept a bool
            # when a bool was actually requested.
            bool_mismatch = isinstance(value, bool) and required_type is not bool
            if isinstance(value, required_type) and not bool_mismatch:
                return FlagResolutionDetails(value=value)
        elif flag_type is FlagType.OBJECT and isinstance(value, str):
            # Object flags are expected to hold a JSON document as a string.
            try:
                return FlagResolutionDetails(value=json.loads(value))
            except JSONDecodeError as e:
                msg = f"Unable to parse object from value for flag '{flag_key}'"
                raise ParseError(error_message=msg) from e

        raise TypeMismatchError(
            error_message=(
                f"Value for flag '{flag_key}' is not of type '{flag_type.value}'"
            )
        )

    def _get_flags(
        self, evaluation_context: typing.Optional[EvaluationContext] = None
    ):
        """Return identity flags when a targeting key is set, else environment flags.

        Args:
            evaluation_context: Optional context. When it carries a truthy
                ``targeting_key``, that key is used as the Flagsmith identity
                identifier and the ``"traits"`` entry of its attributes
                (default ``{}``) is passed as the identity's traits.
        """
        # A missing context is treated the same as one without a targeting key.
        if evaluation_context is not None and evaluation_context.targeting_key:
            return self._client.get_identity_flags(
                identifier=evaluation_context.targeting_key,
                traits=evaluation_context.attributes.get("traits", {}),
            )
        return self._client.get_environment_flags()