@@ -26,10 +26,10 @@ mod state;
2626/// Creator for coroutine pool.
2727mod creator;
2828
29- /// `task_name ` -> `co_name `
30- static RUNNING_TASKS : Lazy < DashMap < & str , & str > > = Lazy :: new ( DashMap :: new) ;
29+ /// `task_id ` -> `co_id `
30+ static RUNNING_TASKS : Lazy < DashMap < u64 , u64 > > = Lazy :: new ( DashMap :: new) ;
3131
32- static CANCEL_TASKS : Lazy < DashSet < & str > > = Lazy :: new ( DashSet :: new) ;
32+ static CANCEL_TASKS : Lazy < DashSet < u64 > > = Lazy :: new ( DashSet :: new) ;
3333
3434/// The coroutine pool impls.
3535#[ repr( C ) ]
@@ -55,10 +55,10 @@ pub struct CoroutinePool<'p> {
5555 //阻滞器
5656 blocker : Arc < CondvarBlocker > ,
5757 //正在等待结果的
58- waits : DashMap < & ' p str , Arc < ( Mutex < bool > , Condvar ) > > ,
58+ waits : DashMap < u64 , Arc < ( Mutex < bool > , Condvar ) > > ,
5959 //任务执行结果
60- results : DashMap < String , Result < Option < usize > , & ' p str > > ,
61- no_waits : DashSet < & ' p str > ,
60+ results : DashMap < u64 , Result < Option < usize > , & ' p str > > ,
61+ no_waits : DashSet < u64 > ,
6262}
6363
6464impl Drop for CoroutinePool < ' _ > {
@@ -188,7 +188,7 @@ impl<'p> CoroutinePool<'p> {
188188
189189 /// Returns `true` if the task queue is empty.
190190 pub fn is_empty ( & self ) -> bool {
191- self . size ( ) == 0
191+ self . task_queue . is_empty ( )
192192 }
193193
194194 /// Returns the number of tasks owned by this pool.
@@ -210,7 +210,14 @@ impl<'p> CoroutinePool<'p> {
210210 }
211211
212212 fn do_stop ( & mut self , dur : Duration ) -> std:: io:: Result < ( ) > {
213- _ = self . try_timed_schedule_task ( dur) ?;
213+ let timeout_time = get_timeout_time ( dur) ;
214+ loop {
215+ _ = self . try_timeout_schedule_task ( timeout_time) ?;
216+ if self . get_running_size ( ) == 0 || timeout_time. saturating_sub ( now ( ) ) == 0 {
217+ break ;
218+ }
219+ std:: thread:: sleep ( Duration :: from_millis ( 1 ) ) ;
220+ }
214221 assert_eq ! ( PoolState :: Stopping , self . stopped( ) ?) ;
215222 self . do_clean ( ) ;
216223 Ok ( ( ) )
@@ -219,11 +226,11 @@ impl<'p> CoroutinePool<'p> {
219226 fn do_clean ( & mut self ) {
220227 // clean up remaining wait tasks
221228 for r in & self . waits {
222- let task_name = * r. key ( ) ;
229+ let task_id = * r. key ( ) ;
223230 _ = self
224231 . results
225- . insert ( task_name . to_string ( ) , Err ( "The coroutine pool has stopped" ) ) ;
226- self . notify ( task_name ) ;
232+ . insert ( task_id , Err ( "The coroutine pool has stopped" ) ) ;
233+ self . notify ( task_id ) ;
227234 }
228235 }
229236
@@ -237,16 +244,22 @@ impl<'p> CoroutinePool<'p> {
237244 func : impl FnOnce ( Option < usize > ) -> Option < usize > + ' p ,
238245 param : Option < usize > ,
239246 priority : Option < c_longlong > ,
240- ) -> std:: io:: Result < String > {
247+ ) -> std:: io:: Result < u64 > {
241248 match self . state ( ) {
242249 PoolState :: Running => { }
243250 PoolState :: Stopping | PoolState :: Stopped => {
244251 return Err ( Error :: other ( "The coroutine pool is stopping or stopped !" ) )
245252 }
246253 }
247- let name = name. unwrap_or ( format ! ( "{}@{}" , self . name( ) , uuid:: Uuid :: new_v4( ) ) ) ;
248- self . submit_raw_task ( Task :: new ( name. clone ( ) , func, param, priority) ) ;
249- Ok ( name)
254+ let task = Task :: new (
255+ name. unwrap_or ( format ! ( "{}@{}" , self . name( ) , uuid:: Uuid :: new_v4( ) ) ) ,
256+ func,
257+ param,
258+ priority,
259+ ) ;
260+ let task_id = task. id ( ) ;
261+ self . submit_raw_task ( task) ;
262+ Ok ( task_id)
250263 }
251264
252265 /// Submit new task to this pool.
@@ -258,52 +271,51 @@ impl<'p> CoroutinePool<'p> {
258271 self . blocker . notify ( ) ;
259272 }
260273
261- /// Attempt to obtain task results with the given `task_name `.
262- pub fn try_take_task_result ( & self , task_name : & str ) -> Option < Result < Option < usize > , & ' p str > > {
263- self . results . remove ( task_name ) . map ( |( _, r) | r)
274+ /// Attempt to obtain task results with the given `task_id `.
275+ pub fn try_take_task_result ( & self , task_id : u64 ) -> Option < Result < Option < usize > , & ' p str > > {
276+ self . results . remove ( & task_id ) . map ( |( _, r) | r)
264277 }
265278
266279 /// clean the task result data.
267- pub fn clean_task_result ( & self , task_name : & str ) {
268- if self . try_take_task_result ( task_name ) . is_some ( ) {
280+ pub fn clean_task_result ( & self , task_id : u64 ) {
281+ if self . try_take_task_result ( task_id ) . is_some ( ) {
269282 return ;
270283 }
271- _ = self . no_waits . insert ( Box :: leak ( Box :: from ( task_name ) ) ) ;
272- _ = CANCEL_TASKS . remove ( task_name ) ;
284+ _ = self . no_waits . insert ( task_id ) ;
285+ _ = CANCEL_TASKS . remove ( & task_id ) ;
273286 }
274287
275- /// Use the given `task_name ` to obtain task results, and if no results are found,
288+ /// Use the given `task_id ` to obtain task results, and if no results are found,
276289 /// block the current thread for `wait_time`.
277290 ///
278291 /// # Errors
279292 /// if timeout
280293 pub fn wait_task_result (
281294 & self ,
282- task_name : & str ,
295+ task_id : u64 ,
283296 wait_time : Duration ,
284297 ) -> std:: io:: Result < Result < Option < usize > , & str > > {
285- let key = Box :: leak ( Box :: from ( task_name) ) ;
286- if let Some ( r) = self . try_take_task_result ( key) {
287- self . notify ( key) ;
298+ if let Some ( r) = self . try_take_task_result ( task_id) {
299+ self . notify ( task_id) ;
288300 return Ok ( r) ;
289301 }
290302 if SchedulableCoroutine :: current ( ) . is_some ( ) {
291303 let timeout_time = get_timeout_time ( wait_time) ;
292304 loop {
293305 _ = self . try_run ( ) ;
294- if let Some ( r) = self . try_take_task_result ( key ) {
306+ if let Some ( r) = self . try_take_task_result ( task_id ) {
295307 return Ok ( r) ;
296308 }
297309 if timeout_time. saturating_sub ( now ( ) ) == 0 {
298310 return Err ( Error :: new ( ErrorKind :: TimedOut , "wait timeout" ) ) ;
299311 }
300312 }
301313 }
302- let arc = if let Some ( arc) = self . waits . get ( key ) {
314+ let arc = if let Some ( arc) = self . waits . get ( & task_id ) {
303315 arc. clone ( )
304316 } else {
305317 let arc = Arc :: new ( ( Mutex :: new ( true ) , Condvar :: new ( ) ) ) ;
306- assert ! ( self . waits. insert( key , arc. clone( ) ) . is_none( ) ) ;
318+ assert ! ( self . waits. insert( task_id , arc. clone( ) ) . is_none( ) ) ;
307319 arc
308320 } ;
309321 let ( lock, cvar) = & * arc;
@@ -315,8 +327,8 @@ impl<'p> CoroutinePool<'p> {
315327 )
316328 . map_err ( |e| Error :: other ( format ! ( "{e}" ) ) ) ?,
317329 ) ;
318- if let Some ( r) = self . try_take_task_result ( key ) {
319- self . notify ( key ) ;
330+ if let Some ( r) = self . try_take_task_result ( task_id ) {
331+ self . notify ( task_id ) ;
320332 return Ok ( r) ;
321333 }
322334 Err ( Error :: new ( ErrorKind :: TimedOut , "wait timeout" ) )
@@ -402,32 +414,31 @@ impl<'p> CoroutinePool<'p> {
402414
403415 fn try_run ( & self ) -> Option < ( ) > {
404416 self . task_queue . pop ( ) . map ( |task| {
405- let tname = task. get_name ( ) . to_string ( ) . leak ( ) ;
406- if CANCEL_TASKS . contains ( tname ) {
407- _ = CANCEL_TASKS . remove ( tname ) ;
408- warn ! ( "Cancel task:{} successfully !" , tname ) ;
417+ let task_id = task. id ( ) ;
418+ if CANCEL_TASKS . contains ( & task_id ) {
419+ _ = CANCEL_TASKS . remove ( & task_id ) ;
420+ warn ! ( "Cancel task:{} successfully !" , task_id ) ;
409421 return ;
410422 }
411423 if let Some ( co) = SchedulableCoroutine :: current ( ) {
412- _ = RUNNING_TASKS . insert ( tname , co. name ( ) ) ;
424+ _ = RUNNING_TASKS . insert ( task_id , co. id ) ;
413425 }
414- let ( task_name, result) = task. run ( ) ;
415- _ = RUNNING_TASKS . remove ( tname) ;
416- let n = task_name. clone ( ) . leak ( ) ;
417- if self . no_waits . contains ( n) {
418- _ = self . no_waits . remove ( n) ;
426+ let ( _, result) = task. run ( ) ;
427+ _ = RUNNING_TASKS . remove ( & task_id) ;
428+ if self . no_waits . contains ( & task_id) {
429+ _ = self . no_waits . remove ( & task_id) ;
419430 return ;
420431 }
421432 assert ! (
422- self . results. insert( task_name . clone ( ) , result) . is_none( ) ,
433+ self . results. insert( task_id , result) . is_none( ) ,
423434 "The previous result was not retrieved in a timely manner"
424435 ) ;
425- self . notify ( & task_name ) ;
436+ self . notify ( task_id ) ;
426437 } )
427438 }
428439
429- fn notify ( & self , task_name : & str ) {
430- if let Some ( ( _, arc) ) = self . waits . remove ( task_name ) {
440+ fn notify ( & self , task_id : u64 ) {
441+ if let Some ( ( _, arc) ) = self . waits . remove ( & task_id ) {
431442 let ( lock, cvar) = & * arc;
432443 let mut pending = lock. lock ( ) . expect ( "notify task failed" ) ;
433444 * pending = false ;
@@ -436,9 +447,9 @@ impl<'p> CoroutinePool<'p> {
436447 }
437448
438449 /// Try to cancel a task.
439- pub fn try_cancel_task ( task_name : & str ) {
450+ pub fn try_cancel_task ( task_id : u64 ) {
440451 // 检查正在运行的任务是否是要取消的任务
441- if let Some ( info) = RUNNING_TASKS . get ( task_name ) {
452+ if let Some ( info) = RUNNING_TASKS . get ( & task_id ) {
442453 let co_name = * info;
443454 // todo windows support
444455 #[ allow( unused_variables) ]
@@ -450,26 +461,26 @@ impl<'p> CoroutinePool<'p> {
450461 {
451462 warn ! (
452463 "Attempt to cancel task:{} running on coroutine:{} by thread:{}, cancelling..." ,
453- task_name , co_name, pthread
464+ task_id , co_name, pthread
454465 ) ;
455466 } else {
456467 error ! (
457468 "Attempt to cancel task:{} running on coroutine:{} by thread:{} failed !" ,
458- task_name , co_name, pthread
469+ task_id , co_name, pthread
459470 ) ;
460471 }
461472 } else {
462473 // 添加到待取消队列
463474 Scheduler :: try_cancel_coroutine ( co_name) ;
464475 warn ! (
465476 "Attempt to cancel task:{} running on coroutine:{}, cancelling..." ,
466- task_name , co_name
477+ task_id , co_name
467478 ) ;
468479 }
469480 } else {
470481 // 添加到待取消队列
471- _ = CANCEL_TASKS . insert ( Box :: leak ( Box :: from ( task_name ) ) ) ;
472- warn ! ( "Attempt to cancel task:{}, cancelling..." , task_name ) ;
482+ _ = CANCEL_TASKS . insert ( task_id ) ;
483+ warn ! ( "Attempt to cancel task:{}, cancelling..." , task_id ) ;
473484 }
474485 }
475486
0 commit comments