@@ -214,11 +214,10 @@ class CanonicalOptions(LiftLowerOptions):
214214
215215### Runtime State
216216
217- scheduler = asyncio .Lock ()
218-
219217#### Component Instance State
220218
221219class ComponentInstance :
220+ store : Store
222221 table : Table
223222 may_leave : bool
224223 backpressure : bool
@@ -455,6 +454,51 @@ def drop(self):
455454 trap_if (len (self .elems ) > 0 )
456455 trap_if (self .num_waiting > 0 )
457456
457+ #### Thread State
458+
459+ class Cancelled (IntEnum ):
460+ FALSE = 0
461+ TRUE = 1
462+
463+ class Thread :
464+ store : Store
465+ awaitable : Optional [Awaitable ]
466+ on_resume : Optional [asyncio .Future ]
467+ on_suspend_or_exit : Optional [asyncio .Future ]
468+
469+ def __init__ (self , store , callee , on_start , on_resolve ):
470+ self .store = store
471+ self .awaitable = None
472+ self .on_resume = asyncio .Future ()
473+ self .on_suspend_or_exit = None
474+ async def thread ():
475+ assert (await self .on_resume == Cancelled .FALSE )
476+ callee (self , on_start , on_resolve )
477+ self .on_suspend_or_exit .set_result (None )
478+ asyncio .create_task (thread ())
479+ store .waiting .append (self )
480+
481+ async def resume (self , cancelled = Cancelled .FALSE ):
482+ assert (cancelled or (not self .awaitable or self .awaitable .done ()))
483+ self .awaitable = None
484+ self .store .waiting .remove (self )
485+ self .on_resume .set_result (cancelled )
486+ self .on_resume = None
487+ assert (not self .on_suspend_or_exit )
488+ self .on_suspend_or_exit = asyncio .Future ()
489+ await self .on_suspend_or_exit
490+ assert (self .awaitable )
491+ self .store .waiting .append (self )
492+
493+ async def suspend (self , awaitable ) -> Cancelled :
494+ assert (not self .awaitable )
495+ self .awaitable = awaitable
496+ self .on_suspend_or_exit .set_result (None )
497+ self .on_suspend_or_exit = None
498+ assert (not self .on_resume )
499+ self .on_resume = asyncio .Future ()
500+ return await thread .on_resume
501+
458502#### Task State
459503
460504class Cancelled (IntEnum ):
@@ -477,29 +521,28 @@ class State(Enum):
477521 inst : ComponentInstance
478522 ft : FuncType
479523 supertask : Optional [Task ]
524+ thread : Thread
480525 on_resolve : OnResolve
481- on_block : OnBlock
482526 num_borrows : int
483527 context : ContextLocalStorage
484528
485- def __init__ (self , opts , inst , ft , supertask , on_resolve , on_block ):
529+ def __init__ (self , opts , inst , ft , supertask , thread , on_resolve ):
486530 self .state = Task .State .INITIAL
487531 self .opts = opts
488532 self .inst = inst
489533 self .ft = ft
490534 self .supertask = supertask
535+ self .thread = thread
491536 self .on_resolve = on_resolve
492- self .on_block = on_block
493537 self .num_borrows = 0
494538 self .context = ContextLocalStorage ()
495539
496540 async def enter (self ):
497- assert (scheduler .locked ())
498541 self .trap_if_on_the_stack (self .inst )
499542 if not self .may_enter (self ) or self .inst .pending_tasks :
500543 f = asyncio .Future ()
501544 self .inst .pending_tasks .append ((self , f ))
502- if await self .on_block (f ) == Cancelled .TRUE :
545+ if await self .thread . suspend (f ) == Cancelled .TRUE :
503546 [i ] = [i for i ,(t ,_ ) in enumerate (self .inst .pending_tasks ) if t == self ]
504547 self .inst .pending_tasks .pop (i )
505548 self .on_resolve (None )
@@ -537,35 +580,22 @@ async def sync_wait(self, awaitable) -> None:
537580 return
538581 assert (self .inst .unblocked .is_set ())
539582 self .inst .unblocked .clear ()
540- if await self .on_block (awaitable ) == Cancelled .TRUE :
583+ if await self .thread . suspend (awaitable ) == Cancelled .TRUE :
541584 assert (self .state == Task .State .INITIAL )
542585 self .state = Task .State .PENDING_CANCEL
543- assert (await self .on_block (awaitable ) == Cancelled .FALSE )
586+ assert (await self .thread . suspend (awaitable ) == Cancelled .FALSE )
544587 self .inst .unblocked .set ()
545588
546589 async def async_wait (self , awaitable ) -> Cancelled :
547590 self .maybe_start_pending_task ()
548591 awaitable = asyncio .ensure_future (awaitable )
549592 if awaitable .done () and not DETERMINISTIC_PROFILE and random .randint (0 ,1 ):
550593 return
551- cancelled = await self .on_block (awaitable )
594+ cancelled = await self .thread . suspend (awaitable )
552595 while not self .inst .unblocked .is_set ():
553- cancelled |= await self .on_block (self .inst .unblocked .wait ())
596+ cancelled |= await self .thread . suspend (self .inst .unblocked .wait ())
554597 return cancelled
555598
556- async def call_sync (self , callee , on_start , on_return ):
557- async def sync_on_block (awaitable ):
558- if await self .on_block (awaitable ) == Cancelled .TRUE :
559- assert (self .state == Task .State .INITIAL )
560- self .state = Task .State .PENDING_CANCEL
561- assert (await self .on_block (awaitable ) == Cancelled .FALSE )
562- return False
563-
564- assert (self .inst .unblocked .is_set ())
565- self .inst .unblocked .clear ()
566- await callee (self , on_start , on_return , sync_on_block )
567- self .inst .unblocked .set ()
568-
569599 async def wait_for_event (self , waitable_set , sync ) -> EventTuple :
570600 if self .state == Task .State .PENDING_CANCEL :
571601 self .state = Task .State .CANCEL_DELIVERED
@@ -625,7 +655,6 @@ def cancel(self):
625655 self .state = Task .State .RESOLVED
626656
627657 def exit (self ):
628- assert (scheduler .locked ())
629658 trap_if (self .state != Task .State .RESOLVED )
630659 assert (self .num_borrows == 0 )
631660 if self .opts .sync :
@@ -645,34 +674,46 @@ class State(IntEnum):
645674
646675 state : State
647676 task : Task
677+ callee_thread : Optional [Thread ]
648678 lenders : Optional [list [ResourceHandle ]]
649- request_cancel_begin : asyncio .Future
650- request_cancel_end : asyncio .Future
679+ cancelled : bool
651680
652681 def __init__ (self , task ):
653682 Waitable .__init__ (self )
654683 self .state = Subtask .State .STARTING
655684 self .task = task
685+ self .callee_thread = None
656686 self .lenders = []
657- self .request_cancel_begin = asyncio .Future ()
658- self .request_cancel_end = asyncio .Future ()
687+ self .cancelled = False
659688
660- async def call_sync (self , callee , on_start , on_resolve ):
661- def sync_on_start ():
689+ async def call (self , callee , on_start , on_resolve ):
690+ def update_on_start ():
662691 assert (self .state == Subtask .State .STARTING )
663692 self .state = Subtask .State .STARTED
664693 return on_start ()
665694
666- def sync_on_resolve (result ):
667- assert (result is not None )
695+ def update_on_resolve (result ):
668696 assert (self .state == Subtask .State .STARTED )
669- self .state = Subtask .State .RETURNED
697+ if result is None :
698+ assert (self .cancelled )
699+ if self .state == Subtask .State .STARTING :
700+ self .state = Subtask .State .CANCELLED_BEFORE_STARTED
701+ else :
702+ assert (self .state == Subtask .State .STARTED )
703+ self .state = Subtask .State .CANCELLED_BEFORE_RETURNED
704+ else :
705+ assert (self .state == Subtask .State .STARTED )
706+ self .state = Subtask .State .RETURNED
670707 on_resolve (result )
671708
672- await Task .call_sync (self .task , callee , sync_on_start , sync_on_resolve )
709+ assert (not self .callee_thread )
710+ self .callee_thread = Thread (self .supertask .inst .store , callee , update_on_start , update_on_resolve )
711+ await self .callee_thread .resume ()
673712
674- def cancelled (self ):
675- return self .request_cancel_begin .done ()
713+ async def wait_until_resolved (self ):
714+ while not self .resolved ():
715+ await self .supertask .sync_wait (self .callee_thread .awaitable )
716+ await self .callee_thread .resume ()
676717
677718 def resolved (self ):
678719 match self .state :
@@ -684,57 +725,13 @@ def resolved(self):
684725 Subtask .State .CANCELLED_BEFORE_RETURNED ):
685726 return True
686727
687- async def request_cancel (self ):
688- assert (not self .cancelled () and not self .resolved ())
689- self .request_cancel_begin .set_result (None )
690- await self .request_cancel_end
691-
692- async def call_async (self , callee , on_start , on_resolve ):
693- async def do_call ():
694- await callee (self .task , async_on_start , async_on_resolve , async_on_block )
695- relinquish_control ()
696-
697- def async_on_start ():
698- assert (self .state == Subtask .State .STARTING )
699- self .state = Subtask .State .STARTED
700- return on_start ()
701-
702- def async_on_resolve (result ):
703- if result is None :
704- if self .state == Subtask .State .STARTING :
705- self .state = Subtask .State .CANCELLED_BEFORE_STARTED
706- else :
707- assert (self .state == Subtask .State .STARTED )
708- self .state = Subtask .State .CANCELLED_BEFORE_RETURNED
709- else :
710- assert (self .state == Subtask .State .STARTED )
711- self .state = Subtask .State .RETURNED
712- on_resolve (result )
713-
714- async def async_on_block (awaitable ):
715- relinquish_control ()
716- if not self .request_cancel_end .done ():
717- await asyncio .wait ([awaitable , self .request_cancel_begin ],
718- return_when = asyncio .FIRST_COMPLETED )
719- if self .request_cancel_begin .done ():
720- return True
721- else :
722- await awaitable
723- assert (awaitable .done ())
724- await scheduler .acquire ()
725- return False
726-
727- def relinquish_control ():
728- if not ret .done ():
729- ret .set_result (None )
730- elif self .request_cancel_begin .done () and not self .request_cancel_end .done ():
731- self .request_cancel_end .set_result (None )
732- else :
733- scheduler .release ()
728+ def cancellation_requested (self ):
729+ return self .cancelled
734730
735- ret = asyncio .Future ()
736- asyncio .create_task (do_call ())
737- await ret
731+ async def request_cancel (self ):
732+ assert (not self .cancellation_requested () and not self .resolved ())
733+ self .cancelled = True
734+ await self .callee_thread .resume (Cancelled .TRUE )
738735
739736 def add_lender (self , lending_handle ):
740737 assert (not self .resolve_delivered () and not self .resolved ())
@@ -976,6 +973,27 @@ def drop(self):
976973 trap_if (not self .done )
977974 FutureEnd .drop (self )
978975
976+ ### Store State
977+
978+ class Store :
979+ loop : asyncio .AbstractEventLoop
980+ waiting : list [Thread ]
981+
982+ def __init__ (self ):
983+ self .loop = asyncio .new_event_loop ()
984+ self .waiting = []
985+
986+ def call_export (self , callee , on_start , on_resolve ):
987+ self .run_until_complete (Thread (self , callee , on_start , on_resolve ).resume ())
988+
989+ def tick (self , i ):
990+ if not DETERMINISTIC_PROFILE :
991+ random .shuffle (self .waiting )
992+ for thread in self .waiting :
993+ if thread .awaitable .done ():
994+ self .run_until_complete (thread .resume ())
995+ return
996+
979997### Despecialization
980998
981999def despecialize (t ):
@@ -1931,8 +1949,8 @@ def lower_flat_values(cx, max_flat, vs, ts, out_param = None):
19311949
19321950### `canon lift`
19331951
1934- async def canon_lift (opts , inst , ft , callee , caller , on_start , on_resolve , on_block ):
1935- task = Task (opts , inst , ft , caller , on_resolve , on_block )
1952+ async def canon_lift (opts , inst , ft , callee , caller , thread , on_start , on_resolve ):
1953+ task = Task (opts , inst , ft , caller , thread , on_resolve )
19361954 if not await task .enter ():
19371955 return
19381956
@@ -2021,7 +2039,9 @@ def on_resolve(result):
20212039 nonlocal flat_results
20222040 flat_results = lower_flat_values (cx , MAX_FLAT_RESULTS , result , ft .result_type (), flat_args )
20232041
2024- await subtask .call_sync (callee , on_start , on_resolve )
2042+ await subtask .call (callee , on_start , on_resolve )
2043+ await subtask .wait_until_resolved ()
2044+
20252045 assert (types_match_values (flat_ft .results , flat_results ))
20262046 subtask .deliver_resolve ()
20272047 return flat_results
@@ -2049,6 +2069,11 @@ def subtask_event():
20492069 subtask .deliver_resolve ()
20502070 return [Subtask .State .RETURNED ]
20512071
2072+ await subtask .call (callee , on_start , on_resolve )
2073+ if subtask .resolved ():
2074+ subtask .deliver_resolve ()
2075+ return [Subtask .State .RETURNED ]
2076+
20522077 subtaski = task .inst .table .add (subtask )
20532078 assert (0 < subtaski <= Table .MAX_LENGTH < 2 ** 28 )
20542079 assert (0 <= subtask .state < 2 ** 4 )
@@ -2213,16 +2238,13 @@ async def canon_subtask_cancel(sync, task, i):
22132238 trap_if (not task .inst .may_leave )
22142239 subtask = task .inst .table .get (i )
22152240 trap_if (not isinstance (subtask , Subtask ))
2216- trap_if (subtask .resolve_delivered () or subtask .cancelled ())
2241+ trap_if (subtask .resolve_delivered () or subtask .cancellation_requested ())
22172242 if subtask .resolved ():
22182243 assert (subtask .has_pending_event ())
22192244 else :
22202245 await subtask .request_cancel ()
22212246 if sync :
2222- while not subtask .resolved ():
2223- if subtask .has_pending_event ():
2224- _ = subtask .get_event ()
2225- await task .sync_wait (subtask .wait_for_pending_event ())
2247+ await subtask .wait_until_resolved ()
22262248 else :
22272249 if not subtask .resolved ():
22282250 return [BLOCKED ]
0 commit comments