Skip to content

Commit eaa6e73

Browse files
committed
use coroutine/task id
1 parent 0389957 commit eaa6e73

12 files changed

Lines changed: 171 additions & 145 deletions

File tree

core/src/co_pool/mod.rs

Lines changed: 64 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ mod state;
2626
/// Creator for coroutine pool.
2727
mod creator;
2828

29-
/// `task_name` -> `co_name`
30-
static RUNNING_TASKS: Lazy<DashMap<&str, &str>> = Lazy::new(DashMap::new);
29+
/// `task_id` -> `co_id`
30+
static RUNNING_TASKS: Lazy<DashMap<u64, u64>> = Lazy::new(DashMap::new);
3131

32-
static CANCEL_TASKS: Lazy<DashSet<&str>> = Lazy::new(DashSet::new);
32+
static CANCEL_TASKS: Lazy<DashSet<u64>> = Lazy::new(DashSet::new);
3333

3434
/// The coroutine pool impls.
3535
#[repr(C)]
@@ -55,10 +55,10 @@ pub struct CoroutinePool<'p> {
5555
//阻滞器
5656
blocker: Arc<CondvarBlocker>,
5757
//正在等待结果的
58-
waits: DashMap<&'p str, Arc<(Mutex<bool>, Condvar)>>,
58+
waits: DashMap<u64, Arc<(Mutex<bool>, Condvar)>>,
5959
//任务执行结果
60-
results: DashMap<String, Result<Option<usize>, &'p str>>,
61-
no_waits: DashSet<&'p str>,
60+
results: DashMap<u64, Result<Option<usize>, &'p str>>,
61+
no_waits: DashSet<u64>,
6262
}
6363

6464
impl Drop for CoroutinePool<'_> {
@@ -188,7 +188,7 @@ impl<'p> CoroutinePool<'p> {
188188

189189
/// Returns `true` if the task queue is empty.
190190
pub fn is_empty(&self) -> bool {
191-
self.size() == 0
191+
self.task_queue.is_empty()
192192
}
193193

194194
/// Returns the number of tasks owned by this pool.
@@ -210,7 +210,14 @@ impl<'p> CoroutinePool<'p> {
210210
}
211211

212212
fn do_stop(&mut self, dur: Duration) -> std::io::Result<()> {
213-
_ = self.try_timed_schedule_task(dur)?;
213+
let timeout_time = get_timeout_time(dur);
214+
loop {
215+
_ = self.try_timeout_schedule_task(timeout_time)?;
216+
if self.get_running_size() == 0 || timeout_time.saturating_sub(now()) == 0 {
217+
break;
218+
}
219+
std::thread::sleep(Duration::from_millis(1));
220+
}
214221
assert_eq!(PoolState::Stopping, self.stopped()?);
215222
self.do_clean();
216223
Ok(())
@@ -219,11 +226,11 @@ impl<'p> CoroutinePool<'p> {
219226
fn do_clean(&mut self) {
220227
// clean up remaining wait tasks
221228
for r in &self.waits {
222-
let task_name = *r.key();
229+
let task_id = *r.key();
223230
_ = self
224231
.results
225-
.insert(task_name.to_string(), Err("The coroutine pool has stopped"));
226-
self.notify(task_name);
232+
.insert(task_id, Err("The coroutine pool has stopped"));
233+
self.notify(task_id);
227234
}
228235
}
229236

@@ -237,16 +244,22 @@ impl<'p> CoroutinePool<'p> {
237244
func: impl FnOnce(Option<usize>) -> Option<usize> + 'p,
238245
param: Option<usize>,
239246
priority: Option<c_longlong>,
240-
) -> std::io::Result<String> {
247+
) -> std::io::Result<u64> {
241248
match self.state() {
242249
PoolState::Running => {}
243250
PoolState::Stopping | PoolState::Stopped => {
244251
return Err(Error::other("The coroutine pool is stopping or stopped !"))
245252
}
246253
}
247-
let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4()));
248-
self.submit_raw_task(Task::new(name.clone(), func, param, priority));
249-
Ok(name)
254+
let task = Task::new(
255+
name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
256+
func,
257+
param,
258+
priority,
259+
);
260+
let task_id = task.id();
261+
self.submit_raw_task(task);
262+
Ok(task_id)
250263
}
251264

252265
/// Submit new task to this pool.
@@ -258,52 +271,51 @@ impl<'p> CoroutinePool<'p> {
258271
self.blocker.notify();
259272
}
260273

261-
/// Attempt to obtain task results with the given `task_name`.
262-
pub fn try_take_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
263-
self.results.remove(task_name).map(|(_, r)| r)
274+
/// Attempt to obtain task results with the given `task_id`.
275+
pub fn try_take_task_result(&self, task_id: u64) -> Option<Result<Option<usize>, &'p str>> {
276+
self.results.remove(&task_id).map(|(_, r)| r)
264277
}
265278

