Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/app/blueprints/rest/case/case_timeline_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def case_edit_event(cur_id, caseid):
return response_success("Event updated", data=event_dump)

except ValidationError as e:
return response_error(e.get_message(), data=e.get_data())
return response_error('Data error', data=e.normalized_messages())
except BusinessProcessingError as e:
return response_error(e.get_message(), data=e.get_data())

Expand Down
45 changes: 45 additions & 0 deletions source/app/business/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,55 @@
from app.iris_engine.module_handler.module_handler import call_modules_hook


def _validate_event_parent_relationship(event: CasesEvent):
"""Validate that parent assignment stays inside case and does not create cycles."""
parent_event_id = event.parent_event_id
if parent_event_id is None:
return

if parent_event_id == event.event_id:
raise BusinessProcessingError('An event cannot be its own parent')

visited_parent_ids = set()
guard = 0
max_depth = 10000

while parent_event_id is not None and guard < max_depth:
if parent_event_id in visited_parent_ids:
raise BusinessProcessingError('Invalid parent event hierarchy')
visited_parent_ids.add(parent_event_id)

parent_event = CasesEvent.query.with_entities(
CasesEvent.event_id,
CasesEvent.case_id,
CasesEvent.parent_event_id
).filter(
CasesEvent.event_id == parent_event_id
).first()
if not parent_event:
raise BusinessProcessingError('Invalid parent event ID')

if parent_event.case_id != event.case_id:
raise BusinessProcessingError('Parent event must belong to the same case')

if parent_event.event_id == event.event_id:
raise BusinessProcessingError('Parent event assignment would create a cycle')

parent_event_id = parent_event.parent_event_id
guard += 1

if guard >= max_depth:
raise BusinessProcessingError('Parent event hierarchy is too deep')


def events_create(case_identifier, event: CasesEvent, event_category_id, event_assets, event_iocs, sync_iocs_assets) -> CasesEvent:

event.case_id = case_identifier
event.event_added = datetime.utcnow()
event.user_id = iris_current_user.id

_validate_event_parent_relationship(event)

add_obj_history_entry(event, 'created')

db.session.add(event)
Expand Down Expand Up @@ -75,6 +118,8 @@ def events_get(identifier) -> CasesEvent:


def events_update(event: CasesEvent, event_category_id, event_assets, event_iocs, event_sync_iocs_assets) -> CasesEvent:
_validate_event_parent_relationship(event)

add_obj_history_entry(event, 'updated')

update_timeline_state(event.case_id)
Expand Down
26 changes: 26 additions & 0 deletions tests/tests_rest_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,32 @@ def test_update_event_should_set_event_parent_id_when_provided(self):
response = self._subject.update(f'/api/v2/cases/{case_identifier}/events/{identifier}', body).json()
self.assertEqual(parent_event_identifier, response['parent_event_id'])

def test_update_event_should_return_400_when_parent_assignment_creates_cycle(self):
case_identifier = self._subject.create_dummy_case()

parent_body = {'event_title': 'parent', 'event_category_id': 1,
'event_date': '2025-03-26T00:00:00.000', 'event_tz': '+00:00',
'event_assets': [], 'event_iocs': []}
parent_event = self._subject.create(f'/api/v2/cases/{case_identifier}/events', parent_body).json()

child_body = {'event_title': 'child', 'event_category_id': 1,
'event_date': '2025-03-26T01:00:00.000', 'event_tz': '+00:00',
'event_assets': [], 'event_iocs': [],
'parent_event_id': parent_event['event_id']}
child_event = self._subject.create(f'/api/v2/cases/{case_identifier}/events', child_body).json()

# Try to make the parent a child of its own child: this must be rejected.
update_parent_body = {'event_title': 'parent', 'event_category_id': 1,
'event_date': '2025-03-26T00:00:00.000', 'event_tz': '+00:00',
'event_assets': [], 'event_iocs': [],
'parent_event_id': child_event['event_id']}

response = self._subject.update(
f'/api/v2/cases/{case_identifier}/events/{parent_event["event_id"]}',
update_parent_body
)
self.assertEqual(400, response.status_code)

def test_delete_event_should_return_204(self):
case_identifier = self._subject.create_dummy_case()
body = {'event_title': 'title', 'event_category_id': 1,
Expand Down
22 changes: 22 additions & 0 deletions tests/tests_rest_miscellaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,28 @@ def test_get_timeline_state_should_return_200(self):
response = self._subject.get('/case/timeline/state', query_parameters={'cid': 1})
self.assertEqual(200, response.status_code)

def test_legacy_timeline_event_update_should_return_400_for_validation_error(self):
case_identifier = self._subject.create_dummy_case()
body = {'event_title': 'title', 'event_category_id': 1,
'event_date': '2025-03-26T00:00:00.000', 'event_tz': '+00:00',
'event_assets': [], 'event_iocs': []}
event = self._subject.create(f'/api/v2/cases/{case_identifier}/events', body).json()

invalid_update_payload = {'event_title': 'title',
'event_category_id': None,
'event_date': '2025-03-26T00:00:00.000',
'event_tz': '+00:00',
'event_assets': [],
'event_iocs': []}

response = self._subject.create(
f'/case/timeline/events/update/{event["event_id"]}',
invalid_update_payload,
query_parameters={'cid': case_identifier}
)

self.assertEqual(400, response.status_code)

# TODO should probably move this in a test suite related to modules?
# TODO skipping this tests, because it randomly triggers exceptions in the iriswebappp_worker
# (psycopg2.errors.NotNullViolation) null value in column "client_id" violates not-null constraint
Expand Down
Loading