From 8d16661cbee82b6d8d617f65bc96c830d9719d0f Mon Sep 17 00:00:00 2001 From: devs6186 Date: Sun, 15 Mar 2026 19:57:24 +0530 Subject: [PATCH 1/4] rules: filter rules with incompatible global features before analysis Add RuleSet.filter_rules_by_meta_features() which walks each rule's statement tree and removes rules whose OS, architecture, or format requirements are provably unsatisfiable given the binary's global features. The check is conservative: rules with no global-feature constraints or os: any are always kept; only rules that explicitly require a different platform are dropped. Transitive dependencies of surviving rules are preserved to maintain RuleSet invariants. Call the new method once at the start of find_static_capabilities() and find_dynamic_capabilities(), before the per-function/per-process loops, so that incompatible rules are excluded from every subsequent scope evaluation. Adds two tests: one verifying OS-specific rules are pruned/kept correctly, and one verifying cross-platform rules are never pruned. Closes: https://github.com/mandiant/capa/issues/2127 --- capa/capabilities/dynamic.py | 10 ++++ capa/capabilities/static.py | 10 ++++ capa/rules/__init__.py | 88 ++++++++++++++++++++++++++++++++++++ tests/test_match.py | 74 +++++++++++++++++++++++++++++- 4 files changed, 181 insertions(+), 1 deletion(-) diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index a84f5d3f78..1e3b84396b 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -279,6 +279,16 @@ def find_dynamic_capabilities( feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) + # Prune rules that cannot match this binary's global features (OS, arch, format) + # once, before the per-process matching loop. This eliminates rules that could + # never match — e.g. Windows-specific rules when analysing a Linux trace — from all + # per-process, per-thread, and per-call evaluations. + # See: https://github.com/mandiant/capa/issues/2127 + global_features: FeatureSet = collections.defaultdict(set) + for feature, addr in extractor.extract_global_features(): + global_features[feature].add(addr) + ruleset = ruleset.filter_rules_by_meta_features(global_features) + assert isinstance(extractor, DynamicFeatureExtractor) processes: list[ProcessHandle] = list(extractor.get_processes()) n_processes: int = len(processes) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index d485aa48c7..954cd4733e 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -159,6 +159,16 @@ def find_static_capabilities( feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) library_functions: tuple[rdoc.LibraryFunction, ...] = () + # Prune rules that cannot match this binary's global features (OS, arch, format) + # once, before the per-function matching loop. This eliminates rules that could + # never match — e.g. Windows-specific rules when analysing a Linux ELF — from all + # per-function, per-basic-block, and per-instruction evaluations. + # See: https://github.com/mandiant/capa/issues/2127 + global_features: FeatureSet = collections.defaultdict(set) + for feature, addr in extractor.extract_global_features(): + global_features[feature].add(addr) + ruleset = ruleset.filter_rules_by_meta_features(global_features) + assert isinstance(extractor, StaticFeatureExtractor) functions: list[FunctionHandle] = list(extractor.get_functions()) n_funcs: int = len(functions) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index da0a7d0360..156ef6ec58 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1909,6 +1909,94 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet": break return RuleSet(list(rules_filtered)) + def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet": + """ + Return a new RuleSet with rules removed whose global-feature requirements + cannot be satisfied by the binary under analysis. + + Global features — OS, architecture, and format — are determined once at the + start of analysis from the binary's headers. Any rule that requires, for + example, ``os: windows`` while we are analyzing a Linux ELF can never match + and is safe to discard before the per-function matching loop begins. + + The filtering is conservative: a rule is only removed when its global-feature + constraints are *provably* unsatisfiable. Rules with no global-feature + constraints, or with ``os: any``-style wildcards, are always kept. + + Rules that are kept as transitive dependencies of other kept rules are also + retained, so the returned RuleSet always satisfies internal dependency + invariants. + + Args: + features: the global FeatureSet for the binary (typically the output of + ``extractor.extract_global_features()``). + + Returns: + A new :class:`RuleSet` with incompatible rules removed, or *self* if + no rules were pruned. + """ + global_features: FeatureSet = { + feature: locations + for feature, locations in features.items() + if capa.features.common.is_global_feature(feature) + } + + if not global_features: + return self + + def can_match(node) -> bool: + """ + Return True if *node* might be satisfiable given the known global features. + Returns False only when provably unsatisfiable. + """ + if isinstance(node, capa.features.common.Feature): + if capa.features.common.is_global_feature(node): + return bool(node.evaluate(global_features)) + return True + + if isinstance(node, ceng.Not): + return True + + if isinstance(node, ceng.And): + return all(can_match(child) for child in node.children) + + if isinstance(node, (ceng.Or, ceng.Some)): + if isinstance(node, ceng.Some) and node.count == 0: + return True + return any(can_match(child) for child in node.children) + + if isinstance(node, ceng.Range): + if node.min == 0: + return True + return can_match(node.child) + + return True + + compatible_rule_names = {rule.name for rule in self.rules.values() if can_match(rule.statement)} + + if len(compatible_rule_names) == len(self.rules): + return self + + # Collect the surviving rules plus all of their transitive dependencies + # to ensure RuleSet dependency invariants are maintained. + all_rules = list(self.rules.values()) + rules_to_keep: set[str] = set() + for rule_name in compatible_rule_names: + rules_to_keep.update(r.name for r in get_rules_and_dependencies(all_rules, rule_name)) + + pruned_count = len(self.rules) - len(rules_to_keep) + if pruned_count == 0: + return self + + logger.debug( + "pruned %d rules incompatible with global features (%s)", + pruned_count, + ", ".join(f"{f.name}: {f.value}" for f in global_features), + ) + + surviving_rules = [self.rules[name] for name in rules_to_keep] + return RuleSet(surviving_rules) + # this routine is unstable and may change before the next major release. @staticmethod def _sort_rules_by_index(rule_index_by_rule_name: dict[str, int], rules: list[Rule]): diff --git a/tests/test_match.py b/tests/test_match.py index 9e763bbc82..642955f98e 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -21,7 +21,7 @@ import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule +from capa.features.common import OS, OS_ANY, OS_LINUX, OS_WINDOWS, String, MatchedRule def match(rules, features, va, scope=Scope.FUNCTION): @@ -887,3 +887,75 @@ def test_index_features_nested_unstable(): assert not index.string_rules assert not index.bytes_rules + + +def test_filter_rules_by_meta_features_prunes_incompatible_os(): + """Rules requiring a different OS than the binary's are removed from the RuleSet.""" + windows_rule = textwrap.dedent( + """ + rule: + meta: + name: windows only rule + scopes: + static: function + dynamic: process + features: + - and: + - os: windows + - api: CreateFile + """ + ) + linux_rule = textwrap.dedent( + """ + rule: + meta: + name: linux only rule + scopes: + static: function + dynamic: process + features: + - and: + - os: linux + - api: open + """ + ) + rr = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml(windows_rule), + capa.rules.Rule.from_yaml(linux_rule), + ] + ) + assert len(rr.rules) == 2 + + # When analyzing a Linux binary, windows-only rules are pruned + linux_features = {OS(OS_LINUX): {0x0}} + filtered = rr.filter_rules_by_meta_features(linux_features) + assert "linux only rule" in filtered.rules + assert "windows only rule" not in filtered.rules + + # When analyzing a Windows binary, linux-only rules are pruned + windows_features = {OS(OS_WINDOWS): {0x0}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "windows only rule" in filtered.rules + assert "linux only rule" not in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_any_os(): + """Rules with os: any or no OS requirement are kept regardless of binary OS.""" + any_os_rule = textwrap.dedent( + """ + rule: + meta: + name: cross-platform rule + scopes: + static: function + dynamic: process + features: + - api: malloc + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(any_os_rule)]) + + windows_features = {OS(OS_WINDOWS): {0x0}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "cross-platform rule" in filtered.rules From 927fd07e4c019c4a18e5954192e82f56ad8f351b Mon Sep 17 00:00:00 2001 From: devs6186 Date: Sun, 15 Mar 2026 20:01:00 +0530 Subject: [PATCH 2/4] rules: filter rules with incompatible global features before analysis Closes #2127 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c35033d780..0cef84b909 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ ### Development +- rules: prune rules with incompatible OS/arch/format requirements before analysis to skip them across all scopes #2127 - ci: deprecate macos-13 runner and use Python v3.13 for testing @mike-hunhoff #2777 ### Raw diffs From 9d039059b960c90f313187baade0ff8f71f6df45 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Mon, 16 Mar 2026 07:38:23 +0530 Subject: [PATCH 3/4] rules: tighten meta-feature pruning logic and tests --- capa/rules/__init__.py | 79 ++++++++++++++--- tests/test_match.py | 191 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 250 insertions(+), 20 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 156ef6ec58..1fdc6335ff 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1440,6 +1440,12 @@ def __init__( self.rules = {rule.name: rule for rule in rules} self.rules_by_namespace = index_rules_by_namespace(rules) self.rules_by_scope = {scope: self._get_rules_for_scope(rules, scope) for scope in scopes} + self._dependencies_by_rule_name = { + rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in rules + } + self._rules_with_global_features = { + rule.name for rule in rules if self._statement_uses_global_features(rule.statement) + } # these structures are unstable and may change before the next major release. scores_by_rule: dict[str, int] = {} @@ -1909,6 +1915,19 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet": break return RuleSet(list(rules_filtered)) + @staticmethod + def _statement_uses_global_features(node: Union[Feature, Statement]) -> bool: + if isinstance(node, capa.features.common.Feature): + return capa.features.common.is_global_feature(node) + + return any(RuleSet._statement_uses_global_features(child) for child in node.get_children()) + + def _clone_with_rule_subset(self, rule_names: set[str]) -> "RuleSet": + clone = copy.copy(self) + clone.rules = {name: self.rules[name] for name in self.rules if name in rule_names} + clone._rules_with_global_features = self._rules_with_global_features & rule_names + return clone + def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet": """ Return a new RuleSet with rules removed whose global-feature requirements @@ -1944,6 +1963,16 @@ def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet": if not global_features: return self + rules_with_global_features = getattr(self, "_rules_with_global_features", None) + if rules_with_global_features is None: + rules_with_global_features = { + rule.name for rule in self.rules.values() if self._statement_uses_global_features(rule.statement) + } + self._rules_with_global_features = rules_with_global_features + + if not rules_with_global_features: + return self + def can_match(node) -> bool: """ Return True if *node* might be satisfiable given the known global features. @@ -1960,11 +1989,14 @@ def can_match(node) -> bool: if isinstance(node, ceng.And): return all(can_match(child) for child in node.children) - if isinstance(node, (ceng.Or, ceng.Some)): - if isinstance(node, ceng.Some) and node.count == 0: - return True + if isinstance(node, ceng.Or): return any(can_match(child) for child in node.children) + if isinstance(node, ceng.Some): + if node.count == 0: + return True + return sum(1 for child in node.children if can_match(child)) >= node.count + if isinstance(node, ceng.Range): if node.min == 0: return True @@ -1972,17 +2004,34 @@ def can_match(node) -> bool: return True - compatible_rule_names = {rule.name for rule in self.rules.values() if can_match(rule.statement)} + compatible_rule_names = set(self.rules) - rules_with_global_features + compatible_rule_names.update( + rule_name for rule_name in rules_with_global_features if can_match(self.rules[rule_name].statement) + ) if len(compatible_rule_names) == len(self.rules): return self # Collect the surviving rules plus all of their transitive dependencies # to ensure RuleSet dependency invariants are maintained. - all_rules = list(self.rules.values()) - rules_to_keep: set[str] = set() - for rule_name in compatible_rule_names: - rules_to_keep.update(r.name for r in get_rules_and_dependencies(all_rules, rule_name)) + dependencies_by_rule_name = getattr(self, "_dependencies_by_rule_name", None) + if dependencies_by_rule_name is None: + dependencies_by_rule_name = { + rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in self.rules.values() + } + self._dependencies_by_rule_name = dependencies_by_rule_name + + rules_to_keep: set[str] = set(compatible_rule_names) + stack: list[str] = list(compatible_rule_names) + while stack: + rule_name = stack.pop() + for dependency_name in dependencies_by_rule_name.get(rule_name, ()): + if dependency_name not in self.rules: + continue + if dependency_name in rules_to_keep: + continue + rules_to_keep.add(dependency_name) + stack.append(dependency_name) pruned_count = len(self.rules) - len(rules_to_keep) if pruned_count == 0: @@ -1991,11 +2040,10 @@ def can_match(node) -> bool: logger.debug( "pruned %d rules incompatible with global features (%s)", pruned_count, - ", ".join(f"{f.name}: {f.value}" for f in global_features), + ", ".join(f"{f.name}: {f.value!r}" for f in global_features), ) - surviving_rules = [self.rules[name] for name in rules_to_keep] - return RuleSet(surviving_rules) + return self._clone_with_rule_subset(rules_to_keep) # this routine is unstable and may change before the next major release. @staticmethod @@ -2109,6 +2157,8 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea if wanted_bytes.evaluate(bytes_features): candidate_rule_names.add(rule_name) + candidate_rule_names.intersection_update(self.rules) + # No rules can possibly match, so quickly return. if not candidate_rule_names: return (features, {}) @@ -2168,6 +2218,11 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea new_candidates.extend(feature_index.rules_by_feature.get(new_feature, ())) if new_candidates: + new_candidates = [ + rule_name + for rule_name in new_candidates + if rule_name in self.rules and rule_name not in candidate_rule_names + ] candidate_rule_names.update(new_candidates) candidate_rules.extend([self.rules[rule_name] for rule_name in new_candidates]) RuleSet._sort_rules_by_index(rule_index_by_rule_name, candidate_rules) @@ -2198,7 +2253,7 @@ def match( features, matches = self._match(scope, features, addr) if paranoid: - rules: list[Rule] = self.rules_by_scope[scope] + rules: list[Rule] = [rule for rule in self.rules_by_scope[scope] if rule.name in self.rules] paranoid_features, paranoid_matches = capa.engine.match(rules, features, addr) if features != paranoid_features: diff --git a/tests/test_match.py b/tests/test_match.py index 642955f98e..f454d2c59c 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -21,7 +21,19 @@ import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.common import OS, OS_ANY, OS_LINUX, OS_WINDOWS, String, MatchedRule +from capa.features.common import ( + OS, + OS_ANY, + OS_LINUX, + FORMAT_PE, + ARCH_AMD64, + OS_WINDOWS, + Arch, + Format, + String, + MatchedRule, +) +from capa.features.address import NO_ADDRESS def match(rules, features, va, scope=Scope.FUNCTION): @@ -928,13 +940,13 @@ def test_filter_rules_by_meta_features_prunes_incompatible_os(): assert len(rr.rules) == 2 # When analyzing a Linux binary, windows-only rules are pruned - linux_features = {OS(OS_LINUX): {0x0}} + linux_features: capa.engine.FeatureSet = {OS(OS_LINUX): {NO_ADDRESS}} filtered = rr.filter_rules_by_meta_features(linux_features) assert "linux only rule" in filtered.rules assert "windows only rule" not in filtered.rules # When analyzing a Windows binary, linux-only rules are pruned - windows_features = {OS(OS_WINDOWS): {0x0}} + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} filtered = rr.filter_rules_by_meta_features(windows_features) assert "windows only rule" in filtered.rules assert "linux only rule" not in filtered.rules @@ -946,16 +958,179 @@ def test_filter_rules_by_meta_features_keeps_any_os(): """ rule: meta: - name: cross-platform rule + name: any os rule scopes: static: function dynamic: process features: - - api: malloc + - and: + - os: any + - api: malloc + """ + ) + no_os_rule = textwrap.dedent( + """ + rule: + meta: + name: no os rule + scopes: + static: function + dynamic: process + features: + - api: calloc + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(any_os_rule), capa.rules.Rule.from_yaml(no_os_rule)]) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "any os rule" in filtered.rules + assert "no os rule" in filtered.rules + + linux_features: capa.engine.FeatureSet = {OS(OS_LINUX): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(linux_features) + assert "any os rule" in filtered.rules + assert "no os rule" in filtered.rules + + +def test_filter_rules_by_meta_features_prunes_unreachable_some_count(): + """Rules with an unsatisfiable Some-count over global features are pruned.""" + unreachable_some_rule = textwrap.dedent( + """ + rule: + meta: + name: unreachable some rule + scopes: + static: function + dynamic: process + features: + - 3 or more: + - os: windows + - os: linux + - api: CreateFile + """ + ) + baseline_rule = textwrap.dedent( + """ + rule: + meta: + name: baseline rule + scopes: + static: function + dynamic: process + features: + - api: ReadFile + """ + ) + rr = capa.rules.RuleSet( + [capa.rules.Rule.from_yaml(unreachable_some_rule), capa.rules.Rule.from_yaml(baseline_rule)] + ) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "unreachable some rule" not in filtered.rules + assert "baseline rule" in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_reachable_some_count(): + """Rules with a satisfiable Some-count over global features are kept.""" + reachable_some_rule = textwrap.dedent( + """ + rule: + meta: + name: reachable some rule + scopes: + static: function + dynamic: process + features: + - 2 or more: + - os: windows + - os: linux + - api: CreateFile + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(reachable_some_rule)]) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "reachable some rule" in filtered.rules + + +def test_filter_rules_by_meta_features_prunes_incompatible_arch_and_format(): + """Rules with incompatible arch/format are pruned while compatible rules are kept.""" + incompatible_rule = textwrap.dedent( + """ + rule: + meta: + name: incompatible arch format rule + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - format: elf + - api: open + """ + ) + compatible_rule = textwrap.dedent( + """ + rule: + meta: + name: compatible arch format rule + scopes: + static: function + dynamic: process + features: + - and: + - arch: amd64 + - format: pe + - api: CreateFile + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(incompatible_rule), capa.rules.Rule.from_yaml(compatible_rule)]) + + features: capa.engine.FeatureSet = {Arch(ARCH_AMD64): {NO_ADDRESS}, Format(FORMAT_PE): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(features) + assert "incompatible arch format rule" not in filtered.rules + assert "compatible arch format rule" in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_dependencies_of_surviving_rules(): + """Dependencies of compatible rules are retained even if dependency globals are incompatible.""" + dependency_rule = textwrap.dedent( + """ + rule: + meta: + name: linux dependency rule + scopes: + static: function + dynamic: process + features: + - and: + - os: linux + - api: open + """ + ) + parent_rule = textwrap.dedent( + """ + rule: + meta: + name: windows parent rule + scopes: + static: function + dynamic: process + features: + - and: + - os: windows + - or: + - api: CreateFile + - match: linux dependency rule """ ) - rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(any_os_rule)]) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(dependency_rule), capa.rules.Rule.from_yaml(parent_rule)]) - windows_features = {OS(OS_WINDOWS): {0x0}} + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} filtered = rr.filter_rules_by_meta_features(windows_features) - assert "cross-platform rule" in filtered.rules + assert "windows parent rule" in filtered.rules + assert "linux dependency rule" in filtered.rules From f2fdc76cffcae437d9bacc9da5b5167d64b8d397 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Mon, 16 Mar 2026 13:09:59 +0530 Subject: [PATCH 4/4] rules: filter all derived indexes in _clone_with_rule_subset, remove _match guards MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously _clone_with_rule_subset did a shallow copy and relied on a candidate_rule_names.intersection_update(self.rules) guard in the hot _match path to exclude pruned rules. That guard ran on every scope evaluation (every function, basic-block, instruction call), adding per-call overhead that the benchmarks showed dominated any savings. This commit moves the filtering work to _clone_with_rule_subset where it belongs — paid once at analysis start, not millions of times. _clone_with_rule_subset now filters: - rules_by_scope (per-scope Rule lists) - _rule_index_by_scope (topological sort keys; gaps are fine) - rules_by_namespace (Rule lists per namespace; empty entries dropped) - _feature_indexes_by_scopes (rules_by_feature sets, string_rules, bytes_rules — all trimmed to survivors) - _dependencies_by_rule_name (dep sets for pruned rules dropped) With consistent indexes, _match no longer needs: - intersection_update(self.rules) after candidate collection - "rule_name in self.rules" guard during dependency expansion - list-comprehension filter in paranoid mode Cost: O(total feature-index entries across all scopes) — comparable to _index_rules_by_feature at init, paid once per binary analysis. --- capa/rules/__init__.py | 56 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index c474b8a428..0afbeff821 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1934,9 +1934,57 @@ def _statement_uses_global_features(node: Union[Feature, Statement]) -> bool: return any(RuleSet._statement_uses_global_features(child) for child in node.get_children()) def _clone_with_rule_subset(self, rule_names: set[str]) -> "RuleSet": + """ + Return a shallow-then-filtered clone of this RuleSet restricted to ``rule_names``. + + All derived data structures are filtered so the hot ``_match`` path needs no runtime + guards to skip pruned rules. The cost — O(total index entries across all scopes) — is + paid once at analysis start rather than on every per-function / per-instruction call. + """ clone = copy.copy(self) + clone.rules = {name: self.rules[name] for name in self.rules if name in rule_names} clone._rules_with_global_features = self._rules_with_global_features & rule_names + clone._dependencies_by_rule_name = { + name: deps for name, deps in self._dependencies_by_rule_name.items() if name in rule_names + } + + # Filter per-scope rule lists (preserves topological order; used by paranoid mode). + clone.rules_by_scope = { + scope: [rule for rule in scope_rules if rule.name in rule_names] + for scope, scope_rules in self.rules_by_scope.items() + } + + # Filter topological index: gaps in index values are fine because the values are only + # used as sort keys, and the relative order of surviving rules is already correct. + clone._rule_index_by_scope = { + scope: {name: idx for name, idx in rule_index.items() if name in rule_names} + for scope, rule_index in self._rule_index_by_scope.items() + } + + # Filter namespace index (values are lists of Rule objects, not strings). + clone.rules_by_namespace = { + namespace: [rule for rule in ns_rules if rule.name in rule_names] + for namespace, ns_rules in self.rules_by_namespace.items() + } + # Drop namespaces that became empty after pruning. + clone.rules_by_namespace = {ns: rules for ns, rules in clone.rules_by_namespace.items() if rules} + + # Filter feature indexes: remove pruned rule names from every set in rules_by_feature, + # and drop string/bytes scan entries for pruned rules. + clone._feature_indexes_by_scopes = {} + for scope, feature_index in self._feature_indexes_by_scopes.items(): + new_rules_by_feature: dict[Feature, set[str]] = {} + for feature, rule_set in feature_index.rules_by_feature.items(): + filtered_set = rule_set & rule_names + if filtered_set: + new_rules_by_feature[feature] = filtered_set + clone._feature_indexes_by_scopes[scope] = RuleSet._RuleFeatureIndex( + new_rules_by_feature, + {name: feats for name, feats in feature_index.string_rules.items() if name in rule_names}, + {name: feats for name, feats in feature_index.bytes_rules.items() if name in rule_names}, + ) + return clone def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet": @@ -2168,8 +2216,6 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea if wanted_bytes.evaluate(bytes_features): candidate_rule_names.add(rule_name) - candidate_rule_names.intersection_update(self.rules) - # No rules can possibly match, so quickly return. if not candidate_rule_names: return (features, {}) @@ -2232,9 +2278,7 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea if new_candidates: new_candidates = [ - rule_name - for rule_name in new_candidates - if rule_name in self.rules and rule_name not in candidate_rule_names + rule_name for rule_name in new_candidates if rule_name not in candidate_rule_names ] candidate_rule_names.update(new_candidates) candidate_rules.extend([self.rules[rule_name] for rule_name in new_candidates]) @@ -2267,7 +2311,7 @@ def match( features, matches = self._match(scope, features, addr) if paranoid: - rules: list[Rule] = [rule for rule in self.rules_by_scope[scope] if rule.name in self.rules] + rules: list[Rule] = self.rules_by_scope[scope] paranoid_features, paranoid_matches = capa.engine.match(rules, features, addr) if features != paranoid_features: