diff --git a/README.md b/README.md index d401c378..964c09db 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,14 @@ For further Information see the [license file](https://misp.github.io/misp-modul * [Real-time Blackhost Lists Lookup](https://misp.github.io/misp-modules/expansion/#real-time-blackhost-lists-lookup) - Module to check an IPv4 address against known RBLs. * [Recorded Future Enrich](https://misp.github.io/misp-modules/expansion/#recorded-future-enrich) - Module to enrich attributes with threat intelligence from Recorded Future. * [Reverse DNS](https://misp.github.io/misp-modules/expansion/#reverse-dns) - Simple Reverse DNS expansion service to resolve reverse DNS from MISP attributes. +* [RST Cloud Cobalt Strike Beacon](https://misp.github.io/misp-modules/expansion/#rst-cloud-cobalt-strike-beacon) - Scan a target for Cobalt Strike beacon configurations via RST Scan API. +* [RST Cloud Favicon](https://misp.github.io/misp-modules/expansion/#rst-cloud-favicon) - Fetch favicon image and hashes for Shodan/Netlas/Censys/FOFA pivoting via RST Scan API. +* [RST Cloud HTML Fetcher](https://misp.github.io/misp-modules/expansion/#rst-cloud-html-fetcher) - Fetch rendered HTML body or extracted JavaScript via RST Scan API. +* [RST Cloud IoC Lookup](https://misp.github.io/misp-modules/expansion/#rst-cloud-ioc-lookup) - Enrich indicators with RST Cloud threat intelligence. +* [RST Cloud Noise Control](https://misp.github.io/misp-modules/expansion/#rst-cloud-noise-control) - Check whether an indicator is known-good or noisy via RST Noise Control. +* [RST Cloud Screenshot](https://misp.github.io/misp-modules/expansion/#rst-cloud-screenshot) - Capture a page screenshot via RST Scan API. +* [RST Cloud SSL Certificate](https://misp.github.io/misp-modules/expansion/#rst-cloud-ssl-certificate) - Fetch TLS certificate as an x509 MISP object via RST Scan API. +* [RST Cloud Whois](https://misp.github.io/misp-modules/expansion/#rst-cloud-whois) - Retrieve parsed WHOIS for a domain via RST Cloud. * [ReversingLabs Spectra Analyze](https://misp.github.io/misp-modules/expansion/#reversinglabs-spectra-analyze) - Threat intelligence enrichment module * [SecurityTrails Lookup](https://misp.github.io/misp-modules/expansion/#securitytrails-lookup) - An expansion modules for SecurityTrails. * [Shodan Lookup](https://misp.github.io/misp-modules/expansion/#shodan-lookup) - Module to query on Shodan. diff --git a/documentation/logos/rstcloud.png b/documentation/logos/rstcloud.png new file mode 100644 index 00000000..af2f9c3b Binary files /dev/null and b/documentation/logos/rstcloud.png differ diff --git a/documentation/mkdocs/expansion.md b/documentation/mkdocs/expansion.md index e7f966f3..7f1d78a8 100644 --- a/documentation/mkdocs/expansion.md +++ b/documentation/mkdocs/expansion.md @@ -2118,6 +2118,260 @@ Module to check an IPv4 address against known RBLs. ----- +#### [RST Cloud Cobalt Strike Beacon](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_cs_beacon.py) + + + +Scan a target IP[:port] for a Cobalt Strike beacon configuration via RST Scan API. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_cs_beacon.py)] + +- **features**: +>Probes the target for Cobalt Strike beacon configurations via RST Scan GET /scan/cs-beacon. On a hit, returns file MISP object(s) with pivotable SHA-256 hashes tagged to the Cobalt Strike galaxy. + +- **config**: +> - api_key +> - base_url +> - port +> - timeout + +- **input**: +>IP, URL, domain, or hostname attribute (optional port via config). + +- **output**: +>file MISP object(s) with beacon hashes and Cobalt Strike galaxy tag. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + +#### [RST Cloud Favicon](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_favicon.py) + + + +Fetch a target's favicon (image + all hashes for Shodan/Netlas/Censys pivoting) via RST Scan API. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_favicon.py)] + +- **features**: +>Retrieves the favicon image and cryptographic hashes via RST Scan GET /scan/favicon. Returns a file MISP object with MD5/SHA-1/SHA-256 and a standalone Murmur3 favicon-hash attribute for Shodan/FOFA-style pivoting. + +- **config**: +> - api_key +> - base_url +> - timeout + +- **input**: +>URL, domain, hostname, or IP attribute. + +- **output**: +>file MISP object, favicon-hash attribute, and resolved favicon URL. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + +#### [RST Cloud HTML Fetcher](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_html.py) + + + +Fetch rendered HTML body or extracted JavaScript for a URL/IP target via RST Scan API. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_html.py)] + +- **features**: +>Fetches the rendered HTML body or extracted JavaScript from the target via RST Scan. Returns a file MISP object with the page attached and pivotable content hashes. Configurable mode: body (default) or js. + +- **config**: +> - api_key +> - base_url +> - mode +> - port +> - timeout + +- **input**: +>URL, domain, hostname, or IP attribute (optional port via config). + +- **output**: +>file MISP object (page.html or page.js) with hashes and HTTP metadata. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + +#### [RST Cloud IoC Lookup](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ioc.py) + + + +Enrich indicators with RST Cloud threat intelligence. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ioc.py)] + +- **features**: +>Queries RST Cloud GET /ioc for threat scores, attribution, geo/ASN, DNS, WHOIS, TTPs, CVEs, and related indicators. Returns a structured rst-ioc MISP object with galaxy tags and optional pivotable related hashes/IPs. When misp_url and misp_key are configured, also writes score/threat tags onto the enriched attribute via the MISP API. + +- **config**: +> - api_key +> - base_url +> - misp_url +> - misp_key +> - misp_verifycert + +- **input**: +>IP, domain, hostname, URL, or hash attribute (incl. host|port composites). + +- **output**: +>rst-ioc MISP object, galaxy/score tags, and optional related attributes. + +- **references**: +>https://api.rstcloud.net/ +>https://github.com/MISP/misp-objects/pull/526 + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key +> - rst-ioc object template installed on MISP ([misp-objects #526](https://github.com/MISP/misp-objects/pull/526)) + +----- + +#### [RST Cloud Noise Control](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_noise_control.py) + + + +Check whether a value is known-good / noise via RST Noise Control. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_noise_control.py)] + +- **features**: +>Queries RST Cloud GET /benign/lookup for benign/noisy verdicts. Returns an rst-noise MISP object with false-positive risk tags. When misp_url and misp_key are configured, also annotates the source attribute in place (tags, comment, to_ids, false-positive sightings). + +- **config**: +> - api_key +> - base_url +> - misp_url +> - misp_key +> - misp_verifycert + +- **input**: +>IP, domain, hostname, URL, or hash attribute (incl. host|port composites). + +- **output**: +>rst-noise MISP object with verdict, category, and risk/noise tags. + +- **references**: +>https://api.rstcloud.net/ +>https://github.com/MISP/misp-taxonomies/pull/335 + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key +> - rst-noise object template on MISP ([misp-objects #526](https://github.com/MISP/misp-objects/pull/526)) +> - rstcloud taxonomy on MISP ([misp-taxonomies #335](https://github.com/MISP/misp-taxonomies/pull/335)) + +----- + +#### [RST Cloud Screenshot](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_screenshot.py) + + + +Capture a page screenshot of a URL/IP target via RST Scan API. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_screenshot.py)] + +- **features**: +>Renders the target page and returns a PNG screenshot as an image MISP object (inline in MISP). Configurable frame: first, full (default), or last. + +- **config**: +> - api_key +> - base_url +> - frame +> - port +> - timeout + +- **input**: +>URL, domain, hostname, or IP attribute (optional port via config). + +- **output**: +>image MISP object with PNG attachment linked to the enriched attribute. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + +#### [RST Cloud SSL Certificate](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ssl.py) + + + +Fetch the SSL certificate for an IP[:port] as an x509 object via RST Scan API. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ssl.py)] + +- **features**: +>Connects to the target service and retrieves the TLS certificate via RST Scan GET /scan/ssl/certificate. Returns an x509 MISP object with pivotable fingerprints (SHA-1/256/MD5), subject, issuer, and validity dates. + +- **config**: +> - api_key +> - base_url +> - port +> - timeout + +- **input**: +>IP, hostname, or domain attribute (optional port via config or composite). + +- **output**: +>x509 MISP object referencing the enriched attribute. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + +#### [RST Cloud Whois](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_whois.py) + + + +Retrieve parsed WHOIS information for a domain via RST Cloud. +[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_whois.py)] + +- **features**: +>Queries RST Cloud GET /whois for parsed domain registration data. Returns a standard whois MISP object (registrar, registrant, dates, nameservers) linked back to the enriched attribute. + +- **config**: +> - api_key +> - base_url + +- **input**: +>Domain or hostname attribute. + +- **output**: +>whois MISP object with registration and nameserver fields. + +- **references**: +>https://api.rstcloud.net/ + +- **requirements**: +> - rstapi>=1.2.0 (PyPI) +> - An RST Cloud API key + +----- + #### [Recorded Future Enrich](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/recordedfuture.py) diff --git a/documentation/mkdocs/index.md b/documentation/mkdocs/index.md index e0f5e888..27549435 100644 --- a/documentation/mkdocs/index.md +++ b/documentation/mkdocs/index.md @@ -91,6 +91,14 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj * [RandomcoinDB Lookup](https://misp.github.io/misp-modules/expansion/#randomcoindb-lookup) - Module to access the ransomcoinDB (see https://ransomcoindb.concinnity-risks.com) * [r7_akb](https://misp.github.io/misp-modules/expansion/#r7_akb) - Enrich CVEs via AttackerKB and return structured MISP events. Handles rate limits, regex CVE detection, and markdown cleanup. * [Real-time Blackhost Lists Lookup](https://misp.github.io/misp-modules/expansion/#real-time-blackhost-lists-lookup) - Module to check an IPv4 address against known RBLs. +* [RST Cloud Cobalt Strike Beacon](https://misp.github.io/misp-modules/expansion/#rst-cloud-cobalt-strike-beacon) - Scan a target for Cobalt Strike beacon configurations via RST Scan API. +* [RST Cloud Favicon](https://misp.github.io/misp-modules/expansion/#rst-cloud-favicon) - Fetch favicon image and hashes for Shodan/Netlas/Censys/FOFA pivoting via RST Scan API. +* [RST Cloud HTML Fetcher](https://misp.github.io/misp-modules/expansion/#rst-cloud-html-fetcher) - Fetch rendered HTML body or extracted JavaScript via RST Scan API. +* [RST Cloud IoC Lookup](https://misp.github.io/misp-modules/expansion/#rst-cloud-ioc-lookup) - Enrich indicators with RST Cloud threat intelligence. +* [RST Cloud Noise Control](https://misp.github.io/misp-modules/expansion/#rst-cloud-noise-control) - Check whether an indicator is known-good or noisy via RST Noise Control. +* [RST Cloud Screenshot](https://misp.github.io/misp-modules/expansion/#rst-cloud-screenshot) - Capture a page screenshot via RST Scan API. +* [RST Cloud SSL Certificate](https://misp.github.io/misp-modules/expansion/#rst-cloud-ssl-certificate) - Fetch TLS certificate as an x509 MISP object via RST Scan API. +* [RST Cloud Whois](https://misp.github.io/misp-modules/expansion/#rst-cloud-whois) - Retrieve parsed WHOIS for a domain via RST Cloud. * [Recorded Future Enrich](https://misp.github.io/misp-modules/expansion/#recorded-future-enrich) - Module to enrich attributes with threat intelligence from Recorded Future. * [ReversingLabs Enrichment](https://misp.github.io/misp-modules/expansion/#reversinglabs-enrichment) - Module to enrich file hashes, domains, IPs and URLs with ReversingLabs Spectra Analyze threat intelligence. * [Reverse DNS](https://misp.github.io/misp-modules/expansion/#reverse-dns) - Simple Reverse DNS expansion service to resolve reverse DNS from MISP attributes. diff --git a/misp_modules/modules/expansion/_rstcloud/__init__.py b/misp_modules/modules/expansion/_rstcloud/__init__.py new file mode 100644 index 00000000..7b6d8a06 --- /dev/null +++ b/misp_modules/modules/expansion/_rstcloud/__init__.py @@ -0,0 +1,19 @@ +"""Shared RST Cloud helpers for expansion modules (not registered).""" + +from .client import ( # noqa: F401 + apply_to_source_attribute, + error, + host_only, + misp_event_with_source, + new_enrichment_object, + rst_kwargs, + rst_resolver_from_config, + scan_group, + scan_kwargs, + scan_target, + standard_results, + text_result, + threat_tags, + unwrap, + value_from_request, +) diff --git a/misp_modules/modules/expansion/_rstcloud/client.py b/misp_modules/modules/expansion/_rstcloud/client.py new file mode 100644 index 00000000..0884a801 --- /dev/null +++ b/misp_modules/modules/expansion/_rstcloud/client.py @@ -0,0 +1,528 @@ +"""Shared helpers for the RST Cloud expansion modules. + +The modules ride the official ``rstapi`` library (PyPI) for transport, so a +misp-modules deployment just needs ``pip install rstapi``. These helpers cover +config parsing, error unwrapping, and the misp-modules result shape. +""" + +from __future__ import annotations + +import json +import re + +DEFAULT_BASE_URL = "https://api.rstcloud.net/v1" + + +def api_key_from_config(config: dict | None) -> str: + """misp-modules passes user config as a dict; accept common key names.""" + config = config or {} + return ( + config.get("api_key") + or config.get("apikey") + or config.get("rst_api_key") + or "" + ) + + +def base_url_from_config(config: dict | None) -> str: + return (config or {}).get("base_url") or DEFAULT_BASE_URL + + +def rst_kwargs(config: dict | None) -> dict: + """Constructor kwargs shared by every rstapi client.""" + return { + "APIKEY": api_key_from_config(config), + "APIURL": base_url_from_config(config), + } + + +def scan_kwargs(config: dict | None) -> dict: + """Constructor kwargs for rstapi.scan with optional read timeout. + + Extends ``rst_kwargs``. Scan endpoints (ssl/html/favicon/screenshot/ + cs-beacon) are synchronous: the RST Cloud server connects to the target + during your request, so they can take much longer than a database lookup. + The default rstapi READ timeout is 20 s, which is sometimes not enough. + Set ``timeout`` in the module config (seconds, default 60) to override it. + """ + kw = rst_kwargs(config) + try: + kw["READ"] = max(1, int((config or {}).get("timeout") or 60)) + except (ValueError, TypeError): + kw["READ"] = 60 + return kw + + +def value_from_request(request: dict, keys) -> str | None: + """Pull the indicator from a misp-modules request (attribute or typed). + + Handles all three shapes MISP sends: a full ``attribute`` object, a typed + top-level key (incl. composites like ``ip-dst|port``), and object-level + enrichment where the value lives in ``object["Attribute"]``. + """ + if request.get("attribute"): + return request["attribute"].get("value") + for key in keys: + if request.get(key): + return request[key] + obj = request.get("object") + if isinstance(obj, dict): + wanted = set(keys) + for a in obj.get("Attribute") or []: + if a.get("type") in wanted and a.get("value"): + return a["value"] + return None + + +def host_only(value): + """Strip a MISP composite ``|port`` suffix; return bare host/indicator. + + Used by the lookup modules (ioc / noise-control / whois) where the API keys + on the value itself and the port is irrelevant. + """ + if not value: + return value + return str(value).split("|", 1)[0].strip() + + +_PORT_SUFFIX = re.compile(r":\d{1,5}$") +_PORT_RELATIONS = ("dst-port", "src-port", "port") + + +def _join_host_port(host: str, port) -> str: + """host:port, bracketing IPv6 literals so the colons aren't ambiguous.""" + if host.count(":") >= 2 and not host.startswith("["): + return f"[{host}]:{port}" + return f"{host}:{port}" + + +def _has_explicit_port(host: str) -> bool: + # "1.2.3.4:443" / "host:443" — but not a bare IPv4 or IPv6 literal. + return bool(_PORT_SUFFIX.search(host)) and host.count(":") == 1 + + +def _sibling_port(request) -> str | None: + """Port taken from a sibling attribute when MISP passes the whole object. + + An ``ip-port`` object stores the port as its own attribute + (object_relation ``dst-port`` / ``src-port`` / ``port``); when MISP + includes ``object`` in the request, pick it up so the user doesn't have + to set one. + """ + obj = request.get("object") + if not isinstance(obj, dict): + return None + for a in obj.get("Attribute") or []: + rel = (a.get("object_relation") or "").lower() + typ = (a.get("type") or "").lower() + if (rel in _PORT_RELATIONS or typ == "port") and a.get("value"): + return str(a["value"]).strip() + return None + + +def scan_target( + request, + inputs, + config, + *, + as_url=False, + default_port=None, + default_scheme="https", +): + """Build a Scan-API target from a MISP attribute with optional port. + + IP/host attributes carry no port, but the Scan API addresses a *service*: + ``host:port`` for ssl / cs-beacon / favicon, or a URL for html / + screenshot. Port resolution, most specific first: + + 1. an explicit port already in the value (``1.2.3.4:8443`` or a URL), + 2. a MISP ``host|port`` composite (e.g. an ``ip-dst|port`` attribute), + 3. a sibling port attribute in the same MISP object (``ip-port``), + 4. the optional ``port`` set in the module config, + 5. ``default_port`` (module-specific fallback, may be ``None``). + + For URL endpoints (``as_url=True``) a bare host becomes + ``://host[:port]`` where scheme is the config ``scheme`` or + ``default_scheme``. Returns ``None`` when no value is present. + """ + raw = value_from_request(request, inputs) + if not raw: + return None + raw = str(raw).strip() + cfg = config or {} + + if raw.startswith(("http://", "https://")): + return raw # already a full URL — it encodes its own port + + # Port, most specific source first. + port = None + if "|" in raw: # MISP composite "host|port" + host, _, p = raw.partition("|") + raw, port = host.strip(), (p.strip() or None) + port = port or _sibling_port(request) or (cfg.get("port") or None) + + has_port = _has_explicit_port(raw) # value was already "host:port" + if as_url: + scheme = (cfg.get("scheme") or default_scheme).strip().lower() + host = raw if has_port or not port else _join_host_port(raw, port) + return f"{scheme}://{host}" + + # host:port endpoints (ssl / cs-beacon / favicon) + if has_port: + return raw + p = port or default_port + return _join_host_port(raw, p) if p else raw + + +def unwrap(resp): + """Return (data, None) or (None, error_message) for an rstapi response.""" + if isinstance(resp, dict) and resp.get("status") == "error": + return None, str(resp.get("message", "RST Cloud API error")) + return resp, None + + +def error(message: str) -> dict: + return {"error": message} + + +# Threat-suffix -> (built-in MISP galaxy predicate, RST galaxy stix_type). +# Kept in sync with rstmisp.misp.tagging; duplicated here so the modules +# stay droppable into misp-modules standalone. The 2nd element selects which +# RST custom galaxy (rst-) a name belongs to when resolving the +# real cluster tag. +_THREAT_SUFFIX = { + "_group": ("misp-galaxy:threat-actor", "intrusion-set"), + "_actor": ("misp-galaxy:threat-actor", "intrusion-set"), + "_tool": ("misp-galaxy:tool", "tool"), + "_stealer": ("misp-galaxy:stealer", "malware"), + "_backdoor": ("misp-galaxy:backdoor", "malware"), + "_ransomware": ("misp-galaxy:ransomware", "malware"), + "_miner": ("misp-galaxy:cryptominers", "malware"), + "_exploit": ("misp-galaxy:exploit-kit", "malware"), + "_botnet": ("misp-galaxy:botnet", "malware"), + "_rat": ("misp-galaxy:rat", "malware"), + "_campaign": ("misp-galaxy:campaign", "campaign"), +} +# Names with no recognised suffix are malware families. +_THREAT_DEFAULT = ("misp-galaxy:malware", "malware") + +# RST custom galaxy types in MISP (namespace rstcloud); stix_type = type[4:]. +_RST_GALAXY_TYPES = ( + "rst-malware", + "rst-tool", + "rst-intrusion-set", + "rst-campaign", +) + +# Per-process caches: a misp-modules worker is long-lived, so reuse the PyMISP +# client + resolved galaxy ids across calls instead of reconnecting on every +# hover. +_RESOLVER_CACHE: dict = {} + + +def _truthy(v) -> bool: + if isinstance(v, bool): + return v + return str(v).strip().lower() in ("1", "true", "yes", "on") + + +class _RstClusterResolver: + """Resolve an RST threat ``(stix_type, name)`` to its MISP cluster's real + ``tag_name`` (``misp-galaxy:rst-*=""``), so an enrichment tag + attaches the RST Threat Library galaxy — the same node the library/ + reports/ feed connectors use. MISP stores a CUSTOM cluster's tag keyed on + the UUID, not the name, so the value-form ``rstcloud:rst-*="name"`` + would not link. + + Targeted ``search_galaxy_clusters`` per name (a handful per enrichment + call), not a full galaxy pull; per-name results are memoised on the + instance. + """ + + def __init__(self, misp, galaxy_ids: dict): + self._misp = misp + self._ids = galaxy_ids + self._cache: dict = {} + + def __call__(self, stix_type: str, name_lower: str): + gid = self._ids.get("rst-" + stix_type) + if not gid: + return None + key = (stix_type, name_lower) + if key not in self._cache: + self._cache[key] = self._lookup(gid, name_lower) + return self._cache[key] + + def _lookup(self, gid, name_lower): + try: + clusters = self._misp.search_galaxy_clusters( + gid, context="all", searchall=name_lower, pythonify=False + ) + except Exception: + return None + for c in clusters or []: + gc = c.get("GalaxyCluster", c) + tname = gc.get("tag_name") + if not tname: + continue + if (gc.get("value") or "").lower() == name_lower: + return tname + for el in gc.get("GalaxyElement") or []: + if ( + el.get("key") == "synonyms" + and (el.get("value") or "").lower() == name_lower + ): + return tname + return None + + +def rst_resolver_from_config(config: dict | None): + """Build an RST cluster resolver from optional MISP config, or None. + + Needs ``misp_url`` + ``misp_key`` in the module config; without them + (the default standalone deployment) returns None and ``threat_tags`` falls + back to built-in galaxy tags. PyMISP is imported lazily so the modules + still install with just ``rstapi`` when MISP resolution isn't configured. + """ + config = config or {} + url = config.get("misp_url") + key = config.get("misp_key") + if not url or not key: + return None + if url in _RESOLVER_CACHE: + return _RESOLVER_CACHE[url] + try: + from pymisp import PyMISP + except Exception: + return None + try: + misp = PyMISP( + url, key, ssl=_truthy(config.get("misp_verifycert", False)) + ) + ids = {} + for g in misp.galaxies(pythonify=False) or []: + gd = g.get("Galaxy", g) + if gd.get("type") in _RST_GALAXY_TYPES and gd.get("id"): + ids.setdefault(gd["type"], gd["id"]) + except Exception: + return None + resolver = _RstClusterResolver(misp, ids) + _RESOLVER_CACHE[url] = resolver + return resolver + + +def threat_tags(threats, rst_resolver=None) -> list: + """Map RST threat names to MISP galaxy tags (best-effort, suffix-driven). + + When ``rst_resolver`` is supplied (built from MISP config), each name first + resolves to its RST Threat Library cluster's real ``tag_name`` so the tag + attaches that galaxy; on a miss (or when no resolver) it falls back to the + built-in ``misp-galaxy:*`` value-form tag. ``rst_resolver`` is any callable + ``(stix_type, name_lower) -> tag_name | None``. + """ + tags = [] + for threat in threats or []: + if threat.endswith(("_technique", "_vuln")): + continue + predicate, stix_type = _THREAT_DEFAULT + name = threat + for suffix, (pred, st) in _THREAT_SUFFIX.items(): + if threat.endswith(suffix): + n = len(suffix) + predicate, stix_type, name = pred, st, threat[:-n] + break + clean = name.replace("_", " ") + tag = None + if rst_resolver: + try: + tag = rst_resolver(stix_type, clean.lower()) + except Exception: + tag = None + tags.append(tag or f'{predicate}="{clean}"') + return tags + + +def scan_group(request, source): + """uuid scan-result objects should reference so each result stays tied + to exactly what was enriched — without spawning extra container objects. + + 1. the parent object, when MISP includes it in the request (``object``); + 2. otherwise the enriched source attribute itself. + + A screenshot / certificate / fetched body cannot be an *attribute* of a + ``url`` / ``ip-port`` / ``domain-ip`` object — MISP object templates are + fixed and have no such relation — so each is returned as its own object + that references this anchor (``identifies`` / ``screenshot-of`` / …). + Returns the anchor uuid, or ``None`` (typed-key request with no attribute + to point at). + """ + obj = request.get("object") + if isinstance(obj, dict) and obj.get("uuid"): + return obj["uuid"] + return source.uuid if source is not None else None + + +def misp_event_with_source(request): + """Start a ``MISPEvent`` seeded with the triggering attribute. + + Returns ``(event, source_attribute_or_None)``. Enrichment objects/ + attributes added to the event can ``add_reference(source.uuid, ...)`` so + MISP links them to the attribute the analyst enriched. Requires pymisp, + present in a misp-modules deployment (it's a core dependency). + """ + from pymisp import MISPAttribute, MISPEvent + + event = MISPEvent() + source = None + attr = request.get("attribute") + if attr: + source = MISPAttribute() + source.from_dict(**attr) + event.add_attribute(**source) + return event, source + + +def new_enrichment_object(name): + """Build a ``MISPObject`` for an RST enrichment template. + + Returns ``(object, dedicated)``. Uses the ``rst-*`` template from the MISP + object library (install via + [MISP/misp-objects](https://github.com/MISP/misp-objects), e.g. + [PR #526](https://github.com/MISP/misp-objects/pull/526)). Falls back to + a generic ``annotation`` object if the template is not installed yet, so + output stays valid misp_standard on any MISP. + """ + from pymisp import MISPObject + + try: + obj = MISPObject(name) + if getattr(obj, "_known_template", False): + return obj, True + except Exception: + pass + return MISPObject("annotation"), False + + +def standard_results(event) -> dict: + """Serialise a ``MISPEvent`` into the misp_standard result envelope.""" + parsed = json.loads(event.to_json()) + return { + "results": { + k: parsed[k] for k in ("Attribute", "Object") if parsed.get(k) + } + } + + +def text_result(value: str, comment: str = "") -> dict: + """misp_standard fallback when nothing structured to return (one text).""" + attr = {"type": "text", "value": value} + if comment: + attr["comment"] = comment + return {"results": {"Attribute": [attr]}} + + +_PYMISP_CACHE: dict = {} + + +def _pymisp(cfg): + """Cached PyMISP client from module config (misp_url/misp_key), or None. + + Reused across calls (a misp-modules worker is long-lived). Returns None + when creds are absent or PyMISP can't connect, so callers degrade + gracefully. + """ + url, key = cfg.get("misp_url"), cfg.get("misp_key") + if not (url and key): + return None + ck = (url, key, bool(_truthy(cfg.get("misp_verifycert", False)))) + if ck in _PYMISP_CACHE: + return _PYMISP_CACHE[ck] + try: + from pymisp import PyMISP + + client = PyMISP(url, key, ssl=ck[2]) + except Exception: + return None + _PYMISP_CACHE[ck] = client + return client + + +def apply_to_source_attribute( + config, + request, + *, + tags=None, + comment_note=None, + comment_prefix=None, + replace_tag_prefixes=(), + set_to_ids=None, + fp_sightings=0, +): + """Write enrichment back ONTO the enriched attribute via the MISP API. + + MISP enrichment itself can only ADD new attributes/objects — it can't + modify the attribute you ran the module on. So, *only when* + ``misp_url``/``misp_key`` are set in the module config, this updates the + source attribute in place: + + * removes the module's own prior tags (``replace_tag_prefixes``) then + adds ``tags`` — so re-running replaces rather than stacks verdicts; + * appends ``comment_note`` to the existing comment (dropping any + previous note that started with ``comment_prefix``, so re-runs stay + tidy); + * sets ``to_ids`` when ``set_to_ids`` is not None; + * adds ``fp_sightings`` false-positive sightings (type 1) — a benign + signal that feeds MISP's decay/scoring. + + Returns True if it wrote back (caller should then return an empty result + so no duplicate attribute is created); False otherwise (caller returns + normally). + """ + cfg = config or {} + attr = request.get("attribute") or {} + uuid = attr.get("uuid") + misp = _pymisp(cfg) + if not (uuid and misp): + return False + try: + full = misp.get_attribute(uuid, pythonify=True) + except Exception: + return False + try: + changed = False + if comment_note is not None: + existing = ( + getattr(full, "comment", None) or attr.get("comment") or "" + ) + segments = [ + s + for s in existing.split(" | ") + if s and not (comment_prefix and s.startswith(comment_prefix)) + ] + segments.append(comment_note) + full.comment = " | ".join(segments) + changed = True + if set_to_ids is not None: + full.to_ids = bool(set_to_ids) + changed = True + if changed: + misp.update_attribute(full) + if replace_tag_prefixes: + for t in getattr(full, "tags", []) or []: + name = getattr(t, "name", "") or "" + if any(name.startswith(p) for p in replace_tag_prefixes): + misp.untag(uuid, name) + for tag in tags or []: + misp.tag(uuid, tag) + if fp_sightings: + from pymisp import MISPSighting + + for _ in range(int(fp_sightings)): + sighting = MISPSighting() + sighting.from_dict( + type="1", source="RST Noise Control" + ) # 1 = false-positive + misp.add_sighting(sighting, attribute=uuid) + return True + except Exception: + return False diff --git a/misp_modules/modules/expansion/rst_cs_beacon.py b/misp_modules/modules/expansion/rst_cs_beacon.py new file mode 100644 index 00000000..c2e7ccfa --- /dev/null +++ b/misp_modules/modules/expansion/rst_cs_beacon.py @@ -0,0 +1,165 @@ +"""rst_cs_beacon — scan for Cobalt Strike beacon (GET /scan/cs-beacon).""" + +from __future__ import annotations + +import json + +import rstapi + +from ._rstcloud.client import ( + error, + misp_event_with_source, + rst_kwargs, + scan_group, + scan_kwargs, + scan_target, + standard_results, + text_result, + unwrap, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "ip-dst", + "ip-src", + "url", + "domain", + "hostname", + "ip-dst|port", + "ip-src|port", + "hostname|port", + "domain|port", +] +# misp_standard: on a hit, return the beacon blob sha256(s) as pivotable +# attributes tagged to the Cobalt Strike galaxy. +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.2", + "author": "RST Cloud", + "description": ( + "Scan a target IP[:port] for a Cobalt Strike beacon configuration" + " via RST Scan API." + ), + "module-type": ["expansion"], + "name": "RST Cloud Cobalt Strike Beacon", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Probes the target for Cobalt Strike beacon configurations via RST" + " Scan GET /scan/cs-beacon. On a hit, returns file MISP object(s)" + " with pivotable SHA-256 hashes tagged to the Cobalt Strike" + " galaxy." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "IP, URL, domain, or hostname attribute (optional port via config)." + ), + "output": ( + "file MISP object(s) with beacon hashes and Cobalt Strike galaxy tag." + ), +} +# 'port' (optional): port to probe when the attribute carries none +# (default 443). +moduleconfig = ["api_key", "base_url", "port", "timeout"] + +_CS_TAG = 'misp-galaxy:tool="Cobalt Strike"' + + +def _arch(node): + return node if isinstance(node, dict) else {} + + +def _to_int(v): + try: + return int(v) + except (TypeError, ValueError): + return 0 + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + target = scan_target(request, _INPUTS, config, default_port=443) + if not target: + return error("No target found in the request.") + + data, err = unwrap(rstapi.scan(**scan_kwargs(config)).GetCsBeacon(target)) + if err: + return error(f"RST CS beacon scan failed: {err}") + if not isinstance(data, dict) or not data: + return text_result( + f"{target}: no Cobalt Strike beacon found", "RST CS Beacon" + ) + + # The scanner ALWAYS returns x86/x64 probe blocks; an actual beacon is only + # present when a block carries a parsed `config` (or a non-zero `size`). An + # empty config / size 0 means "probed, nothing found" — NOT a detection. + blocks = {"x86": _arch(data.get("x86")), "x64": _arch(data.get("x64"))} + hits = { + arch: b + for arch, b in blocks.items() + if b.get("config") or _to_int(b.get("size")) > 0 + } + if not hits: + return text_result( + f"{target}: no Cobalt Strike beacon detected", "RST CS Beacon" + ) + + from pymisp import MISPObject + + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + seen = set() + for arch, block in hits.items(): + sha = block.get("sha256") + if not sha or sha in seen: + continue + seen.add(sha) + # The beacon payload is a file; group its hash + config as a file + # object so the detection is tied to the scanned host, not a loose + # sha256. + fobj = MISPObject("file") + sha_attr = fobj.add_attribute("sha256", value=sha) + sha_attr.add_tag(_CS_TAG) # tags attach to attributes, not objects + if block.get("md5"): + fobj.add_attribute("md5", value=block["md5"]) + if block.get("size"): + fobj.add_attribute("size-in-bytes", value=block["size"]) + cfg = block.get("config") or {} + fobj.add_attribute( + "text", + value=( + f"Cobalt Strike beacon ({arch}) on {target}; " + f"config: {json.dumps(cfg)[:400]}" + ), + ) + fobj.comment = "RST CS Beacon" + if anchor: + fobj.add_reference(anchor, "characterizes") + event.add_object(fobj) + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_favicon.py b/misp_modules/modules/expansion/rst_favicon.py new file mode 100644 index 00000000..cfcd8b80 --- /dev/null +++ b/misp_modules/modules/expansion/rst_favicon.py @@ -0,0 +1,169 @@ +"""rst_favicon — favicon hashes as a file object (GET /scan/favicon).""" + +from __future__ import annotations + +import base64 +import json +from io import BytesIO + +import rstapi + +from ._rstcloud.client import ( + error, + host_only, + misp_event_with_source, + rst_kwargs, + scan_group, + scan_kwargs, + standard_results, + text_result, + unwrap, + value_from_request, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "url", + "domain", + "hostname", + "ip-src", + "ip-dst", + "ip-src|port", + "ip-dst|port", + "hostname|port", + "domain|port", +] +# misp_standard: file object (md5/sha1/sha256 pivotable in Netlas/Censys) +# plus a standalone favicon_hash attribute (Murmur3/MMH3, Shodan/FOFA). +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.3", + "author": "RST Cloud", + "description": ( + "Fetch a target's favicon (image + all hashes for" + " Shodan/Netlas/Censys pivoting) via RST Scan API." + ), + "module-type": ["expansion"], + "name": "RST Cloud Favicon", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Retrieves the favicon image and cryptographic hashes via RST Scan" + " GET /scan/favicon. Returns a file MISP object with" + " MD5/SHA-1/SHA-256 for Censys/Netlas pivoting and a standalone" + " Murmur3 favicon-hash attribute for Shodan/FOFA-style pivoting." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": "URL, domain, hostname, or IP attribute.", + "output": ( + "file MISP object, favicon-hash attribute, and resolved favicon URL." + ), +} +moduleconfig = ["api_key", "base_url", "timeout"] + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + # Favicon endpoint expects a bare host or a full URL — never host:port. + # The API fetches the page over HTTP/HTTPS itself; adding ":443" breaks it. + raw = value_from_request(request, _INPUTS) + if not raw: + return error("No target found in the request.") + target = raw if raw.startswith(("http://", "https://")) else host_only(raw) + + data, err = unwrap( + rstapi.scan(**scan_kwargs(config)).GetFavicon( + target, include_base64=True + ) + ) + if err: + return error(f"RST favicon scan failed: {err}") + if not isinstance(data, dict) or not data.get("favicon_hash"): + return text_result(f"{target}: no favicon returned", "RST Favicon") + + from pymisp import MISPObject + + fhash = str(data["favicon_hash"]) + req_loc = data.get("req_location") or "" + content_type = data.get("req_content_type") or "image/x-icon" + + # Real filename from the resolved favicon URL (e.g. "drive_2026_32dp.ico") + raw_fname = ( + req_loc.rstrip("/").split("/")[-1].split("?")[0] if req_loc else "" + ) + fname = raw_fname if (raw_fname and "." in raw_fname) else "favicon.ico" + + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + # file object: standard hashes (md5/sha1/sha256) are auto-correlated and + # indexed for pivoting in Netlas, Censys, and other threat-intel platforms. + fobj = MISPObject("file") + fobj.add_attribute("filename", value=fname) + fobj.add_attribute("mimetype", value=content_type) + for htype in ("md5", "sha1", "sha256"): + if data.get(htype): + fobj.add_attribute(htype, value=data[htype]) + + # Attach the raw image when the API returned base64 + try: + raw = ( + base64.b64decode(data["base64_image"]) + if data.get("base64_image") + else None + ) + except Exception: + raw = None + if raw: + fobj.add_attribute("attachment", value=fname, data=BytesIO(raw)) + + fobj.comment = "RST Favicon" + if anchor: + fobj.add_reference(anchor, "identifies") + event.add_object(fobj) + + # Resolved favicon URL — where the image actually lives after redirects + if req_loc: + event.add_attribute( + "link", + req_loc, + comment="RST Favicon resolved URL", + to_ids=False, + ) + + # favicon_hash (Murmur3/MMH3): standalone attribute for independent + # correlation across events and Shodan/FOFA-style hunting workflows. + fav_attr = event.add_attribute( + "other", + fhash, + comment=f"Murmur3 favicon hash for {target} (Shodan/FOFA pivot)", + to_ids=False, + ) + fav_attr.add_tag(f'rstcloud:favicon:hash="{fhash}"') + + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_html.py b/misp_modules/modules/expansion/rst_html.py new file mode 100644 index 00000000..ab7471d8 --- /dev/null +++ b/misp_modules/modules/expansion/rst_html.py @@ -0,0 +1,158 @@ +"""rst_html — fetch HTML/JS as attachment (GET /scan/html/body[/js]). + +Target format: host:port (e.g. drive.google.com:443). Full URLs pass through +unchanged. +""" + +from __future__ import annotations + +import json +from io import BytesIO + +import rstapi + +from ._rstcloud.client import ( + error, + misp_event_with_source, + rst_kwargs, + scan_group, + scan_kwargs, + scan_target, + standard_results, + text_result, + unwrap, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "url", + "domain", + "hostname", + "ip-src", + "ip-dst", + "ip-src|port", + "ip-dst|port", + "hostname|port", + "domain|port", +] +# misp_standard: return the fetched body as a downloadable attachment. +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.2", + "author": "RST Cloud", + "description": ( + "Fetch rendered HTML body or extracted JavaScript for a URL/IP" + " target via RST Scan API." + ), + "module-type": ["expansion"], + "name": "RST Cloud HTML Fetcher", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Fetches the rendered HTML body or extracted JavaScript from the" + " target via RST Scan. Returns a file MISP object with the page" + " attached and pivotable content hashes. Configurable mode: body" + " (default) or js." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "URL, domain, hostname, or IP attribute (optional port via config)." + ), + "output": ( + "file MISP object (page.html or page.js) with hashes and HTTP" + " metadata." + ), +} +# 'mode' = body | js (default body). 'port' (optional): override default +# port 443. +moduleconfig = ["api_key", "base_url", "mode", "port", "timeout"] + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") or {} + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + target = scan_target(request, _INPUTS, config, default_port=443) + if not target: + return error("No target found in the request.") + + is_js = (config.get("mode") or "body").lower() == "js" + client = rstapi.scan(**scan_kwargs(config)) + method = client.GetHtmlBodyJs if is_js else client.GetHtmlBody + data, err = unwrap(method(target)) + if err: + return error(f"RST HTML fetch failed: {err}") + + body = ( + data.get("body") + if isinstance(data, dict) + else (data if isinstance(data, str) else "") + ) + if not body: + return text_result(f"{target}: empty response", "RST HTML Fetcher") + + from pymisp import MISPObject + + meta = data if isinstance(data, dict) else {} + hashes = meta.get("hashes") or {} + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + # The fetched body IS a file: group it (attachment + pivotable hashes + + # response metadata) in a `file` object rather than a lone size string. + filename = "page.js" if is_js else "page.html" + label = "extracted JavaScript" if is_js else "HTML body" + fobj = MISPObject("file") + fobj.add_attribute( + "attachment", + value=filename, + data=BytesIO(body.encode("utf-8", "replace")), + to_ids=False, + ) + fobj.add_attribute("filename", value=filename) + fobj.add_attribute( + "mimetype", + value="application/javascript" if is_js else "text/html", + ) + fobj.add_attribute( + "size-in-bytes", value=meta.get("content_length") or len(body) + ) + for htype in ("md5", "sha1", "sha256"): + if hashes.get(htype): + fobj.add_attribute(htype, value=hashes[htype]) + info = [f"RST {label} for {target}"] + if meta.get("http_status"): + info.append(f"HTTP {meta['http_status']}") + if meta.get("title"): + info.append(f"title: {meta['title']}") + if meta.get("truncated"): + info.append("(body truncated)") + fobj.add_attribute("text", value="; ".join(info)) + fobj.comment = "RST HTML Fetcher" + if anchor: + fobj.add_reference(anchor, "derived-from") + event.add_object(fobj) + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_ioc.py b/misp_modules/modules/expansion/rst_ioc.py new file mode 100644 index 00000000..73e452c2 --- /dev/null +++ b/misp_modules/modules/expansion/rst_ioc.py @@ -0,0 +1,449 @@ +"""rst_ioc — enrich an indicator with RST threat intel (GET /ioc).""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone + +import rstapi + +from ._rstcloud.client import ( + apply_to_source_attribute, + error, + host_only, + misp_event_with_source, + new_enrichment_object, + rst_kwargs, + rst_resolver_from_config, + scan_group, + standard_results, + text_result, + threat_tags, + unwrap, + value_from_request, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "ip-src", + "ip-dst", + "domain", + "hostname", + "url", + "md5", + "sha1", + "sha256", + "ip-src|port", + "ip-dst|port", + "hostname|port", + "domain|port", +] +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.4", + "author": "RST Cloud", + "description": ( + "Enrich indicators with RST Cloud threat intelligence. Returns an" + " rst-ioc object (score, attribution, geo/ASN for IPs, DNS/WHOIS" + " for domains, parsed components for URLs, related hashes for file" + " hashes) linked back to the enriched attribute." + ), + "module-type": ["expansion", "hover"], + "name": "RST Cloud IoC Lookup", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Queries RST Cloud GET /ioc for threat scores, attribution," + " geo/ASN, DNS, WHOIS, TTPs, CVEs, and related indicators. Returns" + " a structured rst-ioc MISP object with galaxy tags and optional" + " pivotable related hashes/IPs. When misp_url and misp_key are" + " configured, also writes score/threat tags onto the enriched" + " attribute via the MISP API." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "IP, domain, hostname, URL, or hash attribute (incl. host|port" + " composites)." + ), + "output": ( + "rst-ioc MISP object, galaxy/score tags, and optional related" + " attributes." + ), +} +# misp_url/misp_key (optional): when set, tags + score note are also written +# directly onto the enriched attribute via the MISP API (like rst_noise). +moduleconfig = [ + "api_key", + "base_url", + "misp_url", + "misp_key", + "misp_verifycert", +] + +_HASH_TYPES = {"md5", "sha1", "sha256"} + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def _ts(val) -> str: + """Unix timestamp string/int -> YYYY-MM-DD (UTC), or empty string.""" + try: + return datetime.fromtimestamp(int(val), tz=timezone.utc).strftime( + "%Y-%m-%d" + ) + except (TypeError, ValueError, OSError): + return "" + + +def _f(val, precision=1) -> str: + try: + return f"{float(val):.{precision}f}" + except (TypeError, ValueError): + return "" + + +def _known(v) -> bool: + return bool(v) and str(v).strip().lower() not in ( + "", + "none", + "null", + "n/a", + ) + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + value = host_only(value_from_request(request, _INPUTS)) + if not value: + return error("No supported indicator value found in the request.") + + # /ioc always returns HTTP 200; a miss carries an "error" key and no "id". + data, err = unwrap( + rstapi.ioclookup(**rst_kwargs(config)).GetIndicator(value) + ) + if err: + return error(f"RST Cloud lookup failed: {err}") + if not isinstance(data, dict) or data.get("error") or not data.get("id"): + return text_result( + f"{value}: not found in RST Cloud", "RST IoC Lookup" + ) + + ioc_type = (data.get("ioc_type") or "").lower() + is_ip = ioc_type in ("ipv4", "ipv6") + is_domain = ioc_type == "domain" + is_url = ioc_type == "url" + is_hash = ioc_type in _HASH_TYPES + + score_block = data.get("score") or {} + total = score_block.get("total") + try: + total_int = int(float(str(total))) + except (TypeError, ValueError): + total_int = None + conf_sub = _f(score_block.get("tags"), 2) # context sub-score + relev_sub = _f(score_block.get("frequency"), 2) # relevance sub-score + + threats = data.get("threat") or [] + tags_str = (data.get("tags") or {}).get("str") or [] + ttp = data.get("ttp") or [] + cve = data.get("cve") or [] + industry = data.get("industry") or [] + fp = data.get("fp") or {} + fp_alarm = str(fp.get("alarm") or "").strip().lower() + fp_flagged = fp_alarm in ("true", "possible") + geo = data.get("geo") or {} + asn_blk = data.get("asn") or {} + src_blk = data.get("src") or {} + resolved = data.get("resolved") or {} + parsed = data.get("parsed") or {} + fseen = _ts(data.get("fseen")) + lseen = _ts(data.get("lseen")) + + # ------------------------------------------------------------------------- + # Derive the type-specific context strings once; reused for both the typed + # rst-ioc object and the annotation fallback text. + # ------------------------------------------------------------------------- + geo_str = asn_str = whois_str = http_status = "" + dns_records: list[str] = [] # ["A: 1.2.3.4", "CNAME: ..."] + resolved_ips: list[str] = [] # pivotable A-record IPs (domains) + url_parts: list[str] = [] + filenames = [f for f in data.get("filename") or [] if _known(f)] + + if is_ip: + if geo.get("country"): + parts = [geo["country"]] + if geo.get("region") and geo["region"] != geo["country"]: + parts.append(geo["region"]) + if geo.get("city") and geo["city"] not in parts: + parts.append(geo["city"]) + geo_str = ", ".join(parts) + if asn_blk.get("num"): + asn_str = f"AS{asn_blk['num']}" + if asn_blk.get("isp"): + asn_str += f" {asn_blk['isp']}" + if asn_blk.get("org") and asn_blk["org"] != asn_blk.get("isp"): + asn_str += f" / {asn_blk['org']}" + + if is_domain: + res_ip = resolved.get("ip") or {} + a_records = [r for r in res_ip.get("a") or [] if _known(r)] + cnames = [r for r in res_ip.get("cname") or [] if _known(r)] + aliases = [r for r in res_ip.get("alias") or [] if _known(r)] + resolved_ips = a_records + if a_records: + dns_records.append("A: " + ", ".join(a_records)) + if cnames: + dns_records.append("CNAME: " + ", ".join(cnames)) + if aliases: + dns_records.append("alias: " + ", ".join(aliases)) + + whois = resolved.get("whois") or {} + if whois.get("havedata") == "true" or whois.get("registrar"): + w_parts = [] + if _known(whois.get("registrar")): + w_parts.append(f"registrar: {whois['registrar']}") + if _known(whois.get("registrant")): + w_parts.append(f"registrant: {whois['registrant']}") + if _known(whois.get("created")): + w_parts.append(f"created: {whois['created'][:10]}") + if _known(whois.get("expires")): + w_parts.append(f"expires: {whois['expires'][:10]}") + if _known(whois.get("updated")): + w_parts.append(f"updated: {whois['updated'][:10]}") + if _known(whois.get("age")): + w_parts.append(f"age: {whois['age']} days") + whois_str = ", ".join(w_parts) + + if is_url: + if _known(parsed.get("domain")): + url_parts.append(f"domain: {parsed['domain']}") + if _known(parsed.get("path")) and parsed.get("path") not in ( + "/", + "None", + "none", + ): + url_parts.append(f"path: {parsed['path']}") + if _known(parsed.get("port")) and parsed.get("port") != "None": + url_parts.append(f"port: {parsed['port']}") + if _known(resolved.get("status")): + http_status = str(resolved["status"]) + + # Source report URLs (deduped, order preserved). + ref_urls: list[str] = [] + seen_refs: set[str] = set() + for report_url in (src_blk.get("report") or "").split(","): + report_url = report_url.strip() + if report_url and report_url not in seen_refs: + ref_urls.append(report_url) + seen_refs.add(report_url) + src_names = src_blk.get("name") or [] + + # ------------------------------------------------------------------------- + # Annotation fallback text (also a useful human summary). + # ------------------------------------------------------------------------- + lines: list[str] = [] + score_parts = [] + if total_int is not None: + score_parts.append(f"total: {total_int}/100") + if _f(score_block.get("src")): + score_parts.append(f"src: {_f(score_block['src'])}") + if conf_sub: + score_parts.append(f"context: {conf_sub}") + if relev_sub: + score_parts.append(f"relevance: {relev_sub}") + if score_parts: + lines.append("Score: " + ", ".join(score_parts)) + if fseen or lseen: + lines.append(f"Seen: {fseen or '?'} to {lseen or '?'}") + if geo_str: + lines.append("Geo: " + geo_str) + if asn_str: + lines.append("ASN: " + asn_str) + if dns_records: + lines.append("DNS: " + " | ".join(dns_records)) + if whois_str: + lines.append("WHOIS: " + whois_str) + if url_parts: + lines.append("URL: " + ", ".join(url_parts)) + if http_status: + lines.append("HTTP status: " + http_status) + if is_hash: + hash_parts = [ + f"{h.upper()}: {data[h]}" + for h in ("md5", "sha1", "sha256") + if _known(data.get(h)) + ] + if hash_parts: + lines.append("Hashes: " + ", ".join(hash_parts)) + if filenames: + lines.append("Filenames: " + ", ".join(filenames)) + if industry: + lines.append("Industry: " + ", ".join(industry)) + if threats: + lines.append("Threats: " + ", ".join(threats)) + if tags_str: + lines.append("Tags: " + ", ".join(tags_str)) + if ttp: + lines.append("TTPs: " + ", ".join(ttp)) + if cve: + lines.append("CVEs: " + ", ".join(cve)) + if fp_flagged: + note = f"FP alarm: {fp_alarm}" + if fp.get("descr"): + note += f" - {fp['descr']}" + lines.append(note) + if data.get("description"): + lines.append("Description: " + data["description"]) + if src_names: + lines.append("Sources: " + ", ".join(src_names)) + + # ------------------------------------------------------------------------- + # Galaxy + score / FP tags + # ------------------------------------------------------------------------- + rst_resolver = rst_resolver_from_config(config) + galaxy_tags = threat_tags(threats, rst_resolver) + if total_int is not None: + galaxy_tags.append(f'rstcloud:score-total="{total_int}"') + if fp_flagged: + risk = "high" if fp_alarm == "true" else "medium" + galaxy_tags.append(f'false-positive:risk="{risk}"') + + # ------------------------------------------------------------------------- + # Build MISP result + # ------------------------------------------------------------------------- + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + obj, dedicated = new_enrichment_object("rst-ioc") + obj.comment = "RST IoC Lookup" + + if dedicated: + tag_target = None + if total_int is not None: + tag_target = obj.add_attribute( + "score-total", value=str(total_int), to_ids=False + ) + if conf_sub: + obj.add_attribute("score-confidence", value=conf_sub, to_ids=False) + if relev_sub: + obj.add_attribute("score-relevance", value=relev_sub, to_ids=False) + if fseen: + obj.add_attribute("first-seen", value=fseen, to_ids=False) + if lseen: + obj.add_attribute("last-seen", value=lseen, to_ids=False) + for t in threats: + a = obj.add_attribute("threat", value=t, to_ids=False) + tag_target = tag_target or a + for t in ttp: + obj.add_attribute("ttp", value=t, to_ids=False) + for c in cve: + obj.add_attribute("cve", value=c, to_ids=False) + for ind in industry: + obj.add_attribute("industry", value=ind, to_ids=False) + for t in tags_str: + obj.add_attribute("tag", value=t, to_ids=False) + if fp_flagged: + fp_val = fp_alarm + ( + f" - {fp['descr']}" if fp.get("descr") else "" + ) + obj.add_attribute("false-positive", value=fp_val, to_ids=False) + if geo_str: + obj.add_attribute("geo", value=geo_str, to_ids=False) + if asn_str: + obj.add_attribute("asn", value=asn_str, to_ids=False) + for rec in dns_records: + obj.add_attribute("dns", value=rec, to_ids=False) + if whois_str: + obj.add_attribute("whois", value=whois_str, to_ids=False) + if http_status: + obj.add_attribute("http-status", value=http_status, to_ids=False) + for fn in filenames: + obj.add_attribute("filename", value=fn, to_ids=False) + if data.get("description"): + obj.add_attribute( + "description", value=data["description"], to_ids=False + ) + for ref in ref_urls: + obj.add_attribute("ref", value=ref, to_ids=False) + # Fall back to a text attribute as the tag anchor if nothing else + # exists. + tag_target = tag_target or obj.add_attribute( + "description", value="\n".join(lines), to_ids=False + ) + else: + obj.add_attribute("type", value="RST IoC Lookup", to_ids=False) + tag_target = obj.add_attribute( + "text", value="\n".join(lines), to_ids=False + ) + if fseen: + obj.add_attribute("creation-date", value=fseen, to_ids=False) + for ref in ref_urls: + obj.add_attribute("ref", value=ref, to_ids=False) + + for tag in galaxy_tags: + tag_target.add_tag(tag) + if anchor: + obj.add_reference(anchor, "characterizes") + event.add_object(obj) + + # Pivotable hashes: expose related hash values as searchable IOC + # attributes (separate from the object) so they correlate across events. + if is_hash: + for htype in ("md5", "sha1", "sha256"): + hval = data.get(htype) + if _known(hval) and hval != value: + a = event.add_attribute( + htype, + value=hval, + to_ids=True, + comment="RST IoC Lookup - related hash", + ) + for tag in galaxy_tags: + a.add_tag(tag) + + # Pivotable resolved IPs for domains (context, not detection-worthy). + for rip in resolved_ips: + event.add_attribute( + "ip-dst", + value=rip, + to_ids=False, + comment="RST IoC Lookup - resolved IP", + ) + + # Optional write-back: apply tags + brief note onto the enriched attribute + # directly via the MISP API when misp_url/misp_key are configured. + apply_to_source_attribute( + config, + request, + tags=galaxy_tags, + comment_note=( + f"RST score {total_int}/100" + + (f"; threats: {', '.join(threats)}" if threats else "") + ), + comment_prefix="RST score", + ) + + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_noise_control.py b/misp_modules/modules/expansion/rst_noise_control.py new file mode 100644 index 00000000..1668f800 --- /dev/null +++ b/misp_modules/modules/expansion/rst_noise_control.py @@ -0,0 +1,253 @@ +"""rst_noise_control — benign/noise check (GET /benign/lookup).""" + +from __future__ import annotations + +import json + +import rstapi + +from ._rstcloud.client import ( + apply_to_source_attribute, + error, + host_only, + misp_event_with_source, + new_enrichment_object, + rst_kwargs, + scan_group, + standard_results, + text_result, + unwrap, + value_from_request, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "ip-src", + "ip-dst", + "domain", + "hostname", + "url", + "md5", + "sha1", + "sha256", + "ip-src|port", + "ip-dst|port", + "hostname|port", + "domain|port", +] +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.4", + "author": "RST Cloud", + "description": ( + "Check whether a value (IP, domain, URL or hash) is known-good /" + " noise via RST Noise Control. Returns an rst-noise object" + " (verdict, category) linked back to the enriched attribute." + ), + "module-type": ["expansion", "hover"], + "name": "RST Cloud Noise Control", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Queries RST Cloud GET /benign/lookup for benign/noisy verdicts." + " Returns an rst-noise MISP object with false-positive risk tags." + " When misp_url and misp_key are configured, also annotates the" + " source attribute in place (tags, comment, to_ids, false-positive" + " sightings)." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "IP, domain, hostname, URL, or hash attribute (incl. host|port" + " composites)." + ), + "output": ( + "rst-noise MISP object with verdict, category, and risk/noise tags." + ), +} +# misp_url/misp_key/misp_verifycert (optional): when set the verdict is ALSO +# written directly onto the enriched attribute (tags, comment, to_ids, FP +# sightings) via the MISP API — the annotation object is always returned +# regardless. +moduleconfig = [ + "api_key", + "base_url", + "misp_url", + "misp_key", + "misp_verifycert", +] + +# Tag families we own — stripped before re-adding so re-runs replace not stack. +_TAG_PREFIXES = ( + "false-positive:risk=", + "rstcloud:noise-control=", + "rstcloud:noise-category=", +) + + +def _category(reason: str) -> str: + """'Change Score Shodan/Scanners/Shodan' -> 'Shodan/Scanners/Shodan'.""" + for action in ("Change Score ", "Drop "): + if reason.startswith(action): + n = len(action) + return reason[n:].strip() + return reason.strip() + + +def _category_tag(category: str) -> str: + """First ``/``-delimited segment for ``rstcloud:noise-category``. + + Lower cardinality than the full category path. Full category path stays in + the object/comment text; the tag uses only the top-level bucket before + the first ``/``. + + Example (md5, ``Drop Ubuntu Server 26.04 LTS/pam_sepermit.so/``):: + + Verdict: BENIGN - known-good + Category: Ubuntu Server 26.04 LTS/pam_sepermit.so/ + Type: md5 + rstcloud:noise-category="Ubuntu Server 26.04 LTS" + """ + category = (category or "").strip() + if not category: + return category + return category.split("/", 1)[0].strip() + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + value = host_only(value_from_request(request, _INPUTS)) + if not value: + return error("No supported value found in the request.") + + # /benign/lookup always returns HTTP 200 with + # {value, type, benign, reason}. + # `benign` is the STRING "true"/"false". The `reason` prefix encodes + # action: + # "Drop ..." -> known-good, safe to suppress (FP risk high) + # "Change Score ..." -> noisy infra (scanners/CDN…), reduce score only + # (FP risk medium — do NOT treat as clean) + # benign=="false" -> unknown / not in database. + # + # Example benign md5 (reason "Drop Ubuntu Server 26.04 LTS/.../"): + # object/comment Category: Ubuntu Server 26.04 LTS/pam_sepermit.so/ + # tag rstcloud:noise-category="Ubuntu Server 26.04 LTS" + # (+ false-positive:risk="high", rstcloud:noise-control="drop") + data, err = unwrap( + rstapi.noisecontrol(**rst_kwargs(config)).ValueLookup(value) + ) + if err: + return error(f"RST Noise Control lookup failed: {err}") + if not isinstance(data, dict): + return text_result( + f"{value}: unexpected response from RST Noise Control", + "RST Noise Control", + ) + + benign = str(data.get("benign", "")).strip().lower() == "true" + reason = (data.get("reason") or "").strip() + ioc_type = (data.get("type") or "").strip() + category = _category(reason) + tag_category = _category_tag(category) + + # --- Determine verdict, tags, and write-back actions --- + if not benign: + verdict = "Not flagged" + # "Not Found in our database" is the API constant for unknown — + # not a category. + detail = "" + tags = [] + fp_sightings = 0 + set_to_ids = None + elif reason.lower().startswith("change score"): + verdict = "NOISY - reduce score" + detail = category + tags = [ + 'false-positive:risk="medium"', + 'rstcloud:noise-control="change-score"', + f'rstcloud:noise-category="{tag_category}"', + ] + fp_sightings = 1 + set_to_ids = None + else: + verdict = "BENIGN - known-good" + detail = category + tags = [ + 'false-positive:risk="high"', + 'rstcloud:noise-control="drop"', + f'rstcloud:noise-category="{tag_category}"', + ] + fp_sightings = 2 + set_to_ids = False + + # --- Build annotation fallback text --- + lines = [f"Verdict: {verdict}"] + if detail: + lines.append(f"Category: {detail}") + if ioc_type: + lines.append(f"Type: {ioc_type}") + + # --- Build MISP result --- + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + obj, dedicated = new_enrichment_object("rst-noise") + obj.comment = "RST Noise Control" + if dedicated: + tag_target = obj.add_attribute("verdict", value=verdict, to_ids=False) + if detail: + obj.add_attribute("category", value=detail, to_ids=False) + if ioc_type: + obj.add_attribute("ioc-type", value=ioc_type, to_ids=False) + obj.add_attribute("benign", value=str(benign).lower(), to_ids=False) + else: + obj.add_attribute("type", value="RST Noise Control", to_ids=False) + tag_target = obj.add_attribute( + "text", value="\n".join(lines), to_ids=False + ) + for tag in tags: + tag_target.add_tag(tag) + if anchor: + obj.add_reference(anchor, "related-to") + event.add_object(obj) + + # Optional write-back: when MISP creds are configured, ALSO annotate the + # source attribute in place (tags, comment, to_ids flip, FP sightings). + # The annotation object is returned regardless. + apply_to_source_attribute( + config, + request, + tags=tags, + comment_note=f"RST Noise Control: {verdict}" + + (f" - {detail}" if detail else ""), + comment_prefix="RST Noise Control:", + replace_tag_prefixes=_TAG_PREFIXES, + set_to_ids=set_to_ids, + fp_sightings=fp_sightings, + ) + + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_screenshot.py b/misp_modules/modules/expansion/rst_screenshot.py new file mode 100644 index 00000000..9aea9c19 --- /dev/null +++ b/misp_modules/modules/expansion/rst_screenshot.py @@ -0,0 +1,144 @@ +"""rst_screenshot — page screenshot as image (GET /scan/html/screenshot/*).""" + +from __future__ import annotations + +import base64 +import json +from io import BytesIO + +import rstapi + +from ._rstcloud.client import ( + error, + misp_event_with_source, + rst_kwargs, + scan_group, + scan_kwargs, + scan_target, + standard_results, + text_result, + unwrap, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "url", + "domain", + "hostname", + "ip-src", + "ip-dst", + "ip-src|port", + "ip-dst|port", + "hostname|port", + "domain|port", +] +# misp_standard: return an image MISPObject with the PNG attached (rendered +# inline in MISP) instead of a text description. +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.2", + "author": "RST Cloud", + "description": ( + "Capture a page screenshot (first/full/last frame) of a URL/IP" + " target via RST Scan API." + ), + "module-type": ["expansion"], + "name": "RST Cloud Screenshot", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Renders the target page and returns a PNG screenshot as an image" + " MISP object (inline in MISP). Configurable frame: first, full" + " (default), or last." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "URL, domain, hostname, or IP attribute (optional port via config)." + ), + "output": ( + "image MISP object with PNG attachment linked to the enriched" + " attribute." + ), +} +# 'frame' selects which screenshot endpoint to call (first/full/last, +# default full). +# 'port' (optional): override default port 443. +moduleconfig = ["api_key", "base_url", "frame", "port", "timeout"] + +_FRAMES = { + "first": "GetHtmlScreenshotFirst", + "full": "GetHtmlScreenshotFull", + "last": "GetHtmlScreenshotLast", +} + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") or {} + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + target = scan_target(request, _INPUTS, config, default_port=443) + if not target: + return error("No target found in the request.") + + method = _FRAMES.get( + (config.get("frame") or "full").lower(), "GetHtmlScreenshotFull" + ) + data, err = unwrap( + getattr(rstapi.scan(**scan_kwargs(config)), method)(target) + ) + if err: + return error(f"RST screenshot failed: {err}") + + b64 = data.get("image_base64") if isinstance(data, dict) else None + try: + raw = base64.b64decode(b64) if b64 else None + except Exception: + raw = None + if not raw: + return text_result( + f"{target}: no screenshot returned ({method})", + "RST Screenshot", + ) + + from pymisp import MISPObject + + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + image = MISPObject("image") + image.add_attribute( + "attachment", value="screenshot.png", data=BytesIO(raw) + ) + image.comment = f"RST Screenshot ({method})" + event.add_attribute( + "link", + f"https://{target}" if "://" not in target else target, + comment=f"RST Screenshot source ({method})", + to_ids=False, + ) + if anchor: + image.add_reference(anchor, "screenshot-of") + event.add_object(image) + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_ssl.py b/misp_modules/modules/expansion/rst_ssl.py new file mode 100644 index 00000000..51c944ad --- /dev/null +++ b/misp_modules/modules/expansion/rst_ssl.py @@ -0,0 +1,134 @@ +"""rst_ssl — SSL certificate as x509 object (GET /scan/ssl/certificate).""" + +from __future__ import annotations + +import json + +import rstapi + +from ._rstcloud.client import ( + error, + misp_event_with_source, + rst_kwargs, + scan_group, + scan_kwargs, + scan_target, + standard_results, + text_result, + unwrap, +) + +misperrors = {"error": "Error"} + +_INPUTS = [ + "ip-dst", + "ip-src", + "hostname", + "domain", + "ip-dst|port", + "ip-src|port", + "hostname|port", + "domain|port", +] +# misp_standard: return a real x509 MISPObject (searchable subject/issuer, +# pivotable fingerprints) instead of a text blob. +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.2", + "author": "RST Cloud", + "description": ( + "Fetch the SSL certificate for an IP[:port] as an x509 object via" + " RST Scan API." + ), + "module-type": ["expansion"], + "name": "RST Cloud SSL Certificate", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Connects to the target service and retrieves the TLS certificate" + " via RST Scan GET /scan/ssl/certificate. Returns an x509 MISP" + " object with pivotable fingerprints (SHA-1/256/MD5), subject," + " issuer, and validity dates." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": ( + "IP, hostname, or domain attribute (optional port via config or" + " composite)." + ), + "output": "x509 MISP object referencing the enriched attribute.", +} +# 'port' (optional): TLS port to scan when the attribute carries none (API +# defaults to 443 if omitted). +moduleconfig = ["api_key", "base_url", "port", "timeout"] + +# RST certificate field -> x509 object_relation (pymisp infers attribute +# type from the template, so fingerprints become pivotable x509-fingerprint-* +# types). +_X509_MAP = { + "subject_dn": "subject", + "issuer_dn": "issuer", + "serial_number": "serial-number", + "version": "version", + "not_before": "validity-not-before", + "not_after": "validity-not-after", + "fingerprint_sha1": "x509-fingerprint-sha1", + "fingerprint_sha256": "x509-fingerprint-sha256", + "fingerprint_md5": "x509-fingerprint-md5", +} + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + target = scan_target(request, _INPUTS, config, default_port=443) + if not target: + return error( + "No target found in the request (expects an IP/hostname)." + ) + + data, err = unwrap( + rstapi.scan(**scan_kwargs(config)).GetSslCertificate(target) + ) + if err: + return error(f"RST SSL scan failed: {err}") + if not isinstance(data, dict) or not data.get("subject_dn"): + return text_result( + f"{target}: no certificate returned", "RST SSL Certificate" + ) + + from pymisp import MISPObject + + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + x509 = MISPObject("x509") + for field, relation in _X509_MAP.items(): + if data.get(field): + x509.add_attribute(relation, value=data[field]) + x509.comment = f"RST SSL Certificate for {target}" + if anchor: + x509.add_reference(anchor, "identifies") + event.add_object(x509) + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/misp_modules/modules/expansion/rst_whois.py b/misp_modules/modules/expansion/rst_whois.py new file mode 100644 index 00000000..bac6f396 --- /dev/null +++ b/misp_modules/modules/expansion/rst_whois.py @@ -0,0 +1,155 @@ +"""rst_whois — parsed WHOIS as whois object (GET /whois/{domain}).""" + +from __future__ import annotations + +import json + +import rstapi + +from ._rstcloud.client import ( + error, + misp_event_with_source, + rst_kwargs, + scan_group, + standard_results, + text_result, + unwrap, + value_from_request, +) + +misperrors = {"error": "Error"} + +_INPUTS = ["domain", "hostname"] +mispattributes = {"input": _INPUTS, "format": "misp_standard"} + +moduleinfo = { + "version": "0.2", + "author": "RST Cloud", + "description": ( + "Retrieve parsed WHOIS information for a domain via RST Cloud." + ), + "module-type": ["expansion", "hover"], + "name": "RST Cloud Whois", + "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."], + "features": ( + "Queries RST Cloud GET /whois for parsed domain registration data." + " Returns a standard whois MISP object (registrar, registrant," + " dates, nameservers) linked back to the enriched attribute." + ), + "references": [ + "https://api.rstcloud.net/", + "https://pypi.org/project/rstapi/", + ], + "input": "Domain or hostname attribute.", + "output": "whois MISP object with registration and nameserver fields.", +} +moduleconfig = ["api_key", "base_url"] + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo + + +def _known(v) -> bool: + """True when a value is present and not a placeholder like 'unknown'.""" + return bool(v) and str(v).strip().lower() not in ( + "unknown", + "none", + "", + "null", + "n/a", + ) + + +def handler(q=False): + if q is False: + return False + request = json.loads(q) + config = request.get("config") + if not rst_kwargs(config)["APIKEY"]: + return error( + "An RST Cloud API key is required (set api_key in the module" + " config)." + ) + domain = value_from_request(request, _INPUTS) + if not domain: + return error("No domain found in the request.") + + data, err = unwrap( + rstapi.whoisapi(**rst_kwargs(config)).GetDomainInfo(domain) + ) + if err: + return error(f"RST Whois API lookup failed: {err}") + if not isinstance(data, dict): + return text_result(f"{domain}: no WHOIS data found", "RST Whois API") + + from pymisp import MISPObject + + event, source = misp_event_with_source(request) + anchor = scan_group(request, source) + + obj = MISPObject("whois") + obj.comment = f"RST Whois API lookup for {domain}" + + # Identity + if _known(data.get("domain")): + obj.add_attribute("domain", value=data["domain"], to_ids=False) + if _known(data.get("registrar")): + obj.add_attribute("registrar", value=data["registrar"], to_ids=False) + if _known(data.get("registrant")): + obj.add_attribute( + "registrant-name", value=data["registrant"], to_ids=False + ) + if _known(data.get("registrant_org")): + obj.add_attribute( + "registrant-org", value=data["registrant_org"], to_ids=False + ) + if _known(data.get("registrant_email")): + obj.add_attribute( + "registrant-email", + value=data["registrant_email"], + to_ids=False, + ) + + # Dates (API returns "created_on" / "updated_on" / "expires_on") + if _known(data.get("created_on")): + obj.add_attribute( + "creation-date", value=data["created_on"], to_ids=False + ) + if _known(data.get("updated_on")): + obj.add_attribute( + "modification-date", value=data["updated_on"], to_ids=False + ) + if _known(data.get("expires_on")): + obj.add_attribute( + "expiration-date", value=data["expires_on"], to_ids=False + ) + + # Nameservers — one attribute per NS + for ns in (data.get("nameservers") or "").split(","): + ns = ns.strip() + if ns: + obj.add_attribute("nameserver", value=ns, to_ids=False) + + # Domain age + status as a free-text note (no dedicated whois relation) + notes = [] + if data.get("age") is not None: + notes.append(f"age: {data['age']} days") + if _known(data.get("status")): + notes.append(f"status: {data['status']}") + if notes: + obj.add_attribute("text", value="; ".join(notes), to_ids=False) + + if anchor: + obj.add_reference(anchor, "related-to") + event.add_object(obj) + return standard_results(event) + + +if __name__ == "__main__": + print(json.dumps(version(), indent=2)) diff --git a/poetry.lock b/poetry.lock index 00c6c771..482021eb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6367,6 +6367,25 @@ files = [ {file = "rpds_py-2026.5.1.tar.gz", hash = "sha256:07b24fea40541e28570e5b795a4a38fbdcd12550c06bd0748005ecc8116ca256"}, ] +[[package]] +name = "rstapi" +version = "1.2.0" +description = "Python library to access the RST Cloud API." +optional = true +python-versions = "*" +groups = ["main"] +markers = "extra == \"minimal\" or extra == \"all\"" +files = [ + {file = "rstapi-1.2.0-py3-none-any.whl", hash = "sha256:aab1fbb4e520135a3b280bebadb6511ad805f5aee46a31ed18a28aa3b6ae529c"}, + {file = "rstapi-1.2.0.tar.gz", hash = "sha256:19c5d98f522b9dbf03f1960aa76c137ce03be6d8c205b0cf9336ec7a8c9f25ff"}, +] + +[package.dependencies] +requests = "*" + +[package.extras] +test = ["pytest (>=7)"] + [[package]] name = "rtfde" version = "0.1.2.2" @@ -7890,10 +7909,10 @@ test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_it type = ["pytest-mypy (>=1.0.1) ; platform_python_implementation != \"PyPy\""] [extras] -all = ["anyrun-sdk", "apiosintds", "assemblyline_client", "backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "lief", "lxml", "maclookup", "matplotlib", "mattermostdriver", "misp-stix", "mwdblib", "ndjson", "numpy", "oauth2", "opencv-python", "openpyxl", "pandas", "pandas_ods_reader", "pandoc", "passivetotal", "pdftotext", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pymisp", "pypdns", "pypssl", "pysafebrowsing", "pytesseract", "python-docx", "python-pptx", "pyzbar", "requests", "setuptools", "shodan", "sigmatools", "sigmf", "slack-sdk", "socialscan", "sparqlwrapper", "tau-clients", "taxii2-client", "trustar", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "vysion", "wand", "xlrd", "yara-python"] -minimal = ["backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "maclookup", "matplotlib", "mattermostdriver", "ndjson", "oauth2", "passivetotal", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pypdns", "pypssl", "pysafebrowsing", "requests", "slack-sdk", "socialscan", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "yara-python"] +all = ["anyrun-sdk", "apiosintds", "assemblyline_client", "backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "lief", "lxml", "maclookup", "matplotlib", "mattermostdriver", "misp-stix", "mwdblib", "ndjson", "numpy", "oauth2", "opencv-python", "openpyxl", "pandas", "pandas_ods_reader", "pandoc", "passivetotal", "pdftotext", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pymisp", "pypdns", "pypssl", "pysafebrowsing", "pytesseract", "python-docx", "python-pptx", "pyzbar", "requests", "rstapi", "setuptools", "shodan", "sigmatools", "sigmf", "slack-sdk", "socialscan", "sparqlwrapper", "tau-clients", "taxii2-client", "trustar", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "vysion", "wand", "xlrd", "yara-python"] +minimal = ["backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "maclookup", "matplotlib", "mattermostdriver", "ndjson", "oauth2", "passivetotal", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pypdns", "pypssl", "pysafebrowsing", "requests", "rstapi", "slack-sdk", "socialscan", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "yara-python"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<4.0" -content-hash = "380a4bb70a1fcbc16a4f8092bb827d9bd8f828c7bd249a391e9eb03626b5938c" +content-hash = "4ccfa27333f21a63bb882b6c1feeebf3deca81a602a910954d72922e98ac65e8" diff --git a/pyproject.toml b/pyproject.toml index d94a8467..2cc122cb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ minimal = [ "pyipasnhistory", "pysafebrowsing", "requests[security]", + "rstapi>=1.2.0,<2", "slack-sdk", "urlarchiver", "vt-graph-api", @@ -120,6 +121,7 @@ all = [ "python-pptx", "pyzbar", "requests[security]", ## in minimal + "rstapi>=1.2.0,<2", ## in minimal "setuptools", "shodan", "sigmatools", diff --git a/tests/test_rst_ioc.py b/tests/test_rst_ioc.py new file mode 100644 index 00000000..f5878dd6 --- /dev/null +++ b/tests/test_rst_ioc.py @@ -0,0 +1,58 @@ +"""Unit tests for rst_ioc (mocked rstapi, no network).""" + +import json +from unittest.mock import patch + +from misp_modules.modules.expansion import rst_ioc + + +class _FakeClient: + def __init__(self, payload): + self._payload = payload + + def GetIndicator(self, value): + return self._payload + + +def _query(attribute, config=None): + return json.dumps({ + "module": "rst_ioc", + "attribute": attribute, + "config": config if config is not None else {"api_key": "test-key"}, + }) + + +def test_rst_ioc_not_found_returns_text(): + attribute = {"type": "ip-dst", "value": "8.8.8.8", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + with patch.object(rst_ioc.rstapi, "ioclookup", return_value=_FakeClient({"error": "Not Found"})): + result = rst_ioc.handler(_query(attribute)) + assert "Attribute" in result["results"] + assert any("not found" in a["value"].lower() for a in result["results"]["Attribute"]) + + +def test_rst_ioc_hit_returns_rst_ioc_object_with_score_tag(): + attribute = {"type": "domain", "value": "evil.example", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = { + "id": "rst-123", + "ioc_type": "domain", + "score": {"total": "82", "tags": "0.5", "frequency": "0.3"}, + "threat": ["akira_ransomware"], + "fp": {"alarm": "false"}, + } + with patch.object(rst_ioc.rstapi, "ioclookup", return_value=_FakeClient(payload)): + result = rst_ioc.handler(_query(attribute)) + obj = result["results"]["Object"][0] + assert obj["name"] in ("rst-ioc", "annotation") + relations = {a.get("object_relation"): a for a in obj["Attribute"]} + tag_target = relations.get("score-total") or relations.get("text") or obj["Attribute"][0] + tag_names = [t["name"] for t in tag_target.get("Tag", [])] + assert 'rstcloud:score-total="82"' in tag_names + assert any("akira" in t.lower() for t in tag_names) + + +def test_rst_ioc_missing_api_key(): + attribute = {"type": "domain", "value": "evil.example", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + with patch.object(rst_ioc.rstapi, "ioclookup") as mock_lookup: + result = rst_ioc.handler(_query(attribute, config={})) + mock_lookup.assert_not_called() + assert result == {"error": "An RST Cloud API key is required (set api_key in the module config)."} diff --git a/tests/test_rst_noise_control.py b/tests/test_rst_noise_control.py new file mode 100644 index 00000000..60a92698 --- /dev/null +++ b/tests/test_rst_noise_control.py @@ -0,0 +1,92 @@ +"""Unit tests for rst_noise_control (mocked rstapi, no network).""" + +import json +from unittest.mock import patch + +from misp_modules.modules.expansion import rst_noise_control + + +class _FakeClient: + def __init__(self, payload): + self._payload = payload + + def ValueLookup(self, value): + return self._payload + + +def _query(attribute, config=None): + return json.dumps({ + "module": "rst_noise_control", + "attribute": attribute, + "config": config if config is not None else {"api_key": "test-key"}, + }) + + +def _verdict_attr(result): + obj = result["results"]["Object"][0] + for a in obj["Attribute"]: + if a.get("object_relation") == "verdict": + return a + for a in obj["Attribute"]: + val = a.get("value") or "" + if a.get("object_relation") == "text" or (a.get("type") == "text" and "Verdict:" in val): + return a + return obj["Attribute"][-1] + + +def _verdict_tags(result): + obj = result["results"]["Object"][0] + for a in obj["Attribute"]: + if a.get("Tag"): + return [t["name"] for t in a["Tag"]] + return [] + + +def test_rst_noise_control_drop_verdict(): + attribute = {"type": "ip-dst", "value": "8.8.8.8", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = {"benign": "true", "reason": "Drop Public DNS/Services/Google", "type": "ipv4"} + with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)): + result = rst_noise_control.handler(_query(attribute)) + verdict = _verdict_attr(result) + assert "BENIGN" in verdict["value"] + tags = _verdict_tags(result) + assert 'false-positive:risk="high"' in tags + assert 'rstcloud:noise-control="drop"' in tags + assert 'rstcloud:noise-category="Public DNS"' in tags + + +_UBUNTU_CATEGORY = "Ubuntu Server 26.04 LTS/pam_sepermit.so/" +_UBUNTU_TAG = "Ubuntu Server 26.04 LTS" + + +def test_rst_noise_control_ubuntu_benign_hash(): + attribute = {"type": "md5", "value": "abc", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = {"benign": "true", "reason": f"Drop {_UBUNTU_CATEGORY}", "type": "md5"} + with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)): + result = rst_noise_control.handler(_query(attribute)) + verdict = _verdict_attr(result) + assert "BENIGN" in verdict["value"] + tags = _verdict_tags(result) + assert 'false-positive:risk="high"' in tags + assert 'rstcloud:noise-control="drop"' in tags + assert f'rstcloud:noise-category="{_UBUNTU_TAG}"' in tags + + +def test_rst_noise_control_deep_category_tag(): + full = "NSRL 2025.03.1_modern/726.LibOVRPlatform64_1.dll/Meta - Oculus Platform SDK" + attribute = {"type": "md5", "value": "abc", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = {"benign": "true", "reason": f"Drop {full}", "type": "md5"} + with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)): + result = rst_noise_control.handler(_query(attribute)) + tags = _verdict_tags(result) + assert 'rstcloud:noise-category="NSRL 2025.03.1_modern"' in tags + + +def test_rst_noise_control_not_in_database(): + attribute = {"type": "ip-dst", "value": "1.2.3.4", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = {"benign": "false", "reason": "Not Found in our database", "type": "ipv4"} + with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)): + result = rst_noise_control.handler(_query(attribute)) + verdict = _verdict_attr(result) + assert "not flagged" in verdict["value"].lower() + assert _verdict_tags(result) == [] diff --git a/tests/test_rst_ssl.py b/tests/test_rst_ssl.py new file mode 100644 index 00000000..6ce7832a --- /dev/null +++ b/tests/test_rst_ssl.py @@ -0,0 +1,48 @@ +"""Unit tests for rst_ssl (mocked rstapi, no network).""" + +import json +from unittest.mock import patch + +from misp_modules.modules.expansion import rst_ssl + + +class _FakeClient: + def __init__(self, payload): + self._payload = payload + + def GetSslCertificate(self, target): + return self._payload + + +def _query(attribute, config=None): + return json.dumps({ + "module": "rst_ssl", + "attribute": attribute, + "config": config if config is not None else {"api_key": "test-key", "port": "443"}, + }) + + +def test_rst_ssl_returns_x509_object(): + attribute = {"type": "ip-dst", "value": "93.184.216.34", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + payload = { + "subject_dn": "CN=example.com", + "issuer_dn": "CN=DigiCert", + "fingerprint_sha1": "a" * 40, + "fingerprint_sha256": "b" * 64, + "not_after": "2026-12-21T19:20:01Z", + "serial_number": "01", + } + with patch.object(rst_ssl.rstapi, "scan", return_value=_FakeClient(payload)): + result = rst_ssl.handler(_query(attribute)) + obj = result["results"]["Object"][0] + assert obj["name"] == "x509" + relations = {a["object_relation"]: a for a in obj["Attribute"]} + assert relations["subject"]["value"] == "CN=example.com" + assert relations["x509-fingerprint-sha256"]["value"] == "b" * 64 + + +def test_rst_ssl_no_certificate(): + attribute = {"type": "ip-dst", "value": "1.2.3.4", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"} + with patch.object(rst_ssl.rstapi, "scan", return_value=_FakeClient({})): + result = rst_ssl.handler(_query(attribute)) + assert any("no certificate" in a["value"].lower() for a in result["results"]["Attribute"])