Skip to content

Commit d45f5fa

Browse files
authored
Merge pull request #3080 from blacklanternsecurity/blasthttp-streaming
Blasthttp request_batch_stream
2 parents 554feb4 + 5396147 commit d45f5fa

13 files changed

Lines changed: 313 additions & 284 deletions

File tree

bbot/core/helpers/web/web.py

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,23 @@
2020
log = logging.getLogger("bbot.core.helpers.web")
2121

2222

23+
async def iter_batch_results(stream):
    """
    Flatten a ``request_batch_stream`` iterator into individual ``BatchResult`` objects.

    The native blasthttp 0.4.0 iterator yields *lists* of ``BatchResult`` (results are
    drained in chunks of 1000 / 200ms to amortize the Python↔Rust boundary). A future
    Python wrapper is expected to unwrap these into individual items. This helper
    accepts both shapes, so callers can use a single ``async for`` loop either way.
    """
    async for chunk in stream:
        # Normalize: a bare item becomes a one-element sequence, a list passes through.
        batch = chunk if isinstance(chunk, list) else (chunk,)
        for result in batch:
            yield result
39+
2340
class WebHelper:
2441
"""
2542
Main utility class for managing HTTP operations in BBOT. Uses blasthttp (Rust) as the
@@ -297,23 +314,26 @@ async def request(self, *args, **kwargs):
297314
log.trace(traceback.format_exc())
298315
raise
299316

300-
async def request_batch(self, urls, threads=10, **kwargs):
317+
async def request_batch_stream(self, urls, threads=10, **kwargs):
301318
"""
302-
Request multiple URLs in parallel via blasthttp's native Rust batch engine.
319+
Request multiple URLs in parallel via blasthttp's native Rust batch engine,
320+
yielding each response as soon as it completes (completion order, not input
321+
order).
303322
304323
Applies the same header/cookie/proxy/timeout logic as ``request()`` — each
305-
entry is translated into a ``blasthttp.BatchConfig`` and sent to Rust in one
306-
shot. Results are returned as a list (not streamed).
324+
entry is translated into a ``blasthttp.BatchConfig`` and dispatched through
325+
``blasthttp.request_batch_stream``. A slow request no longer blocks faster
326+
peers behind it, and Python work overlaps with in-flight HTTP I/O.
307327
308328
Each entry in ``urls`` can be:
309329
- A plain URL string (uses shared ``**kwargs`` for all requests)
310330
- A ``(url, per_request_kwargs)`` tuple for per-request options
311331
- A ``(url, per_request_kwargs, tracker)`` tuple to attach arbitrary
312-
tracking data that is returned alongside the response
332+
tracking data that is yielded alongside the response
313333
314-
Returns:
315-
When entries are plain strings: ``list[(url, response)]``
316-
When any entry includes a tracker: ``list[(url, response, tracker)]``
334+
Yields:
335+
When entries are plain strings: ``(url, response)``
336+
When any entry includes a tracker: ``(url, response, tracker)``
317337
318338
Args:
319339
urls: URLs to visit — strings or ``(url, kwargs[, tracker])`` tuples.
@@ -324,15 +344,13 @@ async def request_batch(self, urls, threads=10, **kwargs):
324344
Examples:
325345
Simple (shared kwargs)::
326346
327-
results = await self.helpers.request_batch(urls, headers={"X-Test": "Test"})
328-
for url, response in results:
347+
async for url, response in self.helpers.request_batch_stream(urls, headers={"X-Test": "Test"}):
329348
...
330349
331350
Per-request kwargs with tracker::
332351
333352
reqs = [("http://example.com", {"method": "POST"}, "my-tracker")]
334-
results = await self.helpers.request_batch(reqs)
335-
for url, response, tracker in results:
353+
async for url, response, tracker in self.helpers.request_batch_stream(reqs):
336354
...
337355
"""
338356
import blasthttp
@@ -354,33 +372,33 @@ async def request_batch(self, urls, threads=10, **kwargs):
354372
entries.append((str(entry), kwargs, None))
355373

356374
if not entries:
357-
return []
375+
return
376+
377+
# Build BatchConfig objects using the same logic as request().
378+
# Map each config URL back to a queue of trackers so we can correlate
379+
# completion-order results to original entries even when multiple entries
380+
# share a URL.
381+
from collections import deque
358382

359-
# Build BatchConfig objects using the same logic as request()
360383
configs = []
361-
trackers = []
384+
trackers_by_url = {}
362385
for url, req_kwargs, tracker in entries:
363386
url, method, blast_kwargs = self._build_blasthttp_kwargs(url, **req_kwargs)
364387
config = blasthttp.BatchConfig(url, **blast_kwargs)
365388
configs.append(config)
366-
trackers.append(tracker)
367-
368-
# Send to Rust — all I/O happens here
369-
batch_results = await self.client.request_batch(configs, concurrency=threads)
389+
trackers_by_url.setdefault(config.url, deque()).append(tracker)
370390

371-
# Convert to (url, response[, tracker]) tuples
372-
# Results are returned in the same order as configs
373-
results = []
374-
for i, br in enumerate(batch_results):
391+
async for br in iter_batch_results(self.client.request_batch_stream(configs, concurrency=threads)):
375392
if br.response is not None:
376393
response = BlasthttpResponse(br.response, request_url=br.url, method="GET")
377394
else:
378395
response = None
379396
if has_tracker:
380-
results.append((br.url, response, trackers[i]))
397+
queue = trackers_by_url.get(br.url)
398+
tracker = queue.popleft() if queue else None
399+
yield br.url, response, tracker
381400
else:
382-
results.append((br.url, response))
383-
return results
401+
yield br.url, response
384402

385403
async def download(self, url, **kwargs):
386404
"""

