From de62d7fa0d3e25127440358744193ca7d03f2c53 Mon Sep 17 00:00:00 2001 From: lossv <1721684479@qq.com> Date: Thu, 5 Jan 2023 16:35:14 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dgcc4.8.5=E7=BC=96?= =?UTF-8?q?=E8=AF=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libgo/routine_sync/rutex.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libgo/routine_sync/rutex.h b/libgo/routine_sync/rutex.h index 11aa9ee4..2ed1ec1b 100644 --- a/libgo/routine_sync/rutex.h +++ b/libgo/routine_sync/rutex.h @@ -19,7 +19,7 @@ struct IntValue { public: inline std::atomic* value() { return ptr_; } - inline void ref(std::atomic* ptr) { ptr_ = ptr; } + inline void ref(std::atomic* ptr) { ptr_ = {ptr}; } protected: std::atomic* ptr_ {nullptr}; @@ -32,7 +32,7 @@ struct IntValue inline std::atomic* value() { return &value_; } protected: - std::atomic value_ {0}; + std::atomic value_ = {0}; }; struct RutexBase From 6232783df6ba628201f1a0e63b49be2a40ff91e9 Mon Sep 17 00:00:00 2001 From: lossv <1721684479@qq.com> Date: Thu, 9 Mar 2023 11:27:12 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=8D=8F=E7=A8=8B=E6=B1=A0=20=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E6=8C=87=E5=AE=9A=E6=A0=88=E7=A9=BA=E9=97=B4?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libgo/pool/async_coroutine_pool.cpp | 28 +++++++++++++++++++++++----- libgo/pool/async_coroutine_pool.h | 7 +++++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/libgo/pool/async_coroutine_pool.cpp b/libgo/pool/async_coroutine_pool.cpp index 04ea4c4e..57720fa8 100644 --- a/libgo/pool/async_coroutine_pool.cpp +++ b/libgo/pool/async_coroutine_pool.cpp @@ -7,9 +7,10 @@ AsyncCoroutinePool * AsyncCoroutinePool::Create(size_t maxCallbackPoints) { return new AsyncCoroutinePool(maxCallbackPoints); } -void AsyncCoroutinePool::InitCoroutinePool(size_t maxCoroutineCount) +void AsyncCoroutinePool::InitCoroutinePool(size_t maxCoroutineCount, size_t stackSize) { maxCoroutineCount_ = maxCoroutineCount; + stackSize_ = stackSize; } void AsyncCoroutinePool::Start(int minThreadNumber, int maxThreadNumber) { @@ -21,11 +22,21 @@ void AsyncCoroutinePool::Start(int minThreadNumber, int maxThreadNumber) maxCoroutineCount_ = (std::max)(minThreadNumber * 128, maxThreadNumber); maxCoroutineCount_ = (std::min)(maxCoroutineCount_, 10240); } - for (size_t i = 0; i < maxCoroutineCount_; ++i) { - go co_scheduler(scheduler_) [this]{ - this->Go(); - }; + + if(stackSize_ > 0) { + for (size_t i = 0; i < maxCoroutineCount_; ++i) { + go_stack(stackSize_) co_scheduler(scheduler_) [this]{ + this->Go(); + }; + } + } else { + for (size_t i = 0; i < maxCoroutineCount_; ++i) { + go co_scheduler(scheduler_) [this]{ + this->Go(); + }; + } } + } void AsyncCoroutinePool::Go() { @@ -55,6 +66,13 @@ void AsyncCoroutinePool::Post(Func const& func, Func const& callback) PoolTask task{func, callback}; tasks_ << std::move(task); } + +void AsyncCoroutinePool::Post(Func const& func) +{ + PoolTask task{func, NULL}; + tasks_ << std::move(task); +} + bool AsyncCoroutinePool::AddCallbackPoint(AsyncCoroutinePool::CallbackPoint * point) { size_t writeIdx = writePointsCount_++; diff --git a/libgo/pool/async_coroutine_pool.h b/libgo/pool/async_coroutine_pool.h index e6321ae4..ee46300d 100644 --- a/libgo/pool/async_coroutine_pool.h +++ b/libgo/pool/async_coroutine_pool.h @@ -16,13 +16,15 @@ class AsyncCoroutinePool typedef std::function Func; // 初始化协程数量 - void InitCoroutinePool(size_t maxCoroutineCount); + void InitCoroutinePool(size_t maxCoroutineCount, size_t stackSize = 0); // 启动协程池 - void Start(int minThreadNumber, int maxThreadNumber = 0); + void Start(int minThreadNumber, int maxThreadNumber); void Post(Func const& func, Func const& callback); + void Post(Func const& func); + template void Post(Channel const& ret, std::function const& func) { Post([=]{ ret << func(); }, NULL); @@ -75,6 +77,7 @@ class AsyncCoroutinePool private: size_t maxCoroutineCount_; + size_t stackSize_; std::atomic coroutineCount_{0}; Scheduler* scheduler_; Channel tasks_; From 21fbf1ad109603bc1a4ba5676de2ce0809af4896 Mon Sep 17 00:00:00 2001 From: lossv <1721684479@qq.com> Date: Fri, 30 Jun 2023 15:55:12 +0800 Subject: [PATCH 3/5] pool wait stop --- libgo/pool/async_coroutine_pool.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libgo/pool/async_coroutine_pool.cpp b/libgo/pool/async_coroutine_pool.cpp index 57720fa8..73db6541 100644 --- a/libgo/pool/async_coroutine_pool.cpp +++ b/libgo/pool/async_coroutine_pool.cpp @@ -94,6 +94,10 @@ AsyncCoroutinePool::AsyncCoroutinePool(size_t maxCallbackPoints) points_ = new CallbackPoint*[maxCallbackPoints_]; } +void AsyncCoroutinePool::WaitStop(){ + while (!tasks_.empty()); +} + size_t AsyncCoroutinePool::CallbackPoint::Run(size_t maxTrigger) { size_t i = 0; From 1d9b37aca66cbc7680378776efad5b2dbaced283 Mon Sep 17 00:00:00 2001 From: lossv <1721684479@qq.com> Date: Fri, 30 Jun 2023 16:34:02 +0800 Subject: [PATCH 4/5] pool wait stop --- libgo/pool/async_coroutine_pool.cpp | 6 +++++- libgo/pool/async_coroutine_pool.h | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/libgo/pool/async_coroutine_pool.cpp b/libgo/pool/async_coroutine_pool.cpp index 73db6541..6085933d 100644 --- a/libgo/pool/async_coroutine_pool.cpp +++ b/libgo/pool/async_coroutine_pool.cpp @@ -44,6 +44,8 @@ void AsyncCoroutinePool::Go() PoolTask task; tasks_ >> task; + taskRunningPoints++; + if (task.func_) task.func_(); @@ -59,6 +61,8 @@ void AsyncCoroutinePool::Go() size_t idx = ++robin_ % pointsCount; points_[idx]->Post(std::move(task.cb_)); points_[idx]->Notify(); + + taskRunningPoints--; } } void AsyncCoroutinePool::Post(Func const& func, Func const& callback) @@ -95,7 +99,7 @@ AsyncCoroutinePool::AsyncCoroutinePool(size_t maxCallbackPoints) } void AsyncCoroutinePool::WaitStop(){ - while (!tasks_.empty()); + while (!tasks_.empty() && taskRunningPoints == 0); } size_t AsyncCoroutinePool::CallbackPoint::Run(size_t maxTrigger) diff --git a/libgo/pool/async_coroutine_pool.h b/libgo/pool/async_coroutine_pool.h index ee46300d..7226836e 100644 --- a/libgo/pool/async_coroutine_pool.h +++ b/libgo/pool/async_coroutine_pool.h @@ -25,6 +25,8 @@ class AsyncCoroutinePool void Post(Func const& func); + void WaitStop(); + template void Post(Channel const& ret, std::function const& func) { Post([=]{ ret << func(); }, NULL); @@ -83,6 +85,7 @@ class AsyncCoroutinePool Channel tasks_; std::atomic pointsCount_{0}; std::atomic writePointsCount_{0}; + std::atomic taskRunningPoints{0}; size_t maxCallbackPoints_; std::atomic robin_{0}; CallbackPoint ** points_; From 2896db7f59a13a6d77fabe1f43efb0593781e51a Mon Sep 17 00:00:00 2001 From: lossv <1721684479@qq.com> Date: Mon, 3 Jul 2023 11:24:48 +0800 Subject: [PATCH 5/5] fix taskRunningPoints --- libgo/pool/async_coroutine_pool.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/libgo/pool/async_coroutine_pool.cpp b/libgo/pool/async_coroutine_pool.cpp index 6085933d..6d9fc5ab 100644 --- a/libgo/pool/async_coroutine_pool.cpp +++ b/libgo/pool/async_coroutine_pool.cpp @@ -49,12 +49,15 @@ void AsyncCoroutinePool::Go() if (task.func_) task.func_(); - if (!task.cb_) + if (!task.cb_) { + taskRunningPoints--; continue; + } size_t pointsCount = pointsCount_; if (!pointsCount) { task.cb_(); + taskRunningPoints--; continue; } @@ -98,8 +101,9 @@ AsyncCoroutinePool::AsyncCoroutinePool(size_t maxCallbackPoints) points_ = new CallbackPoint*[maxCallbackPoints_]; } -void AsyncCoroutinePool::WaitStop(){ - while (!tasks_.empty() && taskRunningPoints == 0); +void AsyncCoroutinePool::WaitStop() +{ + while (!tasks_.empty() || taskRunningPoints.load() != 0); } size_t AsyncCoroutinePool::CallbackPoint::Run(size_t maxTrigger)