Skip to content

Commit af45707

Browse files
committed
CABI: improve and add cooperative thread built-ins
1 parent 95df54f commit af45707

3 files changed

Lines changed: 406 additions & 55 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 88 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -332,13 +333,15 @@ def waiting(self):
332333
return not self.running() and self.ready_func is not None
333334

334335
def ready(self):
335-
assert(self.waiting())
336-
return self.ready_func()
336+
return self.waiting() and self.ready_func()
337337

338338
def __init__(self, task, thread_func):
339339
def cont_func(cancelled):
340340
assert(self.running() and not cancelled)
341-
thread_func()
341+
try:
342+
thread_func()
343+
except ThreadExit:
344+
pass
342345
return None
343346
self.cont = cont_new(cont_func)
344347
self.ready_func = None
@@ -348,6 +351,10 @@ def cont_func(cancelled):
348351
self.storage = [0,0]
349352
assert(self.suspended())
350353

354+
def exit(self):
355+
assert(self.running())
356+
raise ThreadExit()
357+
351358
def resume_later(self):
352359
assert(self.suspended())
353360
self.ready_func = lambda: True
@@ -356,7 +363,8 @@ def resume_later(self):
356363

357364
def resume(self, cancelled = Cancelled.FALSE):
358365
assert(not self.running() and (self.cancellable or not cancelled))
359-
self.stop_waiting()
366+
if self.waiting():
367+
self.stop_waiting(cancelled)
360368
thread = self
361369
while thread is not None:
362370
cont = thread.cont
@@ -365,23 +373,23 @@ def resume(self, cancelled = Cancelled.FALSE):
365373
thread = switch_to
366374
cancelled = Cancelled.FALSE
367375

368-
def stop_waiting(self):
369-
if self.waiting():
370-
self.ready_func = None
371-
self.task.inst.store.waiting.remove(self)
376+
def stop_waiting(self, cancelled = Cancelled.FALSE):
377+
assert(self.waiting())
378+
assert(cancelled or self.ready_func())
379+
self.ready_func = None
380+
self.task.inst.store.waiting.remove(self)
372381

373382
def suspend(self, cancellable) -> Cancelled:
374-
assert(self.running() and self.task.may_block())
383+
assert(self.running())
375384
if self.task.deliver_pending_cancel(cancellable):
376-
self.stop_waiting()
377385
return Cancelled.TRUE
378386
self.cancellable = cancellable
379387
cancelled = block(switch_to = None)
380388
assert(self.running() and (cancellable or not cancelled))
381389
return cancelled
382390

383391
def wait_until(self, ready_func, cancellable = False) -> Cancelled:
384-
assert(self.running() and self.task.may_block())
392+
assert(self.running())
385393
if self.task.deliver_pending_cancel(cancellable):
386394
return Cancelled.TRUE
387395
if ready_func() and not DETERMINISTIC_PROFILE and random.randint(0,1):
@@ -390,32 +398,41 @@ def wait_until(self, ready_func, cancellable = False) -> Cancelled:
390398
self.task.inst.store.waiting.append(self)
391399
return self.suspend(cancellable)
392400

393-
def yield_until(self, ready_func, cancellable) -> Cancelled:
394-
assert(self.running())
395-
if self.task.may_block():
396-
return self.wait_until(ready_func, cancellable)
397-
else:
398-
assert(ready_func())
399-
return Cancelled.FALSE
400-
401401
def yield_(self, cancellable) -> Cancelled:
402-
return self.yield_until(lambda: True, cancellable)
402+
return self.wait_until(lambda: True, cancellable)
403403

404-
def switch_to(self, cancellable, other: Thread) -> Cancelled:
404+
def suspend_then_resume(self, cancellable, other: Thread) -> Cancelled:
405405
assert(self.running() and other.suspended())
406406
if self.task.deliver_pending_cancel(cancellable):
407-
self.stop_waiting()
408407
return Cancelled.TRUE
409408
self.cancellable = cancellable
410409
cancelled = block(switch_to = other)
411410
assert(self.running() and (cancellable or not cancelled))
412411
return cancelled
413412

