diff --git a/documentation/logos/socradar.png b/documentation/logos/socradar.png new file mode 100644 index 00000000..d37ecc18 Binary files /dev/null and b/documentation/logos/socradar.png differ diff --git a/misp_modules/modules/expansion/socradar_lookup.py b/misp_modules/modules/expansion/socradar_lookup.py new file mode 100644 index 00000000..0b8282a1 --- /dev/null +++ b/misp_modules/modules/expansion/socradar_lookup.py @@ -0,0 +1,479 @@ +""" +SOCRadar Threat Intelligence Expansion Module for MISP +======================================================= +Enrich MISP attributes (IP, domain, URL, hash) by querying SOCRadar's +IoC Enrichment API. Returns threat context including categorization, +malware families, threat actors, MITRE ATT&CK mappings, confidence +levels, feed source history, and geographic attribution. + +Two enrichment modes: + 1. STIX mode (default) — calls /indicator_details_stix for fast, + structured STIX 2.1 output. Best for automated enrichment. + 2. Full mode — calls /indicator_details for rich JSON output with + categorization, classifications, score, history, and optionally + AI-generated insight (slower due to AI processing). + +Requires a SOCRadar Advanced Threat Intelligence API key. +Get yours at: https://platform.socradar.com → API Management + +Configuration (MISP → Server Settings → Plugin Settings → Enrichment): + - socradar_api_key: SOCRadar API key (required) + - socradar_api_url: API base URL (default: https://platform.socradar.com/api) + - socradar_mode: "stix" (fast) or "full" (detailed). Default: full + - socradar_ai_insight: Include AI insight in full mode (slower). Default: false +""" + +import json +import re + +import requests + +misperrors = {"error": "Error"} + +mispattributes = { + "input": [ + "ip-src", + "ip-dst", + "domain", + "hostname", + "url", + "md5", + "sha1", + "sha256", + "email-src", + "email-dst", + ], + "output": ["text"], + "format": "misp_standard", +} + +moduleinfo = { + "version": "1.0", + "author": "SOCRadar", + "description": ( + "Enrich MISP attributes with SOCRadar threat intelligence. " + "Queries SOCRadar IoC Enrichment API for categorization, " + "malware families, threat actors, confidence scores, " + "feed source history, and geographic attribution." + ), + "module-type": ["expansion", "hover"], + "name": "SOCRadar Threat Intelligence", + "logo": "socradar.png", + "requirements": ["requests"], + "features": ( + "Query SOCRadar IoC Enrichment API to enrich MISP attributes with " + "threat intelligence context. Supports IP, domain, URL, hash, and " + "email lookups. Two modes: STIX (fast) and Full (detailed with " + "categorization, history, and optional AI insight). " + "Requires a SOCRadar Advanced Threat Intelligence API key." + ), + "references": [ + "https://socradar.io", + "https://platform.socradar.com", + ], + "input": ( + "A MISP attribute of type ip-src, ip-dst, domain, hostname, url, md5, sha1, sha256, email-src, or email-dst." + ), + "output": ( + "Enrichment data including categorization, malware families, " + "threat actors, confidence score, SOCRadar threat score, " + "feed source history, and country attribution." + ), +} + +moduleconfig = [ + "socradar_api_key", + "socradar_api_url", + "socradar_mode", + "socradar_ai_insight", +] + +# ═══════════════════════════════════════════════════════════════════════════ +# API base URL +# ═══════════════════════════════════════════════════════════════════════════ + +DEFAULT_API_URL = "https://platform.socradar.com/api" + +# ═══════════════════════════════════════════════════════════════════════════ +# API calls +# ═══════════════════════════════════════════════════════════════════════════ + + +def _api_headers(api_key): + return { + "Content-Type": "application/json", + "API-Key": api_key, + } + + +def _call_indicator_details(api_url, api_key, indicator, include_ai=False): + """Call /ioc_enrichment/get/indicator_details for rich JSON output.""" + url = f"{api_url}/ioc_enrichment/get/indicator_details" + + fields = [ + "indicator_details", + "indicator_history", + "indicator_relations", + ] + if include_ai: + fields.append("indicator_ai_insight") + + payload = { + "indicator": indicator, + "fields": fields, + } + + resp = requests.post( + url, + headers=_api_headers(api_key), + json=payload, + timeout=30 if not include_ai else 60, + ) + resp.raise_for_status() + return resp.json() + + +def _call_indicator_details_stix(api_url, api_key, indicator): + """Call /ioc_enrichment/get/indicator_details_stix for STIX output.""" + url = f"{api_url}/ioc_enrichment/get/indicator_details_stix" + + payload = { + "indicator": indicator, + "show_credit_details": False, + } + + resp = requests.post( + url, + headers=_api_headers(api_key), + json=payload, + timeout=30, + ) + resp.raise_for_status() + return resp.json() + + +# ═══════════════════════════════════════════════════════════════════════════ +# Result formatters +# ═══════════════════════════════════════════════════════════════════════════ + + +def _format_full_result(data, indicator): + """Format /indicator_details JSON response into readable text + tags.""" + lines = [] + tags = [] + + lines.append("══════ SOCRadar Threat Intelligence ══════") + lines.append(f"Indicator: {indicator}") + lines.append("") + + # --- Score & Confidence --- + details = data.get("details", {}) + score = details.get("score") + if score is not None: + lines.append(f"SOCRadar Score: {score}/100") + tags.append(f"socradar:score={score}") + + cross_conf = data.get("cross_source_confidence") + if cross_conf: + lines.append(f"Cross-Source Confidence: {cross_conf}") + conf_map = { + "Very High": 'confidence-level:confidence="completely-confident"', + "High": 'confidence-level:confidence="usually-confident"', + "Medium": 'confidence-level:confidence="fairly-confident"', + "Low": 'confidence-level:confidence="rarely-confident"', + } + if cross_conf in conf_map: + tags.append(conf_map[cross_conf]) + + signal = data.get("ioc_signal_strength") + if signal: + lines.append(f"IoC Signal Strength: {signal}") + tags.append(f"socradar:signal={signal.lower().replace(' ', '-')}") + + lines.append("") + + # --- Categorization --- + cat = data.get("categorization", {}) + active_cats = [k for k, v in cat.items() if v is True] + if active_cats: + lines.append(f"Categorization: {', '.join(active_cats)}") + for c in active_cats: + tags.append(f"socradar:category={c}") + lines.append("") + + # --- Classifications --- + cls = data.get("classifications", {}) + + malwares = cls.get("malwares", []) + if malwares: + lines.append(f"Malware Families: {', '.join(malwares)}") + for mw in malwares: + tags.append(f"malware:{mw.lower()}") + + threat_actors = cls.get("threat_actors", []) + if threat_actors: + lines.append(f"Threat Actors: {', '.join(threat_actors)}") + for ta in threat_actors: + tags.append(f"threat-actor:{ta.lower()}") + + industries = cls.get("industries", []) + if industries: + lines.append(f"Targeted Industries: {', '.join(industries)}") + + campaign = cls.get("campaign") + if campaign: + lines.append(f"Campaign: {campaign}") + + target_countries = cls.get("target_country_list", []) + if target_countries: + lines.append(f"Target Countries: {', '.join(target_countries)}") + + country = cls.get("country") + if country: + lines.append(f"Origin Country: {country}") + tags.append(f"country:{country.lower()}") + + lines.append("") + + # --- Summary (ASN, geo) --- + summary = data.get("summary", {}) + if summary: + geo_parts = [] + if summary.get("city"): + geo_parts.append(summary["city"]) + if summary.get("region"): + geo_parts.append(summary["region"]) + if summary.get("country"): + geo_parts.append(summary["country"]) + if geo_parts: + lines.append(f"Location: {', '.join(geo_parts)}") + + asn = summary.get("asn_name") + asn_code = summary.get("asn_code") + if asn: + lines.append(f"ASN: {asn} (AS{asn_code})" if asn_code else f"ASN: {asn}") + + lines.append("") + + # --- Details (dates) --- + first_seen = details.get("first_seen_date") + last_seen = details.get("last_seen_date") + if first_seen: + lines.append(f"First Seen: {first_seen}") + if last_seen: + lines.append(f"Last Seen: {last_seen}") + + # --- History --- + history = data.get("history", {}) + hist_items = history.get("indicator_history", []) + if hist_items: + lines.append("") + lines.append(f"Feed History ({len(hist_items)} records):") + for h in hist_items[:10]: # Show max 10 + event = h.get("event", "") + source = h.get("feed_source", "") + date = h.get("insert_date", "") + lines.append(f" [{date}] {source}: {event[:80]}") + if source: + tags.append(f"feed-source:{source}") + if len(hist_items) > 10: + lines.append(f" ... and {len(hist_items) - 10} more") + + # --- AI Insight --- + ai = data.get("ai_insight", {}) + insight = ai.get("insight") + if insight: + lines.append("") + lines.append("AI Insight:") + lines.append(f" {insight[:500]}") + if len(insight) > 500: + lines.append(" [truncated]") + + # Dedup tags + tags = list(dict.fromkeys(tags)) + + # Add source tag + tags.insert(0, "source:SOCRadar") + + return "\n".join(lines), tags + + +def _format_stix_result(data, indicator): + """Format /indicator_details_stix STIX bundle into readable text + tags.""" + lines = [] + tags = ["source:SOCRadar"] + + lines.append("══════ SOCRadar Threat Intelligence (STIX) ══════") + lines.append(f"Indicator: {indicator}") + lines.append("") + + objects = data.get("objects", []) + + # Find indicator objects + for obj in objects: + if obj.get("type") != "indicator": + continue + + name = obj.get("name", "") + pattern = obj.get("pattern", "") + labels = obj.get("labels", []) + + if name: + lines.append(f"Name: {name}") + if pattern: + lines.append(f"Pattern: {pattern}") + + if labels: + lines.append(f"Labels: {', '.join(labels)}") + + # Extract MITRE techniques + for lbl in labels: + for tid in re.findall(r"[Tt]\d{4}(?:\.\d{3})?", lbl): + tags.append(f"mitre-attack:{tid.upper()}") + + # Check for extensions + ext = obj.get("extensions", {}).get("extra-info-ext", {}) + score = ext.get("score") + if score is not None: + lines.append(f"SOCRadar Score: {score}") + tags.append(f"socradar:score={score}") + + ext_tags = ext.get("tags", []) + mitre_tags = [t for t in ext_tags if t.get("type") == "MITRE_ATTCK"] + if mitre_tags: + lines.append("") + lines.append("MITRE ATT&CK:") + for mt in mitre_tags: + tid = mt.get("tag", "").upper() + desc = mt.get("description", "") + lines.append(f" {tid}: {desc}" if desc else f" {tid}") + tags.append(f"mitre-attack:{tid}") + + content_tags = [t for t in ext_tags if t.get("type") == "TAG"] + if content_tags: + tag_names = [t.get("tag", "") for t in content_tags if t.get("tag")] + lines.append(f"Tags: {', '.join(tag_names)}") + + country_tags = [t for t in ext_tags if t.get("type") == "COUNTRY"] + if country_tags: + countries = [t.get("tag", "") for t in country_tags if t.get("tag")] + lines.append(f"Country: {', '.join(countries)}") + for c in countries: + tags.append(f"country:{c.lower()}") + + feed_sources = ext.get("feed_source_list", []) + if feed_sources: + lines.append("") + lines.append("Feed Sources:") + for fs in feed_sources: + name = fs.get("source_name", "") + count = fs.get("seen_count", "") + first = fs.get("first_seen_date", "") + lines.append(f" {name} (seen: {count}, first: {first})") + if name: + tags.append(f"feed-source:{name}") + + # Find identity objects + for obj in objects: + if obj.get("type") == "identity": + identity_name = obj.get("name", "") + if identity_name: + lines.append(f"Source Identity: {identity_name}") + + # Dedup tags + tags = list(dict.fromkeys(tags)) + + return "\n".join(lines), tags + + +# ═══════════════════════════════════════════════════════════════════════════ +# MISP module interface +# ═══════════════════════════════════════════════════════════════════════════ + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + config = request.get("config", {}) + + # --- Validate API key --- + api_key = config.get("socradar_api_key", "").strip() + if not api_key: + misperrors["error"] = ( + "SOCRadar API key is required. " + "Get your Advanced Threat Intelligence API key at " + "https://platform.socradar.com → API Management" + ) + return misperrors + + api_url = config.get("socradar_api_url", DEFAULT_API_URL).rstrip("/") + mode = config.get("socradar_mode", "full").strip().lower() + include_ai = config.get("socradar_ai_insight", "false").strip().lower() in ("true", "1", "yes") + + # --- Extract attribute value --- + attribute = request.get("attribute", {}) + search_value = attribute.get("value", "") + + if not search_value: + for attr_type in mispattributes["input"]: + if attr_type in request: + search_value = request[attr_type] + break + + if not search_value: + misperrors["error"] = "No attribute value provided for enrichment" + return misperrors + + # --- Query SOCRadar API --- + try: + if mode == "stix": + data = _call_indicator_details_stix(api_url, api_key, search_value) + enrichment_text, tags = _format_stix_result(data, search_value) + else: + data = _call_indicator_details(api_url, api_key, search_value, include_ai) + enrichment_text, tags = _format_full_result(data, search_value) + + except requests.exceptions.HTTPError as e: + status = e.response.status_code if e.response else "unknown" + if status == 401: + misperrors["error"] = ( + "SOCRadar API authentication failed. " + "Please check your API key. " + "A valid Advanced Threat Intelligence API key is required. " + "Get yours at https://platform.socradar.com → API Management" + ) + elif status == 400: + misperrors["error"] = f"SOCRadar API bad request for indicator: {search_value}" + else: + misperrors["error"] = f"SOCRadar API error (HTTP {status}): {str(e)}" + return misperrors + + except requests.exceptions.Timeout: + misperrors["error"] = ( + "SOCRadar API request timed out. If using AI insight mode, try disabling it for faster results." + ) + return misperrors + + except Exception as e: + misperrors["error"] = f"SOCRadar API query failed: {str(e)}" + return misperrors + + # --- Return results --- + result = { + "types": ["text"], + "values": [enrichment_text], + "tags": tags, + } + + return {"results": [result]} + + +def introspection(): + return mispattributes + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo diff --git a/misp_modules/modules/import_mod/socradar_taxii_feed.py b/misp_modules/modules/import_mod/socradar_taxii_feed.py new file mode 100644 index 00000000..0ae9a735 --- /dev/null +++ b/misp_modules/modules/import_mod/socradar_taxii_feed.py @@ -0,0 +1,579 @@ +""" +SOCRadar TAXII Feed Import Module for MISP +============================================ +Import threat intelligence indicators from SOCRadar's TAXII 2.1 server +into MISP events with rich tagging (MITRE ATT&CK, TLP, malware families, +confidence levels, feed sources, and geo tags). + +SOCRadar's TAXII server provides STIX 2.1 indicators enriched with: + - MITRE ATT&CK technique mappings + - Malware family classifications + - Confidence scores (0-100) + - SOCRadar threat scores + - Feed source attribution + - Geographic attribution (country tags) + +How it works: + 1. If no collection UUID is provided, the module first fetches the list + of available collections and returns them as a summary so the user + can pick the right UUID. + 2. If a collection UUID is provided, the module fetches indicators from + that collection and returns them as MISP attributes with full tagging. + +Configuration (MISP → Server Settings → Plugin Settings → Import): + - socradar_taxii_url: TAXII base URL (default: https://taxii2.socradar.com) + - socradar_api_root: API root path (default: radar_alpha) + - socradar_username: TAXII username + - socradar_password: TAXII password +""" + +import json +import re +import urllib.parse + +import requests + +misperrors = {"error": "Error"} + +userConfig = { + "collection_id": { + "type": "String", + "message": ( + "TAXII collection UUID to fetch indicators from. " + "Leave EMPTY and type 'list' in Paste Input to see all available collections with their UUIDs." + ), + }, + "default_tlp": { + "type": "String", + "message": "Default TLP marking for imported indicators (e.g. tlp:amber). Default: tlp:amber", + }, + "max_indicators": { + "type": "String", + "message": ( + "Maximum number of indicators to import per run. " + "Default: 500. Set to 0 for unlimited (may cause timeout on large collections)." + ), + }, +} + +inputSource = ["paste"] + +moduleinfo = { + "version": "1.1", + "author": "SOCRadar", + "description": ( + "Import threat indicators from SOCRadar TAXII 2.1 feed " + "with MITRE ATT&CK, malware family, confidence, and geo tagging. " + "Leave collection_id empty and type 'list' to discover available collections." + ), + "module-type": ["import"], + "name": "SOCRadar TAXII Feed Import", + "logo": "socradar.png", + "requirements": ["requests"], + "features": ( + "Connect to SOCRadar TAXII 2.1 server and import enriched " + "threat indicators into MISP. Indicators are automatically " + "tagged with MITRE ATT&CK techniques, malware families, " + "confidence levels, feed sources, and country information. " + "Supports collection auto-discovery — leave collection_id " + "empty and type 'list' to see all available collections." + ), + "references": [ + "https://socradar.io", + "https://docs.socradar.io/taxii", + ], + "input": "SOCRadar TAXII 2.1 collection endpoint.", + "output": "MISP attributes with rich tagging.", +} + +moduleconfig = [ + "socradar_taxii_url", + "socradar_api_root", + "socradar_username", + "socradar_password", +] + +# ═══════════════════════════════════════════════════════════════════════════ +# Known malware families +# ═══════════════════════════════════════════════════════════════════════════ + +KNOWN_MALWARE = { + "redline", + "redline stealer", + "raccoon", + "raccoon stealer", + "vidar", + "stealc", + "risepro", + "lumma", + "lumma stealer", + "meduza", + "meduza stealer", + "azorult", + "aurora", + "emotet", + "trickbot", + "qakbot", + "qbot", + "icedid", + "bumblebee", + "batloader", + "pikabot", + "smokeloader", + "amadey", + "systembc", + "danabot", + "darkgate", + "asyncrat", + "remcos", + "njrat", + "nanocore", + "darkcomet", + "agent tesla", + "agenttesla", + "formbook", + "xworm", + "lokibot", + "warzone", + "dcrat", + "cobalt strike", + "cobaltstrike", + "metasploit", + "sliver", + "havoc", + "brute ratel", + "mythic", + "mirai", + "gafgyt", + "mozi", + "hajime", + "lockbit", + "blackcat", + "alphv", + "clop", + "cl0p", + "conti", + "royal", + "black basta", + "akira", + "rhysida", + "play", + "medusa ransomware", + "8base", + "hunters international", + "bianlian", + "cactus", + "blacksuit", + "prometei", + "xmrig", + "coinminer", + "cryptonight", + "screenconnect", + "connectwise", +} + +# ═══════════════════════════════════════════════════════════════════════════ +# TAXII 2.1 client +# ═══════════════════════════════════════════════════════════════════════════ + + +def _taxii_headers(): + return { + "Accept": "application/taxii+json;version=2.1", + "Content-Type": "application/taxii+json;version=2.1", + } + + +def _taxii_get(url, username, password): + resp = requests.get( + url, + headers=_taxii_headers(), + auth=(username, password), + timeout=120, + ) + resp.raise_for_status() + return resp.json() + + +def _get_credentials(config): + base = config.get("socradar_taxii_url", "https://taxii2.socradar.com").rstrip("/") + api_root = config.get("socradar_api_root", "radar_alpha").strip("/") + username = config.get("socradar_username", "") + password = config.get("socradar_password", "") + if not username or not password: + raise ValueError( + "SOCRadar TAXII username and password are required. Configure them in Plugin Settings → Import." + ) + return base, api_root, username, password + + +# ═══════════════════════════════════════════════════════════════════════════ +# Collection discovery +# ═══════════════════════════════════════════════════════════════════════════ + + +def _list_collections(config): + """Fetch available collections and return them as a text summary.""" + base, api_root, username, password = _get_credentials(config) + url = f"{base}/{api_root}/collections/" + data = _taxii_get(url, username, password) + collections = data.get("collections", []) + + if not collections: + return { + "results": [ + { + "types": ["text"], + "values": ["No collections found on SOCRadar TAXII server."], + } + ] + } + + results = [] + + # Summary text + lines = [ + f"Found {len(collections)} SOCRadar TAXII collections.", + "", + ( + "Copy the UUID of the collection you want and paste it into the " + "'collection_id' field, then run the import again." + ), + "", + "=" * 60, + ] + + for i, col in enumerate(collections, 1): + col_id = col.get("id", "") + title = col.get("title", "Unknown") + desc = col.get("description", "") + + lines.append("") + lines.append(f"[{i}] {title}") + lines.append(f" UUID: {col_id}") + if desc: + lines.append(f" Feeds: {desc[:200]}") + lines.append(f" Readable: {col.get('can_read', False)}") + + lines.append("") + lines.append("=" * 60) + lines.append("To import: go to Populate from → SOCRadar TAXII Feed → paste the UUID into collection_id field.") + + results.append( + { + "types": ["text"], + "categories": ["External analysis"], + "values": ["\n".join(lines)], + "comment": "SOCRadar TAXII Collection List", + } + ) + + return {"results": results} + + +# ═══════════════════════════════════════════════════════════════════════════ +# Parsing helpers +# ═══════════════════════════════════════════════════════════════════════════ + + +def _extract_ioc_from_pattern(pattern): + if not pattern: + return None + m = re.search( + r"\[(\S+?):(?:value|hashes\.(?:'[^']+?'|\S+?))\s*=\s*'([^']+)'\]", + pattern, + ) + if not m: + return None + obj_type, value = m.group(1).lower(), m.group(2) + type_map = { + "ipv4-addr": "ip-dst", + "ipv6-addr": "ip-dst", + "domain-name": "domain", + "url": "url", + "email-addr": "email-dst", + } + if obj_type in type_map: + return type_map[obj_type], value + if obj_type == "file": + ht_match = re.search(r"hashes\.(?:'([^']+)'|(\S+?))\s*=", pattern) + if ht_match: + ht = (ht_match.group(1) or ht_match.group(2)).lower() + ht = ht.replace("-", "").replace("'", "") + misp_ht = { + "md5": "md5", + "sha1": "sha1", + "sha256": "sha256", + "sha512": "sha512", + "ssdeep": "ssdeep", + }.get(ht, ht) + return misp_ht, value + vl = len(value) + if vl == 32: + return "md5", value + elif vl == 40: + return "sha1", value + elif vl == 64: + return "sha256", value + return "md5", value + return None + + +def _confidence_tag(confidence): + if confidence is None: + return 'confidence-level:confidence="unknown"' + c = float(confidence) + if c >= 80: + return 'confidence-level:confidence="completely-confident"' + if c >= 60: + return 'confidence-level:confidence="usually-confident"' + if c >= 40: + return 'confidence-level:confidence="fairly-confident"' + if c >= 20: + return 'confidence-level:confidence="rarely-confident"' + return 'confidence-level:confidence="unconfident"' + + +def _extract_mitre(indicator): + techniques = set() + for lbl in indicator.get("labels", []): + for tid in re.findall(r"[Tt]\d{4}(?:\.\d{3})?", lbl): + techniques.add(tid.upper()) + ext = indicator.get("extensions", {}).get("extra-info-ext", {}) + for tag in ext.get("tags", []): + if tag.get("type") == "MITRE_ATTCK": + for tid in re.findall(r"[Tt]\d{4}(?:\.\d{3})?", tag.get("tag", "")): + techniques.add(tid.upper()) + return sorted(techniques) + + +def _extract_countries(indicator): + countries = [] + ext = indicator.get("extensions", {}).get("extra-info-ext", {}) + for tag in ext.get("tags", []): + if tag.get("type") == "COUNTRY" and tag.get("tag"): + countries.append(tag["tag"].strip().lower()) + return countries + + +def _extract_feed_sources(indicator): + sources = [] + top = indicator.get("threat_feed_source_name") + if top: + sources.append(top) + ext = indicator.get("extensions", {}).get("extra-info-ext", {}) + for fs in ext.get("feed_source_list", []): + name = fs.get("source_name", "") + if name and name not in sources: + sources.append(name) + return sources + + +def _detect_malware(indicator): + searchable = list(indicator.get("labels", [])) + desc = indicator.get("description", "") + if desc and desc != "N/A": + searchable.append(desc) + name = indicator.get("name", "") + if name: + searchable.append(name) + ext = indicator.get("extensions", {}).get("extra-info-ext", {}) + for tag in ext.get("tags", []): + if tag.get("tag"): + searchable.append(tag["tag"]) + combined = " ".join(searchable).lower() + for family in KNOWN_MALWARE: + if re.search(r"\b" + re.escape(family) + r"\b", combined): + return family.title() + return None + + +def _get_score(indicator): + ext = indicator.get("extensions", {}).get("extra-info-ext", {}) + score = ext.get("score") + return float(score) if score is not None else None + + +# ═══════════════════════════════════════════════════════════════════════════ +# Indicator fetcher with limit +# ═══════════════════════════════════════════════════════════════════════════ + + +def _fetch_indicators(config, collection_id, max_indicators=500): + """Fetch indicators from a single collection with a limit.""" + base, api_root, username, password = _get_credentials(config) + all_indicators = [] + objects_url = f"{base}/{api_root}/collections/{collection_id}/objects/" + page = 0 + max_pages = 20 + + while objects_url and page < max_pages: + page += 1 + envelope = _taxii_get(objects_url, username, password) + page_objects = envelope.get("objects", []) + + for obj in page_objects: + if obj.get("type") == "indicator": + all_indicators.append(obj) + if max_indicators > 0 and len(all_indicators) >= max_indicators: + return all_indicators + + if envelope.get("more", False) and envelope.get("next"): + base_url = f"{base}/{api_root}/collections/{collection_id}/objects/" + objects_url = f"{base_url}?next={urllib.parse.quote(envelope['next'])}" + else: + objects_url = None + + return all_indicators + + +# ═══════════════════════════════════════════════════════════════════════════ +# MISP module interface +# ═══════════════════════════════════════════════════════════════════════════ + + +def handler(q=False): + if q is False: + return False + + request = json.loads(q) + config = request.get("config", {}) + + # Get user config + collection_id = config.get("collection_id", "").strip() + default_tlp = config.get("default_tlp", "tlp:amber").strip() + if not default_tlp: + default_tlp = "tlp:amber" + + max_ind_str = config.get("max_indicators", "500").strip() + try: + max_indicators = int(max_ind_str) if max_ind_str else 500 + except ValueError: + max_indicators = 500 + + # If no collection_id provided → list collections + if not collection_id: + try: + return _list_collections(config) + except Exception as e: + misperrors["error"] = f"SOCRadar TAXII collection discovery failed: {str(e)}" + return misperrors + + # Fetch indicators from the specified collection + try: + indicators = _fetch_indicators(config, collection_id, max_indicators) + except Exception as e: + misperrors["error"] = f"SOCRadar TAXII fetch failed: {str(e)}" + return misperrors + + if not indicators: + return { + "results": [ + { + "types": ["text"], + "values": [ + f"No indicators found in collection {collection_id}. " + "The collection may be empty or the UUID may be incorrect." + ], + } + ] + } + + # Build MISP results + results = [] + seen_values = set() + + for ind in indicators: + parsed = _extract_ioc_from_pattern(ind.get("pattern", "")) + if not parsed: + continue + + attr_type, attr_value = parsed + + if attr_value in seen_values: + continue + seen_values.add(attr_value) + + # Build tags + tags = [default_tlp, "source:SOCRadar", "type:OSINT", "socradar:feed"] + + for tid in _extract_mitre(ind): + tags.append(f"mitre-attack:{tid}") + + conf = ind.get("confidence") + tags.append(_confidence_tag(conf)) + + score = _get_score(ind) + if score is not None: + tags.append(f"socradar:score={score}") + + family = _detect_malware(ind) + if family: + tags.append(f"malware:{family.lower()}") + + for c in _extract_countries(ind): + tags.append(f"country:{c}") + + sources = _extract_feed_sources(ind) + for src in sources: + tags.append(f"feed-source:{src}") + + # Comment + comment_parts = [] + desc = ind.get("description", "") + if desc and desc != "N/A": + comment_parts.append(desc[:200]) + if sources: + comment_parts.append(f"Sources: {', '.join(sources)}") + comment = " | ".join(comment_parts) if comment_parts else "" + + # to_ids + ind_types = ind.get("indicator_types", []) + to_ids = "malicious-activity" in ind_types or not ind_types + + result = { + "types": [attr_type], + "values": [attr_value], + "comment": comment, + "tags": tags, + "to_ids": to_ids, + } + results.append(result) + + # Add summary as first result + summary = f"SOCRadar TAXII Import: {len(results)} indicators imported from collection {collection_id}" + if max_indicators > 0 and len(indicators) >= max_indicators: + summary += f" (limited to {max_indicators}, more available)" + + results.insert( + 0, + { + "types": ["text"], + "categories": ["External analysis"], + "values": [summary], + "comment": "SOCRadar TAXII Feed Import Summary", + }, + ) + + return {"results": results} + + +def introspection(): + modulesetup = {} + try: + userConfig + modulesetup["userConfig"] = userConfig + except NameError: + pass + try: + inputSource + modulesetup["inputSource"] = inputSource + except NameError: + pass + modulesetup["format"] = "misp_standard" + return modulesetup + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo