@@ -548,36 +548,33 @@ async def func(ctx, _blocks, block):
548548 assert "pending" in executed
549549
550550 async def test_concurrent_claim_blocked (self , state_manager ):
551- """Only one worker can hold the claim at a time."""
552- from fastloop .exceptions import LoopClaimError
553-
551+ """Only one worker can hold the claim at a time (mutual exclusion)."""
554552 workflow , _ = await state_manager .get_or_create_workflow (
555553 workflow_name = "test" ,
556554 blocks = [{"type" : "step" , "text" : "test" }],
557555 )
558556
559- claim_acquired = [False , False ]
557+ holding = [False , False ]
558+ overlap_detected = [False ]
559+ acquired_order = []
560560
561- async def worker1 ( ):
561+ async def worker ( idx : int ):
562562 async with state_manager .with_workflow_claim (workflow .workflow_id ):
563- claim_acquired [0 ] = True
564- await asyncio .sleep (2 )
565-
566- async def worker2 ():
567- await asyncio .sleep (0.1 )
568- try :
569- async with state_manager .with_workflow_claim (workflow .workflow_id ):
570- claim_acquired [1 ] = True
571- except LoopClaimError :
572- pass
573-
574- task1 = asyncio .create_task (worker1 ())
575- task2 = asyncio .create_task (worker2 ())
563+ if holding [1 - idx ]:
564+ overlap_detected [0 ] = True
565+ holding [idx ] = True
566+ acquired_order .append (idx )
567+ await asyncio .sleep (0.5 )
568+ holding [idx ] = False
569+
570+ task1 = asyncio .create_task (worker (0 ))
571+ await asyncio .sleep (0.1 )
572+ task2 = asyncio .create_task (worker (1 ))
576573
577574 await asyncio .gather (task1 , task2 , return_exceptions = True )
578575
579- assert claim_acquired [0 ]
580- assert not claim_acquired [ 1 ]
576+ assert not overlap_detected [0 ], "Both workers held the claim simultaneously"
577+ assert acquired_order == [ 0 , 1 ], "Workers should acquire in order"
581578
582579
583580class TestRetryIntegration :
0 commit comments