Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 133 additions & 41 deletions modules/processing/sigma.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _parse_flags(flags_str):
"""Parse flag string like 'im' into re flags int and negate bool."""
flags = 0
negate = False
for ch in (flags_str or ""):
for ch in flags_str or "":
if ch == "n":
negate = True
elif ch in FLAG_MAP:
Expand Down Expand Up @@ -78,6 +78,7 @@ def _compile_match_group(group):

class _BoolExpr:
"""Simple AST nodes for boolean match_logic expressions."""

pass


Expand Down Expand Up @@ -273,17 +274,19 @@ def _load_filters(filters_path):
if packages:
packages = {p.lower() for p in packages}

compiled_filters.append({
"comment": filt.get("comment", ""),
"rules": set(rules),
"wildcard": "*" in rules,
"packages": packages,
"scope": filt.get("scope", "detection"),
"action": action,
"score": filt.get("score"),
"groups": compiled_groups,
"logic": logic,
})
compiled_filters.append(
{
"comment": filt.get("comment", ""),
"rules": set(rules),
"wildcard": "*" in rules,
"packages": packages,
"scope": filt.get("scope", "detection"),
"action": action,
"score": filt.get("score"),
"groups": compiled_groups,
"logic": logic,
}
)

log.debug("Loaded %d sigma filters from %s", len(compiled_filters), filters_path)
return compiled_filters
Expand Down Expand Up @@ -347,10 +350,7 @@ def apply_filters(detection, filters, package):
continue

if filt["scope"] == "event" and matched_events:
surviving_events = [
evt for evt in matched_events
if not _evaluate_filter_against_event(filt, evt)
]
surviving_events = [evt for evt in matched_events if not _evaluate_filter_against_event(filt, evt)]

if filt["action"] == "suppress":
if len(surviving_events) < len(matched_events):
Expand Down Expand Up @@ -473,8 +473,10 @@ def _run_zircolite(self, zircolite_path, rulesets, timeout, input_path, extra_ar
cmd = [
sys.executable,
zircolite_path,
"-e", input_path,
"-o", output_file,
"-e",
input_path,
"-o",
output_file,
"-q",
]
for rs in rulesets:
Expand Down Expand Up @@ -519,42 +521,58 @@ def _run_zircolite(self, zircolite_path, rulesets, timeout, input_path, extra_ar
shutil.rmtree(tmpdir, ignore_errors=True)

def _run_evtx(self, zircolite_path, rulesets, timeout):
"""Extract and scan EVTX files."""
"""Extract and scan EVTX files from all evtx zips (snapshots + final)."""
evtx_dir = os.path.join(self.analysis_path, "evtx")
evtx_zip = os.path.join(evtx_dir, "evtx.zip")

if not os.path.exists(evtx_zip):
log.debug("No evtx.zip found at %s", evtx_zip)
if not os.path.isdir(evtx_dir):
log.debug("No evtx directory found at %s", evtx_dir)
return None

# Collect all evtx zip files (evtx.zip + evtx_snapshot_*.zip)
evtx_zips = sorted(
os.path.join(evtx_dir, f)
for f in os.listdir(evtx_dir)
if f.endswith(".zip")
)
if not evtx_zips:
log.debug("No evtx zip files found in %s", evtx_dir)
return None

tmpdir = None
try:
tmpdir = tempfile.mkdtemp(prefix="cape_sigma_evtx_")
real_tmpdir = os.path.realpath(tmpdir)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`real_tmpdir` is assigned but never used after the refactor to extract multiple zip files. Removing it will avoid dead code (and potential lint failures).

Suggested change
real_tmpdir = os.path.realpath(tmpdir)

Copilot uses AI. Check for mistakes.
with zipfile.ZipFile(evtx_zip, "r") as zf:
# Check total uncompressed size to prevent zip bombs
max_extracted = 5 * 1024 * 1024 * 1024 # 5 GB
total_uncompressed = sum(m.file_size for m in zf.infolist())
if total_uncompressed > max_extracted:
log.warning("evtx.zip uncompressed size too large (%d bytes), skipping", total_uncompressed)
return None

for member in zf.infolist():
# Reject symlinks based on Unix external attributes
if (member.external_attr >> 16) & 0o170000 == 0o120000:
log.warning("Symlink in evtx.zip rejected: %s", member.filename)
return None
target = os.path.realpath(os.path.join(tmpdir, member.filename))
if not target.startswith(real_tmpdir + os.sep) and target != real_tmpdir:
log.warning("Zip slip attempt in evtx.zip: %s", member.filename)
return None
zf.extract(member, tmpdir)

# Extract all evtx zips (snapshots are incremental, each
# contains events since the last wipe)
max_extracted = 5 * 1024 * 1024 * 1024 # 5 GB
for zip_idx, evtx_zip in enumerate(evtx_zips):
subdir = os.path.join(tmpdir, str(zip_idx))
os.makedirs(subdir, exist_ok=True)
try:
with zipfile.ZipFile(evtx_zip, "r") as zf:
total_uncompressed = sum(m.file_size for m in zf.infolist())
if total_uncompressed > max_extracted:
log.warning("evtx zip too large (%d bytes), skipping: %s", total_uncompressed, evtx_zip)
continue

Comment on lines +546 to +558
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The zip-bomb protection is now applied per zip (`total_uncompressed > max_extracted`) but not across all snapshot zips. A crafted analysis could include many zips, each under 5 GB, and still force a huge total extraction and disk usage. Consider tracking cumulative extracted bytes across all zips (and/or capping the number of zips) and aborting once a global limit is exceeded, similar to the previous single-zip behavior.

Copilot uses AI. Check for mistakes.
for member in zf.infolist():
if (member.external_attr >> 16) & 0o170000 == 0o120000:
log.warning("Symlink in evtx zip rejected: %s", member.filename)
continue
target = os.path.realpath(os.path.join(subdir, member.filename))
if not target.startswith(os.path.realpath(subdir) + os.sep) and target != os.path.realpath(subdir):
log.warning("Zip slip attempt in evtx zip: %s", member.filename)
continue
zf.extract(member, subdir)
except Exception as e:
log.debug("Failed to extract %s: %s", evtx_zip, e)

# Defense-in-depth: check for symlinks after extraction
for root, dirs, files in os.walk(tmpdir):
for name in files + dirs:
if os.path.islink(os.path.join(root, name)):
log.warning("Symlink found in evtx.zip: %s", name)
log.warning("Symlink found in evtx: %s", name)
return None

evtx_files = []
Expand All @@ -566,6 +584,80 @@ def _run_evtx(self, zircolite_path, rulesets, timeout):
log.debug("No .evtx files found in archive")
return None

# Filter analyzer noise: convert evtx to JSONL via evtx_dump,
# strip events from the CAPE analyzer parent process, and
# feed clean JSONL to zircolite.
evtx_dump_bin = self.options.get("evtx_dump_bin", "/usr/local/bin/evtx_dump")
# Load analyzer noise filter from shared config
analyzer_exclude = set()
try:
filters_path = self.options.get("filters", "data/sigma/filters.json")
filters_local = self.options.get("filters_local", "data/sigma/filters_local.json")
for fp in [filters_path, filters_local]:
if fp and not os.path.isabs(fp):
fp = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), fp)
if fp and os.path.exists(fp):
with open(fp) as _f:
_data = json.load(_f)
_pf = _data.get("pre_filters", {})
for p in _pf.get("exclude_parent_processes", []):
analyzer_exclude.add(p.lower())
for p in _pf.get("exclude_image_processes", []):
analyzer_exclude.add(p.lower())
for p in _pf.get("exclude_target_paths", []):
analyzer_exclude.add(p.lower())
except Exception:
pass
Comment on lines +593 to +610
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `pre_filters` config load wraps everything in `except Exception: pass`, which silently ignores JSON parsing errors and path issues and makes misconfiguration hard to diagnose. Consider logging at least a debug/warning message with the exception (and which file failed), while still falling back to the defaults.

