Skip to content

Commit 71c2e05

Browse files
committed
CABI: refactor to remove Thread.in_event_loop (no behavior change)
1 parent 529a118 commit 71c2e05

File tree

3 files changed

+86
-73
lines changed

3 files changed

+86
-73
lines changed

design/mvp/CanonicalABI.md

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ class ComponentInstance:
296296
threads: Table[Thread]
297297
may_leave: bool
298298
backpressure: int
299-
exclusive: bool
299+
exclusive: Optional[Task]
300300
num_waiting_to_enter: int
301301

302302
def __init__(self, store, parent = None):
@@ -307,7 +307,7 @@ class ComponentInstance:
307307
self.threads = Table()
308308
self.may_leave = True
309309
self.backpressure = 0
310-
self.exclusive = False
310+
self.exclusive = None
311311
self.num_waiting_to_enter = 0
312312
```
313313
Components are always instantiated in the context of a `Store` which is saved
@@ -609,7 +609,6 @@ class Thread:
609609
ready_func: Optional[Callable[[], bool]]
610610
cancellable: bool
611611
suspend_result: Optional[SuspendResult]
612-
in_event_loop: bool
613612
index: Optional[int]
614613
context: list[int]
615614

@@ -628,13 +627,11 @@ class Thread:
628627
assert(self.pending())
629628
return self.ready_func()
630629
```
631-
The `in_event_loop` field is used by `Task.request_cancellation` to prevent
632-
unexpected reentrance of `callback` functions. The `index` field stores the
633-
index of the thread in its containing component instance's `threads` table and
634-
is initialized only once a thread is allowed to start executing (after the
635-
backpressure gate). The `context` field holds the [thread-local storage]
636-
accessed by the `context.{get,set}` built-ins. All the other fields are used
637-
directly by `Thread` methods as shown next.
630+
The `index` field stores the index of the thread in its containing component
631+
instance's `threads` table and is initialized only once a thread is allowed to
632+
start executing (after the backpressure gate). The `context` field holds the
633+
[thread-local storage] accessed by the `context.{get,set}` built-ins. All the
634+
other fields are used directly by `Thread` methods as shown next.
638635

