Skip to content

Commit ab35656

Browse files
author
Nathan Roberts
committed
Added alphamountain expansion module
1 parent afc9a49 commit ab35656

1 file changed

Lines changed: 233 additions & 0 deletions

File tree

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import json
2+
import requests
3+
4+
misperrors = {'error': 'Error'}
5+
mispattributes = {
6+
'input': ['ip', 'ip-src', 'ip-dst', 'ip-src|port', 'ip-dst|port', 'domain', 'hostname', 'url', 'uri', 'link'],
7+
'output': ['text', 'comment'],
8+
'format': 'misp_standard'
9+
}
10+
11+
userConfig = {
12+
"license": {
13+
"type": "String",
14+
"errorMessage": "alphaMountain license key is required",
15+
"message": "Your alphaMountain API license key",
16+
"required": True,
17+
},
18+
"scan_depth": {
19+
"type": "String",
20+
"message": "Scan depth: low, medium, or high (default: medium)",
21+
"default": "medium",
22+
"required": False,
23+
},
24+
"partner_type": {
25+
"type": "String",
26+
"message": "Partner type (default: partner.info)",
27+
"default": "partner.info",
28+
"required": False,
29+
},
30+
}
31+
32+
moduleconfig = list(userConfig.keys())
33+
34+
moduleinfo = {
35+
'version': '1.0',
36+
'author': 'alphaMountain',
37+
'description': 'alphaMountain threat intelligence lookup for IPs, domains, hostnames, and URLs - adds risk score tags',
38+
'module-type': ['expansion', 'hover'],
39+
'name': 'alphaMountain_risk',
40+
'logo': '',
41+
'requirements': [],
42+
'features': '',
43+
'references': [],
44+
}
45+
46+
def handler(q=False):
47+
"""Main handler function called by MISP"""
48+
49+
if q is False:
50+
return False
51+
52+
try:
53+
request = json.loads(q)
54+
except Exception as e:
55+
return {'error': f'Invalid JSON input: {str(e)}'}
56+
57+
if not request.get('config'):
58+
return {'error': 'Configuration required'}
59+
60+
config = request['config']
61+
if 'license' not in config:
62+
return {'error': 'License key required in module configuration'}
63+
64+
attribute = request.get('attribute')
65+
if not attribute:
66+
return {'error': 'No attribute provided'}
67+
68+
# Check for required fields for MISP modules
69+
required_fields = ['type', 'value', 'uuid']
70+
if not all(field in attribute for field in required_fields):
71+
return {'error': f'Invalid attribute format, missing fields: {required_fields}'}
72+
73+
# Validate attribute type is supported
74+
if attribute['type'] not in mispattributes['input']:
75+
return {'error': f'Unsupported attribute type: {attribute["type"]}'}
76+
77+
license_key = config['license']
78+
api_url = 'https://api.alphamountain.ai/threat/uri'
79+
scan_depth = config.get('scan_depth', 'medium')
80+
partner_type = config.get('partner_type', 'partner.info')
81+
82+
# Map MISP attribute type to alphaMountain API category and extract value
83+
ioc_value = get_ioc_value(attribute)
84+
85+
try:
86+
threat_data = query_alphamountain_api(api_url, license_key, ioc_value, scan_depth, partner_type)
87+
88+
if not threat_data:
89+
return {'error': 'No data returned from alphaMountain API'}
90+
91+
# Create results dictionary - DO NOT include the original attribute
92+
results = {
93+
'Attribute': []
94+
}
95+
96+
# Process the response and create attributes with tags
97+
process_threat_response(threat_data, results, attribute)
98+
99+
return {'results': results}
100+
101+
except requests.RequestException as e:
102+
return {'error': f'API request failed: {str(e)}'}
103+
except Exception as e:
104+
return {'error': f'Processing error: {str(e)}'}
105+
106+
def get_ioc_value(attribute):
107+
"""Extract IOC value from attribute, handling IPs with ports"""
108+
attr_type = attribute['type']
109+
attr_value = attribute['value']
110+
111+
# For IP addresses with ports, strip the port (alphaMountain API doesn't support ports)
112+
if attr_type in ['ip-src|port', 'ip-dst|port']:
113+
return attr_value.split('|')[0]
114+
115+
return attr_value
116+
117+
def query_alphamountain_api(api_url, license_key, ioc_value, scan_depth, partner_type):
118+
"""Query the alphaMountain API for threat intelligence"""
119+
120+
headers = {'Content-Type': 'application/json'}
121+
payload = {
122+
'uri': ioc_value,
123+
'license': license_key,
124+
'version': 1,
125+
'type': partner_type,
126+
'scan_depth': scan_depth
127+
}
128+
129+
try:
130+
response = requests.post(api_url, json=payload, headers=headers, timeout=30)
131+
response.raise_for_status()
132+
133+
return response.json()
134+
135+
except requests.exceptions.HTTPError as e:
136+
status = response.status_code if response else "No status"
137+
body = response.text[:500] if response else "No response body"
138+
raise ValueError(f"HTTP {status}: {body}")
139+
140+
except requests.exceptions.RequestException as e:
141+
raise ValueError(f"Request failed: {str(e)}")
142+
143+
except Exception as e:
144+
raise ValueError(f"Unexpected error: {str(e)}")
145+
146+
def process_threat_response(threat_data, results, original_attribute):
147+
"""Process the API response and apply risk score tag to the original attribute"""
148+
149+
# Create tags list - only risk score
150+
tags = []
151+
152+
if not isinstance(threat_data, dict):
153+
# Add error tag
154+
tags.append({'name': 'alphaMountain:error', 'colour': '#ff0000'})
155+
else:
156+
status = threat_data.get('status', {})
157+
if status.get('threat') != 'Success':
158+
tags.append({'name': 'alphaMountain:api-error', 'colour': '#ff0000'})
159+
else:
160+
threat_info = threat_data.get('threat', {})
161+
162+
if isinstance(threat_info, dict) and threat_info:
163+
score = threat_info.get('score', 'N/A')
164+
165+
# Add risk score tag
166+
if score != 'N/A':
167+
try:
168+
score_float = round(float(score), 2)
169+
risk_level = get_risk_level(score_float)
170+
risk_color = get_risk_color(risk_level)
171+
172+
# Add specific score tag with full precision
173+
tags.append({
174+
'name': f'alphaMountain:risk-score="{score_float}"',
175+
'colour': risk_color
176+
})
177+
178+
except (ValueError, TypeError):
179+
# If score is not a valid number, add a generic tag
180+
tags.append({'name': 'alphaMountain:risk-score="unknown"', 'colour': '#ffa500'})
181+
182+
# If no tags were added, add a no-data tag
183+
if not tags:
184+
tags.append({'name': 'alphaMountain:no-data', 'colour': '#ffa500'})
185+
186+
# Create enriched attribute with only risk score tag
187+
enriched_attribute = {
188+
'type': original_attribute['type'],
189+
'value': original_attribute['value'],
190+
'category': original_attribute.get('category', 'Network activity'),
191+
'to_ids': original_attribute.get('to_ids', False),
192+
'disable_correlation': original_attribute.get('disable_correlation', False),
193+
'comment': 'alphaMountain threat intelligence analysis',
194+
'Tag': tags
195+
}
196+
197+
results['Attribute'].append(enriched_attribute)
198+
199+
def get_risk_level(score):
200+
"""Determine risk level based on score"""
201+
int_score = int(score)
202+
if int_score >= 8:
203+
return 'high'
204+
elif int_score >= 7:
205+
return 'medium'
206+
elif int_score >= 6:
207+
return 'low'
208+
else:
209+
return 'minimal'
210+
211+
def get_risk_color(risk_level):
212+
"""Get color code for risk level"""
213+
color_map = {
214+
'high': '#ff0000', # Red
215+
'medium': '#ffa500', # Orange
216+
'low': '#ffff00', # Yellow
217+
'minimal': '#00cc00' # Green
218+
}
219+
return color_map.get(risk_level, '#cccccc')
220+
221+
def introspection():
222+
"""Return module metadata for MISP"""
223+
modulesetup = {}
224+
modulesetup['userConfig'] = userConfig
225+
modulesetup['input'] = mispattributes['input']
226+
modulesetup['output'] = mispattributes['output']
227+
modulesetup['format'] = 'misp_standard'
228+
return modulesetup
229+
230+
def version():
231+
"""Return module version"""
232+
moduleinfo['config'] = moduleconfig
233+
return moduleinfo

0 commit comments

Comments
 (0)