diff --git a/install/requirements.txt b/install/requirements.txt index b19e7fd7c..0fc4832fa 100644 --- a/install/requirements.txt +++ b/install/requirements.txt @@ -52,4 +52,4 @@ pyyaml pytest-asyncio vulture detect-secrets -git+https://github.com/SECEF/python-idmefv2.git +idmefv2==0.8 diff --git a/slips_files/common/idmefv2.py b/slips_files/common/idmefv2.py index c946a6936..f512b832d 100644 --- a/slips_files/common/idmefv2.py +++ b/slips_files/common/idmefv2.py @@ -26,7 +26,7 @@ class IDMEFv2Status(Enum): ALERT = "Incident" -class IDMEFv2Severity(Enum): +class IDMEFv2PriorityEnum(Enum): UNKNOWN = "Unknown" INFO = "Info" LOW = "Low" @@ -53,7 +53,7 @@ def __init__(self, logger: Output, db): self.model: str = utils.get_slips_version() # the used idmef version - self.version = "2.0.3" + self.version = "2.D.V03" def _get_analyzer(self, interface): return { @@ -75,7 +75,7 @@ def get_host_ip(self, interface: str) -> str: def print(self, *args, **kwargs): return self.printer.print(*args, **kwargs) - def convert_threat_level_to_idmefv2_severity( + def convert_threat_level_to_idmefv2_priority( self, threat_lvl: ThreatLevel ) -> str: """ @@ -83,11 +83,12 @@ def convert_threat_level_to_idmefv2_severity( All threat levels have a corresponding sevirity except for the Critical threat level, so we map it to High severity """ - if hasattr(IDMEFv2Severity, threat_lvl.name): - return getattr(IDMEFv2Severity, threat_lvl.name).value + if hasattr(IDMEFv2PriorityEnum, threat_lvl.name): + return getattr(IDMEFv2PriorityEnum, threat_lvl.name).value if threat_lvl.name == "CRITICAL": - return IDMEFv2Severity.HIGH.value + return IDMEFv2PriorityEnum.HIGH.value + return IDMEFv2PriorityEnum.UNKNOWN.value def extract_role_type( self, evidence: Evidence, role=None @@ -111,6 +112,7 @@ def extract_role_type( IoCType.DOMAIN.name: "Hostname", IoCType.URL.name: "URL", } + ioc_type = ioc_type.name if isinstance(ioc_type, IoCType) else ioc_type # todo make sure that its a fq domain return ioc, type_[ioc_type] @@ -195,7 +197,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message: attacker, attacker_type = self.extract_role_type( evidence, role="attacker" ) - severity: str = self.convert_threat_level_to_idmefv2_severity( + priority: str = self.convert_threat_level_to_idmefv2_priority( evidence.threat_level ) @@ -207,7 +209,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message: "Status": IDMEFv2Status.EVIDENCE.value, # that is a uuid4() "ID": evidence.id, - "Severity": severity, + "Priority": priority, # Timestamp indicating the deduced start of the event "StartTime": iso_ts, # Timestamp indicating when the message was created diff --git a/tests/unit/slips_files/common/test_idmefv2.py b/tests/unit/slips_files/common/test_idmefv2.py new file mode 100644 index 000000000..fbf573437 --- /dev/null +++ b/tests/unit/slips_files/common/test_idmefv2.py @@ -0,0 +1,95 @@ +from unittest.mock import Mock, patch + +import pytest + +from slips_files.common.idmefv2 import IDMEFv2 +from slips_files.core.structures.evidence import ( + Direction, + EvidenceType, + IoCType, + Proto, + ThreatLevel, +) +from tests.module_factory import ModuleFactory + + +class MessageStub(dict): + """Minimal dict-like IDMEF message for converter unit tests.""" + + def validate(self) -> None: + """Skip external schema validation. + + Return: + None. + """ + return None + + +@pytest.mark.parametrize( + "threat_level, expected_priority", + [ + (ThreatLevel.INFO, "Info"), + (ThreatLevel.LOW, "Low"), + (ThreatLevel.MEDIUM, "Medium"), + (ThreatLevel.HIGH, "High"), + (ThreatLevel.CRITICAL, "High"), + ], +) +def test_convert_threat_level_to_idmefv2_priority( + threat_level: ThreatLevel, expected_priority: str +) -> None: + """Verify Slips threat levels map to IDMEFv2 priority values. + + Parameters: + threat_level: Slips threat level to convert. + expected_priority: Expected IDMEFv2 priority string. + + Return: + None. + """ + module_factory = ModuleFactory() + idmefv2 = IDMEFv2(module_factory.logger, Mock()) + + assert ( + idmefv2.convert_threat_level_to_idmefv2_priority(threat_level) + == expected_priority + ) + + +def test_convert_to_idmef_event_uses_priority_field() -> None: + """Verify converted IDMEFv2 events use Priority instead of Severity. + + Return: + None. + """ + module_factory = ModuleFactory() + db = Mock() + db.is_running_non_stop.return_value = False + idmefv2 = IDMEFv2(module_factory.logger, db) + attacker = module_factory.create_attacker_obj( + value="192.168.1.1", direction=Direction.SRC, ioc_type=IoCType.IP + ) + victim = module_factory.create_victim_obj( + value="192.168.1.2", direction=Direction.DST, ioc_type=IoCType.IP + ) + evidence = module_factory.create_evidence_obj( + evidence_type=EvidenceType.ARP_SCAN, + description="ARP scan detected", + attacker=attacker, + threat_level=ThreatLevel.MEDIUM, + victim=victim, + profile=module_factory.create_profileid_obj(ip="192.168.1.1"), + timewindow=module_factory.create_timewindow_obj(number=1), + uid=["uid1"], + timestamp="2023/10/26 10:10:10.000000+0000", + proto=Proto.TCP, + dst_port=80, + id="d4afbe1a-1cb9-4db4-9fac-74f2da6f5f34", + confidence=0.8, + ) + + with patch("slips_files.common.idmefv2.Message", MessageStub): + event = idmefv2.convert_to_idmef_event(evidence) + + assert event["Priority"] == "Medium" + assert "Severity" not in event