Skip to content

Commit 77f07d9

Browse files
blasthttp request_batch_stream
1 parent 554feb4 commit 77f07d9

13 files changed

Lines changed: 199 additions & 174 deletions

File tree

bbot/core/helpers/web/web.py

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,23 @@
2020
log = logging.getLogger("bbot.core.helpers.web")
2121

2222

23+
async def iter_batch_results(stream):
24+
"""
25+
Yield individual ``BatchResult`` objects from a ``request_batch_stream`` iterator.
26+
27+
The native blasthttp 0.4.0 iterator yields lists of ``BatchResult`` (drained in
28+
chunks of 1000 / 200ms to amortize the Python↔Rust boundary). A future Python
29+
wrapper is expected to unwrap these into individual items. This helper handles
30+
both shapes so callers can write a single ``async for`` loop.
31+
"""
32+
async for item in stream:
33+
if isinstance(item, list):
34+
for r in item:
35+
yield r
36+
else:
37+
yield item
38+
39+
2340
class WebHelper:
2441
"""
2542
Main utility class for managing HTTP operations in BBOT. Uses blasthttp (Rust) as the
@@ -297,23 +314,26 @@ async def request(self, *args, **kwargs):
297314
log.trace(traceback.format_exc())
298315
raise
299316

300-
async def request_batch(self, urls, threads=10, **kwargs):
317+
async def request_batch_stream(self, urls, threads=10, **kwargs):
301318
"""
302-
Request multiple URLs in parallel via blasthttp's native Rust batch engine.
319+
Request multiple URLs in parallel via blasthttp's native Rust batch engine,
320+
yielding each response as soon as it completes (completion order, not input
321+
order).
303322
304323
Applies the same header/cookie/proxy/timeout logic as ``request()`` — each
305-
entry is translated into a ``blasthttp.BatchConfig`` and sent to Rust in one
306-
shot. Results are returned as a list (not streamed).
324+
entry is translated into a ``blasthttp.BatchConfig`` and dispatched through
325+
``blasthttp.request_batch_stream``. A slow request no longer blocks faster
326+
peers behind it, and Python work overlaps with in-flight HTTP I/O.
307327
308328
Each entry in ``urls`` can be:
309329
- A plain URL string (uses shared ``**kwargs`` for all requests)
310330
- A ``(url, per_request_kwargs)`` tuple for per-request options
311331
- A ``(url, per_request_kwargs, tracker)`` tuple to attach arbitrary
312-
tracking data that is returned alongside the response
332+
tracking data that is yielded alongside the response
313333
314-
Returns:
315-
When entries are plain strings: ``list[(url, response)]``
316-
When any entry includes a tracker: ``list[(url, response, tracker)]``
334+
Yields:
335+
When entries are plain strings: ``(url, response)``
336+
When any entry includes a tracker: ``(url, response, tracker)``
317337
318338
Args:
319339
urls: URLs to visit — strings or ``(url, kwargs[, tracker])`` tuples.
@@ -324,15 +344,13 @@ async def request_batch(self, urls, threads=10, **kwargs):
324344
Examples:
325345
Simple (shared kwargs)::
326346
327-
results = await self.helpers.request_batch(urls, headers={"X-Test": "Test"})
328-
for url, response in results:
347+
async for url, response in self.helpers.request_batch_stream(urls, headers={"X-Test": "Test"}):
329348
...
330349
331350
Per-request kwargs with tracker::
332351
333352
reqs = [("http://example.com", {"method": "POST"}, "my-tracker")]
334-
results = await self.helpers.request_batch(reqs)
335-
for url, response, tracker in results:
353+
async for url, response, tracker in self.helpers.request_batch_stream(reqs):
336354
...
337355
"""
338356
import blasthttp
@@ -354,33 +372,33 @@ async def request_batch(self, urls, threads=10, **kwargs):
354372
entries.append((str(entry), kwargs, None))
355373

356374
if not entries:
357-
return []
375+
return
376+
377+
# Build BatchConfig objects using the same logic as request().
378+
# Map each config URL back to a queue of trackers so we can correlate
379+
# completion-order results to original entries even when multiple entries
380+
# share a URL.
381+
from collections import deque
358382

359-
# Build BatchConfig objects using the same logic as request()
360383
configs = []
361-
trackers = []
384+
trackers_by_url = {}
362385
for url, req_kwargs, tracker in entries:
363386
url, method, blast_kwargs = self._build_blasthttp_kwargs(url, **req_kwargs)
364387
config = blasthttp.BatchConfig(url, **blast_kwargs)
365388
configs.append(config)
366-
trackers.append(tracker)
367-
368-
# Send to Rust — all I/O happens here
369-
batch_results = await self.client.request_batch(configs, concurrency=threads)
389+
trackers_by_url.setdefault(config.url, deque()).append(tracker)
370390

371-
# Convert to (url, response[, tracker]) tuples
372-
# Results are returned in the same order as configs
373-
results = []
374-
for i, br in enumerate(batch_results):
391+
async for br in iter_batch_results(self.client.request_batch_stream(configs, concurrency=threads)):
375392
if br.response is not None:
376393
response = BlasthttpResponse(br.response, request_url=br.url, method="GET")
377394
else:
378395
response = None
379396
if has_tracker:
380-
results.append((br.url, response, trackers[i]))
397+
queue = trackers_by_url.get(br.url)
398+
tracker = queue.popleft() if queue else None
399+
yield br.url, response, tracker
381400
else:
382-
results.append((br.url, response))
383-
return results
401+
yield br.url, response
384402

385403
async def download(self, url, **kwargs):
386404
"""

