|
9 | 9 | """ |
10 | 10 |
|
11 | 11 | from argparse import ArgumentParser |
| 12 | +from logging import debug as logging_debug |
| 13 | +from logging import error as logging_error |
12 | 14 | from logging import info as logging_info |
13 | 15 | from logging import warning as logging_warning |
14 | 16 | from os import path as os_path |
15 | 17 | from pathlib import Path |
16 | 18 | from time import sleep as time_sleep |
| 19 | +from time import time as time_time |
17 | 20 | from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast |
18 | 21 |
|
19 | 22 | from pymavlink import mavutil |
@@ -475,6 +478,220 @@ def is_battery_monitoring_enabled(self) -> bool: |
475 | 478 | """Check if battery monitoring is enabled - delegates to commands manager.""" |
476 | 479 | return self._commands_manager.is_battery_monitoring_enabled() |
477 | 480 |
|
| 481 | + # MAVLink Signing Functionality |
| 482 | + |
| 483 | + def setup_signing( # pylint: disable=too-many-arguments, too-many-positional-arguments |
| 484 | + self, |
| 485 | + key: bytes, |
| 486 | + sign_outgoing: bool = True, |
| 487 | + allow_unsigned_in: bool = True, |
| 488 | + initial_timestamp: int = 0, |
| 489 | + link_id: int = 0, |
| 490 | + ) -> bool: |
| 491 | + """ |
| 492 | + Set up MAVLink 2.0 message signing for secure communication. |
| 493 | +
|
| 494 | + This enables HMAC-SHA256 signing of MAVLink messages to provide |
| 495 | + authentication and prevent message tampering/injection. |
| 496 | +
|
| 497 | + Args: |
| 498 | + key: 32-byte signing key (256-bit) for HMAC-SHA256 |
| 499 | + sign_outgoing: Whether to sign outgoing messages (default: True) |
| 500 | + allow_unsigned_in: Whether to accept unsigned incoming messages (default: True) |
| 501 | + initial_timestamp: Initial timestamp for replay protection (default: 0 = use current time) |
| 502 | + link_id: Link ID for signing (0-255, default: 0) |
| 503 | +
|
| 504 | + Returns: |
| 505 | + bool: True if signing was set up successfully |
| 506 | +
|
| 507 | + Raises: |
| 508 | + ConnectionError: If no flight controller connection is available |
| 509 | + ValueError: If key is not 32 bytes or link_id is out of range |
| 510 | + NotImplementedError: If pymavlink version doesn't support signing |
| 511 | + RuntimeError: If signing setup fails for other reasons |
| 512 | +
|
| 513 | + Note: |
| 514 | + MAVLink signing provides authentication (not encryption). |
| 515 | + Messages are still sent in plaintext but include a cryptographic |
| 516 | + signature to verify authenticity and detect tampering. |
| 517 | +
|
| 518 | + """ |
| 519 | + if self.master is None: |
| 520 | + msg = _("No flight controller connection available for signing setup") |
| 521 | + logging_warning(msg) |
| 522 | + raise ConnectionError(msg) |
| 523 | + |
| 524 | + if len(key) != 32: |
| 525 | + msg = ( |
| 526 | + f"Signing key must be 32 bytes, got {len(key)} bytes. " |
| 527 | + "Generate a new key using: SigningKeystore().generate_key()" |
| 528 | + ) |
| 529 | + raise ValueError(msg) |
| 530 | + |
| 531 | + if not 0 <= link_id <= 255: |
| 532 | + msg = f"link_id must be between 0 and 255, got {link_id}. Use 0 for default link ID." |
| 533 | + raise ValueError(msg) |
| 534 | + |
| 535 | + # Validate timestamp |
| 536 | + if initial_timestamp < 0: |
| 537 | + msg = "initial_timestamp cannot be negative" |
| 538 | + raise ValueError(msg) |
| 539 | + |
| 540 | + # Reject if timestamp is suspiciously old (> 7 days in the past) to prevent replay attacks |
| 541 | + if initial_timestamp > 0: |
| 542 | + current_time_us = int(time_time() * 1e6) |
| 543 | + age_days = (current_time_us - initial_timestamp) / (86400 * 1e6) |
| 544 | + if age_days > 7: |
| 545 | + msg = ( |
| 546 | + _( |
| 547 | + "Initial signing timestamp is %.1f days old - rejecting to prevent replay attack. " |
| 548 | + "Use initial_timestamp=0 to use current time, or provide a recent timestamp." |
| 549 | + ) |
| 550 | + % age_days |
| 551 | + ) |
| 552 | + logging_error(msg) |
| 553 | + raise ValueError(msg) |
| 554 | + |
| 555 | + try: |
| 556 | + # Set up the signing state on the MAVLink connection |
| 557 | + # pymavlink's mavlink_connection supports signing setup |
| 558 | + # Type ignore needed because MavlinkConnection is a Union including object fallback |
| 559 | + self.master.setup_signing( # type: ignore[union-attr] |
| 560 | + key, |
| 561 | + sign_outgoing=sign_outgoing, |
| 562 | + allow_unsigned_callback=self._unsigned_callback if allow_unsigned_in else None, |
| 563 | + initial_timestamp=initial_timestamp, |
| 564 | + link_id=link_id, |
| 565 | + ) |
| 566 | + |
| 567 | + logging_info( |
| 568 | + _("MAVLink signing configured: sign_outgoing=%(sign)s, allow_unsigned=%(unsigned)s"), |
| 569 | + {"sign": sign_outgoing, "unsigned": allow_unsigned_in}, |
| 570 | + ) |
| 571 | + return True |
| 572 | + |
| 573 | + except AttributeError as exc: |
| 574 | + msg = _("MAVLink signing not supported by this pymavlink version") |
| 575 | + logging_error(msg) |
| 576 | + raise NotImplementedError(msg) from exc |
| 577 | + except Exception as exc: |
| 578 | + msg = _("Failed to set up MAVLink signing: %(error)s") % {"error": str(exc)} |
| 579 | + logging_error(msg) |
| 580 | + raise RuntimeError(msg) from exc |
| 581 | + |
| 582 | + def _unsigned_callback(self, msg: object) -> bool: |
| 583 | + """ |
| 584 | + Callback to handle unsigned incoming messages when signing is enabled. |
| 585 | +
|
| 586 | + This callback is invoked for each unsigned message when signing is |
| 587 | + configured with allow_unsigned_in=True. It allows filtering which |
| 588 | + unsigned messages to accept. |
| 589 | +
|
| 590 | + Args: |
| 591 | + msg: The unsigned MAVLink message |
| 592 | +
|
| 593 | + Returns: |
| 594 | + bool: True to accept the message, False to reject it |
| 595 | +
|
| 596 | + Note: |
| 597 | + ⚠️ SECURITY WARNING: This accepts ALL unsigned messages (permissive mode). |
| 598 | + This is necessary for compatibility during signing setup and with mixed |
| 599 | + signed/unsigned systems, but reduces security. |
| 600 | +
|
| 601 | + For maximum security, consider: |
| 602 | + 1. Only accepting unsigned messages during initial connection (time-limited) |
| 603 | + 2. Whitelisting specific message types (e.g., HEARTBEAT, PARAM_VALUE) |
| 604 | + 3. Setting allow_unsigned_in=False after signing is fully established |
| 605 | +
|
| 606 | + """ |
| 607 | + # Log unsigned messages for security monitoring |
| 608 | + msg_type = getattr(msg, "get_type", lambda: "unknown")() |
| 609 | + logging_debug(_("⚠️ Received UNSIGNED MAVLink message: %(type)s"), {"type": msg_type}) |
| 610 | + |
| 611 | + # Accept all unsigned messages (permissive mode) |
| 612 | + # This allows communication during signing setup and transition periods |
| 613 | + # For stricter security, users should set allow_unsigned_in=False after establishing signing |
| 614 | + return True |
| 615 | + |
| 616 | + def disable_signing(self) -> bool: |
| 617 | + """ |
| 618 | + Disable MAVLink message signing. |
| 619 | +
|
| 620 | + This removes signing configuration and returns to unsigned communication. |
| 621 | +
|
| 622 | + Returns: |
| 623 | + bool: True if signing was disabled successfully |
| 624 | +
|
| 625 | + Raises: |
| 626 | + ConnectionError: If no flight controller connection is available |
| 627 | + NotImplementedError: If pymavlink version doesn't support signing |
| 628 | + RuntimeError: If disabling signing fails for other reasons |
| 629 | +
|
| 630 | + """ |
| 631 | + if self.master is None: |
| 632 | + msg = _("No flight controller connection available") |
| 633 | + logging_warning(msg) |
| 634 | + raise ConnectionError(msg) |
| 635 | + |
| 636 | + try: |
| 637 | + # Disable signing by passing None as key |
| 638 | + # Type ignore needed because MavlinkConnection is a Union including object fallback |
| 639 | + self.master.setup_signing(None, sign_outgoing=False, allow_unsigned_callback=None) # type: ignore[union-attr] |
| 640 | + logging_info(_("MAVLink signing disabled")) |
| 641 | + return True |
| 642 | + except AttributeError as exc: |
| 643 | + msg = _("MAVLink signing not supported by this pymavlink version") |
| 644 | + logging_error(msg) |
| 645 | + raise NotImplementedError(msg) from exc |
| 646 | + except Exception as exc: |
| 647 | + msg = _("Failed to disable MAVLink signing: %(error)s") % {"error": str(exc)} |
| 648 | + logging_error(msg) |
| 649 | + raise RuntimeError(msg) from exc |
| 650 | + |
| 651 | + def get_signing_status(self) -> dict[str, object]: |
| 652 | + """ |
| 653 | + Get the current MAVLink signing status. |
| 654 | +
|
| 655 | + Returns: |
| 656 | + dict: Signing status with keys: |
| 657 | + - enabled: Whether signing is configured |
| 658 | + - sign_outgoing: Whether outgoing messages are signed |
| 659 | + - allow_unsigned: Whether unsigned messages are accepted |
| 660 | + - link_id: Current link ID (if signing enabled) |
| 661 | + - message: Human-readable status message |
| 662 | +
|
| 663 | + """ |
| 664 | + status: dict[str, object] = { |
| 665 | + "enabled": False, |
| 666 | + "sign_outgoing": False, |
| 667 | + "allow_unsigned": True, |
| 668 | + "link_id": 0, |
| 669 | + "message": _("No connection"), |
| 670 | + } |
| 671 | + |
| 672 | + if self.master is None: |
| 673 | + return status |
| 674 | + |
| 675 | + try: |
| 676 | + # Check if signing is set up on the MAVLink connection |
| 677 | + # Type ignore needed because MavlinkConnection is a Union including object fallback |
| 678 | + mav = self.master.mav # type: ignore[union-attr] |
| 679 | + if hasattr(mav, "signing") and mav.signing is not None: |
| 680 | + signing = mav.signing |
| 681 | + status["enabled"] = True |
| 682 | + status["sign_outgoing"] = getattr(signing, "sign_outgoing", False) |
| 683 | + status["allow_unsigned"] = getattr(signing, "allow_unsigned_callback", None) is not None |
| 684 | + status["link_id"] = getattr(signing, "link_id", 0) |
| 685 | + status["message"] = _("Signing enabled") |
| 686 | + else: |
| 687 | + status["message"] = _("Signing not configured") |
| 688 | + |
| 689 | + except Exception as exc: # pylint: disable=broad-except |
| 690 | + logging_debug(_("Error getting signing status: %(error)s"), {"error": str(exc)}) |
| 691 | + status["message"] = _("Error: %(error)s") % {"error": str(exc)} |
| 692 | + |
| 693 | + return status |
| 694 | + |
478 | 695 | def get_frame_info(self) -> tuple[int, int]: |
479 | 696 | """Get frame class and frame type - delegates to commands manager.""" |
480 | 697 | return self._commands_manager.get_frame_info() |
|
0 commit comments