From f067277885f49cd3833aba81d00a2949c97dc941 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Fri, 26 Dec 2025 12:13:33 +0100 Subject: [PATCH 1/9] reuse scenario indexes for TraceState --- robotmbt/suiteprocessors.py | 2 +- robotmbt/tracestate.py | 60 +++--- utest/test_tracestate.py | 307 ++++++++++++++-------------- utest/test_tracestate_refinement.py | 94 ++++----- 4 files changed, 233 insertions(+), 230 deletions(-) diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 669b64bd..72520279 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -105,7 +105,7 @@ def process_test_suite(self, in_suite, *, seed='new'): return self.out_suite def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): - self.tracestate = TraceState(len(self.scenarios)) + self.tracestate = TraceState(range(len(self.scenarios))) self.active_model = ModelSpace() while not self.tracestate.coverage_reached(): i_candidate = self.tracestate.next_candidate(retry=allow_duplicate_scenarios) diff --git a/robotmbt/tracestate.py b/robotmbt/tracestate.py index 22526748..77b936ec 100644 --- a/robotmbt/tracestate.py +++ b/robotmbt/tracestate.py @@ -31,17 +31,18 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. class TraceState: - def __init__(self, n_scenarios): - self._c_pool = [False] * n_scenarios # coverage pool: True means scenario is in trace + def __init__(self, scenario_indexes): + if len(scenario_indexes) != len(set(scenario_indexes)): + raise ValueError("Scenarios must be uniquely identifiable") + self.c_pool = {index: 0 for index in scenario_indexes} self._tried = [[]] # Keeps track of the scenarios already tried at each step in the trace - self._trace = [] # Choice trace, when was which scenario inserted (e.g. ['1', '2.1', '3', '2.0']) self._snapshots = [] # Keeps details for elements in trace self._open_refinements = [] @property def model(self): """returns the model as it is at the end of the current trace""" - return self._snapshots[-1].model if self._trace else None + return self._snapshots[-1].model if self._snapshots else None @property def tried(self): @@ -49,23 +50,27 @@ def tried(self): return tuple(self._tried[-1]) def coverage_reached(self): - return all(self._c_pool) + return all(self.c_pool.values()) @property def coverage_drought(self): """Number of scenarios since last new coverage""" return self._snapshots[-1].coverage_drought if self._snapshots else 0 + @property + def id_trace(self): + return [snap.id for snap in self._snapshots] + def get_trace(self): return [snap.scenario for snap in self._snapshots] def next_candidate(self, retry=False): - for i in range(len(self._c_pool)): + for i in self.c_pool: if i not in self._tried[-1] and not self._is_refinement_active(i) and self.count(i) == 0: return i if not retry: return None - for i in range(len(self._c_pool)): + for i in self.c_pool: if i not in self._tried[-1] and not self._is_refinement_active(i): return i return None @@ -73,16 +78,16 @@ def next_candidate(self, retry=False): def count(self, index): """Count the number of times the index is present in the trace. unfinished partial scenarios are excluded.""" - return self._trace.count(str(index)) + self._trace.count(str(f"{index}.0")) + return self.c_pool[index] def highest_part(self, index): """Given the current trace and an index, returns the highest part number of an ongoing refinement for the related scenario. Returns 0 when there is no refinement active.""" - for i in range(1, len(self._trace)+1): - if self._trace[-i] == f'{index}': + for i in range(1, len(self.id_trace)+1): + if self.id_trace[-i] == f'{index}': return 0 - if self._trace[-i].startswith(f'{index}.'): - return int(self._trace[-i].split('.')[1]) + if self.id_trace[-i].startswith(f'{index}.'): + return int(self.id_trace[-i].split('.')[1]) return 0 def _is_refinement_active(self, index): @@ -91,7 +96,7 @@ def _is_refinement_active(self, index): def find_scenarios_with_active_refinement(self): scenarios = [] for i in self._open_refinements: - index = -self._trace[::-1].index(f'{i}.1')-1 + index = -self.id_trace[::-1].index(f'{i}.1')-1 scenarios.append(self._snapshots[index].scenario) return scenarios @@ -100,11 +105,8 @@ def reject_scenario(self, i_scenario): self._tried[-1].append(i_scenario) def confirm_full_scenario(self, index, scenario, model): - if not self._c_pool[index]: - self._c_pool[index] = True - c_drought = 0 - else: - c_drought = self.coverage_drought+1 + c_drought = 0 if self.c_pool[index] == 0 else self.coverage_drought+1 + self.c_pool[index] += 1 if self._is_refinement_active(index): id = f"{index}.0" self._open_refinements.pop() @@ -112,7 +114,6 @@ def confirm_full_scenario(self, index, scenario, model): id = str(index) self._tried[-1].append(index) self._tried.append([]) - self._trace.append(id) self._snapshots.append(TraceSnapShot(id, scenario, model, c_drought)) def push_partial_scenario(self, index, scenario, model): @@ -123,29 +124,28 @@ def push_partial_scenario(self, index, scenario, model): self._tried[-1].append(index) self._tried.append([]) self._open_refinements.append(index) - self._trace.append(id) self._snapshots.append(TraceSnapShot(id, scenario, model, self.coverage_drought)) def can_rewind(self): - return len(self._trace) > 0 + return len(self._snapshots) > 0 def rewind(self): - id = self._trace.pop() + id = self._snapshots[-1].id index = int(id.split('.')[0]) + self._snapshots.pop() if id.endswith('.0'): - self._snapshots.pop() + self.c_pool[index] -= 1 self._open_refinements.append(index) - while self._trace[-1] != f"{index}.1": + while self._snapshots[-1].id != f"{index}.1": self.rewind() return self.rewind() - self._snapshots.pop() - if '.' not in id or id.endswith('.1'): - if self.count(index) == 0: - self._c_pool[index] = False + if '.' not in id: + self._tried.pop() + self.c_pool[index] -= 1 + if id.endswith('.1'): self._tried.pop() - if id.endswith('.1'): - self._open_refinements.pop() + self._open_refinements.pop() return self._snapshots[-1] if self._snapshots else None def __iter__(self): diff --git a/utest/test_tracestate.py b/utest/test_tracestate.py index 5bfb63a3..66cc2dab 100644 --- a/utest/test_tracestate.py +++ b/utest/test_tracestate.py @@ -36,69 +36,72 @@ class TestTraceState(unittest.TestCase): def test_an_empty_tracestate_doesnt_do_so_much(self): - ts = TraceState(0) + ts = TraceState([]) self.assertIs(ts.next_candidate(), None) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), []) self.assertIs(ts.can_rewind(), False) def test_completing_single_size_trace(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), ['one']) def test_confirming_excludes_scenario_from_candidacy(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.next_candidate(), None) def test_trying_excludes_scenario_from_candidacy(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.reject_scenario(0) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.reject_scenario(1) self.assertIs(ts.next_candidate(), None) def test_scenario_still_excluded_from_candidacy_after_rewind(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.confirm_full_scenario(1, 'one', {}) ts.rewind() self.assertIs(ts.next_candidate(), None) def test_candidates_come_in_order_when_accepted(self): - ts = TraceState(3) + ts = TraceState([10, 20, 30]) candidates = [] - for scenario in range(3): + for _ in range(3): candidates.append(ts.next_candidate()) - ts.confirm_full_scenario(candidates[-1], scenario, {}) + ts.confirm_full_scenario(candidates[-1], 'scenario', {}) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [0, 1, 2, None]) + self.assertEqual(candidates, [10, 20, 30, None]) + + def test_scenarios_must_be_uniquely_identifiable(self): + self.assertRaises(ValueError, TraceState, [1, 2, 3, 2]) def test_candidates_come_in_order_when_rejected(self): - ts = TraceState(3) + ts = TraceState([10, 20, 30]) candidates = [] for _ in range(3): candidates.append(ts.next_candidate()) ts.reject_scenario(candidates[-1]) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [0, 1, 2, None]) + self.assertEqual(candidates, [10, 20, 30, None]) def test_rejected_scenarios_are_candidates_for_new_positions(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) candidates = [] - ts.reject_scenario(0) - for scenario in range(3): + ts.reject_scenario(1) + for _ in range(3): candidates.append(ts.next_candidate()) - ts.confirm_full_scenario(candidates[-1], scenario, {}) + ts.confirm_full_scenario(candidates[-1], 'scenario', {}) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [1, 0, 2, None]) + self.assertEqual(candidates, [2, 1, 3, None]) def test_previously_confirmed_scenarios_can_be_retried_if_no_new_candidates_exist(self): - ts = TraceState(3) + ts = TraceState(range(3)) first_candidate = ts.next_candidate(retry=True) ts.confirm_full_scenario(first_candidate, 'one', {}) ts.reject_scenario(ts.next_candidate(retry=True)) @@ -113,18 +116,18 @@ def test_previously_confirmed_scenarios_can_be_retried_if_no_new_candidates_exis self.assertEqual(ts.get_trace(), ['one', 'one', 'two', 'three']) def test_retry_can_continue_once_coverage_is_reached(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'one', {}) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'two', {}) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'three', {}) self.assertTrue(ts.coverage_reached()) - self.assertEqual(ts.next_candidate(retry=True), 0) - ts.reject_scenario(0) self.assertEqual(ts.next_candidate(retry=True), 1) + ts.reject_scenario(1) + self.assertEqual(ts.next_candidate(retry=True), 2) self.assertEqual(ts.next_candidate(retry=False), None) def test_count_scenario_repetitions(self): - ts = TraceState(2) + ts = TraceState([1, 2]) first = ts.next_candidate() self.assertEqual(ts.count(first), 0) ts.confirm_full_scenario(first, 'one', {}) @@ -133,8 +136,8 @@ def test_count_scenario_repetitions(self): self.assertEqual(ts.count(first), 2) def test_rewind_single_available_scenario(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.coverage_reached(), True) self.assertIs(ts.can_rewind(), True) ts.rewind() @@ -144,38 +147,38 @@ def test_rewind_single_available_scenario(self): self.assertEqual(ts.get_trace(), []) def test_rewind_returns_none_after_rewinding_last_step(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.rewind(), None) - def test_traces_can_have_multiple_sceanrios(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'foo', dict(a=1)) + def test_traces_can_have_multiple_scenarios(self): + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'foo', dict(a=1)) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(1, 'bar', dict(b=2)) + ts.confirm_full_scenario(2, 'bar', dict(b=2)) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), ['foo', 'bar']) def test_rewind_returns_snapshot_of_the_step_before(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'foo', dict(a=1)) - ts.confirm_full_scenario(1, 'bar', dict(b=2)) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'foo', dict(a=1)) + ts.confirm_full_scenario(2, 'bar', dict(b=2)) tail = ts.rewind() - self.assertEqual(tail.id, '0') + self.assertEqual(tail.id, '1') self.assertEqual(tail.scenario, 'foo') self.assertEqual(tail.model, dict(a=1)) def test_completing_size_three_trace(self): - ts = TraceState(3) - ts.confirm_full_scenario(ts.next_candidate(), 1, {}) - ts.confirm_full_scenario(ts.next_candidate(), 2, {}) + ts = TraceState(range(3)) + ts.confirm_full_scenario(ts.next_candidate(), 'one', {}) + ts.confirm_full_scenario(ts.next_candidate(), 'two', {}) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(ts.next_candidate(), 3, {}) + ts.confirm_full_scenario(ts.next_candidate(), 'three', {}) self.assertIs(ts.coverage_reached(), True) - self.assertEqual(ts.get_trace(), [1, 2, 3]) + self.assertEqual(ts.get_trace(), ['one', 'two', 'three']) def test_completing_size_three_trace_after_reject(self): - ts = TraceState(3) + ts = TraceState(range(3)) first = ts.next_candidate() ts.confirm_full_scenario(first, first, {}) rejected = ts.next_candidate() @@ -190,7 +193,7 @@ def test_completing_size_three_trace_after_reject(self): self.assertEqual(ts.get_trace(), [first, third, second]) def test_completing_size_three_trace_after_rewind(self): - ts = TraceState(3) + ts = TraceState(range(3)) first = ts.next_candidate() ts.confirm_full_scenario(first, first, {}) reject2 = ts.next_candidate() @@ -211,16 +214,16 @@ def test_completing_size_three_trace_after_rewind(self): self.assertEqual(ts.get_trace(), [retry_first, retry_second, retry_third]) def test_highest_part_when_index_not_present(self): - ts = TraceState(1) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_for_non_partial_sceanrio(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) + self.assertEqual(ts.highest_part(1), 0) def test_model_property_takes_model_from_tail(self): - ts = TraceState(2) + ts = TraceState(range(2)) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) self.assertEqual(ts.model, dict(b=2)) @@ -228,35 +231,35 @@ def test_model_property_takes_model_from_tail(self): self.assertEqual(ts.model, dict(a=1)) def test_no_model_from_empty_trace(self): - ts = TraceState(1) + ts = TraceState([1]) self.assertIs(ts.model, None) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertIsNotNone(ts.model) ts.rewind() self.assertIs(ts.model, None) def test_tried_property_starts_empty(self): - ts = TraceState(1) + ts = TraceState([1]) self.assertEqual(ts.tried, ()) def test_rejected_scenarios_are_tried(self): - ts = TraceState(1) - ts.reject_scenario(0) - self.assertEqual(ts.tried, (0,)) + ts = TraceState([1]) + ts.reject_scenario(1) + self.assertEqual(ts.tried, (1,)) def test_confirmed_scenario_is_tried_and_triggers_next_step(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.tried, ()) ts.rewind() - self.assertEqual(ts.tried, (0,)) + self.assertEqual(ts.tried, (1,)) def test_can_iterate_over_tracestate_snapshots(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) ts.confirm_full_scenario(ts.next_candidate(), 'three', dict(c=3)) - for act, exp in zip(ts, ['0', '1', '2']): + for act, exp in zip(ts, ['1', '2', '3']): self.assertEqual(act.id, exp) for act, exp in zip(ts, ['one', 'two', 'three']): self.assertEqual(act.scenario, exp) @@ -264,18 +267,18 @@ def test_can_iterate_over_tracestate_snapshots(self): self.assertEqual(act.model, exp) def test_can_index_tracestate_snapshots(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) ts.confirm_full_scenario(ts.next_candidate(), 'three', dict(c=3)) - self.assertEqual(ts[0].id, '0') + self.assertEqual(ts[0].id, '1') self.assertEqual(ts[1].scenario, 'two') self.assertEqual(ts[2].model, dict(c=3)) self.assertEqual(ts[-1].scenario, 'three') - self.assertEqual([s.id for s in ts[1:]], ['1', '2']) + self.assertEqual([s.id for s in ts[1:]], ['2', '3']) def test_adding_coverage_prevents_drought(self): - ts = TraceState(3) + ts = TraceState(range(3)) ts.confirm_full_scenario(ts.next_candidate(), 'one', {}) self.assertEqual(ts.coverage_drought, 0) ts.confirm_full_scenario(ts.next_candidate(), 'two', {}) @@ -284,30 +287,30 @@ def test_adding_coverage_prevents_drought(self): self.assertEqual(ts.coverage_drought, 0) def test_repeated_scenarios_increases_drought(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 2) def test_drought_is_reset_with_new_coverage(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two', {}) + ts.confirm_full_scenario(2, 'two', {}) self.assertEqual(ts.coverage_drought, 0) def test_rewind_includes_drought_update(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two', {}) + ts.confirm_full_scenario(2, 'two', {}) self.assertEqual(ts.coverage_drought, 0) ts.rewind() self.assertEqual(ts.coverage_drought, 1) @@ -317,40 +320,40 @@ def test_rewind_includes_drought_update(self): class TestPartialScenarios(unittest.TestCase): def test_push_partial_does_not_complete_coverage(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.get_trace(), ['part1']) self.assertIs(ts.coverage_reached(), False) def test_confirm_full_after_push_partial_completes_coverage(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertIs(ts.coverage_reached(), False) - ts.push_partial_scenario(0, 'part2', {}) + ts.push_partial_scenario(1, 'part2', {}) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(0, 'remainder', {}) + ts.confirm_full_scenario(1, 'remainder', {}) self.assertEqual(ts.get_trace(), ['part1', 'part2', 'remainder']) self.assertIs(ts.coverage_reached(), True) def test_scenario_unavailble_once_pushed_partial(self): - ts = TraceState(1) + ts = TraceState([1]) candidate = ts.next_candidate() ts.push_partial_scenario(candidate, 'part1', {}) self.assertIs(ts.next_candidate(), None) def test_rewind_of_single_part(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.get_trace(), ['part1']) self.assertIs(ts.can_rewind(), True) ts.rewind() self.assertEqual(ts.get_trace(), []) def test_rewind_all_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertIs(ts.coverage_reached(), False) - ts.push_partial_scenario(0, 'part2', {}) + ts.push_partial_scenario(1, 'part2', {}) self.assertIs(ts.coverage_reached(), False) self.assertEqual(ts.get_trace(), ['part1', 'part2']) self.assertIs(ts.next_candidate(), None) @@ -363,84 +366,84 @@ def test_rewind_all_parts(self): self.assertIs(ts.can_rewind(), False) def test_partial_scenario_still_excluded_from_candidacy_after_rewind(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.push_partial_scenario(1, 'part1', {}) ts.rewind() self.assertIs(ts.next_candidate(), None) def test_rewind_to_partial_scenario(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - ts.push_partial_scenario(0, 'part2', dict(b=2)) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + ts.push_partial_scenario(1, 'part2', dict(b=2)) snapshot = ts.rewind() - self.assertEqual(snapshot.id, '0.1') + self.assertEqual(snapshot.id, '1.1') self.assertEqual(snapshot.scenario, 'part1') self.assertEqual(snapshot.model, dict(a=1)) def test_rewind_last_part(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', dict(a=1)) - ts.push_partial_scenario(1, 'part1', dict(b=2)) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', dict(a=1)) + ts.push_partial_scenario(2, 'part1', dict(b=2)) snapshot = ts.rewind() self.assertEqual(ts.get_trace(), ['one']) - self.assertEqual(snapshot.id, '0') + self.assertEqual(snapshot.id, '1') self.assertEqual(snapshot.scenario, 'one') self.assertEqual(snapshot.model, dict(a=1)) def test_rewind_all_parts_of_completed_scenario_at_once(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - ts.push_partial_scenario(0, 'part2', dict(b=2)) - ts.confirm_full_scenario(0, 'remainder', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + ts.push_partial_scenario(1, 'part2', dict(b=2)) + ts.confirm_full_scenario(1, 'remainder', {}) tail = ts.rewind() self.assertEqual(ts.get_trace(), []) self.assertIs(ts.next_candidate(), None) self.assertIs(tail, None) def test_highest_part_after_first_part(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - self.assertEqual(ts[-1].id, '0.1') - self.assertEqual(ts.highest_part(0), 1) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + self.assertEqual(ts[-1].id, '1.1') + self.assertEqual(ts.highest_part(1), 1) def test_highest_part_after_multiple_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - self.assertEqual(ts[-1].id, '0.2') - self.assertEqual(ts.highest_part(0), 2) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + self.assertEqual(ts[-1].id, '1.2') + self.assertEqual(ts.highest_part(1), 2) def test_highest_part_after_completing_multiple_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - ts.confirm_full_scenario(0, 'remainder', {}) - self.assertEqual(ts[-1].id, '0.0') - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.confirm_full_scenario(1, 'remainder', {}) + self.assertEqual(ts[-1].id, '1.0') + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_after_partial_rewind(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - self.assertEqual(ts.highest_part(0), 2) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + self.assertEqual(ts.highest_part(1), 2) ts.rewind() - self.assertEqual(ts.highest_part(0), 1) + self.assertEqual(ts.highest_part(1), 1) ts.rewind() - self.assertEqual(ts.highest_part(0), 0) + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_is_0_when_no_refinement_is_ongoing(self): - ts = TraceState(1) - self.assertEqual(ts.highest_part(0), 0) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - ts.confirm_full_scenario(0, 'remainder', {}) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + self.assertEqual(ts.highest_part(1), 0) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.confirm_full_scenario(1, 'remainder', {}) + self.assertEqual(ts.highest_part(1), 0) ts.rewind() - self.assertEqual(ts.highest_part(0), 0) + self.assertEqual(ts.highest_part(1), 0) def test_count_scenario_repetitions_with_partials(self): - ts = TraceState(2) + ts = TraceState(range(2)) first = ts.next_candidate() self.assertEqual(ts.count(first), 0) ts.confirm_full_scenario(first, 'full', {}) @@ -458,36 +461,36 @@ def test_count_scenario_repetitions_with_partials(self): self.assertEqual(ts.count(second), 1) def test_partial_scenario_is_tried_without_finishing(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.tried, ()) ts.rewind() - self.assertEqual(ts.tried, (0,)) + self.assertEqual(ts.tried, (1,)) def test_get_last_snapshot_by_index(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - self.assertEqual(ts[-1].id, '0.1') + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + self.assertEqual(ts[-1].id, '1.1') self.assertEqual(ts[-1].scenario, 'part1') self.assertEqual(ts[-1].model, dict(a=1)) self.assertEqual(ts[-1].coverage_drought, 0) - ts.push_partial_scenario(0, 'part2', dict(b=2)) - ts.confirm_full_scenario(0, 'remainder', dict(c=3)) - self.assertEqual(ts[-1].id, '0.0') + ts.push_partial_scenario(1, 'part2', dict(b=2)) + ts.confirm_full_scenario(1, 'remainder', dict(c=3)) + self.assertEqual(ts[-1].id, '1.0') self.assertEqual(ts[-1].scenario, 'remainder') self.assertEqual(ts[-1].model, dict(c=3)) self.assertEqual(ts[-1].coverage_drought, 0) def test_only_completed_scenarios_affect_drought(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one full', {}) - ts.push_partial_scenario(0, 'one part1', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one full', {}) + ts.push_partial_scenario(1, 'one part1', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one remainder', {}) + ts.confirm_full_scenario(1, 'one remainder', {}) self.assertEqual(ts.coverage_drought, 1) - ts.push_partial_scenario(1, 'two part1', {}) + ts.push_partial_scenario(2, 'two part1', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two remainder', {}) + ts.confirm_full_scenario(2, 'two remainder', {}) self.assertEqual(ts.coverage_drought, 0) diff --git a/utest/test_tracestate_refinement.py b/utest/test_tracestate_refinement.py index 2cbdca61..57e2760a 100644 --- a/utest/test_tracestate_refinement.py +++ b/utest/test_tracestate_refinement.py @@ -36,7 +36,7 @@ class TestTraceStateRefinement(unittest.TestCase): def test_single_step_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -49,7 +49,7 @@ def test_single_step_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B1', 'T1.0']) def test_rewind_step_with_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -62,7 +62,7 @@ def test_rewind_step_with_refinement(self): self.assertIs(ts.coverage_reached(), False) def test_rewind_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -74,7 +74,7 @@ def test_rewind_refinement(self): self.assertIs(ts.coverage_reached(), False) def test_refinement_at_two_steps(self): - ts = TraceState(3) + ts = TraceState(range(3)) outer = ts.next_candidate() ts.push_partial_scenario(outer, 'T1.1', {}) ts.confirm_full_scenario(ts.next_candidate(), 'B1', {}) @@ -88,7 +88,7 @@ def test_refinement_at_two_steps(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B1', 'T1.2', 'B2', 'T1.0']) def test_rewind_to_swap_refinements(self): - ts = TraceState(3) + ts = TraceState(range(3)) outer = ts.next_candidate() ts.push_partial_scenario(outer, 'T1.1', {}) inner1 = ts.next_candidate() @@ -112,7 +112,7 @@ def test_rewind_to_swap_refinements(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B2', 'T1.2', 'B1', 'T1.0']) def test_rewind_partial_scenario_to_before_outer(self): - ts = TraceState(4) + ts = TraceState(range(4)) head = ts.next_candidate() ts.confirm_full_scenario(head, 'HEAD', {}) outer = ts.next_candidate() @@ -130,7 +130,7 @@ def test_rewind_partial_scenario_to_before_outer(self): self.assertNotIn(ts.next_candidate(), [head, outer]) def test_rewind_scenario_with_double_refinement_as_one(self): - ts = TraceState(4) + ts = TraceState(range(4)) head = ts.next_candidate() ts.confirm_full_scenario(head, 'HEAD', {}) outer = ts.next_candidate() @@ -148,7 +148,7 @@ def test_rewind_scenario_with_double_refinement_as_one(self): self.assertNotEqual(ts.next_candidate(), [head, outer]) def test_nested_refinement(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level = ts.next_candidate() @@ -163,11 +163,11 @@ def test_nested_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'M1.1', 'B1', 'M1.0', 'T1.0']) def test_rewind_to_swap_nested_refinement(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) lower_level = ts.next_candidate() - ts.push_partial_scenario(lower_level, 'B1', {}) + ts.push_partial_scenario(lower_level, 'B1.1', {}) middle_level = ts.next_candidate() ts.reject_scenario(middle_level) self.assertIs(ts.next_candidate(), None) @@ -183,7 +183,7 @@ def test_rewind_to_swap_nested_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'M1.1', 'B1', 'M1.0', 'T1.0']) def test_rewind_nested_refinement_as_one(self): - ts = TraceState(4) + ts = TraceState(range(4)) ts.confirm_full_scenario(ts.next_candidate(), 'HEAD', {}) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) @@ -199,7 +199,7 @@ def test_rewind_nested_refinement_as_one(self): self.assertEqual(ts.get_trace(), ['HEAD', 'T1.1']) def test_rewind_scenario_with_nested_refinement_as_one(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level = ts.next_candidate() @@ -215,7 +215,7 @@ def test_rewind_scenario_with_nested_refinement_as_one(self): self.assertEqual(ts.tried, (top_level,)) def test_highest_parts_from_refined_scenario(self): - ts = TraceState(4) + ts = TraceState(range(4)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level_1 = ts.next_candidate() @@ -245,7 +245,7 @@ def test_highest_parts_from_refined_scenario(self): 'T1.2', 'M2.1', 'M2.2', 'M2.3', 'M2.0', 'T1.0']) def test_refinement_can_resolve_drought(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.confirm_full_scenario(candidate1, 'T1', {}) ts.confirm_full_scenario(candidate1, 'T1', {}) @@ -261,7 +261,7 @@ def test_refinement_can_resolve_drought(self): self.assertEqual(ts.coverage_drought, 2) def test_scenario_cannot_refine_itself(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -270,7 +270,7 @@ def test_scenario_cannot_refine_itself(self): self.assertIsNone(ts.next_candidate()) def test_scenario_cannot_refine_itself_with_repetition(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate(retry=True) ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate(retry=True) @@ -279,63 +279,63 @@ def test_scenario_cannot_refine_itself_with_repetition(self): self.assertIsNone(ts.next_candidate(retry=True)) def test_initially_no_scenario_is_in_refinement(self): - ts = TraceState(1) + ts = TraceState([1]) self.assertEqual(ts.find_scenarios_with_active_refinement(), []) def test_full_scenario_is_not_reported_as_refinement(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'S1', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'S1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), []) def test_push_partial_opens_refinement(self): - ts = TraceState(4) + ts = TraceState([1, 2]) ts.push_partial_scenario(0, 'S1.1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['S1.1']) def test_nested_refinements_are_all_reported_as_in_refinement(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) def test_closing_refinement_removes_it_from_list(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) - ts.confirm_full_scenario(2, 'B1.0', {}) + ts.confirm_full_scenario(3, 'B1.0', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) def test_multi_step_refinement_is_reported_only_once(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.confirm_full_scenario(2, 'B1', {}) - ts.push_partial_scenario(1, 'M1.2', {}) - ts.confirm_full_scenario(3, 'B2', {}) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.confirm_full_scenario(3, 'B1', {}) + ts.push_partial_scenario(2, 'M1.2', {}) + ts.confirm_full_scenario(4, 'B2', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) def test_rewind_open_refinement_removes_it_from_list(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) ts.rewind() self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) def test_rewind_finished_scenario_with_refinement_removes_enclosed_refinements(self): - ts = TraceState(5) - ts.confirm_full_scenario(0, 'T1', {}) - ts.push_partial_scenario(1, 'T2.1', {}) - ts.push_partial_scenario(2, 'M1.1', {}) - ts.push_partial_scenario(3, 'B1.1', {}) - ts.confirm_full_scenario(4, 'S1', {}) + ts = TraceState([1, 2, 3, 4, 5]) + ts.confirm_full_scenario(1, 'T1', {}) + ts.push_partial_scenario(2, 'T2.1', {}) + ts.push_partial_scenario(3, 'M1.1', {}) + ts.push_partial_scenario(4, 'B1.1', {}) + ts.confirm_full_scenario(5, 'S1', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1', 'M1.1', 'B1.1']) - ts.confirm_full_scenario(3, 'B1.0', {}) - ts.confirm_full_scenario(2, 'M1.0', {}) + ts.confirm_full_scenario(4, 'B1.0', {}) + ts.confirm_full_scenario(3, 'M1.0', {}) self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) ts.rewind() # Middle including its Bottom refinement self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) From 993407f9e22cbd89cd661c938dc029a556452ddc Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Fri, 26 Dec 2025 15:58:50 +0100 Subject: [PATCH 2/9] keep original scenario list unshuffled --- robotmbt/suiteprocessors.py | 19 ++++++++----------- robotmbt/tracestate.py | 6 +++--- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 72520279..e2d577a7 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -89,7 +89,8 @@ def process_test_suite(self, in_suite, *, seed='new'): "\n\t".join([f"{s.src_id}: {s.name}" for s in self.scenarios])) self._init_randomiser(seed) - random.shuffle(self.scenarios) + self.shuffled = [s.src_id for s in self.scenarios] + random.shuffle(self.shuffled) # Keep a single shuffle for all TraceStates (non-essential) # a short trace without the need for repeating scenarios is preferred self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) @@ -105,7 +106,7 @@ def process_test_suite(self, in_suite, *, seed='new'): return self.out_suite def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): - self.tracestate = TraceState(range(len(self.scenarios))) + self.tracestate = TraceState(self.shuffled) self.active_model = ModelSpace() while not self.tracestate.coverage_reached(): i_candidate = self.tracestate.next_candidate(retry=allow_duplicate_scenarios) @@ -142,7 +143,7 @@ def __last_candidate_changed_nothing(self): def _scenario_with_repeat_counter(self, index): """Fetches the scenario by index and, if this scenario is already used in the trace, adds a repetition counter to its name.""" - candidate = self.scenarios[index] + candidate = next(s for s in self.scenarios if s.src_id == index) rep_count = self.tracestate.count(index) if rep_count: candidate = candidate.copy() @@ -214,7 +215,8 @@ def _try_to_fit_in_scenario(self, index, candidate, retry_flag): else: logger.debug(f"Scenario did not meet refinement conditions {exit_conditions}") logger.debug(f"last state:\n{self.active_model.get_status_text()}") - logger.debug(f"Reconsidering {self.scenarios[i_refine].name}, scenario excluded") + scenario_name = next(s.name for s in self.scenarios if s.src_id == i_refine) + logger.debug(f"Reconsidering {scenario_name}, scenario excluded") self._rewind() self._report_tracestate_to_user() i_refine = self.tracestate.next_candidate(retry=retry_flag) @@ -413,13 +415,8 @@ def _parse_modifier_expression(expression, args): raise ValueError(f"Invalid argument substitution: {expression}") def _report_tracestate_to_user(self): - user_trace = "[" - for snapshot in self.tracestate: - part = f".{snapshot.id.split('.')[1]}" if '.' in snapshot.id else "" - user_trace += f"{snapshot.scenario.src_id}{part}, " - user_trace = user_trace[:-2] + "]" if ',' in user_trace else "[]" - reject_trace = [self.scenarios[i].src_id for i in self.tracestate.tried] - logger.debug(f"Trace: {user_trace} Reject: {reject_trace}") + user_trace = f"[{', '.join(self.tracestate.id_trace)}]" + logger.debug(f"Trace: {user_trace} Reject: {list(self.tracestate.tried)}") def _report_tracestate_wrapup(self): logger.info("Trace composed:") diff --git a/robotmbt/tracestate.py b/robotmbt/tracestate.py index 77b936ec..b00c5680 100644 --- a/robotmbt/tracestate.py +++ b/robotmbt/tracestate.py @@ -31,10 +31,10 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. class TraceState: - def __init__(self, scenario_indexes): - if len(scenario_indexes) != len(set(scenario_indexes)): - raise ValueError("Scenarios must be uniquely identifiable") + def __init__(self, scenario_indexes: list[int]): self.c_pool = {index: 0 for index in scenario_indexes} + if len(self.c_pool) != len(scenario_indexes): + raise ValueError("Scenarios must be uniquely identifiable") self._tried = [[]] # Keeps track of the scenarios already tried at each step in the trace self._snapshots = [] # Keeps details for elements in trace self._open_refinements = [] From bc96a306b98c453f97285e479698e264c94315ce Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Sun, 28 Dec 2025 11:51:17 +0100 Subject: [PATCH 3/9] refactor processing to reach full coverage Open issue: rewind under refinement --- robotmbt/suiteprocessors.py | 345 +++++++++++++++++------------------- 1 file changed, 166 insertions(+), 179 deletions(-) diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index e2d577a7..7ad9f637 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -93,58 +93,75 @@ def process_test_suite(self, in_suite, *, seed='new'): random.shuffle(self.shuffled) # Keep a single shuffle for all TraceStates (non-essential) # a short trace without the need for repeating scenarios is preferred - self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) - if not self.tracestate.coverage_reached(): + if not tracestate.coverage_reached(): logger.debug("Direct trace not available. Allowing repetition of scenarios") - self._try_to_reach_full_coverage(allow_duplicate_scenarios=True) - if not self.tracestate.coverage_reached(): + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=True) + if not tracestate.coverage_reached(): raise Exception("Unable to compose a consistent suite") - self.out_suite.scenarios = self.tracestate.get_trace() - self._report_tracestate_wrapup() + self.out_suite.scenarios = tracestate.get_trace() + self._report_tracestate_wrapup(tracestate) return self.out_suite def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): - self.tracestate = TraceState(self.shuffled) - self.active_model = ModelSpace() - while not self.tracestate.coverage_reached(): - i_candidate = self.tracestate.next_candidate(retry=allow_duplicate_scenarios) - if i_candidate is None: - if not self.tracestate.can_rewind(): + tracestate = TraceState(self.shuffled) + refinement_stack = [] + while not tracestate.coverage_reached(): + candidate_id = tracestate.next_candidate(retry=allow_duplicate_scenarios) + if candidate_id is None: # No more candidates remaining for this level + if not tracestate.can_rewind(): break - tail = self._rewind() - logger.debug("Having to roll back up to " - f"{tail.scenario.name if tail else 'the beginning'}") - self._report_tracestate_to_user() + tail = self._rewind(tracestate) + logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") + self._report_tracestate_to_user(tracestate) else: - self.active_model.new_scenario_scope() - inserted = self._try_to_fit_in_scenario(i_candidate, self._scenario_with_repeat_counter(i_candidate), - retry_flag=allow_duplicate_scenarios) - if inserted: + candidate = self._select_scenario_variant(candidate_id, tracestate) + if not candidate: # No valid variant available in the current state + tracestate.reject_scenario(candidate_id) + continue + previous_len = len(tracestate) + self._try_to_fit_in_scenario(candidate, tracestate, refinement_stack) + if len(tracestate) > previous_len: self.DROUGHT_LIMIT = 50 - if self.__last_candidate_changed_nothing(): + if self.__last_candidate_changed_nothing(tracestate): logger.debug("Repeated scenario did not change the model's state. Stop trying.") - self._rewind() - elif self.tracestate.coverage_drought > self.DROUGHT_LIMIT: + self._rewind(tracestate) + elif tracestate.coverage_drought > self.DROUGHT_LIMIT: logger.debug(f"Went too long without new coverage (>{self.DROUGHT_LIMIT}x). " "Roll back to last coverage increase and try something else.") - self._rewind(drought_recovery=True) - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") + self._rewind(tracestate, drought_recovery=True) + self._report_tracestate_to_user(tracestate) + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") + return tracestate - def __last_candidate_changed_nothing(self): - if len(self.tracestate) < 2: + @staticmethod + def __last_candidate_changed_nothing(tracestate): + if len(tracestate) < 2: return False - if self.tracestate[-1].id != self.tracestate[-2].id: + if tracestate[-1].id != tracestate[-2].id: return False - return self.tracestate[-1].model == self.tracestate[-2].model + return tracestate[-1].model == tracestate[-2].model + + def _select_scenario_variant(self, candidate_id, tracestate): + candidate = self._scenario_with_repeat_counter(candidate_id, tracestate) + scenarios_in_refinement = tracestate.find_scenarios_with_active_refinement() + if candidate_id in [s.src_id for s in scenarios_in_refinement]: + # reuse previous solution for all parts in split-up scenario + candidate = candidate.copy() + candidate.data_choices = scenarios_in_refinement[candidate_id].data_choices.copy() + else: + candidate = self._generate_scenario_variant(candidate, tracestate.model or ModelSpace()) + return candidate - def _scenario_with_repeat_counter(self, index): - """Fetches the scenario by index and, if this scenario is already used in the trace, - adds a repetition counter to its name.""" + def _scenario_with_repeat_counter(self, index, tracestate): + """ + Fetches the scenario by index and, if this scenario is already + used in the trace, adds a repetition counter to its name. + """ candidate = next(s for s in self.scenarios if s.src_id == index) - rep_count = self.tracestate.count(index) + rep_count = tracestate.count(index) if rep_count: candidate = candidate.copy() candidate.name = f"{candidate.name} (rep {rep_count+1})" @@ -159,129 +176,102 @@ def _fail_on_step_errors(suite): for s in error_list]) raise Exception(err_msg) - def _try_to_fit_in_scenario(self, index, candidate, retry_flag): - candidate = self._generate_scenario_variant(candidate, self.active_model) - if not candidate: - self.active_model.end_scenario_scope() - self.tracestate.reject_scenario(index) - self._report_tracestate_to_user() - return False - - confirmed_candidate, new_model = self._process_scenario(candidate, self.active_model) - if confirmed_candidate: - self.active_model = new_model - self.active_model.end_scenario_scope() - self.tracestate.confirm_full_scenario(index, confirmed_candidate, self.active_model) - logger.debug(f"Inserted scenario {confirmed_candidate.src_id}, {confirmed_candidate.name}") - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - return True - - part1, part2 = self._split_candidate_if_refinement_needed(candidate, self.active_model) - if part2: - exit_conditions = part2.steps[1].model_info['OUT'] - part1.name = f"{part1.name} (part {self.tracestate.highest_part(index)+1})" - part1, new_model = self._process_scenario(part1, self.active_model) - self.tracestate.push_partial_scenario(index, part1, new_model) - self.active_model = new_model - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - - i_refine = self.tracestate.next_candidate(retry=retry_flag) - if i_refine is None: - logger.debug("Refinement needed, but there are no scenarios left") - self._rewind() - self._report_tracestate_to_user() - return False - while i_refine is not None: - self.active_model.new_scenario_scope() - m_inserted = self._try_to_fit_in_scenario( - i_refine, self._scenario_with_repeat_counter(i_refine), retry_flag) - if m_inserted: - insert_valid_here = True - try: - # Check exit condition before finalizing refinement and inserting the tail part - model_scratchpad = self.active_model.copy() - for expr in exit_conditions: - if model_scratchpad.process_expression(expr, part2.steps[1].args) is False: - insert_valid_here = False - break - except Exception: - insert_valid_here = False - if insert_valid_here: - m_finished = self._try_to_fit_in_scenario(index, part2, retry_flag) - if m_finished: - return True - else: - logger.debug(f"Scenario did not meet refinement conditions {exit_conditions}") - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - scenario_name = next(s.name for s in self.scenarios if s.src_id == i_refine) - logger.debug(f"Reconsidering {scenario_name}, scenario excluded") - self._rewind() - self._report_tracestate_to_user() - i_refine = self.tracestate.next_candidate(retry=retry_flag) - self._rewind() - self._report_tracestate_to_user() - return False - - self.active_model.end_scenario_scope() - self.tracestate.reject_scenario(index) - self._report_tracestate_to_user() - return False - - def _rewind(self, drought_recovery=False): - tail = self.tracestate.rewind() - while drought_recovery and self.tracestate.coverage_drought: - tail = self.tracestate.rewind() - self.active_model = self.tracestate.model or ModelSpace() - return tail + def _try_to_fit_in_scenario(self, candidate, tracestate, refinement_stack): + """ + Tries to insert the candidate scenario into the trace (in full or partial) and + updates tracestate and refinement_stack accordingly. + """ + model = tracestate.model if tracestate.model else ModelSpace() + model.new_scenario_scope() + inserted, remainder, new_model, extra_data = self._process_scenario(candidate, model) + if not inserted: # insertion failed + model.end_scenario_scope() # redundant?? + tracestate.reject_scenario(candidate.src_id) + logger.debug(extra_data['fail_msg']) + self._report_tracestate_to_user(tracestate) + elif not remainder: # the scenario processed in full + new_model.end_scenario_scope() + tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model) + logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") + if refinement_stack: + self._handle_refinement_exit(inserted, tracestate, refinement_stack) + self._report_tracestate_to_user(tracestate) + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") + else: # the scenario is split into two parts, ready for refinement + logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" + f"Refinement needed at step: {remainder.steps[1]}") + inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" + tracestate.push_partial_scenario(inserted.src_id, inserted, new_model) + refinement_stack.append(remainder) + self._report_tracestate_to_user(tracestate) + logger.debug(f"last state:\n{new_model.get_status_text()}") + + def _handle_refinement_exit(self, inserted_refinement, tracestate, refinement_stack): + refinement_tail = refinement_stack.pop() + exit_conditions = refinement_tail.steps[1].model_info['OUT'] + exit_conditions_processed = False + for expr in exit_conditions: + try: + if tracestate.model.process_expression(expr, refinement_tail.steps[1].args) is False: + break + except Exception: + break + else: + exit_conditions_processed = True + + if not exit_conditions_processed: + self._rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. + refinement_stack.append(refinement_tail) + logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " + f"did not meet refinement conditions: {exit_conditions}") + return + + tail_inserted, remainder, new_model, extra_data = self._process_scenario(refinement_tail, tracestate.model) + if not tail_inserted: + logger.debug(extra_data['fail_msg']) + # Confirm then rewind, to roll back complete scenario, including its refiements + # Because that exit check passed, this is an error in the refined scenario itself + tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, new_model) + tail = self._rewind(tracestate) + logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") + elif not remainder: + new_model.end_scenario_scope() + tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model) + logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") + if refinement_stack: + self._handle_refinement_exit(tail_inserted, tracestate, refinement_stack) + else: + logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" + f"refinement needed at step: {remainder.steps[1]}") + tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" + tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model) + refinement_stack.append(remainder) @staticmethod - def _split_candidate_if_refinement_needed(scenario, model): - m = model.copy() - scenario = scenario.copy() - no_split = (scenario, None) - for step in scenario.steps: - if 'error' in step.model_info: - return no_split - if step.gherkin_kw in ['given', 'when', None]: - for expr in step.model_info.get('IN', []): - try: - if m.process_expression(expr, step.args) is False: - return no_split - except Exception: - return no_split - if step.gherkin_kw in ['when', 'then', None]: - for expr in step.model_info.get('OUT', []): - refine_here = False - try: - if m.process_expression(expr, step.args) is False: - if step.gherkin_kw in ['when', None]: - logger.debug(f"Refinement needed for scenario: {scenario.name}\nat step: {step}") - refine_here = True - else: - return no_split - except Exception: - return no_split - if refine_here: - front, back = scenario.split_at_step(scenario.steps.index(step)) - remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + - [s.full_keyword for s in back.steps[1:]]) - remaining_steps = SuiteProcessors.escape_robot_vars(remaining_steps) - edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) - edge_step.gherkin_kw = step.gherkin_kw - edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) - edge_step.detached = True - edge_step.args = StepArguments(step.args) - front.steps.append(edge_step) - back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) - back.steps[1] = back.steps[1].copy() - back.steps[1].model_info['IN'] = [] - return (front, back) - return no_split + def _split_for_refinement(scenario, step): + front, back = scenario.split_at_step(scenario.steps.index(step)) + remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + [s.full_keyword for s in back.steps[1:]]) + remaining_steps = SuiteProcessors._escape_robot_vars(remaining_steps) + edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) + edge_step.gherkin_kw = step.gherkin_kw + edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) + edge_step.detached = True + edge_step.args = StepArguments(step.args) + front.steps.append(edge_step) + back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) + back.steps[1] = back.steps[1].copy() + back.steps[1].model_info['IN'] = [] + return (front, back) + + def _rewind(self, tracestate, drought_recovery=False): + # Todo: Rewind needs to consider refinement stack. + tail = tracestate.rewind() + while drought_recovery and tracestate.coverage_drought: + tail = tracestate.rewind() + return tail @staticmethod - def escape_robot_vars(text): + def _escape_robot_vars(text): for seq in ("${", "@{", "%{", "&{", "*{"): text = text.replace(seq, "\\" + seq) return text @@ -289,20 +279,23 @@ def escape_robot_vars(text): @staticmethod def _process_scenario(scenario, model): m = model.copy() - scenario = scenario.copy() for step in scenario.steps: if 'error' in step.model_info: - logger.debug(f"Error in scenario {scenario.name} at step {step}: {step.model_info['error']}") - return None, None + return None, None, m, dict(fail_masg=f"Error in scenario {scenario.name} " + f"at step {step}: {step.model_info['error']}") for expr in SuiteProcessors._relevant_expressions(step): try: if m.process_expression(expr, step.args) is False: - raise Exception(False) + if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: + part1, part2 = SuiteProcessors._split_for_refinement(scenario, step) + return part1, part2, m, dict() + else: + return None, None, m, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] is False") except Exception as err: - logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, " - f"due to step '{step}': [{expr}] {err}") - return None, None - return scenario, m + return None, None, m, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, {scenario.name}," + f" due to step '{step}': [{expr}] {err}") + return scenario.copy(), None, m, dict() @staticmethod def _relevant_expressions(step): @@ -318,15 +311,7 @@ def _relevant_expressions(step): return expressions def _generate_scenario_variant(self, scenario, model): - m = model.copy() scenario = scenario.copy() - scenarios_in_refinement = self.tracestate.find_scenarios_with_active_refinement() - - # reuse previous solution for all parts in split-up scenario - for sir in scenarios_in_refinement: - if sir.src_id == scenario.src_id: - return scenario - # collect set of constraints subs = SubstitutionMap() try: @@ -347,7 +332,7 @@ def _generate_scenario_variant(self, scenario, model): if not constraint and org_example not in subs.substitutions: raise ValueError(f"No options to choose from at first assignment to {org_example}") if constraint and constraint != '.*': - options = m.process_expression(constraint, step.args) + options = model.process_expression(constraint, step.args) if options == 'exec': raise ValueError(f"Invalid constraint for argument substitution: {expr}") if not options: @@ -359,7 +344,7 @@ def _generate_scenario_variant(self, scenario, model): subs.substitute(org_example, options) elif step.args[modded_arg].kind == StepArgument.VAR_POS: if step.args[modded_arg].value: - modded_varargs = m.process_expression(constraint, step.args) + modded_varargs = model.process_expression(constraint, step.args) if not is_list_like(modded_varargs): raise ValueError(f"Modifying varargs must yield a list of arguments") # Varargs are not added to the substitution map, but are used directly as-is. A modifier can @@ -368,7 +353,7 @@ def _generate_scenario_variant(self, scenario, model): step.args[modded_arg].value = modded_varargs elif step.args[modded_arg].kind == StepArgument.FREE_NAMED: if step.args[modded_arg].value: - modded_free_args = m.process_expression(constraint, step.args) + modded_free_args = model.process_expression(constraint, step.args) if not isinstance(modded_free_args, dict): raise ValueError("Modifying free named arguments must yield a dict") # Similar to varargs, modified free named arguments are used directly as-is. @@ -414,15 +399,17 @@ def _parse_modifier_expression(expression, args): return var.arg, constraint raise ValueError(f"Invalid argument substitution: {expression}") - def _report_tracestate_to_user(self): - user_trace = f"[{', '.join(self.tracestate.id_trace)}]" - logger.debug(f"Trace: {user_trace} Reject: {list(self.tracestate.tried)}") + @staticmethod + def _report_tracestate_to_user(tracestate): + user_trace = f"[{', '.join(tracestate.id_trace)}]" + logger.debug(f"Trace: {user_trace} Reject: {list(tracestate.tried)}") - def _report_tracestate_wrapup(self): + @staticmethod + def _report_tracestate_wrapup(tracestate): logger.info("Trace composed:") - for step in self.tracestate: - logger.info(step.scenario.name) - logger.debug(f"model\n{step.model.get_status_text()}\n") + for progression in tracestate: + logger.info(progression.scenario.name) + logger.debug(f"model\n{progression.model.get_status_text()}\n") @staticmethod def _init_randomiser(seed): From 5ca5492bddef65aefcb6ef015d9a2b851c8f6449 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Tue, 30 Dec 2025 10:27:11 +0100 Subject: [PATCH 4/9] delegate refinement stack to TraceState --- .../08__repetition_caused_by_refinement.robot | 29 ++++++ ...trace.robot => 09__impossible_trace.robot} | 0 robotmbt/suiteprocessors.py | 55 +++++------ robotmbt/tracestate.py | 64 +++++++----- utest/test_tracestate_refinement.py | 98 ++++++++++++++++--- 5 files changed, 180 insertions(+), 66 deletions(-) create mode 100644 atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot rename atest/robotMBT tests/05__repeating_scenarios/{08__impossible_trace.robot => 09__impossible_trace.robot} (100%) diff --git a/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot b/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot new file mode 100644 index 00000000..14edf685 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot @@ -0,0 +1,29 @@ +*** Settings *** +Documentation This suite has more low-level scenarios than high-level scenarios, +... meaning that the high-level scenario must be repeated in order for +... the second low-level scenario to be reached. +Suite Setup Treat this test suite Model-based +Resource ../../resources/birthday_cards_composed.resource +Library robotmbt + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario A + Given there is a birthday card + when Someone writes their name in pen on the birthday card + then the birthday card has 'Someone' written on it + and there is text added in ink on the birthday card + +low-level scenario B + Given there is a birthday card + when Someone writes their name in pen on the birthday card + then the birthday card has 'Someone' written on it + and there is text added in ink on the birthday card diff --git a/atest/robotMBT tests/05__repeating_scenarios/08__impossible_trace.robot b/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot similarity index 100% rename from atest/robotMBT tests/05__repeating_scenarios/08__impossible_trace.robot rename to atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 7ad9f637..11d02fc4 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -107,7 +107,6 @@ def process_test_suite(self, in_suite, *, seed='new'): def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): tracestate = TraceState(self.shuffled) - refinement_stack = [] while not tracestate.coverage_reached(): candidate_id = tracestate.next_candidate(retry=allow_duplicate_scenarios) if candidate_id is None: # No more candidates remaining for this level @@ -122,7 +121,7 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): tracestate.reject_scenario(candidate_id) continue previous_len = len(tracestate) - self._try_to_fit_in_scenario(candidate, tracestate, refinement_stack) + self._try_to_fit_in_scenario(candidate, tracestate) if len(tracestate) > previous_len: self.DROUGHT_LIMIT = 50 if self.__last_candidate_changed_nothing(tracestate): @@ -146,11 +145,10 @@ def __last_candidate_changed_nothing(tracestate): def _select_scenario_variant(self, candidate_id, tracestate): candidate = self._scenario_with_repeat_counter(candidate_id, tracestate) - scenarios_in_refinement = tracestate.find_scenarios_with_active_refinement() - if candidate_id in [s.src_id for s in scenarios_in_refinement]: + if candidate_id in tracestate.active_refinements: # reuse previous solution for all parts in split-up scenario candidate = candidate.copy() - candidate.data_choices = scenarios_in_refinement[candidate_id].data_choices.copy() + candidate.data_choices = tracestate.get_remainder(candidate_id).data_choices.copy() else: candidate = self._generate_scenario_variant(candidate, tracestate.model or ModelSpace()) return candidate @@ -176,16 +174,15 @@ def _fail_on_step_errors(suite): for s in error_list]) raise Exception(err_msg) - def _try_to_fit_in_scenario(self, candidate, tracestate, refinement_stack): + def _try_to_fit_in_scenario(self, candidate, tracestate): """ Tries to insert the candidate scenario into the trace (in full or partial) and - updates tracestate and refinement_stack accordingly. + updates tracestate accordingly. """ model = tracestate.model if tracestate.model else ModelSpace() model.new_scenario_scope() inserted, remainder, new_model, extra_data = self._process_scenario(candidate, model) if not inserted: # insertion failed - model.end_scenario_scope() # redundant?? tracestate.reject_scenario(candidate.src_id) logger.debug(extra_data['fail_msg']) self._report_tracestate_to_user(tracestate) @@ -193,21 +190,20 @@ def _try_to_fit_in_scenario(self, candidate, tracestate, refinement_stack): new_model.end_scenario_scope() tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model) logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") - if refinement_stack: - self._handle_refinement_exit(inserted, tracestate, refinement_stack) + if tracestate.is_refinement_active(): + self._handle_refinement_exit(inserted, tracestate) self._report_tracestate_to_user(tracestate) logger.debug(f"last state:\n{tracestate.model.get_status_text()}") else: # the scenario is split into two parts, ready for refinement logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" f"Refinement needed at step: {remainder.steps[1]}") inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" - tracestate.push_partial_scenario(inserted.src_id, inserted, new_model) - refinement_stack.append(remainder) + tracestate.push_partial_scenario(inserted.src_id, inserted, new_model, remainder) self._report_tracestate_to_user(tracestate) logger.debug(f"last state:\n{new_model.get_status_text()}") - def _handle_refinement_exit(self, inserted_refinement, tracestate, refinement_stack): - refinement_tail = refinement_stack.pop() + def _handle_refinement_exit(self, inserted_refinement, tracestate): + refinement_tail = tracestate.get_remainder(tracestate.active_refinements[-1]) exit_conditions = refinement_tail.steps[1].model_info['OUT'] exit_conditions_processed = False for expr in exit_conditions: @@ -221,7 +217,6 @@ def _handle_refinement_exit(self, inserted_refinement, tracestate, refinement_st if not exit_conditions_processed: self._rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. - refinement_stack.append(refinement_tail) logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " f"did not meet refinement conditions: {exit_conditions}") return @@ -238,14 +233,13 @@ def _handle_refinement_exit(self, inserted_refinement, tracestate, refinement_st new_model.end_scenario_scope() tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model) logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") - if refinement_stack: - self._handle_refinement_exit(tail_inserted, tracestate, refinement_stack) + if tracestate.is_refinement_active(): + self._handle_refinement_exit(tail_inserted, tracestate) else: logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" f"refinement needed at step: {remainder.steps[1]}") tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" - tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model) - refinement_stack.append(remainder) + tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model, remainder) @staticmethod def _split_for_refinement(scenario, step): @@ -264,7 +258,9 @@ def _split_for_refinement(scenario, step): return (front, back) def _rewind(self, tracestate, drought_recovery=False): - # Todo: Rewind needs to consider refinement stack. + if tracestate[-1].remainder and tracestate.highest_part(tracestate[-1].remainder.src_id) > 1: + # When rewinding an 'in between' part, rewind both the part and the refinement + tracestate.rewind() tail = tracestate.rewind() while drought_recovery and tracestate.coverage_drought: tail = tracestate.rewind() @@ -278,24 +274,23 @@ def _escape_robot_vars(text): @staticmethod def _process_scenario(scenario, model): - m = model.copy() for step in scenario.steps: if 'error' in step.model_info: - return None, None, m, dict(fail_masg=f"Error in scenario {scenario.name} " - f"at step {step}: {step.model_info['error']}") + return None, None, model, dict(fail_masg=f"Error in scenario {scenario.name} " + f"at step {step}: {step.model_info['error']}") for expr in SuiteProcessors._relevant_expressions(step): try: - if m.process_expression(expr, step.args) is False: + if model.process_expression(expr, step.args) is False: if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: part1, part2 = SuiteProcessors._split_for_refinement(scenario, step) - return part1, part2, m, dict() + return part1, part2, model, dict() else: - return None, None, m, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " - f"{scenario.name}, due to step '{step}': [{expr}] is False") + return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] is False") except Exception as err: - return None, None, m, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, {scenario.name}," - f" due to step '{step}': [{expr}] {err}") - return scenario.copy(), None, m, dict() + return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] {err}") + return scenario.copy(), None, model, dict() @staticmethod def _relevant_expressions(step): diff --git a/robotmbt/tracestate.py b/robotmbt/tracestate.py index b00c5680..6b2ec035 100644 --- a/robotmbt/tracestate.py +++ b/robotmbt/tracestate.py @@ -49,9 +49,6 @@ def tried(self): """returns the indices that were rejected or previously inserted at the current position""" return tuple(self._tried[-1]) - def coverage_reached(self): - return all(self.c_pool.values()) - @property def coverage_drought(self): """Number of scenarios since last new coverage""" @@ -61,28 +58,39 @@ def coverage_drought(self): def id_trace(self): return [snap.id for snap in self._snapshots] + @property + def active_refinements(self): + return self._open_refinements[:] + + def coverage_reached(self): + return all(self.c_pool.values()) + def get_trace(self): return [snap.scenario for snap in self._snapshots] def next_candidate(self, retry=False): for i in self.c_pool: - if i not in self._tried[-1] and not self._is_refinement_active(i) and self.count(i) == 0: + if i not in self._tried[-1] and not self.is_refinement_active(i) and self.count(i) == 0: return i if not retry: return None for i in self.c_pool: - if i not in self._tried[-1] and not self._is_refinement_active(i): + if i not in self._tried[-1] and not self.is_refinement_active(i): return i return None def count(self, index): - """Count the number of times the index is present in the trace. - unfinished partial scenarios are excluded.""" + """ + Count the number of times the index is present in the trace. + unfinished partial scenarios are excluded. + """ return self.c_pool[index] def highest_part(self, index): - """Given the current trace and an index, returns the highest part number of an ongoing - refinement for the related scenario. Returns 0 when there is no refinement active.""" + """ + Given the current trace and an index, returns the highest part number of an ongoing + refinement for the related scenario. Returns 0 when there is no refinement active. + """ for i in range(1, len(self.id_trace)+1): if self.id_trace[-i] == f'{index}': return 0 @@ -90,15 +98,24 @@ def highest_part(self, index): return int(self.id_trace[-i].split('.')[1]) return 0 - def _is_refinement_active(self, index): - return self.highest_part(index) != 0 + def is_refinement_active(self, index=None): + """ + When called with an index, returns True if that scenario is currently being refined + When index is ommitted, return True if any refinement is active + """ + if index is None: + return self._open_refinements != [] + else: + return self.highest_part(index) != 0 - def find_scenarios_with_active_refinement(self): - scenarios = [] - for i in self._open_refinements: - index = -self.id_trace[::-1].index(f'{i}.1')-1 - scenarios.append(self._snapshots[index].scenario) - return scenarios + def get_remainder(self, index): + """ + When pushing a partial scenario, the remainder can be passed along for safe keeping. + This method retrieves the remainder for the last part that was pushed. + """ + last_part = self.highest_part(index) + index = -self.id_trace[::-1].index(f'{index}.{last_part}')-1 + return self._snapshots[index].remainder def reject_scenario(self, i_scenario): """Trying a scenario excludes it from further cadidacy on this level""" @@ -107,24 +124,24 @@ def reject_scenario(self, i_scenario): def confirm_full_scenario(self, index, scenario, model): c_drought = 0 if self.c_pool[index] == 0 else self.coverage_drought+1 self.c_pool[index] += 1 - if self._is_refinement_active(index): + if self.is_refinement_active(index): id = f"{index}.0" self._open_refinements.pop() else: id = str(index) self._tried[-1].append(index) self._tried.append([]) - self._snapshots.append(TraceSnapShot(id, scenario, model, c_drought)) + self._snapshots.append(TraceSnapShot(id, scenario, model, drought=c_drought)) - def push_partial_scenario(self, index, scenario, model): - if self._is_refinement_active(index): + def push_partial_scenario(self, index, scenario, model, remainder=None): + if self.is_refinement_active(index): id = f"{index}.{self.highest_part(index)+1}" else: id = f"{index}.1" self._tried[-1].append(index) self._tried.append([]) self._open_refinements.append(index) - self._snapshots.append(TraceSnapShot(id, scenario, model, self.coverage_drought)) + self._snapshots.append(TraceSnapShot(id, scenario, model, remainder, self.coverage_drought)) def can_rewind(self): return len(self._snapshots) > 0 @@ -159,9 +176,10 @@ def __len__(self): class TraceSnapShot: - def __init__(self, id, inserted_scenario, model_state, drought=0): + def __init__(self, id, inserted_scenario, model_state, remainder=None, drought=0): self.id = id self.scenario = inserted_scenario + self.remainder = remainder self._model = model_state.copy() self.coverage_drought = drought diff --git a/utest/test_tracestate_refinement.py b/utest/test_tracestate_refinement.py index 57e2760a..1b997e6b 100644 --- a/utest/test_tracestate_refinement.py +++ b/utest/test_tracestate_refinement.py @@ -280,33 +280,33 @@ def test_scenario_cannot_refine_itself_with_repetition(self): def test_initially_no_scenario_is_in_refinement(self): ts = TraceState([1]) - self.assertEqual(ts.find_scenarios_with_active_refinement(), []) + self.assertEqual(ts.active_refinements, []) def test_full_scenario_is_not_reported_as_refinement(self): ts = TraceState([1, 2]) ts.confirm_full_scenario(1, 'S1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), []) + self.assertEqual(ts.active_refinements, []) def test_push_partial_opens_refinement(self): ts = TraceState([1, 2]) - ts.push_partial_scenario(0, 'S1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['S1.1']) + ts.push_partial_scenario(1, 'S1.1', {}) + self.assertEqual(ts.active_refinements, [1]) def test_nested_refinements_are_all_reported_as_in_refinement(self): ts = TraceState([1, 2, 3, 4]) ts.push_partial_scenario(1, 'T1.1', {}) ts.push_partial_scenario(2, 'M1.1', {}) ts.push_partial_scenario(3, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) + self.assertEqual(ts.active_refinements, [1, 2, 3]) def test_closing_refinement_removes_it_from_list(self): ts = TraceState([1, 2, 3, 4]) ts.push_partial_scenario(1, 'T1.1', {}) ts.push_partial_scenario(2, 'M1.1', {}) ts.push_partial_scenario(3, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) + self.assertEqual(ts.active_refinements, [1, 2, 3]) ts.confirm_full_scenario(3, 'B1.0', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + self.assertEqual(ts.active_refinements, [1, 2]) def test_multi_step_refinement_is_reported_only_once(self): ts = TraceState([1, 2, 3, 4]) @@ -315,16 +315,16 @@ def test_multi_step_refinement_is_reported_only_once(self): ts.confirm_full_scenario(3, 'B1', {}) ts.push_partial_scenario(2, 'M1.2', {}) ts.confirm_full_scenario(4, 'B2', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + self.assertEqual(ts.active_refinements, [1, 2]) def test_rewind_open_refinement_removes_it_from_list(self): ts = TraceState([1, 2, 3, 4]) ts.push_partial_scenario(1, 'T1.1', {}) ts.push_partial_scenario(2, 'M1.1', {}) ts.push_partial_scenario(3, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) + self.assertEqual(ts.active_refinements, [1, 2, 3]) ts.rewind() - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + self.assertEqual(ts.active_refinements, [1, 2]) def test_rewind_finished_scenario_with_refinement_removes_enclosed_refinements(self): ts = TraceState([1, 2, 3, 4, 5]) @@ -333,12 +333,84 @@ def test_rewind_finished_scenario_with_refinement_removes_enclosed_refinements(s ts.push_partial_scenario(3, 'M1.1', {}) ts.push_partial_scenario(4, 'B1.1', {}) ts.confirm_full_scenario(5, 'S1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1', 'M1.1', 'B1.1']) + self.assertEqual(ts.active_refinements, [2, 3, 4]) ts.confirm_full_scenario(4, 'B1.0', {}) ts.confirm_full_scenario(3, 'M1.0', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) + self.assertEqual(ts.active_refinements, [2]) ts.rewind() # Middle including its Bottom refinement - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) + self.assertEqual(ts.active_refinements, [2]) + + def test_is_refinement_active_no_index(self): + ts = TraceState([1, 2, 3]) + self.assertFalse(ts.is_refinement_active()) + ts.confirm_full_scenario(1, 'one', {}) + self.assertFalse(ts.is_refinement_active()) + ts.push_partial_scenario(2, 'part1', {}) + self.assertTrue(ts.is_refinement_active()) + ts.confirm_full_scenario(3, 'refinment', {}) + self.assertTrue(ts.is_refinement_active()) + ts.push_partial_scenario(2, 'part2', {}) + self.assertTrue(ts.is_refinement_active()) + ts.confirm_full_scenario(2, 'remainder', {}) + self.assertFalse(ts.is_refinement_active()) + ts.rewind() + self.assertFalse(ts.is_refinement_active()) + ts.rewind() + self.assertFalse(ts.is_refinement_active()) + + def test_is_refinement_active_by_index(self): + ts = TraceState([1, 2, 3]) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.confirm_full_scenario(1, 'one', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.push_partial_scenario(2, 'part1', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertFalse(ts.is_refinement_active(3)) + ts.push_partial_scenario(3, 'refinement head', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertTrue(ts.is_refinement_active(3)) + ts.confirm_full_scenario(3, 'refinement tail', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertFalse(ts.is_refinement_active(3)) + ts.push_partial_scenario(2, 'part2', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + ts.confirm_full_scenario(2, 'remainder', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + + def test_remainder_can_be_set_and_retrieved(self): + ts = TraceState([1, 2]) + ts.push_partial_scenario(1, 'one part1', {}, 'one part2') + ts.push_partial_scenario(2, 'two part1', {}, 'two parts 2+3') + self.assertEqual(ts.get_remainder(1), 'one part2') + self.assertEqual(ts.get_remainder(2), 'two parts 2+3') + ts.push_partial_scenario(2, 'two part2', {}, 'two part3') + self.assertEqual(ts.get_remainder(2), 'two part3') + ts.rewind() + self.assertEqual(ts.get_remainder(2), 'two parts 2+3') + ts.push_partial_scenario(2, 'two part2', {}, 'two part3B') + self.assertEqual(ts.get_remainder(2), 'two part3B') + ts.confirm_full_scenario(2, 'two', {}) + self.assertEqual(ts.get_remainder(1), 'one part2') + self.assertIsNone(ts.get_remainder(2)) + ts.confirm_full_scenario(1, 'one', {}) + self.assertIsNone(ts.get_remainder(1)) + self.assertIsNone(ts.get_remainder(2)) if __name__ == '__main__': From d28694f2dd3d4a2695d06d139152ec8ad8697d54 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Wed, 31 Dec 2025 19:51:03 +0100 Subject: [PATCH 5/9] more tests for refinement and rewinds --- atest/resources/birthday_cards_flat.resource | 2 +- .../02__repetition_with_identity_bogey.robot | 4 +- .../09__impossible_trace.robot | 4 +- ..._reject_refinement_on_exit_condition.robot | 47 ++++++++++ .../11__reject_double_refinement.robot | 86 +++++++++++++++++++ robotmbt/suiteprocessors.py | 2 +- robotmbt/tracestate.py | 5 +- utest/test_tracestate.py | 19 ++++ 8 files changed, 160 insertions(+), 9 deletions(-) create mode 100644 atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot create mode 100644 atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot diff --git a/atest/resources/birthday_cards_flat.resource b/atest/resources/birthday_cards_flat.resource index 20dfabb8..d3976f70 100644 --- a/atest/resources/birthday_cards_flat.resource +++ b/atest/resources/birthday_cards_flat.resource @@ -60,7 +60,7 @@ ${person A} passes the birthday card ${back/on} to ${person B} Log Thank you ${person A}, I'll be sure to put my name on here. Set suite variable ${has_card} ${person B} -${person} refuses to write their name on the birthday card +${person} writes their name in invisible ink on the birthday card [Documentation] *model info* ... :IN: birthday_card ... :OUT: birthday_card diff --git a/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot b/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot index 964057eb..4b9b9cd3 100644 --- a/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot +++ b/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot @@ -19,10 +19,10 @@ Someone writes their name on the card when Someone writes their name on the birthday card then the birthday card has 'Someone' written on it -Refusing to share the birthday card +Signing the card in invisible ink Given there is a birthday card and the birthday card has 'Someone' written on it - when Johan refuses to write their name on the birthday card + when Johan writes their name in invisible ink on the birthday card then the birthday card has 'Someone' written on it but the birthday card does not have 'Johan' written on it diff --git a/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot b/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot index 01a1f2a9..99f976fe 100644 --- a/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot +++ b/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot @@ -11,9 +11,9 @@ Buying a card When someone buys a birthday card then there is a blank birthday card available -Refusing to sign the birthday card +Signing the card in invisible ink Given there is a birthday card - when everybody refuses to write their name on the birthday card + when everybody writes their name in invisible ink on the birthday card then the birthday card has 0 names written on it At least 42 people can write their name on the card diff --git a/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot b/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot new file mode 100644 index 00000000..59e423d7 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot @@ -0,0 +1,47 @@ +*** Settings *** +Documentation This suite confirms that a scenario that can be inserted at the place +... of refinement based on its entry conditions, but afterwards does not +... satisfy the high-level scenario's exit conditions, is rejected. +Suite Setup Expect failing suite processing +Resource ../../resources/birthday_cards_flat.resource +Library robotmbt + + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when Two people write their name on the birthday card + then the birthday card has 2 names written on it + +low-level scenario + Given there is a birthday card + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + + +*** Keywords *** +Two people write their name on the birthday card + [Documentation] + ... *model info* + ... :IN: scenario.count = len(birthday_card.names) + ... :OUT: len(birthday_card.names) == scenario.count+2 + Skip when unreachable + Length should be ${names} ${2} + +Expect failing suite processing + Run keyword and expect error Unable to compose* Treat this test suite Model-based + Set suite variable ${expected_error_detected} ${True} + +Skip when unreachable + [Documentation] + ... If the scenario is inserted after proper detection of the expected error, + ... then this keyword causes the remainder of the scenario to be skipped and + ... the test passes. When inserted without detected error, the scenario will + ... fail. + IF ${expected_error_detected} + Pass execution Accepting intentionally unreachable scenario + END diff --git a/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot b/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot new file mode 100644 index 00000000..d6063437 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot @@ -0,0 +1,86 @@ +*** Settings *** +Documentation This suite covers a special case where an incomplete rollback inside a +... scenario which is split up for refinement, could cause two scenarios +... to be inserted for a single refinement. To get into that situation the +... high-level scenario has two steps that require refinement. The first +... one only checks that a certain name is inserted, it does not check the +... number of names. The second one checks that the total number of names +... is three. This should be an unreachable situation, because refinements +... are always a single scenario and each of the scenarios only inserts a +... single name. If the rollback of the middle part (2.2 in the trace) was +... incomplete, i.e. its rollback did not include the refinement scenario +... as well, then inserting the second low-level scenario would satisfy +... the first exit conditions and inserting another low-level scenario for +... the second refinement would satisfy exit conditions for both steps and +... incorrectly complete the suite. +Suite Setup Expect failing suite processing +Resource ../../resources/birthday_cards_flat.resource +Library robotmbt + + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when The first person writes their name on the birthday card + and Two more people write their name on the birthday card + then the birthday card has 3 names written on it + +low-level scenario A + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario B + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario C + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + + +*** Keywords *** +The first person writes their name on the birthday card + [Documentation] *model info* + ... :IN: scenario.count = len(birthday_card.names) + ... :OUT: Someone in birthday_card.names + Skip when unreachable + Should Contain ${names} Someone + +Two more people write their name on the birthday card + [Documentation] *model info* + ... :IN: scenario.count + ... :OUT: len(birthday_card.names) == scenario.count+3 + Skip when unreachable + Length should be ${names} ${2} + +we are in refinement + [Documentation] Helper to prevent lower-level scenarios from being valid + ... at the top-level. (Better for performance) + ... *model info* + ... :IN: scenario.count + ... :OUT: None + No Operation + +Expect failing suite processing + Run keyword and expect error Unable to compose* Treat this test suite Model-based + Set suite variable ${expected_error_detected} ${True} + +Skip when unreachable + [Documentation] + ... If the scenario is inserted after proper detection of the expected error, + ... then this keyword causes the remainder of the scenario to be skipped and + ... the test passes. When inserted without detected error, the scenario will + ... fail. + IF ${expected_error_detected} + Pass execution Accepting intentionally unreachable scenario + END diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 11d02fc4..3362e51f 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -218,7 +218,7 @@ def _handle_refinement_exit(self, inserted_refinement, tracestate): if not exit_conditions_processed: self._rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " - f"did not meet refinement conditions: {exit_conditions}") + f"did not meet refinement exit condition: {exit_conditions}") return tail_inserted, remainder, new_model, extra_data = self._process_scenario(refinement_tail, tracestate.model) diff --git a/robotmbt/tracestate.py b/robotmbt/tracestate.py index 6b2ec035..959db914 100644 --- a/robotmbt/tracestate.py +++ b/robotmbt/tracestate.py @@ -139,8 +139,8 @@ def push_partial_scenario(self, index, scenario, model, remainder=None): else: id = f"{index}.1" self._tried[-1].append(index) - self._tried.append([]) self._open_refinements.append(index) + self._tried.append([]) self._snapshots.append(TraceSnapShot(id, scenario, model, remainder, self.coverage_drought)) def can_rewind(self): @@ -157,11 +157,10 @@ def rewind(self): self.rewind() return self.rewind() + self._tried.pop() if '.' not in id: - self._tried.pop() self.c_pool[index] -= 1 if id.endswith('.1'): - self._tried.pop() self._open_refinements.pop() return self._snapshots[-1] if self._snapshots else None diff --git a/utest/test_tracestate.py b/utest/test_tracestate.py index 66cc2dab..77c1462a 100644 --- a/utest/test_tracestate.py +++ b/utest/test_tracestate.py @@ -401,6 +401,25 @@ def test_rewind_all_parts_of_completed_scenario_at_once(self): self.assertIs(ts.next_candidate(), None) self.assertIs(tail, None) + def test_tried_entries_after_rewind(self): + ts = TraceState([1, 2, 10, 11, 12, 20, 21]) + ts.push_partial_scenario(1, 'part1', {}) + ts.reject_scenario(10) + ts.reject_scenario(11) + ts.confirm_full_scenario(2, 'two', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.reject_scenario(20) + ts.reject_scenario(21) + self.assertEqual(ts.tried, (20, 21)) + ts.rewind() + self.assertEqual(ts.tried, ()) + ts.rewind() + self.assertEqual(ts.tried, (10, 11, 2)) + ts.reject_scenario(12) + self.assertEqual(ts.tried, (10, 11, 2, 12)) + ts.rewind() + self.assertEqual(ts.tried, (1,)) + def test_highest_part_after_first_part(self): ts = TraceState([1]) ts.push_partial_scenario(1, 'part1', {}) From 9e37724d9a6e2976a43ec488df59032c2f5dbe21 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Thu, 1 Jan 2026 18:28:46 +0100 Subject: [PATCH 6/9] new test for data consistency in split-up scenarios --- .../birthday_cards_data_variation.resource | 2 +- .../01__equivalence_partitioning.robot | 1 + ...equivalence_partitioning_double_data.robot | 1 + .../03__refinement_with_data_reuse.robot | 1 + .../04__refinement_with_data_fan_out.robot | 13 ++-- ...inement_with_data_fan_out_multi_part.robot | 64 +++++++++++++++++++ ...06__interacting_equivalence_classes.robot} | 1 + ...07__independent_equivalence_classes.robot} | 1 + ..._define_example_values_in_then_step.robot} | 1 + robotmbt/suiteprocessors.py | 7 +- 10 files changed, 79 insertions(+), 13 deletions(-) create mode 100644 atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot rename atest/robotMBT tests/06__data_variation/{05__interacting_equivalence_classes.robot => 06__interacting_equivalence_classes.robot} (99%) rename atest/robotMBT tests/06__data_variation/{06__independent_equivalence_classes.robot => 07__independent_equivalence_classes.robot} (99%) rename atest/robotMBT tests/06__data_variation/{07__define_example_values_in_then_step.robot => 08__define_example_values_in_then_step.robot} (99%) diff --git a/atest/resources/birthday_cards_data_variation.resource b/atest/resources/birthday_cards_data_variation.resource index c3648b66..6096fa2d 100644 --- a/atest/resources/birthday_cards_data_variation.resource +++ b/atest/resources/birthday_cards_data_variation.resource @@ -61,7 +61,7 @@ the birthday card has ${n} different names written on it [Documentation] *model info* ... :IN: len(set(birthday_card.names)) == ${n} ... :OUT: len(set(birthday_card.names)) == ${n} - Length should be ${names} ${{int(${n})}} + Length should be ${{set(${names})}} ${{int(${n})}} ${person} signs the birthday card [Documentation] *model info* diff --git a/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot b/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot index 84cc4082..6410f36e 100644 --- a/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot +++ b/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot @@ -8,6 +8,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot b/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot index 332ab48c..ae9c0929 100644 --- a/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot +++ b/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot @@ -9,6 +9,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot b/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot index 52762ce1..4e47bb95 100644 --- a/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot +++ b/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot @@ -9,6 +9,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot b/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot index dc8da64d..c0b67eab 100644 --- a/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot +++ b/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot @@ -1,16 +1,17 @@ *** Settings *** Documentation This suite uses refinement and equivalence partitioning. The high-level scenario ... requires just a single key example from the equivalence class and uses just one -... concrete example. This scenario is then refined by two more detailed examples -... that use 2 different actors with specific characters. One is concise, the other -... a bit more elaborate. This implies that for at least one set of examples the -... high-level scenario's example value does not match the low-level scenario's -... example value. They must however still be matched, and kept identical, under -... refinement. +... concrete example. This scenario can be refined by two more detailed examples +... that use 2 different actors. This implies that one refinement example will match +... the high-level scenario's example, the other does not. To complete the trace, +... the high-level scenario must be repeated, once with each possible refinement. +... The example values between the high- and low-level scenarios must be matched, +... and kept identical, under refinement. Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot b/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot new file mode 100644 index 00000000..7f82d960 --- /dev/null +++ b/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot @@ -0,0 +1,64 @@ +*** Settings *** +Documentation This suite is an extension to the 'data fan out' suite, which needed refinement +... for just a single step. Here, the high-level scenario needs refinement in three +... of its steps. The background defines three actors and there are three refinement +... scenarios available. One for each actor. However, the high-level scenario only +... uses two of the actors, forcing the model to match up two of the steps and to +... never use the third option. The test fails if the model does not properly keep +... its data choices over all its steps when splitting the high-level scenario, and +... picks data independently in each step. +Suite Setup Treat this test suite Model-based +Resource ../../resources/birthday_cards_data_variation.resource +Library robotmbt + + +*** Test Cases *** +Background + Given Bahar is having their birthday + and Johan is a friend of Bahar + and Tannaz is a friend of Bahar + and Frederique is a friend of Bahar + When Johan buys a birthday card + then there is a blank birthday card available + +A friend signs the birthday card + Given there is a birthday card + when Johan signs the birthday card + and Tannaz signs the birthday card + and Johan signs the birthday card again + then the birthday card has a personal touch + and the birthday card has 2 different names written on it + +Signing the birthday card with your name only + Given there is a birthday card + and Johan is signing the birthday card + when Johan writes their name on the birthday card + then the birthday card has 'Johan' written on it + +Signing the birthday card with: Happy birthday! + Given there is a birthday card + and Tannaz is signing the birthday card + when Tannaz writes their name on the birthday card + and Tannaz adds the wish 'Happy birthday!' to the birthday card + then the birthday card has 'Tannaz' written on it + and the birthday card proclaims: Happy birthday! + +Signing the birthday card with: Cheers! + Given there is a birthday card + and Frederique is signing the birthday card + when Frederique writes their name on the birthday card + and Frederique adds the wish 'Cheers!' to the birthday card + then the birthday card has 'Frederique' written on it + and the birthday card proclaims: Cheers! + + +*** Keywords *** +${person} signs the birthday card again + [Documentation] Similar to '${person} signs the birthday card', but + ... without the check that the name is not already on there. + ... + ... *model info* + ... :MOD: ${person}= [guest for guest in party.guests] + ... :IN: scenario.guest = ${person} | scenario.count = len(birthday_card.names) + ... :OUT: len(birthday_card.names) == scenario.count+1 + Should contain ${names} ${person} diff --git a/atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot b/atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot rename to atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot index 87a5fa15..2c8f0f7c 100644 --- a/atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot +++ b/atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot @@ -12,6 +12,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Johan is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot b/atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot rename to atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot index 4899c0b7..bba90197 100644 --- a/atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot +++ b/atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot @@ -12,6 +12,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot b/atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot rename to atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot index db589cb4..c31de190 100644 --- a/atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot +++ b/atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot @@ -15,6 +15,7 @@ Documentation This test suite focuses on the initialisation of model data fr Suite Setup Treat this test suite Model-based Library robotmbt + *** Test Cases *** Background Given Bahar is throwing a party for their friends diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 3362e51f..605c75b9 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -145,12 +145,7 @@ def __last_candidate_changed_nothing(tracestate): def _select_scenario_variant(self, candidate_id, tracestate): candidate = self._scenario_with_repeat_counter(candidate_id, tracestate) - if candidate_id in tracestate.active_refinements: - # reuse previous solution for all parts in split-up scenario - candidate = candidate.copy() - candidate.data_choices = tracestate.get_remainder(candidate_id).data_choices.copy() - else: - candidate = self._generate_scenario_variant(candidate, tracestate.model or ModelSpace()) + candidate = self._generate_scenario_variant(candidate, tracestate.model or ModelSpace()) return candidate def _scenario_with_repeat_counter(self, index, tracestate): From 6507271fbdc270d2754d68f91141793bf033c48e Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Fri, 2 Jan 2026 11:02:59 +0100 Subject: [PATCH 7/9] move scenario processing to its own file --- robotmbt/modeller.py | 260 ++++++++++++++++++++++++++++++++++++ robotmbt/suiteprocessors.py | 239 ++------------------------------- 2 files changed, 270 insertions(+), 229 deletions(-) create mode 100644 robotmbt/modeller.py diff --git a/robotmbt/modeller.py b/robotmbt/modeller.py new file mode 100644 index 00000000..cf8c36be --- /dev/null +++ b/robotmbt/modeller.py @@ -0,0 +1,260 @@ +# -*- coding: utf-8 -*- + +# BSD 3-Clause License +# +# Copyright (c) 2026, J. Foederer +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Any + +from robot.api import logger +from robot.utils import is_list_like + +from .modelspace import ModelSpace +from .steparguments import StepArgument, StepArguments +from .substitutionmap import SubstitutionMap +from .suitedata import Scenario, Step +from .tracestate import TraceState + + +def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState): + """ + Tries to insert the candidate scenario into the trace (in full or partial) and + updates tracestate accordingly. + """ + model = tracestate.model if tracestate.model else ModelSpace() + model.new_scenario_scope() + inserted, remainder, new_model, extra_data = process_scenario(candidate, model) + if not inserted: # insertion failed + tracestate.reject_scenario(candidate.src_id) + logger.debug(extra_data['fail_msg']) + elif not remainder: # the scenario processed in full + new_model.end_scenario_scope() + tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model) + logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") + if tracestate.is_refinement_active(): + handle_refinement_exit(inserted, tracestate) + else: # the scenario is split into two parts, ready for refinement + logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" + f"Refinement needed at step: {remainder.steps[1]}") + inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" + tracestate.push_partial_scenario(inserted.src_id, inserted, new_model, remainder) + + +def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, ModelSpace, dict[str, Any]]: + for step in scenario.steps: + if 'error' in step.model_info: + return None, None, model, dict(fail_masg=f"Error in scenario {scenario.name} " + f"at step {step}: {step.model_info['error']}") + for expr in _relevant_expressions(step): + try: + if model.process_expression(expr, step.args) is False: + if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: + part1, part2 = split_for_refinement(scenario, step) + return part1, part2, model, dict() + else: + return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] is False") + except Exception as err: + return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] {err}") + return scenario.copy(), None, model, dict() + + +def _relevant_expressions(step: Step) -> list[str]: + if step.gherkin_kw is None and not step.model_info: + return [] # model info is optional for action keywords + expressions = [] + if 'IN' not in step.model_info or 'OUT' not in step.model_info: + raise Exception(f"Model info incomplete for step: {step}") + if step.gherkin_kw in ['given', 'when', None]: + expressions += step.model_info['IN'] + if step.gherkin_kw in ['when', 'then', None]: + expressions += step.model_info['OUT'] + return expressions + + +def split_for_refinement(scenario: Scenario, step: Step) -> tuple[Scenario, Scenario]: + front, back = scenario.split_at_step(scenario.steps.index(step)) + remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + [s.full_keyword for s in back.steps[1:]]) + remaining_steps = _escape_robot_vars(remaining_steps) + edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) + edge_step.gherkin_kw = step.gherkin_kw + edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) + edge_step.detached = True + edge_step.args = StepArguments(step.args) + front.steps.append(edge_step) + back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) + back.steps[1] = back.steps[1].copy() + back.steps[1].model_info['IN'] = [] + return (front, back) + + +def _escape_robot_vars(text: str) -> str: + for seq in ("${", "@{", "%{", "&{", "*{"): + text = text.replace(seq, "\\" + seq) + return text + + +def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState): + refinement_tail = tracestate.get_remainder(tracestate.active_refinements[-1]) + exit_conditions = refinement_tail.steps[1].model_info['OUT'] + exit_conditions_processed = False + for expr in exit_conditions: + try: + if tracestate.model.process_expression(expr, refinement_tail.steps[1].args) is False: + break + except Exception: + break + else: + exit_conditions_processed = True + + if not exit_conditions_processed: + rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. + logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " + f"did not meet refinement exit condition: {exit_conditions}") + return + + tail_inserted, remainder, new_model, extra_data = process_scenario(refinement_tail, tracestate.model) + if not tail_inserted: + logger.debug(extra_data['fail_msg']) + # Confirm then rewind, to roll back complete scenario, including its refiements + # Because that exit check passed, this is an error in the refined scenario itself + tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, new_model) + tail = rewind(tracestate) + logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") + elif not remainder: + new_model.end_scenario_scope() + tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model) + logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") + if tracestate.is_refinement_active(): + handle_refinement_exit(tail_inserted, tracestate) + else: + logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" + f"refinement needed at step: {remainder.steps[1]}") + tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" + tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model, remainder) + + +def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario: + scenario = scenario.copy() + # collect set of constraints + subs = SubstitutionMap() + try: + for step in scenario.steps: + for expr in step.model_info.get('MOD', []): + modded_arg, constraint = _parse_modifier_expression(expr, step.args) + if step.args[modded_arg].is_default: + continue + if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: + org_example = step.args[modded_arg].org_value + if step.gherkin_kw == 'then': + constraint = None # No new constraints are processed for then-steps + if org_example not in subs.substitutions: + # if a then-step signals the first use of an example value, it is considered a new definition + subs.substitute(org_example, [org_example]) + continue + if not constraint and org_example not in subs.substitutions: + raise ValueError(f"No options to choose from at first assignment to {org_example}") + if constraint and constraint != '.*': + options = model.process_expression(constraint, step.args) + if options == 'exec': + raise ValueError(f"Invalid constraint for argument substitution: {expr}") + if not options: + raise ValueError(f"Constraint on modifer did not yield any options: {expr}") + if not is_list_like(options): + raise ValueError(f"Constraint on modifer did not yield a set of options: {expr}") + else: + options = None + subs.substitute(org_example, options) + elif step.args[modded_arg].kind == StepArgument.VAR_POS: + if step.args[modded_arg].value: + modded_varargs = model.process_expression(constraint, step.args) + if not is_list_like(modded_varargs): + raise ValueError(f"Modifying varargs must yield a list of arguments") + # Varargs are not added to the substitution map, but are used directly as-is. A modifier can + # change the number of arguments in the list, making it impossible to decide which values to + # match and which to drop and/or duplicate. + step.args[modded_arg].value = modded_varargs + elif step.args[modded_arg].kind == StepArgument.FREE_NAMED: + if step.args[modded_arg].value: + modded_free_args = model.process_expression(constraint, step.args) + if not isinstance(modded_free_args, dict): + raise ValueError("Modifying free named arguments must yield a dict") + # Similar to varargs, modified free named arguments are used directly as-is. + step.args[modded_arg].value = modded_free_args + else: + raise AssertionError(f"Unknown argument kind for {modded_arg}") + except Exception as err: + logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" + f" In step {step}: {err}") + return None + + try: + subs.solve() + except ValueError as err: + logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" + f" {err}: {subs}") + return None + + # Update scenario with generated values + if subs.solution: + logger.debug(f"Example variant generated with argument substitution: {subs}") + scenario.data_choices = subs + for step in scenario.steps: + if 'MOD' in step.model_info: + for expr in step.model_info['MOD']: + modded_arg, _ = _parse_modifier_expression(expr, step.args) + if step.args[modded_arg].is_default: + continue + org_example = step.args[modded_arg].org_value + if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: + step.args[modded_arg].value = subs.solution[org_example] + return scenario + + +def _parse_modifier_expression(expression: str, args: tuple[str]) -> tuple[str, str]: + if expression.startswith('${'): + for var in args: + if expression.casefold().startswith(var.arg.casefold()): + assignment_expr = expression.replace(var.arg, '', 1).strip() + if not assignment_expr.startswith('=') or assignment_expr.startswith('=='): + break # not an assignment + constraint = assignment_expr.replace('=', '', 1).strip() + return var.arg, constraint + raise ValueError(f"Invalid argument substitution: {expression}") + + +def rewind(tracestate: TraceState, drought_recovery: bool = False) -> Scenario: + if tracestate[-1].remainder and tracestate.highest_part(tracestate[-1].remainder.src_id) > 1: + # When rewinding an 'in between' part, rewind both the part and the refinement + tracestate.rewind() + tail = tracestate.rewind() + while drought_recovery and tracestate.coverage_drought: + tail = tracestate.rewind() + return tail diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 605c75b9..89b31cae 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -34,13 +34,11 @@ import random from robot.api import logger -from robot.utils import is_list_like -from .substitutionmap import SubstitutionMap +from . import modeller from .modelspace import ModelSpace -from .suitedata import Suite, Scenario, Step +from .suitedata import Suite from .tracestate import TraceState -from .steparguments import StepArgument, StepArguments class SuiteProcessors: @@ -112,7 +110,7 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): if candidate_id is None: # No more candidates remaining for this level if not tracestate.can_rewind(): break - tail = self._rewind(tracestate) + tail = modeller.rewind(tracestate) logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") self._report_tracestate_to_user(tracestate) else: @@ -121,16 +119,19 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): tracestate.reject_scenario(candidate_id) continue previous_len = len(tracestate) - self._try_to_fit_in_scenario(candidate, tracestate) + modeller.try_to_fit_in_scenario(candidate, tracestate) + self._report_tracestate_to_user(tracestate) + if tracestate.model: + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") if len(tracestate) > previous_len: self.DROUGHT_LIMIT = 50 if self.__last_candidate_changed_nothing(tracestate): logger.debug("Repeated scenario did not change the model's state. Stop trying.") - self._rewind(tracestate) + modeller.rewind(tracestate) elif tracestate.coverage_drought > self.DROUGHT_LIMIT: logger.debug(f"Went too long without new coverage (>{self.DROUGHT_LIMIT}x). " "Roll back to last coverage increase and try something else.") - self._rewind(tracestate, drought_recovery=True) + modeller.rewind(tracestate, drought_recovery=True) self._report_tracestate_to_user(tracestate) logger.debug(f"last state:\n{tracestate.model.get_status_text()}") return tracestate @@ -145,7 +146,7 @@ def __last_candidate_changed_nothing(tracestate): def _select_scenario_variant(self, candidate_id, tracestate): candidate = self._scenario_with_repeat_counter(candidate_id, tracestate) - candidate = self._generate_scenario_variant(candidate, tracestate.model or ModelSpace()) + candidate = modeller.generate_scenario_variant(candidate, tracestate.model or ModelSpace()) return candidate def _scenario_with_repeat_counter(self, index, tracestate): @@ -169,226 +170,6 @@ def _fail_on_step_errors(suite): for s in error_list]) raise Exception(err_msg) - def _try_to_fit_in_scenario(self, candidate, tracestate): - """ - Tries to insert the candidate scenario into the trace (in full or partial) and - updates tracestate accordingly. - """ - model = tracestate.model if tracestate.model else ModelSpace() - model.new_scenario_scope() - inserted, remainder, new_model, extra_data = self._process_scenario(candidate, model) - if not inserted: # insertion failed - tracestate.reject_scenario(candidate.src_id) - logger.debug(extra_data['fail_msg']) - self._report_tracestate_to_user(tracestate) - elif not remainder: # the scenario processed in full - new_model.end_scenario_scope() - tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model) - logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") - if tracestate.is_refinement_active(): - self._handle_refinement_exit(inserted, tracestate) - self._report_tracestate_to_user(tracestate) - logger.debug(f"last state:\n{tracestate.model.get_status_text()}") - else: # the scenario is split into two parts, ready for refinement - logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" - f"Refinement needed at step: {remainder.steps[1]}") - inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" - tracestate.push_partial_scenario(inserted.src_id, inserted, new_model, remainder) - self._report_tracestate_to_user(tracestate) - logger.debug(f"last state:\n{new_model.get_status_text()}") - - def _handle_refinement_exit(self, inserted_refinement, tracestate): - refinement_tail = tracestate.get_remainder(tracestate.active_refinements[-1]) - exit_conditions = refinement_tail.steps[1].model_info['OUT'] - exit_conditions_processed = False - for expr in exit_conditions: - try: - if tracestate.model.process_expression(expr, refinement_tail.steps[1].args) is False: - break - except Exception: - break - else: - exit_conditions_processed = True - - if not exit_conditions_processed: - self._rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. - logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " - f"did not meet refinement exit condition: {exit_conditions}") - return - - tail_inserted, remainder, new_model, extra_data = self._process_scenario(refinement_tail, tracestate.model) - if not tail_inserted: - logger.debug(extra_data['fail_msg']) - # Confirm then rewind, to roll back complete scenario, including its refiements - # Because that exit check passed, this is an error in the refined scenario itself - tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, new_model) - tail = self._rewind(tracestate) - logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") - elif not remainder: - new_model.end_scenario_scope() - tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model) - logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") - if tracestate.is_refinement_active(): - self._handle_refinement_exit(tail_inserted, tracestate) - else: - logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" - f"refinement needed at step: {remainder.steps[1]}") - tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" - tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model, remainder) - - @staticmethod - def _split_for_refinement(scenario, step): - front, back = scenario.split_at_step(scenario.steps.index(step)) - remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + [s.full_keyword for s in back.steps[1:]]) - remaining_steps = SuiteProcessors._escape_robot_vars(remaining_steps) - edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) - edge_step.gherkin_kw = step.gherkin_kw - edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) - edge_step.detached = True - edge_step.args = StepArguments(step.args) - front.steps.append(edge_step) - back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) - back.steps[1] = back.steps[1].copy() - back.steps[1].model_info['IN'] = [] - return (front, back) - - def _rewind(self, tracestate, drought_recovery=False): - if tracestate[-1].remainder and tracestate.highest_part(tracestate[-1].remainder.src_id) > 1: - # When rewinding an 'in between' part, rewind both the part and the refinement - tracestate.rewind() - tail = tracestate.rewind() - while drought_recovery and tracestate.coverage_drought: - tail = tracestate.rewind() - return tail - - @staticmethod - def _escape_robot_vars(text): - for seq in ("${", "@{", "%{", "&{", "*{"): - text = text.replace(seq, "\\" + seq) - return text - - @staticmethod - def _process_scenario(scenario, model): - for step in scenario.steps: - if 'error' in step.model_info: - return None, None, model, dict(fail_masg=f"Error in scenario {scenario.name} " - f"at step {step}: {step.model_info['error']}") - for expr in SuiteProcessors._relevant_expressions(step): - try: - if model.process_expression(expr, step.args) is False: - if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: - part1, part2 = SuiteProcessors._split_for_refinement(scenario, step) - return part1, part2, model, dict() - else: - return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " - f"{scenario.name}, due to step '{step}': [{expr}] is False") - except Exception as err: - return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " - f"{scenario.name}, due to step '{step}': [{expr}] {err}") - return scenario.copy(), None, model, dict() - - @staticmethod - def _relevant_expressions(step): - if step.gherkin_kw is None and not step.model_info: - return [] # model info is optional for action keywords - expressions = [] - if 'IN' not in step.model_info or 'OUT' not in step.model_info: - raise Exception(f"Model info incomplete for step: {step}") - if step.gherkin_kw in ['given', 'when', None]: - expressions += step.model_info['IN'] - if step.gherkin_kw in ['when', 'then', None]: - expressions += step.model_info['OUT'] - return expressions - - def _generate_scenario_variant(self, scenario, model): - scenario = scenario.copy() - # collect set of constraints - subs = SubstitutionMap() - try: - for step in scenario.steps: - if 'MOD' in step.model_info: - for expr in step.model_info['MOD']: - modded_arg, constraint = self._parse_modifier_expression(expr, step.args) - if step.args[modded_arg].is_default: - continue - if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: - org_example = step.args[modded_arg].org_value - if step.gherkin_kw == 'then': - constraint = None # No new constraints are processed for then-steps - if org_example not in subs.substitutions: - # if a then-step signals the first use of an example value, it is considered a new definition - subs.substitute(org_example, [org_example]) - continue - if not constraint and org_example not in subs.substitutions: - raise ValueError(f"No options to choose from at first assignment to {org_example}") - if constraint and constraint != '.*': - options = model.process_expression(constraint, step.args) - if options == 'exec': - raise ValueError(f"Invalid constraint for argument substitution: {expr}") - if not options: - raise ValueError(f"Constraint on modifer did not yield any options: {expr}") - if not is_list_like(options): - raise ValueError(f"Constraint on modifer did not yield a set of options: {expr}") - else: - options = None - subs.substitute(org_example, options) - elif step.args[modded_arg].kind == StepArgument.VAR_POS: - if step.args[modded_arg].value: - modded_varargs = model.process_expression(constraint, step.args) - if not is_list_like(modded_varargs): - raise ValueError(f"Modifying varargs must yield a list of arguments") - # Varargs are not added to the substitution map, but are used directly as-is. A modifier can - # change the number of arguments in the list, making it impossible to decide which values to - # match and which to drop and/or duplicate. - step.args[modded_arg].value = modded_varargs - elif step.args[modded_arg].kind == StepArgument.FREE_NAMED: - if step.args[modded_arg].value: - modded_free_args = model.process_expression(constraint, step.args) - if not isinstance(modded_free_args, dict): - raise ValueError("Modifying free named arguments must yield a dict") - # Similar to varargs, modified free named arguments are used directly as-is. - step.args[modded_arg].value = modded_free_args - else: - raise AssertionError(f"Unknown argument kind for {modded_arg}") - except Exception as err: - logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" - f" In step {step}: {err}") - return None - - try: - subs.solve() - except ValueError as err: - logger.debug( - f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n {err}: {subs}") - return None - - # Update scenario with generated values - if subs.solution: - logger.debug(f"Example variant generated with argument substitution: {subs}") - scenario.data_choices = subs - for step in scenario.steps: - if 'MOD' in step.model_info: - for expr in step.model_info['MOD']: - modded_arg, _ = self._parse_modifier_expression(expr, step.args) - if step.args[modded_arg].is_default: - continue - org_example = step.args[modded_arg].org_value - if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: - step.args[modded_arg].value = subs.solution[org_example] - return scenario - - @staticmethod - def _parse_modifier_expression(expression, args): - if expression.startswith('${'): - for var in args: - if expression.casefold().startswith(var.arg.casefold()): - assignment_expr = expression.replace(var.arg, '', 1).strip() - if not assignment_expr.startswith('=') or assignment_expr.startswith('=='): - break # not an assignment - constraint = assignment_expr.replace('=', '', 1).strip() - return var.arg, constraint - raise ValueError(f"Invalid argument substitution: {expression}") - @staticmethod def _report_tracestate_to_user(tracestate): user_trace = f"[{', '.join(tracestate.id_trace)}]" From 1e3f3f97b96cd14698d875fde99eca68d5221e57 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Fri, 2 Jan 2026 11:44:09 +0100 Subject: [PATCH 8/9] remove redundant return value (model) --- robotmbt/modeller.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/robotmbt/modeller.py b/robotmbt/modeller.py index cf8c36be..6b793557 100644 --- a/robotmbt/modeller.py +++ b/robotmbt/modeller.py @@ -49,13 +49,13 @@ def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState): """ model = tracestate.model if tracestate.model else ModelSpace() model.new_scenario_scope() - inserted, remainder, new_model, extra_data = process_scenario(candidate, model) + inserted, remainder, extra_data = process_scenario(candidate, model) if not inserted: # insertion failed tracestate.reject_scenario(candidate.src_id) logger.debug(extra_data['fail_msg']) elif not remainder: # the scenario processed in full - new_model.end_scenario_scope() - tracestate.confirm_full_scenario(inserted.src_id, inserted, new_model) + model.end_scenario_scope() + tracestate.confirm_full_scenario(inserted.src_id, inserted, model) logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") if tracestate.is_refinement_active(): handle_refinement_exit(inserted, tracestate) @@ -63,27 +63,27 @@ def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState): logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" f"Refinement needed at step: {remainder.steps[1]}") inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" - tracestate.push_partial_scenario(inserted.src_id, inserted, new_model, remainder) + tracestate.push_partial_scenario(inserted.src_id, inserted, model, remainder) -def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, ModelSpace, dict[str, Any]]: +def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, dict[str, Any]]: for step in scenario.steps: if 'error' in step.model_info: - return None, None, model, dict(fail_masg=f"Error in scenario {scenario.name} " - f"at step {step}: {step.model_info['error']}") + return None, None, dict(fail_masg=f"Error in scenario {scenario.name} " + f"at step {step}: {step.model_info['error']}") for expr in _relevant_expressions(step): try: if model.process_expression(expr, step.args) is False: if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: part1, part2 = split_for_refinement(scenario, step) - return part1, part2, model, dict() + return part1, part2, dict() else: - return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " - f"{scenario.name}, due to step '{step}': [{expr}] is False") + return None, None, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] is False") except Exception as err: - return None, None, model, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " - f"{scenario.name}, due to step '{step}': [{expr}] {err}") - return scenario.copy(), None, model, dict() + return None, None, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] {err}") + return scenario.copy(), None, dict() def _relevant_expressions(step: Step) -> list[str]: @@ -140,17 +140,18 @@ def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState f"did not meet refinement exit condition: {exit_conditions}") return - tail_inserted, remainder, new_model, extra_data = process_scenario(refinement_tail, tracestate.model) + model = tracestate.model + tail_inserted, remainder, extra_data = process_scenario(refinement_tail, model) if not tail_inserted: logger.debug(extra_data['fail_msg']) # Confirm then rewind, to roll back complete scenario, including its refiements # Because that exit check passed, this is an error in the refined scenario itself - tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, new_model) + tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, model) tail = rewind(tracestate) logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") elif not remainder: - new_model.end_scenario_scope() - tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, new_model) + model.end_scenario_scope() + tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, model) logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") if tracestate.is_refinement_active(): handle_refinement_exit(tail_inserted, tracestate) @@ -158,7 +159,7 @@ def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" f"refinement needed at step: {remainder.steps[1]}") tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" - tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, new_model, remainder) + tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, model, remainder) def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario: From 55c4bc95bb3cc887b29ba2623d7444dbec63fa47 Mon Sep 17 00:00:00 2001 From: JFoederer <32476108+JFoederer@users.noreply.github.com> Date: Sat, 3 Jan 2026 09:00:59 +0100 Subject: [PATCH 9/9] restore logging tweak --- robotmbt/suiteprocessors.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 89b31cae..aa6b7f9d 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -121,9 +121,8 @@ def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): previous_len = len(tracestate) modeller.try_to_fit_in_scenario(candidate, tracestate) self._report_tracestate_to_user(tracestate) - if tracestate.model: - logger.debug(f"last state:\n{tracestate.model.get_status_text()}") if len(tracestate) > previous_len: + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") self.DROUGHT_LIMIT = 50 if self.__last_candidate_changed_nothing(tracestate): logger.debug("Repeated scenario did not change the model's state. Stop trying.")