|
| 1 | +import inspect |
| 2 | +from typing import get_type_hints |
| 3 | +import warnings |
1 | 4 | class Event: |
2 | 5 | # TODO: should "sensors" arg of the trigger function be a dictionary instead |
3 | 6 | # of a list? It would be more intuitive to access the sensors by name |
@@ -89,11 +92,10 @@ def __init__(self, trigger, action, name, event_context=None): |
89 | 92 | passed to subsequent calls. Defaults to an empty dictionary if not |
90 | 93 | provided. |
91 | 94 | """ |
| 95 | + self.name = name |
92 | 96 | self.trigger = self.__verify_trigger(trigger) |
93 | 97 | self.action = self.__verify_action(action) |
94 | | - self.name = name |
95 | 98 | self.event_context = event_context if event_context is not None else {} |
96 | | - |
97 | 99 | # TODO: implement tracking for whether this event is currently enabled |
98 | 100 | # or disabled. The disable_event flag from the action return value should |
99 | 101 | # control whether this event continues to be checked for triggering. |
@@ -122,8 +124,89 @@ def __verify_trigger(self, trigger): |
122 | 124 | # 2. Return type annotation is bool or can be tested to return bool |
123 | 125 | # 3. Consider allowing signature to be flexible (accepts **kwargs) |
124 | 126 | # to accommodate user-defined custom event_context keys |
| 127 | + # verify if the return type is bool when annotated |
| 128 | + return_annotation = get_type_hints(trigger).get('return', None) |
| 129 | + if return_annotation is not None and return_annotation is not bool: |
| 130 | + raise ValueError(f"Trigger function {self.name} must return a boolean value.") |
| 131 | + # verify if the trigger function accepts **kwargs and therefore can |
| 132 | + # receive standard event arguments plus custom event_context keys |
| 133 | + s = inspect.signature(trigger) |
| 134 | + if not any(p.kind == inspect.Parameter.VAR_KEYWORD for p in s.parameters.values()): |
| 135 | + raise ValueError( |
| 136 | + f"Trigger function {self.name} must accept **kwargs to receive event context " |
| 137 | + f"and simulation state." |
| 138 | + ) |
| 139 | + if any(p.kind == inspect.Parameter.POSITIONAL_ONLY for p in s.parameters.values()): |
| 140 | + raise ValueError( |
| 141 | + f"Trigger function {self.name} must accept keyword arguments; " |
| 142 | + "positional-only parameters are not supported." |
| 143 | + ) |
| 144 | + # Helper function to generate dummy values based on type annotations |
| 145 | + # of parameters, allowing to test the function without real values |
| 146 | + def _placeholder_for_parameter(parameter): |
| 147 | + annotation = parameter.annotation |
| 148 | + if annotation is inspect.Parameter.empty: |
| 149 | + warnings.warn(f"Trigger function {self.name}: Test with parameters skipped due " |
| 150 | + f"to missing type annotation for parameter '{parameter.name}'. \n" |
| 151 | + f"Is highly recommended that parameters have type annotations " |
| 152 | + f"(var: type). Parameter '{parameter.name}' has no annotation.") |
| 153 | + skip_test = True |
| 154 | + return None, skip_test |
| 155 | + if annotation in (int, float): |
| 156 | + return 0, False |
| 157 | + if annotation is bool: |
| 158 | + return False, False |
| 159 | + if annotation is str: |
| 160 | + return "", False |
| 161 | + if annotation in (list, tuple, set, dict): |
| 162 | + return annotation(), False |
| 163 | + origin = getattr(annotation, "__origin__", None) |
| 164 | + if origin in (list, tuple, set, dict): |
| 165 | + return origin(), False |
| 166 | + return None, False |
| 167 | + # Build a dictionary with dummy values to test if function accepts **kwargs |
| 168 | + # Include an unexpected argument to validate the function doesn't complain |
| 169 | + test_kwargs = {"unexpected_kwarg": 123} |
| 170 | + skip_test = False |
| 171 | + # Iterate through function parameters to generate appropriate test values |
| 172 | + for name, parameter in s.parameters.items(): |
| 173 | + if parameter.kind in ( |
| 174 | + inspect.Parameter.POSITIONAL_OR_KEYWORD, |
| 175 | + inspect.Parameter.KEYWORD_ONLY, |
| 176 | + ): |
| 177 | + if parameter.default is inspect.Parameter.empty: |
| 178 | + annotation = parameter.annotation |
| 179 | + if annotation in (list, tuple, set, dict): |
| 180 | + skip_test = True |
| 181 | + elif hasattr(annotation, "__origin__") and getattr(annotation, "__origin__", None) in (list, tuple, set, dict): |
| 182 | + skip_test = True |
| 183 | + else: |
| 184 | + test_kwargs[name], skip_test = _placeholder_for_parameter(parameter) |
| 185 | + # Execute the trigger function with test values to validate compatibility |
| 186 | + # If TypeError occurs, the function doesn't properly accept **kwargs |
| 187 | + if not skip_test: |
| 188 | + try: |
| 189 | + trigger(**test_kwargs) |
| 190 | + except TypeError as e: |
| 191 | + raise ValueError( |
| 192 | + f"Trigger function {self.name} must accept arbitrary kwargs without raising " |
| 193 | + "a TypeError." |
| 194 | + ) from e |
| 195 | + except Exception as e: |
| 196 | + raise ValueError( |
| 197 | + f"Trigger function {self.name} must accept arbitrary kwargs without raising " |
| 198 | + f"an error: {e}" |
| 199 | + ) from e |
| 200 | + else: |
| 201 | + # Test was skipped due to complex types; warn user to validate manually |
| 202 | + warnings.warn( |
| 203 | + f"Trigger function {self.name}: Test with parameters " |
| 204 | + f"skipped for parameters with complex types " |
| 205 | + f"(list, tuple, set, dict). Ensure the function handles " |
| 206 | + f"arbitrary inputs gracefully." |
| 207 | + ) |
125 | 208 | return trigger |
126 | | - |
| 209 | + |
127 | 210 | def __verify_action(self, action): |
128 | 211 | """Verifies that the action function is valid. |
129 | 212 |
|
|
0 commit comments