-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathactivity_models.py
More file actions
140 lines (121 loc) · 5.05 KB
/
activity_models.py
File metadata and controls
140 lines (121 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import re
import uuid
import logging
from hashlib import sha1
from functools import cached_property
from datetime import datetime, timezone
from pydantic import Field, computed_field
from typing import Annotated, Any, Optional
from bbot_server.utils.misc import utc_now
from bbot_server.cli.themes import COLOR, DARK_COLOR
from bbot_server.models.base import HostQuery, BaseHostModel
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Matches a "[tag]text[/tag]" markup span whose opening and closing tag names
# are identical (word characters and spaces only). Group 1 is the tag name,
# group 2 is the enclosed text, so substituting r"\2" strips the markup.
remove_rich_color_pattern = re.compile(r"\[([\w ]+)\](.*?)\[/\1\]")
class ActivityQuery(HostQuery):
    """Request body shared by the activity query and count endpoints."""

    # Optional filter; when left as None no activity-type filter is requested.
    type: str | None = Field(default=None, description="Filter by activity type")
class Activity(BaseHostModel):
    """
    An Activity is BBOT server's equivalent of an event.

    Activities are emitted whenever an agent connects, a scan starts, a new open port is detected, etc.
    They are usually associated with an asset, and can be traced back to a specific BBOT event.

    Besides the declared fields, the constructor accepts two optional keyword
    arguments that are consumed before validation:

    - ``event``: an event object whose data is copied in via ``set_event()``
    - ``parent_activity``: another Activity whose data is copied in via ``set_activity()``
    """

    __store_type__ = "asset"
    __table_name__ = "history"

    # id is a UUID
    id: Annotated[str, "indexed", "unique"] = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: Annotated[float, "indexed"] = Field(
        description="Timestamp matching the event that triggered this activity"
    )
    created: Annotated[float, "indexed"] = Field(
        default_factory=utc_now, description="Time when this activity was created"
    )
    archived: Annotated[bool, "indexed"] = False
    # plain-text description (markup tags stripped, see __init__)
    description: Annotated[str, "indexed", "indexed-text"]
    # description with the COLOR / DARK_COLOR placeholders substituted
    description_colored: str = Field(default="")
    detail: dict[str, Any] = {}
    module: Annotated[Optional[str], "indexed"] = None
    scan: Annotated[Optional[str], "indexed"] = None
    host: Annotated[Optional[str], "indexed"] = None
    parent_event_uuid: Annotated[Optional[str], "indexed"] = None
    parent_event_id: Annotated[Optional[str], "indexed"] = None
    parent_scan_run_id: Annotated[Optional[str], "indexed"] = None
    parent_activity_id: Annotated[Optional[str], "indexed"] = None

    def __init__(self, *args, **kwargs):
        """
        Build an activity from keyword arguments.

        - ``description`` is mandatory.
        - ``timestamp`` defaults to the current UTC time.
        - Unless ``description_colored`` is given explicitly, it is derived from
          ``description`` by substituting the ``COLOR`` / ``DARK_COLOR``
          placeholders, and ``description`` itself is stored with its
          ``[tag]...[/tag]`` markup removed.
        - ``event`` and ``parent_activity`` (both optional) are popped from the
          keyword arguments and applied after the model has been initialized.

        Raises:
            ValueError: if ``description`` is not passed as a keyword argument.
        """
        # must have a description
        if "description" not in kwargs:
            raise ValueError("description is required")
        # default timestamp is now
        if "timestamp" not in kwargs:
            kwargs["timestamp"] = datetime.now(timezone.utc).timestamp()
        # make a non-colored version of the description
        if "description_colored" not in kwargs:
            description = kwargs["description"]
            # we save the description in two forms - colored and uncolored
            # DARK_COLOR is replaced first because "COLOR" is a substring of it
            kwargs["description_colored"] = description.replace("DARK_COLOR", DARK_COLOR).replace("COLOR", COLOR)
            kwargs["description"] = remove_rich_color_pattern.sub(r"\2", description)
        # these two are not model fields, so they must not reach the base constructor
        event = kwargs.pop("event", None)
        parent_activity = kwargs.pop("parent_activity", None)
        super().__init__(*args, **kwargs)
        if event is not None:
            self.set_event(event)
        if parent_activity is not None:
            self.set_activity(parent_activity)

    def set_event(self, event):
        """
        Copy data from a BBOT event into the activity

        The parent event ids, module, timestamp and parent scan run id are always
        overwritten. host, port, netloc and scan are only filled in when the
        event has a value and this activity does not already have one. If the
        event exposes a ``data_json`` mapping containing a "url" key, that url
        is copied as well.
        """
        self.parent_event_id = event.id
        self.parent_event_uuid = event.uuid
        self.module = event.module
        self.timestamp = event.timestamp
        self.parent_scan_run_id = event.scan
        # NOTE(review): port / netloc / url are not declared in this class;
        # presumably they come from BaseHostModel - confirm.
        if event.host and not self.host:
            self.host = event.host
        if event.port and not self.port:
            self.port = event.port
        if event.netloc and not self.netloc:
            self.netloc = event.netloc
        if event.scan and not self.scan:
            self.scan = event.scan
        # handle url
        event_data_json = getattr(event, "data_json", None)
        if event_data_json is not None:
            url = event_data_json.get("url", None)
            if url is not None:
                self.url = url

    def set_activity(self, activity: "Activity"):
        """
        Copy data from another activity into this one

        ``parent_activity_id`` is always set to the other activity's id. The
        remaining attributes are inherited only when they are empty here and
        non-empty on the other activity.
        """
        self.parent_activity_id = activity.id
        for attr_name in (
            "url",
            "host",
            "port",
            "module",
            "netloc",
            "scan",
            "parent_event_id",
            "parent_event_uuid",
            "parent_scan_run_id",
        ):
            # only copy the attribute if it's not already set on self
            self_attr = getattr(self, attr_name, None)
            activity_attr = getattr(activity, attr_name, None)
            if not self_attr and activity_attr:
                setattr(self, attr_name, activity_attr)

    @computed_field
    @property
    def reverse_host(self) -> Annotated[Optional[str], "indexed"]:
        """The host string reversed character by character, or None when there is no host."""
        if self.host is not None:
            return self.host[::-1]
        return None

    @cached_property
    def hash(self):
        """
        SHA-1 hex digest of "<type>:<netloc>:<description>".

        The value is cached on first access, so later changes to those
        attributes are not reflected.
        """
        # NOTE(review): self.type and self.netloc are not declared in this class;
        # presumably they come from BaseHostModel - confirm.
        return sha1(f"{self.type}:{self.netloc}:{self.description}".encode()).hexdigest()

    def __eq__(self, other):
        """
        Two activities are equal when their ``hash`` values match.

        Objects that do not expose a ``hash`` attribute are not comparable;
        NotImplemented is returned so Python falls back to its default
        handling instead of raising AttributeError.
        """
        other_hash = getattr(other, "hash", None)
        if other_hash is None:
            return NotImplemented
        return self.hash == other_hash