@@ -805,8 +805,8 @@ has a chance to re-enable backpressure. To ensure this property, the
805805` starting_pending_task ` flag is set here and cleared when the pending task
806806actually resumes execution in ` enter ` , preventing more pending tasks from being
807807started in the interim. Lastly, ` maybe_start_pending_task ` is only called at
808- specific points (` wait_on ` and ` exit ` , below) where the core wasm code has had
809- the opportunity to re-enable backpressure if need be.
808+ specific points (` wait_async ` and ` exit ` , below) where the core wasm code has
809+ had the opportunity to re-enable backpressure if need be.
810810``` python
811811 def maybe_start_pending_task (self ):
812812 if self .inst.starting_pending_task:
@@ -822,63 +822,65 @@ Notably, the loop in `maybe_start_pending_task` allows pending async tasks to
822822start even when there is a blocked pending sync task ahead of them in the
823823` pending_tasks ` queue.
824824
825- The ` Task.wait_on ` method defines how to block the current task on a given
826- Python [ awaitable] using the ` OnBlock ` callback described above:
825+ The ` Task.wait_sync ` method blocks the current task until a given Python
826+ [ awaitable] is resolved, setting the instance-wide ` calling_sync_import ` flag
827+ to prohibit (via guards in ` may_enter ` and ` wait_async ` ) any other tasks from
828+ starting or resuming in the current component instance in the interim. (Tasks
829+ in * other* component instances may however execute in the interim.)
827830``` python
828- async def wait_on (self , awaitable , sync , cancellable = False ) -> bool :
829- if sync:
830- assert (not self .inst.calling_sync_import)
831- self .inst.calling_sync_import = True
832- else :
833- self .maybe_start_pending_task()
834-
831+ async def wait_sync (self , awaitable ) -> None :
835832 awaitable = asyncio.ensure_future(awaitable)
836833 if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0 ,1 ):
837- cancelled = Cancelled.FALSE
838- else :
839- cancelled = await self .on_block(awaitable)
840- if cancelled and not cancellable:
841- assert (self .state == Task.State.INITIAL )
842- self .state = Task.State.PENDING_CANCEL
843- cancelled = await self .on_block(awaitable)
844- assert (not cancelled)
845-
846- if sync:
847- self .inst.calling_sync_import = False
848- self .inst.async_waiting_tasks.notify_all()
849- else :
850- while self .inst.calling_sync_import:
851- await self .inst.async_waiting_tasks.wait()
852-
853- return cancelled
834+ return
835+ assert (not self .inst.calling_sync_import)
836+ self .calling_sync_import = True
837+ if await self .on_block(awaitable) == Cancelled.TRUE :
838+ assert (self .state == Task.State.INITIAL )
839+ self .state = Task.State.PENDING_CANCEL
840+ assert (await self .on_block(awaitable) == Cancelled.FALSE )
841+ self .inst.calling_sync_import = False
842+ self .inst.async_waiting_tasks.notify_all()
854843```
855844If the given ` awaitable ` is already resolved (e.g., if between making an async
856845import call that blocked and calling ` waitable-set.wait ` the I/O operation
857846completed), the Component Model allows the runtime to nondeterministically
858847avoid calling ` OnBlock ` which, in component-to-component async calls, means
859848that control flow does not need to transfer to the calling component.
860849
861- If ` wait_on ` is called with ` sync ` set to ` True ` , only tasks in * other*
862- component instances may execute; no code in the current component instance may
863- execute. This is achieved by setting and waiting on ` calling_sync_import `
864- (using the ` async_waiting_tasks ` [ ` asyncio.Condition ` ] ). ` calling_sync_import `
865- is also checked by ` may_enter ` (to prevent reentrance by a new task) and set by
866- ` call_sync ` (defined next).
867-
868- If ` wait_on ` is called with ` cancellable ` set to ` True ` , then the caller
869- expects and propagates the case where the supertask requests cancellation
870- (when ` True ` is returned). But if ` cancellable ` is ` False ` , then ` wait_on ` must
871- handle the cancellation request itself by setting the task's ` state ` to
872- ` PENDING_CANCEL ` (to be picked up by ` wait_for_event ` , ` poll_for_event ` or
873- ` yield ` in the future) and calling ` OnBlock ` a second time (noting that
874- ` OnBlock ` can only request cancellation * once* ).
850+ Since synchronous callers may not be have enough context to know how or where
851+ to propagate cancellation, if a supertask requests cancellation while
852+ synchronously waiting, ` wait_sync ` remembers the request for cancellation and
853+ then keeps waiting (which, since cancellation is delivered at most once, will
854+ not return ` Cancelled.TRUE ` again). Cancellation will be delivered to core
855+ wasm code at the next asynchronous call where cancellation is expected.
856+
857+ The ` Task.wait_async ` method blocks the current task until a given Python
858+ [ awaitable] is resolved. Unlike ` wait_sync ` , ` wait_async ` allows other tasks to
859+ be started or resumed in the current component instance in the interim.
860+ However, while asynchronously waiting, another task may start * synchronously*
861+ waiting and thus ` wait_async ` takes care to wait until ` calling_sync_import ` is
862+ cleared before returning control flow back to the calling task:
863+ ``` python
864+ async def wait_async (self , awaitable ) -> Cancelled:
865+ self .maybe_start_pending_task()
866+ awaitable = asyncio.ensure_future(awaitable)
867+ if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0 ,1 ):
868+ return Cancelled.FALSE
869+ cancelled = await self .on_block(awaitable)
870+ while self .inst.calling_sync_import:
871+ cancelled |= await self .on_block(self .inst.async_waiting_tasks.wait())
872+ return cancelled
873+ ```
874+ The ` maybe_start_pending_task() ` call on the first line indicates that each
875+ time a task asynchronously waits, it potentially unblocks a single pending task
876+ that had previously hit backpressure and queued, but has since been unblocked.
875877
876878The ` Task.call_sync ` method defines how a task makes a synchronous call to an
877- imported ` callee ` . ` call_sync ` works just like ` wait_on ` when ` sync ` is ` True `
878- and ` cancellable ` is ` False ` except that ` call_sync ` avoids unconditionally
879- blocking and instead only blocks if ` callee ` transitively blocks. This means
880- that N-deep synchronous callstacks avoid the overhead of async calls if none of
881- the calls in the stack actually block on external I/O.
879+ imported ` callee ` . ` call_sync ` works just like ` wait_sync ` except that
880+ ` call_sync ` avoids unconditionally blocking and instead only blocks if ` callee `
881+ transitively blocks. This means that N-deep synchronous callstacks avoid the
882+ overhead of async calls if none of the calls in the stack actually block on
883+ external I/O.
882884``` python
883885 async def call_sync (self , callee , on_start , on_return ):
884886 async def sync_on_block (awaitable ):
@@ -900,21 +902,24 @@ when a `callback` is used, when the `callback` returns `WAIT` to the event
900902loop. ` wait_for_event ` waits until a ` Waitable ` in a given ` WaitableSet ` makes
901903progress:
902904``` python
903- async def wait_for_event (self , waitable_set , sync ) -> EventTuple:
905+ async def wait_for_event (self , wset , sync ) -> EventTuple:
904906 if self .state == Task.State.PENDING_CANCEL :
905907 self .state = Task.State.CANCEL_DELIVERED
906908 return (EventCode.TASK_CANCELLED , 0 , 0 )
907909 else :
908- waitable_set .num_waiting += 1
910+ wset .num_waiting += 1
909911 e = None
910912 while not e:
911- maybe_event = waitable_set.maybe_has_pending_event.wait()
912- if await self .wait_on(maybe_event, sync, cancellable = True ):
913- assert (self .state == Task.State.INITIAL )
914- self .state = Task.State.CANCEL_DELIVERED
915- return (EventCode.TASK_CANCELLED , 0 , 0 )
916- e = waitable_set.poll()
917- waitable_set.num_waiting -= 1
913+ if sync:
914+ await self .wait_sync(wset.maybe_has_pending_event.wait())
915+ else :
916+ if await self .wait_async(wset.maybe_has_pending_event.wait()) == Cancelled.TRUE :
917+ assert (self .state == Task.State.INITIAL )
918+ self .state = Task.State.CANCEL_DELIVERED
919+ e = (EventCode.TASK_CANCELLED , 0 , 0 )
920+ break
921+ e = wset.poll()
922+ wset.num_waiting -= 1
918923 return e
919924```
920925As mentioned above with ` WaitableSet ` , ` maybe_has_pending_event ` is allowed to
@@ -923,10 +928,10 @@ there is actually an event. This looping as well as the number of iterations is
923928not semantically observable by the wasm code and so the host implementation can
924929loop or not using its own event delivery scheme.
925930
926- If there is already a pending cancellation request (from a previous
927- non-cancellable ` wait_on ` or a ` call_sync ` ), the cancellation request is
928- delivered to core wasm via the ` TASK_CANCELLED ` event code and task's ` state `
929- is transitioned to ` CANCEL_DELIVERED ` so that ` canon_task_cancel ` can be called
931+ If there is already a pending cancellation request (during a previous
932+ ` wait_sync ` or a ` call_sync ` ), the cancellation request is now delivered to
933+ core wasm via the ` TASK_CANCELLED ` event code and task's ` state ` is
934+ transitioned to ` CANCEL_DELIVERED ` so that ` canon_task_cancel ` can be called
930935without trapping. If cancellation is requested * during* ` wait_for_event ` , there
931936is a direct transition to the ` CANCEL_DELIVERED ` state.
932937
@@ -941,12 +946,16 @@ control flow to switch to other `asyncio.Task`s.
941946 if self .state == Task.State.PENDING_CANCEL :
942947 self .state = Task.State.CANCEL_DELIVERED
943948 return (EventCode.TASK_CANCELLED , 0 , 0 )
944- elif await self .wait_on(asyncio.sleep(0 ), sync, cancellable = True ):
945- assert (self .state == Task.State.INITIAL )
946- self .state = Task.State.CANCEL_DELIVERED
947- return (EventCode.TASK_CANCELLED , 0 , 0 )
948- else :
949+ elif sync:
950+ await self .wait_sync(asyncio.sleep(0 ))
949951 return (EventCode.NONE , 0 , 0 )
952+ else :
953+ if await self .wait_async(asyncio.sleep(0 )) == Cancelled.TRUE :
954+ assert (self .state == Task.State.INITIAL )
955+ self .state = Task.State.CANCEL_DELIVERED
956+ return (EventCode.TASK_CANCELLED , 0 , 0 )
957+ else :
958+ return (EventCode.NONE , 0 , 0 )
950959```
951960Handling of cancellation requests in ` yield_ ` mirrors ` wait_for_event ` above,
952961handling both the cases of pending cancellation and cancellation while
@@ -957,11 +966,11 @@ when a `callback` is used, when the `callback` returns `POLL` to the event
957966loop. Polling returns the ` NONE ` event code instead of blocking when there are
958967no pending events.
959968``` python
960- async def poll_for_event (self , waitable_set , sync ) -> Optional[EventTuple]:
969+ async def poll_for_event (self , wset , sync ) -> Optional[EventTuple]:
961970 event_code,_,_ = e = await self .yield_(sync)
962971 if event_code == EventCode.TASK_CANCELLED :
963972 return e
964- elif (e := waitable_set .poll()):
973+ elif (e := wset .poll()):
965974 return e
966975 else :
967976 return (EventCode.NONE , 0 , 0 )
@@ -3579,10 +3588,8 @@ execute, however tasks in *other* component instances may execute. This allows
35793588a long-running task in one component to avoid starving other components without
35803589needing support full reentrancy.
35813590
3582- Because other tasks can execute, a subtask can be cancelled while executing
3583- ` yield ` , in which case ` yield ` returns ` 1 ` . The language runtime and bindings
3584- generators should handle cancellation the same way as when receiving the
3585- ` TASK_CANCELLED ` event from ` waitable-set.wait ` .
3591+ If ` async ` is set, ` yield ` can return the ` 1 ` to indicate that the caller
3592+ requested cancellation. If ` async ` is not set, ` yield ` will always return ` 0 ` .
35863593
35873594
35883595### 🔀 ` canon waitable-set.new `
@@ -3636,11 +3643,15 @@ in the same component instance, which can be useful for producer toolchains in
36363643situations where interleaving is not supported. However, this is generally worse
36373644for concurrency and thus producer toolchains should set ` async ` when possible.
36383645
3639- ` wait ` can be called from a synchronously-lifted export so that even
3640- synchronous code can make concurrent import calls. In these synchronous cases,
3641- though, the automatic backpressure (applied by ` Task.enter ` ) will ensure there
3642- is only ever at most once synchronously-lifted task executing in a component
3643- instance at a time.
3646+ If ` async ` is set, ` waitable-set.wait ` can return the ` TASK_CANCELLED ` event
3647+ indicating that the caller requested cancellation. If ` async ` is not set, this
3648+ notification will be remembered and delivered at the next ` async ` call.
3649+
3650+ ` waitable-set.wait ` can be called from a synchronously-lifted export so that
3651+ even synchronous code can make concurrent import calls. In these synchronous
3652+ cases, though, the automatic backpressure (applied by ` Task.enter ` ) will ensure
3653+ there is only ever at most once synchronously-lifted task executing in a
3654+ component instance at a time.
36443655
36453656
36463657### 🔀 ` canon waitable-set.poll `
@@ -3667,6 +3678,10 @@ async def canon_waitable_set_poll(sync, mem, task, si, ptr):
36673678When ` async ` is set, ` poll_for_event ` can yield to other tasks (in this or other
36683679components) as part of polling for an event.
36693680
3681+ If ` async ` is set, ` waitable-set.poll ` can return the ` TASK_CANCELLED ` event
3682+ indicating that the caller requested cancellation. If ` async ` is not set, this
3683+ notification will be remembered and delivered at the next ` async ` call.
3684+
36703685
36713686### 🔀 ` canon waitable-set.drop `
36723687
@@ -3764,7 +3779,7 @@ async def canon_subtask_cancel(sync, task, i):
37643779 while not subtask.resolved():
37653780 if subtask.has_pending_event():
37663781 _ = subtask.get_event()
3767- await task.wait_on (subtask.wait_for_pending_event(), sync = True )
3782+ await task.wait_sync (subtask.wait_for_pending_event())
37683783 else :
37693784 if not subtask.resolved():
37703785 return [BLOCKED ]
@@ -3932,7 +3947,7 @@ instance, but allowing other tasks in other component instances to make
39323947progress):
39333948``` python
39343949 if opts.sync and not e.has_pending_event():
3935- await task.wait_on (e.wait_for_pending_event(), sync = True )
3950+ await task.wait_sync (e.wait_for_pending_event())
39363951```
39373952Finally, if there is a pending event on the stream end (which is necessarily a
39383953` copy_event ` closure), it is eagerly returned to the caller. Otherwise, the
@@ -4021,9 +4036,8 @@ in the high 28 bits; they're always zero.
40214036The end of ` future_copy ` is the exact same as ` stream_copy ` : waiting if ` sync `
40224037and returning either the progress made or ` BLOCKED ` .
40234038``` python
4024-
40254039 if opts.sync and not e.has_pending_event():
4026- await task.wait_on (e.wait_for_pending_event(), sync = True )
4040+ await task.wait_sync (e.wait_for_pending_event())
40274041
40284042 if e.has_pending_event():
40294043 code,index,payload = e.get_event()
@@ -4073,7 +4087,7 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
40734087 e.shared.cancel()
40744088 if not e.has_pending_event():
40754089 if sync:
4076- await task.wait_on (e.wait_for_pending_event(), sync = True )
4090+ await task.wait_sync (e.wait_for_pending_event())
40774091 else :
40784092 return [BLOCKED ]
40794093 code,index,payload = e.get_event()
@@ -4087,8 +4101,8 @@ callbacks (passed by `canon_{stream,future}_{read,write}` above) which will set
40874101a pending event that is caught by the * second* check for
40884102` e.has_pending_event() ` .
40894103
4090- If the copy hasn't been cancelled, the synchronous case uses ` Task.wait_on ` to
4091- synchronously and uninterruptibly wait for one of the ` on_* ` callbacks to
4104+ If the copy hasn't been cancelled, the synchronous case uses ` Task.wait_sync `
4105+ to synchronously and uninterruptibly wait for one of the ` on_* ` callbacks to
40924106eventually be called (which will set the pending event).
40934107
40944108The asynchronous case simply returns ` BLOCKING ` and the client code must wait
0 commit comments