Skip to content

Commit f99ae04

Browse files
ncatlindoomedraven
andauthored
New feature: Cape dynamic test framework (kevoreilly#2904)
* initial ui and DB implementation commit * database.py refactored into multiple mixins * backend implementation of objectives, ui alignment * implement test updating/removal during reload * hide inactive tests from tasking * implement audit session deletion * tests are now fully evaluated and reported * session auto-refresh, task storage deletion * queue clearing, config and timing display improvements * implement audit session list paging * remove tasks from analysis search. implement tags_tasks_not_like filter in list tasks. * code tidying and moving files to final structure * audit framework: implement conf enable toggle. add auth decorator to all paths * fix some audit bugs * clear old audit py module structure, fix queue bugs * add config edit functionality * pluralize table row counts * prepare audit_packages dir * prepare audit_packages dir * resolve lingering issues from fork * these modules largely use the tasking functions, so label as tasking mixin to improve intellisense * fix infuriating visual studio django auto formatting * more minor fixes - exception handling, imports * final minor changes * linting, improve config update workflow, test cascade * more linting * cleared ruff issues * fix pytest issues * fix pytest issues * resolve odd fstring complaint from tests * typo * Use test status constants; fix audit task handling Standardize status checks by replacing hard-coded status strings with TEST_* constants across audit code and web views. Improve TestLoader payload handling to treat an extracted directory with multiple items as the payload rather than raising an error. Fix a typo in a docstring. Optimize update_audit_tasks_status to only update/evaluate runs when the status actually changes and commit the DB session once if any changes occurred. Also prevent re-queuing tests that aren't in the unqueued state and use constants when unqueuing/queuing runs. * Update imports: remove unused statuses, add TASK_RUNNING Remove unused test status constants (TEST_FAILED, TEST_UNQUEUED) from lib/cuckoo/core/data/audits.py imports, add TASK_RUNNING import to modules/machinery/az.py so the Azure machinery can reference the running state, and clean up a stray trailing whitespace in lib/cuckoo/common/audit_utils.py. These changes tidy up imports and prevent missing-constant usage in the AZ module. * rework audit db usage, improve config edits, improve add test UX * Update audits.py * Revert "Update audits.py" This reverts commit f142b50. * improve db and error handling of test reloading * ruff blank lines * docs --------- Co-authored-by: doomedraven <doommedraven@gmail.com>
1 parent a5eab85 commit f99ae04

46 files changed

Lines changed: 5552 additions & 2458 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ tests/test_bson.bson.compressed
2323
installer/cape-config.sh
2424
installer/kvm-config.sh
2525

26-
docs/book/src/_build
26+
docs/book/src/_build
27+
/.vs

conf/default/web.conf.default

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,6 @@ enabled = no
246246

247247
[pcap_ng]
248248
enabled = no
249+
250+
[audit_framework]
251+
enabled = no

docs/book/src/usage/audit.rst

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
.. _audit_framework:
2+
3+
===============
4+
Audit Framework
5+
===============
6+
7+
The Audit Framework is a specialized subsystem within CAPE designed to verify the correctness and reliability of the sandbox's analysis capabilities. It allows operators to define specific test cases ("Audit Packages") that run known samples with expected behavioral outcomes ("Objectives"). This is particularly useful for validating that CAPE is correctly capturing specific behaviors (e.g., shellcode injection, network beacons) after updates or configuration changes.
8+
9+
Concepts
10+
========
11+
12+
* **Available Test**: A test case definition stored on the disk. It consists of a payload (e.g., a malware sample) and a Python script defining the success criteria.
13+
* **Test Session**: A collection of test runs. You can group multiple tests into a session to validate a specific aspect of the system (e.g., "Weekly Regression Test").
14+
* **Test Run**: A single execution of an *Available Test* within a *Test Session*. It links to a standard CAPE Task ID.
15+
* **Objective**: A specific criterion that must be met for a test to pass (e.g., "DNS request to evil.com observed", "File dropped in AppData").
16+
17+
Configuration
18+
=============
19+
20+
To enable the Audit Framework, ensure the feature is enabled in your web configuration.
21+
22+
Edit ``conf/web.conf``:
23+
24+
.. code-block:: ini
25+
26+
[audit_framework]
27+
enabled = yes
28+
29+
The framework looks for test packages in ``tests/audit_packages/`` by default.
30+
31+
Creating Audit Packages
32+
=======================
33+
34+
Audit packages are directory-based. Each package must be a subdirectory inside ``tests/audit_packages/`` (or the configured path) containing at least two files:
35+
36+
1. ``payload.zip``: A zip file containing the sample to be analyzed.
37+
* *Note*: If the zip contains a single file, that file is treated as the payload. If it contains multiple files, the extracted directory is treated as the payload (useful for packages requiring dependencies).
38+
2. ``test.py``: A Python script defining the test metadata, objectives, and evaluation logic.
39+
40+
Directory Structure Example
41+
---------------------------
42+
43+
.. code-block:: text
44+
45+
tests/audit_packages/
46+
├── Emotet_Network_Beacon/
47+
│ ├── payload.zip
48+
│ └── test.py
49+
└── AsyncRAT_Config_Extract/
50+
├── payload.zip
51+
└── test.py
52+
53+
The ``test.py`` Structure
54+
-------------------------
55+
56+
The Python script must define a class named ``CapeDynamicTest`` that implements the following methods:
57+
58+
* ``get_metadata()``: Returns a dictionary of test settings.
59+
* ``get_objectives()``: Returns a list of objective objects.
60+
* ``evaluate_results(task_dir)``: Analyzes the analysis results.
61+
* ``get_results()``: Returns the final status of objectives.
62+
63+
**Example `test.py`:**
64+
65+
.. code-block:: python
66+
67+
import os
68+
import json
69+
70+
class TestObjective:
71+
def __init__(self, name, requirement, children=None):
72+
self.name = name
73+
self.requirement = requirement
74+
self.children = children or []
75+
76+
class CapeDynamicTest:
77+
def __init__(self):
78+
self._results = {}
79+
80+
def get_metadata(self):
81+
"""
82+
Define high-level test information.
83+
"""
84+
return {
85+
"Name": "Emotet Beacon Test",
86+
"Description": "Verifies that CAPE detects the C2 network connection.",
87+
"Package": "exe", # CAPE analysis package to use
88+
"Timeout": 200, # Analysis timeout in seconds
89+
"Zip Password": "infected" # Password for payload.zip (optional)
90+
}
91+
92+
def get_objectives(self):
93+
"""
94+
Define the criteria for success.
95+
"""
96+
return [
97+
TestObjective("network_c2", "Must connect to C2 server 1.2.3.4"),
98+
TestObjective("dropped_payload", "Must drop the second stage loader")
99+
]
100+
101+
def evaluate_results(self, task_dir):
102+
"""
103+
Parse the CAPE report to verify objectives.
104+
task_dir: Path to the storage directory for this task (contains report.json, etc.)
105+
"""
106+
report_path = os.path.join(task_dir, "reports", "report.json")
107+
108+
# Default state
109+
self._results = {
110+
"network_c2": {"state": "failure", "state_reason": "IP not found"},
111+
"dropped_payload": {"state": "failure", "state_reason": "File not found"}
112+
}
113+
114+
if not os.path.exists(report_path):
115+
return
116+
117+
with open(report_path, "r") as f:
118+
report = json.load(f)
119+
120+
# Check Network
121+
for host in report.get("network", {}).get("hosts", []):
122+
if host == "1.2.3.4":
123+
self._results["network_c2"] = {"state": "success", "state_reason": "Connection found"}
124+
125+
# Check Dropped Files
126+
if "dropped" in report:
127+
self._results["dropped_payload"] = {"state": "success", "state_reason": "Dropped files present"}
128+
129+
def get_results(self):
130+
"""
131+
Return the dictionary of results calculated in evaluate_results.
132+
Keys must match the Objective names.
133+
"""
134+
return self._results
135+
136+
Web Interface Usage
137+
===================
138+
139+
Access the Audit interface via the sidebar menu or at ``/audit/``.
140+
141+
1. **Manage Tests**:
142+
The main dashboard lists all available tests.
143+
* If you have added new tests to the disk, click **Reload Tests** to update the database.
144+
145+
2. **Create Session**:
146+
* Select the checkboxes next to the tests you wish to run.
147+
* Click **Create Session**.
148+
* You will be redirected to the Session view.
149+
150+
3. **Run Audit**:
151+
* In the Session view, you can see the status of each test (Unqueued, Queued, Running, Complete).
152+
* Click **Queue All** to submit all unqueued tests to CAPE.
153+
* The status will update automatically as CAPE processes the tasks.
154+
155+
4. **View Results**:
156+
* Once a test is ``Complete``, the framework automatically runs the ``evaluate_results`` logic from your `test.py`.
157+
* The UI will display a **Pass** (Green) or **Fail** (Red) badge for each objective.
158+
* You can expand a test row to see detailed reasons for failure or success.

docs/book/src/usage/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ This chapter explains how to use CAPE.
1212
submit
1313
web
1414
api
15+
audit
1516
dist
1617
cluster_administration
1718
packages

lib/cuckoo/common/abstracts.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
from lib.cuckoo.common.path_utils import path_exists, path_mkdir
4141
from lib.cuckoo.common.url_validate import url as url_validator
4242
from lib.cuckoo.common.utils import create_folder, get_memdump_path, load_categories
43-
from lib.cuckoo.core.database import Database, Machine, _Database, Task
43+
from lib.cuckoo.core.database import Database, _Database
44+
from lib.cuckoo.core.data.task import Task
45+
from lib.cuckoo.core.data.machines import Machine
4446

4547
try:
4648
import re2 as re

lib/cuckoo/common/audit_utils.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import os
2+
import logging
3+
import zipfile
4+
import shutil
5+
from pathlib import Path
6+
from typing import Any, List, Dict
7+
import importlib.util
8+
from lib.cuckoo.core.data import task as db_task
9+
from lib.cuckoo.core.data.audit_data import TEST_RUNNING, TEST_COMPLETE, TEST_FAILED, TEST_QUEUED
10+
11+
log = logging.getLogger(__name__)
12+
13+
def load_module(module_path):
14+
module_name = "test_py_module"
15+
spec = importlib.util.spec_from_file_location(module_name, str(module_path))
16+
module = importlib.util.module_from_spec(spec)
17+
spec.loader.exec_module(module)
18+
19+
if not hasattr(module, 'CapeDynamicTest'):
20+
log.warning(str(dir(module)))
21+
raise ValueError("Module has no CapeDynamicTest class")
22+
tester = module.CapeDynamicTest()
23+
24+
if not hasattr(tester, 'get_metadata'):
25+
raise ValueError(f"CapeDynamicTest from {module_path} lacks get_metadata() function")
26+
return tester
27+
28+
29+
class TestLoader():
30+
def __init__(self, tests_directory):
31+
if not os.path.exists(tests_directory):
32+
raise ValueError(f"Tests directory '{tests_directory}' does not exist.")
33+
self.tests_root = tests_directory
34+
35+
def _extract_payload(self, payload_archive, payload_output_dir, zip_password=None):
36+
37+
# Verify payload ZIP integrity
38+
try:
39+
with zipfile.ZipFile(payload_archive, 'r') as z:
40+
# If a password is provided in JSON, verify we can access the list
41+
if zip_password:
42+
z.setpassword(zip_password.encode())
43+
# Test if the zip is actually readable/not corrupt
44+
z.testzip()
45+
except zipfile.BadZipFile:
46+
if zip_password:
47+
raise zipfile.BadZipFile(f"{payload_archive} is not usable with the given password")
48+
else:
49+
raise zipfile.BadZipFile(f"{payload_archive} is corrupt")
50+
51+
# delete the unwrapped payload in case a new zip has been uploaded
52+
if os.path.exists(payload_output_dir):
53+
shutil.rmtree(payload_output_dir)
54+
55+
with zipfile.ZipFile(payload_archive, 'r') as zip_ref:
56+
if zip_password:
57+
zip_ref.extractall(payload_output_dir, pwd=zip_password)
58+
else:
59+
zip_ref.extractall(payload_output_dir)
60+
61+
payload_path = None
62+
if not os.path.isdir(payload_output_dir):
63+
raise NotADirectoryError("Bad payload directory extracted")
64+
65+
dir_path = Path(payload_output_dir)
66+
dir_contents = list(dir_path.iterdir())
67+
if not dir_contents:
68+
raise FileNotFoundError("Nothing in extracted payload directory")
69+
70+
if len(dir_contents) == 1:
71+
payload_path = str(dir_contents[0])
72+
else:
73+
# If multiple items, treat the directory itself as the payload
74+
payload_path = payload_output_dir
75+
76+
if not os.path.exists(payload_path):
77+
raise FileNotFoundError("Nothing extracted from payload archive or it could not be written to disk")
78+
79+
return payload_path
80+
81+
def validate_test_directory(self, test_path: str) -> Dict[str, Any]:
82+
"""
83+
Validates a single test directory and returns the metadata from the test module.
84+
Raises ValueError if the anything is invalid.
85+
"""
86+
payload_archive = os.path.join(test_path, "payload.zip")
87+
module_path = os.path.join(test_path, "test.py")
88+
89+
# Check for required files
90+
if not os.path.exists(payload_archive):
91+
raise ValueError(f"Missing payload.zip in {payload_archive}")
92+
if not os.path.exists(module_path):
93+
raise ValueError(f"Missing test.py in {module_path}")
94+
95+
test_metadata = {}
96+
test_metadata['module_path'] = module_path
97+
98+
# Load and instantiate the python test module and fetch metadata
99+
try:
100+
tester = load_module(module_path)
101+
test_metadata['info'] = tester.get_metadata()
102+
103+
test_metadata['objectives'] = []
104+
105+
def load_objective(objective):
106+
objdict = {'name': objective.name,
107+
'requirement': objective.requirement,
108+
'children': [load_objective(child) for child in objective.children]
109+
}
110+
return objdict
111+
for objective in tester.get_objectives():
112+
test_metadata['objectives'].append(load_objective(objective))
113+
114+
except Exception as e:
115+
raise ValueError(f"Failed to load test module or fetch metadata from {module_path}: {e}")
116+
117+
conf = test_metadata['info'].get("Task Config", None)
118+
if conf:
119+
if conf.get("Request Options",None) is None:
120+
test_metadata['info']["Request Options"] = ""
121+
122+
if 'Name' not in test_metadata['info']:
123+
raise ValueError(f"Metadata in {module_path} missing 'Name' field")
124+
if 'Package' not in test_metadata['info']:
125+
raise ValueError(f"Metadata in {module_path} missing 'Package' field")
126+
127+
zip_password = test_metadata['info'].get("Zip Password", None)
128+
payload_output_dir = os.path.join(test_path, "payload")
129+
test_metadata['payload_path'] = self._extract_payload(payload_archive, payload_output_dir, zip_password)
130+
131+
# Return prepared metadata for DB ingest
132+
return test_metadata
133+
134+
def load_tests(self) -> List[Dict[str, Any]]:
135+
"""
136+
Walks the root directory and yields validated test configurations.
137+
"""
138+
available_tests = []
139+
unavailable_tests = []
140+
141+
if not os.path.exists(self.tests_root):
142+
log.error("Tests root %s does not exist.", self.tests_root)
143+
return {"error": f"Tests root {self.tests_root} does not exist."}
144+
145+
for entry in os.scandir(self.tests_root):
146+
if entry.is_dir():
147+
test_config = None
148+
try:
149+
test_config = self.validate_test_directory(entry.path)
150+
available_tests.append(test_config)
151+
log.info("Loaded test: %s",test_config['info']['Name'])
152+
except Exception as e:
153+
log.exception("Skipping directory %s due to exception",entry.path)
154+
unavailable_tests.append({"module_path":entry.path, "error":str(e)})
155+
156+
return {'available':available_tests, 'unavailable': unavailable_tests}
157+
158+
159+
class TestResultValidator():
160+
def __init__(self, test_module_path:str, task_storage_directory: str):
161+
if os.path.isdir(task_storage_directory):
162+
self.task_directory = task_storage_directory
163+
else:
164+
raise NotADirectoryError(f"Invalid task directory: {task_storage_directory}")
165+
166+
try:
167+
self.test_module = load_module(test_module_path)
168+
except Exception as e:
169+
raise ValueError(f"Failed to load test evaluation module {test_module_path}: {e}")
170+
171+
def evaluate(self):
172+
self.test_module.evaluate_results(self.task_directory)
173+
return self.test_module.get_results()
174+
175+
def task_status_to_run_status(cape_task_status):
176+
if cape_task_status == db_task.TASK_REPORTED:
177+
return TEST_COMPLETE
178+
if cape_task_status == db_task.TASK_PENDING:
179+
return TEST_QUEUED
180+
if cape_task_status in [db_task.TASK_RUNNING,
181+
db_task.TASK_DISTRIBUTED,
182+
db_task.TASK_RECOVERED,
183+
db_task.TASK_COMPLETED,
184+
db_task.TASK_DISTRIBUTED_COMPLETED]:
185+
return TEST_RUNNING
186+
if cape_task_status in [db_task.TASK_BANNED,
187+
db_task.TASK_FAILED_ANALYSIS,
188+
db_task.TASK_FAILED_PROCESSING,
189+
db_task.TASK_FAILED_REPORTING
190+
]:
191+
return TEST_FAILED
192+
193+
raise Exception(f"Unknown cape task status: {cape_task_status}")

0 commit comments

Comments
 (0)