639636
When a `Thread` is created, an internal `threading.Thread` is started and
640637
immediately blocked `acquire()`ing `fiber_lock` (which will be `release()`ed by
@@ -648,7 +645,6 @@ immediately blocked `acquire()`ing `fiber_lock` (which will be `release()`ed by
648645
self.ready_func = None
649646
self.cancellable = False
650647
self.suspend_result = None
651-
self.in_event_loop = False
652648
self.index = None
653649
self.context = [0] * Thread.CONTEXT_LENGTH
654650
def fiber_func():
@@ -943,10 +939,11 @@ for calls to exports.
943939
```python
944940
class Task(Call, Supertask):
945941
class State(Enum):
946-
INITIAL = 1
947-
PENDING_CANCEL = 2
948-
CANCEL_DELIVERED = 3
949-
RESOLVED = 4
942+
UNRESOLVED = 1
943+
BACKPRESSURE = 2
944+
PENDING_CANCEL = 3
945+
CANCEL_DELIVERED = 4
946+
RESOLVED = 5
950947

951948
state: State
952949
opts: CanonicalOptions
@@ -958,7 +955,7 @@ class Task(Call, Supertask):
958955
threads: list[Thread]
959956

960957
def __init__(self, opts, inst, ft, supertask, on_resolve):
961-
self.state = Task.State.INITIAL
958+
self.state = Task.State.UNRESOLVED
962959
self.opts = opts
963960
self.inst = inst
964961
self.ft = ft
@@ -1034,17 +1031,19 @@ exports.
10341031
if not self.ft.async_:
10351032
return True
10361033
def has_backpressure():
1037-
return self.inst.backpressure > 0 or (self.needs_exclusive() and self.inst.exclusive)
1034+
return self.inst.backpressure > 0 or (self.needs_exclusive() and bool(self.inst.exclusive))
10381035
if has_backpressure() or self.inst.num_waiting_to_enter > 0:
1036+
self.state = Task.State.BACKPRESSURE
10391037
self.inst.num_waiting_to_enter += 1
10401038
result = thread.suspend_until(lambda: not has_backpressure(), cancellable = True)
10411039
self.inst.num_waiting_to_enter -= 1
10421040
if result == SuspendResult.CANCELLED:
10431041
self.cancel()
10441042
return False
1043+
self.state = Task.State.UNRESOLVED
10451044
if self.needs_exclusive():
1046-
assert(not self.inst.exclusive)
1047-
self.inst.exclusive = True
1045+
assert(self.inst.exclusive is None)
1046+
self.inst.exclusive = self
10481047
return True
10491048
```
10501049
Since the order in which suspended threads are resumed is nondeterministic (see
@@ -1068,47 +1067,55 @@ returns to clear the `exclusive` flag set by `Task.enter`, allowing other
10681067
if not self.ft.async_:
10691068
return
10701069
if self.needs_exclusive():
1071-
assert(self.inst.exclusive)
1072-
self.inst.exclusive = False
1070+
assert(self.inst.exclusive is self)
1071+
self.inst.exclusive = None
10731072
```
10741073

10751074
The `Task.request_cancellation` method is called by the host or wasm caller
10761075
(via the `Call` interface of `Task`) to signal that they don't need the return
10771076
value and that the caller should hurry up and call the `OnResolve` callback. If
1078-
*any* of a cancelled `Task`'s `Thread`s are expecting cancellation (e.g., when
1077+
a task is waiting to start in `Task.enter` due to backpressure, then it is
1078+
immediately cancelled without running any guest code. Otherwise, if *any* of a
1079+
cancelled `Task`'s `Thread`s are expecting cancellation (e.g., when
10791080
an `async callback` export returns to the event loop or when a `waitable-set.*`
10801081
or `thread.*` built-in is called with `cancellable` set), `request_cancellation`
10811082
immediately resumes that thread (picking one nondeterministically if there are
10821083
multiple), giving the thread the chance to handle cancellation promptly
10831084
(allowing `subtask.cancel` to complete eagerly without returning `BLOCKED`).
1084-
Otherwise, the cancellation request is remembered in the `Task`'s `state` so
1085-
that it can be delivered in the future by `Task.deliver_pending_cancel`.
10861085
```python
10871086
def request_cancellation(self):
1088-
assert(self.state == Task.State.INITIAL)
1089-
random.shuffle(self.threads)
1090-
for thread in self.threads:
1091-
if thread.cancellable and not (thread.in_event_loop and self.inst.exclusive):
1092-
self.state = Task.State.CANCEL_DELIVERED
1093-
thread.resume(SuspendResult.CANCELLED)
1094-
return
1087+
if self.state == Task.State.BACKPRESSURE:
1088+
assert(len(self.threads) == 1)
1089+
self.state = Task.State.CANCEL_DELIVERED
1090+
self.threads[0].resume(SuspendResult.CANCELLED)
1091+
return
1092+
assert(self.state == Task.State.UNRESOLVED)
1093+
if not self.needs_exclusive() or not self.inst.exclusive or self.inst.exclusive is self:
1094+
random.shuffle(self.threads)
1095+
for thread in self.threads:
1096+
if thread.cancellable:
1097+
self.state = Task.State.CANCEL_DELIVERED
1098+
thread.resume(SuspendResult.CANCELLED)
1099+
return
10951100
self.state = Task.State.PENDING_CANCEL
1101+
```
1102+
As handled above, cancellation must avoid running two `needs_exclusive` tasks at
1103+
the same time in the corner case where the first task starts and blocks and then
1104+
the other task is cancelled. However, a single `needs_exclusive` task that
1105+
starts and blocks calling a built-in with `cancellable` set *can* be immediately
1106+
resumed. Thus, the `exclusive` lock tracks *which* task is exclusively running
1107+
to distinguish these cases.
10961108

1109+
If cancellation cannot be immediately delivered by `Task.request_cancellation`,
1110+
the request is remembered in `Task.state` and delivered at the next opportunity
1111+
by `Task.deliver_pending_cancel`, which is checked at all cancellation points:
1112+
```python
10971113
def deliver_pending_cancel(self, cancellable) -> bool:
10981114
if cancellable and self.state == Task.State.PENDING_CANCEL:
10991115
self.state = Task.State.CANCEL_DELIVERED
11001116
return True
11011117
return False
11021118
```
1103-
`in_event_loop` is set by the `async callback` event loop (in `canon_lift`,
1104-
defined below) every time the event loop suspends the thread and is used here
1105-
to detect the corner case where one `async callback` task returns to its event
1106-
loop, then a second `async callback` task starts running and suspends *without*
1107-
returning to its event loop, and then the caller cancels the first task. In
1108-
this case, the first task's `Thread` is `cancellable` (it returned to its event
1109-
loop, which sets `cancellable`) but it cannot be resumed until the second task
1110-
returns to its event loop (since `async callback` wasm code is non-reentrant
1111-
and `needs_exclusive`).
11121119

11131120
The following `Task` methods wrap corresponding `Thread` methods after first
11141121
delivering any pending cancellations set by `Task.request_cancellation`:
@@ -3341,8 +3348,8 @@ function (specified as a `funcidx` immediate in `canon lift`) until the
33413348
[packed] = call_and_trap_on_throw(callee, thread, flat_args)
33423349
code,si = unpack_callback_result(packed)
33433350
while code != CallbackCode.EXIT:
3344-
thread.in_event_loop = True
3345-
inst.exclusive = False
3351+
assert(inst.exclusive is task)
3352+
inst.exclusive = None
33463353
match code:
33473354
case CallbackCode.YIELD:
33483355
if task.may_block():
@@ -3356,8 +3363,8 @@ function (specified as a `funcidx` immediate in `canon lift`) until the
33563363
event = task.wait_until(lambda: not inst.exclusive, thread, wset, cancellable = True)
33573364
case _:
33583365
trap()
3359-
thread.in_event_loop = False
3360-
inst.exclusive = True
3366+
assert(inst.exclusive is None)
3367+
inst.exclusive = task
33613368
event_code, p1, p2 = event
33623369
[packed] = call_and_trap_on_throw(opts.callback, thread, [event_code, p1, p2])
33633370
code,si = unpack_callback_result(packed)
@@ -3383,9 +3390,7 @@ execute in the interim. However, other synchronous and `async callback` tasks
33833390
*cannot* execute while running core wasm called from the event loop as this
33843391
could break the non-reentrancy assumptions of the core wasm code. Thus,
33853392
`async callback` tasks allow less concurrency than non-`callback` `async`
3386-
tasks, which entirely ignore `ComponentInstance.exclusive`. The `in_event_loop`
3387-
flag is set while suspended to prevent `Task.request_cancellation` from
3388-
reentering during a core wasm call.
3393+
tasks, which entirely ignore `ComponentInstance.exclusive`.
33893394

33903395
The end of `canon_lift` immediately runs the `thread_func` function (which
33913396
contains all the steps above) in a new `Thread`, allowing `thread_func` to make

design/mvp/canonical-abi/definitions.py

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ class ComponentInstance:
260260
threads: Table[Thread]
261261
may_leave: bool
262262
backpressure: int
263-
exclusive: bool
263+
exclusive: Optional[Task]
264264
num_waiting_to_enter: int
265265

266266
def __init__(self, store, parent = None):
@@ -271,7 +271,7 @@ def __init__(self, store, parent = None):
271271
self.threads = Table()
272272
self.may_leave = True
273273
self.backpressure = 0
274-
self.exclusive = False
274+
self.exclusive = None
275275
self.num_waiting_to_enter = 0
276276

277277
def reflexive_ancestors(self) -> set[ComponentInstance]:
@@ -376,7 +376,6 @@ class Thread:
376376
ready_func: Optional[Callable[[], bool]]
377377
cancellable: bool
378378
suspend_result: Optional[SuspendResult]
379-
in_event_loop: bool
380379
index: Optional[int]
381380
context: list[int]
382381

@@ -403,7 +402,6 @@ def __init__(self, task, thread_func):
403402
self.ready_func = None
404403
self.cancellable = False
405404
self.suspend_result = None
406-
self.in_event_loop = False
407405
self.index = None
408406
self.context = [0] * Thread.CONTEXT_LENGTH
409407
def fiber_func():
@@ -559,10 +557,11 @@ def drop(self):
559557

560558
class Task(Call, Supertask):
561559
class State(Enum):
562-
INITIAL = 1
563-
PENDING_CANCEL = 2
564-
CANCEL_DELIVERED = 3
565-
RESOLVED = 4
560+
UNRESOLVED = 1
561+
BACKPRESSURE = 2
562+
PENDING_CANCEL = 3
563+
CANCEL_DELIVERED = 4
564+
RESOLVED = 5
566565

567566
state: State
568567
opts: CanonicalOptions
@@ -574,7 +573,7 @@ class State(Enum):
574573
threads: list[Thread]
575574

576575
def __init__(self, opts, inst, ft, supertask, on_resolve):
577-
self.state = Task.State.INITIAL
576+
self.state = Task.State.UNRESOLVED
578577
self.opts = opts
579578
self.inst = inst
580579
self.ft = ft
@@ -605,35 +604,43 @@ def enter(self, thread):
605604
if not self.ft.async_:
606605
return True
607606
def has_backpressure():
608-
return self.inst.backpressure > 0 or (self.needs_exclusive() and self.inst.exclusive)
607+
return self.inst.backpressure > 0 or (self.needs_exclusive() and bool(self.inst.exclusive))
609608
if has_backpressure() or self.inst.num_waiting_to_enter > 0:
609+
self.state = Task.State.BACKPRESSURE
610610
self.inst.num_waiting_to_enter += 1
611611
result = thread.suspend_until(lambda: not has_backpressure(), cancellable = True)
612612
self.inst.num_waiting_to_enter -= 1
613613
if result == SuspendResult.CANCELLED:
614614
self.cancel()
615615
return False
616+
self.state = Task.State.UNRESOLVED
616617
if self.needs_exclusive():
617-
assert(not self.inst.exclusive)
618-
self.inst.exclusive = True
618+
assert(self.inst.exclusive is None)
619+
self.inst.exclusive = self
619620
return True
620621

621622
def exit(self):
622623
assert(len(self.threads) > 0)
623624
if not self.ft.async_:
624625
return
625626
if self.needs_exclusive():
626-
assert(self.inst.exclusive)
627-
self.inst.exclusive = False
627+
assert(self.inst.exclusive is self)
628+
self.inst.exclusive = None
628629

629630
def request_cancellation(self):
630-
assert(self.state == Task.State.INITIAL)
631-
random.shuffle(self.threads)
632-
for thread in self.threads:
633-
if thread.cancellable and not (thread.in_event_loop and self.inst.exclusive):
634-
self.state = Task.State.CANCEL_DELIVERED
635-
thread.resume(SuspendResult.CANCELLED)
636-
return
631+
if self.state == Task.State.BACKPRESSURE:
632+
assert(len(self.threads) == 1)
633+
self.state = Task.State.CANCEL_DELIVERED
634+
self.threads[0].resume(SuspendResult.CANCELLED)
635+
return
636+
assert(self.state == Task.State.UNRESOLVED)
637+
if not self.needs_exclusive() or not self.inst.exclusive or self.inst.exclusive is self:
638+
random.shuffle(self.threads)
639+
for thread in self.threads:
640+
if thread.cancellable:
641+
self.state = Task.State.CANCEL_DELIVERED
642+
thread.resume(SuspendResult.CANCELLED)
643+
return
637644
self.state = Task.State.PENDING_CANCEL
638645

639646
def deliver_pending_cancel(self, cancellable) -> bool:
@@ -2014,8 +2021,8 @@ def thread_func(thread):
20142021
[packed] = call_and_trap_on_throw(callee, thread, flat_args)
20152022
code,si = unpack_callback_result(packed)
20162023
while code != CallbackCode.EXIT:
2017-
thread.in_event_loop = True
2018-
inst.exclusive = False
2024+
assert(inst.exclusive is task)
2025+
inst.exclusive = None
20192026
match code:
20202027
case CallbackCode.YIELD:
20212028
if task.may_block():
@@ -2029,8 +2036,8 @@ def thread_func(thread):
20292036
event = task.wait_until(lambda: not inst.exclusive, thread, wset, cancellable = True)
20302037
case _:
20312038
trap()
2032-
thread.in_event_loop = False
2033-
inst.exclusive = True
2039+
assert(inst.exclusive is None)
2040+
inst.exclusive = task
20342041
event_code, p1, p2 = event
20352042
[packed] = call_and_trap_on_throw(opts.callback, thread, [event_code, p1, p2])
20362043
code,si = unpack_callback_result(packed)

design/mvp/canonical-abi/run_tests.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def run_lift(opts, inst, ft, callee, on_start, on_resolve):
5959

6060
def mk_task(caller, on_resolve, thread_func):
6161
inst = ComponentInstance(caller.inst.store)
62-
task = Task(None, inst, FuncType([],[],async_=True), caller, on_resolve)
62+
opts = mk_opts(async_ = True)
63+
task = Task(opts, inst, FuncType([],[],async_=True), caller, on_resolve)
6364
thread = Thread(task, thread_func)
6465
thread.resume()
6566
return task

0 commit comments

Comments
 (0)