414-
def yield_to(self, cancellable, other: Thread) -> Cancelled:
413+
def yield_then_resume(self, cancellable, other: Thread) -> Cancelled:
415414
assert(self.running() and other.suspended())
415+
if self.task.deliver_pending_cancel(cancellable):
416+
return Cancelled.TRUE
416417
self.ready_func = lambda: True
417418
self.task.inst.store.waiting.append(self)
418-
return self.switch_to(cancellable, other)
419+
return self.suspend_then_resume(cancellable, other)
420+
421+
def suspend_then_promote(self, cancellable, other: Thread) -> ResumeArg:
422+
assert(self.running())
423+
if other.ready():
424+
other.stop_waiting()
425+
return self.suspend_then_resume(cancellable, other)
426+
else:
427+
return self.suspend(cancellable)
428+
429+
def yield_then_promote(self, cancellable, other: Thread) -> ResumeArg:
430+
assert(self.running())
431+
if other.ready():
432+
other.stop_waiting()
433+
return self.yield_then_resume(cancellable, other)
434+
else:
435+
return self.yield_(cancellable)
419436

420437
### Tasks
421438

@@ -459,9 +476,6 @@ def needs_exclusive(self):
459476
assert(self.ft.async_)
460477
return not self.opts.async_ or self.opts.callback
461478

462-
def may_block(self):
463-
return self.ft.async_ or self.state == Task.State.RESOLVED
464-
465479
def enter_implicit_thread(self):
466480
assert(self.state == Task.State.INITIAL)
467481
self.implicit_thread = current_thread()
@@ -691,6 +705,11 @@ def remove(self, i):
691705
self.free.append(i)
692706
return e
693707

708+
def __iter__(self):
709+
for e in self.array:
710+
if e is not None:
711+
yield e
712+
694713
### Resource State
695714

696715
class ResourceHandle:
@@ -2142,13 +2161,12 @@ def thread_func():
21422161
inst.exclusive_thread = None
21432162
match code:
21442163
case CallbackCode.YIELD:
2145-
cancelled = thread.yield_until(lambda: not inst.exclusive_thread, cancellable = True)
2164+
cancelled = thread.wait_until(lambda: not inst.exclusive_thread, cancellable = True)
21462165
if cancelled:
21472166
event = (EventCode.TASK_CANCELLED, 0, 0)
21482167
else:
21492168
event = (EventCode.NONE, 0, 0)
21502169
case CallbackCode.WAIT:
2151-
trap_if(not task.may_block())
21522170
wset = inst.handles.get(si)
21532171
trap_if(not isinstance(wset, WaitableSet))
21542172
event = wset.wait_for_event_and(lambda: not inst.exclusive_thread, cancellable = True)
@@ -2165,6 +2183,11 @@ def thread_func():
21652183
task = Task(ft, opts, inst, on_start, on_resolve, caller)
21662184
thread = Thread(task, thread_func)
21672185
thread.resume()
2186+
if not ft.async_:
2187+
while task.state != Task.State.RESOLVED:
2188+
candidates = { t for t in inst.threads if t.ready() and t is not inst.exclusive_thread }
2189+
trap_if(not candidates)
2190+
random.choice(list(candidates)).resume()
21682191
return task.request_cancellation
21692192

21702193
class CallbackCode(IntEnum):
@@ -2192,7 +2215,6 @@ def call_and_trap_on_throw(callee, args):
21922215
def canon_lower(callee, ft, opts, flat_args: list[CoreValType]) -> list[CoreValType]:
21932216
thread = current_thread()
21942217
trap_if(not thread.task.inst.may_leave)
2195-
trap_if(not thread.task.may_block() and ft.async_ and not opts.async_)
21962218

21972219
subtask = Subtask()
21982220
cx = LiftLowerContext(opts, thread.task.inst, subtask)
@@ -2372,13 +2394,12 @@ def canon_waitable_set_new():
23722394
### 🔀 `canon waitable-set.wait`
23732395

23742396
def canon_waitable_set_wait(cancellable, mem, si, ptr):
2375-
task = current_task()
2376-
trap_if(not task.inst.may_leave)
2377-
trap_if(not task.may_block())
2378-
wset = task.inst.handles.get(si)
2397+
inst = current_instance()
2398+
trap_if(not inst.may_leave)
2399+
wset = inst.handles.get(si)
23792400
trap_if(not isinstance(wset, WaitableSet))
23802401
event = wset.wait_for_event(cancellable)
2381-
return unpack_event(mem, task.inst, ptr, event)
2402+
return unpack_event(mem, inst, ptr, event)
23822403

23832404
def unpack_event(mem, inst, ptr, e: EventTuple):
23842405
event, p1, p2 = e
@@ -2430,7 +2451,6 @@ def canon_waitable_join(wi, si):
24302451
def canon_subtask_cancel(async_, i):
24312452
thread = current_thread()
24322453
trap_if(not thread.task.inst.may_leave)
2433-
trap_if(not thread.task.may_block() and not async_)
24342454
subtask = thread.task.inst.handles.get(i)
24352455
trap_if(not isinstance(subtask, Subtask))
24362456
trap_if(subtask.resolve_delivered())
@@ -2492,7 +2512,6 @@ def canon_stream_write(stream_t, opts, i, ptr, n):
24922512
def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n):
24932513
thread = current_thread()
24942514
trap_if(not thread.task.inst.may_leave)
2495-
trap_if(not thread.task.may_block() and not opts.async_)
24962515

