Skip to content

Commit e14411e

Browse files
committed
fix(opcua): richer write errors, YAML validation, poll_interval clamp, backoff
Write error types (#23, #24): - Change write_value return from bool to tl::expected<void, WriteErrorInfo> with specific error codes: NotConnected, TypeMismatch, AccessDenied, NodeNotFound, TransportError - Add data_type_hint parameter to skip the readValue type-probe round-trip when the caller knows the expected type from NodeMap; halves mutex hold time on writes - Callers (vendor route handlers + DataProvider::write_data + OperationProvider::execute_operation) map WriteError codes to appropriate HTTP status (400 for type mismatch, 403 for access denied, 502 for transport errors) instead of generic 502 for all failures YAML validation (#19): - Validate required fields (node_id, entity_id, data_name) per entry; skip with RCLCPP_WARN if missing - Validate data_type is one of bool/int/float/string; default to float with warning if unknown - Detect duplicate node_id entries; first wins, subsequent skipped with warning - Cap node count at 10000 to prevent DDS resource exhaustion from oversized configs - Replace all std::cerr output with RCLCPP_WARN/RCLCPP_ERROR for structured logging Poll interval clamp (#20): - Clamp poll_interval_ms to [100, 60000] in configure() so sub-100ms values no longer cause a CPU spin even without the condition_variable fix from the previous commit Exponential backoff on reconnect (#28): - Double the reconnect wait on each failed attempt, capped at 60s - Reset to configured interval on successful reconnect - Prevents network and log saturation when PLC is down for extended periods
1 parent 084eefb commit e14411e

6 files changed

Lines changed: 179 additions & 66 deletions

File tree

src/ros2_medkit_plugins/ros2_medkit_opcua/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ if(BUILD_TESTING)
152152
target_include_directories(test_node_map PRIVATE
153153
${CMAKE_CURRENT_SOURCE_DIR}/include
154154
)
155+
medkit_target_dependencies(test_node_map
156+
ros2_medkit_gateway
157+
rclcpp
158+
)
155159
target_link_libraries(test_node_map
156160
open62541pp::open62541pp
157161
nlohmann_json::nlohmann_json
@@ -166,6 +170,9 @@ if(BUILD_TESTING)
166170
target_include_directories(test_opcua_client PRIVATE
167171
${CMAKE_CURRENT_SOURCE_DIR}/include
168172
)
173+
medkit_target_dependencies(test_opcua_client
174+
ros2_medkit_gateway
175+
)
169176
target_link_libraries(test_opcua_client
170177
open62541pp::open62541pp
171178
nlohmann_json::nlohmann_json

src/ros2_medkit_plugins/ros2_medkit_opcua/include/ros2_medkit_opcua/opcua_client.hpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <vector>
2525

2626
#include <open62541pp/open62541pp.hpp>
27+
#include <tl/expected.hpp>
2728

2829
namespace ros2_medkit_gateway {
2930

@@ -84,9 +85,22 @@ class OpcuaClient {
8485
/// Read multiple values
8586
std::vector<ReadResult> read_values(const std::vector<opcua::NodeId> & node_ids);
8687

88+
/// OPC-UA write error classification
89+
enum class WriteError { NotConnected, TypeMismatch, AccessDenied, NodeNotFound, TransportError };
90+
91+
/// Detailed write error info
92+
struct WriteErrorInfo {
93+
WriteError code;
94+
std::string message;
95+
};
96+
8797
/// Write a value to a node
88-
/// @return true if write succeeded
89-
bool write_value(const opcua::NodeId & node_id, const OpcuaValue & value);
98+
/// @param data_type_hint If non-empty, skip readValue type-probe and use this hint
99+
/// ("bool", "int", "float", "string") to select the write coercion directly.
100+
/// Halves mutex hold time by eliminating a round-trip to the server.
101+
/// @return void on success, WriteErrorInfo on failure with specific error code
102+
tl::expected<void, WriteErrorInfo> write_value(const opcua::NodeId & node_id, const OpcuaValue & value,
103+
const std::string & data_type_hint = "");
90104

91105
/// Create a subscription with data change notifications
92106
/// @return Subscription ID, or 0 on failure

src/ros2_medkit_plugins/ros2_medkit_opcua/src/node_map.cpp

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <string>
2727

2828
#include <nlohmann/json.hpp>
29+
#include <rclcpp/rclcpp.hpp>
2930
#include <yaml-cpp/yaml.h>
3031

3132
namespace ros2_medkit_gateway {
@@ -187,17 +188,53 @@ bool NodeMap::load(const std::string & yaml_path) {
187188
entity_index_.clear();
188189
node_id_index_.clear();
189190

191+
if (nodes.size() > 10000) {
192+
RCLCPP_ERROR(rclcpp::get_logger("opcua.node_map"),
193+
"Node map has %zu entries (max 10000) - refusing to load to prevent resource exhaustion",
194+
nodes.size());
195+
return false;
196+
}
197+
190198
for (size_t i = 0; i < nodes.size(); ++i) {
191199
const auto & n = nodes[i];
200+
201+
// Validate required fields
202+
if (!n["node_id"] || !n["entity_id"] || !n["data_name"]) {
203+
RCLCPP_WARN(rclcpp::get_logger("opcua.node_map"),
204+
"Entry %zu missing required field (node_id/entity_id/data_name) - skipping", i);
205+
continue;
206+
}
207+
192208
NodeMapEntry entry;
193209

194210
entry.node_id_str = n["node_id"].as<std::string>();
195211
entry.node_id = parse_node_id(entry.node_id_str);
212+
if (entry.node_id_str.empty()) {
213+
RCLCPP_WARN(rclcpp::get_logger("opcua.node_map"), "Entry %zu has empty node_id - skipping", i);
214+
continue;
215+
}
196216
entry.entity_id = n["entity_id"].as<std::string>();
197217
entry.data_name = n["data_name"].as<std::string>();
198218
entry.display_name = n["display_name"].as<std::string>(entry.data_name);
199219
entry.unit = n["unit"].as<std::string>("");
200220
entry.data_type = n["data_type"].as<std::string>("float");
221+
222+
// Validate data_type is one of the known types
223+
if (entry.data_type != "bool" && entry.data_type != "int" && entry.data_type != "float" &&
224+
entry.data_type != "string") {
225+
RCLCPP_WARN(rclcpp::get_logger("opcua.node_map"),
226+
"Entry %zu (%s) has unknown data_type '%s' - defaulting to 'float'", i, entry.node_id_str.c_str(),
227+
entry.data_type.c_str());
228+
entry.data_type = "float";
229+
}
230+
231+
// Detect duplicate node_id (first wins)
232+
if (node_id_index_.count(entry.node_id_str) > 0) {
233+
RCLCPP_WARN(rclcpp::get_logger("opcua.node_map"), "Duplicate node_id '%s' at entry %zu - skipping (first wins)",
234+
entry.node_id_str.c_str(), i);
235+
continue;
236+
}
237+
201238
entry.writable = n["writable"].as<bool>(false);
202239
if (n["min_value"]) {
203240
entry.min_value = n["min_value"].as<double>();
@@ -241,8 +278,8 @@ bool NodeMap::load(const std::string & yaml_path) {
241278
if (n["ros2_topic"]) {
242279
entry.ros2_topic = n["ros2_topic"].as<std::string>();
243280
if (!is_valid_ros2_topic(entry.ros2_topic)) {
244-
std::cerr << "NodeMap: invalid custom ros2_topic '" << entry.ros2_topic << "' for " << entry.node_id_str
245-
<< ", auto-generating" << std::endl;
281+
RCLCPP_WARN(rclcpp::get_logger("opcua.node_map"), "Invalid custom ros2_topic '%s' for %s - auto-generating",
282+
entry.ros2_topic.c_str(), entry.node_id_str.c_str());
246283
entry.ros2_topic.clear();
247284
}
248285
}
@@ -271,7 +308,7 @@ bool NodeMap::load(const std::string & yaml_path) {
271308
return true;
272309

273310
} catch (const std::exception & e) {
274-
std::cerr << "NodeMap::load failed: " << e.what() << std::endl;
311+
RCLCPP_ERROR(rclcpp::get_logger("opcua.node_map"), "NodeMap::load failed: %s", e.what());
275312
return false;
276313
}
277314
}

src/ros2_medkit_plugins/ros2_medkit_opcua/src/opcua_client.cpp

Lines changed: 87 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -260,78 +260,113 @@ std::vector<ReadResult> OpcuaClient::read_values(const std::vector<opcua::NodeId
260260
return results;
261261
}
262262

263-
bool OpcuaClient::write_value(const opcua::NodeId & node_id, const OpcuaValue & value) {
263+
tl::expected<void, OpcuaClient::WriteErrorInfo>
264+
OpcuaClient::write_value(const opcua::NodeId & node_id, const OpcuaValue & value, const std::string & data_type_hint) {
264265
std::lock_guard<std::mutex> lock(impl_->client_mutex);
265266

266267
if (!impl_->connected) {
267-
return false;
268+
return tl::make_unexpected(WriteErrorInfo{WriteError::NotConnected, "Not connected to OPC-UA server"});
268269
}
269270

271+
// Helper: coerce OpcuaValue to double for numeric writes
272+
auto to_double = [](const OpcuaValue & v) -> double {
273+
return std::visit(
274+
[](auto && x) -> double {
275+
if constexpr (std::is_arithmetic_v<std::decay_t<decltype(x)>>) {
276+
return static_cast<double>(x);
277+
}
278+
return 0.0;
279+
},
280+
v);
281+
};
282+
283+
// Helper: coerce OpcuaValue to int32 for integer writes
284+
auto to_int32 = [](const OpcuaValue & v) -> int32_t {
285+
return std::visit(
286+
[](auto && x) -> int32_t {
287+
if constexpr (std::is_arithmetic_v<std::decay_t<decltype(x)>>) {
288+
return static_cast<int32_t>(x);
289+
}
290+
return 0;
291+
},
292+
v);
293+
};
294+
295+
// Helper: coerce OpcuaValue to bool
296+
auto to_bool = [](const OpcuaValue & v) -> bool {
297+
return std::visit(
298+
[](auto && x) -> bool {
299+
using T = std::decay_t<decltype(x)>;
300+
if constexpr (std::is_integral_v<T>) {
301+
return x != 0;
302+
} else if constexpr (std::is_floating_point_v<T>) {
303+
return x > T{0} || x < T{0};
304+
}
305+
return false;
306+
},
307+
v);
308+
};
309+
270310
try {
271311
opcua::Node node(impl_->client, node_id);
272312

273-
// Read the node's current value to determine the expected type,
274-
// then cast our value to match. OPC-UA servers reject type mismatches.
313+
// Fast path: use the data_type_hint from NodeMap instead of probing
314+
// the server with a readValue round-trip. Halves mutex hold time.
315+
if (!data_type_hint.empty()) {
316+
if (data_type_hint == "float") {
317+
node.writeValueScalar(static_cast<float>(to_double(value)));
318+
} else if (data_type_hint == "int") {
319+
node.writeValueScalar(to_int32(value));
320+
} else if (data_type_hint == "bool") {
321+
node.writeValueScalar(to_bool(value));
322+
} else if (data_type_hint == "string") {
323+
auto sval = std::visit(
324+
[](auto && x) -> std::string {
325+
using T = std::decay_t<decltype(x)>;
326+
if constexpr (std::is_same_v<T, std::string>) {
327+
return x;
328+
} else {
329+
return std::to_string(x);
330+
}
331+
},
332+
value);
333+
node.writeValueScalar(opcua::String(sval));
334+
} else {
335+
// Unknown hint - fall through to probe path
336+
node.writeValueScalar(static_cast<float>(to_double(value)));
337+
}
338+
return {};
339+
}
340+
341+
// Slow path: read current value to discover expected type, then write.
275342
auto current = node.readValue();
276343

277344
if (current.isType<float>()) {
278-
double dval = std::visit(
279-
[](auto && v) -> double {
280-
if constexpr (std::is_arithmetic_v<std::decay_t<decltype(v)>>) {
281-
return static_cast<double>(v);
282-
} else {
283-
return 0.0;
284-
}
285-
},
286-
value);
287-
node.writeValueScalar(static_cast<float>(dval));
345+
node.writeValueScalar(static_cast<float>(to_double(value)));
288346
} else if (current.isType<double>()) {
289-
double dval = std::visit(
290-
[](auto && v) -> double {
291-
if constexpr (std::is_arithmetic_v<std::decay_t<decltype(v)>>) {
292-
return static_cast<double>(v);
293-
} else {
294-
return 0.0;
295-
}
296-
},
297-
value);
298-
node.writeValueScalar(dval);
347+
node.writeValueScalar(to_double(value));
299348
} else if (current.isType<int32_t>()) {
300-
int32_t ival = std::visit(
301-
[](auto && v) -> int32_t {
302-
if constexpr (std::is_arithmetic_v<std::decay_t<decltype(v)>>) {
303-
return static_cast<int32_t>(v);
304-
} else {
305-
return 0;
306-
}
307-
},
308-
value);
309-
node.writeValueScalar(ival);
349+
node.writeValueScalar(to_int32(value));
310350
} else if (current.isType<bool>()) {
311-
bool bval = std::visit(
312-
[](auto && v) -> bool {
313-
using T = std::decay_t<decltype(v)>;
314-
if constexpr (std::is_integral_v<T>) {
315-
return v != 0;
316-
} else if constexpr (std::is_floating_point_v<T>) {
317-
// Any non-zero finite or subnormal value is truthy; use ordered
318-
// comparisons to avoid triggering -Wfloat-equal.
319-
return v > T{0} || v < T{0};
320-
} else {
321-
return false;
322-
}
323-
},
324-
value);
325-
node.writeValueScalar(bval);
351+
node.writeValueScalar(to_bool(value));
326352
} else {
327-
// Fallback: try direct variant write
328353
auto var = value_to_variant(value);
329354
node.writeValue(var);
330355
}
331-
return true;
356+
return {};
332357
} catch (const opcua::BadStatus & e) {
333358
maybe_mark_disconnected(impl_->connected, e);
334-
return false;
359+
auto code = e.code();
360+
if (code == UA_STATUSCODE_BADTYPEMISMATCH) {
361+
return tl::make_unexpected(WriteErrorInfo{WriteError::TypeMismatch, e.what()});
362+
}
363+
if (code == UA_STATUSCODE_BADUSERACCESSDENIED || code == UA_STATUSCODE_BADNOTWRITABLE) {
364+
return tl::make_unexpected(WriteErrorInfo{WriteError::AccessDenied, e.what()});
365+
}
366+
if (code == UA_STATUSCODE_BADNODEIDUNKNOWN) {
367+
return tl::make_unexpected(WriteErrorInfo{WriteError::NodeNotFound, e.what()});
368+
}
369+
return tl::make_unexpected(WriteErrorInfo{WriteError::TransportError, e.what()});
335370
}
336371
}
337372

src/ros2_medkit_plugins/ros2_medkit_opcua/src/opcua_plugin.cpp

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ void OpcuaPlugin::configure(const nlohmann::json & config) {
7373
poller_config_.subscription_interval_ms = config["subscription_interval_ms"].get<double>();
7474
}
7575
if (config.contains("poll_interval_ms")) {
76-
poller_config_.poll_interval = std::chrono::milliseconds(config["poll_interval_ms"].get<int>());
76+
auto ms = std::clamp(config["poll_interval_ms"].get<int>(), 100, 60000);
77+
poller_config_.poll_interval = std::chrono::milliseconds(ms);
7778
}
7879

7980
// Environment variables override YAML config (for Docker)
@@ -391,8 +392,13 @@ void OpcuaPlugin::handle_plc_operations(const PluginRequest & req, PluginRespons
391392
}
392393
}
393394

394-
if (!client_->write_value(entry->node_id, write_val)) {
395-
res.send_error(502, ERR_SERVICE_UNAVAILABLE, "Failed to write value to PLC node: " + entry->node_id_str);
395+
auto write_result = client_->write_value(entry->node_id, write_val, entry->data_type);
396+
if (!write_result) {
397+
int status = (write_result.error().code == OpcuaClient::WriteError::NotConnected ||
398+
write_result.error().code == OpcuaClient::WriteError::TransportError)
399+
? 502
400+
: 400;
401+
res.send_error(status, ERR_SERVICE_UNAVAILABLE, write_result.error().message);
396402
return;
397403
}
398404

@@ -846,9 +852,19 @@ OpcuaPlugin::execute_operation(const std::string & entity_id, const std::string
846852
}
847853
}
848854

849-
if (!client_->write_value(entry->node_id, write_val)) {
850-
return tl::make_unexpected(OperationProviderErrorInfo{
851-
OperationProviderError::TransportError, "Failed to write value to PLC node: " + entry->node_id_str, 502});
855+
auto write_result = client_->write_value(entry->node_id, write_val, entry->data_type);
856+
if (!write_result) {
857+
auto wcode = write_result.error().code;
858+
if (wcode == OpcuaClient::WriteError::TypeMismatch) {
859+
return tl::make_unexpected(
860+
OperationProviderErrorInfo{OperationProviderError::InvalidParameters, write_result.error().message, 400});
861+
}
862+
if (wcode == OpcuaClient::WriteError::AccessDenied) {
863+
return tl::make_unexpected(
864+
OperationProviderErrorInfo{OperationProviderError::Rejected, write_result.error().message, 403});
865+
}
866+
return tl::make_unexpected(
867+
OperationProviderErrorInfo{OperationProviderError::TransportError, write_result.error().message, 502});
852868
}
853869

854870
nlohmann::json result;

src/ros2_medkit_plugins/ros2_medkit_opcua/src/opcua_poller.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ void OpcuaPoller::on_data_change(const std::string & node_id, const OpcuaValue &
118118
}
119119

120120
void OpcuaPoller::poll_loop() {
121+
auto reconnect_wait = config_.reconnect_interval;
122+
constexpr auto max_reconnect_wait = std::chrono::milliseconds(60000);
123+
121124
while (running_.load()) {
122125
// Handle reconnection
123126
if (!client_.is_connected()) {
@@ -128,18 +131,19 @@ void OpcuaPoller::poll_loop() {
128131

129132
// Attempt reconnect with original config (preserves timeout, etc.)
130133
if (client_.connect(client_.current_config())) {
134+
reconnect_wait = config_.reconnect_interval; // reset on success
131135
if (config_.prefer_subscriptions) {
132136
setup_subscriptions();
133137
}
134138
} else {
135-
// Wait before retry. condition_variable so stop() wakes us immediately.
139+
// Exponential backoff capped at 60s. condition_variable so stop() wakes immediately.
136140
{
137141
std::unique_lock<std::mutex> lock(stop_mutex_);
138-
stop_cv_.wait_for(lock, config_.reconnect_interval, [this] {
142+
stop_cv_.wait_for(lock, reconnect_wait, [this] {
139143
return !running_.load();
140144
});
141145
}
142-
146+
reconnect_wait = std::min(reconnect_wait * 2, max_reconnect_wait);
143147
continue;
144148
}
145149
}

0 commit comments

Comments
 (0)