bbot/modules/git.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def handle_event(self, event):
2424
self.helpers.urljoin(base_url, ".git/config"),
2525
self.helpers.urljoin(f"{base_url}/", ".git/config"),
2626
}
27-
for url, response in await self.helpers.request_batch(urls):
27+
async for url, response in self.helpers.request_batch_stream(urls):
2828
text = getattr(response, "text", "")
2929
if not text:
3030
text = ""

bbot/modules/http.py

Lines changed: 118 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import blasthttp
66

7+
from bbot.core.helpers.web.web import iter_batch_results
78
from bbot.modules.base import BaseModule
89

910

@@ -174,10 +175,75 @@ def _response_to_json(self, url_input, response):
174175

175176
return j
176177

178+
async def _process_result(self, result, parent_event):
179+
"""Emit URL + HTTP_RESPONSE events for one batch result. Returns True if status was usable."""
180+
if not result.success:
181+
self.debug(f"blasthttp error for {result.url}: {result.error}")
182+
return False
183+
184+
response = result.response
185+
status_code = response.status
186+
if status_code == 0:
187+
self.debug(f'No HTTP status code for "{result.url}"')
188+
return False
189+
190+
url = response.url
191+
192+
# The "input" field represents the original scan target (host:port),
193+
# not the full URL. Other modules and output consumers use this to
194+
# correlate responses back to the target that produced them.
195+
input_parsed = urlparse(result.url)
196+
url_input = input_parsed.netloc or result.url
197+
j = self._response_to_json(url_input, response)
198+
199+
# discard 404s from unverified URLs
200+
path = j.get("path", "/")
201+
if parent_event.type == "URL_UNVERIFIED" and status_code in (404,) and path != "/":
202+
self.debug(f'Discarding 404 from "{url}"')
203+
return True
204+
205+
tags = [f"status-{status_code}"]
206+
url_context = "{module} visited {event.parent.data} and got status code {event.http_status}"
207+
if parent_event.type == "OPEN_TCP_PORT":
208+
url_context += " at {event.data}"
209+
210+
url_event = self.make_event(url, "URL", parent_event, tags=tags, context=url_context)
211+
if url_event:
212+
response_ip = j.get("host", "")
213+
if response_ip:
214+
url_event._resolved_hosts.add(response_ip)
215+
title = j.get("title", "")
216+
if title:
217+
url_event.http_title = title
218+
location = j.get("location", "")
219+
if location:
220+
url_event.redirect_location = location
221+
if url_event != parent_event:
222+
await self.emit_event(url_event)
223+
content_type = j.get("header", {}).get("content_type", "unspecified").split(";")[0]
224+
content_length = self.helpers.bytes_to_human(j.get("content_length", 0))
225+
await self.emit_event(
226+
j,
227+
"HTTP_RESPONSE",
228+
url_event,
229+
tags=url_event.tags,
230+
context=f"HTTP_RESPONSE was {content_length} with {content_type} content type",
231+
)
232+
233+
if self.store_responses:
234+
response_dir = self.scan.home / "http_responses"
235+
self.helpers.mkdir(response_dir)
236+
filename = f"{j['host']}.{urlparse(url).port or 443}{path.replace('/', '[slash]')}.txt"
237+
response_file = response_dir / filename
238+
response_file.write_text(j.get("raw_header", "") + j.get("body", ""))
239+
return True
240+
177241
async def handle_batch(self, *events):
178242
stdin = {}
179243
# Track dual-scheme probes from OPEN_TCP_PORT: {(host, port): {"http": url, "https": url}}
180244
port_probes = {}
245+
# Reverse index: each paired probe URL → its (host, port) key
246+
paired_probe_urls = {}
181247