Copilot uses AI. Check for mistakes.
if not analyzer_exclude:
analyzer_exclude = {"icacls.exe", "python.exe", "wevtutil.exe", "conhost.exe"}
# Compile a single regex for efficient matching
exclude_re = re.compile("|".join(re.escape(p) for p in analyzer_exclude), re.IGNORECASE)

if os.path.isfile(evtx_dump_bin):
filtered_dir = os.path.join(tmpdir, "filtered")
os.makedirs(filtered_dir, exist_ok=True)
has_filtered = False
for evtx_file in evtx_files:
try:
# Use unique name per snapshot to avoid collisions
rel_path = os.path.relpath(evtx_file, tmpdir)
basename = rel_path.replace(os.sep, "_").rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
# Stream output line-by-line to avoid loading all into memory
proc = subprocess.Popen(
[evtx_dump_bin, "-o", "jsonl", evtx_file],
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True,
)
Comment on lines +620 to +630
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `evtx_dump` step uses a hardcoded per-file timeout (`proc.wait(timeout=120)` in the code shown here) and is not bounded by the `timeout` passed into `_run_evtx` (which is intended to cap processing time). With multiple snapshots/logs, total runtime can far exceed the configured sigma timeout. Consider deriving a per-file timeout from the remaining overall budget or making it configurable via `self.options`.

Copilot uses AI. Check for mistakes.
with open(jsonl_path, "w") as out:
for line in proc.stdout:
line = line.rstrip("\n")
if not line.strip():
continue
# Check specific JSON fields rather than substring on whole line
try:
evt = json.loads(line)
event_data = evt.get("Event", {}).get("EventData", {})
image = str(event_data.get("Image", ""))
parent = str(event_data.get("ParentImage", ""))
target = str(event_data.get("TargetFilename", ""))
if exclude_re.search(image) or exclude_re.search(parent) or exclude_re.search(target):
continue
except (json.JSONDecodeError, AttributeError):
pass
out.write(line + "\n")
proc.wait(timeout=120)
if os.path.getsize(jsonl_path) > 0:
has_filtered = True
except Exception as e:
log.debug("evtx_dump filter failed for %s: %s", evtx_file, e)

if has_filtered:
return self._run_zircolite(
zircolite_path, rulesets, timeout, filtered_dir,
extra_args=["--jsonl"]
)
log.debug("evtx_dump filtering produced no output, falling back to raw evtx")

return self._run_zircolite(zircolite_path, rulesets, timeout, tmpdir)
except Exception as e:
log.warning("EVTX sigma scan failed: %s", e)
Expand Down
Loading