-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmcp_server.py
More file actions
2247 lines (1983 loc) · 134 KB
/
mcp_server.py
File metadata and controls
2247 lines (1983 loc) · 134 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
ContrastAPI MCP Server — stdio + Streamable HTTP transport
Exposes ContrastAPI endpoints as MCP tools for Claude Code / Claude Desktop.
Calls the live API at api.contrastcyber.com (no local server needed).
Stdio usage (.mcp.json):
{
"mcpServers": {
"contrastapi": {
"command": "python3",
"args": ["/path/to/contrastapi/mcp_server.py"]
}
}
}
HTTP usage: POST https://api.contrastcyber.com/mcp
"""
import contextvars
import functools
import ipaddress
import json
import logging
import os
import pathlib
import re
import sys
from typing import Annotated, Literal
from urllib.parse import quote
# v1.22.1 — when main.py loads this file via importlib.util.spec_from_file_location,
# the spec loader does NOT add the parent directory to sys.path the way `python
# mcp_server.py` would. Without this, `from app.exceptions import ...` below
# raises ModuleNotFoundError, main.py silently catches it, and the MCP route is
# never mounted (production /mcp/ → 404). Adding the repo root explicitly makes
# the `app.*` package importable in BOTH execution contexts (pytest + spec load).
#
# v1.27.1 — also add repo_root/app/ to sys.path. The 5 per-package schemas import
# `from schemas import BaseSuccessResponse` which only resolves when app/ is on
# sys.path. Production uvicorn launches from app/ cwd so this is implicit there;
# standalone runners (Glama Docker `python3 /app/mcp_server.py`) only have the
# repo root on sys.path → ModuleNotFoundError on `from schemas`.
_REPO_ROOT = str(pathlib.Path(__file__).parent)
_APP_DIR = str(pathlib.Path(__file__).parent / "app")
for _p in (_REPO_ROOT, _APP_DIR):
if _p not in sys.path:
sys.path.insert(0, _p)
import httpx # noqa: E402 (must follow sys.path patch above)
# Bare `auth` (NOT `app.auth`): routes + the MCP gate all do `from auth
# import ...`, creating sys.modules["auth"]. Importing `app.auth` here would
# load a SECOND module object with its own random INTERNAL_TRUST_TOKEN, so the
# token we send would never match the one _is_trusted_internal checks → Opt 2
# silently dead. Resolvable in every context: _APP_DIR is on sys.path (above).
from auth import INTERNAL_TRUST_TOKEN # noqa: E402
from fastapi import HTTPException # noqa: E402
from mcp.server.fastmcp import FastMCP # noqa: E402
from mcp.server.fastmcp.exceptions import ToolError # noqa: E402
from mcp.server.transport_security import TransportSecuritySettings # noqa: E402
from mcp.types import ToolAnnotations # noqa: E402
from pydantic import Field, ValidationError # noqa: E402
from app.atlas.schemas import ( # noqa: E402
AtlasCaseStudyResponse,
AtlasCaseStudySearchResponse,
AtlasTechniqueResponse,
AtlasTechniqueSearchResponse,
BulkAtlasTechniqueResponse,
)
from app.codesec.schemas import ( # noqa: E402
CheckHeadersResponse,
CodeCheckResponse,
DependenciesResponse,
ScanHeadersResponse,
)
from app.cve.schemas import ( # noqa: E402
BulkCveResponse,
CveResponse,
CveSearchResponse,
CvssDetailsResponse,
CweLookupResponse,
ExploitResponse,
KevDetailResponse,
RiskScoreResponse,
)
from app.d3fend.schemas import ( # noqa: E402
D3fendCoverageResponse,
D3fendDefenseResponse,
D3fendDefenseSearchResponse,
D3fendForAttackResponse,
)
from app.domain.schemas import ( # noqa: E402
AsnResponse,
AuditResponse,
BrandAssetsResponse,
DisposableResponse,
DnsResponse,
DomainReportResponse,
EmailMxResponse,
EmailSecurityPostureResponse,
EmailVerifyResponse,
IpLookupResponse,
PhoneLookupResponse,
RedirectChainResponse,
RobotsTxtResponse,
SeoAuditResponse,
SslResponse,
SubdomainsResponse,
TechResponse,
ThreatReportResponse,
ThreatResponse,
UsernameLookupResponse,
WaybackResponse,
WhoisResponse,
)
from app.exceptions import ( # noqa: E402
AppException,
AuthRequiredException,
InvalidArgumentException,
InvalidCveIdException,
InvalidDomainException,
InvalidHashException,
InvalidIpException,
NotFoundException,
RateLimitExceededException,
TierLimitException,
UpstreamErrorException,
UpstreamTimeoutException,
)
from app.ioc.schemas import ( # noqa: E402
BulkIocResponse,
HashResponse,
IocResponse,
PasswordResponse,
PhishingResponse,
)
from app.schemas import ErrorResponse, TechStackCveAuditResponse # noqa: E402
from app.sigma.schemas import ( # noqa: E402
BulkSigmaRuleLookupResponse,
SigmaRuleLookupResponse,
)
# Shared annotations — all tools are read-only API lookups.
# v1.22.0 splits the legacy `_RO` (open-world default) into closed/open world
# variants so agents can reason about latency and retry semantics:
# _RO_CLOSED_WORLD — local DB lookups (CVE/CWE/ATLAS/D3FEND catalog, codesec
# regex), deterministic, no external network.
# _RO_OPEN_WORLD — live external fetches (DNS/WHOIS/SSL, Shodan/AbuseIPDB,
# crt.sh, etc.), may time out or rate-limit.
# `_RO` retained as an alias for legacy callsites until Commit C swaps them.
_RO_CLOSED_WORLD = ToolAnnotations(
readOnlyHint=True,
destructiveHint=False,
idempotentHint=True,
openWorldHint=False,
)
_RO_OPEN_WORLD = ToolAnnotations(
readOnlyHint=True,
destructiveHint=False,
idempotentHint=True,
openWorldHint=True,
)
logger = logging.getLogger("contrastapi.mcp")
# Carries the real client IP from MCP HTTP handler to internal API calls,
# so backend rate limiting sees the original IP instead of localhost.
_client_ip_var: contextvars.ContextVar[str] = contextvars.ContextVar("mcp_client_ip", default="")
# v1.32.4 Pattern B foundation: carries the caller's tier ("free" | "pro" | "test")
# from the MCP gate into MCP tool functions. Used by `_audit_domain_impl()` and
# `_threat_report_impl()` (Batches 4-5) so they can gate Pro-only sub-calls
# (AbuseIPDB, Shodan) without an HTTP round-trip back to the REST `require_auth`
# layer. Default "pro" so direct stdio / local-CLI invocations get full access.
_user_tier_var: contextvars.ContextVar[str] = contextvars.ContextVar("mcp_user_tier", default="pro")
def _get_user_tier() -> str:
"""Read the caller's tier from the MCP gate's ContextVar.
Returns "pro" as a safe default when no gate has run (direct stdio calls,
unit tests). MCP tool wrappers should call this instead of touching
`_user_tier_var` directly so future logic (e.g. tier inference from key
metadata) lives in one place.
"""
return _user_tier_var.get()
mcp = FastMCP(
"contrastapi",
stateless_http=True,
json_response=True, # JSON instead of SSE — Cloudflare compatible
# Mounted at /mcp in FastAPI — sub-app route must be "/"
streamable_http_path="/",
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=False, # nginx handles this
),
)
# FastMCP doesn't accept a `version` kwarg, so the lowlevel Server falls back
# to the installed `mcp` package version (currently 1.27.0) for serverInfo.
# Pin it to OUR application version so MCP clients and indexers can tell which
# release of ContrastAPI they're talking to. We poke the private `_mcp_server`
# attribute because FastMCP does not expose a setter; if a future SDK upgrade
# renames or restructures it, log the failure (don't block startup) so we can
# notice the silent revert to the package version.
try:
from app.config import VERSION as _APP_VERSION
mcp._mcp_server.version = _APP_VERSION
except Exception as _ver_pin_exc: # pragma: no cover - metadata, never block startup
logger.warning(
"Failed to pin MCP serverInfo.version to app.config.VERSION (%s); "
"serverInfo will fall back to the installed mcp package version.",
_ver_pin_exc,
)
# Use local API if running on the server, otherwise use public API
API_BASE = os.environ.get("CONTRASTAPI_URL", "http://localhost:8002")
API_KEY = os.environ.get("CONTRASTAPI_KEY", "")
TIMEOUT = 30.0
_LOG_SANITIZE = re.compile(
r"/v1/(phone|email/mx|email/disposable|ip|domain|dns|whois|subdomains|certs|ssl|threat|tech|monitor|ioc|phishing|scan/headers|asn|password|archive|username|cve|cves|exploit|hash|epss)(?:/(lookup|search|leading|bulk|report))?/[^?]+",
re.IGNORECASE,
)
_CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")
def _safe_path(path: str) -> str:
"""Redact PII from API paths for safe logging."""
safe = _CONTROL_CHARS.sub("", path)
query_idx = safe.find("?")
if query_idx >= 0:
safe = safe[:query_idx]
return _LOG_SANITIZE.sub(
lambda m: (
f"/v1/{m.group(1).lower()}/{m.group(2).lower()}/***" if m.group(2) else f"/v1/{m.group(1).lower()}/***"
),
safe,
)
def _safe_ip(ip: str) -> str:
"""Validate and sanitize client IP — reject spoofed/malformed values."""
ip = _CONTROL_CHARS.sub("", ip).strip()
if not ip:
return ""
try:
return str(ipaddress.ip_address(ip))
except ValueError:
return ""
def _headers() -> dict:
h = {"Accept": "application/json"}
if API_KEY:
h["Authorization"] = f"Bearer {API_KEY}"
# Opt 2: mark this as a trusted in-process hop so REST require_auth skips
# re-auth + re-charge (the /mcp/ gate already charged the resolved tier).
h["X-Internal-Auth"] = INTERNAL_TRUST_TOKEN
h["X-Internal-Tier"] = _get_user_tier()
# Forward real client IP so backend applies correct rate limits
client_ip = _safe_ip(_client_ip_var.get())
if client_ip:
h["X-Forwarded-For"] = client_ip
return h
_http_client: httpx.AsyncClient | None = None
def _get_client() -> httpx.AsyncClient:
"""Return a shared httpx client with connection pooling."""
global _http_client
if _http_client is None or _http_client.is_closed:
_http_client = httpx.AsyncClient(base_url=API_BASE, timeout=TIMEOUT)
return _http_client
def _log_ip() -> str:
"""Return sanitized client IP for logging."""
return _safe_ip(_client_ip_var.get()) or "unknown"
# === v1.22.0 raise-pattern infrastructure ====================================
#
# Coexists with the legacy `_get`/`_post`/`_validate_*` helpers above. Commit C
# swaps every tool body to the new helpers and deletes the legacy ones; until
# then both surfaces are live so the suite stays green at every commit boundary.
def _extract_upstream_message(resp: httpx.Response) -> str:
"""Pull a useful, length-capped error message out of an upstream JSON body.
Mirrors the field-priority of legacy `_format_error` but returns a single
string suitable for `ErrorDetail.message` (which itself enforces
max_length=500). Falls back to bare 'Error <status>' on parse failure.
"""
status = resp.status_code
try:
body = resp.json()
except (ValueError, json.JSONDecodeError):
return f"Error {status}"
if not isinstance(body, dict):
return f"Error {status}"
msg = body.get("error") or body.get("detail") or body.get("message")
# Post-1.22.2 envelope: {error: {code, message, ...}}. Pre-1.22.2 was a
# bare string; both shapes still appear in the wild so handle both.
if isinstance(msg, dict):
inner = msg.get("message")
if isinstance(inner, str) and inner:
return inner[:500]
msg = None
if isinstance(msg, str) and msg:
return msg[:500]
hint = body.get("hint") or body.get("suggestion") or body.get("upgrade")
if isinstance(hint, str) and hint:
return hint[:500]
if isinstance(hint, dict):
inner = hint.get("message")
if isinstance(inner, str) and inner:
return inner[:500]
return f"Error {status}"
def _http_error_to_app_exception(resp: httpx.Response) -> AppException:
"""Map an upstream `httpx.Response` to the appropriate `AppException` subclass.
Status -> exception:
400, 422 -> InvalidArgumentException
401 -> AuthRequiredException
403 -> TierLimitException (carries upgrade_url)
404 -> NotFoundException
429 -> RateLimitExceededException (carries retry_after + upgrade_url)
504 -> UpstreamTimeoutException
anything else -> UpstreamErrorException
Centralizing the mapping means tool bodies never branch on status code.
"""
status = resp.status_code
detail = _extract_upstream_message(resp)
upgrade = "https://contrastcyber.com/pricing"
if status == 404:
return NotFoundException(detail)
if status == 429:
try:
retry = int(resp.headers.get("retry-after", "60"))
except (TypeError, ValueError):
retry = 60
# Cap at 1h — agents that respect retry_after literally must not get
# tricked into multi-year backoffs by a hostile/buggy upstream header.
retry = max(0, min(retry, 3600))
return RateLimitExceededException(detail, retry_after=retry, upgrade_url=upgrade)
if status == 401:
return AuthRequiredException(detail)
if status == 403:
return TierLimitException(detail, upgrade_url=upgrade)
if status == 504:
return UpstreamTimeoutException(detail)
if status in (400, 422):
return InvalidArgumentException(detail)
return UpstreamErrorException(detail)
async def _aget(path: str, params: dict | None = None) -> dict:
"""v1.22 raise-pattern GET. Returns the JSON dict on success; raises an
`AppException` subclass on any failure (mapping in `_http_error_to_app_exception`).
Network/timeout failures collapse to `UpstreamTimeoutException`.
"""
client_ip = _log_ip()
try:
resp = await _get_client().get(path, params=params, headers=_headers())
resp.raise_for_status()
logger.info("mcp_tool GET %s %d %s", _safe_path(path), resp.status_code, client_ip)
return resp.json()
except httpx.HTTPStatusError as e:
logger.info("mcp_tool GET %s %d %s", _safe_path(path), e.response.status_code, client_ip)
raise _http_error_to_app_exception(e.response) from e
except httpx.HTTPError as e:
logger.info("mcp_tool GET %s err %s", _safe_path(path), client_ip)
raise UpstreamTimeoutException("Request failed") from e
async def _apost(path: str, json_body: dict, params: dict | None = None) -> dict:
"""v1.22 raise-pattern POST. See `_aget`."""
client_ip = _log_ip()
try:
resp = await _get_client().post(path, json=json_body, params=params, headers=_headers())
resp.raise_for_status()
logger.info("mcp_tool POST %s %d %s", _safe_path(path), resp.status_code, client_ip)
return resp.json()
except httpx.HTTPStatusError as e:
logger.info("mcp_tool POST %s %d %s", _safe_path(path), e.response.status_code, client_ip)
raise _http_error_to_app_exception(e.response) from e
except httpx.HTTPError as e:
logger.info("mcp_tool POST %s err %s", _safe_path(path), client_ip)
raise UpstreamTimeoutException("Request failed") from e
# --- Raise-pattern input validators (v1.22.0) ---
#
# Mirror the legacy `_validate_*` helpers above but raise an
# `InvalidArgumentException` subclass on bad input and return the normalized
# value on success. Tool bodies in Commit C use these directly, so the body
# can be a single line: `return CveResponse(**await _aget(f"/v1/cve/{_require_cve(cve_id)}"))`.
def _require_domain(domain: str) -> str:
"""Validate + normalize. Raises InvalidDomainException on bad input."""
domain = (domain or "").strip().lower().rstrip(".")
if not _DOMAIN_RE.match(domain):
raise InvalidDomainException(f"Invalid domain format: {domain!r}. Expected format: example.com")
return domain
def _require_ip(ip: str) -> str:
"""Validate any IP (public or private). Raises InvalidIpException on bad input."""
ip = (ip or "").strip()
try:
ipaddress.ip_address(ip)
except ValueError as e:
raise InvalidIpException(f"Invalid IP address: {ip!r}. Expected IPv4 (1.2.3.4) or IPv6.") from e
return ip
def _require_public_ip(ip: str) -> str:
"""Validate IP and reject private/reserved ranges. Raises InvalidIpException.
Mirrors `app/validation.py:is_private_ip()` SSRF guard: rejects unspecified
(0.0.0.0, ::) in addition to private / loopback / reserved / link-local /
multicast — keeps MCP-layer validation in lockstep with the HTTP layer.
"""
ip = (ip or "").strip()
try:
addr = ipaddress.ip_address(ip)
except ValueError as e:
raise InvalidIpException(f"Invalid IP address: {ip!r}. Expected IPv4 (1.2.3.4) or IPv6.") from e
if (
addr.is_private
or addr.is_loopback
or addr.is_reserved
or addr.is_link_local
or addr.is_multicast
or addr.is_unspecified
):
raise InvalidIpException(f"Private/reserved IP addresses are not allowed: {ip!r}")
return ip
def _require_cve(cve_id: str) -> str:
cve_id = (cve_id or "").strip()
if not _CVE_RE.match(cve_id):
raise InvalidCveIdException(f"Invalid CVE ID: {cve_id!r}. Expected format: CVE-2024-1234")
return cve_id.upper()
def _require_cwe(cwe_id: str) -> str:
cwe_id = (cwe_id or "").strip()
if not _CWE_RE.match(cwe_id):
raise InvalidArgumentException(f"Invalid CWE ID: {cwe_id!r}. Expected format: CWE-79 (or just '79')")
return cwe_id
def _require_hash(file_hash: str) -> str:
file_hash = (file_hash or "").strip()
if not _HASH_RE.match(file_hash):
raise InvalidHashException(
f"Invalid hash: {file_hash!r}. Expected MD5 (32 hex), SHA-1 (40 hex), or SHA-256 (64 hex)."
)
return file_hash.lower()
def _require_atlas_technique(value: str) -> str:
value = (value or "").strip()
if not _ATLAS_TECHNIQUE_RE.match(value):
raise InvalidArgumentException(
f"Invalid ATLAS technique id: {value!r}. Expected 'AML.T####' or 'AML.T####.###' (e.g. AML.T0000)"
)
return value.upper()
def _require_atlas_case_study(value: str) -> str:
value = (value or "").strip()
if not _ATLAS_CASE_STUDY_RE.match(value):
raise InvalidArgumentException(
f"Invalid ATLAS case study id: {value!r}. Expected 'AML.CS####' (e.g. AML.CS0000)"
)
return value.upper()
def _require_atlas_tactic(value: str) -> str:
value = (value or "").strip()
if not _ATLAS_TACTIC_RE.match(value):
raise InvalidArgumentException(f"Invalid ATLAS tactic id: {value!r}. Expected 'AML.TA####' (e.g. AML.TA0002)")
return value.upper()
def _require_d3fend_defense(value: str) -> str:
value = (value or "").strip()
if not _D3FEND_DEFENSE_RE.match(value):
raise InvalidArgumentException(
f"Invalid D3FEND defense_id: {value!r}. Expected CamelCase slug (e.g. 'TokenBinding')"
)
return value
def _require_attack_technique(value: str) -> str:
value = (value or "").strip()
if not _ATTACK_TECHNIQUE_RE.match(value):
raise InvalidArgumentException(
f"Invalid ATT&CK technique id: {value!r}. Expected 'T####' or 'T####.###' (e.g. T1059, T1550.001)"
)
return value.upper()
def mcp_tool_safe(*, annotations: ToolAnnotations):
"""v1.22.0 tool decorator. Wraps `@mcp.tool` so AppException (and Pydantic
ValidationError raised when the upstream response body does not match the
declared response model) becomes a spec-compliant MCP error result.
On success: FastMCP emits both `content[0].text` (JSON) and `structuredContent`
(dict) — matching MCP 1.0 spec for tools whose output is a Pydantic model union.
On failure (v1.32.2): raises FastMCP `ToolError` so the wire response carries
`isError: true` per MCP spec. Prior versions returned an `ErrorResponse`
instance which FastMCP packaged as a successful tool result with the error
payload nested inside `structuredContent` — agents could not branch on
success vs error reliably. The ToolError message embeds the full
`ErrorResponse` JSON so agents parsing `content[0].text` still get the
structured `{error: {code, message, retry_after_seconds, ...}}` envelope.
"""
def decorator(fn):
@functools.wraps(fn)
async def wrapped(*args, **kwargs):
try:
return await fn(*args, **kwargs)
except AppException as e:
payload = ErrorResponse(error=e.to_error_detail()).model_dump_json(exclude_none=True)
raise ToolError(payload) from None
except ValidationError:
# Upstream returned a body that does not match our Pydantic schema
# (cache poisoning / sync drift / partial JSON). Surface as
# UpstreamErrorException with a fixed-length, sanitized message —
# the raw ValidationError carries upstream-controlled values that
# we MUST NOT log verbatim (CRLF injection into plain-text sinks)
# nor ship to the MCP wire (would re-trigger ErrorDetail.message
# max_length=500 if oversized, raising a second unhandled error).
logger.warning("mcp_tool %s upstream response failed schema validation", fn.__name__)
exc = UpstreamErrorException("Upstream response validation failed")
payload = ErrorResponse(error=exc.to_error_detail()).model_dump_json(exclude_none=True)
raise ToolError(payload) from None
return mcp.tool(annotations=annotations, structured_output=True)(wrapped)
return decorator
# --- Input validation ---
_DOMAIN_RE = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.[A-Za-z0-9-]{1,63})*\.[A-Za-z]{2,}$")
_CVE_RE = re.compile(r"^CVE-\d{4}-\d{4,}$", re.IGNORECASE)
_HASH_RE = re.compile(r"^[a-fA-F0-9]{32}$|^[a-fA-F0-9]{40}$|^[a-fA-F0-9]{64}$")
_CWE_RE = re.compile(r"^(?:CWE[- ]?)?\d{1,6}$", re.IGNORECASE)
_ATLAS_TECHNIQUE_RE = re.compile(r"^AML\.T\d{4}(?:\.\d{3})?$", re.IGNORECASE)
_ATLAS_CASE_STUDY_RE = re.compile(r"^AML\.CS\d{4}$", re.IGNORECASE)
_ATLAS_TACTIC_RE = re.compile(r"^AML\.TA\d{4}$", re.IGNORECASE)
_D3FEND_DEFENSE_RE = re.compile(r"^[A-Za-z][A-Za-z0-9]{0,63}$")
_ATTACK_TECHNIQUE_RE = re.compile(r"^T\d{4}(?:\.\d{3})?$", re.IGNORECASE)
_D3FEND_TACTICS = {"Model", "Harden", "Detect", "Isolate", "Deceive", "Evict", "Restore"}
_SIGMA_UUID_RE = re.compile(r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$")
# v1.23.0 — target-type auto-detection for contrast_triage prompt.
# Order matters: most-specific patterns first so an ATLAS sub-technique like
# "AML.T0000.000" cannot be misclassified as an ATT&CK T-code (which it isn't).
TargetType = Literal[
"cve",
"atlas_technique",
"attack_technique",
"cwe",
"hash",
"ip",
"domain",
"unknown",
]
def _detect_target_type(target: str) -> TargetType:
"""Classify a triage target string by format.
Used by `contrast_triage` Prompt to pick the right tool chain.
Domains and IPs share dotted notation; resolution order:
1. CVE-YYYY-NNNN
2. ATLAS technique (AML.T#### or AML.T####.###)
3. ATT&CK T-code (T#### / T####.###)
4. CWE-#### (or bare 'CWE-79')
5. Hash (32/40/64 hex)
6. IP (ipaddress.ip_address — covers IPv4 + IPv6)
7. Domain (FQDN regex)
Returns 'unknown' when nothing matches.
"""
s = (target or "").strip()
if not s:
return "unknown"
if _CVE_RE.match(s):
return "cve"
if _ATLAS_TECHNIQUE_RE.match(s):
return "atlas_technique"
if _ATTACK_TECHNIQUE_RE.match(s):
return "attack_technique"
if "CWE" in s.upper() and _CWE_RE.match(s):
# _CWE_RE alone would also match a bare digit string ("79"), which is
# ambiguous — could be an ASN, port, or IP octet. Triage classifies as
# CWE only when the 'CWE' prefix is explicit.
return "cwe"
if _HASH_RE.match(s):
return "hash"
try:
ipaddress.ip_address(s)
return "ip"
except ValueError:
pass
if _DOMAIN_RE.match(s):
return "domain"
return "unknown"
# === Domain Intelligence ===
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def domain_report(
domain: Annotated[
str, Field(description="Root domain to analyze, without protocol or path (e.g. 'example.com', 'shopify.com')")
],
include_all_txt: Annotated[
bool,
Field(
description="Return every TXT record (default: False, only SPF/DMARC/DKIM/MTA-STS/TLS-RPT kept). dns.total_txt_records is always emitted with the honest pre-filter count. Default filter strips vendor verification strings (google-site-verification, ms=, facebook-domain-verification, etc.) that bloat the response without security signal. Set True only when you need the raw TXT inventory."
),
] = False,
) -> DomainReportResponse | ErrorResponse:
"""Query DNS, WHOIS, SSL, subdomains, and threat intel for a domain in one call. By default dns.txt is filtered to security-relevant entries (SPF, DMARC, DKIM, MTA-STS, TLS-RPT) and dns.total_txt_records reports the honest pre-filter count; pass include_all_txt=true for the raw TXT list. Use as a starting point for domain investigations; use audit_domain for live headers + tech stack. Response carries next_calls — chain with subdomain_enum (always emitted), ssl_check + tech_fingerprint (when an A record resolves) for the standard recon depth without re-prompting. Free: 30/hr, Pro: 500/hr. Returns domain report with DNS records, WHOIS data, SSL cert, risk score, email config, threat status, recommendation, and next_calls."""
params = {"include_all_txt": "true"} if include_all_txt else None
return DomainReportResponse(**await _aget(f"/v1/domain/{_require_domain(domain)}", params=params))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def audit_domain(
domain: Annotated[
str,
Field(description="Root domain to audit, without protocol or path (e.g. 'example.com', 'shopify.com')"),
],
include_all_txt: Annotated[
bool,
Field(
description="Return every TXT record under report.dns.txt (default: False, only SPF/DMARC/DKIM/MTA-STS/TLS-RPT kept). report.dns.total_txt_records is always emitted with the honest pre-filter count. Default filter strips vendor verification strings (google-site-verification, ms=, facebook-domain-verification, etc.) that bloat the response without security signal. Set True only when you need the raw TXT inventory."
),
] = False,
) -> AuditResponse | ErrorResponse:
"""Perform comprehensive domain audit: combines domain_report + live HTTP security headers + technology fingerprinting. By default report.dns.txt is filtered to security-relevant entries (SPF, DMARC, DKIM, MTA-STS, TLS-RPT) and report.dns.total_txt_records reports the honest pre-filter count; pass include_all_txt=true for the raw TXT list. Use when you need the full picture (recon + active checks); use domain_report for passive-only assessment. Response carries next_calls — chain with subdomain_enum (always emitted) and ssl_check (when an A record resolves) for the residual recon depth (tech_fingerprint already inline as `technologies`). Free: 30/hr (costs 6 credits), Pro: 500/hr. Returns {domain, report, technologies, live_headers, summary, next_calls}."""
# v1.32.4 Pattern B: call the shared internal helper directly instead of
# HTTP-hopping to /v1/audit/{domain} via _aget(). The MCP gate already
# charged COST_AUDIT via _TOOL_COST["audit_domain"]; a REST round-trip
# would double-charge and add ~50ms loopback latency.
from app.domain.routes import _audit_domain_impl
try:
result = await _audit_domain_impl(
_require_domain(domain),
include_all_txt=include_all_txt,
tier=_get_user_tier(),
client_ip=_safe_ip(_client_ip_var.get()),
)
except HTTPException as e:
# _audit_domain_impl raises FastAPI HTTPException for validation /
# timeout / upstream failure (REST contract). @mcp_tool_safe only
# catches AppException, so convert here: 400/422 → invalid_argument,
# 504 → upstream_timeout, anything else → upstream_error.
detail = e.detail if isinstance(e.detail, str) else "Audit failed"
if e.status_code in (400, 422):
raise InvalidArgumentException(detail) from None
if e.status_code == 504:
raise UpstreamTimeoutException(detail) from None
raise UpstreamErrorException(detail) from None
return AuditResponse(**result)
@mcp_tool_safe(annotations=_RO_CLOSED_WORLD)
async def tech_stack_cve_audit(
domain: Annotated[
str,
Field(
description="Target domain to fingerprint and CVE-audit (e.g. 'example.com'). IPs and internal hostnames are rejected.",
min_length=1,
max_length=253,
),
],
) -> TechStackCveAuditResponse | ErrorResponse:
"""Composite tech-stack + CVE audit (MCP-only, no REST endpoint). Detects technologies on the target domain, queries CVE database for known vulnerabilities per product, enriches top-10 CVE candidates with CISA KEV federal patch deadlines, and checks public exploit / PoC availability. Identical for every tier — all data is sourced from local DB mirrors (no Shodan/AbuseIPDB), so there is no tier gating. CVE candidate batch: 50. Cost: 10 credits per call — Free 30/hr ≈ 3 audits, Pro 500/hr ≈ 50 audits. Returns {domain, technologies, cves_by_tech, kev_findings, exploit_findings, summary, next_calls}."""
from app.domain.routes import _tech_stack_cve_audit_impl
try:
result = await _tech_stack_cve_audit_impl(
domain,
tier=_get_user_tier(),
client_ip="",
)
except HTTPException as e:
detail = e.detail if isinstance(e.detail, str) else "tech_stack_cve_audit failed"
if e.status_code in (400, 422):
raise InvalidArgumentException(detail) from None
if e.status_code == 504:
raise UpstreamTimeoutException(detail) from None
logger.warning("tech_stack_cve_audit: unmapped HTTPException status %d", e.status_code)
raise UpstreamErrorException(detail) from None
return TechStackCveAuditResponse(**result)
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def threat_report(
ip: Annotated[
str,
Field(
description="Public IPv4 or IPv6 address to investigate (e.g. '8.8.8.8', '1.1.1.1'). Private/reserved IPs are rejected."
),
],
) -> ThreatReportResponse | ErrorResponse:
"""Query comprehensive threat profile for an IP: Shodan host data, AbuseIPDB reputation, ASN/geolocation, and open ports. Use for IP investigation and SOC alert triage; for domain data use domain_report. Note: nested asn block always returns at most 50 IPv4/IPv6 prefixes — call asn_lookup with include_full_prefixes=True for the full announced-prefixes list. enrichment.vulns is severity-aware list[VulnInfo] (cve_id + severity + cvss_v3) — Phase 2 v1.16.0 BREAKING; pre-1.16 it was list[str] of CVE IDs. Free: 30/hr (costs 6 credits), Pro: 500/hr. Returns {ip, enrichment, abuseipdb, shodan, asn, threat_level}."""
# v1.32.4 Pattern B: call the shared internal helper directly instead of
# HTTP-hopping to /v1/threat-report/{ip} via _aget(). The MCP gate already
# charged COST_THREAT_REPORT via _TOOL_COST["threat_report"]; a REST round
# trip would double-charge and add ~50ms loopback latency. Tier comes from
# the ContextVar published by the MCP gate after authentication so Pro-only
# AbuseIPDB/Shodan tasks fire for Pro callers and Free callers get the
# pro_only stubs (parity with the REST handler's auth.tier branching).
from app.domain.routes import _threat_report_impl
try:
result = await _threat_report_impl(
_require_public_ip(ip),
tier=_get_user_tier(),
client_ip=_safe_ip(_client_ip_var.get()),
)
except HTTPException as e:
# _threat_report_impl raises FastAPI HTTPException for validation
# (invalid IP, private IP). @mcp_tool_safe only catches AppException,
# so convert here: 400/422 → invalid_argument, 504 → upstream_timeout,
# anything else → upstream_error.
detail = e.detail if isinstance(e.detail, str) else "Threat report failed"
if e.status_code in (400, 422):
raise InvalidArgumentException(detail) from None
if e.status_code == 504:
raise UpstreamTimeoutException(detail) from None
raise UpstreamErrorException(detail) from None
return ThreatReportResponse(**result)
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def dns_lookup(
domain: Annotated[
str, Field(description="Root domain to query, without protocol or path (e.g. 'example.com', 'cloudflare.com')")
],
) -> DnsResponse | ErrorResponse:
"""Query all DNS record types (A, AAAA, MX, NS, TXT, CNAME, SOA) for a domain. Use for mail routing inspection, nameserver verification, or SPF/DMARC checks; for full overview use domain_report. TXT records are returned raw (no filter) — `total_txt_records` always carries the honest count (use domain_report for the security-only filtered TXT view). Free: 30/hr, Pro: 500/hr. Returns {domain, records: {a, aaaa, mx, ns, txt, total_txt_records, cname, soa}, summary}."""
return DnsResponse(**await _aget(f"/v1/dns/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def whois_lookup(
domain: Annotated[str, Field(description="Root domain to query WHOIS for (e.g. 'example.com', 'github.com')")],
) -> WhoisResponse | ErrorResponse:
"""Retrieve WHOIS registration data: registrar, creation/expiry dates, nameservers, status. Use to verify domain ownership, age, expiration; for full audit use domain_report. Free: 30/hr, Pro: 500/hr. Returns {domain, whois: {registrar, creation_date, expiry_date, updated_date, name_servers, status, raw_length, error}, summary}."""
return WhoisResponse(**await _aget(f"/v1/whois/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def ssl_check(
domain: Annotated[
str, Field(description="Domain to check SSL/TLS certificate for (e.g. 'example.com', 'api.stripe.com')")
],
) -> SslResponse | ErrorResponse:
"""Analyze SSL/TLS certificate: grade (A/B/C/D/F), protocol version, cipher suite, chain, expiry, Subject Alternative Names, and structured validation findings. Invalid certs (expired, self-signed, hostname mismatch, untrusted root) are reported as findings via valid=false + validation_errors[] rather than as endpoint failures, so an unreachable cert still returns useful intel. Grade D = cert readable but invalid; F = expired, legacy TLS, or probe failure. Use to audit certificate validity and detect expiring certs; for full domain audit use audit_domain. Free: 30/hr, Pro: 500/hr. Returns {grade, valid, validation_errors, protocol, cipher, issuer, subject, not_before, not_after, days_remaining, chain, san, warnings}."""
return SslResponse(**await _aget(f"/v1/ssl/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def subdomain_enum(
domain: Annotated[
str, Field(description="Root domain to enumerate subdomains for (e.g. 'example.com', 'tesla.com')")
],
) -> SubdomainsResponse | ErrorResponse:
"""Discover subdomains using passive methods: Certificate Transparency logs + DNS brute-force (no active probing). Use to map organization's attack surface; non-intrusive. Response carries next_calls — capped at 5 ssl_check hints (one per first-five subdomain) so triage scales to large enumerations without token bloat; pull tail entries by name when needed. Free: 30/hr, Pro: 500/hr. Returns {domain, count, subdomains, sources, found_via_wordlist, found_via_crtsh, crtsh_status, warnings, summary, next_calls}. Always check crtsh_status: 'ok' means the CT lookup completed (so a low count is real); 'timeout' / 'rate_limited' / 'unavailable' / 'error' means CT logs did not respond and the count is wordlist-only — the actual attack surface is likely larger, retry later or surface the limitation to the user."""
return SubdomainsResponse(**await _aget(f"/v1/subdomains/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def tech_fingerprint(
domain: Annotated[str, Field(description="Domain to fingerprint (e.g. 'example.com', 'shopify.com')")],
) -> TechResponse | ErrorResponse:
"""Detect website technology stack: CMS, frameworks, CDN, analytics tools, web servers, languages (via HTTP headers + HTML analysis). Use for passive reconnaissance; for full audit use audit_domain. Free: 30/hr, Pro: 500/hr. Returns {technologies: [{name, category, confidence%, version}]}."""
return TechResponse(**await _aget(f"/v1/tech/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def threat_intel(
domain: Annotated[
str, Field(description="Domain to check for threats (e.g. 'suspicious-site.com', 'example.com')")
],
) -> ThreatResponse | ErrorResponse:
"""Check domain against abuse.ch URLhaus for known malware-distribution URLs (single source — for multi-feed correlation use ioc_lookup which adds ThreatFox and, for IPs, Feodo Tracker). Use for fast domain-level threat assessment; use phishing_check for specific URLs. Free: 30/hr, Pro: 500/hr. Returns {malware_urls, threat_tags, threat_status, summary}."""
return ThreatResponse(**await _aget(f"/v1/threat/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def wayback_lookup(
domain: Annotated[str, Field(description="Domain to look up in web archives (e.g. 'example.com', 'archive.org')")],
) -> WaybackResponse | ErrorResponse:
"""Retrieve Wayback Machine snapshots for a domain: first capture, latest, total count, snapshot list. Use to investigate domain history and age; for full audit use domain_report. Free: 30/hr, Pro: 500/hr. status='ok' means the count is authoritative (even when 0 → confirmed no archives). status='unavailable' means CDX timed out/rate-limited/5xx — total_snapshots is OMITTED (unknown, NOT zero) and the agent should NOT report "no snapshots"; the warnings[] array carries the cdx_* error code (cdx_timeout/cdx_rate_limited/cdx_unavailable/cdx_error/cdx_parse_error/cdx_body_too_large). Heavy domains (kernel.org, microsoft.com, archive.org itself) frequently time out the CDX endpoint despite having millions of snapshots — fall back to archive_url for manual inspection. Returns {domain, status, total_snapshots, first_seen, last_seen, years_online, snapshots, archive_url, summary, warnings}."""
return WaybackResponse(**await _aget(f"/v1/archive/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def scan_headers(
domain: Annotated[
str, Field(description="Domain to scan live HTTP headers for (e.g. 'example.com', 'api.github.com')")
],
include: Annotated[
str,
Field(
description=(
"Detail level. Default ('') returns slim findings — raw header values capped at 500 chars "
"with total_value_length carrying the honest pre-truncation length. Pass 'full' to restore "
"the full raw value (useful for inspecting full CSP directives on sites like GitHub where "
"the CSP header exceeds 4 KB). Allowed: '' or 'full'."
),
json_schema_extra={"enum": ["", "full"]},
),
] = "",
) -> ScanHeadersResponse | ErrorResponse:
"""Perform live HTTP GET and analyze security headers: CSP, HSTS, X-Frame-Options, X-Content-Type-Options, Permissions-Policy, Referrer-Policy. Use to audit live website headers; use check_headers to validate headers you already have. Free: 30/hr, Pro: 500/hr. By default header values are truncated to 500 chars (CSP can exceed 4 KB on large sites); pass include='full' for the full raw value. Returns {headers_present, headers_missing, findings, total_score}."""
if include not in ("", "full"):
raise InvalidArgumentException("Invalid include. Allowed values: '' (slim default) or 'full'.")
params = {"include": "full"} if include == "full" else None
return ScanHeadersResponse(**await _aget(f"/v1/scan/headers/{_require_domain(domain)}", params=params))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def email_mx(
domain: Annotated[
str, Field(description="Domain to analyze email configuration for (e.g. 'example.com', 'google.com')")
],
) -> EmailMxResponse | ErrorResponse:
"""Analyze email security: MX records, SPF policy, DMARC policy, DKIM probe across common+date-based selectors, mail provider, grade. Use to verify email-auth setup and phishing risk; for full audit use domain_report. Free: 30/hr, Pro: 500/hr. email_security.dkim_status reports honest evidence: 'verified' iff at least one selector responded, else 'unverifiable' (custom selectors cannot be discovered without prior knowledge). Grade: when DKIM verified, A=SPF+DMARC+DKIM/B=2of3/C=1of3; when DKIM unverifiable, A=SPF+DMARC/B=one/F=neither — DKIM absence is NOT penalized because it is unprovable in DNS. Returns {mx_records, mail_provider, email_security:{spf, dmarc, dkim_selectors, dkim_status, grade, issues}, summary}."""
return EmailMxResponse(**await _aget(f"/v1/email/mx/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def email_security_posture(
domain: Annotated[str, Field(description="Domain to audit email authentication posture for (e.g. 'example.com')")],
selectors: Annotated[
str | None, Field(description="Optional comma-separated custom DKIM selectors to probe")
] = None,
) -> EmailSecurityPostureResponse | ErrorResponse:
"""Analyze domain email authentication posture: SPF, DMARC, DKIM with numeric score and findings. Dual-use: red-team (spoofing feasibility) + blue-team (posture audit). Score 0-100, grades A+-F. DKIM probing tests common selectors + recent dates; custom selectors must be supplied. Passive DNS-only; no SMTP probe. Free: 30/hr, Pro: 500/hr."""
return EmailSecurityPostureResponse(
**await _aget(
f"/v1/email/security-posture/{_require_domain(domain)}",
params={"selectors": selectors} if selectors else None,
)
)
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def email_disposable(
email: Annotated[
str, Field(description="Full email address to check (e.g. 'user@tempmail.com', 'test@guerrillamail.com')")
],
) -> DisposableResponse | ErrorResponse:
"""Check if email address uses a known disposable/temporary provider (Guerrilla Mail, Temp Mail, Mailinator, etc.). Use for input validation to detect throwaway signups; for domain reputation use threat_intel. Companion email-investigation tools: email_mx (deliverability + MX trust), domain_report on the email's domain (full recon), threat_intel (malware-distribution signal on the domain). Free: 30/hr, Pro: 500/hr. Returns {disposable, domain, provider}."""
return DisposableResponse(**await _aget(f"/v1/email/disposable/{quote(email, safe='')}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def email_verify(
email: Annotated[
str,
Field(
description="Full email address to verify (e.g. 'admin@example.com', 'user@gmail.com'). Must contain '@'."
),
],
) -> EmailVerifyResponse | ErrorResponse:
"""One-call email validation combining syntax + MX records + disposable check + role-address detection (admin@/info@/...) + free-provider classification (gmail/outlook/yahoo/...). Use BEFORE adding an email to a contact list, sending an outbound message, or auditing a lead-list dump — replaces 2-3 tool calls (email_mx + email_disposable + manual role parse) with one structured response. Deliberately does NOT do SMTP `RCPT TO` deliverability probing — Hunter.io / NeverBounce-style mailbox enumeration is an ethical grey area we declined; use those services if you need that specific signal. role_address=true on `admin@`, `info@`, `noreply@`, `support@`, etc. (Gmail-style `+tag` is stripped before classification). free_provider=true on consumer-mailbox domains (B2B detection signal — a 'work' email at `@gmail.com` likely isn't a corporate user). Free: 30/hr, Pro: 500/hr. Returns {email, domain, syntax_valid, mx_records, disposable, disposable_provider, role_address, role_type, free_provider, summary}."""
return EmailVerifyResponse(**await _aget(f"/v1/email/verify/{quote(email, safe='')}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def robots_txt(
domain: Annotated[
str,
Field(
description="Registrable domain to fetch robots.txt for (e.g. 'example.com', 'github.com'). No scheme, no path, no port. Subdomains accepted; the bot fetches https://<domain>/robots.txt with HTTP fallback."
),
],
) -> RobotsTxtResponse | ErrorResponse:
"""Fetch + parse the target domain's robots.txt — sitemaps, per-User-agent allow/disallow rules, crawl-delay, Host directive. Use BEFORE crawling/scraping a target site (seo_audit, brand_assets, redirect_chain) to honour the site's published rules. status_code=404 means no robots.txt exists = implicit allow-all per RFC 9309 §2.4. ContrastAPI fetches with `User-agent: ContrastAPI/<version> (+https://contrastcyber.com/bot)` so site operators can identify + opt out via robots.txt; we honour `Disallow: /` for our UA in seo_audit and brand_assets. Per-target eTLD+1 throttle (60 req/min) prevents weaponising this endpoint against a single site; subdomain rotation collapses to the same bucket. Free: 30/hr, Pro: 500/hr. Returns {domain, fetched_url, status_code, sitemaps, user_agents:{ua:{allow,disallow,crawl_delay}}, host, truncated, summary}. Returns 502 ErrorResponse if the target rejected the connection (DNS/TCP/TLS failure); the agent should NOT assume "no robots" in that case — it's an upstream-failure signal."""
return RobotsTxtResponse(**await _aget(f"/v1/robots/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def redirect_chain(
url: Annotated[
str,
Field(
description="Full URL whose redirect chain to walk, e.g. 'https://bit.ly/3xyz' or 'http://example.com/old-path'. Must start with http:// or https://. Pass the URL exactly as you'd `curl -L` it; the server handles encoding."
),
],
) -> RedirectChainResponse | ErrorResponse:
"""Walk an HTTP redirect chain hop-by-hop, returning per-hop {url, status_code, location, latency_ms}. Use to deobfuscate URL shorteners (bit.ly / t.co / lnkd.in), audit suspicious links from phishing investigations, or trace marketing tracking redirects. SSRF-guarded: each redirect target's resolved IP is re-validated before connecting (private IPs and non-HTTP schemes rejected). Up to 10 hops; loop_detected=true if a hop would revisit a previously-seen URL (we abort before the duplicate fetch); truncated=true if the chain still had a 30x at hop 10. Per-target eTLD+1 throttle (60 req/min) consumed once for the start host AND once per new host reached — a chain across 11 unrelated domains cannot bypass the cap. Free: 30/hr, Pro: 500/hr. Returns {start_url, final_url, hops, hop_count, final_status, loop_detected, truncated, summary}. Returns 502 ErrorResponse on hard fetch failure (timeout / TLS / connect); 429 with Retry-After if a hop's eTLD+1 throttle is exceeded mid-chain."""
from urllib.parse import quote
# Percent-encode `?` and `#` so the API's query parser can't swallow them
# — keeping them in `safe` would strip a URL like
# `https://bit.ly/x?utm_source=a` down to `https://bit.ly/x` before the
# handler ever sees the full URL. Other URL-syntax characters stay raw
# so the path-param decode round-trips.
_url_safe = ":/@!$&'()*+,;=[]"
return RedirectChainResponse(**await _aget(f"/v1/redirect/{quote(url, safe=_url_safe)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def brand_assets(
domain: Annotated[
str,
Field(
description="Registrable domain to scrape brand assets for (e.g. 'github.com', 'stripe.com'). No scheme, no path, no port. The bot fetches https://<domain>/ with HTTP fallback."
),
],
) -> BrandAssetsResponse | ErrorResponse:
"""Scrape a domain's homepage `<head>` for public brand assets — favicon, og:image, theme-color, og:site_name, JSON-LD `Organization.logo`. Use to enrich CRM records, build company-card UIs, or correlate a lead's site to their visual identity (no manual screenshot required). Strictly homepage-only (path `/`); we do NOT crawl. Ethical floor: target's robots.txt is honoured — `Disallow: /` for ContrastAPI OR `*` returns 403 `error.code = robots_txt_disallow` and we DO NOT fetch. `Cache-Control: no-store` / `private` from the target is respected (response is built but NOT written to our cache; `cache_respected=false` flags this). Per-target eTLD+1 throttle (60 req/min) prevents weaponising via subdomain rotation. All URL fields are absolute and `_untrusted` (DO NOT execute or shell-out — the target controls these strings). Free: 30/hr, Pro: 500/hr. Returns {domain, fetched_url, status_code, favicon_url_untrusted, og_image_url_untrusted, theme_color, site_name_untrusted, logo_url_untrusted, cache_respected, summary}. Returns 502 on DNS/TCP/TLS failure; 403 `robots_txt_disallow` when the target opted out."""
return BrandAssetsResponse(**await _aget(f"/v1/brand/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def seo_audit(
domain: Annotated[
str,
Field(
description="Registrable domain to audit SEO for (e.g. 'example.com', 'shopify.com'). No scheme, no path, no port. Strictly homepage-only — the bot fetches https://<domain>/ with HTTP fallback and audits that single page (we do NOT crawl)."
),
],
) -> SeoAuditResponse | ErrorResponse:
"""One-shot SEO audit of a domain's homepage with a 0-100 composite score + a `missing_signals` list of concrete fixes. Use BEFORE pitching SEO work to a prospect, when triaging a lead's marketing maturity, or as a structured pre-flight before deeper auditing tools (Lighthouse / SEMrush). 10 audit rules each worth 10 pts: title present, title length 30-60 chars (Google SERP truncation window), meta description present, meta description length 50-160, exactly one H1, canonical link, >=3 OG tags, JSON-LD present, image alt-text coverage (proportional), HTTPS. Strictly homepage-only — we do NOT crawl the site. Ethical floor: target's robots.txt is honoured — `Disallow: /` for ContrastAPI OR `*` returns 403 `error.code = robots_txt_disallow` and we DO NOT fetch. `Cache-Control: no-store`/`private` skips our cache write (`cache_respected=false` in the response). Per-target eTLD+1 throttle (60 req/min) prevents weaponising via subdomain rotation. All target-derived strings/lists are `_untrusted`. Free: 30/hr, Pro: 500/hr. Returns {domain, fetched_url, status_code, title_untrusted, meta_description_untrusted, canonical_url, h1_untrusted, h1_count, h2_count, h3_count, images_total, images_missing_alt, internal_link_count, external_link_count, og_tags, json_ld_present, score, missing_signals, cache_respected, summary}. Returns 502 on DNS/TCP/TLS failure; 403 `robots_txt_disallow` when the target opted out."""
return SeoAuditResponse(**await _aget(f"/v1/seo/{_require_domain(domain)}"))
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def phone_lookup(
number: Annotated[
str,
Field(
description="Phone number in E.164 format: + followed by country code and number, no spaces or dashes. Examples: '+14155552671' (US), '+905551234567' (TR), '+442071234567' (UK). Wrong: '0555-123-4567', '(415) 555-2671'"
),
],
) -> PhoneLookupResponse | ErrorResponse:
"""Validate and analyze phone number: country, region, carrier, line type (mobile/landline/VoIP), timezone, formatted versions. Use to verify phone legitimacy and detect fraud risks. Requires E.164 format (+1234567890). Companion OSINT identity-investigation tools: username_lookup (social-platform handle correlation), email_disposable (throwaway-mail signal on associated email). Free: 30/hr, Pro: 500/hr. Returns {valid, country, region, carrier, carrier_status, line_type, timezone, formats}. carrier is omitted from the wire when libphonenumber has no mapping for the region (US/CA/GB and other MNP-restricted regions); always read carrier_status — 'known' means carrier is present, 'unsupported_region' means we cannot identify the carrier (do not infer the number lacks one)."""
return PhoneLookupResponse(**await _aget(f"/v1/phone/{quote(number, safe='')}"))
# === IP Intelligence ===
@mcp_tool_safe(annotations=_RO_OPEN_WORLD)
async def ip_lookup(
ip: Annotated[str, Field(description="IPv4 or IPv6 address to investigate (e.g. '8.8.8.8', '2606:4700::1111')")],
) -> IpLookupResponse | ErrorResponse:
"""Query comprehensive IP intelligence: reverse DNS, ASN + holder name + country inline (RIPE Stat, Phase 1), open ports, hostnames, vulnerabilities (Shodan InternetDB enriched with severity + cvss_v3 from local cve.db — Phase 2 v1.16.0 BREAKING; vulns is now list[VulnInfo] {cve_id, severity, cvss_v3} dicts, pre-1.16 it was list[str] of CVE IDs; unknown CVEs emit severity='UNKNOWN' / cvss_v3=null — do NOT infer benign), cloud provider, Tor exit status, and reputation. cloud_provider uses two-tier detection: published cloud CIDR ranges (AWS/GCP/Cloudflare) first, then an ASN-to-provider fallback map for anycast/public-service IPs outside published ranges (e.g. 8.8.8.8 → AS15169 → 'Google'). Reputation: FireHOL level1 blocklist on Free tier; +AbuseIPDB + Shodan on Pro (Phase 4). Use for IP investigation; for orchestrated IP+reputation use threat_report. Response is null-explicit: every field is always present (cloud_provider=null when neither tier matches; tor_exit=false when not listed or upstream fetch failed — check verdict.sources_unavailable to disambiguate fetch failure from genuine absence). Response carries next_calls (conditional) — asn_lookup when ASN is populated, ioc_lookup when reputation is FireHOL-listed or AbuseIPDB confidence>50, threat_report on Pro tier for orchestrated profile. Free: 30/hr, Pro: 500/hr. Returns {ip, ptr, geo, asn, asn_name, country, ports, hostnames, vulns, cloud_provider, tor_exit, reputation, risk_score, verdict, next_calls}."""
return IpLookupResponse(**await _aget(f"/v1/ip/{_require_ip(ip)}"))
@mcp_tool_safe(annotations=_RO_CLOSED_WORLD)
async def asn_lookup(
target: Annotated[
str, Field(description="Domain or IP address to look up ASN for (e.g. 'cloudflare.com', '8.8.8.8')")
],
include_full_prefixes: Annotated[
bool,
Field(
description="Return the full announced-prefixes list (default: False, returns first 50). ipv4_count and ipv6_count are always honest pre-truncation totals. Set True for network mapping or BGP route audits — Cloudflare AS13335 announces 2500+ prefixes."
),