Skip to content

Commit 4489c8c

Browse files
committed
Add cooperative threads
1 parent b5ace5f commit 4489c8c

1 file changed

Lines changed: 32 additions & 22 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -358,22 +358,6 @@ def write(self, vs):
358358
assert(all(v == () for v in vs))
359359
self.progress += len(vs)
360360

361-
#### Context-Local Storage
362-
363-
class ContextLocalStorage:
364-
LENGTH = 1
365-
array: list[int]
366-
367-
def __init__(self):
368-
self.array = [0] * ContextLocalStorage.LENGTH
369-
370-
def set(self, i, v):
371-
assert(types_match_values(['i32'], [v]))
372-
self.array[i] = v
373-
374-
def get(self, i):
375-
return self.array[i]
376-
377361
#### Waitable State
378362

379363
class EventCode(IntEnum):
@@ -475,7 +459,6 @@ class State(Enum):
475459
supertask: Optional[Task]
476460
on_resolve: Callable[[Optional[list[any]]], None]
477461
num_borrows: int
478-
context: ContextLocalStorage
479462

480463
def __init__(self, opts, inst, ft, supertask, on_resolve):
481464
self.state = Task.State.INITIAL
@@ -485,7 +468,6 @@ def __init__(self, opts, inst, ft, supertask, on_resolve):
485468
self.supertask = supertask
486469
self.on_resolve = on_resolve
487470
self.num_borrows = 0
488-
self.context = ContextLocalStorage()
489471

490472
async def enter(self, thread):
491473
self.trap_if_on_the_stack(self.inst)
@@ -889,12 +871,16 @@ def drop(self):
889871

890872
class Thread:
891873
task: Task
874+
context: list[int]
892875
awaitable: Optional[Awaitable]
893876
on_resume: Optional[asyncio.Future]
894877
on_suspend_or_exit: Optional[asyncio.Future]
895878

879+
CONTEXT_LENGTH = 1
880+
896881
def __init__(self, task, coro):
897882
self.task = task
883+
self.context = [0] * Thread.CONTEXT_LENGTH
898884
self.awaitable = asyncio.Future()
899885
self.on_resume = asyncio.Future()
900886
self.on_suspend_or_exit = None
@@ -930,6 +916,18 @@ async def suspend(self, awaitable) -> Cancelled:
930916
self.on_resume = None
931917
return cancelled
932918

919+
async def switch(self, other: Thread) -> Cancelled:
920+
assert(not self.awaitable and not other.awaitable)
921+
assert(self.on_suspend_or_exit and not other.on_suspend_or_exit)
922+
other.on_suspend_or_exit = self.on_suspend_or_exit
923+
self.on_suspend_or_exit = None
924+
other.on_resume.set_result(Cancelled.FALSE)
925+
assert(not self.on_resume)
926+
self.on_resume = asyncio.Future()
927+
cancelled = await self.on_resume
928+
self.on_resume = None
929+
return cancelled
930+
933931
#### Store State / Embedding API
934932

935933
class Store:
@@ -2108,19 +2106,31 @@ async def canon_resource_rep(rt, thread, i):
21082106
trap_if(h.rt is not rt)
21092107
return [h.rep]
21102108

2109+
### 🧵 `canon thread.new_indirect`
2110+
2111+
async def canon_thread_new_indirect(shared, ft, ftbl, thread, i, c):
2112+
inst = thread.task.inst
2113+
trap_if(not inst.may_leave)
2114+
assert(not shared)
2115+
f = ftbl.get(i)
2116+
trap_if(f is None)
2117+
trap_if(f.type != ft)
2118+
new_thread = Thread(thread.task, f(c))
2119+
return [inst.table.add(thread)]
2120+
21112121
### 🔀 `canon context.get`
21122122

21132123
async def canon_context_get(t, i, thread):
21142124
assert(t == 'i32')
2115-
assert(i < ContextLocalStorage.LENGTH)
2116-
return [thread.task.context.get(i)]
2125+
assert(i < Thread.CONTEXT_LENGTH)
2126+
return [thread.context[i]]
21172127

21182128
### 🔀 `canon context.set`
21192129

21202130
async def canon_context_set(t, i, thread, v):
21212131
assert(t == 'i32')
2122-
assert(i < ContextLocalStorage.LENGTH)
2123-
thread.task.context.set(i, v)
2132+
assert(i < Thread.CONTEXT_LENGTH)
2133+
thread.context[i] = v
21242134
return []
21252135

21262136
### 🔀 `canon backpressure.set`

0 commit comments

Comments
 (0)