Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions graph_db/mixins/recon/js_recon_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,37 @@ def _link_to_file(session, node_id: str, node_label: str, rel_type: str, source_
return False
if rel_type not in ('HAS_JS_FINDING', 'HAS_SECRET', 'HAS_ENDPOINT'):
return False
session.run(
result = session.run(
f"""
MATCH (file:JsReconFinding {{id: $fid, finding_type: 'js_file'}})
MATCH (n:{node_label} {{id: $nid}})
MERGE (file)-[:{rel_type}]->(n)
MERGE (file)-[r:{rel_type}]->(n)
RETURN count(r) AS linked
""",
fid=file_node_id, nid=node_id
)
return True
record = result.single()
linked = record.get("linked", 0) if record else 0
return int(linked) > 0

def _link_endpoint_to_file(session, source_url: str, path: str, method: str, baseurl: str) -> bool:
    """Attach a JS file node to an Endpoint matched by its canonical identity.

    The Endpoint is matched on (path, method, baseurl, user_id, project_id)
    instead of on a node id. ``file_node_ids``, ``user_id`` and ``project_id``
    are read from the enclosing scope.

    Returns:
        True when the query reports a positive ``linked`` count, False when no
        file node id is known for ``source_url`` or nothing was linked.
    """
    js_file_id = file_node_ids.get(source_url)
    if not js_file_id:
        # No js_file node id is recorded for this source URL.
        return False

    query = """
    MATCH (file:JsReconFinding {id: $fid, finding_type: 'js_file'})
    MATCH (n:Endpoint {path: $path, method: $method, baseurl: $baseurl, user_id: $uid, project_id: $pid})
    MERGE (file)-[r:HAS_ENDPOINT]->(n)
    RETURN count(r) AS linked
    """
    row = session.run(
        query,
        fid=js_file_id, path=path, method=method, baseurl=baseurl,
        uid=user_id, pid=project_id,
    ).single()
    if not row:
        return False
    return int(row.get("linked", 0)) > 0

# --- 1. JsReconFinding nodes (non-secret findings) ---
finding_types = [
Expand Down Expand Up @@ -549,6 +571,9 @@ def _link_to_file(session, node_id: str, node_label: str, rel_type: str, source_
created_endpoints = set()
for ep in js_recon_data.get("endpoints", []):
try:
if ep.get("validation_status") != "hittable":
continue

path = ep.get("path", "")
method = ep.get("method", "GET")
source_js = ep.get("source_js", "")
Expand All @@ -569,29 +594,49 @@ def _link_to_file(session, node_id: str, node_label: str, rel_type: str, source_
created_endpoints.add(ep_key)

effective_baseurl = base_url or 'upload'
id_hash = hashlib.sha256(f"{effective_baseurl}:{method}:{path}".encode()).hexdigest()[:16]
node_id = f"endpoint-{user_id}-{project_id}-js-{id_hash}"

session.run(
result = session.run(
"""
MERGE (e:Endpoint {path: $path, method: $method, baseurl: $baseurl, user_id: $uid, project_id: $pid})
ON CREATE SET e.source = 'js_recon',
ON CREATE SET e.id = COALESCE(e.id, $id),
e.source = 'js_recon',
e.category = $category,
e.full_url = $full_url,
e.status_code = $status_code,
e.resolved_url = $resolved_url,
e.validation_status = $validation_status,
e.endpoint_type = $ep_type,
e._js_recon_created = true,
e.updated_at = datetime()
ON MATCH SET e.js_recon_source = true,
ON MATCH SET e.id = COALESCE(e.id, $id),
e.js_recon_source = true,
e.endpoint_type = COALESCE(e.endpoint_type, $ep_type),
e.full_url = COALESCE(e.full_url, $full_url),
e.status_code = COALESCE($status_code, e.status_code),
e.resolved_url = CASE WHEN $resolved_url <> '' THEN $resolved_url ELSE e.resolved_url END,
e.validation_status = COALESCE($validation_status, e.validation_status),
e.updated_at = datetime()
WITH e, COALESCE(e._js_recon_created, false) AS created
REMOVE e._js_recon_created
RETURN created AS created
""",
path=path, method=method, baseurl=effective_baseurl,
uid=user_id, pid=project_id,
id=node_id,
category=ep.get("category", "endpoint"),
full_url=ep.get("full_url", ""),
status_code=ep.get("status_code"),
resolved_url=ep.get("resolved_url", ""),
validation_status=ep.get("validation_status"),
ep_type=ep.get("type", "rest"),
)
stats["endpoints_created"] += 1
record = result.single()
if record and bool(record.get("created", False)):
stats["endpoints_created"] += 1

if _link_to_file(session, node_id, 'Endpoint', 'HAS_ENDPOINT', source_js):
if _link_endpoint_to_file(session, source_js, path, method, effective_baseurl):
stats["relationships_created"] += 1

except Exception as e:
Expand Down
207 changes: 206 additions & 1 deletion recon/main_recon_modules/js_recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from urllib.parse import urljoin, urlparse
from typing import Optional

from recon.helpers.js_recon.patterns import (
Expand Down Expand Up @@ -66,6 +66,93 @@
_MAX_JS_FILE_SIZE = 5 * 1024 * 1024


def _parse_endpoint_validation_headers(header_lines: list) -> dict:
"""Parse user-provided endpoint validation headers."""
headers = {
'User-Agent': 'Mozilla/5.0 (compatible; RedAmon-JsRecon/1.0)',
}

for line in header_lines:
if not isinstance(line, str):
continue

line = line.strip()
if not line or ':' not in line:
continue

name, value = line.split(':', 1)
name = name.strip()
value = value.strip()
if not name or not value:
continue

headers[name] = value

return headers


def _resolve_endpoint_candidate_url(endpoint: dict) -> str:
"""Resolve an endpoint candidate to an absolute URL suitable for validation."""
candidate = endpoint.get('full_url') or endpoint.get('path') or ''
if not isinstance(candidate, str):
return ''

candidate = candidate.strip()
if not candidate:
return ''

if candidate.startswith(('http://', 'https://')):
return candidate

source_js = endpoint.get('source_js') or ''
source_scheme = ''
if isinstance(source_js, str) and source_js:
source_scheme = urlparse(source_js).scheme

if candidate.startswith('//'):
scheme = source_scheme if source_scheme in ('http', 'https') else 'https'
return f'{scheme}:{candidate}'

if not isinstance(source_js, str) or urlparse(source_js).scheme not in ('http', 'https'):
return ''

return urljoin(source_js, candidate)


def _url_origin(url: str) -> str:
"""Return a normalized URL origin, or an empty string when unavailable."""
if not isinstance(url, str):
return ''
try:
parsed = urlparse(url)
if parsed.scheme in ('http', 'https') and parsed.netloc:
return f"{parsed.scheme}://{parsed.netloc}".lower()
except Exception:
pass
return ''


def _endpoint_probe_method(extracted_method: str) -> str:
"""Choose a non-mutating method for endpoint validation probes."""
method = extracted_method.upper() if isinstance(extracted_method, str) else 'GET'
if method in ('HEAD', 'OPTIONS'):
return method
if method == 'GET':
return 'GET'
return 'OPTIONS'


def _endpoint_probe_headers(endpoint: dict, resolved_url: str, custom_header_lines: list) -> dict:
    """Choose the header set for probing ``resolved_url``.

    The user's custom headers are returned only when the probe target has
    the same origin as the JS file the endpoint came from
    (``endpoint['source_js']``); otherwise only the default headers are
    returned.
    """
    defaults_only = _parse_endpoint_validation_headers([])
    with_custom = _parse_endpoint_validation_headers(custom_header_lines)

    js_origin = _url_origin(endpoint.get('source_js', ''))
    probe_origin = _url_origin(resolved_url)

    # An empty origin means "unknown", which never counts as same-origin.
    same_origin = bool(js_origin) and js_origin == probe_origin
    return with_custom if same_origin else defaults_only


def _is_js_url(url: str) -> bool:
"""Check if a URL points to a JavaScript file."""
try:
Expand Down Expand Up @@ -538,6 +625,104 @@ def _validate_one(secret: dict) -> None:
return secrets


def _validate_extracted_endpoints(endpoints: list, settings: dict, request_func=None) -> list:
"""Validate extracted endpoints with lightweight non-following HTTP probes."""
if not settings.get('JS_RECON_VALIDATE_ENDPOINTS', True):
for endpoint in endpoints:
endpoint['validation_status'] = 'unvalidated'
endpoint['validation_error'] = 'validation_disabled'
return endpoints

def _clamp_int(value, default: int, minimum: int, maximum: int) -> int:
try:
value = int(value)
except (TypeError, ValueError):
value = default
return max(minimum, min(value, maximum))

def _configured_status_codes(value) -> set:
default_codes = [200, 201, 204, 301, 302, 307, 308, 401, 403, 405]
if value is None:
value = default_codes
elif isinstance(value, str):
value = re.split(r'[\s,]+', value)

codes = set()
try:
iterator = iter(value)
except TypeError:
iterator = iter([value])

for code in iterator:
try:
codes.add(int(code))
except (TypeError, ValueError):
continue

return codes or set(default_codes)

accepted_statuses = _configured_status_codes(
settings.get('JS_RECON_ENDPOINT_ACCEPT_STATUS')
)
custom_header_lines = settings.get('JS_RECON_ENDPOINT_CUSTOM_HEADERS', [])
timeout = _clamp_int(settings.get('JS_RECON_VALIDATION_TIMEOUT', 5), 5, 1, 30)
workers = _clamp_int(settings.get('JS_RECON_CONCURRENCY', 10), 10, 1, 20)
requester = request_func or requests.request

def _validate_one(endpoint: dict) -> None:
candidate = endpoint.get('full_url') or endpoint.get('path') or ''
if isinstance(candidate, str) and candidate.strip().lower().startswith(('ws://', 'wss://')):
endpoint['validation_status'] = 'unvalidated'
endpoint['validation_error'] = 'unsupported_scheme'
return

method = endpoint.get('method') or 'GET'
if not isinstance(method, str):
method = 'GET'
method = method.upper()
if endpoint.get('type') == 'websocket' or method in ('WS', 'WSS'):
endpoint['validation_status'] = 'unvalidated'
endpoint['validation_error'] = 'unsupported_scheme'
return

resolved_url = _resolve_endpoint_candidate_url(endpoint)
if not resolved_url:
endpoint['validation_status'] = 'unvalidated'
endpoint['validation_error'] = 'unresolved_url'
return

endpoint['resolved_url'] = resolved_url
probe_method = _endpoint_probe_method(method)
headers = _endpoint_probe_headers(endpoint, resolved_url, custom_header_lines)

try:
response = requester(
probe_method,
resolved_url,
timeout=timeout,
headers=headers,
allow_redirects=False,
)
status_code = int(getattr(response, 'status_code', 0))
endpoint['status_code'] = status_code
endpoint['validation_status'] = (
'hittable' if status_code in accepted_statuses else 'not_hittable'
)
endpoint['validation_error'] = ''
except requests.Timeout:
endpoint['validation_status'] = 'not_hittable'
endpoint['validation_error'] = 'timeout'
except Exception as exc:
endpoint['validation_status'] = 'not_hittable'
endpoint['validation_error'] = type(exc).__name__

if endpoints:
with ThreadPoolExecutor(max_workers=workers) as ex:
list(ex.map(_validate_one, endpoints))

return endpoints


def _extract_subdomains(
endpoints: list,
root_domain: str,
Expand Down Expand Up @@ -607,11 +792,18 @@ def _build_summary(results: dict) -> dict:
validated['unvalidated'] += 1

filtered_stats = results.get('_filtered_stats', {})
endpoint_validation = {'hittable': 0, 'not_hittable': 0, 'unvalidated': 0}

for endpoint in results.get('endpoints', []):
vstatus = endpoint.get('validation_status', 'unvalidated')
endpoint_validation[vstatus] = endpoint_validation.get(vstatus, 0) + 1

return {
'secrets_by_severity': severity_counts,
'secrets_by_type': type_counts,
'validated_keys': validated,
'endpoint_validation': endpoint_validation,
'endpoints_hittable': endpoint_validation.get('hittable', 0),
'false_positives_filtered': filtered_stats,
'false_positives_filtered_total': sum(filtered_stats.values()),
'dependency_confusion_count': len(results.get('dependencies', [])),
Expand Down Expand Up @@ -688,6 +880,9 @@ def run_js_recon(combined_result: dict, settings: dict) -> dict:
("JS_RECON_CUSTOM_FRAMEWORKS", "Filtering"),
("JS_RECON_VALIDATE_KEYS", "Validation"),
("JS_RECON_VALIDATION_TIMEOUT", "Validation"),
("JS_RECON_VALIDATE_ENDPOINTS", "Endpoint validation"),
("JS_RECON_ENDPOINT_ACCEPT_STATUS", "Endpoint validation"),
("JS_RECON_ENDPOINT_CUSTOM_HEADERS", "Endpoint validation"),
("JS_RECON_AI_SDK_DETECTION_ENABLED", "AI surface"),
],
)
Expand Down Expand Up @@ -742,6 +937,16 @@ def run_js_recon(combined_result: dict, settings: dict) -> dict:
print(f"[*][JsRecon] Validating {len(results['secrets'])} discovered secrets...")
results['secrets'] = _validate_secrets(results['secrets'], settings)

# 6. Validate endpoints
if results.get('endpoints'):
print(f"[*][JsRecon] Validating {len(results['endpoints'])} discovered endpoints...")
results['endpoints'] = _validate_extracted_endpoints(results['endpoints'], settings)
hittable_count = sum(
1 for endpoint in results['endpoints']
if endpoint.get('validation_status') == 'hittable'
)
print(f"[+][JsRecon] Endpoint validation: {hittable_count} hittable")

# 6. Subdomain feedback loop
root_domain = combined_result.get('domain', '')
known_subs = set()
Expand Down
6 changes: 6 additions & 0 deletions recon/project_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@
'JS_RECON_VALIDATE_KEYS': True,
'JS_RECON_VALIDATION_TIMEOUT': 5,
'JS_RECON_EXTRACT_ENDPOINTS': True,
'JS_RECON_VALIDATE_ENDPOINTS': True,
'JS_RECON_ENDPOINT_ACCEPT_STATUS': [200, 201, 204, 301, 302, 307, 308, 401, 403, 405],
'JS_RECON_ENDPOINT_CUSTOM_HEADERS': [],
'JS_RECON_REGEX_PATTERNS': True,
'JS_RECON_SOURCE_MAPS': True,
'JS_RECON_DEPENDENCY_CHECK': True,
Expand Down Expand Up @@ -1026,6 +1029,9 @@ def fetch_project_settings(project_id: str, webapp_url: str) -> dict[str, Any]:
settings['JS_RECON_CUSTOM_PACKAGES'] = project.get('jsReconCustomPackages', DEFAULT_SETTINGS['JS_RECON_CUSTOM_PACKAGES'])
settings['JS_RECON_CUSTOM_ENDPOINT_KEYWORDS'] = project.get('jsReconCustomEndpointKeywords', DEFAULT_SETTINGS['JS_RECON_CUSTOM_ENDPOINT_KEYWORDS'])
settings['JS_RECON_CUSTOM_FRAMEWORKS'] = project.get('jsReconCustomFrameworks', DEFAULT_SETTINGS['JS_RECON_CUSTOM_FRAMEWORKS'])
settings['JS_RECON_VALIDATE_ENDPOINTS'] = project.get('jsReconValidateEndpoints', DEFAULT_SETTINGS['JS_RECON_VALIDATE_ENDPOINTS'])
settings['JS_RECON_ENDPOINT_ACCEPT_STATUS'] = project.get('jsReconEndpointAcceptStatus') or DEFAULT_SETTINGS['JS_RECON_ENDPOINT_ACCEPT_STATUS']
settings['JS_RECON_ENDPOINT_CUSTOM_HEADERS'] = project.get('jsReconEndpointCustomHeaders', DEFAULT_SETTINGS['JS_RECON_ENDPOINT_CUSTOM_HEADERS'])
settings['JS_RECON_AI_SDK_DETECTION_ENABLED'] = project.get('jsReconAiSdkDetectionEnabled', DEFAULT_SETTINGS['JS_RECON_AI_SDK_DETECTION_ENABLED'])

# FFuf Directory Fuzzer
Expand Down
Loading