-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathIrisVTInterface.py
More file actions
148 lines (120 loc) · 6.36 KB
/
IrisVTInterface.py
File metadata and controls
148 lines (120 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
#
# IRIS VT Module Source Code
# contact@dfir-iris.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import iris_interface.IrisInterfaceStatus as InterfaceStatus
from iris_interface.IrisModuleInterface import IrisModuleInterface, IrisModuleTypes
import iris_vt_module.IrisVTConfig as interface_conf
from iris_vt_module.vt_handler.vt_handler import VtHandler
class IrisVTInterface(IrisModuleInterface):
"""
Provide the interface between Iris and VT
"""
name = "IrisVTInterface"
_module_name = interface_conf.module_name
_module_description = interface_conf.module_description
_interface_version = interface_conf.interface_version
_module_version = interface_conf.module_version
_pipeline_support = interface_conf.pipeline_support
_pipeline_info = interface_conf.pipeline_info
_module_configuration = interface_conf.module_configuration
_module_type = IrisModuleTypes.module_processor
def register_hooks(self, module_id: int):
"""
Registers all the hooks
:param module_id: Module ID provided by IRIS
:return: Nothing
"""
self.module_id = module_id
module_conf = self.module_dict_conf
if module_conf.get('vt_on_create_hook_enabled'):
status = self.register_to_hook(module_id, iris_hook_name='on_postload_ioc_create')
if status.is_failure():
self.log.error(status.get_message())
self.log.error(status.get_data())
else:
self.log.info("Successfully registered on_postload_ioc_create hook")
else:
self.deregister_from_hook(module_id=self.module_id, iris_hook_name='on_postload_ioc_create')
if module_conf.get('vt_on_update_hook_enabled'):
status = self.register_to_hook(module_id, iris_hook_name='on_postload_ioc_update')
if status.is_failure():
self.log.error(status.get_message())
self.log.error(status.get_data())
else:
self.log.info("Successfully registered on_postload_ioc_update hook")
else:
self.deregister_from_hook(module_id=self.module_id, iris_hook_name='on_postload_ioc_update')
if module_conf.get('vt_manual_hook_enabled'):
status = self.register_to_hook(module_id, iris_hook_name='on_manual_trigger_ioc',
manual_hook_name='Get VT insight')
if status.is_failure():
self.log.error(status.get_message())
self.log.error(status.get_data())
else:
self.log.info("Successfully registered on_manual_trigger_ioc hook")
else:
self.deregister_from_hook(module_id=self.module_id, iris_hook_name='on_manual_trigger_ioc')
def hooks_handler(self, hook_name: str, hook_ui_name: str, data: any):
"""
Hooks handler table. Calls corresponding methods depending on the hooks name.
:param hook_name: Name of the hook which triggered
:param hook_ui_name: Name of the ui hook
:param data: Data associated with the trigger.
:return: Data
"""
self.log.info(f'Received {hook_name}')
if hook_name in ['on_postload_ioc_create', 'on_postload_ioc_update', 'on_manual_trigger_ioc']:
status = self._handle_ioc(data=data)
else:
self.log.critical(f'Received unsupported hook {hook_name}')
return InterfaceStatus.I2Error(data=data, logs=list(self.message_queue))
if status.is_failure():
self.log.error(f"Encountered error processing hook {hook_name}")
return InterfaceStatus.I2Error(data=data, logs=list(self.message_queue))
self.log.info(f"Successfully processed hook {hook_name}")
return InterfaceStatus.I2Success(data=data, logs=list(self.message_queue))
def _handle_ioc(self, data) -> InterfaceStatus.IIStatus:
"""
Handle the IOC data the module just received. The module registered
to on_postload hooks, so it receives instances of IOC object.
These objects are attached to a dedicated SQlAlchemy session so data can
be modified safely.
:param data: Data associated to the hook, here IOC object
:return: IIStatus
"""
vt_handler = VtHandler(mod_config=self.module_dict_conf,
server_config=self.server_dict_conf,
logger=self.log)
in_status = InterfaceStatus.IIStatus(code=InterfaceStatus.I2CodeNoError)
for element in data:
# Check that the IOC we receive is of type the module can handle and dispatch
if 'ip-' in element.ioc_type.type_name:
status = vt_handler.handle_vt_ip(ioc=element)
in_status = InterfaceStatus.merge_status(in_status, status)
elif 'domain' in element.ioc_type.type_name:
status = vt_handler.handle_vt_domain(ioc=element)
in_status = InterfaceStatus.merge_status(in_status, status)
elif element.ioc_type.type_name in ['md5', 'sha1', 'sha224', 'sha256', 'sha512']:
status = vt_handler.handle_vt_hash(ioc=element)
in_status = InterfaceStatus.merge_status(in_status, status)
elif element.ioc_type.type_name in ['filename|md5', 'filename|sha1', 'filename|sha224', 'filename|sha256', 'filename|sha512']:
status = vt_handler.handle_vt_filename_hash(ioc=element)
in_status = InterfaceStatus.merge_status(in_status, status)
else:
self.log.error(f'IOC type {element.ioc_type.type_name} not handled by VT module. Skipping')
return in_status(data=data)