diff --git a/README.md b/README.md
index d401c378..964c09db 100644
--- a/README.md
+++ b/README.md
@@ -123,6 +123,14 @@ For further Information see the [license file](https://misp.github.io/misp-modul
* [Real-time Blackhost Lists Lookup](https://misp.github.io/misp-modules/expansion/#real-time-blackhost-lists-lookup) - Module to check an IPv4 address against known RBLs.
* [Recorded Future Enrich](https://misp.github.io/misp-modules/expansion/#recorded-future-enrich) - Module to enrich attributes with threat intelligence from Recorded Future.
* [Reverse DNS](https://misp.github.io/misp-modules/expansion/#reverse-dns) - Simple Reverse DNS expansion service to resolve reverse DNS from MISP attributes.
+* [RST Cloud Cobalt Strike Beacon](https://misp.github.io/misp-modules/expansion/#rst-cloud-cobalt-strike-beacon) - Scan a target for Cobalt Strike beacon configurations via RST Scan API.
+* [RST Cloud Favicon](https://misp.github.io/misp-modules/expansion/#rst-cloud-favicon) - Fetch favicon image and hashes for Shodan/Netlas/Censys/FOFA pivoting via RST Scan API.
+* [RST Cloud HTML Fetcher](https://misp.github.io/misp-modules/expansion/#rst-cloud-html-fetcher) - Fetch rendered HTML body or extracted JavaScript via RST Scan API.
+* [RST Cloud IoC Lookup](https://misp.github.io/misp-modules/expansion/#rst-cloud-ioc-lookup) - Enrich indicators with RST Cloud threat intelligence.
+* [RST Cloud Noise Control](https://misp.github.io/misp-modules/expansion/#rst-cloud-noise-control) - Check whether an indicator is known-good or noisy via RST Noise Control.
+* [RST Cloud Screenshot](https://misp.github.io/misp-modules/expansion/#rst-cloud-screenshot) - Capture a page screenshot via RST Scan API.
+* [RST Cloud SSL Certificate](https://misp.github.io/misp-modules/expansion/#rst-cloud-ssl-certificate) - Fetch TLS certificate as an x509 MISP object via RST Scan API.
+* [RST Cloud Whois](https://misp.github.io/misp-modules/expansion/#rst-cloud-whois) - Retrieve parsed WHOIS for a domain via RST Cloud.
* [ReversingLabs Spectra Analyze](https://misp.github.io/misp-modules/expansion/#reversinglabs-spectra-analyze) - Threat intelligence enrichment module
* [SecurityTrails Lookup](https://misp.github.io/misp-modules/expansion/#securitytrails-lookup) - An expansion modules for SecurityTrails.
* [Shodan Lookup](https://misp.github.io/misp-modules/expansion/#shodan-lookup) - Module to query on Shodan.
diff --git a/documentation/logos/rstcloud.png b/documentation/logos/rstcloud.png
new file mode 100644
index 00000000..af2f9c3b
Binary files /dev/null and b/documentation/logos/rstcloud.png differ
diff --git a/documentation/mkdocs/expansion.md b/documentation/mkdocs/expansion.md
index e7f966f3..7f1d78a8 100644
--- a/documentation/mkdocs/expansion.md
+++ b/documentation/mkdocs/expansion.md
@@ -2118,6 +2118,260 @@ Module to check an IPv4 address against known RBLs.
-----
+#### [RST Cloud Cobalt Strike Beacon](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_cs_beacon.py)
+
+
+
+Scan a target IP[:port] for a Cobalt Strike beacon configuration via RST Scan API.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_cs_beacon.py)]
+
+- **features**:
+>Probes the target for Cobalt Strike beacon configurations via RST Scan GET /scan/cs-beacon. On a hit, returns file MISP object(s) with pivotable SHA-256 hashes tagged to the Cobalt Strike galaxy.
+
+- **config**:
+> - api_key
+> - base_url
+> - port
+> - timeout
+
+- **input**:
+>IP, URL, domain, or hostname attribute (optional port via config).
+
+- **output**:
+>file MISP object(s) with beacon hashes and Cobalt Strike galaxy tag.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
+#### [RST Cloud Favicon](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_favicon.py)
+
+
+
+Fetch a target's favicon (image + all hashes for Shodan/Netlas/Censys pivoting) via RST Scan API.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_favicon.py)]
+
+- **features**:
+>Retrieves the favicon image and cryptographic hashes via RST Scan GET /scan/favicon. Returns a file MISP object with MD5/SHA-1/SHA-256 and a standalone Murmur3 favicon-hash attribute for Shodan/FOFA-style pivoting.
+
+- **config**:
+> - api_key
+> - base_url
+> - timeout
+
+- **input**:
+>URL, domain, hostname, or IP attribute.
+
+- **output**:
+>file MISP object, favicon-hash attribute, and resolved favicon URL.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
+#### [RST Cloud HTML Fetcher](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_html.py)
+
+
+
+Fetch rendered HTML body or extracted JavaScript for a URL/IP target via RST Scan API.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_html.py)]
+
+- **features**:
+>Fetches the rendered HTML body or extracted JavaScript from the target via RST Scan. Returns a file MISP object with the page attached and pivotable content hashes. Configurable mode: body (default) or js.
+
+- **config**:
+> - api_key
+> - base_url
+> - mode
+> - port
+> - timeout
+
+- **input**:
+>URL, domain, hostname, or IP attribute (optional port via config).
+
+- **output**:
+>file MISP object (page.html or page.js) with hashes and HTTP metadata.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
+#### [RST Cloud IoC Lookup](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ioc.py)
+
+
+
+Enrich indicators with RST Cloud threat intelligence.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ioc.py)]
+
+- **features**:
+>Queries RST Cloud GET /ioc for threat scores, attribution, geo/ASN, DNS, WHOIS, TTPs, CVEs, and related indicators. Returns a structured rst-ioc MISP object with galaxy tags and optional pivotable related hashes/IPs. When misp_url and misp_key are configured, also writes score/threat tags onto the enriched attribute via the MISP API.
+
+- **config**:
+> - api_key
+> - base_url
+> - misp_url
+> - misp_key
+> - misp_verifycert
+
+- **input**:
+>IP, domain, hostname, URL, or hash attribute (incl. host|port composites).
+
+- **output**:
+>rst-ioc MISP object, galaxy/score tags, and optional related attributes.
+
+- **references**:
+>https://api.rstcloud.net/
+>https://github.com/MISP/misp-objects/pull/526
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+> - rst-ioc object template installed on MISP ([misp-objects #526](https://github.com/MISP/misp-objects/pull/526))
+
+-----
+
+#### [RST Cloud Noise Control](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_noise_control.py)
+
+
+
+Check whether a value is known-good / noise via RST Noise Control.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_noise_control.py)]
+
+- **features**:
+>Queries RST Cloud GET /benign/lookup for benign/noisy verdicts. Returns an rst-noise MISP object with false-positive risk tags. When misp_url and misp_key are configured, also annotates the source attribute in place (tags, comment, to_ids, false-positive sightings).
+
+- **config**:
+> - api_key
+> - base_url
+> - misp_url
+> - misp_key
+> - misp_verifycert
+
+- **input**:
+>IP, domain, hostname, URL, or hash attribute (incl. host|port composites).
+
+- **output**:
+>rst-noise MISP object with verdict, category, and risk/noise tags.
+
+- **references**:
+>https://api.rstcloud.net/
+>https://github.com/MISP/misp-taxonomies/pull/335
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+> - rst-noise object template on MISP ([misp-objects #526](https://github.com/MISP/misp-objects/pull/526))
+> - rstcloud taxonomy on MISP ([misp-taxonomies #335](https://github.com/MISP/misp-taxonomies/pull/335))
+
+-----
+
+#### [RST Cloud Screenshot](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_screenshot.py)
+
+
+
+Capture a page screenshot of a URL/IP target via RST Scan API.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_screenshot.py)]
+
+- **features**:
+>Renders the target page and returns a PNG screenshot as an image MISP object (inline in MISP). Configurable frame: first, full (default), or last.
+
+- **config**:
+> - api_key
+> - base_url
+> - frame
+> - port
+> - timeout
+
+- **input**:
+>URL, domain, hostname, or IP attribute (optional port via config).
+
+- **output**:
+>image MISP object with PNG attachment linked to the enriched attribute.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
+#### [RST Cloud SSL Certificate](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ssl.py)
+
+
+
+Fetch the SSL certificate for an IP[:port] as an x509 object via RST Scan API.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_ssl.py)]
+
+- **features**:
+>Connects to the target service and retrieves the TLS certificate via RST Scan GET /scan/ssl/certificate. Returns an x509 MISP object with pivotable fingerprints (SHA-1/256/MD5), subject, issuer, and validity dates.
+
+- **config**:
+> - api_key
+> - base_url
+> - port
+> - timeout
+
+- **input**:
+>IP, hostname, or domain attribute (optional port via config or composite).
+
+- **output**:
+>x509 MISP object referencing the enriched attribute.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
+#### [RST Cloud Whois](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_whois.py)
+
+
+
+Retrieve parsed WHOIS information for a domain via RST Cloud.
+[[source code](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/rst_whois.py)]
+
+- **features**:
+>Queries RST Cloud GET /whois for parsed domain registration data. Returns a standard whois MISP object (registrar, registrant, dates, nameservers) linked back to the enriched attribute.
+
+- **config**:
+> - api_key
+> - base_url
+
+- **input**:
+>Domain or hostname attribute.
+
+- **output**:
+>whois MISP object with registration and nameserver fields.
+
+- **references**:
+>https://api.rstcloud.net/
+
+- **requirements**:
+> - rstapi>=1.2.0 (PyPI)
+> - An RST Cloud API key
+
+-----
+
#### [Recorded Future Enrich](https://github.com/MISP/misp-modules/tree/main/misp_modules/modules/expansion/recordedfuture.py)
diff --git a/documentation/mkdocs/index.md b/documentation/mkdocs/index.md
index e0f5e888..27549435 100644
--- a/documentation/mkdocs/index.md
+++ b/documentation/mkdocs/index.md
@@ -91,6 +91,14 @@ For more information: [Extending MISP with Python modules](https://www.misp-proj
* [RandomcoinDB Lookup](https://misp.github.io/misp-modules/expansion/#randomcoindb-lookup) - Module to access the ransomcoinDB (see https://ransomcoindb.concinnity-risks.com)
* [r7_akb](https://misp.github.io/misp-modules/expansion/#r7_akb) - Enrich CVEs via AttackerKB and return structured MISP events. Handles rate limits, regex CVE detection, and markdown cleanup.
* [Real-time Blackhost Lists Lookup](https://misp.github.io/misp-modules/expansion/#real-time-blackhost-lists-lookup) - Module to check an IPv4 address against known RBLs.
+* [RST Cloud Cobalt Strike Beacon](https://misp.github.io/misp-modules/expansion/#rst-cloud-cobalt-strike-beacon) - Scan a target for Cobalt Strike beacon configurations via RST Scan API.
+* [RST Cloud Favicon](https://misp.github.io/misp-modules/expansion/#rst-cloud-favicon) - Fetch favicon image and hashes for Shodan/Netlas/Censys/FOFA pivoting via RST Scan API.
+* [RST Cloud HTML Fetcher](https://misp.github.io/misp-modules/expansion/#rst-cloud-html-fetcher) - Fetch rendered HTML body or extracted JavaScript via RST Scan API.
+* [RST Cloud IoC Lookup](https://misp.github.io/misp-modules/expansion/#rst-cloud-ioc-lookup) - Enrich indicators with RST Cloud threat intelligence.
+* [RST Cloud Noise Control](https://misp.github.io/misp-modules/expansion/#rst-cloud-noise-control) - Check whether an indicator is known-good or noisy via RST Noise Control.
+* [RST Cloud Screenshot](https://misp.github.io/misp-modules/expansion/#rst-cloud-screenshot) - Capture a page screenshot via RST Scan API.
+* [RST Cloud SSL Certificate](https://misp.github.io/misp-modules/expansion/#rst-cloud-ssl-certificate) - Fetch TLS certificate as an x509 MISP object via RST Scan API.
+* [RST Cloud Whois](https://misp.github.io/misp-modules/expansion/#rst-cloud-whois) - Retrieve parsed WHOIS for a domain via RST Cloud.
* [Recorded Future Enrich](https://misp.github.io/misp-modules/expansion/#recorded-future-enrich) - Module to enrich attributes with threat intelligence from Recorded Future.
* [ReversingLabs Enrichment](https://misp.github.io/misp-modules/expansion/#reversinglabs-enrichment) - Module to enrich file hashes, domains, IPs and URLs with ReversingLabs Spectra Analyze threat intelligence.
* [Reverse DNS](https://misp.github.io/misp-modules/expansion/#reverse-dns) - Simple Reverse DNS expansion service to resolve reverse DNS from MISP attributes.
diff --git a/misp_modules/modules/expansion/_rstcloud/__init__.py b/misp_modules/modules/expansion/_rstcloud/__init__.py
new file mode 100644
index 00000000..7b6d8a06
--- /dev/null
+++ b/misp_modules/modules/expansion/_rstcloud/__init__.py
@@ -0,0 +1,19 @@
+"""Shared RST Cloud helpers for expansion modules (not registered)."""
+
+from .client import ( # noqa: F401
+ apply_to_source_attribute,
+ error,
+ host_only,
+ misp_event_with_source,
+ new_enrichment_object,
+ rst_kwargs,
+ rst_resolver_from_config,
+ scan_group,
+ scan_kwargs,
+ scan_target,
+ standard_results,
+ text_result,
+ threat_tags,
+ unwrap,
+ value_from_request,
+)
diff --git a/misp_modules/modules/expansion/_rstcloud/client.py b/misp_modules/modules/expansion/_rstcloud/client.py
new file mode 100644
index 00000000..0884a801
--- /dev/null
+++ b/misp_modules/modules/expansion/_rstcloud/client.py
@@ -0,0 +1,528 @@
+"""Shared helpers for the RST Cloud expansion modules.
+
+The modules ride the official ``rstapi`` library (PyPI) for transport, so a
+misp-modules deployment just needs ``pip install rstapi``. These helpers cover
+config parsing, error unwrapping, and the misp-modules result shape.
+"""
+
+from __future__ import annotations
+
+import json
+import re
+
+DEFAULT_BASE_URL = "https://api.rstcloud.net/v1"
+
+
+def api_key_from_config(config: dict | None) -> str:
+ """misp-modules passes user config as a dict; accept common key names."""
+ config = config or {}
+ return (
+ config.get("api_key")
+ or config.get("apikey")
+ or config.get("rst_api_key")
+ or ""
+ )
+
+
+def base_url_from_config(config: dict | None) -> str:
+ return (config or {}).get("base_url") or DEFAULT_BASE_URL
+
+
+def rst_kwargs(config: dict | None) -> dict:
+ """Constructor kwargs shared by every rstapi client."""
+ return {
+ "APIKEY": api_key_from_config(config),
+ "APIURL": base_url_from_config(config),
+ }
+
+
+def scan_kwargs(config: dict | None) -> dict:
+ """Constructor kwargs for rstapi.scan with optional read timeout.
+
+ Extends ``rst_kwargs``. Scan endpoints (ssl/html/favicon/screenshot/
+ cs-beacon) are synchronous: the RST Cloud server connects to the target
+ during your request, so they can take much longer than a database lookup.
+ The default rstapi READ timeout is 20 s, which is sometimes not enough.
+ Set ``timeout`` in the module config (seconds, default 60) to override it.
+ """
+ kw = rst_kwargs(config)
+ try:
+ kw["READ"] = max(1, int((config or {}).get("timeout") or 60))
+ except (ValueError, TypeError):
+ kw["READ"] = 60
+ return kw
+
+
+def value_from_request(request: dict, keys) -> str | None:
+ """Pull the indicator from a misp-modules request (attribute or typed).
+
+ Handles all three shapes MISP sends: a full ``attribute`` object, a typed
+ top-level key (incl. composites like ``ip-dst|port``), and object-level
+ enrichment where the value lives in ``object["Attribute"]``.
+ """
+ if request.get("attribute"):
+ return request["attribute"].get("value")
+ for key in keys:
+ if request.get(key):
+ return request[key]
+ obj = request.get("object")
+ if isinstance(obj, dict):
+ wanted = set(keys)
+ for a in obj.get("Attribute") or []:
+ if a.get("type") in wanted and a.get("value"):
+ return a["value"]
+ return None
+
+
+def host_only(value):
+ """Strip a MISP composite ``|port`` suffix; return bare host/indicator.
+
+ Used by the lookup modules (ioc / noise-control / whois) where the API keys
+ on the value itself and the port is irrelevant.
+ """
+ if not value:
+ return value
+ return str(value).split("|", 1)[0].strip()
+
+
+_PORT_SUFFIX = re.compile(r":\d{1,5}$")
+_PORT_RELATIONS = ("dst-port", "src-port", "port")
+
+
+def _join_host_port(host: str, port) -> str:
+ """host:port, bracketing IPv6 literals so the colons aren't ambiguous."""
+ if host.count(":") >= 2 and not host.startswith("["):
+ return f"[{host}]:{port}"
+ return f"{host}:{port}"
+
+
+def _has_explicit_port(host: str) -> bool:
+ # "1.2.3.4:443" / "host:443" — but not a bare IPv4 or IPv6 literal.
+ return bool(_PORT_SUFFIX.search(host)) and host.count(":") == 1
+
+
+def _sibling_port(request) -> str | None:
+ """Port taken from a sibling attribute when MISP passes the whole object.
+
+ An ``ip-port`` object stores the port as its own attribute
+ (object_relation ``dst-port`` / ``src-port`` / ``port``); when MISP
+ includes ``object`` in the request, pick it up so the user doesn't have
+ to set one.
+ """
+ obj = request.get("object")
+ if not isinstance(obj, dict):
+ return None
+ for a in obj.get("Attribute") or []:
+ rel = (a.get("object_relation") or "").lower()
+ typ = (a.get("type") or "").lower()
+ if (rel in _PORT_RELATIONS or typ == "port") and a.get("value"):
+ return str(a["value"]).strip()
+ return None
+
+
+def scan_target(
+ request,
+ inputs,
+ config,
+ *,
+ as_url=False,
+ default_port=None,
+ default_scheme="https",
+):
+ """Build a Scan-API target from a MISP attribute with optional port.
+
+ IP/host attributes carry no port, but the Scan API addresses a *service*:
+ ``host:port`` for ssl / cs-beacon / favicon, or a URL for html /
+ screenshot. Port resolution, most specific first:
+
+ 1. an explicit port already in the value (``1.2.3.4:8443`` or a URL),
+ 2. a MISP ``host|port`` composite (e.g. an ``ip-dst|port`` attribute),
+ 3. a sibling port attribute in the same MISP object (``ip-port``),
+ 4. the optional ``port`` set in the module config,
+ 5. ``default_port`` (module-specific fallback, may be ``None``).
+
+ For URL endpoints (``as_url=True``) a bare host becomes
+ ``://host[:port]`` where scheme is the config ``scheme`` or
+ ``default_scheme``. Returns ``None`` when no value is present.
+ """
+ raw = value_from_request(request, inputs)
+ if not raw:
+ return None
+ raw = str(raw).strip()
+ cfg = config or {}
+
+ if raw.startswith(("http://", "https://")):
+ return raw # already a full URL — it encodes its own port
+
+ # Port, most specific source first.
+ port = None
+ if "|" in raw: # MISP composite "host|port"
+ host, _, p = raw.partition("|")
+ raw, port = host.strip(), (p.strip() or None)
+ port = port or _sibling_port(request) or (cfg.get("port") or None)
+
+ has_port = _has_explicit_port(raw) # value was already "host:port"
+ if as_url:
+ scheme = (cfg.get("scheme") or default_scheme).strip().lower()
+ host = raw if has_port or not port else _join_host_port(raw, port)
+ return f"{scheme}://{host}"
+
+ # host:port endpoints (ssl / cs-beacon / favicon)
+ if has_port:
+ return raw
+ p = port or default_port
+ return _join_host_port(raw, p) if p else raw
+
+
+def unwrap(resp):
+ """Return (data, None) or (None, error_message) for an rstapi response."""
+ if isinstance(resp, dict) and resp.get("status") == "error":
+ return None, str(resp.get("message", "RST Cloud API error"))
+ return resp, None
+
+
+def error(message: str) -> dict:
+ return {"error": message}
+
+
+# Threat-suffix -> (built-in MISP galaxy predicate, RST galaxy stix_type).
+# Kept in sync with rstmisp.misp.tagging; duplicated here so the modules
+# stay droppable into misp-modules standalone. The 2nd element selects which
+# RST custom galaxy (rst-) a name belongs to when resolving the
+# real cluster tag.
+_THREAT_SUFFIX = {
+ "_group": ("misp-galaxy:threat-actor", "intrusion-set"),
+ "_actor": ("misp-galaxy:threat-actor", "intrusion-set"),
+ "_tool": ("misp-galaxy:tool", "tool"),
+ "_stealer": ("misp-galaxy:stealer", "malware"),
+ "_backdoor": ("misp-galaxy:backdoor", "malware"),
+ "_ransomware": ("misp-galaxy:ransomware", "malware"),
+ "_miner": ("misp-galaxy:cryptominers", "malware"),
+ "_exploit": ("misp-galaxy:exploit-kit", "malware"),
+ "_botnet": ("misp-galaxy:botnet", "malware"),
+ "_rat": ("misp-galaxy:rat", "malware"),
+ "_campaign": ("misp-galaxy:campaign", "campaign"),
+}
+# Names with no recognised suffix are malware families.
+_THREAT_DEFAULT = ("misp-galaxy:malware", "malware")
+
+# RST custom galaxy types in MISP (namespace rstcloud); stix_type = type[4:].
+_RST_GALAXY_TYPES = (
+ "rst-malware",
+ "rst-tool",
+ "rst-intrusion-set",
+ "rst-campaign",
+)
+
+# Per-process caches: a misp-modules worker is long-lived, so reuse the PyMISP
+# client + resolved galaxy ids across calls instead of reconnecting on every
+# hover.
+_RESOLVER_CACHE: dict = {}
+
+
+def _truthy(v) -> bool:
+ if isinstance(v, bool):
+ return v
+ return str(v).strip().lower() in ("1", "true", "yes", "on")
+
+
+class _RstClusterResolver:
+ """Resolve an RST threat ``(stix_type, name)`` to its MISP cluster's real
+ ``tag_name`` (``misp-galaxy:rst-*=""``), so an enrichment tag
+ attaches the RST Threat Library galaxy — the same node the library/
+ reports/ feed connectors use. MISP stores a CUSTOM cluster's tag keyed on
+ the UUID, not the name, so the value-form ``rstcloud:rst-*="name"``
+ would not link.
+
+ Targeted ``search_galaxy_clusters`` per name (a handful per enrichment
+ call), not a full galaxy pull; per-name results are memoised on the
+ instance.
+ """
+
+ def __init__(self, misp, galaxy_ids: dict):
+ self._misp = misp
+ self._ids = galaxy_ids
+ self._cache: dict = {}
+
+ def __call__(self, stix_type: str, name_lower: str):
+ gid = self._ids.get("rst-" + stix_type)
+ if not gid:
+ return None
+ key = (stix_type, name_lower)
+ if key not in self._cache:
+ self._cache[key] = self._lookup(gid, name_lower)
+ return self._cache[key]
+
+ def _lookup(self, gid, name_lower):
+ try:
+ clusters = self._misp.search_galaxy_clusters(
+ gid, context="all", searchall=name_lower, pythonify=False
+ )
+ except Exception:
+ return None
+ for c in clusters or []:
+ gc = c.get("GalaxyCluster", c)
+ tname = gc.get("tag_name")
+ if not tname:
+ continue
+ if (gc.get("value") or "").lower() == name_lower:
+ return tname
+ for el in gc.get("GalaxyElement") or []:
+ if (
+ el.get("key") == "synonyms"
+ and (el.get("value") or "").lower() == name_lower
+ ):
+ return tname
+ return None
+
+
+def rst_resolver_from_config(config: dict | None):
+ """Build an RST cluster resolver from optional MISP config, or None.
+
+ Needs ``misp_url`` + ``misp_key`` in the module config; without them
+ (the default standalone deployment) returns None and ``threat_tags`` falls
+ back to built-in galaxy tags. PyMISP is imported lazily so the modules
+ still install with just ``rstapi`` when MISP resolution isn't configured.
+ """
+ config = config or {}
+ url = config.get("misp_url")
+ key = config.get("misp_key")
+ if not url or not key:
+ return None
+ if url in _RESOLVER_CACHE:
+ return _RESOLVER_CACHE[url]
+ try:
+ from pymisp import PyMISP
+ except Exception:
+ return None
+ try:
+ misp = PyMISP(
+ url, key, ssl=_truthy(config.get("misp_verifycert", False))
+ )
+ ids = {}
+ for g in misp.galaxies(pythonify=False) or []:
+ gd = g.get("Galaxy", g)
+ if gd.get("type") in _RST_GALAXY_TYPES and gd.get("id"):
+ ids.setdefault(gd["type"], gd["id"])
+ except Exception:
+ return None
+ resolver = _RstClusterResolver(misp, ids)
+ _RESOLVER_CACHE[url] = resolver
+ return resolver
+
+
+def threat_tags(threats, rst_resolver=None) -> list:
+ """Map RST threat names to MISP galaxy tags (best-effort, suffix-driven).
+
+ When ``rst_resolver`` is supplied (built from MISP config), each name first
+ resolves to its RST Threat Library cluster's real ``tag_name`` so the tag
+ attaches that galaxy; on a miss (or when no resolver) it falls back to the
+ built-in ``misp-galaxy:*`` value-form tag. ``rst_resolver`` is any callable
+ ``(stix_type, name_lower) -> tag_name | None``.
+ """
+ tags = []
+ for threat in threats or []:
+ if threat.endswith(("_technique", "_vuln")):
+ continue
+ predicate, stix_type = _THREAT_DEFAULT
+ name = threat
+ for suffix, (pred, st) in _THREAT_SUFFIX.items():
+ if threat.endswith(suffix):
+ n = len(suffix)
+ predicate, stix_type, name = pred, st, threat[:-n]
+ break
+ clean = name.replace("_", " ")
+ tag = None
+ if rst_resolver:
+ try:
+ tag = rst_resolver(stix_type, clean.lower())
+ except Exception:
+ tag = None
+ tags.append(tag or f'{predicate}="{clean}"')
+ return tags
+
+
+def scan_group(request, source):
+ """uuid scan-result objects should reference so each result stays tied
+ to exactly what was enriched — without spawning extra container objects.
+
+ 1. the parent object, when MISP includes it in the request (``object``);
+ 2. otherwise the enriched source attribute itself.
+
+ A screenshot / certificate / fetched body cannot be an *attribute* of a
+ ``url`` / ``ip-port`` / ``domain-ip`` object — MISP object templates are
+ fixed and have no such relation — so each is returned as its own object
+ that references this anchor (``identifies`` / ``screenshot-of`` / …).
+ Returns the anchor uuid, or ``None`` (typed-key request with no attribute
+ to point at).
+ """
+ obj = request.get("object")
+ if isinstance(obj, dict) and obj.get("uuid"):
+ return obj["uuid"]
+ return source.uuid if source is not None else None
+
+
+def misp_event_with_source(request):
+ """Start a ``MISPEvent`` seeded with the triggering attribute.
+
+ Returns ``(event, source_attribute_or_None)``. Enrichment objects/
+ attributes added to the event can ``add_reference(source.uuid, ...)`` so
+ MISP links them to the attribute the analyst enriched. Requires pymisp,
+ present in a misp-modules deployment (it's a core dependency).
+ """
+ from pymisp import MISPAttribute, MISPEvent
+
+ event = MISPEvent()
+ source = None
+ attr = request.get("attribute")
+ if attr:
+ source = MISPAttribute()
+ source.from_dict(**attr)
+ event.add_attribute(**source)
+ return event, source
+
+
+def new_enrichment_object(name):
+ """Build a ``MISPObject`` for an RST enrichment template.
+
+ Returns ``(object, dedicated)``. Uses the ``rst-*`` template from the MISP
+ object library (install via
+ [MISP/misp-objects](https://github.com/MISP/misp-objects), e.g.
+ [PR #526](https://github.com/MISP/misp-objects/pull/526)). Falls back to
+ a generic ``annotation`` object if the template is not installed yet, so
+ output stays valid misp_standard on any MISP.
+ """
+ from pymisp import MISPObject
+
+ try:
+ obj = MISPObject(name)
+ if getattr(obj, "_known_template", False):
+ return obj, True
+ except Exception:
+ pass
+ return MISPObject("annotation"), False
+
+
+def standard_results(event) -> dict:
+ """Serialise a ``MISPEvent`` into the misp_standard result envelope."""
+ parsed = json.loads(event.to_json())
+ return {
+ "results": {
+ k: parsed[k] for k in ("Attribute", "Object") if parsed.get(k)
+ }
+ }
+
+
+def text_result(value: str, comment: str = "") -> dict:
+ """misp_standard fallback when nothing structured to return (one text)."""
+ attr = {"type": "text", "value": value}
+ if comment:
+ attr["comment"] = comment
+ return {"results": {"Attribute": [attr]}}
+
+
+_PYMISP_CACHE: dict = {}
+
+
+def _pymisp(cfg):
+ """Cached PyMISP client from module config (misp_url/misp_key), or None.
+
+ Reused across calls (a misp-modules worker is long-lived). Returns None
+ when creds are absent or PyMISP can't connect, so callers degrade
+ gracefully.
+ """
+ url, key = cfg.get("misp_url"), cfg.get("misp_key")
+ if not (url and key):
+ return None
+ ck = (url, key, bool(_truthy(cfg.get("misp_verifycert", False))))
+ if ck in _PYMISP_CACHE:
+ return _PYMISP_CACHE[ck]
+ try:
+ from pymisp import PyMISP
+
+ client = PyMISP(url, key, ssl=ck[2])
+ except Exception:
+ return None
+ _PYMISP_CACHE[ck] = client
+ return client
+
+
+def apply_to_source_attribute(
+ config,
+ request,
+ *,
+ tags=None,
+ comment_note=None,
+ comment_prefix=None,
+ replace_tag_prefixes=(),
+ set_to_ids=None,
+ fp_sightings=0,
+):
+ """Write enrichment back ONTO the enriched attribute via the MISP API.
+
+ MISP enrichment itself can only ADD new attributes/objects — it can't
+ modify the attribute you ran the module on. So, *only when*
+ ``misp_url``/``misp_key`` are set in the module config, this updates the
+ source attribute in place:
+
+ * removes the module's own prior tags (``replace_tag_prefixes``) then
+ adds ``tags`` — so re-running replaces rather than stacks verdicts;
+ * appends ``comment_note`` to the existing comment (dropping any
+ previous note that started with ``comment_prefix``, so re-runs stay
+ tidy);
+ * sets ``to_ids`` when ``set_to_ids`` is not None;
+ * adds ``fp_sightings`` false-positive sightings (type 1) — a benign
+ signal that feeds MISP's decay/scoring.
+
+ Returns True if it wrote back (caller should then return an empty result
+ so no duplicate attribute is created); False otherwise (caller returns
+ normally).
+ """
+ cfg = config or {}
+ attr = request.get("attribute") or {}
+ uuid = attr.get("uuid")
+ misp = _pymisp(cfg)
+ if not (uuid and misp):
+ return False
+ try:
+ full = misp.get_attribute(uuid, pythonify=True)
+ except Exception:
+ return False
+ try:
+ changed = False
+ if comment_note is not None:
+ existing = (
+ getattr(full, "comment", None) or attr.get("comment") or ""
+ )
+ segments = [
+ s
+ for s in existing.split(" | ")
+ if s and not (comment_prefix and s.startswith(comment_prefix))
+ ]
+ segments.append(comment_note)
+ full.comment = " | ".join(segments)
+ changed = True
+ if set_to_ids is not None:
+ full.to_ids = bool(set_to_ids)
+ changed = True
+ if changed:
+ misp.update_attribute(full)
+ if replace_tag_prefixes:
+ for t in getattr(full, "tags", []) or []:
+ name = getattr(t, "name", "") or ""
+ if any(name.startswith(p) for p in replace_tag_prefixes):
+ misp.untag(uuid, name)
+ for tag in tags or []:
+ misp.tag(uuid, tag)
+ if fp_sightings:
+ from pymisp import MISPSighting
+
+ for _ in range(int(fp_sightings)):
+ sighting = MISPSighting()
+ sighting.from_dict(
+ type="1", source="RST Noise Control"
+ ) # 1 = false-positive
+ misp.add_sighting(sighting, attribute=uuid)
+ return True
+ except Exception:
+ return False
diff --git a/misp_modules/modules/expansion/rst_cs_beacon.py b/misp_modules/modules/expansion/rst_cs_beacon.py
new file mode 100644
index 00000000..c2e7ccfa
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_cs_beacon.py
@@ -0,0 +1,165 @@
+"""rst_cs_beacon — scan for Cobalt Strike beacon (GET /scan/cs-beacon)."""
+
+from __future__ import annotations
+
+import json
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ scan_kwargs,
+ scan_target,
+ standard_results,
+ text_result,
+ unwrap,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "ip-dst",
+ "ip-src",
+ "url",
+ "domain",
+ "hostname",
+ "ip-dst|port",
+ "ip-src|port",
+ "hostname|port",
+ "domain|port",
+]
+# misp_standard: on a hit, return the beacon blob sha256(s) as pivotable
+# attributes tagged to the Cobalt Strike galaxy.
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.2",
+ "author": "RST Cloud",
+ "description": (
+ "Scan a target IP[:port] for a Cobalt Strike beacon configuration"
+ " via RST Scan API."
+ ),
+ "module-type": ["expansion"],
+ "name": "RST Cloud Cobalt Strike Beacon",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Probes the target for Cobalt Strike beacon configurations via RST"
+ " Scan GET /scan/cs-beacon. On a hit, returns file MISP object(s)"
+ " with pivotable SHA-256 hashes tagged to the Cobalt Strike"
+ " galaxy."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "IP, URL, domain, or hostname attribute (optional port via config)."
+ ),
+ "output": (
+ "file MISP object(s) with beacon hashes and Cobalt Strike galaxy tag."
+ ),
+}
+# 'port' (optional): port to probe when the attribute carries none
+# (default 443).
+moduleconfig = ["api_key", "base_url", "port", "timeout"]
+
+_CS_TAG = 'misp-galaxy:tool="Cobalt Strike"'
+
+
+def _arch(node):
+ return node if isinstance(node, dict) else {}
+
+
+def _to_int(v):
+ try:
+ return int(v)
+ except (TypeError, ValueError):
+ return 0
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ target = scan_target(request, _INPUTS, config, default_port=443)
+ if not target:
+ return error("No target found in the request.")
+
+ data, err = unwrap(rstapi.scan(**scan_kwargs(config)).GetCsBeacon(target))
+ if err:
+ return error(f"RST CS beacon scan failed: {err}")
+ if not isinstance(data, dict) or not data:
+ return text_result(
+ f"{target}: no Cobalt Strike beacon found", "RST CS Beacon"
+ )
+
+ # The scanner ALWAYS returns x86/x64 probe blocks; an actual beacon is only
+ # present when a block carries a parsed `config` (or a non-zero `size`). An
+ # empty config / size 0 means "probed, nothing found" — NOT a detection.
+ blocks = {"x86": _arch(data.get("x86")), "x64": _arch(data.get("x64"))}
+ hits = {
+ arch: b
+ for arch, b in blocks.items()
+ if b.get("config") or _to_int(b.get("size")) > 0
+ }
+ if not hits:
+ return text_result(
+ f"{target}: no Cobalt Strike beacon detected", "RST CS Beacon"
+ )
+
+ from pymisp import MISPObject
+
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ seen = set()
+ for arch, block in hits.items():
+ sha = block.get("sha256")
+ if not sha or sha in seen:
+ continue
+ seen.add(sha)
+ # The beacon payload is a file; group its hash + config as a file
+ # object so the detection is tied to the scanned host, not a loose
+ # sha256.
+ fobj = MISPObject("file")
+ sha_attr = fobj.add_attribute("sha256", value=sha)
+ sha_attr.add_tag(_CS_TAG) # tags attach to attributes, not objects
+ if block.get("md5"):
+ fobj.add_attribute("md5", value=block["md5"])
+ if block.get("size"):
+ fobj.add_attribute("size-in-bytes", value=block["size"])
+ cfg = block.get("config") or {}
+ fobj.add_attribute(
+ "text",
+ value=(
+ f"Cobalt Strike beacon ({arch}) on {target}; "
+ f"config: {json.dumps(cfg)[:400]}"
+ ),
+ )
+ fobj.comment = "RST CS Beacon"
+ if anchor:
+ fobj.add_reference(anchor, "characterizes")
+ event.add_object(fobj)
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_favicon.py b/misp_modules/modules/expansion/rst_favicon.py
new file mode 100644
index 00000000..cfcd8b80
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_favicon.py
@@ -0,0 +1,169 @@
+"""rst_favicon — favicon hashes as a file object (GET /scan/favicon)."""
+
+from __future__ import annotations
+
+import base64
+import json
+from io import BytesIO
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ host_only,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ scan_kwargs,
+ standard_results,
+ text_result,
+ unwrap,
+ value_from_request,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "url",
+ "domain",
+ "hostname",
+ "ip-src",
+ "ip-dst",
+ "ip-src|port",
+ "ip-dst|port",
+ "hostname|port",
+ "domain|port",
+]
+# misp_standard: file object (md5/sha1/sha256 pivotable in Netlas/Censys)
+# plus a standalone favicon_hash attribute (Murmur3/MMH3, Shodan/FOFA).
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.3",
+ "author": "RST Cloud",
+ "description": (
+ "Fetch a target's favicon (image + all hashes for"
+ " Shodan/Netlas/Censys pivoting) via RST Scan API."
+ ),
+ "module-type": ["expansion"],
+ "name": "RST Cloud Favicon",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Retrieves the favicon image and cryptographic hashes via RST Scan"
+ " GET /scan/favicon. Returns a file MISP object with"
+ " MD5/SHA-1/SHA-256 for Censys/Netlas pivoting and a standalone"
+ " Murmur3 favicon-hash attribute for Shodan/FOFA-style pivoting."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": "URL, domain, hostname, or IP attribute.",
+ "output": (
+ "file MISP object, favicon-hash attribute, and resolved favicon URL."
+ ),
+}
+moduleconfig = ["api_key", "base_url", "timeout"]
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ # Favicon endpoint expects a bare host or a full URL — never host:port.
+ # The API fetches the page over HTTP/HTTPS itself; adding ":443" breaks it.
+ raw = value_from_request(request, _INPUTS)
+ if not raw:
+ return error("No target found in the request.")
+ target = raw if raw.startswith(("http://", "https://")) else host_only(raw)
+
+ data, err = unwrap(
+ rstapi.scan(**scan_kwargs(config)).GetFavicon(
+ target, include_base64=True
+ )
+ )
+ if err:
+ return error(f"RST favicon scan failed: {err}")
+ if not isinstance(data, dict) or not data.get("favicon_hash"):
+ return text_result(f"{target}: no favicon returned", "RST Favicon")
+
+ from pymisp import MISPObject
+
+ fhash = str(data["favicon_hash"])
+ req_loc = data.get("req_location") or ""
+ content_type = data.get("req_content_type") or "image/x-icon"
+
+ # Real filename from the resolved favicon URL (e.g. "drive_2026_32dp.ico")
+ raw_fname = (
+ req_loc.rstrip("/").split("/")[-1].split("?")[0] if req_loc else ""
+ )
+ fname = raw_fname if (raw_fname and "." in raw_fname) else "favicon.ico"
+
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ # file object: standard hashes (md5/sha1/sha256) are auto-correlated and
+ # indexed for pivoting in Netlas, Censys, and other threat-intel platforms.
+ fobj = MISPObject("file")
+ fobj.add_attribute("filename", value=fname)
+ fobj.add_attribute("mimetype", value=content_type)
+ for htype in ("md5", "sha1", "sha256"):
+ if data.get(htype):
+ fobj.add_attribute(htype, value=data[htype])
+
+ # Attach the raw image when the API returned base64
+ try:
+ raw = (
+ base64.b64decode(data["base64_image"])
+ if data.get("base64_image")
+ else None
+ )
+ except Exception:
+ raw = None
+ if raw:
+ fobj.add_attribute("attachment", value=fname, data=BytesIO(raw))
+
+ fobj.comment = "RST Favicon"
+ if anchor:
+ fobj.add_reference(anchor, "identifies")
+ event.add_object(fobj)
+
+ # Resolved favicon URL — where the image actually lives after redirects
+ if req_loc:
+ event.add_attribute(
+ "link",
+ req_loc,
+ comment="RST Favicon resolved URL",
+ to_ids=False,
+ )
+
+ # favicon_hash (Murmur3/MMH3): standalone attribute for independent
+ # correlation across events and Shodan/FOFA-style hunting workflows.
+ fav_attr = event.add_attribute(
+ "other",
+ fhash,
+ comment=f"Murmur3 favicon hash for {target} (Shodan/FOFA pivot)",
+ to_ids=False,
+ )
+ fav_attr.add_tag(f'rstcloud:favicon:hash="{fhash}"')
+
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_html.py b/misp_modules/modules/expansion/rst_html.py
new file mode 100644
index 00000000..ab7471d8
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_html.py
@@ -0,0 +1,158 @@
+"""rst_html — fetch HTML/JS as attachment (GET /scan/html/body[/js]).
+
+Target format: host:port (e.g. drive.google.com:443). Full URLs pass through
+unchanged.
+"""
+
+from __future__ import annotations
+
+import json
+from io import BytesIO
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ scan_kwargs,
+ scan_target,
+ standard_results,
+ text_result,
+ unwrap,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "url",
+ "domain",
+ "hostname",
+ "ip-src",
+ "ip-dst",
+ "ip-src|port",
+ "ip-dst|port",
+ "hostname|port",
+ "domain|port",
+]
+# misp_standard: return the fetched body as a downloadable attachment.
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.2",
+ "author": "RST Cloud",
+ "description": (
+ "Fetch rendered HTML body or extracted JavaScript for a URL/IP"
+ " target via RST Scan API."
+ ),
+ "module-type": ["expansion"],
+ "name": "RST Cloud HTML Fetcher",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Fetches the rendered HTML body or extracted JavaScript from the"
+ " target via RST Scan. Returns a file MISP object with the page"
+ " attached and pivotable content hashes. Configurable mode: body"
+ " (default) or js."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "URL, domain, hostname, or IP attribute (optional port via config)."
+ ),
+ "output": (
+ "file MISP object (page.html or page.js) with hashes and HTTP"
+ " metadata."
+ ),
+}
+# 'mode' = body | js (default body). 'port' (optional): override default
+# port 443.
+moduleconfig = ["api_key", "base_url", "mode", "port", "timeout"]
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config") or {}
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ target = scan_target(request, _INPUTS, config, default_port=443)
+ if not target:
+ return error("No target found in the request.")
+
+ is_js = (config.get("mode") or "body").lower() == "js"
+ client = rstapi.scan(**scan_kwargs(config))
+ method = client.GetHtmlBodyJs if is_js else client.GetHtmlBody
+ data, err = unwrap(method(target))
+ if err:
+ return error(f"RST HTML fetch failed: {err}")
+
+ body = (
+ data.get("body")
+ if isinstance(data, dict)
+ else (data if isinstance(data, str) else "")
+ )
+ if not body:
+ return text_result(f"{target}: empty response", "RST HTML Fetcher")
+
+ from pymisp import MISPObject
+
+ meta = data if isinstance(data, dict) else {}
+ hashes = meta.get("hashes") or {}
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ # The fetched body IS a file: group it (attachment + pivotable hashes +
+ # response metadata) in a `file` object rather than a lone size string.
+ filename = "page.js" if is_js else "page.html"
+ label = "extracted JavaScript" if is_js else "HTML body"
+ fobj = MISPObject("file")
+ fobj.add_attribute(
+ "attachment",
+ value=filename,
+ data=BytesIO(body.encode("utf-8", "replace")),
+ to_ids=False,
+ )
+ fobj.add_attribute("filename", value=filename)
+ fobj.add_attribute(
+ "mimetype",
+ value="application/javascript" if is_js else "text/html",
+ )
+ fobj.add_attribute(
+ "size-in-bytes", value=meta.get("content_length") or len(body)
+ )
+ for htype in ("md5", "sha1", "sha256"):
+ if hashes.get(htype):
+ fobj.add_attribute(htype, value=hashes[htype])
+ info = [f"RST {label} for {target}"]
+ if meta.get("http_status"):
+ info.append(f"HTTP {meta['http_status']}")
+ if meta.get("title"):
+ info.append(f"title: {meta['title']}")
+ if meta.get("truncated"):
+ info.append("(body truncated)")
+ fobj.add_attribute("text", value="; ".join(info))
+ fobj.comment = "RST HTML Fetcher"
+ if anchor:
+ fobj.add_reference(anchor, "derived-from")
+ event.add_object(fobj)
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_ioc.py b/misp_modules/modules/expansion/rst_ioc.py
new file mode 100644
index 00000000..73e452c2
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_ioc.py
@@ -0,0 +1,449 @@
+"""rst_ioc — enrich an indicator with RST threat intel (GET /ioc)."""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime, timezone
+
+import rstapi
+
+from ._rstcloud.client import (
+ apply_to_source_attribute,
+ error,
+ host_only,
+ misp_event_with_source,
+ new_enrichment_object,
+ rst_kwargs,
+ rst_resolver_from_config,
+ scan_group,
+ standard_results,
+ text_result,
+ threat_tags,
+ unwrap,
+ value_from_request,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "ip-src",
+ "ip-dst",
+ "domain",
+ "hostname",
+ "url",
+ "md5",
+ "sha1",
+ "sha256",
+ "ip-src|port",
+ "ip-dst|port",
+ "hostname|port",
+ "domain|port",
+]
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.4",
+ "author": "RST Cloud",
+ "description": (
+ "Enrich indicators with RST Cloud threat intelligence. Returns an"
+ " rst-ioc object (score, attribution, geo/ASN for IPs, DNS/WHOIS"
+ " for domains, parsed components for URLs, related hashes for file"
+ " hashes) linked back to the enriched attribute."
+ ),
+ "module-type": ["expansion", "hover"],
+ "name": "RST Cloud IoC Lookup",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Queries RST Cloud GET /ioc for threat scores, attribution,"
+ " geo/ASN, DNS, WHOIS, TTPs, CVEs, and related indicators. Returns"
+ " a structured rst-ioc MISP object with galaxy tags and optional"
+ " pivotable related hashes/IPs. When misp_url and misp_key are"
+ " configured, also writes score/threat tags onto the enriched"
+ " attribute via the MISP API."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "IP, domain, hostname, URL, or hash attribute (incl. host|port"
+ " composites)."
+ ),
+ "output": (
+ "rst-ioc MISP object, galaxy/score tags, and optional related"
+ " attributes."
+ ),
+}
+# misp_url/misp_key (optional): when set, tags + score note are also written
+# directly onto the enriched attribute via the MISP API (like rst_noise).
+moduleconfig = [
+ "api_key",
+ "base_url",
+ "misp_url",
+ "misp_key",
+ "misp_verifycert",
+]
+
+_HASH_TYPES = {"md5", "sha1", "sha256"}
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def _ts(val) -> str:
+ """Unix timestamp string/int -> YYYY-MM-DD (UTC), or empty string."""
+ try:
+ return datetime.fromtimestamp(int(val), tz=timezone.utc).strftime(
+ "%Y-%m-%d"
+ )
+ except (TypeError, ValueError, OSError):
+ return ""
+
+
+def _f(val, precision=1) -> str:
+ try:
+ return f"{float(val):.{precision}f}"
+ except (TypeError, ValueError):
+ return ""
+
+
+def _known(v) -> bool:
+ return bool(v) and str(v).strip().lower() not in (
+ "",
+ "none",
+ "null",
+ "n/a",
+ )
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ value = host_only(value_from_request(request, _INPUTS))
+ if not value:
+ return error("No supported indicator value found in the request.")
+
+ # /ioc always returns HTTP 200; a miss carries an "error" key and no "id".
+ data, err = unwrap(
+ rstapi.ioclookup(**rst_kwargs(config)).GetIndicator(value)
+ )
+ if err:
+ return error(f"RST Cloud lookup failed: {err}")
+ if not isinstance(data, dict) or data.get("error") or not data.get("id"):
+ return text_result(
+ f"{value}: not found in RST Cloud", "RST IoC Lookup"
+ )
+
+ ioc_type = (data.get("ioc_type") or "").lower()
+ is_ip = ioc_type in ("ipv4", "ipv6")
+ is_domain = ioc_type == "domain"
+ is_url = ioc_type == "url"
+ is_hash = ioc_type in _HASH_TYPES
+
+ score_block = data.get("score") or {}
+ total = score_block.get("total")
+ try:
+ total_int = int(float(str(total)))
+ except (TypeError, ValueError):
+ total_int = None
+ conf_sub = _f(score_block.get("tags"), 2) # context sub-score
+ relev_sub = _f(score_block.get("frequency"), 2) # relevance sub-score
+
+ threats = data.get("threat") or []
+ tags_str = (data.get("tags") or {}).get("str") or []
+ ttp = data.get("ttp") or []
+ cve = data.get("cve") or []
+ industry = data.get("industry") or []
+ fp = data.get("fp") or {}
+ fp_alarm = str(fp.get("alarm") or "").strip().lower()
+ fp_flagged = fp_alarm in ("true", "possible")
+ geo = data.get("geo") or {}
+ asn_blk = data.get("asn") or {}
+ src_blk = data.get("src") or {}
+ resolved = data.get("resolved") or {}
+ parsed = data.get("parsed") or {}
+ fseen = _ts(data.get("fseen"))
+ lseen = _ts(data.get("lseen"))
+
+ # -------------------------------------------------------------------------
+ # Derive the type-specific context strings once; reused for both the typed
+ # rst-ioc object and the annotation fallback text.
+ # -------------------------------------------------------------------------
+ geo_str = asn_str = whois_str = http_status = ""
+ dns_records: list[str] = [] # ["A: 1.2.3.4", "CNAME: ..."]
+ resolved_ips: list[str] = [] # pivotable A-record IPs (domains)
+ url_parts: list[str] = []
+ filenames = [f for f in data.get("filename") or [] if _known(f)]
+
+ if is_ip:
+ if geo.get("country"):
+ parts = [geo["country"]]
+ if geo.get("region") and geo["region"] != geo["country"]:
+ parts.append(geo["region"])
+ if geo.get("city") and geo["city"] not in parts:
+ parts.append(geo["city"])
+ geo_str = ", ".join(parts)
+ if asn_blk.get("num"):
+ asn_str = f"AS{asn_blk['num']}"
+ if asn_blk.get("isp"):
+ asn_str += f" {asn_blk['isp']}"
+ if asn_blk.get("org") and asn_blk["org"] != asn_blk.get("isp"):
+ asn_str += f" / {asn_blk['org']}"
+
+ if is_domain:
+ res_ip = resolved.get("ip") or {}
+ a_records = [r for r in res_ip.get("a") or [] if _known(r)]
+ cnames = [r for r in res_ip.get("cname") or [] if _known(r)]
+ aliases = [r for r in res_ip.get("alias") or [] if _known(r)]
+ resolved_ips = a_records
+ if a_records:
+ dns_records.append("A: " + ", ".join(a_records))
+ if cnames:
+ dns_records.append("CNAME: " + ", ".join(cnames))
+ if aliases:
+ dns_records.append("alias: " + ", ".join(aliases))
+
+ whois = resolved.get("whois") or {}
+ if whois.get("havedata") == "true" or whois.get("registrar"):
+ w_parts = []
+ if _known(whois.get("registrar")):
+ w_parts.append(f"registrar: {whois['registrar']}")
+ if _known(whois.get("registrant")):
+ w_parts.append(f"registrant: {whois['registrant']}")
+ if _known(whois.get("created")):
+ w_parts.append(f"created: {whois['created'][:10]}")
+ if _known(whois.get("expires")):
+ w_parts.append(f"expires: {whois['expires'][:10]}")
+ if _known(whois.get("updated")):
+ w_parts.append(f"updated: {whois['updated'][:10]}")
+ if _known(whois.get("age")):
+ w_parts.append(f"age: {whois['age']} days")
+ whois_str = ", ".join(w_parts)
+
+ if is_url:
+ if _known(parsed.get("domain")):
+ url_parts.append(f"domain: {parsed['domain']}")
+ if _known(parsed.get("path")) and parsed.get("path") not in (
+ "/",
+ "None",
+ "none",
+ ):
+ url_parts.append(f"path: {parsed['path']}")
+ if _known(parsed.get("port")) and parsed.get("port") != "None":
+ url_parts.append(f"port: {parsed['port']}")
+ if _known(resolved.get("status")):
+ http_status = str(resolved["status"])
+
+ # Source report URLs (deduped, order preserved).
+ ref_urls: list[str] = []
+ seen_refs: set[str] = set()
+ for report_url in (src_blk.get("report") or "").split(","):
+ report_url = report_url.strip()
+ if report_url and report_url not in seen_refs:
+ ref_urls.append(report_url)
+ seen_refs.add(report_url)
+ src_names = src_blk.get("name") or []
+
+ # -------------------------------------------------------------------------
+ # Annotation fallback text (also a useful human summary).
+ # -------------------------------------------------------------------------
+ lines: list[str] = []
+ score_parts = []
+ if total_int is not None:
+ score_parts.append(f"total: {total_int}/100")
+ if _f(score_block.get("src")):
+ score_parts.append(f"src: {_f(score_block['src'])}")
+ if conf_sub:
+ score_parts.append(f"context: {conf_sub}")
+ if relev_sub:
+ score_parts.append(f"relevance: {relev_sub}")
+ if score_parts:
+ lines.append("Score: " + ", ".join(score_parts))
+ if fseen or lseen:
+ lines.append(f"Seen: {fseen or '?'} to {lseen or '?'}")
+ if geo_str:
+ lines.append("Geo: " + geo_str)
+ if asn_str:
+ lines.append("ASN: " + asn_str)
+ if dns_records:
+ lines.append("DNS: " + " | ".join(dns_records))
+ if whois_str:
+ lines.append("WHOIS: " + whois_str)
+ if url_parts:
+ lines.append("URL: " + ", ".join(url_parts))
+ if http_status:
+ lines.append("HTTP status: " + http_status)
+ if is_hash:
+ hash_parts = [
+ f"{h.upper()}: {data[h]}"
+ for h in ("md5", "sha1", "sha256")
+ if _known(data.get(h))
+ ]
+ if hash_parts:
+ lines.append("Hashes: " + ", ".join(hash_parts))
+ if filenames:
+ lines.append("Filenames: " + ", ".join(filenames))
+ if industry:
+ lines.append("Industry: " + ", ".join(industry))
+ if threats:
+ lines.append("Threats: " + ", ".join(threats))
+ if tags_str:
+ lines.append("Tags: " + ", ".join(tags_str))
+ if ttp:
+ lines.append("TTPs: " + ", ".join(ttp))
+ if cve:
+ lines.append("CVEs: " + ", ".join(cve))
+ if fp_flagged:
+ note = f"FP alarm: {fp_alarm}"
+ if fp.get("descr"):
+ note += f" - {fp['descr']}"
+ lines.append(note)
+ if data.get("description"):
+ lines.append("Description: " + data["description"])
+ if src_names:
+ lines.append("Sources: " + ", ".join(src_names))
+
+ # -------------------------------------------------------------------------
+ # Galaxy + score / FP tags
+ # -------------------------------------------------------------------------
+ rst_resolver = rst_resolver_from_config(config)
+ galaxy_tags = threat_tags(threats, rst_resolver)
+ if total_int is not None:
+ galaxy_tags.append(f'rstcloud:score-total="{total_int}"')
+ if fp_flagged:
+ risk = "high" if fp_alarm == "true" else "medium"
+ galaxy_tags.append(f'false-positive:risk="{risk}"')
+
+ # -------------------------------------------------------------------------
+ # Build MISP result
+ # -------------------------------------------------------------------------
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ obj, dedicated = new_enrichment_object("rst-ioc")
+ obj.comment = "RST IoC Lookup"
+
+ if dedicated:
+ tag_target = None
+ if total_int is not None:
+ tag_target = obj.add_attribute(
+ "score-total", value=str(total_int), to_ids=False
+ )
+ if conf_sub:
+ obj.add_attribute("score-confidence", value=conf_sub, to_ids=False)
+ if relev_sub:
+ obj.add_attribute("score-relevance", value=relev_sub, to_ids=False)
+ if fseen:
+ obj.add_attribute("first-seen", value=fseen, to_ids=False)
+ if lseen:
+ obj.add_attribute("last-seen", value=lseen, to_ids=False)
+ for t in threats:
+ a = obj.add_attribute("threat", value=t, to_ids=False)
+ tag_target = tag_target or a
+ for t in ttp:
+ obj.add_attribute("ttp", value=t, to_ids=False)
+ for c in cve:
+ obj.add_attribute("cve", value=c, to_ids=False)
+ for ind in industry:
+ obj.add_attribute("industry", value=ind, to_ids=False)
+ for t in tags_str:
+ obj.add_attribute("tag", value=t, to_ids=False)
+ if fp_flagged:
+ fp_val = fp_alarm + (
+ f" - {fp['descr']}" if fp.get("descr") else ""
+ )
+ obj.add_attribute("false-positive", value=fp_val, to_ids=False)
+ if geo_str:
+ obj.add_attribute("geo", value=geo_str, to_ids=False)
+ if asn_str:
+ obj.add_attribute("asn", value=asn_str, to_ids=False)
+ for rec in dns_records:
+ obj.add_attribute("dns", value=rec, to_ids=False)
+ if whois_str:
+ obj.add_attribute("whois", value=whois_str, to_ids=False)
+ if http_status:
+ obj.add_attribute("http-status", value=http_status, to_ids=False)
+ for fn in filenames:
+ obj.add_attribute("filename", value=fn, to_ids=False)
+ if data.get("description"):
+ obj.add_attribute(
+ "description", value=data["description"], to_ids=False
+ )
+ for ref in ref_urls:
+ obj.add_attribute("ref", value=ref, to_ids=False)
+ # Fall back to a text attribute as the tag anchor if nothing else
+ # exists.
+ tag_target = tag_target or obj.add_attribute(
+ "description", value="\n".join(lines), to_ids=False
+ )
+ else:
+ obj.add_attribute("type", value="RST IoC Lookup", to_ids=False)
+ tag_target = obj.add_attribute(
+ "text", value="\n".join(lines), to_ids=False
+ )
+ if fseen:
+ obj.add_attribute("creation-date", value=fseen, to_ids=False)
+ for ref in ref_urls:
+ obj.add_attribute("ref", value=ref, to_ids=False)
+
+ for tag in galaxy_tags:
+ tag_target.add_tag(tag)
+ if anchor:
+ obj.add_reference(anchor, "characterizes")
+ event.add_object(obj)
+
+ # Pivotable hashes: expose related hash values as searchable IOC
+ # attributes (separate from the object) so they correlate across events.
+ if is_hash:
+ for htype in ("md5", "sha1", "sha256"):
+ hval = data.get(htype)
+ if _known(hval) and hval != value:
+ a = event.add_attribute(
+ htype,
+ value=hval,
+ to_ids=True,
+ comment="RST IoC Lookup - related hash",
+ )
+ for tag in galaxy_tags:
+ a.add_tag(tag)
+
+ # Pivotable resolved IPs for domains (context, not detection-worthy).
+ for rip in resolved_ips:
+ event.add_attribute(
+ "ip-dst",
+ value=rip,
+ to_ids=False,
+ comment="RST IoC Lookup - resolved IP",
+ )
+
+ # Optional write-back: apply tags + brief note onto the enriched attribute
+ # directly via the MISP API when misp_url/misp_key are configured.
+ apply_to_source_attribute(
+ config,
+ request,
+ tags=galaxy_tags,
+ comment_note=(
+ f"RST score {total_int}/100"
+ + (f"; threats: {', '.join(threats)}" if threats else "")
+ ),
+ comment_prefix="RST score",
+ )
+
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_noise_control.py b/misp_modules/modules/expansion/rst_noise_control.py
new file mode 100644
index 00000000..1668f800
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_noise_control.py
@@ -0,0 +1,253 @@
+"""rst_noise_control — benign/noise check (GET /benign/lookup)."""
+
+from __future__ import annotations
+
+import json
+
+import rstapi
+
+from ._rstcloud.client import (
+ apply_to_source_attribute,
+ error,
+ host_only,
+ misp_event_with_source,
+ new_enrichment_object,
+ rst_kwargs,
+ scan_group,
+ standard_results,
+ text_result,
+ unwrap,
+ value_from_request,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "ip-src",
+ "ip-dst",
+ "domain",
+ "hostname",
+ "url",
+ "md5",
+ "sha1",
+ "sha256",
+ "ip-src|port",
+ "ip-dst|port",
+ "hostname|port",
+ "domain|port",
+]
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.4",
+ "author": "RST Cloud",
+ "description": (
+ "Check whether a value (IP, domain, URL or hash) is known-good /"
+ " noise via RST Noise Control. Returns an rst-noise object"
+ " (verdict, category) linked back to the enriched attribute."
+ ),
+ "module-type": ["expansion", "hover"],
+ "name": "RST Cloud Noise Control",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Queries RST Cloud GET /benign/lookup for benign/noisy verdicts."
+ " Returns an rst-noise MISP object with false-positive risk tags."
+ " When misp_url and misp_key are configured, also annotates the"
+ " source attribute in place (tags, comment, to_ids, false-positive"
+ " sightings)."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "IP, domain, hostname, URL, or hash attribute (incl. host|port"
+ " composites)."
+ ),
+ "output": (
+ "rst-noise MISP object with verdict, category, and risk/noise tags."
+ ),
+}
+# misp_url/misp_key/misp_verifycert (optional): when set the verdict is ALSO
+# written directly onto the enriched attribute (tags, comment, to_ids, FP
+# sightings) via the MISP API — the annotation object is always returned
+# regardless.
+moduleconfig = [
+ "api_key",
+ "base_url",
+ "misp_url",
+ "misp_key",
+ "misp_verifycert",
+]
+
+# Tag families we own — stripped before re-adding so re-runs replace not stack.
+_TAG_PREFIXES = (
+ "false-positive:risk=",
+ "rstcloud:noise-control=",
+ "rstcloud:noise-category=",
+)
+
+
+def _category(reason: str) -> str:
+ """'Change Score Shodan/Scanners/Shodan' -> 'Shodan/Scanners/Shodan'."""
+ for action in ("Change Score ", "Drop "):
+ if reason.startswith(action):
+ n = len(action)
+ return reason[n:].strip()
+ return reason.strip()
+
+
+def _category_tag(category: str) -> str:
+ """First ``/``-delimited segment for ``rstcloud:noise-category``.
+
+ Lower cardinality than the full category path. Full category path stays in
+ the object/comment text; the tag uses only the top-level bucket before
+ the first ``/``.
+
+ Example (md5, ``Drop Ubuntu Server 26.04 LTS/pam_sepermit.so/``)::
+
+ Verdict: BENIGN - known-good
+ Category: Ubuntu Server 26.04 LTS/pam_sepermit.so/
+ Type: md5
+ rstcloud:noise-category="Ubuntu Server 26.04 LTS"
+ """
+ category = (category or "").strip()
+ if not category:
+ return category
+ return category.split("/", 1)[0].strip()
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ value = host_only(value_from_request(request, _INPUTS))
+ if not value:
+ return error("No supported value found in the request.")
+
+ # /benign/lookup always returns HTTP 200 with
+ # {value, type, benign, reason}.
+ # `benign` is the STRING "true"/"false". The `reason` prefix encodes
+ # action:
+ # "Drop ..." -> known-good, safe to suppress (FP risk high)
+ # "Change Score ..." -> noisy infra (scanners/CDN…), reduce score only
+ # (FP risk medium — do NOT treat as clean)
+ # benign=="false" -> unknown / not in database.
+ #
+ # Example benign md5 (reason "Drop Ubuntu Server 26.04 LTS/.../"):
+ # object/comment Category: Ubuntu Server 26.04 LTS/pam_sepermit.so/
+ # tag rstcloud:noise-category="Ubuntu Server 26.04 LTS"
+ # (+ false-positive:risk="high", rstcloud:noise-control="drop")
+ data, err = unwrap(
+ rstapi.noisecontrol(**rst_kwargs(config)).ValueLookup(value)
+ )
+ if err:
+ return error(f"RST Noise Control lookup failed: {err}")
+ if not isinstance(data, dict):
+ return text_result(
+ f"{value}: unexpected response from RST Noise Control",
+ "RST Noise Control",
+ )
+
+ benign = str(data.get("benign", "")).strip().lower() == "true"
+ reason = (data.get("reason") or "").strip()
+ ioc_type = (data.get("type") or "").strip()
+ category = _category(reason)
+ tag_category = _category_tag(category)
+
+ # --- Determine verdict, tags, and write-back actions ---
+ if not benign:
+ verdict = "Not flagged"
+ # "Not Found in our database" is the API constant for unknown —
+ # not a category.
+ detail = ""
+ tags = []
+ fp_sightings = 0
+ set_to_ids = None
+ elif reason.lower().startswith("change score"):
+ verdict = "NOISY - reduce score"
+ detail = category
+ tags = [
+ 'false-positive:risk="medium"',
+ 'rstcloud:noise-control="change-score"',
+ f'rstcloud:noise-category="{tag_category}"',
+ ]
+ fp_sightings = 1
+ set_to_ids = None
+ else:
+ verdict = "BENIGN - known-good"
+ detail = category
+ tags = [
+ 'false-positive:risk="high"',
+ 'rstcloud:noise-control="drop"',
+ f'rstcloud:noise-category="{tag_category}"',
+ ]
+ fp_sightings = 2
+ set_to_ids = False
+
+ # --- Build annotation fallback text ---
+ lines = [f"Verdict: {verdict}"]
+ if detail:
+ lines.append(f"Category: {detail}")
+ if ioc_type:
+ lines.append(f"Type: {ioc_type}")
+
+ # --- Build MISP result ---
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ obj, dedicated = new_enrichment_object("rst-noise")
+ obj.comment = "RST Noise Control"
+ if dedicated:
+ tag_target = obj.add_attribute("verdict", value=verdict, to_ids=False)
+ if detail:
+ obj.add_attribute("category", value=detail, to_ids=False)
+ if ioc_type:
+ obj.add_attribute("ioc-type", value=ioc_type, to_ids=False)
+ obj.add_attribute("benign", value=str(benign).lower(), to_ids=False)
+ else:
+ obj.add_attribute("type", value="RST Noise Control", to_ids=False)
+ tag_target = obj.add_attribute(
+ "text", value="\n".join(lines), to_ids=False
+ )
+ for tag in tags:
+ tag_target.add_tag(tag)
+ if anchor:
+ obj.add_reference(anchor, "related-to")
+ event.add_object(obj)
+
+ # Optional write-back: when MISP creds are configured, ALSO annotate the
+ # source attribute in place (tags, comment, to_ids flip, FP sightings).
+ # The annotation object is returned regardless.
+ apply_to_source_attribute(
+ config,
+ request,
+ tags=tags,
+ comment_note=f"RST Noise Control: {verdict}"
+ + (f" - {detail}" if detail else ""),
+ comment_prefix="RST Noise Control:",
+ replace_tag_prefixes=_TAG_PREFIXES,
+ set_to_ids=set_to_ids,
+ fp_sightings=fp_sightings,
+ )
+
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_screenshot.py b/misp_modules/modules/expansion/rst_screenshot.py
new file mode 100644
index 00000000..9aea9c19
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_screenshot.py
@@ -0,0 +1,144 @@
+"""rst_screenshot — page screenshot as image (GET /scan/html/screenshot/*)."""
+
+from __future__ import annotations
+
+import base64
+import json
+from io import BytesIO
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ scan_kwargs,
+ scan_target,
+ standard_results,
+ text_result,
+ unwrap,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "url",
+ "domain",
+ "hostname",
+ "ip-src",
+ "ip-dst",
+ "ip-src|port",
+ "ip-dst|port",
+ "hostname|port",
+ "domain|port",
+]
+# misp_standard: return an image MISPObject with the PNG attached (rendered
+# inline in MISP) instead of a text description.
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.2",
+ "author": "RST Cloud",
+ "description": (
+ "Capture a page screenshot (first/full/last frame) of a URL/IP"
+ " target via RST Scan API."
+ ),
+ "module-type": ["expansion"],
+ "name": "RST Cloud Screenshot",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Renders the target page and returns a PNG screenshot as an image"
+ " MISP object (inline in MISP). Configurable frame: first, full"
+ " (default), or last."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "URL, domain, hostname, or IP attribute (optional port via config)."
+ ),
+ "output": (
+ "image MISP object with PNG attachment linked to the enriched"
+ " attribute."
+ ),
+}
+# 'frame' selects which screenshot endpoint to call (first/full/last,
+# default full).
+# 'port' (optional): override default port 443.
+moduleconfig = ["api_key", "base_url", "frame", "port", "timeout"]
+
+_FRAMES = {
+ "first": "GetHtmlScreenshotFirst",
+ "full": "GetHtmlScreenshotFull",
+ "last": "GetHtmlScreenshotLast",
+}
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config") or {}
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ target = scan_target(request, _INPUTS, config, default_port=443)
+ if not target:
+ return error("No target found in the request.")
+
+ method = _FRAMES.get(
+ (config.get("frame") or "full").lower(), "GetHtmlScreenshotFull"
+ )
+ data, err = unwrap(
+ getattr(rstapi.scan(**scan_kwargs(config)), method)(target)
+ )
+ if err:
+ return error(f"RST screenshot failed: {err}")
+
+ b64 = data.get("image_base64") if isinstance(data, dict) else None
+ try:
+ raw = base64.b64decode(b64) if b64 else None
+ except Exception:
+ raw = None
+ if not raw:
+ return text_result(
+ f"{target}: no screenshot returned ({method})",
+ "RST Screenshot",
+ )
+
+ from pymisp import MISPObject
+
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+ image = MISPObject("image")
+ image.add_attribute(
+ "attachment", value="screenshot.png", data=BytesIO(raw)
+ )
+ image.comment = f"RST Screenshot ({method})"
+ event.add_attribute(
+ "link",
+ f"https://{target}" if "://" not in target else target,
+ comment=f"RST Screenshot source ({method})",
+ to_ids=False,
+ )
+ if anchor:
+ image.add_reference(anchor, "screenshot-of")
+ event.add_object(image)
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_ssl.py b/misp_modules/modules/expansion/rst_ssl.py
new file mode 100644
index 00000000..51c944ad
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_ssl.py
@@ -0,0 +1,134 @@
+"""rst_ssl — SSL certificate as x509 object (GET /scan/ssl/certificate)."""
+
+from __future__ import annotations
+
+import json
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ scan_kwargs,
+ scan_target,
+ standard_results,
+ text_result,
+ unwrap,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = [
+ "ip-dst",
+ "ip-src",
+ "hostname",
+ "domain",
+ "ip-dst|port",
+ "ip-src|port",
+ "hostname|port",
+ "domain|port",
+]
+# misp_standard: return a real x509 MISPObject (searchable subject/issuer,
+# pivotable fingerprints) instead of a text blob.
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.2",
+ "author": "RST Cloud",
+ "description": (
+ "Fetch the SSL certificate for an IP[:port] as an x509 object via"
+ " RST Scan API."
+ ),
+ "module-type": ["expansion"],
+ "name": "RST Cloud SSL Certificate",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Connects to the target service and retrieves the TLS certificate"
+ " via RST Scan GET /scan/ssl/certificate. Returns an x509 MISP"
+ " object with pivotable fingerprints (SHA-1/256/MD5), subject,"
+ " issuer, and validity dates."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": (
+ "IP, hostname, or domain attribute (optional port via config or"
+ " composite)."
+ ),
+ "output": "x509 MISP object referencing the enriched attribute.",
+}
+# 'port' (optional): TLS port to scan when the attribute carries none (API
+# defaults to 443 if omitted).
+moduleconfig = ["api_key", "base_url", "port", "timeout"]
+
+# RST certificate field -> x509 object_relation (pymisp infers attribute
+# type from the template, so fingerprints become pivotable x509-fingerprint-*
+# types).
+_X509_MAP = {
+ "subject_dn": "subject",
+ "issuer_dn": "issuer",
+ "serial_number": "serial-number",
+ "version": "version",
+ "not_before": "validity-not-before",
+ "not_after": "validity-not-after",
+ "fingerprint_sha1": "x509-fingerprint-sha1",
+ "fingerprint_sha256": "x509-fingerprint-sha256",
+ "fingerprint_md5": "x509-fingerprint-md5",
+}
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ target = scan_target(request, _INPUTS, config, default_port=443)
+ if not target:
+ return error(
+ "No target found in the request (expects an IP/hostname)."
+ )
+
+ data, err = unwrap(
+ rstapi.scan(**scan_kwargs(config)).GetSslCertificate(target)
+ )
+ if err:
+ return error(f"RST SSL scan failed: {err}")
+ if not isinstance(data, dict) or not data.get("subject_dn"):
+ return text_result(
+ f"{target}: no certificate returned", "RST SSL Certificate"
+ )
+
+ from pymisp import MISPObject
+
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+ x509 = MISPObject("x509")
+ for field, relation in _X509_MAP.items():
+ if data.get(field):
+ x509.add_attribute(relation, value=data[field])
+ x509.comment = f"RST SSL Certificate for {target}"
+ if anchor:
+ x509.add_reference(anchor, "identifies")
+ event.add_object(x509)
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/misp_modules/modules/expansion/rst_whois.py b/misp_modules/modules/expansion/rst_whois.py
new file mode 100644
index 00000000..bac6f396
--- /dev/null
+++ b/misp_modules/modules/expansion/rst_whois.py
@@ -0,0 +1,155 @@
+"""rst_whois — parsed WHOIS as whois object (GET /whois/{domain})."""
+
+from __future__ import annotations
+
+import json
+
+import rstapi
+
+from ._rstcloud.client import (
+ error,
+ misp_event_with_source,
+ rst_kwargs,
+ scan_group,
+ standard_results,
+ text_result,
+ unwrap,
+ value_from_request,
+)
+
+misperrors = {"error": "Error"}
+
+_INPUTS = ["domain", "hostname"]
+mispattributes = {"input": _INPUTS, "format": "misp_standard"}
+
+moduleinfo = {
+ "version": "0.2",
+ "author": "RST Cloud",
+ "description": (
+ "Retrieve parsed WHOIS information for a domain via RST Cloud."
+ ),
+ "module-type": ["expansion", "hover"],
+ "name": "RST Cloud Whois",
+ "requirements": ["An RST Cloud API key.", "rstapi>=1.2.0 (PyPI)."],
+ "features": (
+ "Queries RST Cloud GET /whois for parsed domain registration data."
+ " Returns a standard whois MISP object (registrar, registrant,"
+ " dates, nameservers) linked back to the enriched attribute."
+ ),
+ "references": [
+ "https://api.rstcloud.net/",
+ "https://pypi.org/project/rstapi/",
+ ],
+ "input": "Domain or hostname attribute.",
+ "output": "whois MISP object with registration and nameserver fields.",
+}
+moduleconfig = ["api_key", "base_url"]
+
+
+def introspection():
+ return mispattributes
+
+
+def version():
+ moduleinfo["config"] = moduleconfig
+ return moduleinfo
+
+
+def _known(v) -> bool:
+ """True when a value is present and not a placeholder like 'unknown'."""
+ return bool(v) and str(v).strip().lower() not in (
+ "unknown",
+ "none",
+ "",
+ "null",
+ "n/a",
+ )
+
+
+def handler(q=False):
+ if q is False:
+ return False
+ request = json.loads(q)
+ config = request.get("config")
+ if not rst_kwargs(config)["APIKEY"]:
+ return error(
+ "An RST Cloud API key is required (set api_key in the module"
+ " config)."
+ )
+ domain = value_from_request(request, _INPUTS)
+ if not domain:
+ return error("No domain found in the request.")
+
+ data, err = unwrap(
+ rstapi.whoisapi(**rst_kwargs(config)).GetDomainInfo(domain)
+ )
+ if err:
+ return error(f"RST Whois API lookup failed: {err}")
+ if not isinstance(data, dict):
+ return text_result(f"{domain}: no WHOIS data found", "RST Whois API")
+
+ from pymisp import MISPObject
+
+ event, source = misp_event_with_source(request)
+ anchor = scan_group(request, source)
+
+ obj = MISPObject("whois")
+ obj.comment = f"RST Whois API lookup for {domain}"
+
+ # Identity
+ if _known(data.get("domain")):
+ obj.add_attribute("domain", value=data["domain"], to_ids=False)
+ if _known(data.get("registrar")):
+ obj.add_attribute("registrar", value=data["registrar"], to_ids=False)
+ if _known(data.get("registrant")):
+ obj.add_attribute(
+ "registrant-name", value=data["registrant"], to_ids=False
+ )
+ if _known(data.get("registrant_org")):
+ obj.add_attribute(
+ "registrant-org", value=data["registrant_org"], to_ids=False
+ )
+ if _known(data.get("registrant_email")):
+ obj.add_attribute(
+ "registrant-email",
+ value=data["registrant_email"],
+ to_ids=False,
+ )
+
+ # Dates (API returns "created_on" / "updated_on" / "expires_on")
+ if _known(data.get("created_on")):
+ obj.add_attribute(
+ "creation-date", value=data["created_on"], to_ids=False
+ )
+ if _known(data.get("updated_on")):
+ obj.add_attribute(
+ "modification-date", value=data["updated_on"], to_ids=False
+ )
+ if _known(data.get("expires_on")):
+ obj.add_attribute(
+ "expiration-date", value=data["expires_on"], to_ids=False
+ )
+
+ # Nameservers — one attribute per NS
+ for ns in (data.get("nameservers") or "").split(","):
+ ns = ns.strip()
+ if ns:
+ obj.add_attribute("nameserver", value=ns, to_ids=False)
+
+ # Domain age + status as a free-text note (no dedicated whois relation)
+ notes = []
+ if data.get("age") is not None:
+ notes.append(f"age: {data['age']} days")
+ if _known(data.get("status")):
+ notes.append(f"status: {data['status']}")
+ if notes:
+ obj.add_attribute("text", value="; ".join(notes), to_ids=False)
+
+ if anchor:
+ obj.add_reference(anchor, "related-to")
+ event.add_object(obj)
+ return standard_results(event)
+
+
+if __name__ == "__main__":
+ print(json.dumps(version(), indent=2))
diff --git a/poetry.lock b/poetry.lock
index 00c6c771..482021eb 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -6367,6 +6367,25 @@ files = [
{file = "rpds_py-2026.5.1.tar.gz", hash = "sha256:07b24fea40541e28570e5b795a4a38fbdcd12550c06bd0748005ecc8116ca256"},
]
+[[package]]
+name = "rstapi"
+version = "1.2.0"
+description = "Python library to access the RST Cloud API."
+optional = true
+python-versions = "*"
+groups = ["main"]
+markers = "extra == \"minimal\" or extra == \"all\""
+files = [
+ {file = "rstapi-1.2.0-py3-none-any.whl", hash = "sha256:aab1fbb4e520135a3b280bebadb6511ad805f5aee46a31ed18a28aa3b6ae529c"},
+ {file = "rstapi-1.2.0.tar.gz", hash = "sha256:19c5d98f522b9dbf03f1960aa76c137ce03be6d8c205b0cf9336ec7a8c9f25ff"},
+]
+
+[package.dependencies]
+requests = "*"
+
+[package.extras]
+test = ["pytest (>=7)"]
+
[[package]]
name = "rtfde"
version = "0.1.2.2"
@@ -7890,10 +7909,10 @@ test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more_it
type = ["pytest-mypy (>=1.0.1) ; platform_python_implementation != \"PyPy\""]
[extras]
-all = ["anyrun-sdk", "apiosintds", "assemblyline_client", "backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "lief", "lxml", "maclookup", "matplotlib", "mattermostdriver", "misp-stix", "mwdblib", "ndjson", "numpy", "oauth2", "opencv-python", "openpyxl", "pandas", "pandas_ods_reader", "pandoc", "passivetotal", "pdftotext", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pymisp", "pypdns", "pypssl", "pysafebrowsing", "pytesseract", "python-docx", "python-pptx", "pyzbar", "requests", "setuptools", "shodan", "sigmatools", "sigmf", "slack-sdk", "socialscan", "sparqlwrapper", "tau-clients", "taxii2-client", "trustar", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "vysion", "wand", "xlrd", "yara-python"]
-minimal = ["backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "maclookup", "matplotlib", "mattermostdriver", "ndjson", "oauth2", "passivetotal", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pypdns", "pypssl", "pysafebrowsing", "requests", "slack-sdk", "socialscan", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "yara-python"]
+all = ["anyrun-sdk", "apiosintds", "assemblyline_client", "backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "lief", "lxml", "maclookup", "matplotlib", "mattermostdriver", "misp-stix", "mwdblib", "ndjson", "numpy", "oauth2", "opencv-python", "openpyxl", "pandas", "pandas_ods_reader", "pandoc", "passivetotal", "pdftotext", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pymisp", "pypdns", "pypssl", "pysafebrowsing", "pytesseract", "python-docx", "python-pptx", "pyzbar", "requests", "rstapi", "setuptools", "shodan", "sigmatools", "sigmf", "slack-sdk", "socialscan", "sparqlwrapper", "tau-clients", "taxii2-client", "trustar", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "vysion", "wand", "xlrd", "yara-python"]
+minimal = ["backscatter", "blockchain", "censys", "clamd", "crowdstrike-falconpy", "dnsdb2", "domaintools_api", "geoip2", "greynoise", "jbxapi", "maclookup", "matplotlib", "mattermostdriver", "ndjson", "oauth2", "passivetotal", "pycountry", "pyeti-python3", "pyeupi", "pygeoip", "pyintel471", "pyipasnhistory", "pypdns", "pypssl", "pysafebrowsing", "requests", "rstapi", "slack-sdk", "socialscan", "urlarchiver", "vt-graph-api", "vt-py", "vulners", "yara-python"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
-content-hash = "380a4bb70a1fcbc16a4f8092bb827d9bd8f828c7bd249a391e9eb03626b5938c"
+content-hash = "4ccfa27333f21a63bb882b6c1feeebf3deca81a602a910954d72922e98ac65e8"
diff --git a/pyproject.toml b/pyproject.toml
index d94a8467..2cc122cb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -63,6 +63,7 @@ minimal = [
"pyipasnhistory",
"pysafebrowsing",
"requests[security]",
+ "rstapi>=1.2.0,<2",
"slack-sdk",
"urlarchiver",
"vt-graph-api",
@@ -120,6 +121,7 @@ all = [
"python-pptx",
"pyzbar",
"requests[security]", ## in minimal
+ "rstapi>=1.2.0,<2", ## in minimal
"setuptools",
"shodan",
"sigmatools",
diff --git a/tests/test_rst_ioc.py b/tests/test_rst_ioc.py
new file mode 100644
index 00000000..f5878dd6
--- /dev/null
+++ b/tests/test_rst_ioc.py
@@ -0,0 +1,58 @@
+"""Unit tests for rst_ioc (mocked rstapi, no network)."""
+
+import json
+from unittest.mock import patch
+
+from misp_modules.modules.expansion import rst_ioc
+
+
+class _FakeClient:
+ def __init__(self, payload):
+ self._payload = payload
+
+ def GetIndicator(self, value):
+ return self._payload
+
+
+def _query(attribute, config=None):
+ return json.dumps({
+ "module": "rst_ioc",
+ "attribute": attribute,
+ "config": config if config is not None else {"api_key": "test-key"},
+ })
+
+
+def test_rst_ioc_not_found_returns_text():
+ attribute = {"type": "ip-dst", "value": "8.8.8.8", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ with patch.object(rst_ioc.rstapi, "ioclookup", return_value=_FakeClient({"error": "Not Found"})):
+ result = rst_ioc.handler(_query(attribute))
+ assert "Attribute" in result["results"]
+ assert any("not found" in a["value"].lower() for a in result["results"]["Attribute"])
+
+
+def test_rst_ioc_hit_returns_rst_ioc_object_with_score_tag():
+ attribute = {"type": "domain", "value": "evil.example", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {
+ "id": "rst-123",
+ "ioc_type": "domain",
+ "score": {"total": "82", "tags": "0.5", "frequency": "0.3"},
+ "threat": ["akira_ransomware"],
+ "fp": {"alarm": "false"},
+ }
+ with patch.object(rst_ioc.rstapi, "ioclookup", return_value=_FakeClient(payload)):
+ result = rst_ioc.handler(_query(attribute))
+ obj = result["results"]["Object"][0]
+ assert obj["name"] in ("rst-ioc", "annotation")
+ relations = {a.get("object_relation"): a for a in obj["Attribute"]}
+ tag_target = relations.get("score-total") or relations.get("text") or obj["Attribute"][0]
+ tag_names = [t["name"] for t in tag_target.get("Tag", [])]
+ assert 'rstcloud:score-total="82"' in tag_names
+ assert any("akira" in t.lower() for t in tag_names)
+
+
+def test_rst_ioc_missing_api_key():
+ attribute = {"type": "domain", "value": "evil.example", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ with patch.object(rst_ioc.rstapi, "ioclookup") as mock_lookup:
+ result = rst_ioc.handler(_query(attribute, config={}))
+ mock_lookup.assert_not_called()
+ assert result == {"error": "An RST Cloud API key is required (set api_key in the module config)."}
diff --git a/tests/test_rst_noise_control.py b/tests/test_rst_noise_control.py
new file mode 100644
index 00000000..60a92698
--- /dev/null
+++ b/tests/test_rst_noise_control.py
@@ -0,0 +1,92 @@
+"""Unit tests for rst_noise_control (mocked rstapi, no network)."""
+
+import json
+from unittest.mock import patch
+
+from misp_modules.modules.expansion import rst_noise_control
+
+
+class _FakeClient:
+ def __init__(self, payload):
+ self._payload = payload
+
+ def ValueLookup(self, value):
+ return self._payload
+
+
+def _query(attribute, config=None):
+ return json.dumps({
+ "module": "rst_noise_control",
+ "attribute": attribute,
+ "config": config if config is not None else {"api_key": "test-key"},
+ })
+
+
+def _verdict_attr(result):
+ obj = result["results"]["Object"][0]
+ for a in obj["Attribute"]:
+ if a.get("object_relation") == "verdict":
+ return a
+ for a in obj["Attribute"]:
+ val = a.get("value") or ""
+ if a.get("object_relation") == "text" or (a.get("type") == "text" and "Verdict:" in val):
+ return a
+ return obj["Attribute"][-1]
+
+
+def _verdict_tags(result):
+ obj = result["results"]["Object"][0]
+ for a in obj["Attribute"]:
+ if a.get("Tag"):
+ return [t["name"] for t in a["Tag"]]
+ return []
+
+
+def test_rst_noise_control_drop_verdict():
+ attribute = {"type": "ip-dst", "value": "8.8.8.8", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {"benign": "true", "reason": "Drop Public DNS/Services/Google", "type": "ipv4"}
+ with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)):
+ result = rst_noise_control.handler(_query(attribute))
+ verdict = _verdict_attr(result)
+ assert "BENIGN" in verdict["value"]
+ tags = _verdict_tags(result)
+ assert 'false-positive:risk="high"' in tags
+ assert 'rstcloud:noise-control="drop"' in tags
+ assert 'rstcloud:noise-category="Public DNS"' in tags
+
+
+_UBUNTU_CATEGORY = "Ubuntu Server 26.04 LTS/pam_sepermit.so/"
+_UBUNTU_TAG = "Ubuntu Server 26.04 LTS"
+
+
+def test_rst_noise_control_ubuntu_benign_hash():
+ attribute = {"type": "md5", "value": "abc", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {"benign": "true", "reason": f"Drop {_UBUNTU_CATEGORY}", "type": "md5"}
+ with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)):
+ result = rst_noise_control.handler(_query(attribute))
+ verdict = _verdict_attr(result)
+ assert "BENIGN" in verdict["value"]
+ tags = _verdict_tags(result)
+ assert 'false-positive:risk="high"' in tags
+ assert 'rstcloud:noise-control="drop"' in tags
+ assert f'rstcloud:noise-category="{_UBUNTU_TAG}"' in tags
+
+
+def test_rst_noise_control_deep_category_tag():
+ full = "NSRL 2025.03.1_modern/726.LibOVRPlatform64_1.dll/Meta - Oculus Platform SDK"
+ attribute = {"type": "md5", "value": "abc", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {"benign": "true", "reason": f"Drop {full}", "type": "md5"}
+ with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)):
+ result = rst_noise_control.handler(_query(attribute))
+ tags = _verdict_tags(result)
+ assert 'rstcloud:noise-category="NSRL 2025.03.1_modern"' in tags
+
+
+def test_rst_noise_control_not_in_database():
+ attribute = {"type": "ip-dst", "value": "1.2.3.4", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {"benign": "false", "reason": "Not Found in our database", "type": "ipv4"}
+ with patch.object(rst_noise_control.rstapi, "noisecontrol", return_value=_FakeClient(payload)):
+ result = rst_noise_control.handler(_query(attribute))
+ verdict = _verdict_attr(result)
+ assert "not flagged" in verdict["value"].lower()
+ assert _verdict_tags(result) == []
diff --git a/tests/test_rst_ssl.py b/tests/test_rst_ssl.py
new file mode 100644
index 00000000..6ce7832a
--- /dev/null
+++ b/tests/test_rst_ssl.py
@@ -0,0 +1,48 @@
+"""Unit tests for rst_ssl (mocked rstapi, no network)."""
+
+import json
+from unittest.mock import patch
+
+from misp_modules.modules.expansion import rst_ssl
+
+
+class _FakeClient:
+ def __init__(self, payload):
+ self._payload = payload
+
+ def GetSslCertificate(self, target):
+ return self._payload
+
+
+def _query(attribute, config=None):
+ return json.dumps({
+ "module": "rst_ssl",
+ "attribute": attribute,
+ "config": config if config is not None else {"api_key": "test-key", "port": "443"},
+ })
+
+
+def test_rst_ssl_returns_x509_object():
+ attribute = {"type": "ip-dst", "value": "93.184.216.34", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ payload = {
+ "subject_dn": "CN=example.com",
+ "issuer_dn": "CN=DigiCert",
+ "fingerprint_sha1": "a" * 40,
+ "fingerprint_sha256": "b" * 64,
+ "not_after": "2026-12-21T19:20:01Z",
+ "serial_number": "01",
+ }
+ with patch.object(rst_ssl.rstapi, "scan", return_value=_FakeClient(payload)):
+ result = rst_ssl.handler(_query(attribute))
+ obj = result["results"]["Object"][0]
+ assert obj["name"] == "x509"
+ relations = {a["object_relation"]: a for a in obj["Attribute"]}
+ assert relations["subject"]["value"] == "CN=example.com"
+ assert relations["x509-fingerprint-sha256"]["value"] == "b" * 64
+
+
+def test_rst_ssl_no_certificate():
+ attribute = {"type": "ip-dst", "value": "1.2.3.4", "uuid": "5b582d80-7a7e-4b6a-9f22-77656e72bb3b"}
+ with patch.object(rst_ssl.rstapi, "scan", return_value=_FakeClient({})):
+ result = rst_ssl.handler(_query(attribute))
+ assert any("no certificate" in a["value"].lower() for a in result["results"]["Attribute"])