Skip to content

Commit a57a280

Browse files
committed
CABI: improve and add cooperative thread built-ins
1 parent 978f15c commit a57a280

3 files changed

Lines changed: 406 additions & 55 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 88 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -321,13 +322,15 @@ def waiting(self):
321322
return not self.running() and self.ready_func is not None
322323

323324
def ready(self):
324-
assert(self.waiting())
325-
return self.ready_func()
325+
return self.waiting() and self.ready_func()
326326

327327
def __init__(self, task, thread_func):
328328
def cont_func(cancelled):
329329
assert(self.running() and not cancelled)
330-
thread_func()
330+
try:
331+
thread_func()
332+
except ThreadExit:
333+
pass
331334
return None
332335
self.cont = cont_new(cont_func)
333336
self.ready_func = None
@@ -337,6 +340,10 @@ def cont_func(cancelled):
337340
self.storage = [0,0]
338341
assert(self.suspended())
339342

343+
def exit(self):
344+
assert(self.running())
345+
raise ThreadExit()
346+
340347
def resume_later(self):
341348
assert(self.suspended())
342349
self.ready_func = lambda: True
@@ -345,7 +352,8 @@ def resume_later(self):
345352

346353
def resume(self, cancelled = Cancelled.FALSE):
347354
assert(not self.running() and (self.cancellable or not cancelled))
348-
self.stop_waiting()
355+
if self.waiting():
356+
self.stop_waiting(cancelled)
349357
thread = self
350358
while thread is not None:
351359
cont = thread.cont
@@ -354,23 +362,23 @@ def resume(self, cancelled = Cancelled.FALSE):
354362
thread = switch_to
355363
cancelled = Cancelled.FALSE
356364

357-
def stop_waiting(self):
358-
if self.waiting():
359-
self.ready_func = None
360-
self.task.inst.store.waiting.remove(self)
365+
def stop_waiting(self, cancelled = Cancelled.FALSE):
366+
assert(self.waiting())
367+
assert(cancelled or self.ready_func())
368+
self.ready_func = None
369+
self.task.inst.store.waiting.remove(self)
361370

362371
def suspend(self, cancellable) -> Cancelled:
363-
assert(self.running() and self.task.may_block())
372+
assert(self.running())
364373
if self.task.deliver_pending_cancel(cancellable):
365-
self.stop_waiting()
366374
return Cancelled.TRUE
367375
self.cancellable = cancellable
368376
cancelled = block(switch_to = None)
369377
assert(self.running() and (cancellable or not cancelled))
370378
return cancelled
371379

372380
def wait_until(self, ready_func, cancellable = False) -> Cancelled:
373-
assert(self.running() and self.task.may_block())
381+
assert(self.running())
374382
if self.task.deliver_pending_cancel(cancellable):
375383
return Cancelled.TRUE
376384
if ready_func() and not DETERMINISTIC_PROFILE and random.randint(0,1):
@@ -379,32 +387,41 @@ def wait_until(self, ready_func, cancellable = False) -> Cancelled:
379387
self.task.inst.store.waiting.append(self)
380388
return self.suspend(cancellable)
381389

382-
def yield_until(self, ready_func, cancellable) -> Cancelled:
383-
assert(self.running())
384-
if self.task.may_block():
385-
return self.wait_until(ready_func, cancellable)
386-
else:
387-
assert(ready_func())
388-
return Cancelled.FALSE
389-
390390
def yield_(self, cancellable) -> Cancelled:
391-
return self.yield_until(lambda: True, cancellable)
391+
return self.wait_until(lambda: True, cancellable)
392392

393-
def switch_to(self, cancellable, other: Thread) -> Cancelled:
393+
def suspend_then_resume(self, cancellable, other: Thread) -> Cancelled:
394394
assert(self.running() and other.suspended())
395395
if self.task.deliver_pending_cancel(cancellable):
396-
self.stop_waiting()
397396
return Cancelled.TRUE
398397
self.cancellable = cancellable
399398
cancelled = block(switch_to = other)
400399
assert(self.running() and (cancellable or not cancelled))
401400
return cancelled
402401

