diff --git a/src/modules/mavlink/mavlink_main.cpp b/src/modules/mavlink/mavlink_main.cpp index 802bc40858fa..a502a3e09939 100644 --- a/src/modules/mavlink/mavlink_main.cpp +++ b/src/modules/mavlink/mavlink_main.cpp @@ -513,7 +513,10 @@ Mavlink::forward_message(const mavlink_message_t *msg, Mavlink *self) return; } - LockGuard lg{mavlink_module_mutex}; + // No mutex needed: instances can't be destroyed while receiver threads run + // (destroy_all_instances() stops all threads first). Pointer reads are + // naturally atomic on aligned architectures. pass_message() has its own + // per-instance _message_buffer_mutex for the ring buffer. for (Mavlink *inst : mavlink_module_instances) { if (inst && (inst != self) && (inst->get_forwarding_on())) { diff --git a/test/mavsdk_tests/CMakeLists_bench.txt b/test/mavsdk_tests/CMakeLists_bench.txt new file mode 100644 index 000000000000..2f9c58883ddc --- /dev/null +++ b/test/mavsdk_tests/CMakeLists_bench.txt @@ -0,0 +1,26 @@ +cmake_minimum_required(VERSION 3.5.1) + +project(mavlink_forwarding_bench CXX) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +find_package(MAVSDK REQUIRED) +find_package(Threads REQUIRED) + +add_executable(mavlink_forwarding_bench + test_mavlink_forwarding_bench.cpp +) + +target_link_libraries(mavlink_forwarding_bench + MAVSDK::mavsdk + ${CMAKE_THREAD_LIBS_INIT} +) + +target_compile_options(mavlink_forwarding_bench + PRIVATE + -Wall + -Wextra + -Werror +) diff --git a/test/mavsdk_tests/run_forwarding_bench.sh b/test/mavsdk_tests/run_forwarding_bench.sh new file mode 100755 index 000000000000..c834d0bdad05 --- /dev/null +++ b/test/mavsdk_tests/run_forwarding_bench.sh @@ -0,0 +1,117 @@ +#!/bin/bash +# +# MAVLink forwarding mutex benchmark runner. +# +# Builds the benchmark tool, starts PX4 SITL SIH, runs the benchmark, +# then collects perf counter data. +# +# Usage: +# ./run_forwarding_bench.sh [--duration 60] [--rate 200] [--connections 2] [--label baseline] +# +# The --label flag tags the output files for before/after comparison. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PX4_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" +BUILD_DIR="$SCRIPT_DIR/build_bench" +RESULTS_DIR="$PX4_DIR/bench_results" + +DURATION=60 +RATE=200 +CONNECTIONS=2 +LABEL="run" + +while [[ $# -gt 0 ]]; do + case "$1" in + --duration) DURATION="$2"; shift 2 ;; + --rate) RATE="$2"; shift 2 ;; + --connections) CONNECTIONS="$2"; shift 2 ;; + --label) LABEL="$2"; shift 2 ;; + --help|-h) + echo "Usage: $0 [--duration N] [--rate N] [--connections N] [--label NAME]" + exit 0 + ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +TIMESTAMP=$(date +%Y%m%d_%H%M%S) +RESULT_PREFIX="${RESULTS_DIR}/${LABEL}_${TIMESTAMP}" + +mkdir -p "$RESULTS_DIR" + +echo "=== Step 1: Build PX4 SITL SIH ===" +cd "$PX4_DIR" +make px4_sitl_sih 2>&1 | tail -5 + +echo "" +echo "=== Step 2: Build benchmark tool ===" +mkdir -p "$BUILD_DIR" +cd "$BUILD_DIR" +# Symlink the standalone CMakeLists and source into build dir +ln -sf "$SCRIPT_DIR/CMakeLists_bench.txt" CMakeLists.txt +ln -sf "$SCRIPT_DIR/test_mavlink_forwarding_bench.cpp" . +cmake -DCMAKE_BUILD_TYPE=Release . +make -j$(nproc 2>/dev/null || sysctl -n hw.ncpu) + +echo "" +echo "=== Step 3: Start PX4 SITL SIH ===" +cd "$PX4_DIR" + +# Start PX4 in background, capture PID +PX4_SIM_MODEL=sihsim_quadx $PX4_DIR/build/px4_sitl_sih/bin/px4 \ + -s $PX4_DIR/ROMFS/px4fmu_common/init.d-posix/rcS \ + -w $PX4_DIR/build/px4_sitl_sih/rootfs \ + $PX4_DIR/build/px4_sitl_sih/etc \ + > "${RESULT_PREFIX}_px4.log" 2>&1 & +PX4_PID=$! + +echo "PX4 started (PID=$PX4_PID), waiting 10s for boot..." +sleep 10 + +# Verify PX4 is running +if ! kill -0 "$PX4_PID" 2>/dev/null; then + echo "ERROR: PX4 failed to start. Check ${RESULT_PREFIX}_px4.log" + exit 1 +fi + +echo "" +echo "=== Step 4: Run benchmark ===" +echo " Duration: ${DURATION}s" +echo " Rate: ${RATE} Hz/connection" +echo " Connections: ${CONNECTIONS}" +echo "" + +"$BUILD_DIR/mavlink_forwarding_bench" \ + --duration "$DURATION" \ + --rate "$RATE" \ + --connections "$CONNECTIONS" \ + --report "${RESULT_PREFIX}_sender.csv" \ + 2>&1 | tee "${RESULT_PREFIX}_bench.log" + +echo "" +echo "=== Step 5: Perf counters ===" +echo "The benchmark tool reads perf counters from the board automatically" +echo "via the MAVLink shell protocol. Check the bench log above for BENCH lines." + +echo "" +echo "=== Step 6: Stop PX4 ===" +kill -INT "$PX4_PID" 2>/dev/null || true +wait "$PX4_PID" 2>/dev/null || true + +# Extract bench lines from the benchmark log if available +if grep -q "BENCH" "${RESULT_PREFIX}_bench.log" 2>/dev/null; then + echo "Perf counters:" + grep "BENCH" "${RESULT_PREFIX}_bench.log" | tee "${RESULT_PREFIX}_perf.txt" +fi + +echo "" +echo "=== Results saved ===" +echo " PX4 log: ${RESULT_PREFIX}_px4.log" +echo " Bench log: ${RESULT_PREFIX}_bench.log" +echo " Sender CSV: ${RESULT_PREFIX}_sender.csv" +echo " Perf data: ${RESULT_PREFIX}_perf.txt (if available)" +echo "" +echo "To compare runs:" +echo " diff _perf.txt _perf.txt" diff --git a/test/mavsdk_tests/test_mavlink_forwarding_bench.cpp b/test/mavsdk_tests/test_mavlink_forwarding_bench.cpp new file mode 100644 index 000000000000..d9c7a3cd2b10 --- /dev/null +++ b/test/mavsdk_tests/test_mavlink_forwarding_bench.cpp @@ -0,0 +1,428 @@ +/**************************************************************************** + * + * Copyright (c) 2026 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/** + * @file test_mavlink_forwarding_bench.cpp + * + * MAVLink forwarding mutex benchmark. + * Sends high-rate broadcast messages to multiple PX4 MAVLink instances + * to measure the cost of mavlink_module_mutex in the forwarding path. + * + * Usage: + * 1. Start PX4 SITL: make px4_sitl_sih + * 2. Build and run this benchmark (see CMakeLists.txt) + * 3. After the run completes, dump perf counters in the PX4 console: + * > perf + * + * The benchmark connects to two UDP ports (GCS + offboard) and floods + * broadcast DEBUG_FLOAT_ARRAY messages. Since target_system=0 (broadcast), + * every message hits forward_message() and gets forwarded to all other + * instances, creating real mutex contention between receiver threads. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace mavsdk; +using namespace std::chrono; +using namespace std::chrono_literals; + +static std::atomic g_stop{false}; + +static void signal_handler(int /*signum*/) +{ + g_stop.store(true); +} + +struct SenderStats { + std::atomic messages_sent{0}; + std::atomic send_errors{0}; +}; + +// Send broadcast DEBUG_FLOAT_ARRAY at the given rate on a single connection. +// Broadcast (sysid=0) ensures the message always enters forward_message(). +static void sender_thread(MavlinkPassthrough &passthrough, + int rate_hz, + SenderStats &stats) +{ + const auto interval = microseconds(1'000'000 / rate_hz); + auto next_send = steady_clock::now(); + + while (!g_stop.load()) { + // DEBUG_FLOAT_ARRAY is a good choice: variable-length, commonly supported, + // and has no side effects on PX4 when received. + // No target fields, so it defaults to broadcast forwarding in PX4. + auto result = passthrough.queue_message( + [](MavlinkAddress mavlink_address, uint8_t channel) { + mavlink_message_t msg{}; + mavlink_msg_debug_float_array_pack_chan( + mavlink_address.system_id, + mavlink_address.component_id, + channel, + &msg, + 0, // time_usec + "bench", // name + 0, // array_id + nullptr // data (empty) + ); + return msg; + }); + + if (result == MavlinkPassthrough::Result::Success) { + stats.messages_sent.fetch_add(1, std::memory_order_relaxed); + + } else { + stats.send_errors.fetch_add(1, std::memory_order_relaxed); + } + + next_send += interval; + std::this_thread::sleep_until(next_send); + } +} + +static void print_usage(const char *bin) +{ + std::cout << "Usage: " << bin << " [options]\n" + << " --duration Test duration in seconds (default: 60)\n" + << " --rate Messages per second per connection (default: 200)\n" + << " --connections Number of UDP connections (default: 2)\n" + << " --url MAVSDK connection URL (e.g. serial:///dev/ttyACM0:57600)\n" + << " When set, uses a single connection (ignores --connections)\n" + << " --gcs-port GCS UDP port (default: 14550)\n" + << " --api-port Offboard/API UDP port (default: 14540)\n" + << " --report Write CSV report to file (default: stdout)\n" + << std::endl; +} + +int main(int argc, char *argv[]) +{ + int duration_sec = 60; + int rate_hz = 200; + int num_connections = 2; + int gcs_port = 14550; + int api_port = 14540; + std::string report_file; + std::string connection_url; + + for (int i = 1; i < argc; ++i) { + std::string arg(argv[i]); + + if (arg == "--duration" && i + 1 < argc) { + duration_sec = std::atoi(argv[++i]); + + } else if (arg == "--rate" && i + 1 < argc) { + rate_hz = std::atoi(argv[++i]); + + } else if (arg == "--connections" && i + 1 < argc) { + num_connections = std::atoi(argv[++i]); + + } else if (arg == "--url" && i + 1 < argc) { + connection_url = argv[++i]; + num_connections = 1; + + } else if (arg == "--gcs-port" && i + 1 < argc) { + gcs_port = std::atoi(argv[++i]); + + } else if (arg == "--api-port" && i + 1 < argc) { + api_port = std::atoi(argv[++i]); + + } else if (arg == "--report" && i + 1 < argc) { + report_file = argv[++i]; + + } else if (arg == "--help" || arg == "-h") { + print_usage(argv[0]); + return 0; + + } else { + std::cerr << "Unknown argument: " << arg << std::endl; + print_usage(argv[0]); + return 1; + } + } + + if (duration_sec <= 0 || rate_hz <= 0 || num_connections <= 0) { + std::cerr << "Error: --duration, --rate, and --connections must be positive" << std::endl; + return 1; + } + + if (num_connections > 10) { + std::cerr << "Error: max 10 connections (sysid 245-254)" << std::endl; + return 1; + } + + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + + std::cout << "=== MAVLink Forwarding Mutex Benchmark ===" << std::endl; + std::cout << "Duration: " << duration_sec << "s" << std::endl; + std::cout << "Rate: " << rate_hz << " Hz per connection" << std::endl; + std::cout << "Connections: " << num_connections << std::endl; + std::cout << std::endl; + + // Build list of connection URLs + std::vector urls; + + if (!connection_url.empty()) { + urls.push_back(connection_url); + + } else { + std::vector ports; + + if (num_connections >= 1) { + ports.push_back(api_port); + } + + if (num_connections >= 2) { + ports.push_back(gcs_port); + } + + for (int i = 2; i < num_connections; ++i) { + ports.push_back(api_port + i); + } + + for (int p : ports) { + urls.push_back("udp://:" + std::to_string(p)); + } + } + + // Create MAVSDK instances and connect + std::vector> mavsdks; + std::vector> passthroughs; + + for (size_t i = 0; i < urls.size(); ++i) { + auto config = Mavsdk::Configuration( + static_cast(245 + i), // unique sysid per connection + MAV_COMP_ID_MISSIONPLANNER, + false // not autopilot + ); + auto mavsdk = std::make_unique(config); + + std::cout << "Connecting to " << urls[i] << " (sysid=" << (245 + i) << ")..." << std::endl; + + auto result = mavsdk->add_any_connection(urls[i]); + + if (result != ConnectionResult::Success) { + std::cerr << "Connection failed: " << urls[i] << std::endl; + return 1; + } + + // Wait for system discovery + std::cout << " Waiting for system..." << std::flush; + bool found = false; + + for (int t = 0; t < 100; ++t) { // 10 second timeout for serial + if (!mavsdk->systems().empty()) { + found = true; + break; + } + + std::this_thread::sleep_for(100ms); + } + + if (!found) { + std::cerr << "\n Timeout waiting for system on " << urls[i] << std::endl; + return 1; + } + + std::cout << " OK" << std::endl; + + auto system = mavsdk->systems().front(); + passthroughs.push_back(std::make_unique(system)); + mavsdks.push_back(std::move(mavsdk)); + } + + std::cout << std::endl; + std::cout << "Starting " << passthroughs.size() << " sender threads at " << rate_hz << " Hz each..." << std::endl; + std::cout << "Total expected rate: " << (rate_hz * static_cast(passthroughs.size())) << " msg/s" << std::endl; + std::cout << std::endl; + + // Launch sender threads + std::vector stats(passthroughs.size()); + std::vector threads; + + auto start_time = steady_clock::now(); + + for (size_t i = 0; i < passthroughs.size(); ++i) { + threads.emplace_back(sender_thread, std::ref(*passthroughs[i]), rate_hz, std::ref(stats[i])); + } + + // Progress reporting + auto report_interval = 5s; + auto next_report = start_time + report_interval; + + while (!g_stop.load()) { + auto now = steady_clock::now(); + + if (now >= start_time + seconds(duration_sec)) { + break; + } + + if (now >= next_report) { + auto elapsed = duration_cast(now - start_time).count(); + uint64_t total_sent = 0; + uint64_t total_errors = 0; + + for (auto &s : stats) { + total_sent += s.messages_sent.load(std::memory_order_relaxed); + total_errors += s.send_errors.load(std::memory_order_relaxed); + } + + double actual_rate = static_cast(total_sent) / static_cast(elapsed); + std::cout << "[" << elapsed << "s] sent=" << total_sent + << " errors=" << total_errors + << " rate=" << static_cast(actual_rate) << " msg/s" + << std::endl; + + next_report += report_interval; + } + + std::this_thread::sleep_for(100ms); + } + + // Stop senders + g_stop.store(true); + + for (auto &t : threads) { + t.join(); + } + + auto end_time = steady_clock::now(); + double total_elapsed = duration_cast(end_time - start_time).count() / 1000.0; + + // Final report + std::cout << std::endl; + std::cout << "=== Results ===" << std::endl; + std::cout << "Elapsed: " << total_elapsed << "s" << std::endl; + + uint64_t grand_total_sent = 0; + uint64_t grand_total_errors = 0; + + for (size_t i = 0; i < stats.size(); ++i) { + uint64_t sent = stats[i].messages_sent.load(); + uint64_t errors = stats[i].send_errors.load(); + double rate = static_cast(sent) / total_elapsed; + + std::cout << " Connection " << i << " (" << urls[i] << "): " + << "sent=" << sent + << " errors=" << errors + << " rate=" << static_cast(rate) << " msg/s" + << std::endl; + + grand_total_sent += sent; + grand_total_errors += errors; + } + + double grand_rate = static_cast(grand_total_sent) / total_elapsed; + std::cout << " Total: sent=" << grand_total_sent + << " errors=" << grand_total_errors + << " rate=" << static_cast(grand_rate) << " msg/s" + << std::endl; + + std::cout << std::endl; + std::cout << "Now run 'perf' in the PX4 console to dump forwarding perf counters." << std::endl; + std::cout << "Look for:" << std::endl; + std::cout << " mavlink: fwd_msg total -- total forward_message() time" << std::endl; + std::cout << " mavlink: fwd_msg lock -- time under mavlink_module_mutex" << std::endl; + std::cout << " mavlink: comp_seen total -- static component_was_seen() time" << std::endl; + std::cout << " mavlink: unsigned_cb -- accept_unsigned_callback time" << std::endl; + + // CSV report output + FILE *out = stdout; + + if (!report_file.empty()) { + out = fopen(report_file.c_str(), "w"); + + if (!out) { + std::cerr << "Failed to open report file: " << report_file << std::endl; + out = stdout; + } + } + + fprintf(out, "connection,url,sysid,messages_sent,send_errors,duration_s,rate_hz_requested,rate_hz_actual\n"); + + for (size_t i = 0; i < stats.size(); ++i) { + uint64_t sent = stats[i].messages_sent.load(); + uint64_t errors = stats[i].send_errors.load(); + double rate = static_cast(sent) / total_elapsed; + + fprintf(out, "%zu,%s,%d,%" PRIu64 ",%" PRIu64 ",%.1f,%d,%.1f\n", + i, urls[i].c_str(), static_cast(245 + i), + sent, errors, total_elapsed, rate_hz, rate); + } + + if (out != stdout) { + fclose(out); + std::cout << "CSV report written to: " << report_file << std::endl; + } + + // Read bench counters from the board via MAVLink shell + std::cout << std::endl; + std::cout << "=== Reading board perf counters ===" << std::endl; + + auto system = mavsdks.front()->systems().front(); + Shell shell(system); + + std::string shell_output; + std::atomic got_bench{false}; + + shell.subscribe_receive([&](std::string output) { + shell_output += output; + std::cout << output; + + if (output.find("fwd_msg lock") != std::string::npos) { + got_bench.store(true); + } + }); + + shell.send("mavlink status"); + + for (int i = 0; i < 100 && !got_bench.load(); ++i) { + std::this_thread::sleep_for(100ms); + } + + std::this_thread::sleep_for(500ms); + + return 0; +}