Skip to content

Commit edf7c7d

Browse files
authored
Merge pull request #3210 from blacklanternsecurity/e2e-tests
Add E2E tests, bump cloudcheck to 11.x
2 parents 2802c97 + cf92ce9 commit edf7c7d

2 files changed

Lines changed: 217 additions & 1 deletion

File tree

bbot/test/test_step_1/test_e2e.py

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
"""
2+
End-to-end tests that install bbot into a fresh virtualenv and run real CLI scans.
3+
4+
Tests:
5+
1. Install from local source into a temp venv
6+
2. DNS-only scan against a real public target
7+
3. Web scan against a local HTTP server (exercises http + excavate)
8+
"""
9+
10+
import json
11+
import subprocess
12+
import sys
13+
import os
14+
import socket
15+
import textwrap
16+
import time
17+
from pathlib import Path
18+
19+
import pytest
20+
21+
22+
def _find_repo_root():
23+
"""Find the bbot repo root from the source tree, without requiring git."""
24+
# Walk up from this test file to find pyproject.toml
25+
d = Path(__file__).resolve().parent
26+
for _ in range(10):
27+
if (d / "pyproject.toml").is_file():
28+
return str(d)
29+
d = d.parent
30+
raise FileNotFoundError("could not locate repo root (no pyproject.toml found)")
31+
32+
33+
@pytest.fixture(scope="module")
34+
def bbot_venv(tmp_path_factory):
35+
"""Create a fresh virtualenv and pip-install bbot from the local checkout."""
36+
venv_dir = tmp_path_factory.mktemp("bbot_e2e_venv")
37+
repo_root = _find_repo_root()
38+
39+
subprocess.check_call([sys.executable, "-m", "venv", str(venv_dir)])
40+
pip = str(venv_dir / "bin" / "pip")
41+
bbot = str(venv_dir / "bin" / "bbot")
42+
43+
subprocess.check_call([pip, "install", "-e", repo_root, "--quiet"], timeout=300)
44+
assert os.path.isfile(bbot), f"bbot CLI not found at {bbot}"
45+
return bbot
46+
47+
48+
def _free_port():
49+
"""Find a free TCP port."""
50+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
51+
s.bind(("127.0.0.1", 0))
52+
return s.getsockname()[1]
53+
54+
55+
@pytest.fixture()
56+
def local_webserver(tmp_path):
57+
"""Spawn a local HTTP server with a page that excavate can extract from."""
58+
port = _free_port()
59+
webroot = tmp_path / "webroot"
60+
webroot.mkdir()
61+
62+
html = textwrap.dedent("""\
63+
<html>
64+
<head><title>E2E Test Page</title></head>
65+
<body>
66+
<a href="/secondpage.html">Second Page</a>
67+
</body>
68+
</html>
69+
""")
70+
(webroot / "index.html").write_text(html)
71+
(webroot / "secondpage.html").write_text("<html><body>second page</body></html>")
72+
73+
server = subprocess.Popen(
74+
[sys.executable, "-m", "http.server", str(port), "--directory", str(webroot), "--bind", "127.0.0.1"],
75+
stdout=subprocess.DEVNULL,
76+
stderr=subprocess.DEVNULL,
77+
)
78+
79+
# wait for server to be ready
80+
for _ in range(20):
81+
try:
82+
with socket.create_connection(("127.0.0.1", port), timeout=0.5):
83+
break
84+
except OSError:
85+
time.sleep(0.25)
86+
else:
87+
server.kill()
88+
pytest.fail(f"Local web server failed to start on port {port}")
89+
90+
yield f"http://127.0.0.1:{port}"
91+
92+
server.terminate()
93+
try:
94+
server.wait(timeout=5)
95+
except subprocess.TimeoutExpired:
96+
server.kill()
97+
98+
99+
def run_bbot(bbot_bin, *args, timeout=180):
100+
"""Run the bbot CLI as a real subprocess."""
101+
return subprocess.run(
102+
[bbot_bin] + list(args),
103+
capture_output=True,
104+
text=True,
105+
timeout=timeout,
106+
)
107+
108+
109+
class TestE2E:
110+
def test_install_and_help(self, bbot_venv):
111+
"""bbot installs from local source and -h works."""
112+
r = run_bbot(bbot_venv, "-h")
113+
assert r.returncode == 0, f"bbot -h failed:\n{r.stderr[-2000:]}"
114+
assert "usage" in (r.stdout + r.stderr).lower()
115+
116+
def test_scan_dns(self, bbot_venv, tmp_path):
117+
"""DNS-only scan against one.one.one.one resolves and produces expected events."""
118+
scan_name = "e2e_dns"
119+
r = run_bbot(
120+
bbot_venv,
121+
"-y",
122+
"-t",
123+
"one.one.one.one",
124+
"-n",
125+
scan_name,
126+
"-c",
127+
"dns.minimal=true",
128+
f"home={tmp_path}",
129+
"--json",
130+
)
131+
assert r.returncode == 0, f"bbot exited {r.returncode}:\n{r.stderr[-2000:]}"
132+
133+
events = [json.loads(line) for line in r.stdout.splitlines() if line.strip()]
134+
types = {e["type"] for e in events}
135+
assert "DNS_NAME" in types, f"no DNS_NAME events, got types: {types}"
136+
137+
dns_events = [e for e in events if e["type"] == "DNS_NAME" and e["data"] == "one.one.one.one"]
138+
assert dns_events, "no DNS_NAME event for one.one.one.one"
139+
dns_event = dns_events[0]
140+
assert "in-scope" in dns_event["tags"]
141+
resolved = set(dns_event.get("resolved_hosts", []))
142+
assert resolved & {"1.1.1.1", "1.0.0.1"}, f"expected Cloudflare IPs in resolved_hosts, got {resolved}"
143+
144+
# verify output files
145+
scan_home = tmp_path / "scans" / scan_name
146+
for f in ("output.json", "output.txt", "output.csv", "preset.yml", "scan.log"):
147+
assert (scan_home / f).is_file(), f"{f} not found in scan output"
148+
149+
def test_scan_web(self, bbot_venv, tmp_path, local_webserver):
150+
"""Web scan against a local server exercises http + excavate."""
151+
scan_name = "e2e_web"
152+
r = run_bbot(
153+
bbot_venv,
154+
"-y",
155+
"-t",
156+
local_webserver,
157+
"-n",
158+
scan_name,
159+
"-p",
160+
"spider",
161+
"-c",
162+
f"home={tmp_path}",
163+
"--json",
164+
)
165+
assert r.returncode == 0, f"bbot exited {r.returncode}:\n{r.stderr[-2000:]}"
166+
167+
events = [json.loads(line) for line in r.stdout.splitlines() if line.strip()]
168+
types = {e["type"] for e in events}
169+
170+
assert "URL" in types, f"no URL events, got types: {types}"
171+
172+
url_events = [e for e in events if e["type"] == "URL"]
173+
urls = [e.get("data_json", {}).get("url", "") for e in url_events]
174+
assert any("secondpage.html" in u for u in urls), f"excavate+spider didn't find secondpage.html in {urls}"
175+
176+
def test_clean_shutdown(self, bbot_venv, tmp_path):
177+
"""Scan completes cleanly: no errors, no orphaned processes."""
178+
import psutil
179+
180+
r = run_bbot(
181+
bbot_venv,
182+
"-y",
183+
"-t",
184+
"one.one.one.one",
185+
"-n",
186+
"e2e_shutdown",
187+
"-c",
188+
"dns.minimal=true",
189+
f"home={tmp_path}",
190+
)
191+
assert r.returncode == 0, f"bbot exited {r.returncode}:\n{r.stderr[-2000:]}"
192+
193+
# check output.json for clean completion
194+
output_json = tmp_path / "scans" / "e2e_shutdown" / "output.json"
195+
assert output_json.is_file(), "output.json not found"
196+
events = [json.loads(line) for line in output_json.read_text().splitlines() if line.strip()]
197+
scan_events = [e for e in events if e.get("type") == "SCAN"]
198+
statuses = [e.get("data_json", {}).get("status") for e in scan_events]
199+
assert "FINISHED" in statuses, f"scan didn't reach FINISHED status, got: {statuses}"
200+
201+
# no critical/error messages in stderr
202+
for line in r.stderr.splitlines():
203+
assert "[CRIT]" not in line, f"critical error during scan: {line.strip()}"
204+
205+
# no orphaned bbot processes
206+
current = psutil.Process()
207+
children = current.children(recursive=True)
208+
bbot_orphans = []
209+
for p in children:
210+
try:
211+
name = p.name()
212+
except (psutil.NoSuchProcess, psutil.AccessDenied):
213+
continue
214+
if name == "bbot":
215+
bbot_orphans.append(p)
216+
assert not bbot_orphans, f"orphaned bbot processes after scan: {[(p.pid, p.cmdline()) for p in bbot_orphans]}"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)