403-
def yield_to(self, cancellable, other: Thread) -> Cancelled:
402+
def yield_then_resume(self, cancellable, other: Thread) -> Cancelled:
404403
assert(self.running() and other.suspended())
404+
if self.task.deliver_pending_cancel(cancellable):
405+
return Cancelled.TRUE
405406
self.ready_func = lambda: True
406407
self.task.inst.store.waiting.append(self)
407-
return self.switch_to(cancellable, other)
408+
return self.suspend_then_resume(cancellable, other)
409+
410+
def suspend_then_promote(self, cancellable, other: Thread) -> ResumeArg:
411+
assert(self.running())
412+
if other.ready():
413+
other.stop_waiting()
414+
return self.suspend_then_resume(cancellable, other)
415+
else:
416+
return self.suspend(cancellable)
417+
418+
def yield_then_promote(self, cancellable, other: Thread) -> ResumeArg:
419+
assert(self.running())
420+
if other.ready():
421+
other.stop_waiting()
422+
return self.yield_then_resume(cancellable, other)
423+
else:
424+
return self.yield_(cancellable)
408425

409426
### Tasks
410427

@@ -446,9 +463,6 @@ def needs_exclusive(self):
446463
assert(self.ft.async_)
447464
return not self.opts.async_ or self.opts.callback
448465

449-
def may_block(self):
450-
return self.ft.async_ or self.state == Task.State.RESOLVED
451-
452466
def enter_implicit_thread(self):
453467
assert(self.state == Task.State.INITIAL)
454468
self.implicit_thread = current_thread()
@@ -672,6 +686,11 @@ def remove(self, i):
672686
self.free.append(i)
673687
return e
674688

689+
def __iter__(self):
690+
for e in self.array:
691+
if e is not None:
692+
yield e
693+
675694
### Resource State
676695

677696
class ResourceHandle:
@@ -2114,13 +2133,12 @@ def thread_func():
21142133
inst.exclusive_thread = None
21152134
match code:
21162135
case CallbackCode.YIELD:
2117-
cancelled = thread.yield_until(lambda: not inst.exclusive_thread, cancellable = True)
2136+
cancelled = thread.wait_until(lambda: not inst.exclusive_thread, cancellable = True)
21182137
if cancelled:
21192138
event = (EventCode.TASK_CANCELLED, 0, 0)
21202139
else:
21212140
event = (EventCode.NONE, 0, 0)
21222141
case CallbackCode.WAIT:
2123-
trap_if(not task.may_block())
21242142
wset = inst.handles.get(si)
21252143
trap_if(not isinstance(wset, WaitableSet))
21262144
event = wset.wait_for_event_and(lambda: not inst.exclusive_thread, cancellable = True)
@@ -2137,6 +2155,11 @@ def thread_func():
21372155
task = Task(ft, opts, inst, on_start, on_resolve)
21382156
thread = Thread(task, thread_func)
21392157
thread.resume()
2158+
if not ft.async_:
2159+
while task.state != Task.State.RESOLVED:
2160+
candidates = { t for t in inst.threads if t.ready() and t is not inst.exclusive_thread }
2161+
trap_if(not candidates)
2162+
random.choice(list(candidates)).resume()
21402163
return task.request_cancellation
21412164

21422165
class CallbackCode(IntEnum):
@@ -2164,7 +2187,6 @@ def call_and_trap_on_throw(callee, args):
21642187
def canon_lower(callee, ft, opts, flat_args: list[CoreValType]) -> list[CoreValType]:
21652188
thread = current_thread()
21662189
trap_if(not thread.task.inst.may_leave)
2167-
trap_if(not thread.task.may_block() and ft.async_ and not opts.async_)
21682190

21692191
subtask = Subtask()
21702192
cx = LiftLowerContext(opts, thread.task.inst, subtask)
@@ -2345,13 +2367,12 @@ def canon_waitable_set_new():
23452367
### 🔀 `canon waitable-set.wait`
23462368

23472369
def canon_waitable_set_wait(cancellable, mem, si, ptr):
2348-
task = current_task()
2349-
trap_if(not task.inst.may_leave)
2350-
trap_if(not task.may_block())
2351-
wset = task.inst.handles.get(si)
2370+
inst = current_instance()
2371+
trap_if(not inst.may_leave)
2372+
wset = inst.handles.get(si)
23522373
trap_if(not isinstance(wset, WaitableSet))
23532374
event = wset.wait_for_event(cancellable)
2354-
return unpack_event(mem, task.inst, ptr, event)
2375+
return unpack_event(mem, inst, ptr, event)
23552376

23562377
def unpack_event(mem, inst, ptr, e: EventTuple):
23572378
event, p1, p2 = e
@@ -2402,7 +2423,6 @@ def canon_waitable_join(wi, si):
24022423
def canon_subtask_cancel(async_, i):
24032424
thread = current_thread()
24042425
trap_if(not thread.task.inst.may_leave)
2405-
trap_if(not thread.task.may_block() and not async_)
24062426
subtask = thread.task.inst.handles.get(i)
24072427
trap_if(not isinstance(subtask, Subtask))
24082428
trap_if(subtask.resolve_delivered())
@@ -2463,7 +2483,6 @@ def canon_stream_write(stream_t, opts, i, ptr, n):
24632483
def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n):
24642484
thread = current_thread()
24652485
trap_if(not thread.task.inst.may_leave)
2466-
trap_if(not thread.task.may_block() and not opts.async_)
24672486