bbot/modules/git.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def handle_event(self, event):
2424
self.helpers.urljoin(base_url, ".git/config"),
2525
self.helpers.urljoin(f"{base_url}/", ".git/config"),
2626
}
27-
for url, response in await self.helpers.request_batch(urls):
27+
async for url, response in self.helpers.request_batch_stream(urls):
2828
text = getattr(response, "text", "")
2929
if not text:
3030
text = ""

bbot/modules/http.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import blasthttp
66

7+
from bbot.core.helpers.web.web import iter_batch_results
78
from bbot.modules.base import BaseModule
89

910

@@ -212,8 +213,12 @@ async def handle_batch(self, *events):
212213
)
213214
configs.append(config)
214215

215-
# blasthttp batch returns a native coroutine via pyo3-async-runtimes
216-
results = await self.client.request_batch(configs, self.threads)
216+
# Drain the streaming batch into a list — we need every result in hand
217+
# before we can decide http/https suppression for paired OPEN_TCP_PORT probes.
218+
# Python conversion still overlaps with in-flight HTTP I/O via the stream.
219+
results = []
220+
async for r in iter_batch_results(self.client.request_batch_stream(configs, concurrency=self.threads)):
221+
results.append(r)
217222

218223
# For OPEN_TCP_PORT probes, suppress redundant https when http already succeeded.
219224
# When probing an unknown port, we try both http:// and https://. If http works,

bbot/modules/iis_shortnames.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ async def solve_valid_chars(self, method, target, affirmative_status_code):
142142
url = f"{target}{payload}{suffix}"
143143
urls_and_kwargs.append((url, kwargs, (c, file_part)))
144144

