Skip to content

Commit 6d602ea

Browse files
aleks-fCopilotmatejk
authored
5150 reactor sock remove (#5151)
* fix(NotificationCenter): use RWLock to prevent lock-order-inversion #5150 Change NotificationCenter from Mutex to RWLock to fix TSAN lock-order-inversion warnings when notification handlers call back into the notification center (e.g., hasEventHandler() from within a SocketReactor handler). Key changes: - Replace Mutex with RWLock for better read concurrency - Use read locks for hasObserver(), observersToNotify(), hasObservers(), countObservers(), backlog() - Use write locks for addObserver(), removeObserver(), clear() - observersToNotify() now copies observer list and releases lock before calling accepts() to avoid lock cycle with NObserver's internal mutex - Add clear() method to atomically disable and remove all observers - Update AsyncNotificationCenter to use RWLock::ScopedLock Also fixes AsyncNotificationCenter shutdown to properly signal the dequeue thread via ShutdownNotification instead of relying only on wakeUpAll() #5058 * fix(Net): add SocketReactor::remove() for safe socket cleanup #5150 Add SocketReactor::remove() method that atomically removes a socket from the poll set and disables all its event handlers. This prevents race conditions when removing handlers in destructors, where events could still be dispatched to handlers while other handlers for the same socket are being removed. Changes: - Add SocketReactor::remove(socket) for atomic socket removal - Add SocketNotifier::disableObservers() to disable all handlers - Add SocketNotifier::socket() getter - Skip dispatch for removed sockets in dispatch() methods - Update tests to use remove() before removeEventHandler() - Add testSocketReactorRemove() test * fix(Foundation): improve ProcessRunner robustness #5054 - Fix race condition: set _pPH after _pid to ensure pid() returns valid value when running() returns true - Add upfront invalid PID validation before termination attempt - Escalate to Process::kill() if requestTermination times out, with its own timeout before throwing - Try to remove stale PID file on timeout instead of throwing - Wrap start() wait logic in try/catch to kill orphan process and reset state if startup fails - Clear _error under lock after successful stop to prevent stale error from affecting next start() * Update Net/include/Poco/Net/SocketReactor.h Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update Foundation/src/AsyncNotificationCenter.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update Foundation/include/Poco/AsyncNotificationCenter.h Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update Foundation/src/AsyncNotificationCenter.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * chore: add TaskManager::testCancel to win ignore list * chore: few improvements in NotificationCenter and ODBC cmake. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Matej Kenda <matejken@gmail.com>
1 parent 7f03ba5 commit 6d602ea

13 files changed

Lines changed: 295 additions & 84 deletions

Data/ODBC/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ find_file(
5454
if (_msodbc_h)
5555
get_filename_component(MSODBC_DIR ${_msodbc_h} DIRECTORY CACHE)
5656
message(STATUS "ODBC: Found msodbcsql.h in: ${MSODBC_DIR}")
57-
target_include_directories(DataODBC PUBLIC "${MSODBC_DIR}")
57+
target_include_directories(DataODBC SYSTEM PRIVATE "${MSODBC_DIR}")
5858

5959
if (ENABLE_DATA_SQL_SERVER_BIG_STRINGS)
6060
target_compile_definitions(DataODBC PUBLIC POCO_DATA_SQL_SERVER_BIG_STRINGS=1)

Foundation/include/Poco/AsyncNotificationCenter.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,11 @@ class Foundation_API AsyncNotificationCenter: public NotificationCenter
151151

152152
using Adapter = RunnableAdapter<AsyncNotificationCenter>;
153153

154+
class ShutdownNotification: public Notification
155+
/// Internal notification used to signal the dequeue loop to stop.
156+
{
157+
};
158+
154159
const AsyncMode _mode { AsyncMode::ENQUEUE };
155160

156161
// Async enqueue for notifications

Foundation/include/Poco/NotificationCenter.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
#include "Poco/Foundation.h"
2222
#include "Poco/Notification.h"
23-
#include "Poco/Mutex.h"
23+
#include "Poco/RWLock.h"
2424
#include "Poco/SharedPtr.h"
2525
#include "Poco/Observer.h"
2626
#include "Poco/NObserver.h"
@@ -186,11 +186,18 @@ class Foundation_API NotificationCenter
186186
/// Returns a reference to the default
187187
/// NotificationCenter.
188188

189+
void clear();
190+
/// Disables and removes all observers.
191+
///
192+
/// This can be called to ensure that no more
193+
/// notifications will be dispatched to observers,
194+
/// even if they are currently being dispatched.
195+
189196
protected:
190197
using AbstractObserverPtr = SharedPtr<AbstractObserver>;
191198
using ObserverList = std::vector<AbstractObserverPtr>;
192199

193-
Mutex& mutex()
200+
RWLock& mutex()
194201
{
195202
return _mutex;
196203
}
@@ -200,8 +207,8 @@ class Foundation_API NotificationCenter
200207

201208
private:
202209

203-
ObserverList _observers;
204-
mutable Mutex _mutex;
210+
ObserverList _observers;
211+
mutable RWLock _mutex;
205212
};
206213
} // namespace Poco
207214

