|
15 | 15 | import filecmp |
16 | 16 | import errno |
17 | 17 | from tempfile import NamedTemporaryFile |
| 18 | +from contextlib import contextmanager |
18 | 19 | from util import ( |
19 | 20 | wait_for_mount, |
20 | 21 | umount, |
@@ -123,6 +124,9 @@ def test_sshfs( |
123 | 124 | # FUSE Cache |
124 | 125 | cmdline += ["-o", "entry_timeout=0", "-o", "attr_timeout=0"] |
125 | 126 |
|
| 127 | + # Disable containment so tst_symlink can test absolute targets |
| 128 | + cmdline += ["-o", "no_contain_symlinks"] |
| 129 | + |
126 | 130 | if multiconn: |
127 | 131 | cmdline += ["-o", "max_conns=3"] |
128 | 132 |
|
@@ -305,6 +309,12 @@ def tst_symlink(mnt_dir): |
305 | 309 | assert fstat.st_nlink == 1 |
306 | 310 | assert linkname in os.listdir(mnt_dir) |
307 | 311 |
|
| 312 | + # Relative symlink without .. should also work |
| 313 | + linkname2 = name_generator() |
| 314 | + fullname2 = mnt_dir + "/" + linkname2 |
| 315 | + os.symlink("subdir/file", fullname2) |
| 316 | + assert os.readlink(fullname2) == "subdir/file" |
| 317 | + |
308 | 318 | os.unlink(fullname) |
309 | 319 | assert linkname not in os.listdir(mnt_dir) |
310 | 320 |
|
@@ -910,3 +920,163 @@ def test_bad_sftp_reply_len(tmpdir): |
910 | 920 | ) |
911 | 921 | assert res.returncode != 0 |
912 | 922 | assert "bad reply len: 0" in res.stderr |
| 923 | + |
| 924 | + |
| 925 | +@contextmanager |
| 926 | +def _sshfs_mount(src_dir, mnt_dir, extra_opts=None): |
| 927 | + """Mount src_dir via sshfs, yield, then unmount.""" |
| 928 | + cmdline = base_cmdline + [ |
| 929 | + pjoin(basename, "sshfs"), "-f", |
| 930 | + f"localhost:{src_dir}", mnt_dir, |
| 931 | + "-o", "entry_timeout=0", "-o", "attr_timeout=0", |
| 932 | + ] |
| 933 | + if extra_opts: |
| 934 | + for opt in extra_opts: |
| 935 | + cmdline += ["-o", opt] |
| 936 | + new_env = dict(os.environ) |
| 937 | + new_env["G_DEBUG"] = "fatal-warnings" |
| 938 | + mount_process = subprocess.Popen(cmdline, env=new_env) |
| 939 | + try: |
| 940 | + wait_for_mount(mount_process, mnt_dir) |
| 941 | + yield mnt_dir |
| 942 | + except Exception: |
| 943 | + cleanup(mount_process, mnt_dir) |
| 944 | + raise |
| 945 | + else: |
| 946 | + umount(mount_process, mnt_dir) |
| 947 | + |
| 948 | + |
| 949 | +def test_contain_symlinks(tmpdir, capfd) -> None: |
| 950 | + """Default containment: safe symlinks resolve, dangerous ones get EPERM.""" |
| 951 | + |
| 952 | + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) |
| 953 | + _check_ssh_localhost() |
| 954 | + |
| 955 | + mnt_dir = str(tmpdir.mkdir("mnt")) |
| 956 | + src_dir = str(tmpdir.mkdir("src")) |
| 957 | + |
| 958 | + os.makedirs(pjoin(src_dir, "sub")) |
| 959 | + with open(pjoin(src_dir, "sub", "target"), "w") as f: |
| 960 | + f.write("hello") |
| 961 | + |
| 962 | + os.symlink("sub/target", pjoin(src_dir, "safe")) |
| 963 | + os.symlink("./sub/target", pjoin(src_dir, "safe_dot")) |
| 964 | + os.symlink("/etc/passwd", pjoin(src_dir, "abs")) |
| 965 | + os.symlink("../../../etc/passwd", pjoin(src_dir, "dotdot")) |
| 966 | + os.symlink("sub/../../etc/passwd", pjoin(src_dir, "interleaved")) |
| 967 | + os.symlink("..", pjoin(src_dir, "bare_dotdot")) |
| 968 | + |
| 969 | + with _sshfs_mount(src_dir, mnt_dir): |
| 970 | + # Safe symlinks pass through and resolve |
| 971 | + assert os.readlink(pjoin(mnt_dir, "safe")) == "sub/target" |
| 972 | + assert os.readlink(pjoin(mnt_dir, "safe_dot")) == "./sub/target" |
| 973 | + with open(pjoin(mnt_dir, "safe")) as f: |
| 974 | + assert f.read() == "hello" |
| 975 | + |
| 976 | + # Dangerous: readlink returns EPERM |
| 977 | + for name in ("abs", "dotdot", "interleaved", "bare_dotdot"): |
| 978 | + with pytest.raises(OSError) as exc_info: |
| 979 | + os.readlink(pjoin(mnt_dir, name)) |
| 980 | + assert exc_info.value.errno == errno.EPERM |
| 981 | + |
| 982 | + # Dangerous: traversal (open/stat) also EPERM |
| 983 | + with pytest.raises(OSError) as exc_info: |
| 984 | + open(pjoin(mnt_dir, "abs")) |
| 985 | + assert exc_info.value.errno == errno.EPERM |
| 986 | + |
| 987 | + with pytest.raises(OSError) as exc_info: |
| 988 | + os.stat(pjoin(mnt_dir, "dotdot")) |
| 989 | + assert exc_info.value.errno == errno.EPERM |
| 990 | + |
| 991 | + |
| 992 | +def test_no_contain_symlinks(tmpdir, capfd) -> None: |
| 993 | + """Opt-out: symlinks pass through and actually resolve.""" |
| 994 | + |
| 995 | + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) |
| 996 | + _check_ssh_localhost() |
| 997 | + |
| 998 | + mnt_dir = str(tmpdir.mkdir("mnt")) |
| 999 | + src_dir = str(tmpdir.mkdir("src")) |
| 1000 | + |
| 1001 | + os.symlink("/etc/passwd", pjoin(src_dir, "abs_link")) |
| 1002 | + os.symlink("../../../etc/passwd", pjoin(src_dir, "rel_escape")) |
| 1003 | + |
| 1004 | + with _sshfs_mount(src_dir, mnt_dir, ["no_contain_symlinks"]): |
| 1005 | + assert os.readlink(pjoin(mnt_dir, "abs_link")) == "/etc/passwd" |
| 1006 | + assert os.readlink(pjoin(mnt_dir, "rel_escape")) == "../../../etc/passwd" |
| 1007 | + |
| 1008 | + # Absolute symlink actually resolves (reads local /etc/passwd) |
| 1009 | + with open(pjoin(mnt_dir, "abs_link")) as f: |
| 1010 | + assert "root" in f.read() |
| 1011 | + |
| 1012 | + # Relative escape: kernel must traverse the link (not EPERM). |
| 1013 | + # Target won't exist on the test host, so we just assert that |
| 1014 | + # sshfs didn't block it - any errno other than EPERM proves |
| 1015 | + # containment is genuinely disabled. |
| 1016 | + with pytest.raises(OSError) as exc_info: |
| 1017 | + os.stat(pjoin(mnt_dir, "rel_escape")) |
| 1018 | + assert exc_info.value.errno != errno.EPERM |
| 1019 | + |
| 1020 | + |
| 1021 | +def test_transform_with_contain(tmpdir, capfd) -> None: |
| 1022 | + """transform_symlinks + default containment: transformed ../x is rejected.""" |
| 1023 | + |
| 1024 | + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) |
| 1025 | + capfd.register_output(r"^warning: transform_symlinks.+", count=0) |
| 1026 | + _check_ssh_localhost() |
| 1027 | + |
| 1028 | + mnt_dir = str(tmpdir.mkdir("mnt")) |
| 1029 | + src_dir = str(tmpdir.mkdir("src")) |
| 1030 | + |
| 1031 | + os.makedirs(pjoin(src_dir, "other")) |
| 1032 | + with open(pjoin(src_dir, "other", "file"), "w") as f: |
| 1033 | + f.write("data") |
| 1034 | + # Absolute in-base: transform rewrites to "other/file" (no ..) |
| 1035 | + os.symlink(pjoin(src_dir, "other", "file"), pjoin(src_dir, "inbase")) |
| 1036 | + # Absolute in-base but sibling: transform rewrites to "../other/file" |
| 1037 | + os.makedirs(pjoin(src_dir, "sub")) |
| 1038 | + os.symlink(pjoin(src_dir, "other", "file"), pjoin(src_dir, "sub", "sibling")) |
| 1039 | + |
| 1040 | + with _sshfs_mount(src_dir, mnt_dir, ["transform_symlinks"]): |
| 1041 | + # Direct child: transform produces "other/file" - no .., passes |
| 1042 | + link = os.readlink(pjoin(mnt_dir, "inbase")) |
| 1043 | + assert ".." not in link.split("/") |
| 1044 | + with open(pjoin(mnt_dir, "inbase")) as f: |
| 1045 | + assert f.read() == "data" |
| 1046 | + |
| 1047 | + # Sibling: transform produces "../other/file" - has .., EPERM |
| 1048 | + with pytest.raises(OSError) as exc_info: |
| 1049 | + os.readlink(pjoin(mnt_dir, "sub", "sibling")) |
| 1050 | + assert exc_info.value.errno == errno.EPERM |
| 1051 | + |
| 1052 | + # Same setup with no_contain_symlinks: sibling works |
| 1053 | + with _sshfs_mount(src_dir, mnt_dir, |
| 1054 | + ["transform_symlinks", "no_contain_symlinks"]): |
| 1055 | + link = os.readlink(pjoin(mnt_dir, "sub", "sibling")) |
| 1056 | + assert ".." in link |
| 1057 | + with open(pjoin(mnt_dir, "sub", "sibling")) as f: |
| 1058 | + assert f.read() == "data" |
| 1059 | + |
| 1060 | + |
| 1061 | +def test_contain_symlinks_option_precedence(tmpdir, capfd) -> None: |
| 1062 | + """Last option wins when contain_symlinks and no_contain_symlinks both set.""" |
| 1063 | + |
| 1064 | + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) |
| 1065 | + _check_ssh_localhost() |
| 1066 | + |
| 1067 | + mnt_dir = str(tmpdir.mkdir("mnt")) |
| 1068 | + src_dir = str(tmpdir.mkdir("src")) |
| 1069 | + |
| 1070 | + os.symlink("/etc/passwd", pjoin(src_dir, "abs")) |
| 1071 | + |
| 1072 | + # no_contain_symlinks last: containment disabled, readlink succeeds |
| 1073 | + with _sshfs_mount(src_dir, mnt_dir, |
| 1074 | + ["contain_symlinks", "no_contain_symlinks"]): |
| 1075 | + assert os.readlink(pjoin(mnt_dir, "abs")) == "/etc/passwd" |
| 1076 | + |
| 1077 | + # contain_symlinks last: containment enabled, EPERM |
| 1078 | + with _sshfs_mount(src_dir, mnt_dir, |
| 1079 | + ["no_contain_symlinks", "contain_symlinks"]): |
| 1080 | + with pytest.raises(OSError) as exc_info: |
| 1081 | + os.readlink(pjoin(mnt_dir, "abs")) |
| 1082 | + assert exc_info.value.errno == errno.EPERM |
0 commit comments