|
7 | 7 | from policyengine_core.parameters.parameter_node import ParameterNode |
8 | 8 | from policyengine_core.variables import Variable |
9 | 9 |
|
| 10 | +MAX_DYNAMIC_BREAKDOWN_VALUES = 10_000 |
| 11 | + |
10 | 12 |
|
11 | 13 | def homogenize_parameter_structures( |
12 | 14 | root: ParameterNode, variables: Dict[str, Variable], default_value: Any = 0 |
@@ -44,6 +46,11 @@ def get_breakdown_variables(node: ParameterNode) -> List[str]: |
44 | 46 | f"Invalid breakdown metadata for parameter {node.name}: {type(breakdown)}" |
45 | 47 | ) |
46 | 48 | return None |
| 49 | + if len(breakdown) == 0: |
| 50 | + logging.warning( |
| 51 | + f"Invalid breakdown metadata for parameter {node.name}: empty list" |
| 52 | + ) |
| 53 | + return None |
47 | 54 | return breakdown |
48 | 55 | else: |
49 | 56 | return None |
@@ -131,41 +138,72 @@ def evaluate_dynamic_breakdown(expression: str) -> List[Any]: |
131 | 138 | parsed = ast.parse(expression, mode="eval") |
132 | 139 | evaluated = evaluate_dynamic_breakdown_node(parsed.body) |
133 | 140 | if isinstance(evaluated, range): |
| 141 | + validate_dynamic_breakdown_range_cardinality(evaluated, expression) |
134 | 142 | return list(evaluated) |
135 | 143 | if isinstance(evaluated, (list, tuple)): |
| 144 | + validate_dynamic_breakdown_cardinality(len(evaluated), expression) |
136 | 145 | return list(evaluated) |
137 | 146 | if isinstance(evaluated, set): |
| 147 | + validate_dynamic_breakdown_cardinality(len(evaluated), expression) |
138 | 148 | return list(evaluated) |
139 | 149 | raise ValueError( |
140 | 150 | f"Invalid dynamic breakdown expression '{expression}'. " |
141 | 151 | "Only literal collections and range() calls are allowed." |
142 | 152 | ) |
143 | 153 |
|
144 | 154 |
|
| 155 | +def validate_dynamic_breakdown_cardinality(count: int, expression: str) -> None: |
| 156 | + if count > MAX_DYNAMIC_BREAKDOWN_VALUES: |
| 157 | + raise ValueError( |
| 158 | + f"Dynamic breakdown expression '{expression}' produces {count} values, " |
| 159 | + f"which exceeds the maximum of {MAX_DYNAMIC_BREAKDOWN_VALUES}." |
| 160 | + ) |
| 161 | + |
| 162 | + |
| 163 | +def validate_dynamic_breakdown_range_cardinality( |
| 164 | + values: range, expression: str |
| 165 | +) -> None: |
| 166 | + try: |
| 167 | + count = len(values) |
| 168 | + except OverflowError as exc: |
| 169 | + raise ValueError( |
| 170 | + f"Dynamic breakdown expression '{expression}' produces too many values." |
| 171 | + ) from exc |
| 172 | + validate_dynamic_breakdown_cardinality(count, expression) |
| 173 | + |
| 174 | + |
145 | 175 | def evaluate_dynamic_breakdown_node(node: ast.AST) -> Any: |
146 | 176 | if isinstance(node, ast.Constant): |
147 | 177 | return node.value |
148 | 178 | if isinstance(node, ast.List): |
| 179 | + validate_dynamic_breakdown_cardinality(len(node.elts), ast.unparse(node)) |
149 | 180 | return [evaluate_dynamic_breakdown_node(element) for element in node.elts] |
150 | 181 | if isinstance(node, ast.Tuple): |
| 182 | + validate_dynamic_breakdown_cardinality(len(node.elts), ast.unparse(node)) |
151 | 183 | return tuple(evaluate_dynamic_breakdown_node(element) for element in node.elts) |
152 | 184 | if isinstance(node, ast.Set): |
| 185 | + validate_dynamic_breakdown_cardinality(len(node.elts), ast.unparse(node)) |
153 | 186 | return {evaluate_dynamic_breakdown_node(element) for element in node.elts} |
154 | | - if isinstance(node, ast.UnaryOp) and isinstance( |
155 | | - node.op, (ast.UAdd, ast.USub) |
156 | | - ): |
| 187 | + if isinstance(node, ast.UnaryOp) and isinstance(node.op, (ast.UAdd, ast.USub)): |
157 | 188 | operand = evaluate_dynamic_breakdown_node(node.operand) |
158 | 189 | return operand if isinstance(node.op, ast.UAdd) else -operand |
159 | 190 | if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): |
160 | 191 | if node.func.id == "range": |
161 | 192 | args = [evaluate_dynamic_breakdown_node(arg) for arg in node.args] |
162 | 193 | if node.keywords: |
163 | 194 | raise ValueError("range() keyword arguments are not allowed") |
164 | | - return range(*args) |
| 195 | + result = range(*args) |
| 196 | + validate_dynamic_breakdown_range_cardinality(result, ast.unparse(node)) |
| 197 | + return result |
165 | 198 | if node.func.id == "list": |
166 | 199 | if len(node.args) != 1 or node.keywords: |
167 | 200 | raise ValueError("list() must contain a single positional argument") |
168 | | - return list(evaluate_dynamic_breakdown_node(node.args[0])) |
| 201 | + evaluated = evaluate_dynamic_breakdown_node(node.args[0]) |
| 202 | + if isinstance(evaluated, (range, list, tuple, set)): |
| 203 | + return evaluated |
| 204 | + raise ValueError( |
| 205 | + "list() only supports range() and literal collection expressions" |
| 206 | + ) |
169 | 207 | raise ValueError( |
170 | 208 | f"Unsupported dynamic breakdown expression: {ast.unparse(node) if hasattr(ast, 'unparse') else type(node).__name__}" |
171 | 209 | ) |
0 commit comments