Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion install/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ pyyaml
pytest-asyncio
vulture
detect-secrets
git+https://github.com/SECEF/python-idmefv2.git
idmefv2==0.8
18 changes: 10 additions & 8 deletions slips_files/common/idmefv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class IDMEFv2Status(Enum):
ALERT = "Incident"


class IDMEFv2Severity(Enum):
class IDMEFv2PriorityEnum(Enum):
UNKNOWN = "Unknown"
INFO = "Info"
LOW = "Low"
Expand All @@ -53,7 +53,7 @@ def __init__(self, logger: Output, db):
self.model: str = utils.get_slips_version()

# the used idmef version
self.version = "2.0.3"
self.version = "2.D.V03"

def _get_analyzer(self, interface):
return {
Expand All @@ -75,19 +75,20 @@ def get_host_ip(self, interface: str) -> str:
def print(self, *args, **kwargs):
return self.printer.print(*args, **kwargs)

def convert_threat_level_to_idmefv2_severity(
def convert_threat_level_to_idmefv2_priority(
self, threat_lvl: ThreatLevel
) -> str:
"""
converts slips threat level to a valid IDMEFv2 Severity value
All threat levels have a corresponding sevirity except
for the Critical threat level, so we map it to High severity
"""
if hasattr(IDMEFv2Severity, threat_lvl.name):
return getattr(IDMEFv2Severity, threat_lvl.name).value
if hasattr(IDMEFv2PriorityEnum, threat_lvl.name):
return getattr(IDMEFv2PriorityEnum, threat_lvl.name).value

if threat_lvl.name == "CRITICAL":
return IDMEFv2Severity.HIGH.value
return IDMEFv2PriorityEnum.HIGH.value
return IDMEFv2PriorityEnum.UNKNOWN.value

def extract_role_type(
self, evidence: Evidence, role=None
Expand All @@ -111,6 +112,7 @@ def extract_role_type(
IoCType.DOMAIN.name: "Hostname",
IoCType.URL.name: "URL",
}
ioc_type = ioc_type.name if isinstance(ioc_type, IoCType) else ioc_type
# todo make sure that its a fq domain
return ioc, type_[ioc_type]

Expand Down Expand Up @@ -195,7 +197,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message:
attacker, attacker_type = self.extract_role_type(
evidence, role="attacker"
)
severity: str = self.convert_threat_level_to_idmefv2_severity(
priority: str = self.convert_threat_level_to_idmefv2_priority(
evidence.threat_level
)

Expand All @@ -207,7 +209,7 @@ def convert_to_idmef_event(self, evidence: Evidence) -> Message:
"Status": IDMEFv2Status.EVIDENCE.value,
# that is a uuid4()
"ID": evidence.id,
"Severity": severity,
"Priority": priority,
# Timestamp indicating the deduced start of the event
"StartTime": iso_ts,
# Timestamp indicating when the message was created
Expand Down
95 changes: 95 additions & 0 deletions tests/unit/slips_files/common/test_idmefv2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from unittest.mock import Mock, patch

import pytest

from slips_files.common.idmefv2 import IDMEFv2
from slips_files.core.structures.evidence import (
Direction,
EvidenceType,
IoCType,
Proto,
ThreatLevel,
)
from tests.module_factory import ModuleFactory


class MessageStub(dict):
"""Minimal dict-like IDMEF message for converter unit tests."""

def validate(self) -> None:
"""Skip external schema validation.

Return:
None.
"""
return None


@pytest.mark.parametrize(
"threat_level, expected_priority",
[
(ThreatLevel.INFO, "Info"),
(ThreatLevel.LOW, "Low"),
(ThreatLevel.MEDIUM, "Medium"),
(ThreatLevel.HIGH, "High"),
(ThreatLevel.CRITICAL, "High"),
],
)
def test_convert_threat_level_to_idmefv2_priority(
threat_level: ThreatLevel, expected_priority: str
) -> None:
"""Verify Slips threat levels map to IDMEFv2 priority values.

Parameters:
threat_level: Slips threat level to convert.
expected_priority: Expected IDMEFv2 priority string.

Return:
None.
"""
module_factory = ModuleFactory()
idmefv2 = IDMEFv2(module_factory.logger, Mock())

assert (
idmefv2.convert_threat_level_to_idmefv2_priority(threat_level)
== expected_priority
)


def test_convert_to_idmef_event_uses_priority_field() -> None:
"""Verify converted IDMEFv2 events use Priority instead of Severity.

Return:
None.
"""
module_factory = ModuleFactory()
db = Mock()
db.is_running_non_stop.return_value = False
idmefv2 = IDMEFv2(module_factory.logger, db)
attacker = module_factory.create_attacker_obj(
value="192.168.1.1", direction=Direction.SRC, ioc_type=IoCType.IP
)
victim = module_factory.create_victim_obj(
value="192.168.1.2", direction=Direction.DST, ioc_type=IoCType.IP
)
evidence = module_factory.create_evidence_obj(
evidence_type=EvidenceType.ARP_SCAN,
description="ARP scan detected",
attacker=attacker,
threat_level=ThreatLevel.MEDIUM,
victim=victim,
profile=module_factory.create_profileid_obj(ip="192.168.1.1"),
timewindow=module_factory.create_timewindow_obj(number=1),
uid=["uid1"],
timestamp="2023/10/26 10:10:10.000000+0000",
proto=Proto.TCP,
dst_port=80,
id="d4afbe1a-1cb9-4db4-9fac-74f2da6f5f34",
confidence=0.8,
)

with patch("slips_files.common.idmefv2.Message", MessageStub):
event = idmefv2.convert_to_idmef_event(evidence)

assert event["Priority"] == "Medium"
assert "Severity" not in event
Loading