Skip to content

Commit 6524c75

Browse files
authored
E2E Resiliency: More restarts & edge case handling (#1629)
* Handle scenario where restart happens on the last round Signed-off-by: noopur <noopur@intel.com> * Break if last round reached Signed-off-by: noopur <noopur@intel.com> * Corrected log msg Signed-off-by: noopur <noopur@intel.com> * Correction Signed-off-by: noopur <noopur@intel.com> * Using -9 Signed-off-by: noopur <noopur@intel.com> * Stop if the rounds are over Signed-off-by: noopur <noopur@intel.com> * Better logic to break the flow on reaching final round Signed-off-by: noopur <noopur@intel.com> --------- Signed-off-by: noopur <noopur@intel.com>
1 parent e1cd967 commit 6524c75

3 files changed

Lines changed: 39 additions & 36 deletions

File tree

tests/end_to_end/test_suites/tr_resiliency_tests.py

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -277,43 +277,42 @@ def _perform_restart_validate_rounds(fed_obj, db_file, total_rounds):
277277
db_file (str): Path to the database file
278278
total_rounds (int): Total number of rounds
279279
"""
280-
281-
init_round = fed_helper.get_current_round(db_file)
282-
log.info(f"Round number is {init_round} before restarts")
283-
284-
# Restart aggregator
285-
assert int_helper.restart_participants([fed_obj.aggregator])
286-
log.info("Aggregator restarted successfully")
287-
288-
assert (
289-
round_post_agg_restart := fed_helper.validate_round_increment(
290-
init_round,
291-
db_file,
292-
total_rounds,
280+
def _restart_and_validate(participants, current_round, db_file, total_rounds, description, attempt):
281+
"""
282+
Restarts the participants, validates the round increment, and checks if the total rounds are reached.
283+
"""
284+
assert int_helper.restart_participants(participants), f"Failed to restart {description} on attempt {attempt}"
285+
log.info(f"{description.capitalize()} restarted successfully on attempt {attempt}.")
286+
287+
new_round = fed_helper.validate_round_increment(current_round, db_file, total_rounds)
288+
assert new_round, f"Expected current round to be ahead of {current_round} after {description} restart on attempt {attempt}"
289+
return new_round
290+
291+
current_round = fed_helper.get_current_round(db_file)
292+
log.info(f"Round number is {current_round} before restarts")
293+
294+
for i in range(1, 4):
295+
# Restart aggregator and validate
296+
current_round = _restart_and_validate(
297+
[fed_obj.aggregator], current_round, db_file, total_rounds, "aggregator", attempt=i
293298
)
294-
), f"Expected current round to be ahead of {init_round} after aggregator restart"
295-
296-
# Restart collaborators
297-
assert int_helper.restart_participants(fed_obj.collaborators)
298-
log.info("Collaborators restarted successfully")
299+
# Stop further processing in case of final round.
300+
if current_round + 1 == total_rounds:
301+
break
299302

300-
assert (
301-
round_post_collab_restart := fed_helper.validate_round_increment(
302-
round_post_agg_restart,
303-
db_file,
304-
total_rounds,
303+
# Restart collaborators and validate
304+
current_round = _restart_and_validate(
305+
fed_obj.collaborators, current_round, db_file, total_rounds, "collaborators", attempt=i
305306
)
306-
), f"Expected current round to be ahead of {round_post_agg_restart} after collaborators restart"
307+
if current_round + 1 == total_rounds:
308+
break
307309

308-
# Restart all participants
309-
assert int_helper.restart_participants(fed_obj.collaborators + [fed_obj.aggregator])
310-
log.info("All participants restarted successfully")
311-
312-
assert fed_helper.validate_round_increment(
313-
round_post_collab_restart,
314-
db_file,
315-
total_rounds,
316-
), f"Expected current round to be ahead of {round_post_collab_restart} after all participants restart"
310+
# Restart all participants and validate
311+
current_round = _restart_and_validate(
312+
fed_obj.collaborators + [fed_obj.aggregator], current_round, db_file, total_rounds, "all participants", attempt=i
313+
)
314+
if current_round + 1 == total_rounds:
315+
break
317316

318317
log.info("Current round number is increasing after every restart as expected.")
319318

tests/end_to_end/utils/federation_helper.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ def _verify_completion_for_participant(
368368
# If None, it means the process is still running
369369
# This is applicable for native process only
370370
if participant.start_process:
371-
if participant.start_process.poll():
371+
if participant.start_process.poll() or not len(intr_helper.get_pids_for_active_command(participant.name)):
372372
log.info(f"No processes found for participant {participant.name}")
373373
break
374374
else:
@@ -1071,7 +1071,11 @@ def validate_round_increment(inp_round, database_file, total_rounds, timeout=300
10711071
if current_round > inp_round + 1:
10721072
log.info(f"Round number has increased from {inp_round} to {current_round}")
10731073
return current_round
1074-
log.info(f"Round number has not increased. Retrying in {sleep_interval} seconds...")
1074+
# Check if already at the final round (round no. index starts with 0)
1075+
if current_round + 1 == total_rounds:
1076+
log.info(f"Already at the final round")
1077+
return current_round
1078+
log.info(f"Round number has not increased from {inp_round}. Retrying in {sleep_interval} seconds...")
10751079
time.sleep(sleep_interval)
10761080
log.warning(f"Round number has not increased from {inp_round} after {timeout} seconds")
10771081
return False

tests/end_to_end/utils/interruption_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def stop_start_native_participant(participant, action):
7676
raise ex.ParticipantStopException(f"Invalid action {action}")
7777

7878
# Irrespective of the action, kill the processes to ensure clean state
79-
log.info(f"Kill the processes for {participant.name} if running to avoid conflicts")
79+
log.debug(f"Killing the processes (if running) for {participant.name} to avoid conflicts")
8080
participant.kill_process()
8181

8282
if action == "stop":

0 commit comments

Comments
 (0)