145-
for url, response, (c, file_part) in await self.helpers.request_batch(urls_and_kwargs):
145+
async for url, response, (c, file_part) in self.helpers.request_batch_stream(urls_and_kwargs):
146146
if response is not None:
147147
if response.status_code == affirmative_status_code:
148148
if file_part == "stem":
@@ -183,7 +183,7 @@ async def solve_shortname_recursive(
183183
kwargs = {"method": method}
184184
urls_and_kwargs.append((url, kwargs, c))
185185

186-
for url, response, c in await self.helpers.request_batch(urls_and_kwargs):
186+
async for url, response, c in self.helpers.request_batch_stream(urls_and_kwargs):
187187
if response is not None:
188188
if response.status_code == affirmative_status_code:
189189
found_results = True

bbot/modules/ntlm.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,9 @@ async def handle_event(self, event):
9696
urls.add(f"{event.parsed_url.scheme}://{event.parsed_url.netloc}/{endpoint}")
9797

9898
num_urls = len(urls)
99-
results = await self.helpers.request_batch(
99+
async for url, response in self.helpers.request_batch_stream(
100100
urls, headers=NTLM_test_header, allow_redirects=False, timeout=self.http_timeout
101-
)
102-
for url, response in results:
101+
):
103102
ntlm_resp = response.headers.get("WWW-Authenticate", "")
104103
if not ntlm_resp:
105104
continue

bbot/modules/pgp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async def query(self, query):
3737
results = set()
3838
urls = self.config.get("search_urls", [])
3939
urls = [url.replace("<query>", self.helpers.quote(query)) for url in urls]
40-
for url, response in await self.helpers.request_batch(urls):
40+
async for url, response in self.helpers.request_batch_stream(urls):
4141
keyserver = self.helpers.urlparse(url).netloc
4242
if response is not None:
4343
for email in await self.helpers.re.extract_emails(response.text):

bbot/modules/telerik.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,8 @@ async def handle_event(self, event):
299299
url = self.create_url(base_url, f"{dh}?dp=1")
300300
urls[url] = dh
301301

302-
results = await self.helpers.request_batch(list(urls))
303302
fail_count = 0
304-
for url, response in results:
303+
async for url, response in self.helpers.request_batch_stream(list(urls)):
305304
# cancel if we run into timeouts etc.
306305
if response is None:
307306
fail_count += 1

bbot/modules/templates/bucket.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ async def brute_buckets(self, buckets, permutations=False, omit_base=False):
131131
for bucket_name in new_buckets:
132132
url, kwargs = self.build_bucket_request(bucket_name, base_domain, region)
133133
bucket_urls_kwargs.append((url, kwargs, (bucket_name, base_domain, region)))
134-
for url, response, (bucket_name, base_domain, region) in await self.helpers.request_batch(bucket_urls_kwargs):
134+
async for url, response, (bucket_name, base_domain, region) in self.helpers.request_batch_stream(
135+
bucket_urls_kwargs
136+
):
135137
existent_bucket, tags = self._check_bucket_exists(bucket_name, response)
136138
if existent_bucket:
137139
yield bucket_name, url, tags, num_buckets

bbot/modules/web_brute.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import blasthttp
55

6+
from bbot.core.helpers.web.web import iter_batch_results
67
from bbot.modules.base import BaseModule
78

89

@@ -182,8 +183,9 @@ async def baseline_fuzz(self, url, exts=None, prefix="", suffix=""):
182183

183184
canary_results = []
184185
canary_waf_count = 0
185-
results = await self.blast_client.request_batch(canary_configs, 4, rate_limit=self.rate)
186-
for result in results:
186+
async for result in iter_batch_results(
187+
self.blast_client.request_batch_stream(canary_configs, 4, rate_limit=self.rate)
188+
):
187189
if result.success:
188190
canary_results.append(self._batch_response_metrics(result.response))
189191
if await self.helpers.yara.match(self.waf_yara_rules, result.response.body):
@@ -313,12 +315,13 @@ async def execute_fuzz(
313315

314316
self.debug(f"Fuzzing {len(configs)} URLs for ext [{ext}]")
315317

316-
# Fire all requests via native blasthttp batch (Rust concurrency)
317-
results = await self.blast_client.request_batch(configs, self.concurrency, rate_limit=self.rate)
318-
319-
# Index results by URL for ordered processing
318+
# Fire all requests via native blasthttp batch (Rust concurrency).
319+
# Stream results into a URL-keyed dict so we can re-process them in
320+
# wordlist order (canary appended last) below.
320321
results_by_url = {}
321-
for result in results:
322+
async for result in iter_batch_results(
323+
self.blast_client.request_batch_stream(configs, self.concurrency, rate_limit=self.rate)
324+
):
322325
results_by_url[result.url] = result
323326

324327
# Process in wordlist order so canary (appended last) is checked last
@@ -389,9 +392,13 @@ async def execute_fuzz(
389392
proxy=proxy,
390393
)
391394
]
392-
canary_batch = await self.blast_client.request_batch(canary_configs, 1, rate_limit=self.rate)
393-
if canary_batch and canary_batch[0].success:
394-
canary_metrics = self._batch_response_metrics(canary_batch[0].response)
395+
canary_result = None
396+
async for r in iter_batch_results(
397+
self.blast_client.request_batch_stream(canary_configs, 1, rate_limit=self.rate)
398+
):
399+
canary_result = r
400+
if canary_result is not None and canary_result.success:
401+
canary_metrics = self._batch_response_metrics(canary_result.response)
395402
if not self._is_baseline_match(canary_metrics, ext_filter):
396403
self.verbose(
397404
f"Would have reported {len(hits)} hit(s), but mid-scan baseline check failed. "

bbot/test/conftest.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,11 @@ async def patched_request(self, *args, **kwargs):
132132
return result
133133
return await original_request(self, *args, **kwargs)
134134

135-
original_request_batch = WebHelper.request_batch
135+
original_request_batch_stream = WebHelper.request_batch_stream
136136

137-
async def patched_request_batch(self, urls, threads=10, **kwargs):
137+
async def patched_request_batch_stream(self, urls, threads=10, **kwargs):
138138
import blasthttp
139+
from collections import deque
139140

140141
# Run the real entry-parsing and config-building logic unmodified
141142
entries = []
@@ -154,38 +155,35 @@ async def patched_request_batch(self, urls, threads=10, **kwargs):
154155
entries.append((str(entry), kwargs, None))
155156

156157
if not entries:
157-
return []
158+
return
158159

159160
configs = []
160-
trackers = []
161+
trackers_by_url = {}
161162
for url, req_kwargs, tracker in entries:
162163
url, method, blast_kwargs = self._build_blasthttp_kwargs(url, **req_kwargs)
163164
config = blasthttp.BatchConfig(url, **blast_kwargs)
164165
configs.append(config)
165-
trackers.append(tracker)
166-
167-
# Route through mock's batch handler instead of Rust client directly
168-
batch_results = await mock.handle_batch(self.client, configs, concurrency=threads)
166+
trackers_by_url.setdefault(config.url, deque()).append(tracker)
169167

170168
from bbot.core.helpers.web.blast_response import BlasthttpResponse
171169

172-
results = []
173-
for i, br in enumerate(batch_results):
170+
async for br in mock.handle_batch_stream(self.client, configs, concurrency=threads):
174171
if br.response is not None:
175172
response = BlasthttpResponse(br.response, request_url=br.url, method="GET")
176173
else:
177174
response = None
178175
if has_tracker:
179-
results.append((br.url, response, trackers[i]))
176+
queue = trackers_by_url.get(br.url)
177+
tracker = queue.popleft() if queue else None
178+
yield br.url, response, tracker
180179
else:
181-
results.append((br.url, response))
182-
return results
180+
yield br.url, response
183181

184182
WebHelper.request = patched_request
185-
WebHelper.request_batch = patched_request_batch
183+
WebHelper.request_batch_stream = patched_request_batch_stream
186184
yield mock
187185
WebHelper.request = original_request
188-
WebHelper.request_batch = original_request_batch
186+
WebHelper.request_batch_stream = original_request_batch_stream
189187

190188

191189
@pytest.fixture

0 commit comments

Comments
 (0)