Foundation/src/AsyncNotificationCenter.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "Poco/AsyncNotificationCenter.h"
1717

1818
#include "Poco/AbstractObserver.h"
19+
#include "Poco/RWLock.h"
1920
#include "Poco/Stopwatch.h"
2021
#include "Poco/Debugger.h"
2122
#include "Poco/ErrorHandler.h"
@@ -155,7 +156,7 @@ void AsyncNotificationCenter::notifyObservers(Notification::Ptr& pNotification)
155156

156157
void AsyncNotificationCenter::start()
157158
{
158-
Poco::ScopedLock l(mutex());
159+
RWLock::ScopedLock l(mutex());
159160

160161
if (_mode == AsyncMode::ENQUEUE || _mode == AsyncMode::BOTH)
161162
{
@@ -180,8 +181,8 @@ void AsyncNotificationCenter::start()
180181

181182
if (_mode == AsyncMode::NOTIFY || _mode == AsyncMode::BOTH)
182183
{
183-
auto dispatch = [this](std::stop_token stopToken, std::size_t id) {
184-
this->dispatchNotifications(stopToken, id);
184+
auto dispatch = [this](std::stop_token stopToken, std::size_t workerId) {
185+
this->dispatchNotifications(stopToken, workerId);
185186
};
186187

187188
for (std::size_t i {0}; i < _workersCount; ++i)
@@ -200,6 +201,7 @@ void AsyncNotificationCenter::stop()
200201
{
201202
if (_enqueueThreadStarted.exchange(false))
202203
{
204+
_nq.enqueueUrgentNotification(new ShutdownNotification);
203205
_nq.wakeUpAll();
204206
while (!_enqueueThreadDone) Thread::sleep(100);
205207
_enqueueThread.join();
@@ -224,8 +226,11 @@ void AsyncNotificationCenter::dequeue()
224226
Notification::Ptr pNf;
225227
_enqueueThreadStarted = true;
226228
_enqueueThreadDone = false;
227-
while ((pNf = _nq.waitDequeueNotification()))
229+
while (true)
228230
{
231+
pNf = _nq.waitDequeueNotification();
232+
if (!pNf) break;
233+
if (pNf.cast<ShutdownNotification>()) break;
229234
try
230235
{
231236
notifyObservers(pNf);

Foundation/src/NotificationCenter.cpp

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,20 @@
1414

1515
#include "Poco/NotificationCenter.h"
1616
#include "Poco/Notification.h"
17-
#include "Poco/Observer.h"
18-
#include "Poco/AutoPtr.h"
17+
#include "Poco/RWLock.h"
1918

2019

2120
namespace Poco {
2221

2322

24-
NotificationCenter::NotificationCenter()
25-
{
26-
}
23+
NotificationCenter::NotificationCenter() = default;
2724

2825

2926
NotificationCenter::~NotificationCenter()
3027
{
3128
try
3229
{
33-
ObserverList observersToDisable;
34-
{
35-
Mutex::ScopedLock lock(_mutex);
36-
observersToDisable = std::move(_observers);
37-
}
38-
for (auto& o: observersToDisable)
39-
o->disable();
30+
clear();
4031
}
4132
catch(...)
4233
{
@@ -47,35 +38,30 @@ NotificationCenter::~NotificationCenter()
4738

4839
void NotificationCenter::addObserver(const AbstractObserver& observer)
4940
{
50-
Mutex::ScopedLock lock(_mutex);
51-
_observers.push_back(observer.clone());
41+
RWLock::ScopedLock lock(_mutex);
42+
_observers.emplace_back(observer.clone());
5243
_observers.back()->start();
5344
}
5445

5546

5647
void NotificationCenter::removeObserver(const AbstractObserver& observer)
5748
{
58-
AbstractObserverPtr pObserver;
49+
RWLock::ScopedLock lock(_mutex);
50+
for (auto it = _observers.begin(); it != _observers.end(); ++it)
5951
{
60-
Mutex::ScopedLock lock(_mutex);
61-
for (auto it = _observers.begin(); it != _observers.end(); ++it)
52+
if (observer.equals(**it))
6253
{
63-
if (observer.equals(**it))
64-
{
65-
pObserver = *it;
66-
_observers.erase(it);
67-
break;
68-
}
54+
(*it)->disable();
55+
_observers.erase(it);
56+
return;
6957
}
7058
}
71-
if (pObserver)
72-
pObserver->disable();
7359
}
7460

7561

7662
bool NotificationCenter::hasObserver(const AbstractObserver& observer) const
7763
{
78-
Mutex::ScopedLock lock(_mutex);
64+
RWLock::ScopedReadLock lock(_mutex);
7965
for (const auto& p: _observers)
8066
if (observer.equals(*p)) return true;
8167

@@ -85,9 +71,14 @@ bool NotificationCenter::hasObserver(const AbstractObserver& observer) const
8571

8672
NotificationCenter::ObserverList NotificationCenter::observersToNotify(const Notification::Ptr& pNotification) const
8773
{
74+
ObserverList observers;
75+
{
76+
RWLock::ScopedReadLock lock(_mutex);
77+
observers = _observers;
78+
}
79+
// Filter outside the lock to avoid lock-order-inversion with NObserver mutex
8880
ObserverList ret;
89-
ScopedLock<Mutex> lock(_mutex);
90-
for (auto& o : _observers)
81+
for (auto& o : observers)
9182
{
9283
if (o->accepts(pNotification))
9384
ret.push_back(o);
@@ -116,15 +107,15 @@ void NotificationCenter::notifyObservers(Notification::Ptr& pNotification)
116107

117108
bool NotificationCenter::hasObservers() const
118109
{
119-
Mutex::ScopedLock lock(_mutex);
110+
RWLock::ScopedReadLock lock(_mutex);
120111

121112
return !_observers.empty();
122113
}
123114

124115

125116
std::size_t NotificationCenter::countObservers() const
126117
{
127-
Mutex::ScopedLock lock(_mutex);
118+
RWLock::ScopedReadLock lock(_mutex);
128119

129120
return _observers.size();
130121
}
@@ -134,10 +125,10 @@ int NotificationCenter::backlog() const
134125
{
135126
int cnt = 0;
136127

137-
ScopedLockWithUnlock<Mutex> lock(_mutex);
128+
_mutex.readLock();
138129
ObserverList observersToCount(_observers);
139-
lock.unlock();
140-
for (auto& p : observersToCount)
130+
_mutex.unlock();
131+
for (const auto& p : observersToCount)
141132
cnt += p->backlog();
142133

143134
return cnt;
@@ -151,4 +142,13 @@ NotificationCenter& NotificationCenter::defaultCenter()
151142
}
152143

153144

145+
void NotificationCenter::clear()
146+
{
147+
RWLock::ScopedLock lock(_mutex);
148+
for (auto& o: _observers)
149+
o->disable();
150+
_observers.clear();
151+
}
152+
153+
154154
} // namespace Poco

0 commit comments

Comments
 (0)