Skip to content

Commit d94d30c

Browse files
committed
CABI: improve and add cooperative thread built-ins
1 parent 978f15c commit d94d30c

3 files changed

Lines changed: 407 additions & 61 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 83 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -321,13 +322,15 @@ def waiting(self):
321322
return not self.running() and self.ready_func is not None
322323

323324
def ready(self):
324-
assert(self.waiting())
325-
return self.ready_func()
325+
return self.waiting() and self.ready_func()
326326

327327
def __init__(self, task, thread_func):
328328
def cont_func(cancelled):
329329
assert(self.running() and not cancelled)
330-
thread_func()
330+
try:
331+
thread_func()
332+
except ThreadExit:
333+
pass
331334
return None
332335
self.cont = cont_new(cont_func)
333336
self.ready_func = None
@@ -337,7 +340,11 @@ def cont_func(cancelled):
337340
self.storage = [0,0]
338341
assert(self.suspended())
339342

340-
def resume_later(self):
343+
def exit(self):
344+
assert(self.running())
345+
raise ThreadExit()
346+
347+
def unsuspend(self):
341348
assert(self.suspended())
342349
self.ready_func = lambda: True
343350
self.task.inst.store.waiting.append(self)
@@ -360,7 +367,7 @@ def stop_waiting(self):
360367
self.task.inst.store.waiting.remove(self)
361368

362369
def suspend(self, cancellable) -> Cancelled:
363-
assert(self.running() and self.task.may_block())
370+
assert(self.running())
364371
if self.task.deliver_pending_cancel(cancellable):
365372
self.stop_waiting()
366373
return Cancelled.TRUE
@@ -370,7 +377,7 @@ def suspend(self, cancellable) -> Cancelled:
370377
return cancelled
371378

372379
def wait_until(self, ready_func, cancellable = False) -> Cancelled:
373-
assert(self.running() and self.task.may_block())
380+
assert(self.running())
374381
if self.task.deliver_pending_cancel(cancellable):
375382
return Cancelled.TRUE
376383
if ready_func() and not DETERMINISTIC_PROFILE and random.randint(0,1):
@@ -379,18 +386,7 @@ def wait_until(self, ready_func, cancellable = False) -> Cancelled:
379386
self.task.inst.store.waiting.append(self)
380387
return self.suspend(cancellable)
381388

382-
def yield_until(self, ready_func, cancellable) -> Cancelled:
383-
assert(self.running())
384-
if self.task.may_block():
385-
return self.wait_until(ready_func, cancellable)
386-
else:
387-
assert(ready_func())
388-
return Cancelled.FALSE
389-
390-
def yield_(self, cancellable) -> Cancelled:
391-
return self.yield_until(lambda: True, cancellable)
392-
393-
def switch_to(self, cancellable, other: Thread) -> Cancelled:
389+
def suspend_to_suspended(self, cancellable, other: Thread) -> Cancelled:
394390
assert(self.running() and other.suspended())
395391
if self.task.deliver_pending_cancel(cancellable):
396392
self.stop_waiting()
@@ -400,11 +396,27 @@ def switch_to(self, cancellable, other: Thread) -> Cancelled:
400396
assert(self.running() and (cancellable or not cancelled))
401397
return cancelled
402398

403-
def yield_to(self, cancellable, other: Thread) -> Cancelled:
399+
def yield_to_suspended(self, cancellable, other: Thread) -> Cancelled:
404400
assert(self.running() and other.suspended())
405401
self.ready_func = lambda: True
406402
self.task.inst.store.waiting.append(self)
407-
return self.switch_to(cancellable, other)
403+
return self.suspend_to_suspended(cancellable, other)
404+
405+
def suspend_then_promote(self, cancellable, other: Thread) -> ResumeArg:
406+
assert(self.running())
407+
if other.ready():
408+
other.stop_waiting()
409+
return self.suspend_to_suspended(cancellable, other)
410+
else:
411+
return self.suspend(cancellable)
412+
413+
def yield_then_promote(self, cancellable, other: Thread) -> ResumeArg:
414+
assert(self.running())
415+
if other.ready():
416+
other.stop_waiting()
417+
return self.yield_to_suspended(cancellable, other)
418+
else:
419+
return self.wait_until(lambda: True, cancellable)
408420

409421
### Tasks
410422

@@ -446,9 +458,6 @@ def needs_exclusive(self):
446458
assert(self.ft.async_)
447459
return not self.opts.async_ or self.opts.callback
448460

