Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions lib/cuckoo/common/cleaners_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
mongo_update_one,
mongo_update_many,
mongo_delete_calls_by_task_id_in_range,
mongo_delete_data_range
mongo_delete_data_range,
)
elif repconf.elasticsearchdb.enabled:
from dev_utils.elasticsearchdb import all_docs, delete_analysis_and_related_calls, get_analysis_index
Expand Down Expand Up @@ -238,7 +238,10 @@ def delete_bulk_tasks_n_folders(ids: list, delete_mongo: bool, delete_db_tasks=F
sys.exit()
mongo_delete_data(ids_tmp)
if delete_db_tasks:
db.delete_tasks(ids_tmp)
try:
db.delete_tasks(task_ids=ids_tmp)
except Exception as e:
log.error("Failed to delete tasks from DB: %s", str(e))


def fail_job(tid):
Expand Down Expand Up @@ -352,9 +355,7 @@ def cuckoo_clean_failed_tasks():
# ToDo rewrite for bulk delete
ids = [task.id for task in tasks_list]
delete_bulk_tasks_n_folders(ids, delete_mongo=True)
tasks_list = db.list_tasks(
status=f"{TASK_FAILED_ANALYSIS}|{TASK_FAILED_PROCESSING}|{TASK_FAILED_REPORTING}|{TASK_RECOVERED}", delete=True
)
tasks_list = db.delete_tasks(status=f"{TASK_FAILED_ANALYSIS}|{TASK_FAILED_PROCESSING}|{TASK_FAILED_REPORTING}|{TASK_RECOVERED}")


def cuckoo_clean_bson_suri_logs():
Expand Down Expand Up @@ -441,7 +442,7 @@ def cuckoo_clean_lower_score(malscore: int):
log.info("number of matching records %s", len(id_arr))
# resolver_pool.map(lambda tid: delete_data(tid), id_arr)
if id_arr:
delete_bulk_tasks_n_folders(id_arr, delete_mongo=True)
delete_bulk_tasks_n_folders(id_arr, delete_mongo=True, delete_db_tasks=True)
Comment thread
doomedraven marked this conversation as resolved.


def tmp_clean_before(timerange: str):
Expand Down Expand Up @@ -537,7 +538,7 @@ def cuckoo_clean_before(args: dict):
mongo_delete_data_range(range_end=highest_id)
# cleanup_files_collection_by_id(highest_id)

db.list_tasks(added_before=added_before, category=category, delete=True)
db.delete_tasks(added_before=added_before, category=category)


def cuckoo_clean_sorted_pcap_dump():
Expand Down Expand Up @@ -618,7 +619,7 @@ def cuckoo_clean_pending_tasks(timerange: str = None, delete: bool = False):
# clean_handler = delete_data if delete else fail_job
# resolver_pool.map(lambda tid: clean_handler(pending_tasks), pending_tasks)
if delete:
db.list_tasks(status=TASK_PENDING, added_before=before_time, delete=True)
db.delete_tasks(status=TASK_PENDING, added_before=before_time)
else:
resolver_pool.map(lambda tid: fail_job(pending_tasks), pending_tasks)

Expand All @@ -639,7 +640,7 @@ def cuckoo_clean_range_tasks(range_: str):
ids: list[int] = [task.id for task in pending_tasks]
delete_bulk_tasks_n_folders(ids, delete_mongo=False)
mongo_delete_data(ids)
db.list_tasks(id_after=(start - 1), id_before=(end + 1), delete=True)
db.delete_tasks(id_after=(start - 1), id_before=(end + 1))


def delete_unused_file_data_in_mongo():
Expand Down
138 changes: 116 additions & 22 deletions lib/cuckoo/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
if repconf.mongodb.enabled:
from dev_utils.mongodb import mongo_find
if repconf.elasticsearchdb.enabled:
from dev_utils.elasticsearchdb import elastic_handler # , get_analysis_index
from dev_utils.elasticsearchdb import elastic_handler # , get_analysis_index

es = elastic_handler

Expand Down Expand Up @@ -2020,7 +2020,6 @@ def list_tasks(
include_hashes=False,
user_id=None,
for_update=False,
delete=False,
) -> List[Task]:
"""Retrieve list of task.
@param limit: specify a limit of entries.
Expand All @@ -2042,7 +2041,6 @@ def list_tasks(
@param include_hashes: return task+samples details
@param user_id: list of tasks submitted by user X
@param for_update: If True, use "SELECT FOR UPDATE" in order to create a row-level lock on the selected tasks.
@param delete: delete selected tasks
@return: list of tasks.
"""
tasks: List[Task] = []
Expand Down Expand Up @@ -2090,10 +2088,6 @@ def list_tasks(
if user_id is not None:
search = search.filter(Task.user_id == user_id)

if delete:
search.delete()
return []

if order_by is not None and isinstance(order_by, tuple):
search = search.order_by(*order_by)
elif order_by is not None:
Expand All @@ -2108,6 +2102,121 @@ def list_tasks(

return tasks

def delete_task(self, task_id):
    """Remove a single task row through the current session.
    @param task_id: ID of the task to delete.
    @return: True when the task existed and was marked for deletion, False when no such task was found.
    """
    record = self.session.get(Task, task_id)
    found = record is not None
    if found:
        # Only the session-level delete is issued here; no commit happens in this method.
        self.session.delete(record)
    return found

def delete_tasks(
    self,
    category=None,
    status=None,
    sample_id=None,
    not_status=None,
    completed_after=None,
    added_before=None,
    id_before=None,
    id_after=None,
    options_like=False,
    options_not_like=False,
    tags_tasks_like=False,
    task_ids=False,
    user_id=None,
):
    """Bulk-delete the tasks matching every supplied filter.

    All given filters are combined (logical AND). When no filter is
    supplied nothing is deleted, a warning is logged and True is returned.
    This method issues the DELETE but does not commit or roll back itself.

    Args:
        category: a single category name, or a collection of category names
        status: a task status; several statuses may be joined with "|"
        sample_id: only tasks belonging to this sample
        not_status: skip tasks that have this status
        completed_after: only tasks whose completed_on is later than this value
        added_before: only tasks whose added_on is earlier than this value
        id_before: only tasks whose id is lower than this value
        id_after: only tasks whose id is greater than this value
        options_like: substring the task options must contain ("*" acts as a wildcard)
        options_not_like: substring the task options must not contain ("*" acts as a wildcard)
        tags_tasks_like: substring the task tags must contain
        task_ids: collection of task ids to delete
        user_id: only tasks submitted by this user

    Returns:
        bool: True if the deletion ran (or there was nothing to do),
        False if the delete statement raised an exception.
    """
    search = self.session.query(Task)

    # Gather the WHERE conditions first; an empty list means "no filter given".
    criteria = []

    if status:
        if "|" in status:
            criteria.append(Task.status.in_(status.split("|")))
        else:
            criteria.append(Task.status == status)
    if not_status:
        criteria.append(Task.status != not_status)
    if category:
        categories = [category] if isinstance(category, str) else category
        criteria.append(Task.category.in_(categories))
    if sample_id is not None:
        criteria.append(Task.sample_id == sample_id)
    if id_before is not None:
        criteria.append(Task.id < id_before)
    if id_after is not None:
        criteria.append(Task.id > id_after)
    if completed_after:
        criteria.append(Task.completed_on > completed_after)
    if added_before:
        criteria.append(Task.added_on < added_before)
    if options_like:
        # Translate the user-facing '*' wildcard into the SQL one.
        like_pattern = options_like.replace("*", "%")
        criteria.append(Task.options.like(f"%{like_pattern}%"))
    if options_not_like:
        # Translate the user-facing '*' wildcard into the SQL one.
        not_like_pattern = options_not_like.replace("*", "%")
        criteria.append(Task.options.notlike(f"%{not_like_pattern}%"))
    if tags_tasks_like:
        criteria.append(Task.tags_tasks.like(f"%{tags_tasks_like}%"))
    if task_ids:
        criteria.append(Task.id.in_(task_ids))
    if user_id is not None:
        criteria.append(Task.user_id == user_id)

    if not criteria:
        # Refuse to run an unfiltered DELETE over the whole table.
        log.warning("No filters provided for delete_tasks. No tasks will be deleted.")
        return True

    # Multiple criteria passed to filter() are joined with AND.
    search = search.filter(*criteria)

    try:
        deleted_count = search.delete(synchronize_session=False)
    except Exception as e:
        # No rollback here: the surrounding transaction is left to the caller.
        log.error("Error deleting tasks: %s", str(e))
        return False

    log.info("Deleted %d tasks matching the criteria.", deleted_count)
    return True


def check_tasks_timeout(self, timeout):
"""Find tasks which were added_on more than timeout ago and clean"""
tasks: List[Task] = []
Expand Down Expand Up @@ -2212,21 +2321,6 @@ def add_statistics_to_task(self, task_id, details): # pragma: no cover
task.anti_issues = details["anti_issues"]
return True

def delete_task(self, task_id):
    """Remove a single task row through the current session.
    @param task_id: ID of the task to delete.
    @return: True when the task existed and was marked for deletion, False when no such task was found.
    """
    record = self.session.get(Task, task_id)
    if record is not None:
        # Only the session-level delete is issued here; no commit happens in this method.
        self.session.delete(record)
        return True
    return False

def delete_tasks(self, ids):
    """Bulk-delete the tasks whose id is contained in ``ids``.
    @param ids: collection of task IDs to delete.
    @return: always True.
    """
    matching = self.session.query(Task).filter(Task.id.in_(ids))
    # Bulk DELETE; objects already loaded in the session are not synchronized.
    matching.delete(synchronize_session=False)
    return True

def view_sample(self, sample_id):
"""Retrieve information on a sample given a sample id.
@param sample_id: ID of the sample to query.
Expand Down
6 changes: 3 additions & 3 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,12 +909,12 @@ def test_delete_tasks(self, db: _Database, temp_filename):
t2 = db.add_path(temp_filename, tags="x86")
t3 = db.add_url("https://3.com")
with db.session.begin():
assert db.delete_tasks([])
assert db.delete_tasks([t1, t2, t3 + 1])
assert db.delete_tasks(task_ids=[])
assert db.delete_tasks(task_ids=[t1, t2, t3 + 1])
tasks = db.session.query(Task).all()
assert len(tasks) == 1
assert tasks[0].id == t3
assert db.delete_tasks([t1, t2])
assert db.delete_tasks(task_ids=[t1, t2])
tasks = db.session.query(Task).all()
assert len(tasks) == 1
assert tasks[0].id == t3
Expand Down