Skip to content

Commit 6507271

Browse files
committed
move scenario processing to its own file
1 parent 9e37724 commit 6507271

2 files changed

Lines changed: 270 additions & 229 deletions

File tree

robotmbt/modeller.py

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
# -*- coding: utf-8 -*-
2+
3+
# BSD 3-Clause License
4+
#
5+
# Copyright (c) 2026, J. Foederer
6+
# All rights reserved.
7+
#
8+
# Redistribution and use in source and binary forms, with or without
9+
# modification, are permitted provided that the following conditions are met:
10+
#
11+
# 1. Redistributions of source code must retain the above copyright notice, this
12+
# list of conditions and the following disclaimer.
13+
#
14+
# 2. Redistributions in binary form must reproduce the above copyright notice,
15+
# this list of conditions and the following disclaimer in the documentation
16+
# and/or other materials provided with the distribution.
17+
#
18+
# 3. Neither the name of the copyright holder nor the names of its
19+
# contributors may be used to endorse or promote products derived from
20+
# this software without specific prior written permission.
21+
#
22+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
25+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
26+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
28+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32+
33+
from typing import Any
34+
35+
from robot.api import logger
36+
from robot.utils import is_list_like
37+
38+
from .modelspace import ModelSpace
39+
from .steparguments import StepArgument, StepArguments
40+
from .substitutionmap import SubstitutionMap
41+
from .suitedata import Scenario, Step
42+
from .tracestate import TraceState
43+
44+
45+
def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState):
46+
"""
47+
Tries to insert the candidate scenario into the trace (in full or partial) and
48+
updates tracestate accordingly.
49+
"""
50+
model = tracestate.model if tracestate.model else ModelSpace()
51+
model.new_scenario_scope()
52+
inserted, remainder, new_model, extra_data = process_scenario(candidate, model)
53+
if not inserted: # insertion failed
54+
tracestate.reject_scenario(candidate.src_id)
55+
logger.debug(extra_data['fail_msg'])
56+
elif not remainder: # the scenario processed in full
57+
new_model.end_scenario_scope()
58+
tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model)
59+
logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}")
60+
if tracestate.is_refinement_active():
61+
handle_refinement_exit(inserted, tracestate)
62+
else: # the scenario is split into two parts, ready for refinement
63+
logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n"
64+
f"Refinement needed at step: {remainder.steps[1]}")
65+
inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})"
66+
tracestate.push_partial_scenario(inserted.src_id, inserted, new_model, remainder)
67+
68+
69+
def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, ModelSpace, dict[str, Any]]:
70+
for step in scenario.steps:
71+
if 'error' in step.model_info:
72+
return None, None, model, dict(fail_masg=f"Error in scenario {scenario.name} "
73+
f"at step {step}: {step.model_info['error']}")
74+
for expr in _relevant_expressions(step):
75+
try:
76+
if model.process_expression(expr, step.args) is False:
77+
if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']:
78+
part1, part2 = split_for_refinement(scenario, step)
79+
return part1, part2, model, dict()
80+
else:
81+
return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, "
82+
f"{scenario.name}, due to step '{step}': [{expr}] is False")
83+
except Exception as err:
84+
return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, "
85+
f"{scenario.name}, due to step '{step}': [{expr}] {err}")
86+
return scenario.copy(), None, model, dict()
87+
88+
89+
def _relevant_expressions(step: Step) -> list[str]:
90+
if step.gherkin_kw is None and not step.model_info:
91+
return [] # model info is optional for action keywords
92+
expressions = []
93+
if 'IN' not in step.model_info or 'OUT' not in step.model_info:
94+
raise Exception(f"Model info incomplete for step: {step}")
95+
if step.gherkin_kw in ['given', 'when', None]:
96+
expressions += step.model_info['IN']
97+
if step.gherkin_kw in ['when', 'then', None]:
98+
expressions += step.model_info['OUT']
99+
return expressions
100+
101+
102+
def split_for_refinement(scenario: Scenario, step: Step) -> tuple[Scenario, Scenario]:
103+
front, back = scenario.split_at_step(scenario.steps.index(step))
104+
remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + [s.full_keyword for s in back.steps[1:]])
105+
remaining_steps = _escape_robot_vars(remaining_steps)
106+
edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario)
107+
edge_step.gherkin_kw = step.gherkin_kw
108+
edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[])
109+
edge_step.detached = True
110+
edge_step.args = StepArguments(step.args)
111+
front.steps.append(edge_step)
112+
back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario))
113+
back.steps[1] = back.steps[1].copy()
114+
back.steps[1].model_info['IN'] = []
115+
return (front, back)
116+
117+
118+
def _escape_robot_vars(text: str) -> str:
119+
for seq in ("${", "@{", "%{", "&{", "*{"):
120+
text = text.replace(seq, "\\" + seq)
121+
return text
122+
123+
124+
def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState):
125+
refinement_tail = tracestate.get_remainder(tracestate.active_refinements[-1])
126+
exit_conditions = refinement_tail.steps[1].model_info['OUT']
127+
exit_conditions_processed = False
128+
for expr in exit_conditions:
129+
try:
130+
if tracestate.model.process_expression(expr, refinement_tail.steps[1].args) is False:
131+
break
132+
except Exception:
133+
break
134+
else:
135+
exit_conditions_processed = True
136+
137+
if not exit_conditions_processed:
138+
rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement.
139+
logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, "
140+
f"did not meet refinement exit condition: {exit_conditions}")
141+
return
142+
143+
tail_inserted, remainder, new_model, extra_data = process_scenario(refinement_tail, tracestate.model)
144+
if not tail_inserted:
145+
logger.debug(extra_data['fail_msg'])
146+
# Confirm then rewind, to roll back complete scenario, including its refiements
147+
# Because that exit check passed, this is an error in the refined scenario itself
148+
tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, new_model)
149+
tail = rewind(tracestate)
150+
logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}")
151+
elif not remainder:
152+
new_model.end_scenario_scope()
153+
tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model)
154+
logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement")
155+
if tracestate.is_refinement_active():
156+
handle_refinement_exit(tail_inserted, tracestate)
157+
else:
158+
logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n"
159+
f"refinement needed at step: {remainder.steps[1]}")
160+
tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})"
161+
tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model, remainder)
162+
163+
164+
def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario:
165+
scenario = scenario.copy()
166+
# collect set of constraints
167+
subs = SubstitutionMap()
168+
try:
169+
for step in scenario.steps:
170+
for expr in step.model_info.get('MOD', []):
171+
modded_arg, constraint = _parse_modifier_expression(expr, step.args)
172+
if step.args[modded_arg].is_default:
173+
continue
174+
if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]:
175+
org_example = step.args[modded_arg].org_value
176+
if step.gherkin_kw == 'then':
177+
constraint = None # No new constraints are processed for then-steps
178+
if org_example not in subs.substitutions:
179+
# if a then-step signals the first use of an example value, it is considered a new definition
180+
subs.substitute(org_example, [org_example])
181+
continue
182+
if not constraint and org_example not in subs.substitutions:
183+
raise ValueError(f"No options to choose from at first assignment to {org_example}")
184+
if constraint and constraint != '.*':
185+
options = model.process_expression(constraint, step.args)
186+
if options == 'exec':
187+
raise ValueError(f"Invalid constraint for argument substitution: {expr}")
188+
if not options:
189+
raise ValueError(f"Constraint on modifer did not yield any options: {expr}")
190+
if not is_list_like(options):
191+
raise ValueError(f"Constraint on modifer did not yield a set of options: {expr}")
192+
else:
193+
options = None
194+
subs.substitute(org_example, options)
195+
elif step.args[modded_arg].kind == StepArgument.VAR_POS:
196+
if step.args[modded_arg].value:
197+
modded_varargs = model.process_expression(constraint, step.args)
198+
if not is_list_like(modded_varargs):
199+
raise ValueError(f"Modifying varargs must yield a list of arguments")
200+
# Varargs are not added to the substitution map, but are used directly as-is. A modifier can
201+
# change the number of arguments in the list, making it impossible to decide which values to
202+
# match and which to drop and/or duplicate.
203+
step.args[modded_arg].value = modded_varargs
204+
elif step.args[modded_arg].kind == StepArgument.FREE_NAMED:
205+
if step.args[modded_arg].value:
206+
modded_free_args = model.process_expression(constraint, step.args)
207+
if not isinstance(modded_free_args, dict):
208+
raise ValueError("Modifying free named arguments must yield a dict")
209+
# Similar to varargs, modified free named arguments are used directly as-is.
210+
step.args[modded_arg].value = modded_free_args
211+
else:
212+
raise AssertionError(f"Unknown argument kind for {modded_arg}")
213+
except Exception as err:
214+
logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n"
215+
f" In step {step}: {err}")
216+
return None
217+
218+
try:
219+
subs.solve()
220+
except ValueError as err:
221+
logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n"
222+
f" {err}: {subs}")
223+
return None
224+
225+
# Update scenario with generated values
226+
if subs.solution:
227+
logger.debug(f"Example variant generated with argument substitution: {subs}")
228+
scenario.data_choices = subs
229+
for step in scenario.steps:
230+
if 'MOD' in step.model_info:
231+
for expr in step.model_info['MOD']:
232+
modded_arg, _ = _parse_modifier_expression(expr, step.args)
233+
if step.args[modded_arg].is_default:
234+
continue
235+
org_example = step.args[modded_arg].org_value
236+
if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]:
237+
step.args[modded_arg].value = subs.solution[org_example]
238+
return scenario
239+
240+
241+
def _parse_modifier_expression(expression: str, args: tuple[str]) -> tuple[str, str]:
242+
if expression.startswith('${'):
243+
for var in args:
244+
if expression.casefold().startswith(var.arg.casefold()):
245+
assignment_expr = expression.replace(var.arg, '', 1).strip()
246+
if not assignment_expr.startswith('=') or assignment_expr.startswith('=='):
247+
break # not an assignment
248+
constraint = assignment_expr.replace('=', '', 1).strip()
249+
return var.arg, constraint
250+
raise ValueError(f"Invalid argument substitution: {expression}")
251+
252+
253+
def rewind(tracestate: TraceState, drought_recovery: bool = False) -> Scenario:
254+
if tracestate[-1].remainder and tracestate.highest_part(tracestate[-1].remainder.src_id) > 1:
255+
# When rewinding an 'in between' part, rewind both the part and the refinement
256+
tracestate.rewind()
257+
tail = tracestate.rewind()
258+
while drought_recovery and tracestate.coverage_drought:
259+
tail = tracestate.rewind()
260+
return tail

0 commit comments

Comments
 (0)