@@ -1011,8 +1011,8 @@ def test_scan_not_mixed_with_other_families(self):
10111011 run .stage ()
10121012
10131013
1014- class TestResubmissionLifecycle (unittest .TestCase ):
1015- """Tests for #1: resubmission sets SUBMITTED status and clears flag ."""
1014+ class TestNoResubmissionLifecycle (unittest .TestCase ):
1015+ """Pipe runs must never resubmit — Q-state workers handle retried tasks ."""
10161016
10171017 def setUp (self ):
10181018 self .tmpdir = tempfile .mkdtemp (prefix = 'pipe_resub_test_' )
@@ -1021,11 +1021,11 @@ def setUp(self):
10211021 def tearDown (self ):
10221022 shutil .rmtree (self .tmpdir , ignore_errors = True )
10231023
1024- def test_resubmission_sets_submitted_status (self ):
1025- """After successful resubmission, pipe status should be SUBMITTED."""
1024+ def test_no_resubmission_even_with_retried_tasks (self ):
1025+ """Even when all workers are done and retried tasks remain,
1026+ poll_pipes must not resubmit a new scheduler job."""
10261027 tasks = [_make_task_spec (f'task_{ i } ' ) for i in range (3 )]
10271028 pipe = self .sched .pipe_coordinator .submit_pipe_run ('resub_test' , tasks )
1028- # Simulate retried tasks (attempt_index > 0) so reconcile flags resubmission
10291029 for task_id in ['task_0' , 'task_1' , 'task_2' ]:
10301030 now = time .time ()
10311031 update_task_state (pipe .pipe_root , task_id , new_status = TaskState .CLAIMED ,
@@ -1038,22 +1038,10 @@ def test_resubmission_sets_submitted_status(self):
10381038 claimed_at = None , lease_expires_at = None ,
10391039 started_at = None , ended_at = None , failure_class = None )
10401040 pipe .status = PipeRunState .RECONCILING
1041- # Mock submit_to_scheduler to succeed
1042- with patch .object (pipe , 'submit_to_scheduler' , return_value = ('submitted' , '12345' )):
1041+ with patch .object (pipe , 'submit_to_scheduler' , return_value = ('submitted' , '12345' )) as mock_submit :
10431042 self .sched .pipe_coordinator .poll_pipes ()
1044- self .assertEqual (pipe .status , PipeRunState .SUBMITTED )
1045- self .assertEqual (pipe .scheduler_job_id , '12345' )
1046- self .assertFalse (pipe ._needs_resubmission )
1047-
1048- def test_resubmission_clears_flag_on_failure (self ):
1049- """After failed resubmission, flag should still be cleared to avoid infinite loops."""
1050- tasks = [_make_task_spec (f'task_{ i } ' ) for i in range (3 )]
1051- pipe = self .sched .pipe_coordinator .submit_pipe_run ('resub_fail' , tasks )
1052- pipe ._needs_resubmission = True
1053- pipe .status = PipeRunState .RECONCILING
1054- with patch .object (pipe , 'submit_to_scheduler' , return_value = ('errored' , None )):
1055- self .sched .pipe_coordinator .poll_pipes ()
1056- self .assertFalse (pipe ._needs_resubmission )
1043+ mock_submit .assert_not_called ()
1044+ self .assertFalse (pipe .needs_resubmission )
10571045
10581046
10591047class TestShouldUsePipeOwnerType (unittest .TestCase ):
0 commit comments