Skip to content

Commit ee6932f

Browse files
committed
refactor co id
1 parent 0389957 commit ee6932f

6 files changed

Lines changed: 49 additions & 31 deletions

File tree

core/src/co_pool/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl<'p> CoroutinePool<'p> {
188188

189189
/// Returns `true` if the task queue is empty.
190190
pub fn is_empty(&self) -> bool {
191-
self.size() == 0
191+
self.task_queue.is_empty()
192192
}
193193

194194
/// Returns the number of tasks owned by this pool.
@@ -210,7 +210,14 @@ impl<'p> CoroutinePool<'p> {
210210
}
211211

212212
fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> {
213-
_ = self.try_timed_schedule_task(dur)?;
213+
let timeout_time = get_timeout_time(dur);
214+
loop {
215+
_ = self.try_timeout_schedule_task(timeout_time)?;
216+
if self.get_running_size() == 0 || timeout_time.saturating_sub(now()) == 0 {
217+
break;
218+
}
219+
std::thread::sleep(Duration::from_millis(1));
220+
}
214221
assert_eq!(PoolState::Stopping, self.stopped()?);
215222
self.do_clean();
216223
Ok(())

core/src/coroutine/korosensei.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::cell::{Cell, RefCell, UnsafeCell};
1111
use std::collections::VecDeque;
1212
use std::ffi::c_longlong;
1313
use std::fmt::Debug;
14+
use std::hash::{DefaultHasher, Hash, Hasher};
1415
use std::io::Error;
1516

1617
cfg_if::cfg_if! {
@@ -25,6 +26,7 @@ cfg_if::cfg_if! {
2526
/// Use `corosensei` as the low-level coroutine.
2627
#[repr(C)]
2728
pub struct Coroutine<'c, Param, Yield, Return> {
29+
pub(crate) id: u64,
2830
pub(crate) name: String,
2931
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
3032
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
@@ -427,8 +429,12 @@ where
427429
co_name
428430
)
429431
});
432+
let mut hasher = DefaultHasher::new();
433+
name.hash(&mut hasher);
434+
let id = hasher.finish();
430435
#[allow(unused_mut)]
431436
let mut co = Coroutine {
437+
id,
432438
name,
433439
inner,
434440
stack_infos,

core/src/coroutine/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
6666
&self.name
6767
}
6868

69+
/// Get the id of this coroutine.
70+
#[allow(clippy::cast_possible_truncation)]
71+
pub fn id(&self) -> usize {
72+
self.id as usize
73+
}
74+
6975
/// Returns the current state of this `StateCoroutine`.
7076
pub fn state(&self) -> CoroutineState<Yield, Return>
7177
where

core/src/net/event_loop.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use crate::{error, impl_current_for, impl_display_by_debug, info};
77
use dashmap::DashSet;
88
use once_cell::sync::Lazy;
99
use rand::RngExt;
10-
use std::ffi::{c_char, c_int, c_void, CStr, CString};
10+
use std::collections::hash_map::DefaultHasher;
11+
use std::ffi::c_int;
12+
use std::hash::{Hash, Hasher};
1113
use std::io::{Error, ErrorKind};
1214
use std::marker::PhantomData;
1315
use std::ops::{Deref, DerefMut};
@@ -143,15 +145,11 @@ impl<'e> EventLoop<'e> {
143145

144146
#[allow(trivial_numeric_casts, clippy::cast_possible_truncation)]
145147
fn token(syscall: SyscallName) -> usize {
148+
// Coroutine path: consistent hash of (coroutine_name).
146149
if let Some(co) = SchedulableCoroutine::current() {
147-
let boxed: &'static mut CString = Box::leak(Box::from(
148-
CString::new(co.name()).expect("build name failed!"),
149-
));
150-
let cstr: &'static CStr = boxed.as_c_str();
151-
let token = cstr.as_ptr().cast::<c_void>() as usize;
152-
assert!(COROUTINE_TOKENS.insert(token));
153-
return token;
150+
return co.id();
154151
}
152+
// Thread path: consistent hash of (thread_id, syscall_name).
155153
unsafe {
156154
cfg_if::cfg_if! {
157155
if #[cfg(windows)] {
@@ -160,8 +158,10 @@ impl<'e> EventLoop<'e> {
160158
let thread_id = libc::pthread_self();
161159
}
162160
}
163-
let syscall_mask = <SyscallName as Into<&str>>::into(syscall).as_ptr() as usize;
164-
let token = thread_id as usize ^ syscall_mask;
161+
let mut hasher = DefaultHasher::new();
162+
(thread_id as usize).hash(&mut hasher);
163+
std::mem::discriminant(&syscall).hash(&mut hasher);
164+
let token = hasher.finish() as usize;
165165
if SyscallName::nio() != syscall {
166166
eprintln!("generate token:{token} for {syscall}");
167167
}
@@ -350,9 +350,7 @@ impl<'e> EventLoop<'e> {
350350
if COROUTINE_TOKENS.remove(&token).is_none() {
351351
return;
352352
}
353-
if let Ok(co_name) = CStr::from_ptr((token as *const c_void).cast::<c_char>()).to_str() {
354-
self.try_resume(co_name);
355-
}
353+
self.try_resume(token);
356354
}
357355

358356
pub(super) fn start(self) -> std::io::Result<Arc<Self>>

core/src/net/selector/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub(crate) trait Selector<I: Interest, E: Event, S: EventIterator<E>> {
8686
//同时对读写事件感兴趣
8787
let interests = I::read_and_write(token);
8888
self.reregister(fd, token, interests)
89-
.or(self.register(fd, token, interests))
89+
.or_else(|_| self.register(fd, token, interests))
9090
} else {
9191
self.register(fd, token, I::read(token))
9292
}?;
@@ -105,7 +105,7 @@ pub(crate) trait Selector<I: Interest, E: Event, S: EventIterator<E>> {
105105
//同时对读写事件感兴趣
106106
let interests = I::read_and_write(token);
107107
self.reregister(fd, token, interests)
108-
.or(self.register(fd, token, interests))
108+
.or_else(|_| self.register(fd, token, interests))
109109
} else {
110110
self.register(fd, token, I::write(token))
111111
}?;

core/src/scheduler.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,26 @@ impl Ord for SuspendItem<'_> {
5555

5656
#[repr(C)]
5757
#[derive(Debug)]
58-
struct SyscallSuspendItem<'s> {
58+
struct SyscallSuspendItem {
5959
timestamp: u64,
60-
co_name: &'s str,
60+
co_id: usize,
6161
}
6262

63-
impl PartialEq<Self> for SyscallSuspendItem<'_> {
63+
impl PartialEq<Self> for SyscallSuspendItem {
6464
fn eq(&self, other: &Self) -> bool {
6565
self.timestamp.eq(&other.timestamp)
6666
}
6767
}
6868

69-
impl Eq for SyscallSuspendItem<'_> {}
69+
impl Eq for SyscallSuspendItem {}
7070

71-
impl PartialOrd<Self> for SyscallSuspendItem<'_> {
71+
impl PartialOrd<Self> for SyscallSuspendItem {
7272
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
7373
Some(self.cmp(other))
7474
}
7575
}
7676

77-
impl Ord for SyscallSuspendItem<'_> {
77+
impl Ord for SyscallSuspendItem {
7878
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
7979
// BinaryHeap defaults to a large top heap, but we need a small top heap
8080
other.timestamp.cmp(&self.timestamp)
@@ -98,8 +98,8 @@ pub struct Scheduler<'s> {
9898
#[doc = include_str!("../docs/en/ordered-work-steal.md")]
9999
ready: OrderedLocalQueue<'s, SchedulableCoroutine<'s>>,
100100
suspend: BinaryHeap<SuspendItem<'s>>,
101-
syscall: DashMap<&'s str, SchedulableCoroutine<'s>>,
102-
syscall_suspend: BinaryHeap<SyscallSuspendItem<'s>>,
101+
syscall: DashMap<usize, SchedulableCoroutine<'s>>,
102+
syscall_suspend: BinaryHeap<SyscallSuspendItem>,
103103
}
104104

105105
impl Default for Scheduler<'_> {
@@ -220,8 +220,8 @@ impl<'s> Scheduler<'s> {
220220
///
221221
/// # Errors
222222
/// if change to ready fails.
223-
pub fn try_resume(&self, co_name: &'s str) {
224-
if let Some((_, co)) = self.syscall.remove(&co_name) {
223+
pub fn try_resume(&self, co_id: usize) {
224+
if let Some((_, co)) = self.syscall.remove(&co_id) {
225225
match co.state() {
226226
CoroutineState::Syscall(val, syscall, SyscallState::Suspend(_)) => {
227227
co.syscall(val, syscall, SyscallState::Callback)
@@ -313,10 +313,11 @@ impl<'s> Scheduler<'s> {
313313
CoroutineState::Syscall((), _, state) => {
314314
//挂起协程到系统调用表
315315
//如果已包含,说明当前系统调用还有上层父系统调用,因此直接忽略插入结果
316-
_ = self.syscall.insert(co_name, coroutine);
316+
let co_id = coroutine.id();
317+
_ = self.syscall.insert(co_id, coroutine);
317318
if let SyscallState::Suspend(timestamp) = state {
318319
self.syscall_suspend
319-
.push(SyscallSuspendItem { timestamp, co_name });
320+
.push(SyscallSuspendItem { timestamp, co_id });
320321
}
321322
}
322323
CoroutineState::Suspend((), timestamp) => {
@@ -373,7 +374,7 @@ impl<'s> Scheduler<'s> {
373374
break;
374375
}
375376
if let Some(item) = self.syscall_suspend.pop() {
376-
if let Some((_, co)) = self.syscall.remove(item.co_name) {
377+
if let Some((_, co)) = self.syscall.remove(&item.co_id) {
377378
match co.state() {
378379
CoroutineState::Syscall(val, syscall, SyscallState::Suspend(_)) => {
379380
co.syscall(val, syscall, SyscallState::Timeout)?;
@@ -420,7 +421,7 @@ mod tests {
420421
for timestamp in (0..10).rev() {
421422
heap.push(SyscallSuspendItem {
422423
timestamp,
423-
co_name: "test",
424+
co_id: 1,
424425
});
425426
}
426427
for timestamp in 0..10 {

0 commit comments

Comments
 (0)