24682487
e = thread.task.inst.handles.get(i)
24692488
trap_if(not isinstance(e, EndT))
@@ -2518,7 +2537,6 @@ def canon_future_write(future_t, opts, i, ptr):
25182537
def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr):
25192538
thread = current_thread()
25202539
trap_if(not thread.task.inst.may_leave)
2521-
trap_if(not thread.task.may_block() and not opts.async_)
25222540

25232541
e = thread.task.inst.handles.get(i)
25242542
trap_if(not isinstance(e, EndT))
@@ -2571,7 +2589,6 @@ def canon_future_cancel_write(future_t, async_, i):
25712589
def cancel_copy(EndT, event_code, stream_or_future_t, async_, i):
25722590
thread = current_thread()
25732591
trap_if(not thread.task.inst.may_leave)
2574-
trap_if(not thread.task.may_block() and not async_)
25752592
e = thread.task.inst.handles.get(i)
25762593
trap_if(not isinstance(e, EndT))
25772594
trap_if(e.shared.t != stream_or_future_t.t)
@@ -2618,6 +2635,14 @@ def canon_thread_index():
26182635
assert(thread.index is not None)
26192636
return [thread.index]
26202637

2638+
### 🧵 `canon thread.exit`
2639+
2640+
def canon_thread_exit():
2641+
thread = current_thread()
2642+
trap_if(not thread.task.inst.may_leave)
2643+
thread.exit()
2644+
assert(False)
2645+
26212646
### 🧵 `canon thread.new-indirect`
26222647

26232648
@dataclass
@@ -2654,7 +2679,6 @@ def canon_thread_resume_later(i):
26542679
def canon_thread_suspend(cancellable):
26552680
thread = current_thread()
26562681
trap_if(not thread.task.inst.may_leave)
2657-
trap_if(not thread.task.may_block())
26582682
cancelled = thread.suspend(cancellable)
26592683
return [cancelled]
26602684

@@ -2666,24 +2690,42 @@ def canon_thread_yield(cancellable):
26662690
cancelled = thread.yield_(cancellable)
26672691
return [cancelled]
26682692

2669-
### 🧵 `canon thread.switch-to`
2693+
### 🧵 `canon thread.suspend-then-resume`
26702694

2671-
def canon_thread_switch_to(cancellable, i):
2695+
def canon_thread_suspend_then_resume(cancellable, i):
26722696
thread = current_thread()
26732697
trap_if(not thread.task.inst.may_leave)
26742698
other_thread = thread.task.inst.threads.get(i)
26752699
trap_if(not other_thread.suspended())
2676-
cancelled = thread.switch_to(cancellable, other_thread)
2700+
cancelled = thread.suspend_then_resume(cancellable, other_thread)
26772701
return [cancelled]
26782702

2679-
### 🧵 `canon thread.yield-to`
2703+
### 🧵 `canon thread.yield-then-resume`
26802704

2681-
def canon_thread_yield_to(cancellable, i):
2705+
def canon_thread_yield_then_resume(cancellable, i):
26822706
thread = current_thread()
26832707
trap_if(not thread.task.inst.may_leave)
26842708
other_thread = thread.task.inst.threads.get(i)
26852709
trap_if(not other_thread.suspended())
2686-
cancelled = thread.yield_to(cancellable, other_thread)
2710+
cancelled = thread.yield_then_resume(cancellable, other_thread)
2711+
return [cancelled]
2712+
2713+
### 🧵 `canon thread.suspend-then-promote`
2714+
2715+
def canon_thread_suspend_then_promote(cancellable, i):
2716+
thread = current_thread()
2717+
trap_if(not thread.task.inst.may_leave)
2718+
other_thread = thread.task.inst.threads.get(i)
2719+
cancelled = thread.suspend_then_promote(cancellable, other_thread)
2720+
return [cancelled]
2721+
2722+
### 🧵 `canon thread.yield-then-promote`
2723+
2724+
def canon_thread_yield_then_promote(cancellable, i):
2725+
thread = current_thread()
2726+
trap_if(not thread.task.inst.may_leave)
2727+
other_thread = thread.task.inst.threads.get(i)
2728+
cancelled = thread.yield_then_promote(cancellable, other_thread)
26872729
return [cancelled]
26882730

26892731
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)