182248
for event in events:
183249
urls, url_hash = self.make_url_metadata(event)
@@ -190,6 +256,13 @@ async def handle_batch(self, *events):
190256
scheme = "https" if url.startswith("https://") else "http"
191257
port_probes[key][scheme] = url
192258

259+
# Only ports with BOTH schemes are subject to suppression — single-scheme
260+
# OPEN_TCP_PORT probes (rare, but possible) stream through normally.
261+
for key, schemes in port_probes.items():
262+
if "http" in schemes and "https" in schemes:
263+
paired_probe_urls[schemes["http"]] = key
264+
paired_probe_urls[schemes["https"]] = key
265+
193266
if not stdin:
194267
return
195268

@@ -198,7 +271,6 @@ async def handle_batch(self, *events):
198271
timeout = self.scan.blasthttp_timeout
199272
retries = self.scan.blasthttp_retries
200273

201-
# Build batch configs
202274
configs = []
203275
for url in stdin:
204276
config = blasthttp.BatchConfig(
@@ -212,110 +284,51 @@ async def handle_batch(self, *events):
212284
)
213285
configs.append(config)
214286

215-
# blasthttp batch returns a native coroutine via pyo3-async-runtimes
216-
results = await self.client.request_batch(configs, self.threads)
217-
218-
# For OPEN_TCP_PORT probes, suppress redundant https when http already succeeded.
219-
# When probing an unknown port, we try both http:// and https://. If http works,
220-
# the port definitely speaks HTTP — the https result may be a proxy artifact
221-
# (intercepting proxies like Burp terminate TLS themselves, making any https://
222-
# URL "succeed" regardless of whether the target actually speaks TLS).
223-
# If http fails but https succeeds, the port genuinely speaks TLS.
224-
# Explicit URLs (URL_UNVERIFIED/URL) are never suppressed — this only applies
225-
# to speculative OPEN_TCP_PORT probes.
226-
suppressed_urls = set()
227-
if port_probes:
228-
successful_urls = {r.url for r in results if r.success and r.response.status != 0}
229-
for key, schemes in port_probes.items():
230-
http_url = schemes.get("http")
231-
https_url = schemes.get("https")
232-
if not (http_url and https_url):
287+
# Suppress redundant https probes when http already succeeded for the same
288+
# (host, port). When probing an unknown port, we try both schemes; if http
289+
# works, the port definitely speaks HTTP, and the https result is likely a
290+
# proxy artifact (intercepting proxies like Burp terminate TLS themselves,
291+
# making any https:// URL "succeed" regardless of whether the target really
292+
# speaks TLS). Explicit URL/URL_UNVERIFIED events are never suppressed —
293+
# only speculative OPEN_TCP_PORT probes.
294+
#
295+
# Streaming requires per-pair coordination: emit http immediately, defer
296+
# https until http's outcome is known (or the stream ends).
297+
http_succeeded = {} # key -> bool, set when http result arrives
298+
deferred_https = {} # key -> result, awaiting http verdict
299+
300+
async def resolve_https(key, result):
301+
if http_succeeded.get(key) and result.success and result.response.status != 0:
302+
self.debug(f"Suppressing https probe {result.url} (http already succeeded for {key})")
303+
return
304+
await self._process_result(result, stdin[result.url])
305+
306+
async for result in iter_batch_results(self.client.request_batch_stream(configs, concurrency=self.threads)):
307+
key = paired_probe_urls.get(result.url)
308+
if key is None:
309+
# Non-paired URL — emit immediately
310+
parent_event = stdin.get(result.url)
311+
if parent_event is None:
312+
self.warning(f"Unable to correlate parent event for: {result.url}")
233313
continue
234-
if http_url in successful_urls and https_url in successful_urls:
235-
self.debug(f"Suppressing https probe {https_url} (http already succeeded: {http_url})")
236-
suppressed_urls.add(https_url)
237-
238-
for i in range(len(results)):
239-
result = results[i]
240-
results[i] = None # free response body memory as we go
241-
if not result.success:
242-
self.debug(f"blasthttp error for {result.url}: {result.error}")
243-
continue
244-
245-
response = result.response
246-
status_code = response.status
247-
if status_code == 0:
248-
self.debug(f'No HTTP status code for "{result.url}"')
249-
continue
250-
251-
if result.url in suppressed_urls:
314+
await self._process_result(result, parent_event)
252315
continue
253316

