diff --git a/atest/resources/birthday_cards_data_variation.resource b/atest/resources/birthday_cards_data_variation.resource index c3648b66..6096fa2d 100644 --- a/atest/resources/birthday_cards_data_variation.resource +++ b/atest/resources/birthday_cards_data_variation.resource @@ -61,7 +61,7 @@ the birthday card has ${n} different names written on it [Documentation] *model info* ... :IN: len(set(birthday_card.names)) == ${n} ... :OUT: len(set(birthday_card.names)) == ${n} - Length should be ${names} ${{int(${n})}} + Length should be ${{set(${names})}} ${{int(${n})}} ${person} signs the birthday card [Documentation] *model info* diff --git a/atest/resources/birthday_cards_flat.resource b/atest/resources/birthday_cards_flat.resource index 20dfabb8..d3976f70 100644 --- a/atest/resources/birthday_cards_flat.resource +++ b/atest/resources/birthday_cards_flat.resource @@ -60,7 +60,7 @@ ${person A} passes the birthday card ${back/on} to ${person B} Log Thank you ${person A}, I'll be sure to put my name on here. Set suite variable ${has_card} ${person B} -${person} refuses to write their name on the birthday card +${person} writes their name in invisible ink on the birthday card [Documentation] *model info* ... :IN: birthday_card ... :OUT: birthday_card diff --git a/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot b/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot index 964057eb..4b9b9cd3 100644 --- a/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot +++ b/atest/robotMBT tests/05__repeating_scenarios/02__repetition_with_identity_bogey.robot @@ -19,10 +19,10 @@ Someone writes their name on the card when Someone writes their name on the birthday card then the birthday card has 'Someone' written on it -Refusing to share the birthday card +Signing the card in invisible ink Given there is a birthday card and the birthday card has 'Someone' written on it - when Johan refuses to write their name on the birthday card + when Johan writes their name in invisible ink on the birthday card then the birthday card has 'Someone' written on it but the birthday card does not have 'Johan' written on it diff --git a/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot b/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot new file mode 100644 index 00000000..14edf685 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/08__repetition_caused_by_refinement.robot @@ -0,0 +1,29 @@ +*** Settings *** +Documentation This suite has more low-level scenarios than high-level scenarios, +... meaning that the high-level scenario must be repeated in order for +... the second low-level scenario to be reached. +Suite Setup Treat this test suite Model-based +Resource ../../resources/birthday_cards_composed.resource +Library robotmbt + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario A + Given there is a birthday card + when Someone writes their name in pen on the birthday card + then the birthday card has 'Someone' written on it + and there is text added in ink on the birthday card + +low-level scenario B + Given there is a birthday card + when Someone writes their name in pen on the birthday card + then the birthday card has 'Someone' written on it + and there is text added in ink on the birthday card diff --git a/atest/robotMBT tests/05__repeating_scenarios/08__impossible_trace.robot b/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot similarity index 93% rename from atest/robotMBT tests/05__repeating_scenarios/08__impossible_trace.robot rename to atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot index 01a1f2a9..99f976fe 100644 --- a/atest/robotMBT tests/05__repeating_scenarios/08__impossible_trace.robot +++ b/atest/robotMBT tests/05__repeating_scenarios/09__impossible_trace.robot @@ -11,9 +11,9 @@ Buying a card When someone buys a birthday card then there is a blank birthday card available -Refusing to sign the birthday card +Signing the card in invisible ink Given there is a birthday card - when everybody refuses to write their name on the birthday card + when everybody writes their name in invisible ink on the birthday card then the birthday card has 0 names written on it At least 42 people can write their name on the card diff --git a/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot b/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot new file mode 100644 index 00000000..59e423d7 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/10__reject_refinement_on_exit_condition.robot @@ -0,0 +1,47 @@ +*** Settings *** +Documentation This suite confirms that a scenario that can be inserted at the place +... of refinement based on its entry conditions, but afterwards does not +... satisfy the high-level scenario's exit conditions, is rejected. +Suite Setup Expect failing suite processing +Resource ../../resources/birthday_cards_flat.resource +Library robotmbt + + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when Two people write their name on the birthday card + then the birthday card has 2 names written on it + +low-level scenario + Given there is a birthday card + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + + +*** Keywords *** +Two people write their name on the birthday card + [Documentation] + ... *model info* + ... :IN: scenario.count = len(birthday_card.names) + ... :OUT: len(birthday_card.names) == scenario.count+2 + Skip when unreachable + Length should be ${names} ${2} + +Expect failing suite processing + Run keyword and expect error Unable to compose* Treat this test suite Model-based + Set suite variable ${expected_error_detected} ${True} + +Skip when unreachable + [Documentation] + ... If the scenario is inserted after proper detection of the expected error, + ... then this keyword causes the remainder of the scenario to be skipped and + ... the test passes. When inserted without detected error, the scenario will + ... fail. + IF ${expected_error_detected} + Pass execution Accepting intentionally unreachable scenario + END diff --git a/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot b/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot new file mode 100644 index 00000000..d6063437 --- /dev/null +++ b/atest/robotMBT tests/05__repeating_scenarios/11__reject_double_refinement.robot @@ -0,0 +1,86 @@ +*** Settings *** +Documentation This suite covers a special case where an incomplete rollback inside a +... scenario which is split up for refinement, could cause two scenarios +... to be inserted for a single refinement. To get into that situation the +... high-level scenario has two steps that require refinement. The first +... one only checks that a certain name is inserted, it does not check the +... number of names. The second one checks that the total number of names +... is three. This should be an unreachable situation, because refinements +... are always a single scenario and each of the scenarios only inserts a +... single name. If the rollback of the middle part (2.2 in the trace) was +... incomplete, i.e. its rollback did not include the refinement scenario +... as well, then inserting the second low-level scenario would satisfy +... the first exit conditions and inserting another low-level scenario for +... the second refinement would satisfy exit conditions for both steps and +... incorrectly complete the suite. +Suite Setup Expect failing suite processing +Resource ../../resources/birthday_cards_flat.resource +Library robotmbt + + +*** Test Cases *** +Buying a card + When someone buys a birthday card + then there is a blank birthday card available + +high-level scenario + Given there is a birthday card + when The first person writes their name on the birthday card + and Two more people write their name on the birthday card + then the birthday card has 3 names written on it + +low-level scenario A + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario B + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + +low-level scenario C + Given there is a birthday card + and we are in refinement + when Someone writes their name on the birthday card + then the birthday card has 'Someone' written on it + + +*** Keywords *** +The first person writes their name on the birthday card + [Documentation] *model info* + ... :IN: scenario.count = len(birthday_card.names) + ... :OUT: Someone in birthday_card.names + Skip when unreachable + Should Contain ${names} Someone + +Two more people write their name on the birthday card + [Documentation] *model info* + ... :IN: scenario.count + ... :OUT: len(birthday_card.names) == scenario.count+3 + Skip when unreachable + Length should be ${names} ${2} + +we are in refinement + [Documentation] Helper to prevent lower-level scenarios from being valid + ... at the top-level. (Better for performance) + ... *model info* + ... :IN: scenario.count + ... :OUT: None + No Operation + +Expect failing suite processing + Run keyword and expect error Unable to compose* Treat this test suite Model-based + Set suite variable ${expected_error_detected} ${True} + +Skip when unreachable + [Documentation] + ... If the scenario is inserted after proper detection of the expected error, + ... then this keyword causes the remainder of the scenario to be skipped and + ... the test passes. When inserted without detected error, the scenario will + ... fail. + IF ${expected_error_detected} + Pass execution Accepting intentionally unreachable scenario + END diff --git a/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot b/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot index 84cc4082..6410f36e 100644 --- a/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot +++ b/atest/robotMBT tests/06__data_variation/01__equivalence_partitioning.robot @@ -8,6 +8,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot b/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot index 332ab48c..ae9c0929 100644 --- a/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot +++ b/atest/robotMBT tests/06__data_variation/02__equivalence_partitioning_double_data.robot @@ -9,6 +9,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot b/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot index 52762ce1..4e47bb95 100644 --- a/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot +++ b/atest/robotMBT tests/06__data_variation/03__refinement_with_data_reuse.robot @@ -9,6 +9,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot b/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot index dc8da64d..c0b67eab 100644 --- a/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot +++ b/atest/robotMBT tests/06__data_variation/04__refinement_with_data_fan_out.robot @@ -1,16 +1,17 @@ *** Settings *** Documentation This suite uses refinement and equivalence partitioning. The high-level scenario ... requires just a single key example from the equivalence class and uses just one -... concrete example. This scenario is then refined by two more detailed examples -... that use 2 different actors with specific characters. One is concise, the other -... a bit more elaborate. This implies that for at least one set of examples the -... high-level scenario's example value does not match the low-level scenario's -... example value. They must however still be matched, and kept identical, under -... refinement. +... concrete example. This scenario can be refined by two more detailed examples +... that use 2 different actors. This implies that one refinement example will match +... the high-level scenario's example, the other does not. To complete the trace, +... the high-level scenario must be repeated, once with each possible refinement. +... The example values between the high- and low-level scenarios must be matched, +... and kept identical, under refinement. Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot b/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot new file mode 100644 index 00000000..7f82d960 --- /dev/null +++ b/atest/robotMBT tests/06__data_variation/05__refinement_with_data_fan_out_multi_part.robot @@ -0,0 +1,64 @@ +*** Settings *** +Documentation This suite is an extension to the 'data fan out' suite, which needed refinement +... for just a single step. Here, the high-level scenario needs refinement in three +... of its steps. The background defines three actors and there are three refinement +... scenarios available. One for each actor. However, the high-level scenario only +... uses two of the actors, forcing the model to match up two of the steps and to +... never use the third option. The test fails if the model does not properly keep +... its data choices over all its steps when splitting the high-level scenario, and +... picks data independently in each step. +Suite Setup Treat this test suite Model-based +Resource ../../resources/birthday_cards_data_variation.resource +Library robotmbt + + +*** Test Cases *** +Background + Given Bahar is having their birthday + and Johan is a friend of Bahar + and Tannaz is a friend of Bahar + and Frederique is a friend of Bahar + When Johan buys a birthday card + then there is a blank birthday card available + +A friend signs the birthday card + Given there is a birthday card + when Johan signs the birthday card + and Tannaz signs the birthday card + and Johan signs the birthday card again + then the birthday card has a personal touch + and the birthday card has 2 different names written on it + +Signing the birthday card with your name only + Given there is a birthday card + and Johan is signing the birthday card + when Johan writes their name on the birthday card + then the birthday card has 'Johan' written on it + +Signing the birthday card with: Happy birthday! + Given there is a birthday card + and Tannaz is signing the birthday card + when Tannaz writes their name on the birthday card + and Tannaz adds the wish 'Happy birthday!' to the birthday card + then the birthday card has 'Tannaz' written on it + and the birthday card proclaims: Happy birthday! + +Signing the birthday card with: Cheers! + Given there is a birthday card + and Frederique is signing the birthday card + when Frederique writes their name on the birthday card + and Frederique adds the wish 'Cheers!' to the birthday card + then the birthday card has 'Frederique' written on it + and the birthday card proclaims: Cheers! + + +*** Keywords *** +${person} signs the birthday card again + [Documentation] Similar to '${person} signs the birthday card', but + ... without the check that the name is not already on there. + ... + ... *model info* + ... :MOD: ${person}= [guest for guest in party.guests] + ... :IN: scenario.guest = ${person} | scenario.count = len(birthday_card.names) + ... :OUT: len(birthday_card.names) == scenario.count+1 + Should contain ${names} ${person} diff --git a/atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot b/atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot rename to atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot index 87a5fa15..2c8f0f7c 100644 --- a/atest/robotMBT tests/06__data_variation/05__interacting_equivalence_classes.robot +++ b/atest/robotMBT tests/06__data_variation/06__interacting_equivalence_classes.robot @@ -12,6 +12,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Johan is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot b/atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot rename to atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot index 4899c0b7..bba90197 100644 --- a/atest/robotMBT tests/06__data_variation/06__independent_equivalence_classes.robot +++ b/atest/robotMBT tests/06__data_variation/07__independent_equivalence_classes.robot @@ -12,6 +12,7 @@ Suite Setup Treat this test suite Model-based Resource ../../resources/birthday_cards_data_variation.resource Library robotmbt + *** Test Cases *** Background Given Bahar is having their birthday diff --git a/atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot b/atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot similarity index 99% rename from atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot rename to atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot index db589cb4..c31de190 100644 --- a/atest/robotMBT tests/06__data_variation/07__define_example_values_in_then_step.robot +++ b/atest/robotMBT tests/06__data_variation/08__define_example_values_in_then_step.robot @@ -15,6 +15,7 @@ Documentation This test suite focuses on the initialisation of model data fr Suite Setup Treat this test suite Model-based Library robotmbt + *** Test Cases *** Background Given Bahar is throwing a party for their friends diff --git a/robotmbt/modeller.py b/robotmbt/modeller.py new file mode 100644 index 00000000..6b793557 --- /dev/null +++ b/robotmbt/modeller.py @@ -0,0 +1,261 @@ +# -*- coding: utf-8 -*- + +# BSD 3-Clause License +# +# Copyright (c) 2026, J. Foederer +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Any + +from robot.api import logger +from robot.utils import is_list_like + +from .modelspace import ModelSpace +from .steparguments import StepArgument, StepArguments +from .substitutionmap import SubstitutionMap +from .suitedata import Scenario, Step +from .tracestate import TraceState + + +def try_to_fit_in_scenario(candidate: Scenario, tracestate: TraceState): + """ + Tries to insert the candidate scenario into the trace (in full or partial) and + updates tracestate accordingly. + """ + model = tracestate.model if tracestate.model else ModelSpace() + model.new_scenario_scope() + inserted, remainder, extra_data = process_scenario(candidate, model) + if not inserted: # insertion failed + tracestate.reject_scenario(candidate.src_id) + logger.debug(extra_data['fail_msg']) + elif not remainder: # the scenario processed in full + model.end_scenario_scope() + tracestate.confirm_full_scenario(inserted.src_id, inserted, model) + logger.debug(f"Inserted scenario {inserted.src_id}, {inserted.name}") + if tracestate.is_refinement_active(): + handle_refinement_exit(inserted, tracestate) + else: # the scenario is split into two parts, ready for refinement + logger.debug(f"Partially inserted scenario {inserted.src_id}, {inserted.name}\n" + f"Refinement needed at step: {remainder.steps[1]}") + inserted.name = f"{inserted.name} (part {tracestate.highest_part(inserted.src_id)+1})" + tracestate.push_partial_scenario(inserted.src_id, inserted, model, remainder) + + +def process_scenario(scenario: Scenario, model: ModelSpace) -> tuple[Scenario, Scenario, dict[str, Any]]: + for step in scenario.steps: + if 'error' in step.model_info: + return None, None, dict(fail_masg=f"Error in scenario {scenario.name} " + f"at step {step}: {step.model_info['error']}") + for expr in _relevant_expressions(step): + try: + if model.process_expression(expr, step.args) is False: + if step.gherkin_kw in ['when', None] and expr in step.model_info['OUT']: + part1, part2 = split_for_refinement(scenario, step) + return part1, part2, dict() + else: + return None, None, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] is False") + except Exception as err: + return None, None, dict(fail_msg=f"Unable to insert scenario {scenario.src_id}, " + f"{scenario.name}, due to step '{step}': [{expr}] {err}") + return scenario.copy(), None, dict() + + +def _relevant_expressions(step: Step) -> list[str]: + if step.gherkin_kw is None and not step.model_info: + return [] # model info is optional for action keywords + expressions = [] + if 'IN' not in step.model_info or 'OUT' not in step.model_info: + raise Exception(f"Model info incomplete for step: {step}") + if step.gherkin_kw in ['given', 'when', None]: + expressions += step.model_info['IN'] + if step.gherkin_kw in ['when', 'then', None]: + expressions += step.model_info['OUT'] + return expressions + + +def split_for_refinement(scenario: Scenario, step: Step) -> tuple[Scenario, Scenario]: + front, back = scenario.split_at_step(scenario.steps.index(step)) + remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + [s.full_keyword for s in back.steps[1:]]) + remaining_steps = _escape_robot_vars(remaining_steps) + edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) + edge_step.gherkin_kw = step.gherkin_kw + edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) + edge_step.detached = True + edge_step.args = StepArguments(step.args) + front.steps.append(edge_step) + back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) + back.steps[1] = back.steps[1].copy() + back.steps[1].model_info['IN'] = [] + return (front, back) + + +def _escape_robot_vars(text: str) -> str: + for seq in ("${", "@{", "%{", "&{", "*{"): + text = text.replace(seq, "\\" + seq) + return text + + +def handle_refinement_exit(inserted_refinement: Scenario, tracestate: TraceState): + refinement_tail = tracestate.get_remainder(tracestate.active_refinements[-1]) + exit_conditions = refinement_tail.steps[1].model_info['OUT'] + exit_conditions_processed = False + for expr in exit_conditions: + try: + if tracestate.model.process_expression(expr, refinement_tail.steps[1].args) is False: + break + except Exception: + break + else: + exit_conditions_processed = True + + if not exit_conditions_processed: + rewind(tracestate) # Reject insterted scenario. Even though it fits, it is not a refinement. + logger.debug(f"Reconsidering scenario {inserted_refinement.src_id}, {inserted_refinement.name}, " + f"did not meet refinement exit condition: {exit_conditions}") + return + + model = tracestate.model + tail_inserted, remainder, extra_data = process_scenario(refinement_tail, model) + if not tail_inserted: + logger.debug(extra_data['fail_msg']) + # Confirm then rewind, to roll back complete scenario, including its refiements + # Because that exit check passed, this is an error in the refined scenario itself + tracestate.confirm_full_scenario(refinement_tail.src_id, refinement_tail, model) + tail = rewind(tracestate) + logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") + elif not remainder: + model.end_scenario_scope() + tracestate.confirm_full_scenario(tail_inserted.src_id, tail_inserted, model) + logger.debug(f"Scenario '{tail_inserted.name}' completed after refinement") + if tracestate.is_refinement_active(): + handle_refinement_exit(tail_inserted, tracestate) + else: + logger.debug(f"Partially inserted remainder of scenario {tail_inserted.src_id}, {tail_inserted.name}\n" + f"refinement needed at step: {remainder.steps[1]}") + tail_inserted.name = f"{tail_inserted.name} (part {tracestate.highest_part(tail_inserted.src_id)+1})" + tracestate.push_partial_scenario(tail_inserted.src_id, tail_inserted, model, remainder) + + +def generate_scenario_variant(scenario: Scenario, model: ModelSpace) -> Scenario: + scenario = scenario.copy() + # collect set of constraints + subs = SubstitutionMap() + try: + for step in scenario.steps: + for expr in step.model_info.get('MOD', []): + modded_arg, constraint = _parse_modifier_expression(expr, step.args) + if step.args[modded_arg].is_default: + continue + if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: + org_example = step.args[modded_arg].org_value + if step.gherkin_kw == 'then': + constraint = None # No new constraints are processed for then-steps + if org_example not in subs.substitutions: + # if a then-step signals the first use of an example value, it is considered a new definition + subs.substitute(org_example, [org_example]) + continue + if not constraint and org_example not in subs.substitutions: + raise ValueError(f"No options to choose from at first assignment to {org_example}") + if constraint and constraint != '.*': + options = model.process_expression(constraint, step.args) + if options == 'exec': + raise ValueError(f"Invalid constraint for argument substitution: {expr}") + if not options: + raise ValueError(f"Constraint on modifer did not yield any options: {expr}") + if not is_list_like(options): + raise ValueError(f"Constraint on modifer did not yield a set of options: {expr}") + else: + options = None + subs.substitute(org_example, options) + elif step.args[modded_arg].kind == StepArgument.VAR_POS: + if step.args[modded_arg].value: + modded_varargs = model.process_expression(constraint, step.args) + if not is_list_like(modded_varargs): + raise ValueError(f"Modifying varargs must yield a list of arguments") + # Varargs are not added to the substitution map, but are used directly as-is. A modifier can + # change the number of arguments in the list, making it impossible to decide which values to + # match and which to drop and/or duplicate. + step.args[modded_arg].value = modded_varargs + elif step.args[modded_arg].kind == StepArgument.FREE_NAMED: + if step.args[modded_arg].value: + modded_free_args = model.process_expression(constraint, step.args) + if not isinstance(modded_free_args, dict): + raise ValueError("Modifying free named arguments must yield a dict") + # Similar to varargs, modified free named arguments are used directly as-is. + step.args[modded_arg].value = modded_free_args + else: + raise AssertionError(f"Unknown argument kind for {modded_arg}") + except Exception as err: + logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" + f" In step {step}: {err}") + return None + + try: + subs.solve() + except ValueError as err: + logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" + f" {err}: {subs}") + return None + + # Update scenario with generated values + if subs.solution: + logger.debug(f"Example variant generated with argument substitution: {subs}") + scenario.data_choices = subs + for step in scenario.steps: + if 'MOD' in step.model_info: + for expr in step.model_info['MOD']: + modded_arg, _ = _parse_modifier_expression(expr, step.args) + if step.args[modded_arg].is_default: + continue + org_example = step.args[modded_arg].org_value + if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: + step.args[modded_arg].value = subs.solution[org_example] + return scenario + + +def _parse_modifier_expression(expression: str, args: tuple[str]) -> tuple[str, str]: + if expression.startswith('${'): + for var in args: + if expression.casefold().startswith(var.arg.casefold()): + assignment_expr = expression.replace(var.arg, '', 1).strip() + if not assignment_expr.startswith('=') or assignment_expr.startswith('=='): + break # not an assignment + constraint = assignment_expr.replace('=', '', 1).strip() + return var.arg, constraint + raise ValueError(f"Invalid argument substitution: {expression}") + + +def rewind(tracestate: TraceState, drought_recovery: bool = False) -> Scenario: + if tracestate[-1].remainder and tracestate.highest_part(tracestate[-1].remainder.src_id) > 1: + # When rewinding an 'in between' part, rewind both the part and the refinement + tracestate.rewind() + tail = tracestate.rewind() + while drought_recovery and tracestate.coverage_drought: + tail = tracestate.rewind() + return tail diff --git a/robotmbt/suiteprocessors.py b/robotmbt/suiteprocessors.py index 669b64bd..aa6b7f9d 100644 --- a/robotmbt/suiteprocessors.py +++ b/robotmbt/suiteprocessors.py @@ -34,13 +34,11 @@ import random from robot.api import logger -from robot.utils import is_list_like -from .substitutionmap import SubstitutionMap +from . import modeller from .modelspace import ModelSpace -from .suitedata import Suite, Scenario, Step +from .suitedata import Suite from .tracestate import TraceState -from .steparguments import StepArgument, StepArguments class SuiteProcessors: @@ -89,61 +87,74 @@ def process_test_suite(self, in_suite, *, seed='new'): "\n\t".join([f"{s.src_id}: {s.name}" for s in self.scenarios])) self._init_randomiser(seed) - random.shuffle(self.scenarios) + self.shuffled = [s.src_id for s in self.scenarios] + random.shuffle(self.shuffled) # Keep a single shuffle for all TraceStates (non-essential) # a short trace without the need for repeating scenarios is preferred - self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=False) - if not self.tracestate.coverage_reached(): + if not tracestate.coverage_reached(): logger.debug("Direct trace not available. Allowing repetition of scenarios") - self._try_to_reach_full_coverage(allow_duplicate_scenarios=True) - if not self.tracestate.coverage_reached(): + tracestate = self._try_to_reach_full_coverage(allow_duplicate_scenarios=True) + if not tracestate.coverage_reached(): raise Exception("Unable to compose a consistent suite") - self.out_suite.scenarios = self.tracestate.get_trace() - self._report_tracestate_wrapup() + self.out_suite.scenarios = tracestate.get_trace() + self._report_tracestate_wrapup(tracestate) return self.out_suite def _try_to_reach_full_coverage(self, allow_duplicate_scenarios): - self.tracestate = TraceState(len(self.scenarios)) - self.active_model = ModelSpace() - while not self.tracestate.coverage_reached(): - i_candidate = self.tracestate.next_candidate(retry=allow_duplicate_scenarios) - if i_candidate is None: - if not self.tracestate.can_rewind(): + tracestate = TraceState(self.shuffled) + while not tracestate.coverage_reached(): + candidate_id = tracestate.next_candidate(retry=allow_duplicate_scenarios) + if candidate_id is None: # No more candidates remaining for this level + if not tracestate.can_rewind(): break - tail = self._rewind() - logger.debug("Having to roll back up to " - f"{tail.scenario.name if tail else 'the beginning'}") - self._report_tracestate_to_user() + tail = modeller.rewind(tracestate) + logger.debug(f"Having to roll back up to {tail.scenario.name if tail else 'the beginning'}") + self._report_tracestate_to_user(tracestate) else: - self.active_model.new_scenario_scope() - inserted = self._try_to_fit_in_scenario(i_candidate, self._scenario_with_repeat_counter(i_candidate), - retry_flag=allow_duplicate_scenarios) - if inserted: + candidate = self._select_scenario_variant(candidate_id, tracestate) + if not candidate: # No valid variant available in the current state + tracestate.reject_scenario(candidate_id) + continue + previous_len = len(tracestate) + modeller.try_to_fit_in_scenario(candidate, tracestate) + self._report_tracestate_to_user(tracestate) + if len(tracestate) > previous_len: + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") self.DROUGHT_LIMIT = 50 - if self.__last_candidate_changed_nothing(): + if self.__last_candidate_changed_nothing(tracestate): logger.debug("Repeated scenario did not change the model's state. Stop trying.") - self._rewind() - elif self.tracestate.coverage_drought > self.DROUGHT_LIMIT: + modeller.rewind(tracestate) + elif tracestate.coverage_drought > self.DROUGHT_LIMIT: logger.debug(f"Went too long without new coverage (>{self.DROUGHT_LIMIT}x). " "Roll back to last coverage increase and try something else.") - self._rewind(drought_recovery=True) - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") + modeller.rewind(tracestate, drought_recovery=True) + self._report_tracestate_to_user(tracestate) + logger.debug(f"last state:\n{tracestate.model.get_status_text()}") + return tracestate - def __last_candidate_changed_nothing(self): - if len(self.tracestate) < 2: + @staticmethod + def __last_candidate_changed_nothing(tracestate): + if len(tracestate) < 2: return False - if self.tracestate[-1].id != self.tracestate[-2].id: + if tracestate[-1].id != tracestate[-2].id: return False - return self.tracestate[-1].model == self.tracestate[-2].model + return tracestate[-1].model == tracestate[-2].model + + def _select_scenario_variant(self, candidate_id, tracestate): + candidate = self._scenario_with_repeat_counter(candidate_id, tracestate) + candidate = modeller.generate_scenario_variant(candidate, tracestate.model or ModelSpace()) + return candidate - def _scenario_with_repeat_counter(self, index): - """Fetches the scenario by index and, if this scenario is already used in the trace, - adds a repetition counter to its name.""" - candidate = self.scenarios[index] - rep_count = self.tracestate.count(index) + def _scenario_with_repeat_counter(self, index, tracestate): + """ + Fetches the scenario by index and, if this scenario is already + used in the trace, adds a repetition counter to its name. + """ + candidate = next(s for s in self.scenarios if s.src_id == index) + rep_count = tracestate.count(index) if rep_count: candidate = candidate.copy() candidate.name = f"{candidate.name} (rep {rep_count+1})" @@ -158,274 +169,17 @@ def _fail_on_step_errors(suite): for s in error_list]) raise Exception(err_msg) - def _try_to_fit_in_scenario(self, index, candidate, retry_flag): - candidate = self._generate_scenario_variant(candidate, self.active_model) - if not candidate: - self.active_model.end_scenario_scope() - self.tracestate.reject_scenario(index) - self._report_tracestate_to_user() - return False - - confirmed_candidate, new_model = self._process_scenario(candidate, self.active_model) - if confirmed_candidate: - self.active_model = new_model - self.active_model.end_scenario_scope() - self.tracestate.confirm_full_scenario(index, confirmed_candidate, self.active_model) - logger.debug(f"Inserted scenario {confirmed_candidate.src_id}, {confirmed_candidate.name}") - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - return True - - part1, part2 = self._split_candidate_if_refinement_needed(candidate, self.active_model) - if part2: - exit_conditions = part2.steps[1].model_info['OUT'] - part1.name = f"{part1.name} (part {self.tracestate.highest_part(index)+1})" - part1, new_model = self._process_scenario(part1, self.active_model) - self.tracestate.push_partial_scenario(index, part1, new_model) - self.active_model = new_model - self._report_tracestate_to_user() - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - - i_refine = self.tracestate.next_candidate(retry=retry_flag) - if i_refine is None: - logger.debug("Refinement needed, but there are no scenarios left") - self._rewind() - self._report_tracestate_to_user() - return False - while i_refine is not None: - self.active_model.new_scenario_scope() - m_inserted = self._try_to_fit_in_scenario( - i_refine, self._scenario_with_repeat_counter(i_refine), retry_flag) - if m_inserted: - insert_valid_here = True - try: - # Check exit condition before finalizing refinement and inserting the tail part - model_scratchpad = self.active_model.copy() - for expr in exit_conditions: - if model_scratchpad.process_expression(expr, part2.steps[1].args) is False: - insert_valid_here = False - break - except Exception: - insert_valid_here = False - if insert_valid_here: - m_finished = self._try_to_fit_in_scenario(index, part2, retry_flag) - if m_finished: - return True - else: - logger.debug(f"Scenario did not meet refinement conditions {exit_conditions}") - logger.debug(f"last state:\n{self.active_model.get_status_text()}") - logger.debug(f"Reconsidering {self.scenarios[i_refine].name}, scenario excluded") - self._rewind() - self._report_tracestate_to_user() - i_refine = self.tracestate.next_candidate(retry=retry_flag) - self._rewind() - self._report_tracestate_to_user() - return False - - self.active_model.end_scenario_scope() - self.tracestate.reject_scenario(index) - self._report_tracestate_to_user() - return False - - def _rewind(self, drought_recovery=False): - tail = self.tracestate.rewind() - while drought_recovery and self.tracestate.coverage_drought: - tail = self.tracestate.rewind() - self.active_model = self.tracestate.model or ModelSpace() - return tail - @staticmethod - def _split_candidate_if_refinement_needed(scenario, model): - m = model.copy() - scenario = scenario.copy() - no_split = (scenario, None) - for step in scenario.steps: - if 'error' in step.model_info: - return no_split - if step.gherkin_kw in ['given', 'when', None]: - for expr in step.model_info.get('IN', []): - try: - if m.process_expression(expr, step.args) is False: - return no_split - except Exception: - return no_split - if step.gherkin_kw in ['when', 'then', None]: - for expr in step.model_info.get('OUT', []): - refine_here = False - try: - if m.process_expression(expr, step.args) is False: - if step.gherkin_kw in ['when', None]: - logger.debug(f"Refinement needed for scenario: {scenario.name}\nat step: {step}") - refine_here = True - else: - return no_split - except Exception: - return no_split - if refine_here: - front, back = scenario.split_at_step(scenario.steps.index(step)) - remaining_steps = '\n\t'.join([step.full_keyword, '- '*35] + - [s.full_keyword for s in back.steps[1:]]) - remaining_steps = SuiteProcessors.escape_robot_vars(remaining_steps) - edge_step = Step('Log', f"Refinement follows for step:\n\t{remaining_steps}", parent=scenario) - edge_step.gherkin_kw = step.gherkin_kw - edge_step.model_info = dict(IN=step.model_info['IN'], OUT=[]) - edge_step.detached = True - edge_step.args = StepArguments(step.args) - front.steps.append(edge_step) - back.steps.insert(0, Step('Log', f"Refinement ready, completing step", parent=scenario)) - back.steps[1] = back.steps[1].copy() - back.steps[1].model_info['IN'] = [] - return (front, back) - return no_split + def _report_tracestate_to_user(tracestate): + user_trace = f"[{', '.join(tracestate.id_trace)}]" + logger.debug(f"Trace: {user_trace} Reject: {list(tracestate.tried)}") @staticmethod - def escape_robot_vars(text): - for seq in ("${", "@{", "%{", "&{", "*{"): - text = text.replace(seq, "\\" + seq) - return text - - @staticmethod - def _process_scenario(scenario, model): - m = model.copy() - scenario = scenario.copy() - for step in scenario.steps: - if 'error' in step.model_info: - logger.debug(f"Error in scenario {scenario.name} at step {step}: {step.model_info['error']}") - return None, None - for expr in SuiteProcessors._relevant_expressions(step): - try: - if m.process_expression(expr, step.args) is False: - raise Exception(False) - except Exception as err: - logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, " - f"due to step '{step}': [{expr}] {err}") - return None, None - return scenario, m - - @staticmethod - def _relevant_expressions(step): - if step.gherkin_kw is None and not step.model_info: - return [] # model info is optional for action keywords - expressions = [] - if 'IN' not in step.model_info or 'OUT' not in step.model_info: - raise Exception(f"Model info incomplete for step: {step}") - if step.gherkin_kw in ['given', 'when', None]: - expressions += step.model_info['IN'] - if step.gherkin_kw in ['when', 'then', None]: - expressions += step.model_info['OUT'] - return expressions - - def _generate_scenario_variant(self, scenario, model): - m = model.copy() - scenario = scenario.copy() - scenarios_in_refinement = self.tracestate.find_scenarios_with_active_refinement() - - # reuse previous solution for all parts in split-up scenario - for sir in scenarios_in_refinement: - if sir.src_id == scenario.src_id: - return scenario - - # collect set of constraints - subs = SubstitutionMap() - try: - for step in scenario.steps: - if 'MOD' in step.model_info: - for expr in step.model_info['MOD']: - modded_arg, constraint = self._parse_modifier_expression(expr, step.args) - if step.args[modded_arg].is_default: - continue - if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: - org_example = step.args[modded_arg].org_value - if step.gherkin_kw == 'then': - constraint = None # No new constraints are processed for then-steps - if org_example not in subs.substitutions: - # if a then-step signals the first use of an example value, it is considered a new definition - subs.substitute(org_example, [org_example]) - continue - if not constraint and org_example not in subs.substitutions: - raise ValueError(f"No options to choose from at first assignment to {org_example}") - if constraint and constraint != '.*': - options = m.process_expression(constraint, step.args) - if options == 'exec': - raise ValueError(f"Invalid constraint for argument substitution: {expr}") - if not options: - raise ValueError(f"Constraint on modifer did not yield any options: {expr}") - if not is_list_like(options): - raise ValueError(f"Constraint on modifer did not yield a set of options: {expr}") - else: - options = None - subs.substitute(org_example, options) - elif step.args[modded_arg].kind == StepArgument.VAR_POS: - if step.args[modded_arg].value: - modded_varargs = m.process_expression(constraint, step.args) - if not is_list_like(modded_varargs): - raise ValueError(f"Modifying varargs must yield a list of arguments") - # Varargs are not added to the substitution map, but are used directly as-is. A modifier can - # change the number of arguments in the list, making it impossible to decide which values to - # match and which to drop and/or duplicate. - step.args[modded_arg].value = modded_varargs - elif step.args[modded_arg].kind == StepArgument.FREE_NAMED: - if step.args[modded_arg].value: - modded_free_args = m.process_expression(constraint, step.args) - if not isinstance(modded_free_args, dict): - raise ValueError("Modifying free named arguments must yield a dict") - # Similar to varargs, modified free named arguments are used directly as-is. - step.args[modded_arg].value = modded_free_args - else: - raise AssertionError(f"Unknown argument kind for {modded_arg}") - except Exception as err: - logger.debug(f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n" - f" In step {step}: {err}") - return None - - try: - subs.solve() - except ValueError as err: - logger.debug( - f"Unable to insert scenario {scenario.src_id}, {scenario.name}, due to modifier\n {err}: {subs}") - return None - - # Update scenario with generated values - if subs.solution: - logger.debug(f"Example variant generated with argument substitution: {subs}") - scenario.data_choices = subs - for step in scenario.steps: - if 'MOD' in step.model_info: - for expr in step.model_info['MOD']: - modded_arg, _ = self._parse_modifier_expression(expr, step.args) - if step.args[modded_arg].is_default: - continue - org_example = step.args[modded_arg].org_value - if step.args[modded_arg].kind in [StepArgument.EMBEDDED, StepArgument.POSITIONAL, StepArgument.NAMED]: - step.args[modded_arg].value = subs.solution[org_example] - return scenario - - @staticmethod - def _parse_modifier_expression(expression, args): - if expression.startswith('${'): - for var in args: - if expression.casefold().startswith(var.arg.casefold()): - assignment_expr = expression.replace(var.arg, '', 1).strip() - if not assignment_expr.startswith('=') or assignment_expr.startswith('=='): - break # not an assignment - constraint = assignment_expr.replace('=', '', 1).strip() - return var.arg, constraint - raise ValueError(f"Invalid argument substitution: {expression}") - - def _report_tracestate_to_user(self): - user_trace = "[" - for snapshot in self.tracestate: - part = f".{snapshot.id.split('.')[1]}" if '.' in snapshot.id else "" - user_trace += f"{snapshot.scenario.src_id}{part}, " - user_trace = user_trace[:-2] + "]" if ',' in user_trace else "[]" - reject_trace = [self.scenarios[i].src_id for i in self.tracestate.tried] - logger.debug(f"Trace: {user_trace} Reject: {reject_trace}") - - def _report_tracestate_wrapup(self): + def _report_tracestate_wrapup(tracestate): logger.info("Trace composed:") - for step in self.tracestate: - logger.info(step.scenario.name) - logger.debug(f"model\n{step.model.get_status_text()}\n") + for progression in tracestate: + logger.info(progression.scenario.name) + logger.debug(f"model\n{progression.model.get_status_text()}\n") @staticmethod def _init_randomiser(seed): diff --git a/robotmbt/tracestate.py b/robotmbt/tracestate.py index 22526748..959db914 100644 --- a/robotmbt/tracestate.py +++ b/robotmbt/tracestate.py @@ -31,121 +31,137 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. class TraceState: - def __init__(self, n_scenarios): - self._c_pool = [False] * n_scenarios # coverage pool: True means scenario is in trace + def __init__(self, scenario_indexes: list[int]): + self.c_pool = {index: 0 for index in scenario_indexes} + if len(self.c_pool) != len(scenario_indexes): + raise ValueError("Scenarios must be uniquely identifiable") self._tried = [[]] # Keeps track of the scenarios already tried at each step in the trace - self._trace = [] # Choice trace, when was which scenario inserted (e.g. ['1', '2.1', '3', '2.0']) self._snapshots = [] # Keeps details for elements in trace self._open_refinements = [] @property def model(self): """returns the model as it is at the end of the current trace""" - return self._snapshots[-1].model if self._trace else None + return self._snapshots[-1].model if self._snapshots else None @property def tried(self): """returns the indices that were rejected or previously inserted at the current position""" return tuple(self._tried[-1]) - def coverage_reached(self): - return all(self._c_pool) - @property def coverage_drought(self): """Number of scenarios since last new coverage""" return self._snapshots[-1].coverage_drought if self._snapshots else 0 + @property + def id_trace(self): + return [snap.id for snap in self._snapshots] + + @property + def active_refinements(self): + return self._open_refinements[:] + + def coverage_reached(self): + return all(self.c_pool.values()) + def get_trace(self): return [snap.scenario for snap in self._snapshots] def next_candidate(self, retry=False): - for i in range(len(self._c_pool)): - if i not in self._tried[-1] and not self._is_refinement_active(i) and self.count(i) == 0: + for i in self.c_pool: + if i not in self._tried[-1] and not self.is_refinement_active(i) and self.count(i) == 0: return i if not retry: return None - for i in range(len(self._c_pool)): - if i not in self._tried[-1] and not self._is_refinement_active(i): + for i in self.c_pool: + if i not in self._tried[-1] and not self.is_refinement_active(i): return i return None def count(self, index): - """Count the number of times the index is present in the trace. - unfinished partial scenarios are excluded.""" - return self._trace.count(str(index)) + self._trace.count(str(f"{index}.0")) + """ + Count the number of times the index is present in the trace. + unfinished partial scenarios are excluded. + """ + return self.c_pool[index] def highest_part(self, index): - """Given the current trace and an index, returns the highest part number of an ongoing - refinement for the related scenario. Returns 0 when there is no refinement active.""" - for i in range(1, len(self._trace)+1): - if self._trace[-i] == f'{index}': + """ + Given the current trace and an index, returns the highest part number of an ongoing + refinement for the related scenario. Returns 0 when there is no refinement active. + """ + for i in range(1, len(self.id_trace)+1): + if self.id_trace[-i] == f'{index}': return 0 - if self._trace[-i].startswith(f'{index}.'): - return int(self._trace[-i].split('.')[1]) + if self.id_trace[-i].startswith(f'{index}.'): + return int(self.id_trace[-i].split('.')[1]) return 0 - def _is_refinement_active(self, index): - return self.highest_part(index) != 0 + def is_refinement_active(self, index=None): + """ + When called with an index, returns True if that scenario is currently being refined + When index is ommitted, return True if any refinement is active + """ + if index is None: + return self._open_refinements != [] + else: + return self.highest_part(index) != 0 - def find_scenarios_with_active_refinement(self): - scenarios = [] - for i in self._open_refinements: - index = -self._trace[::-1].index(f'{i}.1')-1 - scenarios.append(self._snapshots[index].scenario) - return scenarios + def get_remainder(self, index): + """ + When pushing a partial scenario, the remainder can be passed along for safe keeping. + This method retrieves the remainder for the last part that was pushed. + """ + last_part = self.highest_part(index) + index = -self.id_trace[::-1].index(f'{index}.{last_part}')-1 + return self._snapshots[index].remainder def reject_scenario(self, i_scenario): """Trying a scenario excludes it from further cadidacy on this level""" self._tried[-1].append(i_scenario) def confirm_full_scenario(self, index, scenario, model): - if not self._c_pool[index]: - self._c_pool[index] = True - c_drought = 0 - else: - c_drought = self.coverage_drought+1 - if self._is_refinement_active(index): + c_drought = 0 if self.c_pool[index] == 0 else self.coverage_drought+1 + self.c_pool[index] += 1 + if self.is_refinement_active(index): id = f"{index}.0" self._open_refinements.pop() else: id = str(index) self._tried[-1].append(index) self._tried.append([]) - self._trace.append(id) - self._snapshots.append(TraceSnapShot(id, scenario, model, c_drought)) + self._snapshots.append(TraceSnapShot(id, scenario, model, drought=c_drought)) - def push_partial_scenario(self, index, scenario, model): - if self._is_refinement_active(index): + def push_partial_scenario(self, index, scenario, model, remainder=None): + if self.is_refinement_active(index): id = f"{index}.{self.highest_part(index)+1}" else: id = f"{index}.1" self._tried[-1].append(index) - self._tried.append([]) self._open_refinements.append(index) - self._trace.append(id) - self._snapshots.append(TraceSnapShot(id, scenario, model, self.coverage_drought)) + self._tried.append([]) + self._snapshots.append(TraceSnapShot(id, scenario, model, remainder, self.coverage_drought)) def can_rewind(self): - return len(self._trace) > 0 + return len(self._snapshots) > 0 def rewind(self): - id = self._trace.pop() + id = self._snapshots[-1].id index = int(id.split('.')[0]) + self._snapshots.pop() if id.endswith('.0'): - self._snapshots.pop() + self.c_pool[index] -= 1 self._open_refinements.append(index) - while self._trace[-1] != f"{index}.1": + while self._snapshots[-1].id != f"{index}.1": self.rewind() return self.rewind() - self._snapshots.pop() - if '.' not in id or id.endswith('.1'): - if self.count(index) == 0: - self._c_pool[index] = False - self._tried.pop() - if id.endswith('.1'): - self._open_refinements.pop() + self._tried.pop() + if '.' not in id: + self.c_pool[index] -= 1 + if id.endswith('.1'): + self._open_refinements.pop() return self._snapshots[-1] if self._snapshots else None def __iter__(self): @@ -159,9 +175,10 @@ def __len__(self): class TraceSnapShot: - def __init__(self, id, inserted_scenario, model_state, drought=0): + def __init__(self, id, inserted_scenario, model_state, remainder=None, drought=0): self.id = id self.scenario = inserted_scenario + self.remainder = remainder self._model = model_state.copy() self.coverage_drought = drought diff --git a/utest/test_tracestate.py b/utest/test_tracestate.py index 5bfb63a3..77c1462a 100644 --- a/utest/test_tracestate.py +++ b/utest/test_tracestate.py @@ -36,69 +36,72 @@ class TestTraceState(unittest.TestCase): def test_an_empty_tracestate_doesnt_do_so_much(self): - ts = TraceState(0) + ts = TraceState([]) self.assertIs(ts.next_candidate(), None) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), []) self.assertIs(ts.can_rewind(), False) def test_completing_single_size_trace(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), ['one']) def test_confirming_excludes_scenario_from_candidacy(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.next_candidate(), None) def test_trying_excludes_scenario_from_candidacy(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.reject_scenario(0) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.reject_scenario(1) self.assertIs(ts.next_candidate(), None) def test_scenario_still_excluded_from_candidacy_after_rewind(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.confirm_full_scenario(1, 'one', {}) ts.rewind() self.assertIs(ts.next_candidate(), None) def test_candidates_come_in_order_when_accepted(self): - ts = TraceState(3) + ts = TraceState([10, 20, 30]) candidates = [] - for scenario in range(3): + for _ in range(3): candidates.append(ts.next_candidate()) - ts.confirm_full_scenario(candidates[-1], scenario, {}) + ts.confirm_full_scenario(candidates[-1], 'scenario', {}) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [0, 1, 2, None]) + self.assertEqual(candidates, [10, 20, 30, None]) + + def test_scenarios_must_be_uniquely_identifiable(self): + self.assertRaises(ValueError, TraceState, [1, 2, 3, 2]) def test_candidates_come_in_order_when_rejected(self): - ts = TraceState(3) + ts = TraceState([10, 20, 30]) candidates = [] for _ in range(3): candidates.append(ts.next_candidate()) ts.reject_scenario(candidates[-1]) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [0, 1, 2, None]) + self.assertEqual(candidates, [10, 20, 30, None]) def test_rejected_scenarios_are_candidates_for_new_positions(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) candidates = [] - ts.reject_scenario(0) - for scenario in range(3): + ts.reject_scenario(1) + for _ in range(3): candidates.append(ts.next_candidate()) - ts.confirm_full_scenario(candidates[-1], scenario, {}) + ts.confirm_full_scenario(candidates[-1], 'scenario', {}) candidates.append(ts.next_candidate()) - self.assertEqual(candidates, [1, 0, 2, None]) + self.assertEqual(candidates, [2, 1, 3, None]) def test_previously_confirmed_scenarios_can_be_retried_if_no_new_candidates_exist(self): - ts = TraceState(3) + ts = TraceState(range(3)) first_candidate = ts.next_candidate(retry=True) ts.confirm_full_scenario(first_candidate, 'one', {}) ts.reject_scenario(ts.next_candidate(retry=True)) @@ -113,18 +116,18 @@ def test_previously_confirmed_scenarios_can_be_retried_if_no_new_candidates_exis self.assertEqual(ts.get_trace(), ['one', 'one', 'two', 'three']) def test_retry_can_continue_once_coverage_is_reached(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'one', {}) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'two', {}) ts.confirm_full_scenario(ts.next_candidate(retry=True), 'three', {}) self.assertTrue(ts.coverage_reached()) - self.assertEqual(ts.next_candidate(retry=True), 0) - ts.reject_scenario(0) self.assertEqual(ts.next_candidate(retry=True), 1) + ts.reject_scenario(1) + self.assertEqual(ts.next_candidate(retry=True), 2) self.assertEqual(ts.next_candidate(retry=False), None) def test_count_scenario_repetitions(self): - ts = TraceState(2) + ts = TraceState([1, 2]) first = ts.next_candidate() self.assertEqual(ts.count(first), 0) ts.confirm_full_scenario(first, 'one', {}) @@ -133,8 +136,8 @@ def test_count_scenario_repetitions(self): self.assertEqual(ts.count(first), 2) def test_rewind_single_available_scenario(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.coverage_reached(), True) self.assertIs(ts.can_rewind(), True) ts.rewind() @@ -144,38 +147,38 @@ def test_rewind_single_available_scenario(self): self.assertEqual(ts.get_trace(), []) def test_rewind_returns_none_after_rewinding_last_step(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertIs(ts.rewind(), None) - def test_traces_can_have_multiple_sceanrios(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'foo', dict(a=1)) + def test_traces_can_have_multiple_scenarios(self): + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'foo', dict(a=1)) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(1, 'bar', dict(b=2)) + ts.confirm_full_scenario(2, 'bar', dict(b=2)) self.assertIs(ts.coverage_reached(), True) self.assertEqual(ts.get_trace(), ['foo', 'bar']) def test_rewind_returns_snapshot_of_the_step_before(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'foo', dict(a=1)) - ts.confirm_full_scenario(1, 'bar', dict(b=2)) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'foo', dict(a=1)) + ts.confirm_full_scenario(2, 'bar', dict(b=2)) tail = ts.rewind() - self.assertEqual(tail.id, '0') + self.assertEqual(tail.id, '1') self.assertEqual(tail.scenario, 'foo') self.assertEqual(tail.model, dict(a=1)) def test_completing_size_three_trace(self): - ts = TraceState(3) - ts.confirm_full_scenario(ts.next_candidate(), 1, {}) - ts.confirm_full_scenario(ts.next_candidate(), 2, {}) + ts = TraceState(range(3)) + ts.confirm_full_scenario(ts.next_candidate(), 'one', {}) + ts.confirm_full_scenario(ts.next_candidate(), 'two', {}) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(ts.next_candidate(), 3, {}) + ts.confirm_full_scenario(ts.next_candidate(), 'three', {}) self.assertIs(ts.coverage_reached(), True) - self.assertEqual(ts.get_trace(), [1, 2, 3]) + self.assertEqual(ts.get_trace(), ['one', 'two', 'three']) def test_completing_size_three_trace_after_reject(self): - ts = TraceState(3) + ts = TraceState(range(3)) first = ts.next_candidate() ts.confirm_full_scenario(first, first, {}) rejected = ts.next_candidate() @@ -190,7 +193,7 @@ def test_completing_size_three_trace_after_reject(self): self.assertEqual(ts.get_trace(), [first, third, second]) def test_completing_size_three_trace_after_rewind(self): - ts = TraceState(3) + ts = TraceState(range(3)) first = ts.next_candidate() ts.confirm_full_scenario(first, first, {}) reject2 = ts.next_candidate() @@ -211,16 +214,16 @@ def test_completing_size_three_trace_after_rewind(self): self.assertEqual(ts.get_trace(), [retry_first, retry_second, retry_third]) def test_highest_part_when_index_not_present(self): - ts = TraceState(1) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_for_non_partial_sceanrio(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) + self.assertEqual(ts.highest_part(1), 0) def test_model_property_takes_model_from_tail(self): - ts = TraceState(2) + ts = TraceState(range(2)) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) self.assertEqual(ts.model, dict(b=2)) @@ -228,35 +231,35 @@ def test_model_property_takes_model_from_tail(self): self.assertEqual(ts.model, dict(a=1)) def test_no_model_from_empty_trace(self): - ts = TraceState(1) + ts = TraceState([1]) self.assertIs(ts.model, None) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertIsNotNone(ts.model) ts.rewind() self.assertIs(ts.model, None) def test_tried_property_starts_empty(self): - ts = TraceState(1) + ts = TraceState([1]) self.assertEqual(ts.tried, ()) def test_rejected_scenarios_are_tried(self): - ts = TraceState(1) - ts.reject_scenario(0) - self.assertEqual(ts.tried, (0,)) + ts = TraceState([1]) + ts.reject_scenario(1) + self.assertEqual(ts.tried, (1,)) def test_confirmed_scenario_is_tried_and_triggers_next_step(self): - ts = TraceState(1) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.tried, ()) ts.rewind() - self.assertEqual(ts.tried, (0,)) + self.assertEqual(ts.tried, (1,)) def test_can_iterate_over_tracestate_snapshots(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) ts.confirm_full_scenario(ts.next_candidate(), 'three', dict(c=3)) - for act, exp in zip(ts, ['0', '1', '2']): + for act, exp in zip(ts, ['1', '2', '3']): self.assertEqual(act.id, exp) for act, exp in zip(ts, ['one', 'two', 'three']): self.assertEqual(act.scenario, exp) @@ -264,18 +267,18 @@ def test_can_iterate_over_tracestate_snapshots(self): self.assertEqual(act.model, exp) def test_can_index_tracestate_snapshots(self): - ts = TraceState(3) + ts = TraceState([1, 2, 3]) ts.confirm_full_scenario(ts.next_candidate(), 'one', dict(a=1)) ts.confirm_full_scenario(ts.next_candidate(), 'two', dict(b=2)) ts.confirm_full_scenario(ts.next_candidate(), 'three', dict(c=3)) - self.assertEqual(ts[0].id, '0') + self.assertEqual(ts[0].id, '1') self.assertEqual(ts[1].scenario, 'two') self.assertEqual(ts[2].model, dict(c=3)) self.assertEqual(ts[-1].scenario, 'three') - self.assertEqual([s.id for s in ts[1:]], ['1', '2']) + self.assertEqual([s.id for s in ts[1:]], ['2', '3']) def test_adding_coverage_prevents_drought(self): - ts = TraceState(3) + ts = TraceState(range(3)) ts.confirm_full_scenario(ts.next_candidate(), 'one', {}) self.assertEqual(ts.coverage_drought, 0) ts.confirm_full_scenario(ts.next_candidate(), 'two', {}) @@ -284,30 +287,30 @@ def test_adding_coverage_prevents_drought(self): self.assertEqual(ts.coverage_drought, 0) def test_repeated_scenarios_increases_drought(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 2) def test_drought_is_reset_with_new_coverage(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two', {}) + ts.confirm_full_scenario(2, 'two', {}) self.assertEqual(ts.coverage_drought, 0) def test_rewind_includes_drought_update(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one', {}) + ts.confirm_full_scenario(1, 'one', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two', {}) + ts.confirm_full_scenario(2, 'two', {}) self.assertEqual(ts.coverage_drought, 0) ts.rewind() self.assertEqual(ts.coverage_drought, 1) @@ -317,40 +320,40 @@ def test_rewind_includes_drought_update(self): class TestPartialScenarios(unittest.TestCase): def test_push_partial_does_not_complete_coverage(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.get_trace(), ['part1']) self.assertIs(ts.coverage_reached(), False) def test_confirm_full_after_push_partial_completes_coverage(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertIs(ts.coverage_reached(), False) - ts.push_partial_scenario(0, 'part2', {}) + ts.push_partial_scenario(1, 'part2', {}) self.assertIs(ts.coverage_reached(), False) - ts.confirm_full_scenario(0, 'remainder', {}) + ts.confirm_full_scenario(1, 'remainder', {}) self.assertEqual(ts.get_trace(), ['part1', 'part2', 'remainder']) self.assertIs(ts.coverage_reached(), True) def test_scenario_unavailble_once_pushed_partial(self): - ts = TraceState(1) + ts = TraceState([1]) candidate = ts.next_candidate() ts.push_partial_scenario(candidate, 'part1', {}) self.assertIs(ts.next_candidate(), None) def test_rewind_of_single_part(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.get_trace(), ['part1']) self.assertIs(ts.can_rewind(), True) ts.rewind() self.assertEqual(ts.get_trace(), []) def test_rewind_all_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertIs(ts.coverage_reached(), False) - ts.push_partial_scenario(0, 'part2', {}) + ts.push_partial_scenario(1, 'part2', {}) self.assertIs(ts.coverage_reached(), False) self.assertEqual(ts.get_trace(), ['part1', 'part2']) self.assertIs(ts.next_candidate(), None) @@ -363,84 +366,103 @@ def test_rewind_all_parts(self): self.assertIs(ts.can_rewind(), False) def test_partial_scenario_still_excluded_from_candidacy_after_rewind(self): - ts = TraceState(1) - self.assertEqual(ts.next_candidate(), 0) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + self.assertEqual(ts.next_candidate(), 1) + ts.push_partial_scenario(1, 'part1', {}) ts.rewind() self.assertIs(ts.next_candidate(), None) def test_rewind_to_partial_scenario(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - ts.push_partial_scenario(0, 'part2', dict(b=2)) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + ts.push_partial_scenario(1, 'part2', dict(b=2)) snapshot = ts.rewind() - self.assertEqual(snapshot.id, '0.1') + self.assertEqual(snapshot.id, '1.1') self.assertEqual(snapshot.scenario, 'part1') self.assertEqual(snapshot.model, dict(a=1)) def test_rewind_last_part(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one', dict(a=1)) - ts.push_partial_scenario(1, 'part1', dict(b=2)) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one', dict(a=1)) + ts.push_partial_scenario(2, 'part1', dict(b=2)) snapshot = ts.rewind() self.assertEqual(ts.get_trace(), ['one']) - self.assertEqual(snapshot.id, '0') + self.assertEqual(snapshot.id, '1') self.assertEqual(snapshot.scenario, 'one') self.assertEqual(snapshot.model, dict(a=1)) def test_rewind_all_parts_of_completed_scenario_at_once(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - ts.push_partial_scenario(0, 'part2', dict(b=2)) - ts.confirm_full_scenario(0, 'remainder', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + ts.push_partial_scenario(1, 'part2', dict(b=2)) + ts.confirm_full_scenario(1, 'remainder', {}) tail = ts.rewind() self.assertEqual(ts.get_trace(), []) self.assertIs(ts.next_candidate(), None) self.assertIs(tail, None) + def test_tried_entries_after_rewind(self): + ts = TraceState([1, 2, 10, 11, 12, 20, 21]) + ts.push_partial_scenario(1, 'part1', {}) + ts.reject_scenario(10) + ts.reject_scenario(11) + ts.confirm_full_scenario(2, 'two', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.reject_scenario(20) + ts.reject_scenario(21) + self.assertEqual(ts.tried, (20, 21)) + ts.rewind() + self.assertEqual(ts.tried, ()) + ts.rewind() + self.assertEqual(ts.tried, (10, 11, 2)) + ts.reject_scenario(12) + self.assertEqual(ts.tried, (10, 11, 2, 12)) + ts.rewind() + self.assertEqual(ts.tried, (1,)) + def test_highest_part_after_first_part(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - self.assertEqual(ts[-1].id, '0.1') - self.assertEqual(ts.highest_part(0), 1) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + self.assertEqual(ts[-1].id, '1.1') + self.assertEqual(ts.highest_part(1), 1) def test_highest_part_after_multiple_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - self.assertEqual(ts[-1].id, '0.2') - self.assertEqual(ts.highest_part(0), 2) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + self.assertEqual(ts[-1].id, '1.2') + self.assertEqual(ts.highest_part(1), 2) def test_highest_part_after_completing_multiple_parts(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - ts.confirm_full_scenario(0, 'remainder', {}) - self.assertEqual(ts[-1].id, '0.0') - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.confirm_full_scenario(1, 'remainder', {}) + self.assertEqual(ts[-1].id, '1.0') + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_after_partial_rewind(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - self.assertEqual(ts.highest_part(0), 2) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + self.assertEqual(ts.highest_part(1), 2) ts.rewind() - self.assertEqual(ts.highest_part(0), 1) + self.assertEqual(ts.highest_part(1), 1) ts.rewind() - self.assertEqual(ts.highest_part(0), 0) + self.assertEqual(ts.highest_part(1), 0) def test_highest_part_is_0_when_no_refinement_is_ongoing(self): - ts = TraceState(1) - self.assertEqual(ts.highest_part(0), 0) - ts.push_partial_scenario(0, 'part1', {}) - ts.push_partial_scenario(0, 'part2', {}) - ts.confirm_full_scenario(0, 'remainder', {}) - self.assertEqual(ts.highest_part(0), 0) + ts = TraceState([1]) + self.assertEqual(ts.highest_part(1), 0) + ts.push_partial_scenario(1, 'part1', {}) + ts.push_partial_scenario(1, 'part2', {}) + ts.confirm_full_scenario(1, 'remainder', {}) + self.assertEqual(ts.highest_part(1), 0) ts.rewind() - self.assertEqual(ts.highest_part(0), 0) + self.assertEqual(ts.highest_part(1), 0) def test_count_scenario_repetitions_with_partials(self): - ts = TraceState(2) + ts = TraceState(range(2)) first = ts.next_candidate() self.assertEqual(ts.count(first), 0) ts.confirm_full_scenario(first, 'full', {}) @@ -458,36 +480,36 @@ def test_count_scenario_repetitions_with_partials(self): self.assertEqual(ts.count(second), 1) def test_partial_scenario_is_tried_without_finishing(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', {}) + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', {}) self.assertEqual(ts.tried, ()) ts.rewind() - self.assertEqual(ts.tried, (0,)) + self.assertEqual(ts.tried, (1,)) def test_get_last_snapshot_by_index(self): - ts = TraceState(1) - ts.push_partial_scenario(0, 'part1', dict(a=1)) - self.assertEqual(ts[-1].id, '0.1') + ts = TraceState([1]) + ts.push_partial_scenario(1, 'part1', dict(a=1)) + self.assertEqual(ts[-1].id, '1.1') self.assertEqual(ts[-1].scenario, 'part1') self.assertEqual(ts[-1].model, dict(a=1)) self.assertEqual(ts[-1].coverage_drought, 0) - ts.push_partial_scenario(0, 'part2', dict(b=2)) - ts.confirm_full_scenario(0, 'remainder', dict(c=3)) - self.assertEqual(ts[-1].id, '0.0') + ts.push_partial_scenario(1, 'part2', dict(b=2)) + ts.confirm_full_scenario(1, 'remainder', dict(c=3)) + self.assertEqual(ts[-1].id, '1.0') self.assertEqual(ts[-1].scenario, 'remainder') self.assertEqual(ts[-1].model, dict(c=3)) self.assertEqual(ts[-1].coverage_drought, 0) def test_only_completed_scenarios_affect_drought(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'one full', {}) - ts.push_partial_scenario(0, 'one part1', {}) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'one full', {}) + ts.push_partial_scenario(1, 'one part1', {}) self.assertEqual(ts.coverage_drought, 0) - ts.confirm_full_scenario(0, 'one remainder', {}) + ts.confirm_full_scenario(1, 'one remainder', {}) self.assertEqual(ts.coverage_drought, 1) - ts.push_partial_scenario(1, 'two part1', {}) + ts.push_partial_scenario(2, 'two part1', {}) self.assertEqual(ts.coverage_drought, 1) - ts.confirm_full_scenario(1, 'two remainder', {}) + ts.confirm_full_scenario(2, 'two remainder', {}) self.assertEqual(ts.coverage_drought, 0) diff --git a/utest/test_tracestate_refinement.py b/utest/test_tracestate_refinement.py index 2cbdca61..1b997e6b 100644 --- a/utest/test_tracestate_refinement.py +++ b/utest/test_tracestate_refinement.py @@ -36,7 +36,7 @@ class TestTraceStateRefinement(unittest.TestCase): def test_single_step_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -49,7 +49,7 @@ def test_single_step_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B1', 'T1.0']) def test_rewind_step_with_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -62,7 +62,7 @@ def test_rewind_step_with_refinement(self): self.assertIs(ts.coverage_reached(), False) def test_rewind_refinement(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -74,7 +74,7 @@ def test_rewind_refinement(self): self.assertIs(ts.coverage_reached(), False) def test_refinement_at_two_steps(self): - ts = TraceState(3) + ts = TraceState(range(3)) outer = ts.next_candidate() ts.push_partial_scenario(outer, 'T1.1', {}) ts.confirm_full_scenario(ts.next_candidate(), 'B1', {}) @@ -88,7 +88,7 @@ def test_refinement_at_two_steps(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B1', 'T1.2', 'B2', 'T1.0']) def test_rewind_to_swap_refinements(self): - ts = TraceState(3) + ts = TraceState(range(3)) outer = ts.next_candidate() ts.push_partial_scenario(outer, 'T1.1', {}) inner1 = ts.next_candidate() @@ -112,7 +112,7 @@ def test_rewind_to_swap_refinements(self): self.assertEqual(ts.get_trace(), ['T1.1', 'B2', 'T1.2', 'B1', 'T1.0']) def test_rewind_partial_scenario_to_before_outer(self): - ts = TraceState(4) + ts = TraceState(range(4)) head = ts.next_candidate() ts.confirm_full_scenario(head, 'HEAD', {}) outer = ts.next_candidate() @@ -130,7 +130,7 @@ def test_rewind_partial_scenario_to_before_outer(self): self.assertNotIn(ts.next_candidate(), [head, outer]) def test_rewind_scenario_with_double_refinement_as_one(self): - ts = TraceState(4) + ts = TraceState(range(4)) head = ts.next_candidate() ts.confirm_full_scenario(head, 'HEAD', {}) outer = ts.next_candidate() @@ -148,7 +148,7 @@ def test_rewind_scenario_with_double_refinement_as_one(self): self.assertNotEqual(ts.next_candidate(), [head, outer]) def test_nested_refinement(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level = ts.next_candidate() @@ -163,11 +163,11 @@ def test_nested_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'M1.1', 'B1', 'M1.0', 'T1.0']) def test_rewind_to_swap_nested_refinement(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) lower_level = ts.next_candidate() - ts.push_partial_scenario(lower_level, 'B1', {}) + ts.push_partial_scenario(lower_level, 'B1.1', {}) middle_level = ts.next_candidate() ts.reject_scenario(middle_level) self.assertIs(ts.next_candidate(), None) @@ -183,7 +183,7 @@ def test_rewind_to_swap_nested_refinement(self): self.assertEqual(ts.get_trace(), ['T1.1', 'M1.1', 'B1', 'M1.0', 'T1.0']) def test_rewind_nested_refinement_as_one(self): - ts = TraceState(4) + ts = TraceState(range(4)) ts.confirm_full_scenario(ts.next_candidate(), 'HEAD', {}) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) @@ -199,7 +199,7 @@ def test_rewind_nested_refinement_as_one(self): self.assertEqual(ts.get_trace(), ['HEAD', 'T1.1']) def test_rewind_scenario_with_nested_refinement_as_one(self): - ts = TraceState(3) + ts = TraceState(range(3)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level = ts.next_candidate() @@ -215,7 +215,7 @@ def test_rewind_scenario_with_nested_refinement_as_one(self): self.assertEqual(ts.tried, (top_level,)) def test_highest_parts_from_refined_scenario(self): - ts = TraceState(4) + ts = TraceState(range(4)) top_level = ts.next_candidate() ts.push_partial_scenario(top_level, 'T1.1', {}) middle_level_1 = ts.next_candidate() @@ -245,7 +245,7 @@ def test_highest_parts_from_refined_scenario(self): 'T1.2', 'M2.1', 'M2.2', 'M2.3', 'M2.0', 'T1.0']) def test_refinement_can_resolve_drought(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.confirm_full_scenario(candidate1, 'T1', {}) ts.confirm_full_scenario(candidate1, 'T1', {}) @@ -261,7 +261,7 @@ def test_refinement_can_resolve_drought(self): self.assertEqual(ts.coverage_drought, 2) def test_scenario_cannot_refine_itself(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate() ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate() @@ -270,7 +270,7 @@ def test_scenario_cannot_refine_itself(self): self.assertIsNone(ts.next_candidate()) def test_scenario_cannot_refine_itself_with_repetition(self): - ts = TraceState(2) + ts = TraceState(range(2)) candidate1 = ts.next_candidate(retry=True) ts.push_partial_scenario(candidate1, 'T1.1', {}) candidate2 = ts.next_candidate(retry=True) @@ -279,66 +279,138 @@ def test_scenario_cannot_refine_itself_with_repetition(self): self.assertIsNone(ts.next_candidate(retry=True)) def test_initially_no_scenario_is_in_refinement(self): - ts = TraceState(1) - self.assertEqual(ts.find_scenarios_with_active_refinement(), []) + ts = TraceState([1]) + self.assertEqual(ts.active_refinements, []) def test_full_scenario_is_not_reported_as_refinement(self): - ts = TraceState(2) - ts.confirm_full_scenario(0, 'S1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), []) + ts = TraceState([1, 2]) + ts.confirm_full_scenario(1, 'S1', {}) + self.assertEqual(ts.active_refinements, []) def test_push_partial_opens_refinement(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'S1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['S1.1']) + ts = TraceState([1, 2]) + ts.push_partial_scenario(1, 'S1.1', {}) + self.assertEqual(ts.active_refinements, [1]) def test_nested_refinements_are_all_reported_as_in_refinement(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) + self.assertEqual(ts.active_refinements, [1, 2, 3]) def test_closing_refinement_removes_it_from_list(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) - ts.confirm_full_scenario(2, 'B1.0', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) + self.assertEqual(ts.active_refinements, [1, 2, 3]) + ts.confirm_full_scenario(3, 'B1.0', {}) + self.assertEqual(ts.active_refinements, [1, 2]) def test_multi_step_refinement_is_reported_only_once(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.confirm_full_scenario(2, 'B1', {}) - ts.push_partial_scenario(1, 'M1.2', {}) - ts.confirm_full_scenario(3, 'B2', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.confirm_full_scenario(3, 'B1', {}) + ts.push_partial_scenario(2, 'M1.2', {}) + ts.confirm_full_scenario(4, 'B2', {}) + self.assertEqual(ts.active_refinements, [1, 2]) def test_rewind_open_refinement_removes_it_from_list(self): - ts = TraceState(4) - ts.push_partial_scenario(0, 'T1.1', {}) - ts.push_partial_scenario(1, 'M1.1', {}) - ts.push_partial_scenario(2, 'B1.1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1', 'B1.1']) + ts = TraceState([1, 2, 3, 4]) + ts.push_partial_scenario(1, 'T1.1', {}) + ts.push_partial_scenario(2, 'M1.1', {}) + ts.push_partial_scenario(3, 'B1.1', {}) + self.assertEqual(ts.active_refinements, [1, 2, 3]) ts.rewind() - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T1.1', 'M1.1']) + self.assertEqual(ts.active_refinements, [1, 2]) def test_rewind_finished_scenario_with_refinement_removes_enclosed_refinements(self): - ts = TraceState(5) - ts.confirm_full_scenario(0, 'T1', {}) - ts.push_partial_scenario(1, 'T2.1', {}) - ts.push_partial_scenario(2, 'M1.1', {}) - ts.push_partial_scenario(3, 'B1.1', {}) - ts.confirm_full_scenario(4, 'S1', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1', 'M1.1', 'B1.1']) - ts.confirm_full_scenario(3, 'B1.0', {}) - ts.confirm_full_scenario(2, 'M1.0', {}) - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) + ts = TraceState([1, 2, 3, 4, 5]) + ts.confirm_full_scenario(1, 'T1', {}) + ts.push_partial_scenario(2, 'T2.1', {}) + ts.push_partial_scenario(3, 'M1.1', {}) + ts.push_partial_scenario(4, 'B1.1', {}) + ts.confirm_full_scenario(5, 'S1', {}) + self.assertEqual(ts.active_refinements, [2, 3, 4]) + ts.confirm_full_scenario(4, 'B1.0', {}) + ts.confirm_full_scenario(3, 'M1.0', {}) + self.assertEqual(ts.active_refinements, [2]) ts.rewind() # Middle including its Bottom refinement - self.assertEqual(ts.find_scenarios_with_active_refinement(), ['T2.1']) + self.assertEqual(ts.active_refinements, [2]) + + def test_is_refinement_active_no_index(self): + ts = TraceState([1, 2, 3]) + self.assertFalse(ts.is_refinement_active()) + ts.confirm_full_scenario(1, 'one', {}) + self.assertFalse(ts.is_refinement_active()) + ts.push_partial_scenario(2, 'part1', {}) + self.assertTrue(ts.is_refinement_active()) + ts.confirm_full_scenario(3, 'refinment', {}) + self.assertTrue(ts.is_refinement_active()) + ts.push_partial_scenario(2, 'part2', {}) + self.assertTrue(ts.is_refinement_active()) + ts.confirm_full_scenario(2, 'remainder', {}) + self.assertFalse(ts.is_refinement_active()) + ts.rewind() + self.assertFalse(ts.is_refinement_active()) + ts.rewind() + self.assertFalse(ts.is_refinement_active()) + + def test_is_refinement_active_by_index(self): + ts = TraceState([1, 2, 3]) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.confirm_full_scenario(1, 'one', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.push_partial_scenario(2, 'part1', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertFalse(ts.is_refinement_active(3)) + ts.push_partial_scenario(3, 'refinement head', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertTrue(ts.is_refinement_active(3)) + ts.confirm_full_scenario(3, 'refinement tail', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + self.assertFalse(ts.is_refinement_active(3)) + ts.push_partial_scenario(2, 'part2', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertTrue(ts.is_refinement_active(2)) + ts.confirm_full_scenario(2, 'remainder', {}) + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + ts.rewind() + self.assertFalse(ts.is_refinement_active(1)) + self.assertFalse(ts.is_refinement_active(2)) + + def test_remainder_can_be_set_and_retrieved(self): + ts = TraceState([1, 2]) + ts.push_partial_scenario(1, 'one part1', {}, 'one part2') + ts.push_partial_scenario(2, 'two part1', {}, 'two parts 2+3') + self.assertEqual(ts.get_remainder(1), 'one part2') + self.assertEqual(ts.get_remainder(2), 'two parts 2+3') + ts.push_partial_scenario(2, 'two part2', {}, 'two part3') + self.assertEqual(ts.get_remainder(2), 'two part3') + ts.rewind() + self.assertEqual(ts.get_remainder(2), 'two parts 2+3') + ts.push_partial_scenario(2, 'two part2', {}, 'two part3B') + self.assertEqual(ts.get_remainder(2), 'two part3B') + ts.confirm_full_scenario(2, 'two', {}) + self.assertEqual(ts.get_remainder(1), 'one part2') + self.assertIsNone(ts.get_remainder(2)) + ts.confirm_full_scenario(1, 'one', {}) + self.assertIsNone(ts.get_remainder(1)) + self.assertIsNone(ts.get_remainder(2)) if __name__ == '__main__':