|
| 1 | +"""Tests for WebDAVServer._run_server_thread, stop() running-server path, |
| 2 | +and test_connection() PROPFIND probe. |
| 3 | +""" |
| 4 | +from unittest.mock import MagicMock, patch |
| 5 | + |
| 6 | +import pytest |
| 7 | + |
| 8 | +from services.webdav_server import WebDAVServer |
| 9 | +from config.config import config_service |
| 10 | + |
| 11 | + |
| 12 | +@pytest.fixture |
| 13 | +def server(): |
| 14 | + s = WebDAVServer() |
| 15 | + s._setup_signal_handlers = lambda: None |
| 16 | + return s |
| 17 | + |
| 18 | + |
| 19 | +# ---------- _run_server_thread (waitress branch) ---------- |
| 20 | + |
| 21 | +def test_run_server_thread_waitress_calls_serve(server): |
| 22 | + """The thread runner must call waitress.serve with the configured |
| 23 | + host/port + the production threading parameters.""" |
| 24 | + captured = {} |
| 25 | + def fake_serve(app, **kw): |
| 26 | + captured.update(kw) |
| 27 | + raise RuntimeError("stop here") |
| 28 | + |
| 29 | + with patch('services.webdav_server.WSGI_SERVER', 'waitress'), \ |
| 30 | + patch('services.webdav_server.serve', side_effect=fake_serve), \ |
| 31 | + patch.object(config_service, 'clear_webdav_pid'): |
| 32 | + server.app = MagicMock() |
| 33 | + server._run_server_thread() |
| 34 | + |
| 35 | + assert captured['host'] == server.config['host'] |
| 36 | + assert captured['port'] == server.config['port'] |
| 37 | + # Production threading config that callers depend on |
| 38 | + assert captured['threads'] == 10 |
| 39 | + assert captured['connection_limit'] == 1000 |
| 40 | + assert captured['ident'] == 'Internxt WebDAV Server' |
| 41 | + |
| 42 | + |
| 43 | +def test_run_server_thread_keyboard_interrupt_is_caught(server): |
| 44 | + """Ctrl+C in serve() must NOT propagate — let cleanup run.""" |
| 45 | + with patch('services.webdav_server.WSGI_SERVER', 'waitress'), \ |
| 46 | + patch('services.webdav_server.serve', side_effect=KeyboardInterrupt()), \ |
| 47 | + patch.object(config_service, 'clear_webdav_pid') as mock_clear: |
| 48 | + server.app = MagicMock() |
| 49 | + server._run_server_thread() # must not raise |
| 50 | + # Cleanup ran (clear_webdav_pid called from finally) |
| 51 | + mock_clear.assert_called() |
| 52 | + assert server.is_running is False |
| 53 | + |
| 54 | + |
| 55 | +def test_run_server_thread_exception_when_stopping_is_silent(server): |
| 56 | + """If server is being stopped, exceptions during teardown must NOT |
| 57 | + print error trace — that's just normal shutdown noise.""" |
| 58 | + server.is_stopping = True |
| 59 | + with patch('services.webdav_server.WSGI_SERVER', 'waitress'), \ |
| 60 | + patch('services.webdav_server.serve', |
| 61 | + side_effect=RuntimeError("normal shutdown")), \ |
| 62 | + patch.object(config_service, 'clear_webdav_pid'): |
| 63 | + server.app = MagicMock() |
| 64 | + server._run_server_thread() # must not raise |
| 65 | + assert server.is_running is False |
| 66 | + |
| 67 | + |
| 68 | +def test_run_server_thread_cheroot_branch(server): |
| 69 | + """When WSGI_SERVER='cheroot', should construct a Cheroot Server and |
| 70 | + call .start() on it.""" |
| 71 | + fake_server_inst = MagicMock() |
| 72 | + fake_server_inst.start.side_effect = RuntimeError("stop here") |
| 73 | + fake_wsgi_module = MagicMock() |
| 74 | + fake_wsgi_module.Server.return_value = fake_server_inst |
| 75 | + |
| 76 | + with patch('services.webdav_server.WSGI_SERVER', 'cheroot'), \ |
| 77 | + patch('services.webdav_server.wsgi', fake_wsgi_module), \ |
| 78 | + patch.object(config_service, 'clear_webdav_pid'): |
| 79 | + server.app = MagicMock() |
| 80 | + server._run_server_thread() |
| 81 | + |
| 82 | + fake_wsgi_module.Server.assert_called_once() |
| 83 | + fake_server_inst.start.assert_called_once() |
| 84 | + # The Server constructor must receive the configured bind_addr |
| 85 | + _, kwargs = fake_wsgi_module.Server.call_args |
| 86 | + assert kwargs['bind_addr'] == (server.config['host'], server.config['port']) |
| 87 | + |
| 88 | + |
| 89 | +# ---------- stop() — foreground server still running ---------- |
| 90 | + |
| 91 | +def test_stop_foreground_cheroot_calls_server_stop(server): |
| 92 | + """When running cheroot in foreground, stop must call self.server.stop().""" |
| 93 | + fake_server = MagicMock() |
| 94 | + server.server = fake_server |
| 95 | + server.is_running = True |
| 96 | + |
| 97 | + with patch.object(config_service, 'read_webdav_pid', return_value=None), \ |
| 98 | + patch.object(config_service, 'clear_webdav_pid'), \ |
| 99 | + patch('services.webdav_server.WSGI_SERVER', 'cheroot'): |
| 100 | + result = server.stop() |
| 101 | + |
| 102 | + fake_server.stop.assert_called_once() |
| 103 | + assert result['success'] is True |
| 104 | + |
| 105 | + |
| 106 | +def test_stop_foreground_cheroot_tolerates_server_stop_error(server): |
| 107 | + """If self.server.stop() raises, we still report overall success.""" |
| 108 | + fake_server = MagicMock() |
| 109 | + fake_server.stop.side_effect = RuntimeError("can't stop") |
| 110 | + server.server = fake_server |
| 111 | + server.is_running = True |
| 112 | + |
| 113 | + with patch.object(config_service, 'read_webdav_pid', return_value=None), \ |
| 114 | + patch.object(config_service, 'clear_webdav_pid'), \ |
| 115 | + patch('services.webdav_server.WSGI_SERVER', 'cheroot'): |
| 116 | + result = server.stop() |
| 117 | + |
| 118 | + # Wrapped: outer try still succeeds because inner exception was caught |
| 119 | + assert result['success'] is True |
| 120 | + |
| 121 | + |
| 122 | +# ---------- test_connection (the WebDAV PROPFIND probe) ---------- |
| 123 | + |
| 124 | +def test_test_connection_returns_failure_when_not_running(server): |
| 125 | + server.is_running = False |
| 126 | + with patch.object(config_service, 'read_webdav_pid', return_value=None): |
| 127 | + result = server.test_connection() |
| 128 | + assert result['success'] is False |
| 129 | + assert 'not running' in result['message'].lower() |
| 130 | + |
| 131 | + |
| 132 | +def test_test_connection_returns_success_on_207_xml(server): |
| 133 | + fake_resp = MagicMock() |
| 134 | + fake_resp.status_code = 207 |
| 135 | + fake_resp.text = '<?xml version="1.0"?><D:multistatus />' |
| 136 | + |
| 137 | + with patch.object(server, 'status', return_value={'running': True}), \ |
| 138 | + patch('requests.request', return_value=fake_resp): |
| 139 | + result = server.test_connection() |
| 140 | + assert result['success'] is True |
| 141 | + assert result['status_code'] == 207 |
| 142 | + |
| 143 | + |
| 144 | +def test_test_connection_reports_failure_on_non_207(server): |
| 145 | + fake_resp = MagicMock() |
| 146 | + fake_resp.status_code = 401 |
| 147 | + fake_resp.text = '' |
| 148 | + |
| 149 | + with patch.object(server, 'status', return_value={'running': True}), \ |
| 150 | + patch('requests.request', return_value=fake_resp): |
| 151 | + result = server.test_connection() |
| 152 | + assert result['success'] is False |
| 153 | + assert result['status_code'] == 401 |
| 154 | + |
| 155 | + |
| 156 | +def test_test_connection_reports_failure_on_request_exception(server): |
| 157 | + with patch.object(server, 'status', return_value={'running': True}), \ |
| 158 | + patch('requests.request', side_effect=ConnectionError("refused")): |
| 159 | + result = server.test_connection() |
| 160 | + assert result['success'] is False |
| 161 | + assert 'failed' in result['message'].lower() |
| 162 | + |
| 163 | + |
| 164 | +# ---------- get_mount_instructions content checks ---------- |
| 165 | + |
| 166 | +def test_mount_instructions_macos_includes_finder_and_cli(server): |
| 167 | + inst = server.get_mount_instructions() |
| 168 | + macos = inst['macos'] |
| 169 | + assert 'Finder' in macos |
| 170 | + assert 'Cmd+K' in macos |
| 171 | + assert 'mount -t webdav' in macos |
| 172 | + |
| 173 | + |
| 174 | +def test_mount_instructions_linux_includes_davfs(server): |
| 175 | + inst = server.get_mount_instructions() |
| 176 | + linux = inst['linux'] |
| 177 | + assert 'davfs' in linux.lower() |
| 178 | + assert 'mount -t davfs' in linux |
| 179 | + |
| 180 | + |
| 181 | +def test_mount_instructions_windows_includes_map_drive(server): |
| 182 | + inst = server.get_mount_instructions() |
| 183 | + windows = inst['windows'] |
| 184 | + assert 'Map network drive' in windows or 'Map' in windows |
| 185 | + |
| 186 | + |
| 187 | +# ---------- _check_port_available ---------- |
| 188 | + |
| 189 | +def test_check_port_available_returns_true_for_free_port(server): |
| 190 | + """A high port number that's almost certainly unused should report available.""" |
| 191 | + assert isinstance(server._check_port_available(58743), bool) |
| 192 | + |
| 193 | + |
| 194 | +def test_check_port_available_returns_false_when_port_in_use(server): |
| 195 | + import socket |
| 196 | + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 197 | + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 198 | + sock.bind(('localhost', 0)) |
| 199 | + sock.listen(1) |
| 200 | + busy_port = sock.getsockname()[1] |
| 201 | + try: |
| 202 | + assert server._check_port_available(busy_port) is False |
| 203 | + finally: |
| 204 | + sock.close() |
0 commit comments