449-
def may_block(self):
450-
return self.ft.async_ or self.state == Task.State.RESOLVED
451-
452461
def enter_implicit_thread(self):
453462
assert(self.state == Task.State.INITIAL)
454463
self.implicit_thread = current_thread()
@@ -672,6 +681,11 @@ def remove(self, i):
672681
self.free.append(i)
673682
return e
674683

684+
def __iter__(self):
685+
for e in self.array:
686+
if e is not None:
687+
yield e
688+
675689
### Resource State
676690

677691
class ResourceHandle:
@@ -2114,13 +2128,12 @@ def thread_func():
21142128
inst.exclusive_thread = None
21152129
match code:
21162130
case CallbackCode.YIELD:
2117-
cancelled = thread.yield_until(lambda: not inst.exclusive_thread, cancellable = True)
2131+
cancelled = thread.wait_until(lambda: not inst.exclusive_thread, cancellable = True)
21182132
if cancelled:
21192133
event = (EventCode.TASK_CANCELLED, 0, 0)
21202134
else:
21212135
event = (EventCode.NONE, 0, 0)
21222136
case CallbackCode.WAIT:
2123-
trap_if(not task.may_block())
21242137
wset = inst.handles.get(si)
21252138
trap_if(not isinstance(wset, WaitableSet))
21262139
event = wset.wait_for_event_and(lambda: not inst.exclusive_thread, cancellable = True)
@@ -2137,6 +2150,11 @@ def thread_func():
21372150
task = Task(ft, opts, inst, on_start, on_resolve)
21382151
thread = Thread(task, thread_func)
21392152
thread.resume()
2153+
if not ft.async_:
2154+
while task.state != Task.State.RESOLVED:
2155+
candidates = { t for t in inst.threads if t.ready() and t is not inst.exclusive_thread }
2156+
trap_if(not candidates)
2157+
random.choice(list(candidates)).resume()
21402158
return task.request_cancellation
21412159

21422160
class CallbackCode(IntEnum):
@@ -2164,7 +2182,6 @@ def call_and_trap_on_throw(callee, args):
21642182
def canon_lower(callee, ft, opts, flat_args: list[CoreValType]) -> list[CoreValType]:
21652183
thread = current_thread()
21662184
trap_if(not thread.task.inst.may_leave)
2167-
trap_if(not thread.task.may_block() and ft.async_ and not opts.async_)
21682185

21692186
subtask = Subtask()
21702187
cx = LiftLowerContext(opts, thread.task.inst, subtask)
@@ -2345,13 +2362,12 @@ def canon_waitable_set_new():
23452362
### 🔀 `canon waitable-set.wait`
23462363

23472364
def canon_waitable_set_wait(cancellable, mem, si, ptr):
2348-
task = current_task()
2349-
trap_if(not task.inst.may_leave)
2350-
trap_if(not task.may_block())
2351-
wset = task.inst.handles.get(si)
2365+
inst = current_instance()
2366+
trap_if(not inst.may_leave)
2367+
wset = inst.handles.get(si)
23522368
trap_if(not isinstance(wset, WaitableSet))
23532369
event = wset.wait_for_event(cancellable)
2354-
return unpack_event(mem, task.inst, ptr, event)
2370+
return unpack_event(mem, inst, ptr, event)
23552371

23562372
def unpack_event(mem, inst, ptr, e: EventTuple):
23572373
event, p1, p2 = e
@@ -2402,7 +2418,6 @@ def canon_waitable_join(wi, si):
24022418
def canon_subtask_cancel(async_, i):
24032419
thread = current_thread()
24042420
trap_if(not thread.task.inst.may_leave)
2405-
trap_if(not thread.task.may_block() and not async_)
24062421
subtask = thread.task.inst.handles.get(i)
24072422
trap_if(not isinstance(subtask, Subtask))
24082423
trap_if(subtask.resolve_delivered())
@@ -2463,7 +2478,6 @@ def canon_stream_write(stream_t, opts, i, ptr, n):
24632478
def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n):
24642479
thread = current_thread()
24652480
trap_if(not thread.task.inst.may_leave)
2466-
trap_if(not thread.task.may_block() and not opts.async_)
24672481

24682482
e = thread.task.inst.handles.get(i)
24692483
trap_if(not isinstance(e, EndT))
@@ -2518,7 +2532,6 @@ def canon_future_write(future_t, opts, i, ptr):
25182532
def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr):
25192533
thread = current_thread()
25202534
trap_if(not thread.task.inst.may_leave)
2521-
trap_if(not thread.task.may_block() and not opts.async_)
25222535

