Skip to content

Commit 83ec187

Browse files
committed
refactor task id
1 parent ee6932f commit 83ec187

10 files changed

Lines changed: 119 additions & 112 deletions

File tree

core/src/co_pool/mod.rs

Lines changed: 51 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ mod state;
2626
/// Creator for coroutine pool.
2727
mod creator;
2828

29-
/// `task_name` -> `co_name`
30-
static RUNNING_TASKS: Lazy<DashMap<&str, &str>> = Lazy::new(DashMap::new);
29+
/// `task_id` -> `co_id`
30+
static RUNNING_TASKS: Lazy<DashMap<u64, u64>> = Lazy::new(DashMap::new);
3131

32-
static CANCEL_TASKS: Lazy<DashSet<&str>> = Lazy::new(DashSet::new);
32+
static CANCEL_TASKS: Lazy<DashSet<u64>> = Lazy::new(DashSet::new);
3333

3434
/// The coroutine pool impls.
3535
#[repr(C)]
@@ -55,10 +55,10 @@ pub struct CoroutinePool<'p> {
5555
//阻滞器
5656
blocker: Arc<CondvarBlocker>,
5757
//正在等待结果的
58-
waits: DashMap<&'p str, Arc<(Mutex<bool>, Condvar)>>,
58+
waits: DashMap<u64, Arc<(Mutex<bool>, Condvar)>>,
5959
//任务执行结果
60-
results: DashMap<String, Result<Option<usize>, &'p str>>,
61-
no_waits: DashSet<&'p str>,
60+
results: DashMap<u64, Result<Option<usize>, &'p str>>,
61+
no_waits: DashSet<u64>,
6262
}
6363

6464
impl Drop for CoroutinePool<'_> {
@@ -226,11 +226,11 @@ impl<'p> CoroutinePool<'p> {
226226
fn do_clean(&mut self) {
227227
// clean up remaining wait tasks
228228
for r in &self.waits {
229-
let task_name = *r.key();
229+
let task_id = *r.key();
230230
_ = self
231231
.results
232-
.insert(task_name.to_string(), Err("The coroutine pool has stopped"));
233-
self.notify(task_name);
232+
.insert(task_id, Err("The coroutine pool has stopped"));
233+
self.notify(task_id);
234234
}
235235
}
236236

@@ -244,16 +244,22 @@ impl<'p> CoroutinePool<'p> {
244244
func: impl FnOnce(Option<usize>) -> Option<usize> + 'p,
245245
param: Option<usize>,
246246
priority: Option<c_longlong>,
247-
) -> std::io::Result<String> {
247+
) -> std::io::Result<u64> {
248248
match self.state() {
249249
PoolState::Running => {}
250250
PoolState::Stopping | PoolState::Stopped => {
251251
return Err(Error::other("The coroutine pool is stopping or stopped !"))
252252
}
253253
}
254-
let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4()));
255-
self.submit_raw_task(Task::new(name.clone(), func, param, priority));
256-
Ok(name)
254+
let task = Task::new(
255+
name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
256+
func,
257+
param,
258+
priority,
259+
);
260+
let task_id = task.id();
261+
self.submit_raw_task(task);
262+
Ok(task_id)
257263
}
258264

259265
/// Submit new task to this pool.
@@ -266,17 +272,17 @@ impl<'p> CoroutinePool<'p> {
266272
}
267273

268274
/// Attempt to obtain task results with the given `task_name`.
269-
pub fn try_take_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
270-
self.results.remove(task_name).map(|(_, r)| r)
275+
pub fn try_take_task_result(&self, task_id: u64) -> Option<Result<Option<usize>, &'p str>> {
276+
self.results.remove(&task_id).map(|(_, r)| r)
271277
}
272278