254-
# Map back to parent event using the input URL
255-
parent_event = stdin.get(result.url, None)
256-
257-
if parent_event is None:
258-
self.warning(f"Unable to correlate parent event for: {result.url}")
259-
continue
260-
261-
url = response.url
262-
263-
# Build JSON dict for HTTP_RESPONSE event
264-
# The "input" field represents the original scan target (host:port),
265-
# not the full URL. Other modules and output consumers use this to
266-
# correlate responses back to the target that produced them.
267-
input_parsed = urlparse(result.url)
268-
url_input = input_parsed.netloc or result.url
269-
j = self._response_to_json(url_input, response)
270-
271-
# discard 404s from unverified URLs
272-
path = j.get("path", "/")
273-
if parent_event.type == "URL_UNVERIFIED" and status_code in (404,) and path != "/":
274-
self.debug(f'Discarding 404 from "{url}"')
275-
continue
276-
277-
# main URL
278-
tags = [f"status-{status_code}"]
279-
280-
url_context = "{module} visited {event.parent.data} and got status code {event.http_status}"
281-
if parent_event.type == "OPEN_TCP_PORT":
282-
url_context += " at {event.data}"
283-
284-
url_event = self.make_event(
285-
url,
286-
"URL",
287-
parent_event,
288-
tags=tags,
289-
context=url_context,
290-
)
291-
if url_event:
292-
response_ip = j.get("host", "")
293-
if response_ip:
294-
url_event._resolved_hosts.add(response_ip)
295-
title = j.get("title", "")
296-
if title:
297-
url_event.http_title = title
298-
location = j.get("location", "")
299-
if location:
300-
url_event.redirect_location = location
301-
if url_event != parent_event:
302-
await self.emit_event(url_event)
303-
# HTTP response
304-
content_type = j.get("header", {}).get("content_type", "unspecified").split(";")[0]
305-
content_length = j.get("content_length", 0)
306-
content_length = self.helpers.bytes_to_human(content_length)
307-
await self.emit_event(
308-
j,
309-
"HTTP_RESPONSE",
310-
url_event,
311-
tags=url_event.tags,
312-
context=f"HTTP_RESPONSE was {content_length} with {content_type} content type",
313-
)
314-
315-
# Store responses if configured
316-
if self.store_responses:
317-
response_dir = self.scan.home / "http_responses"
318-
self.helpers.mkdir(response_dir)
319-
filename = f"{j['host']}.{urlparse(url).port or 443}{path.replace('/', '[slash]')}.txt"
320-
response_file = response_dir / filename
321-
response_file.write_text(j.get("raw_header", "") + j.get("body", ""))
317+
# Paired OPEN_TCP_PORT probe
318+
is_http = result.url == port_probes[key]["http"]
319+
if is_http:
320+
http_succeeded[key] = result.success and result.response is not None and result.response.status != 0
321+
await self._process_result(result, stdin[result.url])
322+
# If https for this key arrived first and was buffered, resolve it now
323+
pending = deferred_https.pop(key, None)
324+
if pending is not None:
325+
await resolve_https(key, pending)
326+
else: # is https
327+
if key in http_succeeded:
328+
await resolve_https(key, result)
329+
else:
330+
deferred_https[key] = result
331+
332+
# Stream ended — any leftover https had no http result, so emit unconditionally
333+
for key, result in deferred_https.items():
334+
await self._process_result(result, stdin[result.url])

0 commit comments

Comments
 (0)