25232536
e = thread.task.inst.handles.get(i)
25242537
trap_if(not isinstance(e, EndT))
@@ -2571,7 +2584,6 @@ def canon_future_cancel_write(future_t, async_, i):
25712584
def cancel_copy(EndT, event_code, stream_or_future_t, async_, i):
25722585
thread = current_thread()
25732586
trap_if(not thread.task.inst.may_leave)
2574-
trap_if(not thread.task.may_block() and not async_)
25752587
e = thread.task.inst.handles.get(i)
25762588
trap_if(not isinstance(e, EndT))
25772589
trap_if(e.shared.t != stream_or_future_t.t)
@@ -2618,6 +2630,14 @@ def canon_thread_index():
26182630
assert(thread.index is not None)
26192631
return [thread.index]
26202632

2633+
### 🧵 `canon thread.exit`
2634+
2635+
def canon_thread_exit():
2636+
thread = current_thread()
2637+
trap_if(not thread.task.inst.may_leave)
2638+
thread.exit()
2639+
assert(False)
2640+
26212641
### 🧵 `canon thread.new-indirect`
26222642

26232643
@dataclass
@@ -2639,22 +2659,21 @@ def thread_func():
26392659
task.register_thread(new_thread)
26402660
return [new_thread.index]
26412661

2642-
### 🧵 `canon thread.resume-later`
2662+
### 🧵 `canon thread.unsuspend`
26432663

2644-
def canon_thread_resume_later(i):
2664+
def canon_thread_unsuspend(i):
26452665
inst = current_instance()
26462666
trap_if(not inst.may_leave)
26472667
other_thread = inst.threads.get(i)
26482668
trap_if(not other_thread.suspended())
2649-
other_thread.resume_later()
2669+
other_thread.unsuspend()
26502670
return []
26512671

26522672
### 🧵 `canon thread.suspend`
26532673

26542674
def canon_thread_suspend(cancellable):
26552675
thread = current_thread()
26562676
trap_if(not thread.task.inst.may_leave)
2657-
trap_if(not thread.task.may_block())
26582677
cancelled = thread.suspend(cancellable)
26592678
return [cancelled]
26602679

@@ -2663,27 +2682,45 @@ def canon_thread_suspend(cancellable):
26632682
def canon_thread_yield(cancellable):
26642683
thread = current_thread()
26652684
trap_if(not thread.task.inst.may_leave)
2666-
cancelled = thread.yield_(cancellable)
2685+
cancelled = thread.wait_until(lambda: True, cancellable)
26672686
return [cancelled]
26682687

2669-
### 🧵 `canon thread.switch-to`
2688+
### 🧵 `canon thread.suspend-to-suspended`
26702689

2671-
def canon_thread_switch_to(cancellable, i):
2690+
def canon_thread_suspend_to_suspended(cancellable, i):
26722691
thread = current_thread()
26732692
trap_if(not thread.task.inst.may_leave)
26742693
other_thread = thread.task.inst.threads.get(i)
26752694
trap_if(not other_thread.suspended())
2676-
cancelled = thread.switch_to(cancellable, other_thread)
2695+
cancelled = thread.suspend_to_suspended(cancellable, other_thread)
26772696
return [cancelled]
26782697

2679-
### 🧵 `canon thread.yield-to`
2698+
### 🧵 `canon thread.yield-to-suspended`
26802699

2681-
def canon_thread_yield_to(cancellable, i):
2700+
def canon_thread_yield_to_suspended(cancellable, i):
26822701
thread = current_thread()
26832702
trap_if(not thread.task.inst.may_leave)
26842703
other_thread = thread.task.inst.threads.get(i)
26852704
trap_if(not other_thread.suspended())
2686-
cancelled = thread.yield_to(cancellable, other_thread)
2705+
cancelled = thread.yield_to_suspended(cancellable, other_thread)
2706+
return [cancelled]
2707+
2708+
### 🧵 `canon thread.suspend-then-promote`
2709+
2710+
def canon_thread_suspend_then_promote(cancellable, i):
2711+
thread = current_thread()
2712+
trap_if(not thread.task.inst.may_leave)
2713+
other_thread = thread.task.inst.threads.get(i)
2714+
cancelled = thread.suspend_then_promote(cancellable, other_thread)
2715+
return [cancelled]
2716+
2717+
### 🧵 `canon thread.yield-then-promote`
2718+
2719+
def canon_thread_yield_then_promote(cancellable, i):
2720+
thread = current_thread()
2721+
trap_if(not thread.task.inst.may_leave)
2722+
other_thread = thread.task.inst.threads.get(i)
2723+
cancelled = thread.yield_then_promote(cancellable, other_thread)
26872724
return [cancelled]
26882725

26892726
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)