diff --git a/CHANGELOG.md b/CHANGELOG.md index db5fe728ea..38d766ba58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ ### Development +- rules: prune rules with incompatible OS/arch/format requirements before analysis to skip them across all scopes #2127 - doc: document that default output shows top-level matches only; -v/-vv show nested matches @devs6186 #1410 - doc: fix typo in usage.md, add documentation links to README @devs6186 #2274 - doc: add table comparing ways to consume capa output (CLI, IDA, Ghidra, dynamic sandbox, web) @devs6186 #2273 diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index 3c98a16caa..1ae6727357 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -281,6 +281,16 @@ def find_dynamic_capabilities( # Tuples are immutable, so `t += (x,)` copies the entire tuple each time. process_feature_counts: list[rdoc.ProcessFeatureCount] = [] + # Prune rules that cannot match this binary's global features (OS, arch, format) + # once, before the per-process matching loop. This eliminates rules that could + # never match — e.g. Windows-specific rules when analysing a Linux trace — from all + # per-process, per-thread, and per-call evaluations. + # See: https://github.com/mandiant/capa/issues/2127 + global_features: FeatureSet = collections.defaultdict(set) + for feature, addr in extractor.extract_global_features(): + global_features[feature].add(addr) + ruleset = ruleset.filter_rules_by_meta_features(global_features) + assert isinstance(extractor, DynamicFeatureExtractor) processes: list[ProcessHandle] = list(extractor.get_processes()) n_processes: int = len(processes) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index 893887f77b..c79dacb579 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -162,6 +162,16 @@ def find_static_capabilities( function_feature_counts: list[rdoc.FunctionFeatureCount] = [] library_functions_list: list[rdoc.LibraryFunction] = [] + # Prune rules that cannot match this binary's global features (OS, arch, format) + # once, before the per-function matching loop. This eliminates rules that could + # never match — e.g. Windows-specific rules when analysing a Linux ELF — from all + # per-function, per-basic-block, and per-instruction evaluations. + # See: https://github.com/mandiant/capa/issues/2127 + global_features: FeatureSet = collections.defaultdict(set) + for feature, addr in extractor.extract_global_features(): + global_features[feature].add(addr) + ruleset = ruleset.filter_rules_by_meta_features(global_features) + assert isinstance(extractor, StaticFeatureExtractor) functions: list[FunctionHandle] = list(extractor.get_functions()) n_funcs: int = len(functions) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 23fd0dd3c0..0afbeff821 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1442,6 +1442,12 @@ def __init__( self.rules = {rule.name: rule for rule in rules} self.rules_by_namespace = index_rules_by_namespace(rules) self.rules_by_scope = {scope: self._get_rules_for_scope(rules, scope) for scope in scopes} + self._dependencies_by_rule_name = { + rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in rules + } + self._rules_with_global_features = { + rule.name for rule in rules if self._statement_uses_global_features(rule.statement) + } # these structures are unstable and may change before the next major release. scores_by_rule: dict[str, int] = {} @@ -1920,6 +1926,184 @@ def filter_rules_by_meta(self, tag: str) -> "RuleSet": break return RuleSet(list(rules_filtered)) + @staticmethod + def _statement_uses_global_features(node: Union[Feature, Statement]) -> bool: + if isinstance(node, capa.features.common.Feature): + return capa.features.common.is_global_feature(node) + + return any(RuleSet._statement_uses_global_features(child) for child in node.get_children()) + + def _clone_with_rule_subset(self, rule_names: set[str]) -> "RuleSet": + """ + Return a shallow-then-filtered clone of this RuleSet restricted to ``rule_names``. + + All derived data structures are filtered so the hot ``_match`` path needs no runtime + guards to skip pruned rules. The cost — O(total index entries across all scopes) — is + paid once at analysis start rather than on every per-function / per-instruction call. + """ + clone = copy.copy(self) + + clone.rules = {name: self.rules[name] for name in self.rules if name in rule_names} + clone._rules_with_global_features = self._rules_with_global_features & rule_names + clone._dependencies_by_rule_name = { + name: deps for name, deps in self._dependencies_by_rule_name.items() if name in rule_names + } + + # Filter per-scope rule lists (preserves topological order; used by paranoid mode). + clone.rules_by_scope = { + scope: [rule for rule in scope_rules if rule.name in rule_names] + for scope, scope_rules in self.rules_by_scope.items() + } + + # Filter topological index: gaps in index values are fine because the values are only + # used as sort keys, and the relative order of surviving rules is already correct. + clone._rule_index_by_scope = { + scope: {name: idx for name, idx in rule_index.items() if name in rule_names} + for scope, rule_index in self._rule_index_by_scope.items() + } + + # Filter namespace index (values are lists of Rule objects, not strings). + clone.rules_by_namespace = { + namespace: [rule for rule in ns_rules if rule.name in rule_names] + for namespace, ns_rules in self.rules_by_namespace.items() + } + # Drop namespaces that became empty after pruning. + clone.rules_by_namespace = {ns: rules for ns, rules in clone.rules_by_namespace.items() if rules} + + # Filter feature indexes: remove pruned rule names from every set in rules_by_feature, + # and drop string/bytes scan entries for pruned rules. + clone._feature_indexes_by_scopes = {} + for scope, feature_index in self._feature_indexes_by_scopes.items(): + new_rules_by_feature: dict[Feature, set[str]] = {} + for feature, rule_set in feature_index.rules_by_feature.items(): + filtered_set = rule_set & rule_names + if filtered_set: + new_rules_by_feature[feature] = filtered_set + clone._feature_indexes_by_scopes[scope] = RuleSet._RuleFeatureIndex( + new_rules_by_feature, + {name: feats for name, feats in feature_index.string_rules.items() if name in rule_names}, + {name: feats for name, feats in feature_index.bytes_rules.items() if name in rule_names}, + ) + + return clone + + def filter_rules_by_meta_features(self, features: FeatureSet) -> "RuleSet": + """ + Return a new RuleSet with rules removed whose global-feature requirements + cannot be satisfied by the binary under analysis. + + Global features — OS, architecture, and format — are determined once at the + start of analysis from the binary's headers. Any rule that requires, for + example, ``os: windows`` while we are analyzing a Linux ELF can never match + and is safe to discard before the per-function matching loop begins. + + The filtering is conservative: a rule is only removed when its global-feature + constraints are *provably* unsatisfiable. Rules with no global-feature + constraints, or with ``os: any``-style wildcards, are always kept. + + Rules that are kept as transitive dependencies of other kept rules are also + retained, so the returned RuleSet always satisfies internal dependency + invariants. + + Args: + features: the global FeatureSet for the binary (typically the output of + ``extractor.extract_global_features()``). + + Returns: + A new :class:`RuleSet` with incompatible rules removed, or *self* if + no rules were pruned. + """ + global_features: FeatureSet = { + feature: locations + for feature, locations in features.items() + if capa.features.common.is_global_feature(feature) + } + + if not global_features: + return self + + rules_with_global_features = getattr(self, "_rules_with_global_features", None) + if rules_with_global_features is None: + rules_with_global_features = { + rule.name for rule in self.rules.values() if self._statement_uses_global_features(rule.statement) + } + self._rules_with_global_features = rules_with_global_features + + if not rules_with_global_features: + return self + + def can_match(node) -> bool: + """ + Return True if *node* might be satisfiable given the known global features. + Returns False only when provably unsatisfiable. + """ + if isinstance(node, capa.features.common.Feature): + if capa.features.common.is_global_feature(node): + return bool(node.evaluate(global_features)) + return True + + if isinstance(node, ceng.Not): + return True + + if isinstance(node, ceng.And): + return all(can_match(child) for child in node.children) + + if isinstance(node, ceng.Or): + return any(can_match(child) for child in node.children) + + if isinstance(node, ceng.Some): + if node.count == 0: + return True + return sum(1 for child in node.children if can_match(child)) >= node.count + + if isinstance(node, ceng.Range): + if node.min == 0: + return True + return can_match(node.child) + + return True + + compatible_rule_names = set(self.rules) - rules_with_global_features + compatible_rule_names.update( + rule_name for rule_name in rules_with_global_features if can_match(self.rules[rule_name].statement) + ) + + if len(compatible_rule_names) == len(self.rules): + return self + + # Collect the surviving rules plus all of their transitive dependencies + # to ensure RuleSet dependency invariants are maintained. + dependencies_by_rule_name = getattr(self, "_dependencies_by_rule_name", None) + if dependencies_by_rule_name is None: + dependencies_by_rule_name = { + rule.name: set(rule.get_dependencies(self.rules_by_namespace)) for rule in self.rules.values() + } + self._dependencies_by_rule_name = dependencies_by_rule_name + + rules_to_keep: set[str] = set(compatible_rule_names) + stack: list[str] = list(compatible_rule_names) + while stack: + rule_name = stack.pop() + for dependency_name in dependencies_by_rule_name.get(rule_name, ()): + if dependency_name not in self.rules: + continue + if dependency_name in rules_to_keep: + continue + rules_to_keep.add(dependency_name) + stack.append(dependency_name) + + pruned_count = len(self.rules) - len(rules_to_keep) + if pruned_count == 0: + return self + + logger.debug( + "pruned %d rules incompatible with global features (%s)", + pruned_count, + ", ".join(f"{f.name}: {f.value!r}" for f in global_features), + ) + + return self._clone_with_rule_subset(rules_to_keep) + # this routine is unstable and may change before the next major release. @staticmethod def _sort_rules_by_index(rule_index_by_rule_name: dict[str, int], rules: list[Rule]): @@ -2093,6 +2277,9 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea new_candidates.extend(feature_index.rules_by_feature.get(new_feature, ())) if new_candidates: + new_candidates = [ + rule_name for rule_name in new_candidates if rule_name not in candidate_rule_names + ] candidate_rule_names.update(new_candidates) candidate_rules.extend([self.rules[rule_name] for rule_name in new_candidates]) RuleSet._sort_rules_by_index(rule_index_by_rule_name, candidate_rules) diff --git a/tests/test_match.py b/tests/test_match.py index cfeeb7e9fc..34da77662b 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -21,7 +21,19 @@ import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule +from capa.features.common import ( + OS, + OS_ANY, + OS_LINUX, + FORMAT_PE, + ARCH_AMD64, + OS_WINDOWS, + Arch, + Format, + String, + MatchedRule, +) +from capa.features.address import NO_ADDRESS def match(rules, features, va, scope=Scope.FUNCTION): @@ -816,3 +828,238 @@ def test_index_features_nested_unstable(): assert not index.string_rules assert not index.bytes_rules + + +def test_filter_rules_by_meta_features_prunes_incompatible_os(): + """Rules requiring a different OS than the binary's are removed from the RuleSet.""" + windows_rule = textwrap.dedent( + """ + rule: + meta: + name: windows only rule + scopes: + static: function + dynamic: process + features: + - and: + - os: windows + - api: CreateFile + """ + ) + linux_rule = textwrap.dedent( + """ + rule: + meta: + name: linux only rule + scopes: + static: function + dynamic: process + features: + - and: + - os: linux + - api: open + """ + ) + rr = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml(windows_rule), + capa.rules.Rule.from_yaml(linux_rule), + ] + ) + assert len(rr.rules) == 2 + + # When analyzing a Linux binary, windows-only rules are pruned + linux_features: capa.engine.FeatureSet = {OS(OS_LINUX): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(linux_features) + assert "linux only rule" in filtered.rules + assert "windows only rule" not in filtered.rules + + # When analyzing a Windows binary, linux-only rules are pruned + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "windows only rule" in filtered.rules + assert "linux only rule" not in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_any_os(): + """Rules with os: any or no OS requirement are kept regardless of binary OS.""" + any_os_rule = textwrap.dedent( + """ + rule: + meta: + name: any os rule + scopes: + static: function + dynamic: process + features: + - and: + - os: any + - api: malloc + """ + ) + no_os_rule = textwrap.dedent( + """ + rule: + meta: + name: no os rule + scopes: + static: function + dynamic: process + features: + - api: calloc + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(any_os_rule), capa.rules.Rule.from_yaml(no_os_rule)]) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "any os rule" in filtered.rules + assert "no os rule" in filtered.rules + + linux_features: capa.engine.FeatureSet = {OS(OS_LINUX): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(linux_features) + assert "any os rule" in filtered.rules + assert "no os rule" in filtered.rules + + +def test_filter_rules_by_meta_features_prunes_unreachable_some_count(): + """Rules with an unsatisfiable Some-count over global features are pruned.""" + unreachable_some_rule = textwrap.dedent( + """ + rule: + meta: + name: unreachable some rule + scopes: + static: function + dynamic: process + features: + - 3 or more: + - os: windows + - os: linux + - api: CreateFile + """ + ) + baseline_rule = textwrap.dedent( + """ + rule: + meta: + name: baseline rule + scopes: + static: function + dynamic: process + features: + - api: ReadFile + """ + ) + rr = capa.rules.RuleSet( + [capa.rules.Rule.from_yaml(unreachable_some_rule), capa.rules.Rule.from_yaml(baseline_rule)] + ) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "unreachable some rule" not in filtered.rules + assert "baseline rule" in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_reachable_some_count(): + """Rules with a satisfiable Some-count over global features are kept.""" + reachable_some_rule = textwrap.dedent( + """ + rule: + meta: + name: reachable some rule + scopes: + static: function + dynamic: process + features: + - 2 or more: + - os: windows + - os: linux + - api: CreateFile + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(reachable_some_rule)]) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "reachable some rule" in filtered.rules + + +def test_filter_rules_by_meta_features_prunes_incompatible_arch_and_format(): + """Rules with incompatible arch/format are pruned while compatible rules are kept.""" + incompatible_rule = textwrap.dedent( + """ + rule: + meta: + name: incompatible arch format rule + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - format: elf + - api: open + """ + ) + compatible_rule = textwrap.dedent( + """ + rule: + meta: + name: compatible arch format rule + scopes: + static: function + dynamic: process + features: + - and: + - arch: amd64 + - format: pe + - api: CreateFile + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(incompatible_rule), capa.rules.Rule.from_yaml(compatible_rule)]) + + features: capa.engine.FeatureSet = {Arch(ARCH_AMD64): {NO_ADDRESS}, Format(FORMAT_PE): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(features) + assert "incompatible arch format rule" not in filtered.rules + assert "compatible arch format rule" in filtered.rules + + +def test_filter_rules_by_meta_features_keeps_dependencies_of_surviving_rules(): + """Dependencies of compatible rules are retained even if dependency globals are incompatible.""" + dependency_rule = textwrap.dedent( + """ + rule: + meta: + name: linux dependency rule + scopes: + static: function + dynamic: process + features: + - and: + - os: linux + - api: open + """ + ) + parent_rule = textwrap.dedent( + """ + rule: + meta: + name: windows parent rule + scopes: + static: function + dynamic: process + features: + - and: + - os: windows + - or: + - api: CreateFile + - match: linux dependency rule + """ + ) + rr = capa.rules.RuleSet([capa.rules.Rule.from_yaml(dependency_rule), capa.rules.Rule.from_yaml(parent_rule)]) + + windows_features: capa.engine.FeatureSet = {OS(OS_WINDOWS): {NO_ADDRESS}} + filtered = rr.filter_rules_by_meta_features(windows_features) + assert "windows parent rule" in filtered.rules + assert "linux dependency rule" in filtered.rules