|
| 1 | +"""Interactive SSH host key policy with persistent trust-on-first-use (TOFU). |
| 2 | +
|
| 3 | +Replaces the MITM-prone ``paramiko.AutoAddPolicy`` / ``paramiko.WarningPolicy``: |
| 4 | +unknown host keys are shown to the user via a Qt dialog with their SHA256 |
| 5 | +fingerprint, and only accepted on explicit confirmation. Confirmed hosts are |
| 6 | +persisted to ``~/.pybreeze/ssh_known_hosts`` so subsequent connections verify |
| 7 | +automatically. |
| 8 | +""" |
| 9 | +from __future__ import annotations |
| 10 | + |
| 11 | +import base64 |
| 12 | +import hashlib |
| 13 | +from pathlib import Path |
| 14 | +from typing import TYPE_CHECKING |
| 15 | + |
| 16 | +import paramiko |
| 17 | +from je_editor import language_wrapper |
| 18 | +from PySide6.QtWidgets import QMessageBox |
| 19 | + |
| 20 | +from pybreeze.utils.logging.logger import pybreeze_logger |
| 21 | + |
| 22 | +if TYPE_CHECKING: |
| 23 | + from PySide6.QtWidgets import QWidget |
| 24 | + |
| 25 | + |
| 26 | +def _known_hosts_path() -> Path: |
| 27 | + """Return the PyBreeze-managed known_hosts file path, ensuring the parent dir exists.""" |
| 28 | + home_dir = Path.home() / ".pybreeze" |
| 29 | + home_dir.mkdir(parents=True, exist_ok=True) |
| 30 | + return home_dir / "ssh_known_hosts" |
| 31 | + |
| 32 | + |
| 33 | +def _fingerprint_sha256(key: paramiko.PKey) -> str: |
| 34 | + """Return an OpenSSH-style SHA256 fingerprint (``SHA256:base64`` without padding).""" |
| 35 | + digest = hashlib.sha256(key.asbytes()).digest() |
| 36 | + return "SHA256:" + base64.b64encode(digest).rstrip(b"=").decode("ascii") |
| 37 | + |
| 38 | + |
| 39 | +class InteractiveHostKeyPolicy(paramiko.MissingHostKeyPolicy): |
| 40 | + """Policy that prompts the user to verify unknown host keys. |
| 41 | +
|
| 42 | + Accepted keys are persisted so later connections pass through ``RejectPolicy``-like |
| 43 | + strictness automatically. Declined keys abort the connection with ``SSHException``. |
| 44 | + """ |
| 45 | + |
| 46 | + def __init__(self, parent: QWidget | None = None) -> None: |
| 47 | + super().__init__() |
| 48 | + self._parent = parent |
| 49 | + self._word_dict = language_wrapper.language_word_dict |
| 50 | + |
| 51 | + def missing_host_key( |
| 52 | + self, |
| 53 | + client: paramiko.SSHClient, |
| 54 | + hostname: str, |
| 55 | + key: paramiko.PKey, |
| 56 | + ) -> None: |
| 57 | + fingerprint = _fingerprint_sha256(key) |
| 58 | + key_type = key.get_name() |
| 59 | + |
| 60 | + title = self._word_dict.get( |
| 61 | + "ssh_host_key_policy_dialog_title_verify_host", |
| 62 | + "Verify SSH host key", |
| 63 | + ) |
| 64 | + message_template = self._word_dict.get( |
| 65 | + "ssh_host_key_policy_dialog_message_verify_host", |
| 66 | + "The authenticity of host '{host}' cannot be established.\n" |
| 67 | + "{key_type} key fingerprint is {fingerprint}.\n\n" |
| 68 | + "Do you want to trust this host and continue connecting?", |
| 69 | + ) |
| 70 | + message = message_template.format( |
| 71 | + host=hostname, key_type=key_type, fingerprint=fingerprint |
| 72 | + ) |
| 73 | + |
| 74 | + box = QMessageBox(self._parent) |
| 75 | + box.setIcon(QMessageBox.Icon.Warning) |
| 76 | + box.setWindowTitle(title) |
| 77 | + box.setText(message) |
| 78 | + box.setStandardButtons(QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) |
| 79 | + box.setDefaultButton(QMessageBox.StandardButton.No) |
| 80 | + response = box.exec() |
| 81 | + |
| 82 | + if response != QMessageBox.StandardButton.Yes: |
| 83 | + pybreeze_logger.warning( |
| 84 | + "SSH host key for %s rejected by user (%s)", hostname, fingerprint |
| 85 | + ) |
| 86 | + raise paramiko.SSHException( |
| 87 | + f"Host key for {hostname} rejected by user." |
| 88 | + ) |
| 89 | + |
| 90 | + client.get_host_keys().add(hostname, key_type, key) |
| 91 | + try: |
| 92 | + client.save_host_keys(str(_known_hosts_path())) |
| 93 | + except OSError as err: |
| 94 | + pybreeze_logger.warning( |
| 95 | + "Failed to persist SSH host key for %s: %s", hostname, err |
| 96 | + ) |
| 97 | + pybreeze_logger.info( |
| 98 | + "SSH host key for %s accepted and stored (%s)", hostname, fingerprint |
| 99 | + ) |
| 100 | + |
| 101 | + |
| 102 | +def apply_host_key_policy(client: paramiko.SSHClient, parent: QWidget | None) -> None: |
| 103 | + """Load known hosts and attach the interactive TOFU policy to *client*.""" |
| 104 | + client.load_system_host_keys() |
| 105 | + known_hosts = _known_hosts_path() |
| 106 | + if known_hosts.is_file(): |
| 107 | + try: |
| 108 | + client.load_host_keys(str(known_hosts)) |
| 109 | + except OSError as err: |
| 110 | + pybreeze_logger.warning("Failed to load PyBreeze known_hosts: %s", err) |
| 111 | + client.set_missing_host_key_policy(InteractiveHostKeyPolicy(parent)) |
0 commit comments