Skip to content

Commit 747f970

Browse files
committed
use enum types to represent status and event codes
Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 491df5c commit 747f970

File tree

2 files changed

+70
-80
lines changed

2 files changed

+70
-80
lines changed

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 56 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,27 @@ mod table;
6161
// currently haphazard. We need a refactor to manage yielding, backpressure, and event polling and delivery in a
6262
// more unified and structured way.
6363

64-
// TODO: move these into an enum:
65-
const STATUS_STARTING: u32 = 0;
66-
const STATUS_STARTED: u32 = 1;
67-
const STATUS_RETURNED: u32 = 2;
68-
const STATUS_DONE: u32 = 3;
69-
70-
mod events {
71-
// TODO: move these into an enum:
72-
pub const _EVENT_CALL_STARTING: u32 = 0;
73-
pub const EVENT_CALL_STARTED: u32 = 1;
74-
pub const EVENT_CALL_RETURNED: u32 = 2;
75-
pub const EVENT_CALL_DONE: u32 = 3;
76-
pub const _EVENT_YIELDED: u32 = 4;
77-
pub const EVENT_STREAM_READ: u32 = 5;
78-
pub const EVENT_STREAM_WRITE: u32 = 6;
79-
pub const EVENT_FUTURE_READ: u32 = 7;
80-
pub const EVENT_FUTURE_WRITE: u32 = 8;
64+
#[derive(Clone, Copy, Eq, PartialEq)]
65+
#[repr(u32)]
66+
enum Status {
67+
Starting,
68+
Started,
69+
Returned,
70+
Done,
71+
}
72+
73+
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
74+
#[repr(u32)]
75+
enum Event {
76+
_Starting,
77+
Started,
78+
Returned,
79+
Done,
80+
_Yielded,
81+
StreamRead,
82+
StreamWrite,
83+
FutureRead,
84+
FutureWrite,
8185
}
8286

8387
const EXIT_FLAG_ASYNC_CALLER: u32 = 1 << 0;
@@ -149,7 +153,7 @@ impl<T: 'static> PromisesUnordered<T> {
149153
}
150154