24972516
e = thread.task.inst.handles.get(i)
24982517
trap_if(not isinstance(e, EndT))
@@ -2548,7 +2567,6 @@ def canon_future_write(future_t, opts, i, ptr):
25482567
def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr):
25492568
thread = current_thread()
25502569
trap_if(not thread.task.inst.may_leave)
2551-
trap_if(not thread.task.may_block() and not opts.async_)
25522570

25532571
e = thread.task.inst.handles.get(i)
25542572
trap_if(not isinstance(e, EndT))
@@ -2602,7 +2620,6 @@ def canon_future_cancel_write(future_t, async_, i):
26022620
def cancel_copy(EndT, event_code, stream_or_future_t, async_, i):
26032621
thread = current_thread()
26042622
trap_if(not thread.task.inst.may_leave)
2605-
trap_if(not thread.task.may_block() and not async_)
26062623
e = thread.task.inst.handles.get(i)
26072624
trap_if(not isinstance(e, EndT))
26082625
trap_if(e.shared.t != stream_or_future_t.t)
@@ -2650,6 +2667,14 @@ def canon_thread_index():
26502667
assert(thread.index is not None)
26512668
return [thread.index]
26522669

2670+
### 🧵 `canon thread.exit`
2671+
2672+
def canon_thread_exit():
2673+
thread = current_thread()
2674+
trap_if(not thread.task.inst.may_leave)
2675+
thread.exit()
2676+
assert(False)
2677+
26532678
### 🧵 `canon thread.new-indirect`
26542679

26552680
@dataclass
@@ -2686,7 +2711,6 @@ def canon_thread_resume_later(i):
26862711
def canon_thread_suspend(cancellable):
26872712
thread = current_thread()
26882713
trap_if(not thread.task.inst.may_leave)
2689-
trap_if(not thread.task.may_block())
26902714
cancelled = thread.suspend(cancellable)
26912715
return [cancelled]
26922716

@@ -2698,24 +2722,42 @@ def canon_thread_yield(cancellable):
26982722
cancelled = thread.yield_(cancellable)
26992723
return [cancelled]
27002724

2701-
### 🧵 `canon thread.switch-to`
2725+
### 🧵 `canon thread.suspend-then-resume`
27022726

2703-
def canon_thread_switch_to(cancellable, i):
2727+
def canon_thread_suspend_then_resume(cancellable, i):
27042728
thread = current_thread()
27052729
trap_if(not thread.task.inst.may_leave)
27062730
other_thread = thread.task.inst.threads.get(i)
27072731
trap_if(not other_thread.suspended())
2708-
cancelled = thread.switch_to(cancellable, other_thread)
2732+
cancelled = thread.suspend_then_resume(cancellable, other_thread)
27092733
return [cancelled]
27102734

2711-
### 🧵 `canon thread.yield-to`
2735+
### 🧵 `canon thread.yield-then-resume`
27122736

2713-
def canon_thread_yield_to(cancellable, i):
2737+
def canon_thread_yield_then_resume(cancellable, i):
27142738
thread = current_thread()
27152739
trap_if(not thread.task.inst.may_leave)
27162740
other_thread = thread.task.inst.threads.get(i)
27172741
trap_if(not other_thread.suspended())
2718-
cancelled = thread.yield_to(cancellable, other_thread)
2742+
cancelled = thread.yield_then_resume(cancellable, other_thread)
2743+
return [cancelled]
2744+
2745+
### 🧵 `canon thread.suspend-then-promote`
2746+
2747+
def canon_thread_suspend_then_promote(cancellable, i):
2748+
thread = current_thread()
2749+
trap_if(not thread.task.inst.may_leave)
2750+
other_thread = thread.task.inst.threads.get(i)
2751+
cancelled = thread.suspend_then_promote(cancellable, other_thread)
2752+
return [cancelled]
2753+
2754+
### 🧵 `canon thread.yield-then-promote`
2755+
2756+
def canon_thread_yield_then_promote(cancellable, i):
2757+
thread = current_thread()
2758+
trap_if(not thread.task.inst.may_leave)
2759+
other_thread = thread.task.inst.threads.get(i)
2760+
cancelled = thread.yield_then_promote(cancellable, other_thread)
27192761
return [cancelled]
27202762

27212763
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)