-
Notifications
You must be signed in to change notification settings - Fork 59
Sigma: support EVTX periodic snapshots and analyzer noise filtering #553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,7 +40,7 @@ def _parse_flags(flags_str): | |
| """Parse flag string like 'im' into re flags int and negate bool.""" | ||
| flags = 0 | ||
| negate = False | ||
| for ch in (flags_str or ""): | ||
| for ch in flags_str or "": | ||
| if ch == "n": | ||
| negate = True | ||
| elif ch in FLAG_MAP: | ||
|
|
@@ -78,6 +78,7 @@ def _compile_match_group(group): | |
|
|
||
| class _BoolExpr: | ||
| """Simple AST nodes for boolean match_logic expressions.""" | ||
|
|
||
| pass | ||
|
|
||
|
|
||
|
|
@@ -273,17 +274,19 @@ def _load_filters(filters_path): | |
| if packages: | ||
| packages = {p.lower() for p in packages} | ||
|
|
||
| compiled_filters.append({ | ||
| "comment": filt.get("comment", ""), | ||
| "rules": set(rules), | ||
| "wildcard": "*" in rules, | ||
| "packages": packages, | ||
| "scope": filt.get("scope", "detection"), | ||
| "action": action, | ||
| "score": filt.get("score"), | ||
| "groups": compiled_groups, | ||
| "logic": logic, | ||
| }) | ||
| compiled_filters.append( | ||
| { | ||
| "comment": filt.get("comment", ""), | ||
| "rules": set(rules), | ||
| "wildcard": "*" in rules, | ||
| "packages": packages, | ||
| "scope": filt.get("scope", "detection"), | ||
| "action": action, | ||
| "score": filt.get("score"), | ||
| "groups": compiled_groups, | ||
| "logic": logic, | ||
| } | ||
| ) | ||
|
|
||
| log.debug("Loaded %d sigma filters from %s", len(compiled_filters), filters_path) | ||
| return compiled_filters | ||
|
|
@@ -347,10 +350,7 @@ def apply_filters(detection, filters, package): | |
| continue | ||
|
|
||
| if filt["scope"] == "event" and matched_events: | ||
| surviving_events = [ | ||
| evt for evt in matched_events | ||
| if not _evaluate_filter_against_event(filt, evt) | ||
| ] | ||
| surviving_events = [evt for evt in matched_events if not _evaluate_filter_against_event(filt, evt)] | ||
|
|
||
| if filt["action"] == "suppress": | ||
| if len(surviving_events) < len(matched_events): | ||
|
|
@@ -473,8 +473,10 @@ def _run_zircolite(self, zircolite_path, rulesets, timeout, input_path, extra_ar | |
| cmd = [ | ||
| sys.executable, | ||
| zircolite_path, | ||
| "-e", input_path, | ||
| "-o", output_file, | ||
| "-e", | ||
| input_path, | ||
| "-o", | ||
| output_file, | ||
| "-q", | ||
| ] | ||
| for rs in rulesets: | ||
|
|
@@ -519,42 +521,58 @@ def _run_zircolite(self, zircolite_path, rulesets, timeout, input_path, extra_ar | |
| shutil.rmtree(tmpdir, ignore_errors=True) | ||
|
|
||
| def _run_evtx(self, zircolite_path, rulesets, timeout): | ||
| """Extract and scan EVTX files.""" | ||
| """Extract and scan EVTX files from all evtx zips (snapshots + final).""" | ||
| evtx_dir = os.path.join(self.analysis_path, "evtx") | ||
| evtx_zip = os.path.join(evtx_dir, "evtx.zip") | ||
|
|
||
| if not os.path.exists(evtx_zip): | ||
| log.debug("No evtx.zip found at %s", evtx_zip) | ||
| if not os.path.isdir(evtx_dir): | ||
| log.debug("No evtx directory found at %s", evtx_dir) | ||
| return None | ||
|
|
||
| # Collect all evtx zip files (evtx.zip + evtx_snapshot_*.zip) | ||
| evtx_zips = sorted( | ||
| os.path.join(evtx_dir, f) | ||
| for f in os.listdir(evtx_dir) | ||
| if f.endswith(".zip") | ||
| ) | ||
| if not evtx_zips: | ||
| log.debug("No evtx zip files found in %s", evtx_dir) | ||
| return None | ||
|
|
||
| tmpdir = None | ||
| try: | ||
| tmpdir = tempfile.mkdtemp(prefix="cape_sigma_evtx_") | ||
| real_tmpdir = os.path.realpath(tmpdir) | ||
| with zipfile.ZipFile(evtx_zip, "r") as zf: | ||
| # Check total uncompressed size to prevent zip bombs | ||
| max_extracted = 5 * 1024 * 1024 * 1024 # 5 GB | ||
| total_uncompressed = sum(m.file_size for m in zf.infolist()) | ||
| if total_uncompressed > max_extracted: | ||
| log.warning("evtx.zip uncompressed size too large (%d bytes), skipping", total_uncompressed) | ||
| return None | ||
|
|
||
| for member in zf.infolist(): | ||
| # Reject symlinks based on Unix external attributes | ||
| if (member.external_attr >> 16) & 0o170000 == 0o120000: | ||
| log.warning("Symlink in evtx.zip rejected: %s", member.filename) | ||
| return None | ||
| target = os.path.realpath(os.path.join(tmpdir, member.filename)) | ||
| if not target.startswith(real_tmpdir + os.sep) and target != real_tmpdir: | ||
| log.warning("Zip slip attempt in evtx.zip: %s", member.filename) | ||
| return None | ||
| zf.extract(member, tmpdir) | ||
|
|
||
| # Extract all evtx zips (snapshots are incremental, each | ||
| # contains events since the last wipe) | ||
| max_extracted = 5 * 1024 * 1024 * 1024 # 5 GB | ||
| for zip_idx, evtx_zip in enumerate(evtx_zips): | ||
| subdir = os.path.join(tmpdir, str(zip_idx)) | ||
| os.makedirs(subdir, exist_ok=True) | ||
| try: | ||
| with zipfile.ZipFile(evtx_zip, "r") as zf: | ||
| total_uncompressed = sum(m.file_size for m in zf.infolist()) | ||
| if total_uncompressed > max_extracted: | ||
| log.warning("evtx zip too large (%d bytes), skipping: %s", total_uncompressed, evtx_zip) | ||
| continue | ||
|
|
||
|
Comment on lines
+546
to
+558
|
||
| for member in zf.infolist(): | ||
| if (member.external_attr >> 16) & 0o170000 == 0o120000: | ||
| log.warning("Symlink in evtx zip rejected: %s", member.filename) | ||
| continue | ||
| target = os.path.realpath(os.path.join(subdir, member.filename)) | ||
| if not target.startswith(os.path.realpath(subdir) + os.sep) and target != os.path.realpath(subdir): | ||
| log.warning("Zip slip attempt in evtx zip: %s", member.filename) | ||
| continue | ||
| zf.extract(member, subdir) | ||
| except Exception as e: | ||
| log.debug("Failed to extract %s: %s", evtx_zip, e) | ||
|
|
||
| # Defense-in-depth: check for symlinks after extraction | ||
| for root, dirs, files in os.walk(tmpdir): | ||
| for name in files + dirs: | ||
| if os.path.islink(os.path.join(root, name)): | ||
| log.warning("Symlink found in evtx.zip: %s", name) | ||
| log.warning("Symlink found in evtx: %s", name) | ||
| return None | ||
|
|
||
| evtx_files = [] | ||
|
|
@@ -566,6 +584,80 @@ def _run_evtx(self, zircolite_path, rulesets, timeout): | |
| log.debug("No .evtx files found in archive") | ||
| return None | ||
|
|
||
| # Filter analyzer noise: convert evtx to JSONL via evtx_dump, | ||
| # strip events from the CAPE analyzer parent process, and | ||
| # feed clean JSONL to zircolite. | ||
| evtx_dump_bin = self.options.get("evtx_dump_bin", "/usr/local/bin/evtx_dump") | ||
| # Load analyzer noise filter from shared config | ||
| analyzer_exclude = set() | ||
| try: | ||
| filters_path = self.options.get("filters", "data/sigma/filters.json") | ||
| filters_local = self.options.get("filters_local", "data/sigma/filters_local.json") | ||
| for fp in [filters_path, filters_local]: | ||
| if fp and not os.path.isabs(fp): | ||
| fp = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), fp) | ||
| if fp and os.path.exists(fp): | ||
| with open(fp) as _f: | ||
| _data = json.load(_f) | ||
| _pf = _data.get("pre_filters", {}) | ||
| for p in _pf.get("exclude_parent_processes", []): | ||
| analyzer_exclude.add(p.lower()) | ||
| for p in _pf.get("exclude_image_processes", []): | ||
| analyzer_exclude.add(p.lower()) | ||
| for p in _pf.get("exclude_target_paths", []): | ||
| analyzer_exclude.add(p.lower()) | ||
| except Exception: | ||
| pass | ||
|
Comment on lines
+593
to
+610
|
||
| if not analyzer_exclude: | ||
| analyzer_exclude = {"icacls.exe", "python.exe", "wevtutil.exe", "conhost.exe"} | ||
| # Compile a single regex for efficient matching | ||
| exclude_re = re.compile("|".join(re.escape(p) for p in analyzer_exclude), re.IGNORECASE) | ||
|
|
||
| if os.path.isfile(evtx_dump_bin): | ||
| filtered_dir = os.path.join(tmpdir, "filtered") | ||
| os.makedirs(filtered_dir, exist_ok=True) | ||
| has_filtered = False | ||
| for evtx_file in evtx_files: | ||
| try: | ||
| # Use unique name per snapshot to avoid collisions | ||
| rel_path = os.path.relpath(evtx_file, tmpdir) | ||
| basename = rel_path.replace(os.sep, "_").rsplit(".", 1)[0] + ".json" | ||
| jsonl_path = os.path.join(filtered_dir, basename) | ||
| # Stream output line-by-line to avoid loading all into memory | ||
| proc = subprocess.Popen( | ||
| [evtx_dump_bin, "-o", "jsonl", evtx_file], | ||
| stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, | ||
| ) | ||
|
Comment on lines
+620
to
+630
|
||
| with open(jsonl_path, "w") as out: | ||
| for line in proc.stdout: | ||
| line = line.rstrip("\n") | ||
| if not line.strip(): | ||
| continue | ||
| # Check specific JSON fields rather than substring on whole line | ||
| try: | ||
| evt = json.loads(line) | ||
| event_data = evt.get("Event", {}).get("EventData", {}) | ||
| image = str(event_data.get("Image", "")) | ||
| parent = str(event_data.get("ParentImage", "")) | ||
| target = str(event_data.get("TargetFilename", "")) | ||
| if exclude_re.search(image) or exclude_re.search(parent) or exclude_re.search(target): | ||
| continue | ||
| except (json.JSONDecodeError, AttributeError): | ||
| pass | ||
| out.write(line + "\n") | ||
| proc.wait(timeout=120) | ||
| if os.path.getsize(jsonl_path) > 0: | ||
| has_filtered = True | ||
| except Exception as e: | ||
| log.debug("evtx_dump filter failed for %s: %s", evtx_file, e) | ||
|
|
||
| if has_filtered: | ||
| return self._run_zircolite( | ||
| zircolite_path, rulesets, timeout, filtered_dir, | ||
| extra_args=["--jsonl"] | ||
| ) | ||
| log.debug("evtx_dump filtering produced no output, falling back to raw evtx") | ||
|
|
||
| return self._run_zircolite(zircolite_path, rulesets, timeout, tmpdir) | ||
| except Exception as e: | ||
| log.warning("EVTX sigma scan failed: %s", e) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
real_tmpdiris assigned but never used after the refactor to extract multiple zip files. Removing it will avoid dead code (and potential lint failures).