Commits (34)
39d8a5a  add parameter emmision to wayback (liquidsec, Feb 17, 2026)
27f1409  mods to the wayback parameter extraction (liquidsec, Feb 17, 2026)
aac9674  more features / bug fixes for new wayback (liquidsec, Feb 18, 2026)
251d48d  allow from-wayback tag to propagate (liquidsec, Feb 18, 2026)
ca211fc  update docs for wayback (liquidsec, Feb 18, 2026)
de97820  add waf string 4xx filtering (liquidsec, Feb 18, 2026)
d448bca  add Akamai WAF string to waf_strings helper (liquidsec, Feb 18, 2026)
9b81791  add directory listing excavate submodule (liquidsec, Feb 18, 2026)
1991e28  improve wayback CDX error logging and increase timeout (liquidsec, Feb 18, 2026)
5d0cde7  add rate limiting, retry, and bloom filter dedup to wayback archive f… (liquidsec, Feb 19, 2026)
c0ccda5  add CDX server-side filters and 100k URL limit to wayback module (liquidsec, Feb 19, 2026)
b55d2a1  fixing wayback rate limiting (liquidsec, Feb 19, 2026)
ab154c8  improving wayback delay system (liquidsec, Feb 19, 2026)
454f169  make cpu heavy processing non-blocking (liquidsec, Feb 19, 2026)
ec111a9  make max_records configurable, fix archive retry logic, demote log level (liquidsec, Feb 19, 2026)
630bbb5  fix _event_host() using resolved IP instead of URL hostname (liquidsec, Feb 20, 2026)
b11d3ef  skip URL collapse when there are no URLs to process (liquidsec, Feb 21, 2026)
4ab6588  ruff format (liquidsec, Feb 23, 2026)
786bc09  add timeout and recovery protections to run_in_executor_mp (liquidsec, Feb 23, 2026)
d4e7a58  ruff check fixes (liquidsec, Feb 25, 2026)
036a187  Fix three test failures: dedup, validation, and wildcard defense (liquidsec, Mar 6, 2026)
e72cc93  Speed up wayback archive fetching with HEAD pre-check and reactive ra… (liquidsec, Mar 15, 2026)
cb53d78  Fix pytest hanging after test_run_in_executor_mp (liquidsec, Mar 23, 2026)
86e4ae9  Kill stuck process pool workers on timeout instead of leaking them (liquidsec, Mar 25, 2026)
5e917fe  Fix wayback module for URL DictHostEvent migration, rename wayback-in… (liquidsec, Mar 26, 2026)
9b3b3f4  Replace archive_url inheritance with tag-based parent traversal (liquidsec, Mar 30, 2026)
bce4b86  Fix tests for blasthttp migration: httpx_mock→blasthttp_mock, whiteli… (liquidsec, Apr 3, 2026)
5758f3b  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 3, 2026)
75c7cb9  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 3, 2026)
25d60bc  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 3, 2026)
670766d  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 16, 2026)
a23a312  Skip non-HTTP archived URLs in wayback, truncate sanitization warnings (liquidsec, Apr 16, 2026)
246f74e  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 16, 2026)
53cbb3d  Merge branch 'blasthttp-integration-clean' into wayback-upgrade (liquidsec, Apr 24, 2026)
29 changes: 26 additions & 3 deletions bbot/core/event/base.py
@@ -258,7 +258,8 @@ def __init__(
self.data = self._sanitize_data(data)
except Exception as e:
log.trace(traceback.format_exc())
raise ValidationError(f'Error sanitizing event data "{data}" for type "{self.type}": {e}')
data_preview = str(data)[:200] + "..." if len(str(data)) > 200 else str(data)
raise ValidationError(f'Error sanitizing event data "{data_preview}" for type "{self.type}": {e}')

if not self.data:
raise ValidationError(f'Invalid event data "{data}" for type "{self.type}"')
@@ -626,7 +627,7 @@ def parent(self, parent):
self.web_spider_distance = getattr(parent, "web_spider_distance", 0)
event_has_url = getattr(self, "parsed_url", None) is not None
for t in parent.tags:
if t in ("affiliate",):
if t in ("affiliate", "from-wayback"):
self.add_tag(t)
elif t.startswith("mutation-"):
self.add_tag(t)
@@ -655,6 +656,26 @@ def parent_uuid(self):
return parent_uuid
return self._parent_uuid

@property
def archive_url(self):
"""Traverse the parent chain to find the nearest archive_url.

The 'from-wayback' tag signals that this event descends from archived content.
The actual archive URL is stored only in the data dict of the originating
wayback HTTP_RESPONSE; this property walks upward to find it.
"""
if "from-wayback" not in self.tags:
return None
event = self
while event is not None:
if isinstance(event.data, dict) and "archive_url" in event.data:
return event.data["archive_url"]
parent = getattr(event, "parent", None)
if parent is None or parent is event:
break
event = parent
return None

@property
def validators(self):
"""
@@ -1783,6 +1804,7 @@ class _data_validator(BaseModel):
full_url: Optional[str] = None
path: Optional[str] = None
cves: Optional[list[str]] = None
archive_url: Optional[str] = None
_validate_url = field_validator("url")(validators.validate_url)
_validate_host = field_validator("host")(validators.validate_host)
_validate_severity = field_validator("severity")(validators.validate_severity)
@@ -2185,7 +2207,8 @@ def make_event(
data = validators.validate_host(data)
except Exception as e:
log.trace(traceback.format_exc())
raise ValidationError(f'Error sanitizing event data "{data}" for type "{event_type}": {e}')
data_preview = str(data)[:200] + "..." if len(str(data)) > 200 else str(data)
raise ValidationError(f'Error sanitizing event data "{data_preview}" for type "{event_type}": {e}')
data_is_ip = is_ip(data)
if event_type == "DNS_NAME" and data_is_ip:
event_type = "IP_ADDRESS"
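The new archive_url property replaces direct data inheritance with tag-based parent traversal. As a quick illustration of that pattern, here is a minimal, self-contained sketch; the Event class below is a stand-in written for this example, not BBOT's real Event type.

```python
# Minimal stand-in illustrating the tag-based parent traversal above.
# This Event class is a simplified mock, not BBOT's actual Event.
class Event:
    def __init__(self, data, tags=None, parent=None):
        self.data = data              # dict for HTTP_RESPONSE-style events, str otherwise
        self.tags = set(tags or [])
        self.parent = parent

    @property
    def archive_url(self):
        # Only events descended from archived content carry the tag.
        if "from-wayback" not in self.tags:
            return None
        event = self
        while event is not None:
            if isinstance(event.data, dict) and "archive_url" in event.data:
                return event.data["archive_url"]
            parent = getattr(event, "parent", None)
            if parent is None or parent is event:
                break
            event = parent
        return None


# The originating wayback HTTP_RESPONSE stores the archive URL in its data dict;
# descendants only carry the "from-wayback" tag and walk upward to find it.
response = Event(
    {"url": "https://evilcorp.com/login",
     "archive_url": "https://web.archive.org/web/2019/https://evilcorp.com/login"},
    tags=["from-wayback"],
)
child = Event("https://evilcorp.com/login?next=/", tags=["from-wayback"], parent=response)
assert child.archive_url == response.data["archive_url"]
```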
69 changes: 59 additions & 10 deletions bbot/core/helpers/helper.py
@@ -1,4 +1,6 @@
import os
import sys
import asyncio
import logging
from pathlib import Path
import multiprocessing as mp
@@ -75,15 +77,12 @@ def __init__(self, preset):

self._loop = None

# multiprocessing thread pool
# multiprocessing process pool
start_method = mp.get_start_method()
if start_method != "spawn":
self.warning(f"Multiprocessing spawn method is set to {start_method}.")

# we spawn 1 fewer processes than cores
# this helps to avoid locking up the system or competing with the main python process for cpu time
num_processes = max(1, mp.cpu_count() - 1)
self.process_pool = ProcessPoolExecutor(max_workers=num_processes)
self.process_pool = self._create_process_pool()
self._pool_reset_lock = asyncio.Lock()

self._cloud = None
self._blasthttp_client = None
@@ -214,6 +213,18 @@ def loop(self):
self._loop.set_default_executor(self._io_executor)
return self._loop

@staticmethod
def _create_process_pool():
# we spawn 1 fewer processes than cores
# this helps to avoid locking up the system or competing with the main python process for cpu time
num_processes = max(1, mp.cpu_count() - 1)
pool_kwargs = {"max_workers": num_processes}
# max_tasks_per_child replaces workers after N tasks, preventing memory leaks
# and reducing the chance of a degraded worker process causing hangs
if sys.version_info >= (3, 11):
pool_kwargs["max_tasks_per_child"] = 25
return ProcessPoolExecutor(**pool_kwargs)

def run_in_executor_io(self, callback, *args, **kwargs):
"""
Run a synchronous task in the event loop's default thread pool executor
@@ -237,17 +248,55 @@ def run_in_executor_cpu(self, callback, *args, **kwargs):
callback = partial(callback, **kwargs)
return self.loop.run_in_executor(self._cpu_executor, callback, *args)

def run_in_executor_mp(self, callback, *args, **kwargs):
async def run_in_executor_mp(self, callback, *args, **kwargs):
"""
Same as run_in_executor_io() except with a process pool executor
Use only in cases where callback is CPU-bound
Same as run_in_executor_io() except with a process pool executor.
Use only in cases where callback is CPU-bound.

Includes a timeout (default 300s) to prevent indefinite hangs if a child process dies or the pool enters a broken state.
On timeout, the entire pool is terminated and replaced so that stuck workers cannot accumulate and starve the scan.

Pass ``_timeout=seconds`` to override the default timeout.

Examples:
Execute callback:
>>> result = await self.helpers.run_in_executor_mp(callback_fn, arg1, arg2)
"""
timeout = kwargs.pop("_timeout", 300)
callback = partial(callback, **kwargs)
return self.loop.run_in_executor(self.process_pool, callback, *args)
future = self.loop.run_in_executor(self.process_pool, callback, *args)
Comment on lines +251 to +267

Collaborator:

This timeout needs to be applied one level deeper, because it cancels the awaiting of the coroutine but leaves the stuck task executing in the process pool, thereby taking up a child process indefinitely.

  1. We should be executing network/API requests in the main thread.
  2. We should only be submitting CPU-bound tasks (i.e. bulk URL parsing) to the process pool.

Contributor Author:

It was already only CPU-bound tasks; there were no network requests here.

try:
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError:
log.warning(f"Process pool task timed out after {timeout}s, killing stuck workers and replacing pool")
await self._reset_process_pool()
raise

async def _reset_process_pool(self):
"""Terminate all workers in the current process pool and replace it.

This is the nuclear option — every in-flight task on the old pool will fail with BrokenProcessPool.
We accept that trade-off because a timeout means something is genuinely broken, and leaving the stuck worker alive would permanently consume a pool slot.

# TODO: Python 3.14 adds ProcessPoolExecutor.terminate_workers()
# and kill_workers() (https://github.com/python/cpython/pull/130849).
# Once we drop 3.13 support we can replace the _processes access
# with those official methods.
"""
async with self._pool_reset_lock:
old_pool = self.process_pool
self.process_pool = self._create_process_pool()
# snapshot workers before shutdown (shutdown sets _processes = None)
workers = list((old_pool._processes or {}).values())
# terminate workers before shutdown so stuck ones don't block
for proc in workers:
if proc.is_alive():
proc.terminate()
old_pool.shutdown(wait=False, cancel_futures=True)
# escalate to SIGKILL for anything that ignored SIGTERM
for proc in workers:
if proc.is_alive():
proc.kill()

@property
def in_tests(self):
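To make the review thread's point concrete: cancelling the await (which is what asyncio.wait_for does on timeout) does not stop the function already running inside the worker process, which is why the module replaces the whole pool. Below is a minimal, standalone sketch of that timeout-and-replace pattern under simplified assumptions; it mirrors the approach above but is not BBOT code.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def slow_task():
    # Simulates a stuck CPU-bound task that never returns in time.
    time.sleep(3600)


async def main():
    pool = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(pool, slow_task)
    try:
        await asyncio.wait_for(future, timeout=2)
    except asyncio.TimeoutError:
        # The await is cancelled, but slow_task() keeps running inside the worker
        # process; without intervention it would occupy the pool slot forever.
        workers = list((pool._processes or {}).values())  # snapshot before shutdown
        pool.shutdown(wait=False, cancel_futures=True)
        for proc in workers:
            if proc.is_alive():
                proc.terminate()  # escalate to proc.kill() if SIGTERM is ignored
        pool = ProcessPoolExecutor(max_workers=1)  # fresh pool for subsequent tasks
    pool.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    asyncio.run(main())
```

As in the module's _reset_process_pool(), in-flight tasks on the old pool fail with BrokenProcessPool; that trade-off is accepted because a timeout means the worker is already unusable.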
1 change: 1 addition & 0 deletions bbot/core/helpers/misc.py
@@ -2723,6 +2723,7 @@ def get_waf_strings():
return [
"The requested URL was rejected",
"This content has been blocked",
"You don't have permission to access ",
]


1 change: 1 addition & 0 deletions bbot/defaults.yml
@@ -265,6 +265,7 @@ parameter_blacklist:
- .AspNetCore.Session
- PHPSESSID
- __cf_bm
- _cfuvid
- f5_cspm

parameter_blacklist_prefixes:
8 changes: 8 additions & 0 deletions bbot/modules/http.py
@@ -42,6 +42,7 @@ async def setup(self):
self.max_response_size = self.config.get("max_response_size", 5242880)
self.store_responses = self.config.get("store_responses", False)
self.client = self.helpers.blasthttp
self.waf_yara_rule = self.helpers.yara.compile_strings(self.helpers.get_waf_strings(), nocase=True)
return True

async def filter_event(self, event):
@@ -274,6 +275,13 @@ async def handle_batch(self, *events):
self.debug(f'Discarding 404 from "{url}"')
continue

# discard 4xx responses that contain WAF strings
if 400 <= status_code < 500:
body = j.get("body", "")
if body and await self.helpers.yara.match(self.waf_yara_rule, body):
self.debug(f'Discarding WAF {status_code} from "{url}"')
continue

# main URL
tags = [f"status-{status_code}"]

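The new 4xx filter above matches response bodies against the shared WAF strings via BBOT's YARA helper. As a simplified illustration of the same check, here is a hedged sketch that substitutes plain case-insensitive substring matching for the compiled YARA rule.

```python
# Simplified sketch of the 4xx WAF-response filter above. The module itself
# compiles these strings into a YARA rule via self.helpers.yara; plain
# case-insensitive substring matching is used here only for illustration.
WAF_STRINGS = [
    "The requested URL was rejected",
    "This content has been blocked",
    "You don't have permission to access ",  # Akamai denial page
]


def is_waf_block(status_code: int, body: str) -> bool:
    if not (400 <= status_code < 500) or not body:
        return False
    lowered = body.lower()
    return any(s.lower() in lowered for s in WAF_STRINGS)


# A 403 containing an Akamai denial string is discarded instead of emitted as a URL:
assert is_waf_block(403, "<html>You don't have permission to access /admin on this server.</html>")
assert not is_waf_block(200, "You don't have permission to access ")
```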
89 changes: 78 additions & 11 deletions bbot/modules/internal/excavate.py
@@ -366,6 +366,42 @@ def in_bl(self, value):

return False

def _is_archived(self, event):
"""Check if an event represents archived wayback content."""
return isinstance(event.data, dict) and "archive_url" in event.data

def _event_host(self, event):
"""Get the effective host from an event.

For archived wayback content, data["host"] contains the original target hostname
(since data["url"] points to archive.org). For regular events, we use event.host.

NOTE: Regular HTTP_RESPONSE events also have data["host"], but it contains the
resolved IP from the httpx binary — NOT a hostname override.
"""
if self._is_archived(event) and event.data.get("host"):
return str(event.data["host"])
return str(event.host)

def _event_base_url(self, event):
"""Get the effective base URL from an event.

For archived wayback content, reconstructs the original URL from override fields
(host/scheme/port/path) since parsed_url points to archive.org.
For regular events, returns event.parsed_url directly.
"""
if not self._is_archived(event):
return event.parsed_url
scheme = event.data.get("scheme", event.parsed_url.scheme)
host = self._event_host(event)
port = event.data.get("port")
if port is not None:
port = int(port)
if not ((scheme == "http" and port == 80) or (scheme == "https" and port == 443)):
host = f"{host}:{port}"
path = event.data.get("path", event.parsed_url.path)
return urlparse(f"{scheme}://{host}{path}")

def url_unparse(self, param_type, parsed_url):
# Reconstructs a URL, optionally omitting the query string based on remove_querystring configuration value.
if param_type == "GETPARAM":
@@ -641,8 +677,9 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_conte

# The endpoint is usually a form action - we should use it if we have it. If not, default to URL.
else:
# Use the original URL as the base and resolve the endpoint correctly in case of relative paths
base_url = f"{event.parsed_url.scheme}://{event.parsed_url.netloc}{event.parsed_url.path}"
# Use the effective base URL (which may differ from parsed_url for archived content)
event_base = self.excavate._event_base_url(event)
base_url = f"{event_base.scheme}://{event_base.netloc}{event_base.path}"
if not self.excavate.remove_querystring and len(event.parsed_url.query) > 0:
base_url += f"?{event.parsed_url.query}"
url = urljoin(base_url, endpoint)
@@ -986,6 +1023,34 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_conte
if yara_results:
event.add_tag("login-page")

class DirectoryListingExtractor(ExcavateRule):
description = "Detects directory listing pages from web servers."
signatures = {
"Apache_Nginx": '"<title>Index of /"',
"IIS": '"[To Parent Directory]"',
"Python_HTTP_Server": '"<h1>Directory listing for"',
"Generic_Directory_Listing": '"<title>Directory Listing"',
}
yara_rules = {}

def __init__(self, excavate):
super().__init__(excavate)
signature_component_list = []
for signature_name, signature in self.signatures.items():
signature_component_list.append(rf"${signature_name} = {signature}")
signature_component = " ".join(signature_component_list)
self.yara_rules["directory_listing"] = (
f'rule directory_listing {{meta: description = "contains a directory listing" strings: {signature_component} condition: any of them}}'
)

async def process(self, yara_results, event, yara_rule_settings, discovery_context):
for identifier in yara_results.keys():
for findings in yara_results[identifier]:
event_data = {
"description": f"{discovery_context} {yara_rule_settings.description} ({identifier})"
}
await self.report(event_data, event, yara_rule_settings, discovery_context, event_type="FINDING")

def add_yara_rule(self, rule_name, rule_content, rule_instance):
rule_instance.name = rule_name
self.yara_rules_dict[rule_name] = rule_content
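For reference, here is a sketch of the YARA source that DirectoryListingExtractor.__init__ assembles from its signatures, compiled and matched with the yara-python package (assumed here to be the engine behind BBOT's yara helper).

```python
import yara  # pip install yara-python

# Rebuild the rule the same way the extractor's __init__ does.
signatures = {
    "Apache_Nginx": '"<title>Index of /"',
    "IIS": '"[To Parent Directory]"',
    "Python_HTTP_Server": '"<h1>Directory listing for"',
    "Generic_Directory_Listing": '"<title>Directory Listing"',
}
strings_block = " ".join(f"${name} = {sig}" for name, sig in signatures.items())
rule_text = (
    f'rule directory_listing {{meta: description = "contains a directory listing" '
    f"strings: {strings_block} condition: any of them}}"
)

rules = yara.compile(source=rule_text)
# An Apache-style index page trips the Apache_Nginx string; upstream this becomes a FINDING.
matches = rules.match(data="<html><title>Index of /uploads</title>...</html>")
print([m.rule for m in matches])  # ['directory_listing']
```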
@@ -1013,12 +1078,13 @@ async def emit_custom_parameters(self, event, config_key, param_type, descriptio
# Emits WEB_PARAMETER events for custom headers and cookies from the configuration.
custom_params = self.scan.web_config.get(config_key, {})
for param_name, param_value in custom_params.items():
event_base = self._event_base_url(event)
await self.emit_web_parameter(
host=event.parsed_url.hostname,
host=self._event_host(event),
param_type=param_type,
name=param_name,
original_value=param_value,
url=self.url_unparse(param_type, event.parsed_url),
url=self.url_unparse(param_type, event_base),
description=f"HTTP Extracted Parameter [{param_name}] ({description_suffix})",
additional_params=_exclude_key(custom_params, param_name),
event=event,
@@ -1134,15 +1200,15 @@ async def search(self, data, event, content_type, discovery_context="HTTP respon
if results:
for parameter_name, original_value in results:
await self.emit_web_parameter(
host=str(event.host),
host=self._event_host(event),
param_type="SPECULATIVE",
name=parameter_name,
original_value=original_value,
url=event.url,
description=f"HTTP Extracted Parameter (speculative from {source_type} content) [{parameter_name}]",
additional_params={},
event=event,
context=f"excavate's Parameter extractor found a speculative WEB_PARAMETER: {parameter_name} by parsing {source_type} data from {str(event.host)}",
context=f"excavate's Parameter extractor found a speculative WEB_PARAMETER: {parameter_name} by parsing {source_type} data from {self._event_host(event)}",
)
return

@@ -1194,7 +1260,7 @@ async def handle_event(self, event, **kwargs):
) in extract_params_url(event.parsed_url):
if self.in_bl(parameter_name) is False:
await self.emit_web_parameter(
host=parsed_url.hostname,
host=self._event_host(event),
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
@@ -1228,12 +1294,13 @@ async def handle_event(self, event, **kwargs):

if self.in_bl(cookie_name) is False:
self.assigned_cookies[cookie_name] = cookie_value
event_base = self._event_base_url(event)
await self.emit_web_parameter(
host=str(event.host),
host=self._event_host(event),
param_type="COOKIE",
name=cookie_name,
original_value=cookie_value,
url=self.url_unparse("COOKIE", event.parsed_url),
url=self.url_unparse("COOKIE", event_base),
description=f"Set-Cookie Assigned Cookie [{cookie_name}]",
additional_params={},
event=event,
@@ -1270,10 +1337,10 @@ async def handle_event(self, event, **kwargs):
original_value,
regex_name,
additional_params,
) in extract_params_location(header_value, event.parsed_url):
) in extract_params_location(header_value, self._event_base_url(event)):
if self.in_bl(parameter_name) is False:
await self.emit_web_parameter(
host=parsed_url.hostname,
host=self._event_host(event),
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
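Because parsed_url for archived wayback content points at web.archive.org, _event_base_url() rebuilds the original URL from the override fields in the event data. The sketch below shows that reconstruction in isolation, using a plain dict in place of a BBOT event; the field names mirror the ones used above.

```python
from urllib.parse import urlparse


# Standalone sketch of the _event_base_url() reconstruction above.
# For archived wayback content, parsed_url points at web.archive.org, so the
# original base URL is rebuilt from the override fields in the data dict.
def reconstruct_base_url(data: dict, parsed_url):
    if "archive_url" not in data:
        return parsed_url
    scheme = data.get("scheme", parsed_url.scheme)
    host = str(data.get("host") or parsed_url.hostname)
    port = data.get("port")
    if port is not None:
        port = int(port)
        # Only append the port when it is not the scheme's default.
        if not ((scheme == "http" and port == 80) or (scheme == "https" and port == 443)):
            host = f"{host}:{port}"
    path = data.get("path", parsed_url.path)
    return urlparse(f"{scheme}://{host}{path}")


archived = {
    "archive_url": "https://web.archive.org/web/2020/https://evilcorp.com/app/login",
    "host": "evilcorp.com",
    "scheme": "https",
    "port": 8443,
    "path": "/app/login",
}
parsed = urlparse("https://web.archive.org/web/2020/https://evilcorp.com/app/login")
assert reconstruct_base_url(archived, parsed).geturl() == "https://evilcorp.com:8443/app/login"
```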