|
18 | 18 | _probe_ssh, |
19 | 19 | _read_server_banner, |
20 | 20 | _read_ssh_packet, |
| 21 | + probe_raw, |
21 | 22 | probe_tls, |
22 | 23 | ) |
23 | 24 |
|
@@ -1128,3 +1129,175 @@ def fake_recv(n: int) -> bytes: |
1128 | 1129 | assert "Could not parse SSH KEXINIT" in result.error |
1129 | 1130 | assert result.tls_version == "SSHv2" |
1130 | 1131 |
|
| 1132 | + |
| 1133 | +# --------------------------------------------------------------------------- |
| 1134 | +# probe_raw |
| 1135 | +# --------------------------------------------------------------------------- |
| 1136 | + |
| 1137 | + |
| 1138 | +def _make_raw_proc(stdout: bytes = b"", stderr: bytes = b"", returncode: int = 0): |
| 1139 | + proc = MagicMock() |
| 1140 | + proc.stdout = stdout |
| 1141 | + proc.stderr = stderr |
| 1142 | + proc.returncode = returncode |
| 1143 | + return proc |
| 1144 | + |
| 1145 | + |
| 1146 | +class TestProbeRaw: |
| 1147 | + def test_returns_combined_output_on_success(self, monkeypatch): |
| 1148 | + stdout = b"Connecting to 1.2.3.4\nCONNECTION ESTABLISHED\n" |
| 1149 | + stderr = b"depth=0 CN=example.com\n" |
| 1150 | + monkeypatch.setattr( |
| 1151 | + "quantumvalidator.tls_utils.subprocess.run", |
| 1152 | + lambda *a, **kw: _make_raw_proc(stdout, stderr), |
| 1153 | + ) |
| 1154 | + result = probe_raw("example.com", 25) |
| 1155 | + assert result == (stdout + stderr).decode("utf-8", errors="replace") |
| 1156 | + |
| 1157 | + def test_returns_none_when_openssl_missing(self, monkeypatch): |
| 1158 | + monkeypatch.setattr( |
| 1159 | + "quantumvalidator.tls_utils.check_openssl", |
| 1160 | + lambda: (False, "openssl not found"), |
| 1161 | + ) |
| 1162 | + assert probe_raw("example.com", 25) is None |
| 1163 | + |
| 1164 | + def test_returns_none_on_timeout(self, monkeypatch): |
| 1165 | + def _raise(*a, **kw): |
| 1166 | + raise subprocess.TimeoutExpired(cmd="openssl", timeout=12) |
| 1167 | + |
| 1168 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", _raise) |
| 1169 | + assert probe_raw("example.com", 25) is None |
| 1170 | + |
| 1171 | + def test_returns_none_on_oserror(self, monkeypatch): |
| 1172 | + def _raise(*a, **kw): |
| 1173 | + raise OSError("connection refused") |
| 1174 | + |
| 1175 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", _raise) |
| 1176 | + assert probe_raw("example.com", 25) is None |
| 1177 | + |
| 1178 | + def test_returns_none_for_invalid_port(self, monkeypatch): |
| 1179 | + # _validate_target raises ValueError for port 0; probe_raw catches it. |
| 1180 | + # Mock check_openssl so the test exercises _validate_target regardless |
| 1181 | + # of whether openssl is installed on the CI machine. |
| 1182 | + monkeypatch.setattr( |
| 1183 | + "quantumvalidator.tls_utils.check_openssl", |
| 1184 | + lambda: (True, "openssl 3.0.0"), |
| 1185 | + ) |
| 1186 | + assert probe_raw("example.com", 0) is None |
| 1187 | + |
| 1188 | + def test_starttls_smtp_in_cmd(self, monkeypatch): |
| 1189 | + captured = {} |
| 1190 | + |
| 1191 | + def fake_run(cmd, **kw): |
| 1192 | + captured["cmd"] = cmd |
| 1193 | + return _make_raw_proc() |
| 1194 | + |
| 1195 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1196 | + probe_raw("example.com", 25, starttls="smtp") |
| 1197 | + assert "-starttls" in captured["cmd"] |
| 1198 | + assert "smtp" in captured["cmd"] |
| 1199 | + |
| 1200 | + def test_sni_hostname_adds_servername(self, monkeypatch): |
| 1201 | + captured = {} |
| 1202 | + |
| 1203 | + def fake_run(cmd, **kw): |
| 1204 | + captured["cmd"] = cmd |
| 1205 | + return _make_raw_proc() |
| 1206 | + |
| 1207 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1208 | + probe_raw("1.2.3.4", 25, sni_hostname="mail.example.com") |
| 1209 | + assert "-servername" in captured["cmd"] |
| 1210 | + assert "mail.example.com" in captured["cmd"] |
| 1211 | + |
| 1212 | + def test_no_servername_when_sni_none(self, monkeypatch): |
| 1213 | + captured = {} |
| 1214 | + |
| 1215 | + def fake_run(cmd, **kw): |
| 1216 | + captured["cmd"] = cmd |
| 1217 | + return _make_raw_proc() |
| 1218 | + |
| 1219 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1220 | + probe_raw("example.com", 25) |
| 1221 | + assert "-servername" not in captured["cmd"] |
| 1222 | + |
| 1223 | + def test_brief_not_in_cmd(self, monkeypatch): |
| 1224 | + captured = {} |
| 1225 | + |
| 1226 | + def fake_run(cmd, **kw): |
| 1227 | + captured["cmd"] = cmd |
| 1228 | + return _make_raw_proc() |
| 1229 | + |
| 1230 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1231 | + probe_raw("example.com", 25) |
| 1232 | + assert "-brief" not in captured["cmd"] |
| 1233 | + |
| 1234 | + def test_ign_eof_in_cmd(self, monkeypatch): |
| 1235 | + captured = {} |
| 1236 | + |
| 1237 | + def fake_run(cmd, **kw): |
| 1238 | + captured["cmd"] = cmd |
| 1239 | + return _make_raw_proc() |
| 1240 | + |
| 1241 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1242 | + probe_raw("example.com", 25) |
| 1243 | + assert "-ign_eof" in captured["cmd"] |
| 1244 | + |
| 1245 | + def test_groups_in_cmd(self, monkeypatch): |
| 1246 | + captured = {} |
| 1247 | + |
| 1248 | + def fake_run(cmd, **kw): |
| 1249 | + captured["cmd"] = cmd |
| 1250 | + return _make_raw_proc() |
| 1251 | + |
| 1252 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1253 | + probe_raw("example.com", 25) |
| 1254 | + assert "-groups" in captured["cmd"] |
| 1255 | + groups_idx = captured["cmd"].index("-groups") |
| 1256 | + groups_val = captured["cmd"][groups_idx + 1] |
| 1257 | + assert all(g in groups_val for g in PROBE_GROUPS) |
| 1258 | + |
| 1259 | + def test_ipv6_bracket_notation(self, monkeypatch): |
| 1260 | + captured = {} |
| 1261 | + |
| 1262 | + def fake_run(cmd, **kw): |
| 1263 | + captured["cmd"] = cmd |
| 1264 | + return _make_raw_proc() |
| 1265 | + |
| 1266 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1267 | + probe_raw("::1", 25) |
| 1268 | + connect_idx = captured["cmd"].index("-connect") |
| 1269 | + connect_val = captured["cmd"][connect_idx + 1] |
| 1270 | + assert connect_val.startswith("[::1]:") |
| 1271 | + |
| 1272 | + def test_input_is_quit_crlf(self, monkeypatch): |
| 1273 | + captured = {} |
| 1274 | + |
| 1275 | + def fake_run(cmd, **kw): |
| 1276 | + captured["kwargs"] = kw |
| 1277 | + return _make_raw_proc() |
| 1278 | + |
| 1279 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1280 | + probe_raw("example.com", 25) |
| 1281 | + assert captured["kwargs"].get("input") == b"QUIT\r\n" |
| 1282 | + |
| 1283 | + def test_no_starttls_flag_when_none(self, monkeypatch): |
| 1284 | + captured = {} |
| 1285 | + |
| 1286 | + def fake_run(cmd, **kw): |
| 1287 | + captured["cmd"] = cmd |
| 1288 | + return _make_raw_proc() |
| 1289 | + |
| 1290 | + monkeypatch.setattr("quantumvalidator.tls_utils.subprocess.run", fake_run) |
| 1291 | + probe_raw("example.com", 443) |
| 1292 | + assert "-starttls" not in captured["cmd"] |
| 1293 | + |
| 1294 | + def test_non_utf8_output_decoded_with_replace(self, monkeypatch): |
| 1295 | + raw_bytes = b"ok \xff\xfe output" |
| 1296 | + monkeypatch.setattr( |
| 1297 | + "quantumvalidator.tls_utils.subprocess.run", |
| 1298 | + lambda *a, **kw: _make_raw_proc(stdout=raw_bytes), |
| 1299 | + ) |
| 1300 | + result = probe_raw("example.com", 443) |
| 1301 | + assert result is not None |
| 1302 | + assert "ok" in result |
| 1303 | + |
0 commit comments