|
| 1 | +"""VC301 — Connector must define an author identity. |
| 2 | +
|
| 3 | +Uses AST to detect author identity definitions by looking for constructor |
| 4 | +calls (``Identity(...)``, ``OrganizationAuthor(...)``, ``stix2.Identity(...)``) |
| 5 | +and API calls (``helper.api.identity.create(...)``). |
| 6 | +
|
| 7 | +Import validation for bare ``Identity(...)`` calls is also AST-based: |
| 8 | +the call is only counted when ``Identity`` is imported from ``stix2`` |
| 9 | +or ``pycti`` (not some unrelated class). |
| 10 | +""" |
| 11 | + |
| 12 | +import ast |
| 13 | +from pathlib import Path |
| 14 | + |
| 15 | +from connector_linter.models import ( |
| 16 | + CheckFinding, |
| 17 | + ConnectorContext, |
| 18 | + Severity, |
| 19 | + no_python_sources_finding, |
| 20 | +) |
| 21 | +from connector_linter.registry import CheckRegistry |
| 22 | + |
| 23 | +# --------------------------------------------------------------------------- |
| 24 | +# Author-definition patterns (AST-based) |
| 25 | +# --------------------------------------------------------------------------- |
| 26 | + |
| 27 | +# Function/constructor names that unambiguously define an author |
| 28 | +_UNAMBIGUOUS_AUTHOR_CALLS = {"OrganizationAuthor"} |
| 29 | + |
| 30 | +# Import modules that confirm bare Identity() comes from stix2/pycti |
| 31 | +_IDENTITY_MODULES = {"stix2", "pycti"} |
| 32 | + |
| 33 | + |
| 34 | +def _has_identity_import(trees: dict[Path, ast.Module]) -> bool: |
| 35 | + """Check if any source file imports Identity from stix2 or pycti.""" |
| 36 | + for tree in trees.values(): |
| 37 | + for node in ast.walk(tree): |
| 38 | + if isinstance(node, ast.ImportFrom) and node.module: |
| 39 | + # from stix2[.xxx] import Identity / from pycti[.xxx] import Identity |
| 40 | + mod_root = node.module.split(".")[0] |
| 41 | + if mod_root in _IDENTITY_MODULES: |
| 42 | + for alias in node.names: |
| 43 | + if alias.name == "Identity": |
| 44 | + return True |
| 45 | + elif isinstance(node, ast.Import): |
| 46 | + # import stix2 (Identity accessed as stix2.Identity) |
| 47 | + for alias in node.names: |
| 48 | + if alias.name.split(".")[0] in _IDENTITY_MODULES: |
| 49 | + return True |
| 50 | + return False |
| 51 | + |
| 52 | + |
| 53 | +def _is_author_call(node: ast.Call, identity_imported: bool) -> bool: |
| 54 | + """Determine if an ast.Call node represents an author definition. |
| 55 | +
|
| 56 | + Recognized patterns: |
| 57 | + 1. OrganizationAuthor(...) — connectors-sdk |
| 58 | + 2. stix2.Identity(...) — qualified stix2 constructor |
| 59 | + 3. Identity(...) — bare, only if imported from stix2/pycti |
| 60 | + 4. *.api.identity.create(...) — legacy pycti API call |
| 61 | + """ |
| 62 | + func = node.func |
| 63 | + |
| 64 | + # Pattern 1 & 3: bare function call — OrganizationAuthor(...) or Identity(...) |
| 65 | + if isinstance(func, ast.Name): |
| 66 | + if func.id in _UNAMBIGUOUS_AUTHOR_CALLS: |
| 67 | + return True |
| 68 | + if func.id == "Identity" and identity_imported: |
| 69 | + return True |
| 70 | + |
| 71 | + # Patterns 2 & 4: attribute-based calls |
| 72 | + if isinstance(func, ast.Attribute): |
| 73 | + # Pattern 2: stix2.Identity(...) |
| 74 | + if func.attr == "Identity" and isinstance(func.value, ast.Name): |
| 75 | + if func.value.id == "stix2": |
| 76 | + return True |
| 77 | + |
| 78 | + # Pattern 4: *.api.identity.create(...) |
| 79 | + if ( |
| 80 | + func.attr == "create" |
| 81 | + and isinstance(func.value, ast.Attribute) |
| 82 | + and func.value.attr == "identity" |
| 83 | + and isinstance(func.value.value, ast.Attribute) |
| 84 | + and func.value.value.attr == "api" |
| 85 | + ): |
| 86 | + return True |
| 87 | + |
| 88 | + return False |
| 89 | + |
| 90 | + |
| 91 | +def find_author_definitions( |
| 92 | + sources: dict[Path, str], |
| 93 | + trees: dict[Path, ast.Module], |
| 94 | +) -> list[tuple[Path, int, str]]: |
| 95 | + """Find author definition locations using AST analysis. |
| 96 | +
|
| 97 | + Args: |
| 98 | + sources: Raw Python source content keyed by relative path (used for |
| 99 | + line-text extraction in findings). |
| 100 | + trees: Pre-parsed AST modules (e.g. ``ctx.python_trees``). Passing |
| 101 | + the cached property avoids redundant parsing across checks. |
| 102 | +
|
| 103 | + Returns: |
| 104 | + List of (file_path, line_number, matched_line_text). |
| 105 | + """ |
| 106 | + identity_imported = _has_identity_import(trees) |
| 107 | + |
| 108 | + hits: list[tuple[Path, int, str]] = [] |
| 109 | + for file_path, tree in trees.items(): |
| 110 | + content_lines = sources[file_path].splitlines() |
| 111 | + for node in ast.walk(tree): |
| 112 | + if isinstance(node, ast.Call) and _is_author_call(node, identity_imported): |
| 113 | + line_text = ( |
| 114 | + content_lines[node.lineno - 1].strip() |
| 115 | + if node.lineno <= len(content_lines) |
| 116 | + else "" |
| 117 | + ) |
| 118 | + hits.append((file_path, node.lineno, line_text)) |
| 119 | + |
| 120 | + return hits |
| 121 | + |
| 122 | + |
| 123 | +@CheckRegistry.register( |
| 124 | + code="VC301", |
| 125 | + name="author-defined", |
| 126 | + description="Connector must define an author identity", |
| 127 | + severity=Severity.ERROR, |
| 128 | +) |
| 129 | +def check_author_defined(ctx: ConnectorContext) -> list[CheckFinding]: |
| 130 | + """Check that the connector defines an author identity somewhere in its source.""" |
| 131 | + sources = ctx.python_sources |
| 132 | + |
| 133 | + if not sources: |
| 134 | + return [no_python_sources_finding()] |
| 135 | + |
| 136 | + hits = find_author_definitions(sources, ctx.python_trees) |
| 137 | + |
| 138 | + if hits: |
| 139 | + file_path, line, _ = hits[0] |
| 140 | + return [ |
| 141 | + CheckFinding( |
| 142 | + message="Author identity defined", |
| 143 | + severity=Severity.INFO, |
| 144 | + file_path=file_path, |
| 145 | + line=line, |
| 146 | + ), |
| 147 | + ] |
| 148 | + |
| 149 | + return [ |
| 150 | + CheckFinding( |
| 151 | + message="No author identity definition found in connector source", |
| 152 | + severity=Severity.ERROR, |
| 153 | + suggestion=( |
| 154 | + "Define an author using one of: " |
| 155 | + "stix2.Identity(name=..., identity_class='organization'), " |
| 156 | + "OrganizationAuthor(name=...), " |
| 157 | + "or self.helper.api.identity.create(type='Organization', name=...)" |
| 158 | + ), |
| 159 | + ), |
| 160 | + ] |
0 commit comments