151155
struct HostTaskResult {
152-
event: u32,
156+
event: Event,
153157
param: u32,
154158
caller: TableId<GuestTask>,
155159
}
@@ -214,7 +218,7 @@ struct GuestTask {
214218
lift_result: Option<(RawLift, TypeTaskReturnIndex)>,
215219
result: Option<LiftedResult>,
216220
callback: Option<Callback>,
217-
events: VecDeque<(u32, AnyTask, u32)>,
221+
events: VecDeque<(Event, AnyTask, u32)>,
218222
caller: Caller,
219223
deferred: Deferred,
220224
should_yield: bool,
@@ -264,9 +268,7 @@ impl AnyTask {
264268
.get(*task)?
265269
.events
266270
.iter()
267-
.filter_map(|(event, call, _)| {
268-
(*event == events::EVENT_CALL_DONE).then_some(*call)
269-
})
271+
.filter_map(|(event, call, _)| (*event == Event::Done).then_some(*call))
270272
.collect::<Vec<_>>();
271273

272274
for call in finished {
@@ -535,7 +537,7 @@ pub(crate) fn first_poll<T, R: Send + 'static>(
535537
let result = fun(store.as_context_mut())?;
536538
lower(store, result)?;
537539
Ok(HostTaskResult {
538-
event: events::EVENT_CALL_DONE,
540+
event: Event::Done,
539541
param: 0u32,
540542
caller,
541543
})
@@ -606,7 +608,7 @@ pub(crate) fn poll_and_block<'a, T, R: Send + Sync + 'static>(
606608
store.concurrent_state().table.get_mut(caller)?.result =
607609
Some(Box::new(result) as _);
608610
Ok(HostTaskResult {
609-
event: events::EVENT_CALL_DONE,
611+
event: Event::Done,
610612
param: 0u32,
611613
caller,
612614
})
@@ -698,7 +700,7 @@ pub(crate) async fn on_fiber<'a, R: Send + Sync + 'static, T: Send>(
698700
fn maybe_send_event<'a, T>(
699701
mut store: StoreContextMut<'a, T>,
700702
guest_task: TableId<GuestTask>,
701-
event: u32,
703+
event: Event,
702704
call: AnyTask,
703705
result: u32,
704706
) -> Result<StoreContextMut<'a, T>> {
@@ -718,15 +720,15 @@ fn maybe_send_event<'a, T>(
718720
bail!("handle not found for waitable rep {}", call.rep());
719721
};
720722
log::trace!(
721-
"use callback to deliver event {event} to {} for {} (handle {handle}): {:?} {}",
723+
"use callback to deliver event {event:?} to {} for {} (handle {handle}): {:?} {}",
722724
guest_task.rep(),
723725
call.rep(),
724726
callback.function,
725727
callback.context
726728
);
727729
let params = &mut [
728730
ValRaw::u32(callback.context),
729-
ValRaw::u32(event),
731+
ValRaw::u32(event as u32),
730732
ValRaw::u32(handle),
731733
ValRaw::u32(result),
732734
];
@@ -741,13 +743,8 @@ fn maybe_send_event<'a, T>(
741743
match &store.concurrent_state().table.get(guest_task)?.caller {
742744
Caller::Guest { task, .. } => {
743745
let task = *task;
744-
store = maybe_send_event(
745-
store,
746-
task,
747-
events::EVENT_CALL_DONE,
748-
AnyTask::Guest(guest_task),
749-
0,
750-
)?;
746+
store =
747+
maybe_send_event(store, task, Event::Done, AnyTask::Guest(guest_task), 0)?;
751748
}
752749
Caller::Host(_) => {
753750
log::trace!("maybe_send_event will delete {}", call.rep());
@@ -765,7 +762,7 @@ fn maybe_send_event<'a, T>(
765762
.events
766763
.push_back((event, call, result));
767764

768-
let resumed = if event == events::EVENT_CALL_DONE {
765+
let resumed = if event == Event::Done {
769766
if let Some(fiber) = store
770767
.concurrent_state()
771768
.table
@@ -774,7 +771,7 @@ fn maybe_send_event<'a, T>(
774771
.take_fiber()
775772
{
776773
log::trace!(
777-
"use fiber to deliver event {event} to {} for {}",
774+
"use fiber to deliver event {event:?} to {} for {}",
778775
guest_task.rep(),
779776
call.rep()
780777
);
@@ -791,7 +788,7 @@ fn maybe_send_event<'a, T>(
791788

792789
if !resumed {
793790
log::trace!(
794-
"queue event {event} to {} for {}",
791+
"queue event {event:?} to {} for {}",
795792
guest_task.rep(),
796793
call.rep()
797794
);
@@ -819,7 +816,7 @@ fn resume_stackful<'a, T>(
819816
.with_context(|| format!("bad handle: {}", guest_task.rep()))?
820817
.events,
821818
) {
822-
if event == events::EVENT_CALL_DONE {
819+
if event == Event::Done {
823820
log::trace!("resume_stackful will delete call {}", call.rep());
824821
call.delete_all_from(store.as_context_mut())?;
825822
}
@@ -832,13 +829,7 @@ fn resume_stackful<'a, T>(
832829
}
833830
Caller::Guest { task, .. } => {
834831
let task = *task;
835-
maybe_send_event(
836-
store,
837-
task,
838-
events::EVENT_CALL_DONE,
839-
AnyTask::Guest(guest_task),
840-
0,
841-
)
832+
maybe_send_event(store, task, Event::Done, AnyTask::Guest(guest_task), 0)
842833
}
843834
}
844835
} else {
@@ -867,11 +858,11 @@ fn resume_stackless<'a, T>(
867858

868859
let task = store.concurrent_state().table.get_mut(guest_task)?;
869860
let event = if task.lift_result.is_some() {
870-
events::EVENT_CALL_STARTED
861+
Event::Started
871862
} else if guest_context != 0 {
872-
events::EVENT_CALL_RETURNED
863+
Event::Returned
873864
} else {
874-
events::EVENT_CALL_DONE
865+
Event::Done
875866
};
876867
if guest_context != 0 {
877868
log::trace!("set callback for {}", guest_task.rep());
@@ -915,11 +906,10 @@ fn handle_ready<'a, T>(
915906
let result = fun(vm_store)?;
916907
store = unsafe { StoreContextMut::<T>(&mut *vm_store.cast()) };
917908
let task = match result.event {
918-
events::EVENT_CALL_DONE => AnyTask::Host(TableId::<HostTask>::new(task)),
919-
events::EVENT_STREAM_READ
920-
| events::EVENT_FUTURE_READ
921-
| events::EVENT_STREAM_WRITE
922-
| events::EVENT_FUTURE_WRITE => AnyTask::Transmit(TableId::<TransmitState>::new(task)),
909+
Event::Done => AnyTask::Host(TableId::<HostTask>::new(task)),
910+
Event::StreamRead | Event::FutureRead | Event::StreamWrite | Event::FutureWrite => {
911+
AnyTask::Transmit(TableId::<TransmitState>::new(task))
912+
}
923913
_ => unreachable!(),
924914
};
925915
store = maybe_send_event(store, result.caller, result.event, task, result.param)?;
@@ -1306,7 +1296,7 @@ unsafe fn task_check<T>(cx: *mut VMOpaqueContext, async_: bool, check: TaskCheck
13061296
.ok_or_else(|| anyhow!("no tasks to wait for"))?;
13071297

13081298
log::trace!(
1309-
"deliver event {event} via task.wait to {} for {}",
1299+
"deliver event {event:?} via task.wait to {} for {}",
13101300
guest_task.rep(),
13111301
call.rep()
13121302
);
@@ -1332,7 +1322,7 @@ unsafe fn task_check<T>(cx: *mut VMOpaqueContext, async_: bool, check: TaskCheck
13321322
handle.store(&mut lower, InterfaceType::U32, ptr)?;
13331323
result.store(&mut lower, InterfaceType::U32, ptr + 4)?;
13341324

1335-
Ok(event)
1325+
Ok(event as u32)
13361326
}
13371327
TaskCheck::Poll(memory, payload, caller_instance) => {
13381328
if let Some((event, call, result)) = cx
@@ -1343,7 +1333,7 @@ unsafe fn task_check<T>(cx: *mut VMOpaqueContext, async_: bool, check: TaskCheck
13431333
.pop_front()
13441334
{
13451335
log::trace!(
1346-
"deliver event {event} via task.poll to {} for {}",
1336+
"deliver event {event:?} via task.poll to {} for {}",
13471337
guest_task.rep(),
13481338
call.rep()
13491339
);
@@ -1368,7 +1358,7 @@ unsafe fn task_check<T>(cx: *mut VMOpaqueContext, async_: bool, check: TaskCheck
13681358
&ValRaw::u32(payload),
13691359
)?;
13701360
let mut lower = LowerContext::new(cx, &options, types, instance);
1371-
event.store(&mut lower, InterfaceType::U32, ptr)?;
1361+
(event as u32).store(&mut lower, InterfaceType::U32, ptr)?;
13721362
handle.store(&mut lower, InterfaceType::U32, ptr + 4)?;
13731363
result.store(&mut lower, InterfaceType::U32, ptr + 8)?;
13741364

@@ -1601,7 +1591,7 @@ pub(crate) extern "C" fn async_enter<T>(
16011591
maybe_send_event(
16021592
cx,
16031593
TableId::new(rep),
1604-
events::EVENT_CALL_STARTED,
1594+
Event::Started,
16051595
AnyTask::Guest(task),
16061596
0,
16071597
)?;
@@ -1623,7 +1613,7 @@ pub(crate) extern "C" fn async_enter<T>(
16231613
maybe_send_event(
16241614
cx,
16251615
TableId::new(rep),
1626-
events::EVENT_CALL_RETURNED,
1616+
Event::Returned,
16271617
AnyTask::Guest(task),
16281618
0,
16291619
)?;
@@ -1902,29 +1892,29 @@ pub(crate) extern "C" fn async_exit<T>(
19021892
let task = cx.concurrent_state().table.get(guest_task)?;
19031893

19041894
let mut status = if task.lower_params.is_some() {
1905-
STATUS_STARTING
1895+
Status::Starting
19061896
} else if task.lift_result.is_some() {
1907-
STATUS_STARTED
1897+
Status::Started
19081898
} else if guest_context != 0 || callback.is_null() {
1909-
STATUS_RETURNED
1899+
Status::Returned
19101900
} else {
1911-
STATUS_DONE
1901+
Status::Done
19121902
};
19131903

1914-
let call = if status != STATUS_DONE {
1904+
let call = if status != Status::Done {
19151905
if (flags & EXIT_FLAG_ASYNC_CALLER) != 0 {
19161906
(*instance).component_waitable_tables()[caller_instance]
19171907
.insert(guest_task.rep(), WaitableState::Task)?
19181908
} else {
19191909
poll_for_result(cx)?;
1920-
status = STATUS_DONE;
1910+
status = Status::Done;
19211911
0
19221912
}
19231913
} else {
19241914
0
19251915
};
19261916

1927-
Ok((status << 30) | call)
1917+
Ok(((status as u32) << 30) | call)
19281918
})
19291919
}
19301920
}

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
super::{
3-
call_host_and_handle_result, events, table::TableId, GuestTask, HostTaskFuture,
3+
call_host_and_handle_result, table::TableId, Event, GuestTask, HostTaskFuture,
44
HostTaskResult, Promise,
55
},
66
crate::{
@@ -66,7 +66,7 @@ fn state_table(instance: &mut ComponentInstance, ty: TableIndex) -> &mut StateTa
6666
fn push_event<T>(
6767
mut store: StoreContextMut<T>,
6868
rep: u32,
69-
event: u32,
69+
event: Event,
7070
param: usize,
7171
caller: TableId<GuestTask>,
7272
) {
@@ -281,8 +281,8 @@ fn host_write<T: func::Lower + Send + Sync + 'static, U, S: AsContextMut<Data =
281281
store.as_context_mut(),
282282
transmit_id.rep(),
283283
match ty {
284-
TableIndex::Future(_) => events::EVENT_FUTURE_READ,
285-
TableIndex::Stream(_) => events::EVENT_STREAM_READ,
284+
TableIndex::Future(_) => Event::FutureRead,
285+
TableIndex::Stream(_) => Event::StreamRead,
286286
},
287287
count,
288288
caller,
@@ -417,8 +417,8 @@ pub fn host_read<T: func::Lift + Sync + Send + 'static, U, S: AsContextMut<Data
417417
store,
418418
transmit_id.rep(),
419419
match ty {
420-
TableIndex::Future(_) => events::EVENT_FUTURE_WRITE,
421-
TableIndex::Stream(_) => events::EVENT_STREAM_WRITE,
420+
TableIndex::Future(_) => Event::FutureWrite,
421+
TableIndex::Stream(_) => Event::StreamWrite,
422422
},
423423
count,
424424
caller,
@@ -546,8 +546,8 @@ fn host_close_writer<U, S: AsContextMut<Data = U>>(mut store: S, rep: u32) -> Re
546546
store,
547547
transmit_id.rep(),
548548
match ty {
549-
TableIndex::Future(_) => events::EVENT_FUTURE_READ,
550-
TableIndex::Stream(_) => events::EVENT_STREAM_READ,
549+
TableIndex::Future(_) => Event::FutureRead,
550+
TableIndex::Stream(_) => Event::StreamRead,
551551
},
552552
CLOSED,
553553
caller,
@@ -598,8 +598,8 @@ fn host_close_reader<U, S: AsContextMut<Data = U>>(mut store: S, rep: u32) -> Re
598598
store.as_context_mut(),
599599
transmit_id.rep(),
600600
match ty {
601-
TableIndex::Future(_) => events::EVENT_FUTURE_WRITE,
602-
TableIndex::Stream(_) => events::EVENT_STREAM_WRITE,
601+
TableIndex::Future(_) => Event::FutureRead,
602+
TableIndex::Stream(_) => Event::StreamRead,
603603
},
604604
CLOSED,
605605
caller,
@@ -1421,8 +1421,8 @@ fn guest_write<T>(
14211421
cx,
14221422
transmit_id.rep(),
14231423
match read_ty {
1424-
TableIndex::Future(_) => events::EVENT_FUTURE_READ,
1425-
TableIndex::Stream(_) => events::EVENT_STREAM_READ,
1424+
TableIndex::Future(_) => Event::FutureRead,
1425+
TableIndex::Stream(_) => Event::StreamRead,
14261426
},
14271427
count,
14281428
read_caller,
@@ -1576,8 +1576,8 @@ fn guest_read<T>(
15761576
cx,
15771577
transmit_id.rep(),
15781578
match write_ty {
1579-
TableIndex::Future(_) => events::EVENT_FUTURE_WRITE,
1580-
TableIndex::Stream(_) => events::EVENT_STREAM_WRITE,
1579+
TableIndex::Future(_) => Event::FutureWrite,
1580+
TableIndex::Stream(_) => Event::StreamWrite,
15811581
},
15821582
count,
15831583
write_caller,

0 commit comments

Comments
 (0)