273279
/// clean the task result data.
274-
pub fn clean_task_result(&self, task_name: &str) {
275-
if self.try_take_task_result(task_name).is_some() {
280+
pub fn clean_task_result(&self, task_id: u64) {
281+
if self.try_take_task_result(task_id).is_some() {
276282
return;
277283
}
278-
_ = self.no_waits.insert(Box::leak(Box::from(task_name)));
279-
_ = CANCEL_TASKS.remove(task_name);
284+
_ = self.no_waits.insert(task_id);
285+
_ = CANCEL_TASKS.remove(&task_id);
280286
}
281287

282288
/// Use the given `task_name` to obtain task results, and if no results are found,
@@ -286,31 +292,30 @@ impl<'p> CoroutinePool<'p> {
286292
/// if timeout
287293
pub fn wait_task_result(
288294
&self,
289-
task_name: &str,
295+
task_id: u64,
290296
wait_time: Duration,
291297
) -> std::io::Result<Result<Option<usize>, &str>> {
292-
let key = Box::leak(Box::from(task_name));
293-
if let Some(r) = self.try_take_task_result(key) {
294-
self.notify(key);
298+
if let Some(r) = self.try_take_task_result(task_id) {
299+
self.notify(task_id);
295300
return Ok(r);
296301
}
297302
if SchedulableCoroutine::current().is_some() {
298303
let timeout_time = get_timeout_time(wait_time);
299304
loop {
300305
_ = self.try_run();
301-
if let Some(r) = self.try_take_task_result(key) {
306+
if let Some(r) = self.try_take_task_result(task_id) {
302307
return Ok(r);
303308
}
304309
if timeout_time.saturating_sub(now()) == 0 {
305310
return Err(Error::new(ErrorKind::TimedOut, "wait timeout"));
306311
}
307312
}
308313
}
309-
let arc = if let Some(arc) = self.waits.get(key) {
314+
let arc = if let Some(arc) = self.waits.get(&task_id) {
310315
arc.clone()
311316
} else {
312317
let arc = Arc::new((Mutex::new(true), Condvar::new()));
313-
assert!(self.waits.insert(key, arc.clone()).is_none());
318+
assert!(self.waits.insert(task_id, arc.clone()).is_none());
314319
arc
315320
};
316321
let (lock, cvar) = &*arc;
@@ -322,8 +327,8 @@ impl<'p> CoroutinePool<'p> {
322327
)
323328
.map_err(|e| Error::other(format!("{e}")))?,
324329
);
325-
if let Some(r) = self.try_take_task_result(key) {
326-
self.notify(key);
330+
if let Some(r) = self.try_take_task_result(task_id) {
331+
self.notify(task_id);
327332
return Ok(r);
328333
}
329334
Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
@@ -409,32 +414,31 @@ impl<'p> CoroutinePool<'p> {
409414

410415
fn try_run(&self) -> Option<()> {
411416
self.task_queue.pop().map(|task| {
412-
let tname = task.get_name().to_string().leak();
413-
if CANCEL_TASKS.contains(tname) {
414-
_ = CANCEL_TASKS.remove(tname);
415-
warn!("Cancel task:{} successfully !", tname);
417+
let task_id = task.id();
418+
if CANCEL_TASKS.contains(&task_id) {
419+
_ = CANCEL_TASKS.remove(&task_id);
420+
warn!("Cancel task:{} successfully !", task_id);
416421
return;
417422
}
418423
if let Some(co) = SchedulableCoroutine::current() {
419-
_ = RUNNING_TASKS.insert(tname, co.name());
424+
_ = RUNNING_TASKS.insert(task_id, co.id);
420425
}
421-
let (task_name, result) = task.run();
422-
_ = RUNNING_TASKS.remove(tname);
423-
let n = task_name.clone().leak();
424-
if self.no_waits.contains(n) {
425-
_ = self.no_waits.remove(n);
426+
let (_, result) = task.run();
427+
_ = RUNNING_TASKS.remove(&task_id);
428+
if self.no_waits.contains(&task_id) {
429+
_ = self.no_waits.remove(&task_id);
426430
return;
427431
}
428432
assert!(
429-
self.results.insert(task_name.clone(), result).is_none(),
433+
self.results.insert(task_id, result).is_none(),
430434
"The previous result was not retrieved in a timely manner"
431435
);
432-
self.notify(&task_name);
436+
self.notify(task_id);
433437
})
434438
}
435439

436-
fn notify(&self, task_name: &str) {
437-
if let Some((_, arc)) = self.waits.remove(task_name) {
440+
fn notify(&self, task_id: u64) {
441+
if let Some((_, arc)) = self.waits.remove(&task_id) {
438442
let (lock, cvar) = &*arc;
439443
let mut pending = lock.lock().expect("notify task failed");
440444
*pending = false;
@@ -443,9 +447,9 @@ impl<'p> CoroutinePool<'p> {
443447
}
444448

445449
/// Try to cancel a task.
446-
pub fn try_cancel_task(task_name: &str) {
450+
pub fn try_cancel_task(task_id: u64) {
447451
// 检查正在运行的任务是否是要取消的任务
448-
if let Some(info) = RUNNING_TASKS.get(task_name) {
452+
if let Some(info) = RUNNING_TASKS.get(&task_id) {
449453
let co_name = *info;
450454
// todo windows support
451455
#[allow(unused_variables)]
@@ -470,13 +474,13 @@ impl<'p> CoroutinePool<'p> {
470474
Scheduler::try_cancel_coroutine(co_name);
471475
warn!(
472476
"Attempt to cancel task:{} running on coroutine:{}, cancelling...",
473-
task_name, co_name
477+
task_id, co_name
474478
);
475479
}
476480
} else {
477481
// 添加到待取消队列
478-
_ = CANCEL_TASKS.insert(Box::leak(Box::from(task_name)));
479-
warn!("Attempt to cancel task:{}, cancelling...", task_name);
482+
_ = CANCEL_TASKS.insert(task_id);
483+
warn!("Attempt to cancel task:{}, cancelling...", task_id);
480484
}
481485
}
482486

core/src/co_pool/task.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::catch;
22
use crate::common::ordered_work_steal::Ordered;
33
use std::ffi::c_longlong;
4+
use std::hash::{DefaultHasher, Hash, Hasher};
45

56
/// 做C兼容时会用到
67
pub type UserTaskFunc = extern "C" fn(usize) -> usize;
@@ -10,6 +11,7 @@ pub type UserTaskFunc = extern "C" fn(usize) -> usize;
1011
#[derive(educe::Educe)]
1112
#[educe(Debug)]
1213
pub struct Task<'t> {
14+
id: u64,
1315
name: String,
1416
#[educe(Debug(ignore))]
1517
func: Box<dyn FnOnce(Option<usize>) -> Option<usize> + 't>,
@@ -25,7 +27,11 @@ impl<'t> Task<'t> {
2527
param: Option<usize>,
2628
priority: Option<c_longlong>,
2729
) -> Self {
30+
let mut hasher = DefaultHasher::new();
31+
name.hash(&mut hasher);
32+
let id = hasher.finish();
2833
Task {
34+
id,
2935
name,
3036
func: Box::new(func),
3137
param,
@@ -35,10 +41,16 @@ impl<'t> Task<'t> {
3541

3642
/// get the task name.
3743
#[must_use]
38-
pub fn get_name(&self) -> &str {
44+
pub fn name(&self) -> &str {
3945
&self.name
4046
}
4147

48+
/// get the task name.
49+
#[must_use]
50+
pub fn id(&self) -> u64 {
51+
self.id
52+
}
53+
4254
/// execute the task
4355
///
4456
/// # Errors

core/src/coroutine/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
6767
}
6868

6969
/// Get the id of this coroutine.
70+
/// todo use u64
7071
#[allow(clippy::cast_possible_truncation)]
7172
pub fn id(&self) -> usize {
7273
self.id as usize

core/src/net/event_loop.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ impl<'e> EventLoop<'e> {
139139
}
140140

141141
/// Try to cancel a task from `CoroutinePool`.
142-
pub(super) fn try_cancel_task(name: &str) {
143-
CoroutinePool::try_cancel_task(name);
142+
pub(super) fn try_cancel_task(task_id: u64) {
143+
CoroutinePool::try_cancel_task(task_id);
144144
}
145145

146146
#[allow(trivial_numeric_casts, clippy::cast_possible_truncation)]
@@ -350,7 +350,7 @@ impl<'e> EventLoop<'e> {
350350
if COROUTINE_TOKENS.remove(&token).is_none() {
351351
return;
352352
}
353-
self.try_resume(token);
353+
self.try_resume(token as u64);
354354
}
355355

356356
pub(super) fn start(self) -> std::io::Result<Arc<Self>>

core/src/net/join.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,41 @@
11
use crate::net::event_loop::EventLoop;
2-
use std::ffi::{c_char, CStr, CString};
32
use std::io::{Error, ErrorKind};
43
use std::sync::Arc;
54
use std::time::Duration;
65

76
#[allow(missing_docs)]
87
#[repr(C)]
98
#[derive(Debug)]
10-
pub struct JoinHandle(&'static Arc<EventLoop<'static>>, *const c_char);
9+
pub struct JoinHandle(&'static Arc<EventLoop<'static>>, u64);
1110

1211
impl Drop for JoinHandle {
1312
fn drop(&mut self) {
14-
if let Ok(name) = self.get_name() {
15-
self.0.clean_task_result(name);
13+
if let Ok(task_id) = self.id() {
14+
self.0.clean_task_result(task_id);
1615
}
1716
}
1817
}
1918

2019
impl JoinHandle {
2120
/// create `JoinHandle` instance.
2221
pub(crate) fn err(pool: &'static Arc<EventLoop<'static>>) -> Self {
23-
Self::new(pool, "")
22+
Self::new(pool, 0)
2423
}
2524

2625
/// create `JoinHandle` instance.
27-
pub(crate) fn new(pool: &'static Arc<EventLoop<'static>>, name: &str) -> Self {
28-
let boxed: &'static mut CString = Box::leak(Box::from(
29-
CString::new(name).expect("init JoinHandle failed!"),
30-
));
31-
let cstr: &'static CStr = boxed.as_c_str();
32-
JoinHandle(pool, cstr.as_ptr())
26+
pub(crate) fn new(pool: &'static Arc<EventLoop<'static>>, task_id: u64) -> Self {
27+
JoinHandle(pool, task_id)
3328
}
3429

35-
/// get the task name.
30+
/// get the task id.
3631
///
3732
/// # Errors
38-
/// if the task name is invalid.
39-
pub fn get_name(&self) -> std::io::Result<&str> {
40-
unsafe { CStr::from_ptr(self.1) }
41-
.to_str()
42-
.map_err(|_| Error::new(ErrorKind::InvalidInput, "Invalid task name"))
33+
/// if the task id is invalid.
34+
pub fn id(&self) -> std::io::Result<u64> {
35+
if 0 == self.1 {
36+
return Err(Error::new(ErrorKind::InvalidInput, "Invalid task id"));
37+
}
38+
Ok(self.1)
4339
}
4440

4541
/// join with `Duration`.
@@ -66,12 +62,9 @@ impl JoinHandle {
6662
&self,
6763
timeout_time: u64,
6864
) -> std::io::Result<Result<Option<usize>, &str>> {
69-
let name = self.get_name()?;
70-
if name.is_empty() {
71-
return Err(Error::new(ErrorKind::InvalidInput, "Invalid task name"));
72-
}
65+
let task_id = self.id()?;
7366
self.0.wait_task_result(
74-
name,
67+
task_id,
7568
Duration::from_nanos(timeout_time.saturating_sub(crate::common::now())),
7669
)
7770
}

core/src/net/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,13 @@ impl EventLoops {
150150
.submit_task(name, func, param, priority)
151151
.map_or_else(
152152
|_| JoinHandle::err(event_loop),
153-
|n| JoinHandle::new(event_loop, n.as_str()),
153+
|task_id| JoinHandle::new(event_loop, task_id),
154154
)
155155
}
156156

157157
/// Try to cancel a task from event-loop.
158-
pub fn try_cancel_task(name: &str) {
159-
EventLoop::try_cancel_task(name);
158+
pub fn try_cancel_task(task_id: u64) {
159+
EventLoop::try_cancel_task(task_id);
160160
}
161161

162162
/// Submit a new coroutine to event-loop.

0 commit comments

Comments
 (0)