11"""Svn repository."""
22
33import contextlib
4+ import functools
45import os
56import pathlib
67import re
78from collections .abc import Callable , Generator , Sequence
89from pathlib import Path
910from typing import NamedTuple
11+ from urllib .parse import urlparse
1012
1113from dfetch .log import get_logger
1214from dfetch .util .cmdline import SubprocessCommandError , run_on_cmdline
1517
1618logger = get_logger (__name__ )
1719
20+ _SSH_HOST_KEY_MSGS = ("host key verification failed" , "authenticity of host" )
21+
22+
23+ # As a cli tool, we can safely assume this remains stable during the runtime, caching for speed is better
24+ @functools .lru_cache
25+ def _extend_env_for_non_interactive_mode () -> dict [str , str ]:
26+ """Extend the environment vars for svn running in non-interactive mode."""
27+ env = os .environ .copy ()
28+ ssh_cmd = env .get ("SVN_SSH" , "ssh" )
29+ if "BatchMode=" not in ssh_cmd :
30+ ssh_cmd += " -o BatchMode=yes"
31+ else :
32+ logger .debug ('BatchMode already configured in SVN_SSH: "%s"' , ssh_cmd )
33+ env ["SVN_SSH" ] = ssh_cmd
34+ return env
35+
36+
37+ def _ssh_target_from_url (url : str ) -> str :
38+ """Return the ``[user@]host`` portion of a svn+ssh URL, or the URL itself."""
39+ parsed = urlparse (url )
40+ host = parsed .hostname or url
41+ return f"{ parsed .username } @{ host } " if parsed .username else host
42+
43+
44+ def _raise_if_ssh_host_key_error (url : str , exc : SubprocessCommandError ) -> None :
45+ """Raise a helpful RuntimeError if *exc* looks like an SSH host-key failure."""
46+ stderr_lower = exc .stderr .lower ()
47+ if any (msg in stderr_lower for msg in _SSH_HOST_KEY_MSGS ):
48+ target = _ssh_target_from_url (url )
49+ raise RuntimeError (
50+ f"SSH host key verification failed while connecting to '{ url } '.\n "
51+ "Add the host to your known hosts file, for example by running:\n "
52+ f" ssh-keyscan { target } >> ~/.ssh/known_hosts\n "
53+ "Or test the SSH connection manually:\n "
54+ f" ssh -T { target } "
55+ ) from exc
56+
1857
1958def get_svn_version () -> tuple [str , str ]:
2059 """Get the name and version of svn."""
@@ -49,9 +88,14 @@ def __init__(self, remote: str) -> None:
4988 def is_svn (self ) -> bool :
5089 """Check if is SVN."""
5190 try :
52- run_on_cmdline (logger , ["svn" , "info" , self ._remote , "--non-interactive" ])
91+ run_on_cmdline (
92+ logger ,
93+ ["svn" , "info" , self ._remote , "--non-interactive" ],
94+ env = _extend_env_for_non_interactive_mode (),
95+ )
5396 return True
5497 except SubprocessCommandError as exc :
98+ _raise_if_ssh_host_key_error (self ._remote , exc )
5599 if exc .stderr .startswith ("svn: E170013" ):
56100 raise RuntimeError (
57101 f">>>{ exc .cmd } <<< failed!\n "
@@ -67,20 +111,30 @@ def list_of_branches(self) -> list[str]:
67111 result = run_on_cmdline (
68112 logger ,
69113 ["svn" , "ls" , "--non-interactive" , f"{ self ._remote } /branches" ],
114+ env = _extend_env_for_non_interactive_mode (),
70115 )
71116 return [
72117 line .strip ("/\r " )
73118 for line in result .stdout .decode ().splitlines ()
74119 if line .strip ("/\r " )
75120 ]
76- except (SubprocessCommandError , RuntimeError ):
121+ except SubprocessCommandError as exc :
122+ _raise_if_ssh_host_key_error (self ._remote , exc )
123+ return []
124+ except RuntimeError :
77125 return []
78126
79127 def list_of_tags (self ) -> list [str ]:
80128 """Get list of all available tags."""
81- result = run_on_cmdline (
82- logger , ["svn" , "ls" , "--non-interactive" , f"{ self ._remote } /tags" ]
83- )
129+ try :
130+ result = run_on_cmdline (
131+ logger ,
132+ ["svn" , "ls" , "--non-interactive" , f"{ self ._remote } /tags" ],
133+ env = _extend_env_for_non_interactive_mode (),
134+ )
135+ except SubprocessCommandError as exc :
136+ _raise_if_ssh_host_key_error (self ._remote , exc )
137+ raise
84138 return [
85139 str (tag ).strip ("/\r " ) for tag in result .stdout .decode ().split ("\n " ) if tag
86140 ]
@@ -116,7 +170,9 @@ def ls_tree(self, url_path: str) -> list[tuple[str, bool]]:
116170 """List immediate children of *url_path* as ``(name, is_dir)`` pairs."""
117171 try :
118172 result = run_on_cmdline (
119- logger , ["svn" , "ls" , "--non-interactive" , url_path ]
173+ logger ,
174+ ["svn" , "ls" , "--non-interactive" , url_path ],
175+ env = _extend_env_for_non_interactive_mode (),
120176 )
121177 entries : list [tuple [str , bool ]] = []
122178 for line in result .stdout .decode ().splitlines ():
@@ -126,7 +182,10 @@ def ls_tree(self, url_path: str) -> list[tuple[str, bool]]:
126182 is_dir = line .endswith ("/" )
127183 entries .append ((line .rstrip ("/" ), is_dir ))
128184 return entries
129- except (SubprocessCommandError , RuntimeError ):
185+ except SubprocessCommandError as exc :
186+ _raise_if_ssh_host_key_error (url_path , exc )
187+ return []
188+ except RuntimeError :
130189 return []
131190
132191
@@ -176,7 +235,7 @@ def externals_from_url(url: str, revision: str = "") -> list[External]:
176235 if revision :
177236 cmd += ["--revision" , revision ]
178237 cmd += [url ]
179- result = run_on_cmdline (logger , cmd )
238+ result = run_on_cmdline (logger , cmd , env = _extend_env_for_non_interactive_mode () )
180239 repo_root = SvnRepo .get_info_from_target (url )["Repository Root" ]
181240 normalized = SvnRepo ._normalize_url_prefix (result .stdout .decode (), url )
182241 return SvnRepo ._parse_externals (normalized , repo_root )
@@ -292,9 +351,12 @@ def get_info_from_target(target: str = "") -> dict[str, str]:
292351 """Get the info of the given target."""
293352 try :
294353 result = run_on_cmdline (
295- logger , ["svn" , "info" , "--non-interactive" , target .strip ()]
354+ logger ,
355+ ["svn" , "info" , "--non-interactive" , target .strip ()],
356+ env = _extend_env_for_non_interactive_mode (),
296357 ).stdout .decode ()
297358 except SubprocessCommandError as exc :
359+ _raise_if_ssh_host_key_error (target , exc )
298360 if exc .stderr .startswith ("svn: E170013" ):
299361 raise RuntimeError (
300362 f">>>{ exc .cmd } <<< failed!\n "
@@ -335,6 +397,7 @@ def get_last_changed_revision(target: str | Path) -> str:
335397 "last-changed-revision" ,
336398 target_str ,
337399 ],
400+ env = _extend_env_for_non_interactive_mode (),
338401 )
339402 .stdout .decode ()
340403 .strip ()
@@ -382,6 +445,7 @@ def export(url: str, rev: str = "", dst: str = ".") -> None:
382445 ["svn" , "export" , "--non-interactive" , "--force" ]
383446 + (["--revision" , rev ] if rev else [])
384447 + [url , dst ],
448+ env = _extend_env_for_non_interactive_mode (),
385449 )
386450
387451 @staticmethod
@@ -390,7 +454,9 @@ def files_in_path(url_path: str) -> list[str]:
390454 return [
391455 str (line )
392456 for line in run_on_cmdline (
393- logger , ["svn" , "list" , "--non-interactive" , url_path ]
457+ logger ,
458+ ["svn" , "list" , "--non-interactive" , url_path ],
459+ env = _extend_env_for_non_interactive_mode (),
394460 )
395461 .stdout .decode ()
396462 .splitlines ()
0 commit comments