Skip to content

Commit b684963

Browse files
authored
Merge pull request #1988 from stratosphereips/alya/fix_idmef_warning
Fix idmef warning
2 parents dedff5a + f6c8bfa commit b684963

3 files changed

Lines changed: 106 additions & 9 deletions

File tree

install/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,4 @@ pyyaml
5252
pytest-asyncio
5353
vulture
5454
detect-secrets
55-
git+https://github.com/SECEF/python-idmefv2.git
55+
idmefv2==0.8

slips_files/common/idmefv2.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class IDMEFv2Status(Enum):
2626
ALERT = "Incident"
2727

2828

29-
class IDMEFv2Severity(Enum):
29+
class IDMEFv2PriorityEnum(Enum):
3030
UNKNOWN = "Unknown"
3131
INFO = "Info"
3232
LOW = "Low"
@@ -53,7 +53,7 @@ def __init__(self, logger: Output, db):
5353
self.model: str = utils.get_slips_version()
5454

5555
# the used idmef version
56-
self.version = "2.0.3"
56+
self.version = "2.D.V03"
5757

5858
def _get_analyzer(self, interface):
5959
return {
@@ -75,19 +75,20 @@ def get_host_ip(self, interface: str) -> str:
7575
def print(self, *args, **kwargs):
7676
return self.printer.print(*args, **kwargs)
7777

78-
def convert_threat_level_to_idmefv2_severity(
78+
def convert_threat_level_to_idmefv2_priority(
7979
self, threat_lvl: ThreatLevel
8080
) -> str:
8181
"""
8282
converts slips threat level to a valid IDMEFv2 Severity value
8383
All threat levels have a corresponding sevirity except
8484
for the Critical threat level, so we map it to High severity
8585
"""
86-
if hasattr(IDMEFv2Severity, threat_lvl.name):
87-
return getattr(IDMEFv2Severity, threat_lvl.name).value
86+
if hasattr(IDMEFv2PriorityEnum, threat_lvl.name):
87+
return getattr(IDMEFv2PriorityEnum, threat_lvl.name).value
8888

8989
if threat_lvl.name == "CRITICAL":
90-
return IDMEFv2Severity.HIGH.value
90+
return IDMEFv2PriorityEnum.HIGH.value
91+
return IDMEFv2PriorityEnum.UNKNOWN.value
9192

9293
def extract_role_type(
9394
self, evidence: Evidence, role=None
@@ -111,6 +112,7 @@ def extract_role_type(
111112
IoCType.DOMAIN.name: "Hostname",
112113
IoCType.URL.name: "URL",
113114
}
115+
ioc_type = ioc_type.name if isinstance(ioc_type, IoCType) else ioc_type
114116
# todo make sure that its a fq domain
115117
return ioc, type_[ioc_type]
116118

@@ -195,7 +197,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message:
195197
attacker, attacker_type = self.extract_role_type(
196198
evidence, role="attacker"
197199
)
198-
severity: str = self.convert_threat_level_to_idmefv2_severity(
200+
priority: str = self.convert_threat_level_to_idmefv2_priority(
199201
evidence.threat_level
200202
)
201203

@@ -207,7 +209,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message:
207209
"Status": IDMEFv2Status.EVIDENCE.value,
208210
# that is a uuid4()
209211
"ID": evidence.id,
210-
"Severity": severity,
212+
"Priority": priority,
211213
# Timestamp indicating the deduced start of the event
212214
"StartTime": iso_ts,
213215
# Timestamp indicating when the message was created
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from unittest.mock import Mock, patch
2+
3+
import pytest
4+
5+
from slips_files.common.idmefv2 import IDMEFv2
6+
from slips_files.core.structures.evidence import (
7+
Direction,
8+
EvidenceType,
9+
IoCType,
10+
Proto,
11+
ThreatLevel,
12+
)
13+
from tests.module_factory import ModuleFactory
14+
15+
16+
class MessageStub(dict):
17+
"""Minimal dict-like IDMEF message for converter unit tests."""
18+
19+
def validate(self) -> None:
20+
"""Skip external schema validation.
21+
22+
Return:
23+
None.
24+
"""
25+
return None
26+
27+
28+
@pytest.mark.parametrize(
29+
"threat_level, expected_priority",
30+
[
31+
(ThreatLevel.INFO, "Info"),
32+
(ThreatLevel.LOW, "Low"),
33+
(ThreatLevel.MEDIUM, "Medium"),
34+
(ThreatLevel.HIGH, "High"),
35+
(ThreatLevel.CRITICAL, "High"),
36+
],
37+
)
38+
def test_convert_threat_level_to_idmefv2_priority(
39+
threat_level: ThreatLevel, expected_priority: str
40+
) -> None:
41+
"""Verify Slips threat levels map to IDMEFv2 priority values.
42+
43+
Parameters:
44+
threat_level: Slips threat level to convert.
45+
expected_priority: Expected IDMEFv2 priority string.
46+
47+
Return:
48+
None.
49+
"""
50+
module_factory = ModuleFactory()
51+
idmefv2 = IDMEFv2(module_factory.logger, Mock())
52+
53+
assert (
54+
idmefv2.convert_threat_level_to_idmefv2_priority(threat_level)
55+
== expected_priority
56+
)
57+
58+
59+
def test_convert_to_idmef_event_uses_priority_field() -> None:
60+
"""Verify converted IDMEFv2 events use Priority instead of Severity.
61+
62+
Return:
63+
None.
64+
"""
65+
module_factory = ModuleFactory()
66+
db = Mock()
67+
db.is_running_non_stop.return_value = False
68+
idmefv2 = IDMEFv2(module_factory.logger, db)
69+
attacker = module_factory.create_attacker_obj(
70+
value="192.168.1.1", direction=Direction.SRC, ioc_type=IoCType.IP
71+
)
72+
victim = module_factory.create_victim_obj(
73+
value="192.168.1.2", direction=Direction.DST, ioc_type=IoCType.IP
74+
)
75+
evidence = module_factory.create_evidence_obj(
76+
evidence_type=EvidenceType.ARP_SCAN,
77+
description="ARP scan detected",
78+
attacker=attacker,
79+
threat_level=ThreatLevel.MEDIUM,
80+
victim=victim,
81+
profile=module_factory.create_profileid_obj(ip="192.168.1.1"),
82+
timewindow=module_factory.create_timewindow_obj(number=1),
83+
uid=["uid1"],
84+
timestamp="2023/10/26 10:10:10.000000+0000",
85+
proto=Proto.TCP,
86+
dst_port=80,
87+
id="d4afbe1a-1cb9-4db4-9fac-74f2da6f5f34",
88+
confidence=0.8,
89+
)
90+
91+
with patch("slips_files.common.idmefv2.Message", MessageStub):
92+
event = idmefv2.convert_to_idmef_event(evidence)
93+
94+
assert event["Priority"] == "Medium"
95+
assert "Severity" not in event

0 commit comments

Comments
 (0)