266279
/// clean the task result data.
267-
pub fn clean_task_result(&self, task_name: &str) {
268-
if self.try_take_task_result(task_name).is_some() {
280+
pub fn clean_task_result(&self, task_id: u64) {
281+
if self.try_take_task_result(task_id).is_some() {
269282
return;
270283
}
271-
_ = self.no_waits.insert(Box::leak(Box::from(task_name)));
272-
_ = CANCEL_TASKS.remove(task_name);
284+
_ = self.no_waits.insert(task_id);
285+
_ = CANCEL_TASKS.remove(&task_id);
273286
}
274287

275-
/// Use the given `task_name` to obtain task results, and if no results are found,
288+
/// Use the given `task_id` to obtain task results, and if no results are found,
276289
/// block the current thread for `wait_time`.
277290
///
278291
/// # Errors
279292
/// if timeout
280293
pub fn wait_task_result(
281294
&self,
282-
task_name: &str,
295+
task_id: u64,
283296
wait_time: Duration,
284297
) -> std::io::Result<Result<Option<usize>, &str>> {
285-
let key = Box::leak(Box::from(task_name));
286-
if let Some(r) = self.try_take_task_result(key) {
287-
self.notify(key);
298+
if let Some(r) = self.try_take_task_result(task_id) {
299+
self.notify(task_id);
288300
return Ok(r);
289301
}
290302
if SchedulableCoroutine::current().is_some() {
291303
let timeout_time = get_timeout_time(wait_time);
292304
loop {
293305
_ = self.try_run();
294-
if let Some(r) = self.try_take_task_result(key) {
306+
if let Some(r) = self.try_take_task_result(task_id) {
295307
return Ok(r);
296308
}
297309
if timeout_time.saturating_sub(now()) == 0 {
298310
return Err(Error::new(ErrorKind::TimedOut, "wait timeout"));
299311
}
300312
}
301313
}
302-
let arc = if let Some(arc) = self.waits.get(key) {
314+
let arc = if let Some(arc) = self.waits.get(&task_id) {
303315
arc.clone()
304316
} else {
305317
let arc = Arc::new((Mutex::new(true), Condvar::new()));
306-
assert!(self.waits.insert(key, arc.clone()).is_none());
318+
assert!(self.waits.insert(task_id, arc.clone()).is_none());
307319
arc
308320
};
309321
let (lock, cvar) = &*arc;
@@ -315,8 +327,8 @@ impl<'p> CoroutinePool<'p> {
315327
)
316328
.map_err(|e| Error::other(format!("{e}")))?,
317329
);
318-
if let Some(r) = self.try_take_task_result(key) {
319-
self.notify(key);
330+
if let Some(r) = self.try_take_task_result(task_id) {
331+
self.notify(task_id);
320332
return Ok(r);
321333
}
322334
Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
@@ -402,32 +414,31 @@ impl<'p> CoroutinePool<'p> {
402414

403415
fn try_run(&self) -> Option<()> {
404416
self.task_queue.pop().map(|task| {
405-
let tname = task.get_name().to_string().leak();
406-
if CANCEL_TASKS.contains(tname) {
407-
_ = CANCEL_TASKS.remove(tname);
408-
warn!("Cancel task:{} successfully !", tname);
417+
let task_id = task.id();
418+
if CANCEL_TASKS.contains(&task_id) {
419+
_ = CANCEL_TASKS.remove(&task_id);
420+
warn!("Cancel task:{} successfully !", task_id);
409421
return;
410422
}
411423
if let Some(co) = SchedulableCoroutine::current() {
412-
_ = RUNNING_TASKS.insert(tname, co.name());
424+
_ = RUNNING_TASKS.insert(task_id, co.id);
413425
}
414-
let (task_name, result) = task.run();
415-
_ = RUNNING_TASKS.remove(tname);
416-
let n = task_name.clone().leak();
417-
if self.no_waits.contains(n) {
418-
_ = self.no_waits.remove(n);
426+
let (_, result) = task.run();
427+
_ = RUNNING_TASKS.remove(&task_id);
428+
if self.no_waits.contains(&task_id) {
429+
_ = self.no_waits.remove(&task_id);
419430
return;
420431
}
421432
assert!(
422-
self.results.insert(task_name.clone(), result).is_none(),
433+
self.results.insert(task_id, result).is_none(),
423434
"The previous result was not retrieved in a timely manner"
424435
);
425-
self.notify(&task_name);
436+
self.notify(task_id);
426437
})
427438
}
428439

429-
fn notify(&self, task_name: &str) {
430-
if let Some((_, arc)) = self.waits.remove(task_name) {
440+
fn notify(&self, task_id: u64) {
441+
if let Some((_, arc)) = self.waits.remove(&task_id) {
431442
let (lock, cvar) = &*arc;
432443
let mut pending = lock.lock().expect("notify task failed");
433444
*pending = false;
@@ -436,9 +447,9 @@ impl<'p> CoroutinePool<'p> {
436447
}
437448

438449
/// Try to cancel a task.
439-
pub fn try_cancel_task(task_name: &str) {
450+
pub fn try_cancel_task(task_id: u64) {
440451
// 检查正在运行的任务是否是要取消的任务
441-
if let Some(info) = RUNNING_TASKS.get(task_name) {
452+
if let Some(info) = RUNNING_TASKS.get(&task_id) {
442453
let co_name = *info;
443454
// todo windows support
444455
#[allow(unused_variables)]
@@ -450,26 +461,26 @@ impl<'p> CoroutinePool<'p> {
450461
{
451462
warn!(
452463
"Attempt to cancel task:{} running on coroutine:{} by thread:{}, cancelling...",
453-
task_name, co_name, pthread
464+
task_id, co_name, pthread
454465
);
455466
} else {
456467
error!(
457468
"Attempt to cancel task:{} running on coroutine:{} by thread:{} failed !",
458-
task_name, co_name, pthread
469+
task_id, co_name, pthread
459470
);
460471
}
461472
} else {
462473
// 添加到待取消队列
463474
Scheduler::try_cancel_coroutine(co_name);
464475
warn!(
465476
"Attempt to cancel task:{} running on coroutine:{}, cancelling...",
466-
task_name, co_name
477+
task_id, co_name
467478
);
468479
}
469480
} else {
470481
// 添加到待取消队列
471-
_ = CANCEL_TASKS.insert(Box::leak(Box::from(task_name)));
472-
warn!("Attempt to cancel task:{}, cancelling...", task_name);
482+
_ = CANCEL_TASKS.insert(task_id);
483+
warn!("Attempt to cancel task:{}, cancelling...", task_id);
473484
}
474485
}
475486

core/src/co_pool/task.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::catch;
22
use crate::common::ordered_work_steal::Ordered;
33
use std::ffi::c_longlong;
4+
use std::hash::{DefaultHasher, Hash, Hasher};
45

56
/// 做C兼容时会用到
67
pub type UserTaskFunc = extern "C" fn(usize) -> usize;
@@ -10,6 +11,7 @@ pub type UserTaskFunc = extern "C" fn(usize) -> usize;
1011
#[derive(educe::Educe)]
1112
#[educe(Debug)]
1213
pub struct Task<'t> {
14+
id: u64,
1315
name: String,
1416
#[educe(Debug(ignore))]
1517
func: Box<dyn FnOnce(Option<usize>) -> Option<usize> + 't>,
@@ -25,7 +27,11 @@ impl<'t> Task<'t> {
2527
param: Option<usize>,
2628
priority: Option<c_longlong>,
2729
) -> Self {
30+
let mut hasher = DefaultHasher::new();
31+
name.hash(&mut hasher);
32+
let id = hasher.finish();
2833
Task {
34+
id,
2935
name,
3036
func: Box::new(func),
3137
param,
@@ -35,10 +41,16 @@ impl<'t> Task<'t> {
3541

3642
/// get the task name.
3743
#[must_use]
38-
pub fn get_name(&self) -> &str {
44+
pub fn name(&self) -> &str {
3945
&self.name
4046
}
4147

48+
/// get the task id.
49+
#[must_use]
50+
pub fn id(&self) -> u64 {
51+
self.id
52+
}
53+
4254
/// execute the task
4355
///
4456
/// # Errors

core/src/coroutine/korosensei.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::cell::{Cell, RefCell, UnsafeCell};
1111
use std::collections::VecDeque;
1212
use std::ffi::c_longlong;
1313
use std::fmt::Debug;
14+
use std::hash::{DefaultHasher, Hash, Hasher};
1415
use std::io::Error;
1516

1617
cfg_if::cfg_if! {
@@ -25,6 +26,7 @@ cfg_if::cfg_if! {
2526
/// Use `corosensei` as the low-level coroutine.
2627
#[repr(C)]
2728
pub struct Coroutine<'c, Param, Yield, Return> {
29+
pub(crate) id: u64,
2830
pub(crate) name: String,
2931
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
3032
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
@@ -427,8 +429,12 @@ where
427429
co_name
428430
)
429431
});
432+
let mut hasher = DefaultHasher::new();
433+
name.hash(&mut hasher);
434+
let id = hasher.finish();
430435
#[allow(unused_mut)]
431436
let mut co = Coroutine {
437+
id,
432438
name,
433439
inner,
434440
stack_infos,

core/src/coroutine/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
6666
&self.name
6767
}
6868

69+
/// Get the id of this coroutine.
70+
/// todo use u64
71+
#[allow(clippy::cast_possible_truncation)]
72+
pub fn id(&self) -> usize {
73+
self.id as usize
74+
}
75+
6976
/// Returns the current state of this `StateCoroutine`.
7077
pub fn state(&self) -> CoroutineState<Yield, Return>
7178
where

0 commit comments

Comments
 (0)