Skip to content

Commit b95b218

Browse files
committed
fix(opcua,#386): per-MI active flag for event MI removal (Copilot review)
Two correctness fixes from Copilot's review of PR #387 in the same concurrency cluster: [Blocker] ``remove_event_monitored_item`` was bumping the global ``generation`` counter to invalidate in-flight callbacks for the MI being removed. That works for the removed MI but ALSO trips the trampoline staleness check for every peer monitored item registered against the same subscription, silently dropping their callbacks even though their server-side MIs are still live. Replace with a per-context ``std::atomic<bool> active`` flag flipped to false before the unique_ptr is moved out; the trampoline reads it (acquire) ahead of dispatch and bails for that MI only. Global ``generation`` stays reserved for full disconnect / ``remove_subscriptions``. [Defensive] ``setup_event_subscriptions`` was capturing ``const auto & cfg`` from a range-for by reference. Per current C++ semantics the captured reference resolves to the underlying vector element (not the local reference variable), so the closure stays valid - the integration test fires three distinct alarms and gets three distinct fault_codes, confirming each callback uses its own cfg. But value capture is unambiguous and matches the review feedback. ``AlarmEventConfig`` is a small struct of strings, so the copy is cheap. New test ``RemoveEventMonitoredItemUnknownIdDoesNotBumpGeneration`` locks the contract: peer MI staleness must not be triggered by an unknown-ID single removal. Local verify: 27/27 test_opcua_client + 27/27 test_alarm_state_machine.
1 parent 0c11ac9 commit b95b218

3 files changed

Lines changed: 58 additions & 8 deletions

File tree

src/ros2_medkit_plugins/ros2_medkit_opcua/src/opcua_client.cpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,21 @@ static void on_event_trampoline_c(UA_Client * client, UA_UInt32 sub_id, void * s
3838
/// Heap-owned context passed to the open62541 C event callback. Lifetime is
3939
/// owned by ``Impl::event_callbacks`` (unique_ptr); the raw pointer handed to C
4040
/// is valid until ``remove_event_monitored_item`` or ``remove_subscriptions``
41-
/// erases the entry. The ``generation_snapshot`` lets the trampoline drop
42-
/// callbacks that fire from a defunct subscription after a reconnect.
41+
/// erases the entry.
42+
///
43+
/// Two staleness guards layered together:
44+
/// - ``generation_snapshot`` filters callbacks fired from a defunct
45+
/// subscription after the whole client reconnected (bumped on disconnect
46+
/// and on ``remove_subscriptions``).
47+
/// - ``active`` filters callbacks fired from a single monitored item that has
48+
/// been individually removed via ``remove_event_monitored_item`` while
49+
/// other items in the same subscription are still live. Per-MI flag,
50+
/// not the global generation, so a single removal does not invalidate
51+
/// peer callbacks.
4352
struct EventCallbackContext {
4453
OpcuaClient * owner{nullptr};
4554
uint64_t generation_snapshot{0};
55+
std::atomic<bool> active{true};
4656
uint32_t subscription_id{0};
4757
uint32_t monitored_item_id{0}; // populated after createEvent returns
4858
OpcuaClient::EventCallback callback;
@@ -602,6 +612,13 @@ static void on_event_trampoline_c(UA_Client * /*client*/, UA_UInt32 sub_id, void
602612
if (ctx->generation_snapshot != ctx->owner->current_generation()) {
603613
return;
604614
}
615+
// Single MI removed via remove_event_monitored_item while peers remain
616+
// live - the global generation has not changed, so the peer trampolines
617+
// (in the same subscription) must keep firing. Drop only this MI's late
618+
// notifications via the per-context flag.
619+
if (!ctx->active.load(std::memory_order_acquire)) {
620+
return;
621+
}
605622

606623
// Copy the UA_Variant fields into open62541pp wrappers. UA_Variant_copy
607624
// duplicates the underlying buffer, which the opcua::Variant destructor
@@ -743,14 +760,26 @@ bool OpcuaClient::remove_event_monitored_item(uint32_t subscription_id, uint32_t
743760
if (it == impl_->event_callbacks.end() || it->second->subscription_id != subscription_id) {
744761
return false;
745762
}
763+
// Flip the per-MI active flag BEFORE moving the unique_ptr so any
764+
// in-flight trampoline call observes the cleared flag and bails out
765+
// before touching the about-to-be-freed callback. Order:
766+
// 1. set active=false (release): trampoline reads it (acquire) and
767+
// returns without invoking ``callback``.
768+
// 2. move unique_ptr out of map and erase: ctx still alive in
769+
// ``retired`` until end of function.
770+
// 3. delete server-side MI synchronously (mutex serializes against
771+
// the client's notification dispatch).
772+
// 4. ``retired`` falls out of scope -> ctx freed.
773+
// We deliberately do NOT bump the global ``generation`` counter here:
774+
// it would invalidate every peer monitored item registered against the
775+
// same subscription, silently dropping their callbacks even though
776+
// their MIs are still live on the server. Generation is reserved for
777+
// full disconnect / remove_subscriptions.
778+
it->second->active.store(false, std::memory_order_release);
746779
retired = std::move(it->second);
747780
impl_->event_callbacks.erase(it);
748781
}
749782

750-
// Bump generation BEFORE freeing the ctx so any in-flight trampoline call
751-
// captured the old generation and will drop its work.
752-
impl_->generation.fetch_add(1, std::memory_order_release);
753-
754783
if (impl_->connected) {
755784
UA_StatusCode status = UA_Client_MonitoredItems_deleteSingle(impl_->client.handle(), subscription_id, mi_id);
756785
(void)status; // best effort; on failure the server may already have dropped it

src/ros2_medkit_plugins/ros2_medkit_opcua/src/opcua_poller.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,16 @@ void OpcuaPoller::setup_event_subscriptions() {
232232
event_monitored_item_ids_.clear();
233233

234234
for (const auto & cfg : node_map_.event_alarms()) {
235-
auto callback = [this, &cfg](const std::vector<opcua::Variant> & values, const opcua::NodeId & source_node,
236-
const opcua::NodeId & event_type, const opcua::NodeId & condition_id) {
235+
// Capture cfg BY VALUE: even though range-for ``const auto & cfg`` binds
236+
// to a vector element that outlives the loop, an `&cfg`-by-reference
237+
// capture would chain through a local reference variable whose name
238+
// goes out of scope after each iteration. Defensible per current C++
239+
// semantics (the captured reference resolves to the underlying vector
240+
// element), but value capture is unambiguous and matches Copilot's
241+
// review feedback. AlarmEventConfig is a small struct of strings, so
242+
// copying is cheap.
243+
auto callback = [this, cfg](const std::vector<opcua::Variant> & values, const opcua::NodeId & source_node,
244+
const opcua::NodeId & event_type, const opcua::NodeId & condition_id) {
237245
on_event(cfg, values, source_node, event_type, condition_id);
238246
};
239247
uint32_t mi_id =

src/ros2_medkit_plugins/ros2_medkit_opcua/test/test_opcua_client.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,19 @@ TEST(OpcuaClientTest, RemoveEventMonitoredItemUnknownIdReturnsFalse) {
131131
EXPECT_FALSE(client.remove_event_monitored_item(/*sub_id=*/1, /*mi_id=*/9999));
132132
}
133133

134+
TEST(OpcuaClientTest, RemoveEventMonitoredItemUnknownIdDoesNotBumpGeneration) {
135+
// remove_event_monitored_item is per-MI cleanup. It must NOT touch the
136+
// global generation counter - that would silently invalidate every peer
137+
// monitored item's trampoline check and drop their callbacks even though
138+
// the underlying server-side MIs are still live. Per-MI staleness is
139+
// tracked by EventCallbackContext::active instead. (Copilot review on
140+
// PR #387.)
141+
OpcuaClient client;
142+
uint64_t before = client.current_generation();
143+
client.remove_event_monitored_item(/*sub_id=*/1, /*mi_id=*/9999);
144+
EXPECT_EQ(client.current_generation(), before);
145+
}
146+
134147
TEST(OpcuaClientTest, CallMethodWhenDisconnected) {
135148
OpcuaClient client;
136149
auto result = client.call_method(opcua::NodeId(0, UA_NS0ID_SERVER), opcua::NodeId(0, 11489), {});

0 commit comments

Comments
 (0)