From 3bd2a74a56cb2e2310cdc8921bfdea45ae9c2e55 Mon Sep 17 00:00:00 2001 From: Hugo DUPRAS Date: Thu, 23 Apr 2026 14:53:41 +0200 Subject: [PATCH 1/3] feat(connector-linter): add shared AST/regex helpers for VC3xx code checks --- .../checks/vc3xx_code/__init__.py | 1 + .../checks/vc3xx_code/_helpers.py | 403 ++++++++++++++++++ 2 files changed, 404 insertions(+) create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py new file mode 100644 index 00000000000..fc41ea17249 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py @@ -0,0 +1 @@ +"""VC3xx — Code quality checks.""" diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py new file mode 100644 index 00000000000..9f8536244c3 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py @@ -0,0 +1,403 @@ +"""Shared helpers for VC3xx code quality checks.""" + +import ast +import re +from dataclasses import dataclass +from pathlib import Path + +# --------------------------------------------------------------------------- +# Regex helpers (for simple text-pattern checks) +# --------------------------------------------------------------------------- + + +def find_pattern_locations( + sources: dict[Path, str], + patterns: list[re.Pattern], +) -> list[tuple[Path, int, str]]: + """Find all occurrences of any pattern in sources. + + Returns list of (file_path, line_number, matched_line). 
+ """ + hits: list[tuple[Path, int, str]] = [] + for file_path, content in sources.items(): + # Scan line-by-line so we can report exact line numbers + lines = content.splitlines() + for line_no, line in enumerate(lines, start=1): + for pattern in patterns: + if pattern.search(line): + hits.append((file_path, line_no, line.strip())) + # One match per line is enough — avoids duplicate reports + # when multiple patterns match the same line + break + return hits + + +# --------------------------------------------------------------------------- +# AST helpers — structural analysis of Python source +# --------------------------------------------------------------------------- + + +@dataclass +class ImportInfo: + """Represents a single imported name from a source file.""" + + module: str # The source module, e.g. "connectors_sdk" or "pydantic_settings" + name: str # The imported symbol, e.g. "BaseConnectorSettings" + alias: str | None # The 'as' alias if present, e.g. "BCS" for `import ... as BCS` + file_path: Path # Which file contains this import + line: int # Line number for reporting + + +def find_imports( + trees: dict[Path, ast.Module], + module_pattern: str | None = None, + name_pattern: str | None = None, +) -> list[ImportInfo]: + """Find imports matching optional module and/or name patterns. + + Args: + trees: Parsed AST modules keyed by file path. + module_pattern: Regex to match the module (e.g. r"connectors_sdk"). + name_pattern: Regex to match the imported name (e.g. r"BaseConnectorSettings"). + + Returns: + List of ImportInfo for matching imports. 
+ + """ + results: list[ImportInfo] = [] + # Pre-compile patterns for efficient repeated matching + mod_re = re.compile(module_pattern) if module_pattern else None + name_re = re.compile(name_pattern) if name_pattern else None + + for file_path, tree in trees.items(): + for node in ast.walk(tree): + # --------------------------------------------------------------------------- + # Handle `from MODULE import NAME` (ImportFrom) + # + # module_pattern filters the source module (e.g. "connectors_sdk") + # name_pattern filters the imported symbol (e.g. "BaseConnectorSettings") + # --------------------------------------------------------------------------- + if isinstance(node, ast.ImportFrom) and node.module: + if mod_re and not mod_re.search(node.module): + continue + for alias in node.names: + if name_re and not name_re.search(alias.name): + continue + results.append( + ImportInfo( + module=node.module, + name=alias.name, + alias=alias.asname, + file_path=file_path, + line=node.lineno, + ), + ) + # --------------------------------------------------------------------------- + # Handle `import MODULE` (Import) + # + # For bare imports, both module and name are the full module name + # (e.g. `import stix2` → module="stix2", name="stix2"). + # Both module_pattern and name_pattern are checked against this name. 
+ # --------------------------------------------------------------------------- + elif isinstance(node, ast.Import): + for alias in node.names: + module_name = alias.name + if mod_re and not mod_re.search(module_name): + continue + if name_re and not name_re.search(module_name): + continue + results.append( + ImportInfo( + module=module_name, + name=module_name, + alias=alias.asname, + file_path=file_path, + line=node.lineno, + ), + ) + return results + + +@dataclass +class ClassInfo: + """Represents a class definition found in source.""" + + name: str + # Base class names are stored unqualified (just the final name): + # class Foo(mod.BaseSettings) → bases = ["BaseSettings"] + # class Foo(BaseSettings) → bases = ["BaseSettings"] + bases: list[str] + file_path: Path + line: int + + +def find_classes( + trees: dict[Path, ast.Module], + base_name: str | None = None, +) -> list[ClassInfo]: + """Find class definitions, optionally filtering by base class name. + + Args: + trees: Parsed AST modules keyed by file path. + base_name: If provided, only return classes inheriting from this name. + + """ + results: list[ClassInfo] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.ClassDef): + continue + bases: list[str] = [] + for base in node.bases: + # ast.Name → direct reference: class Foo(BaseSettings) + if isinstance(base, ast.Name): + bases.append(base.id) + # ast.Attribute → qualified reference: class Foo(mod.BaseSettings) + # We only keep the final attr name for matching simplicity + elif isinstance(base, ast.Attribute): + bases.append(base.attr) + if base_name and base_name not in bases: + continue + results.append( + ClassInfo( + name=node.name, + bases=bases, + file_path=file_path, + line=node.lineno, + ), + ) + return results + + +@dataclass +class ExceptBlockInfo: + """Represents an except handler block found in source.""" + + exception_types: list[str] # e.g. 
["ValueError", "TypeError"] + # The body (list of statements) is stored so downstream checks can + # analyze what happens inside the except block (e.g. logging calls) + body: list[ast.stmt] + file_path: Path + line: int + + +def find_except_blocks( + trees: dict[Path, ast.Module], +) -> list[ExceptBlockInfo]: + """Find all except handler blocks across source files.""" + results: list[ExceptBlockInfo] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.ExceptHandler): + continue + + exc_types: list[str] = [] + if node.type is not None: + # Single exception: except ValueError: + if isinstance(node.type, ast.Name): + exc_types.append(node.type.id) + # Tuple of exceptions: except (ValueError, TypeError): + elif isinstance(node.type, ast.Tuple): + for elt in node.type.elts: + if isinstance(elt, ast.Name): + exc_types.append(elt.id) + + results.append( + ExceptBlockInfo( + exception_types=exc_types, + body=node.body, + file_path=file_path, + line=node.lineno, + ), + ) + return results + + +@dataclass +class CallInfo: + """Represents a function/method call found in source.""" + + func_name: str # The method/function name, e.g. "error" or "check_max_tlp" + # The receiver (object the method is called on), reconstructed as a dotted + # string, e.g. "self.helper.connector_logger" for + # self.helper.connector_logger.error(). None for bare function calls. + receiver: str | None + file_path: Path + line: int + + +def find_calls_in_stmts( + stmts: list[ast.stmt], + file_path: Path, + func_names: set[str] | None = None, +) -> list[CallInfo]: + """Find function/method calls within a list of AST statements. + + Operates on a list of statements (not the whole tree) to support + scoped analysis — e.g. searching only inside an except block body. + + Args: + stmts: AST statement nodes to search. + func_names: If provided, only return calls matching these function names. + file_path: File path for reporting. 
+ + """ + results: list[CallInfo] = [] + for stmt in stmts: + for node in ast.walk(stmt): + if not isinstance(node, ast.Call): + continue + + func_name: str | None = None + receiver: str | None = None + + # Bare function call: print_exc() + if isinstance(node.func, ast.Name): + func_name = node.func.id + # Method call: self.logger.error() → func_name="error" + elif isinstance(node.func, ast.Attribute): + func_name = node.func.attr + receiver = _unparse_receiver(node.func.value) + + if func_name is None: + continue + if func_names and func_name not in func_names: + continue + + results.append( + CallInfo( + func_name=func_name, + receiver=receiver, + file_path=file_path, + line=node.lineno, + ), + ) + return results + + +def _unparse_receiver(node: ast.expr) -> str: + """Unparse the receiver of a method call (e.g. self.helper.connector_logger). + + Recursively walks the dotted attribute chain: + self.helper.connector_logger → Name("self") . Attr("helper") . Attr("connector_logger") + producing the string "self.helper.connector_logger". + """ + # Base case: simple name like "self" or "logger" + if isinstance(node, ast.Name): + return node.id + # Recursive case: dotted attribute access (a.b.c → recurse on a.b, append .c) + if isinstance(node, ast.Attribute): + parent = _unparse_receiver(node.value) + return f"{parent}.{node.attr}" + # Fallback for complex expressions (e.g. function calls as receivers) + return "" + + +@dataclass +class FieldDefaultInfo: + """Represents a class field with a default value.""" + + class_name: str # The class containing this field (e.g. "ConnectorSettings") + field_name: str # The field name (e.g. "log_level") + default_value: str | None # Lowercased string of the default (e.g. 
"error") + file_path: Path + line: int + + +def find_field_defaults( + trees: dict[Path, ast.Module], + field_name: str, + class_base: str | None = None, +) -> list[FieldDefaultInfo]: + """Find class field assignments with defaults, optionally filtered by base class. + + Detects patterns like: + log_level: str = "error" + log_level: str = Field(default="error") + log_level: LogLevelType = LogLevelType.ERROR + log_level: ... = Field(default=LogLevelType.ERROR) + """ + results: list[FieldDefaultInfo] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.ClassDef): + continue + # If class_base is specified, only match classes inheriting from it + if class_base: + base_names = [ + ( + b.id + if isinstance(b, ast.Name) + else b.attr + if isinstance(b, ast.Attribute) + else "" + ) + for b in node.bases + ] + if class_base not in base_names: + continue + + # Only look at annotated assignments in the class body (not nested) + for stmt in node.body: + if isinstance(stmt, ast.AnnAssign) and isinstance( + stmt.target, + ast.Name, + ): + if stmt.target.id != field_name: + continue + # Extract the default from the 4 recognized patterns: + # 1. field: str = "error" (Constant) + # 2. field: X = LogLevelType.ERROR (Attribute/enum) + # 3. field: str = Field(default="error") (Call with kwarg) + # 4. field: str = Field("error") (Call with positional) + default_val = _extract_default_value(stmt.value) + results.append( + FieldDefaultInfo( + class_name=node.name, + field_name=field_name, + default_value=default_val, + file_path=file_path, + line=stmt.lineno, + ), + ) + return results + + +def _extract_default_value(node: ast.expr | None) -> str | None: + """Extract the default value from a field assignment or Field() call. + + Handles each AST node type that can represent a default value: + - ast.Constant → direct string literal (e.g. "error") + - ast.Attribute → enum member access (e.g. 
LogLevelType.ERROR) + - ast.Call → Pydantic Field() with default= kwarg or positional arg + """ + if node is None: + return None + + # Pattern 1 — Direct constant: log_level = "error" + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value.lower() + + # Pattern 2 — Enum access: log_level = LogLevelType.ERROR → "error" + if isinstance(node, ast.Attribute): + return node.attr.lower() + + # Pattern 3 & 4 — Pydantic Field() call + if isinstance(node, ast.Call): + func_name = "" + if isinstance(node.func, ast.Name): + func_name = node.func.id + elif isinstance(node.func, ast.Attribute): + func_name = node.func.attr + + if func_name == "Field": + # Pattern 3: Field(default="error") — keyword argument + for kw in node.keywords: + if kw.arg == "default": + # Recurse: the default value itself may be a Constant or Attribute + return _extract_default_value(kw.value) + # Pattern 4: Field("error") — positional first argument + if node.args: + return _extract_default_value(node.args[0]) + + return None From 22e0f89ab2aa89d0e516c89f15c7e88fe1de68f6 Mon Sep 17 00:00:00 2001 From: Hugo DUPRAS Date: Wed, 22 Apr 2026 17:28:19 +0200 Subject: [PATCH 2/3] feat(connector-linter): add VC3xx code checks --- .../connector_linter/checks/_helpers.py | 48 +++ .../checks/vc3xx_code/__init__.py | 28 +- .../checks/vc3xx_code/vc301_author_defined.py | 166 ++++++++ .../vc3xx_code/vc302_author_referenced.py | 117 ++++++ .../checks/vc3xx_code/vc303_connector_type.py | 392 ++++++++++++++++++ .../vc3xx_code/vc304_markings_checked.py | 133 ++++++ .../vc3xx_code/vc305_sdk_base_settings.py | 156 +++++++ .../vc3xx_code/vc306_log_level_default.py | 112 +++++ .../checks/vc3xx_code/vc307_except_logging.py | 109 +++++ .../checks/vc3xx_code/vc308_main_traceback.py | 219 ++++++++++ .../vc3xx_code/vc309_absolute_imports.py | 79 ++++ .../vc3xx_code/vc310_external_references.py | 191 +++++++++ .../checks/vc3xx_code/vc311_tlp_markings.py | 234 +++++++++++ 
.../checks/vc3xx_code/vc312_cleanup_bundle.py | 115 +++++ .../vc3xx_code/vc313_pycti_generate_id.py | 222 ++++++++++ .../vc3xx_code/vc314_auto_backpressure.py | 137 ++++++ .../checks/vc3xx_code/vc315_work_initiated.py | 79 ++++ .../checks/vc3xx_code/vc316_work_closed.py | 118 ++++++ .../vc317_initiate_work_conditional.py | 151 +++++++ .../checks/vc3xx_code/vc318_helper_listen.py | 103 +++++ .../checks/vc3xx_code/vc319_scope_fallback.py | 126 ++++++ .../vc3xx_code/vc320_tlp_access_control.py | 237 +++++++++++ .../vc3xx_code/vc321_playbook_compatible.py | 168 ++++++++ .../checks/vc3xx_code/vc322_former_bundle.py | 90 ++++ .../vc3xx_code/vc323_helper_listen_stream.py | 78 ++++ .../vc324_relationship_start_stop.py | 203 +++++++++ .../vc325_minimal_settings_tests.py | 247 +++++++++++ 27 files changed, 4057 insertions(+), 1 deletion(-) create mode 100644 shared/connector_linter/connector_linter/checks/_helpers.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc301_author_defined.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc302_author_referenced.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc304_markings_checked.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc305_sdk_base_settings.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc306_log_level_default.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc307_except_logging.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc308_main_traceback.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc309_absolute_imports.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc310_external_references.py create mode 100644 
shared/connector_linter/connector_linter/checks/vc3xx_code/vc311_tlp_markings.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc312_cleanup_bundle.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc314_auto_backpressure.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc315_work_initiated.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc316_work_closed.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc317_initiate_work_conditional.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc318_helper_listen.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc319_scope_fallback.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc320_tlp_access_control.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc321_playbook_compatible.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc322_former_bundle.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc323_helper_listen_stream.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc324_relationship_start_stop.py create mode 100644 shared/connector_linter/connector_linter/checks/vc3xx_code/vc325_minimal_settings_tests.py diff --git a/shared/connector_linter/connector_linter/checks/_helpers.py b/shared/connector_linter/connector_linter/checks/_helpers.py new file mode 100644 index 00000000000..ccc1f3df1e1 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/_helpers.py @@ -0,0 +1,48 @@ +import ast +from pathlib import Path + +from connector_linter.models import ConnectorContext + +# 
--------------------------------------------------------------------------- +# Source reading +# --------------------------------------------------------------------------- + + +def read_all_python_sources(ctx: ConnectorContext) -> dict[Path, str]: + """Read all Python source files from the connector's src/ directory.""" + sources: dict[Path, str] = {} + # Convention: all connector Python code lives under the connector root's src/ directory + src_dir = ctx.path / "src" + if not src_dir.exists(): + return sources + for py_file in src_dir.rglob("*.py"): + # Key by relative path (from connector root) for portable reporting + rel_path = py_file.relative_to(ctx.path) + try: + # errors="replace" avoids UnicodeDecodeError on malformed files + sources[rel_path] = py_file.read_text(encoding="utf-8", errors="replace") + except OSError: + # Skip unreadable files (permissions, broken symlinks, etc.) + continue + return sources + + +# --------------------------------------------------------------------------- +# AST helpers — structural analysis of Python source +# --------------------------------------------------------------------------- + + +def parse_sources(sources: dict[Path, str]) -> dict[Path, ast.Module]: + """Parse all source files into AST modules. + + Files that fail to parse (syntax errors) are silently skipped. + """ + trees: dict[Path, ast.Module] = {} + for file_path, content in sources.items(): + try: + trees[file_path] = ast.parse(content, filename=str(file_path)) + except SyntaxError: + # Silently skip files with syntax errors — they can't be analyzed + # structurally, but other checks (regex-based) may still find issues. 
+ continue + return trees diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py index fc41ea17249..4811f8cdace 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/__init__.py @@ -1 +1,27 @@ -"""VC3xx — Code quality checks.""" +"""VC3xx — Code quality checks. + +VC301: Connector must define an author identity. +VC302: Author must be referenced on STIX entities (created_by_ref). +VC303: CONNECTOR_TYPE must be defined in application code, not read from env. +VC304: Ensure TLP markings are checked (check_max_tlp). +VC305: Connector must implement Base Settings from connectors-sdk. +VC306: Connector log level should default to 'error'. +VC307: Except blocks should use error/warning logging, not debug/info. +VC308: Main entry point must use traceback for error handling. +VC309: Connector must use only absolute imports, no relative imports. +VC310: External references must not be added by default to non-Identity objects. +VC311: Connector should use TLP markings on entities with appropriate level. +VC312: send_stix2_bundle must use cleanup_inconsistent_bundle=True. +VC313: STIX SDO/SRO objects must use pycti.XXX.generate_id() for deterministic IDs. +VC314: External-import connectors must use schedule_process or schedule_iso. +VC315: Connector must call initiate_work before processing. +VC316: Connector must close work with to_processed after processing. +VC317: initiate_work should only be called when data is available. +VC318: Internal-enrichment connectors must use helper.listen(). +VC319: Enrichment connector must return original bundle when not in scope. +VC320: Enrichment connector must enforce TLP access control. +VC321: Enrichment connector must be playbook-compatible. +VC322: Enrichment connector must read data['stix_objects'] (former bundle). 
+VC323: Stream connectors must use helper.listen_stream(). +VC324: Relationship should not set both start_time and stop_time. +""" diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc301_author_defined.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc301_author_defined.py new file mode 100644 index 00000000000..fc6ed0a2ae7 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc301_author_defined.py @@ -0,0 +1,166 @@ +"""VC301 — Connector must define an author identity. + +Uses AST to detect author identity definitions by looking for constructor +calls (``Identity(...)``, ``OrganizationAuthor(...)``, ``stix2.Identity(...)``) +and API calls (``helper.api.identity.create(...)``). + +Import validation for bare ``Identity(...)`` calls is also AST-based: +the call is only counted when ``Identity`` is imported from ``stix2`` +or ``pycti`` (not some unrelated class). +""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Author-definition patterns (AST-based) +# --------------------------------------------------------------------------- + +# Function/constructor names that unambiguously define an author +_UNAMBIGUOUS_AUTHOR_CALLS = {"OrganizationAuthor"} + +# Import modules that confirm bare Identity() comes from stix2/pycti +_IDENTITY_MODULES = {"stix2", "pycti"} + + +def _has_identity_import(trees: dict[Path, ast.Module]) -> bool: + """Check if any source file imports Identity from stix2 or pycti.""" + for tree in trees.values(): + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom) and node.module: + # from stix2[.xxx] import Identity / from pycti[.xxx] import Identity + mod_root = node.module.split(".")[0] + if mod_root in 
_IDENTITY_MODULES: + for alias in node.names: + if alias.name == "Identity": + return True + elif isinstance(node, ast.Import): + # import stix2 (Identity accessed as stix2.Identity) + for alias in node.names: + if alias.name.split(".")[0] in _IDENTITY_MODULES: + return True + return False + + +def _is_author_call(node: ast.Call, identity_imported: bool) -> bool: + """Determine if an ast.Call node represents an author definition. + + Recognized patterns: + 1. OrganizationAuthor(...) — connectors-sdk + 2. stix2.Identity(...) — qualified stix2 constructor + 3. Identity(...) — bare, only if imported from stix2/pycti + 4. *.api.identity.create(...) — legacy pycti API call + """ + func = node.func + + # Pattern 1 & 3: bare function call — OrganizationAuthor(...) or Identity(...) + if isinstance(func, ast.Name): + if func.id in _UNAMBIGUOUS_AUTHOR_CALLS: + return True + if func.id == "Identity" and identity_imported: + return True + + # Patterns 2 & 4: attribute-based calls + if isinstance(func, ast.Attribute): + # Pattern 2: stix2.Identity(...) + if func.attr == "Identity" and isinstance(func.value, ast.Name): + if func.value.id == "stix2": + return True + + # Pattern 4: *.api.identity.create(...) + if ( + func.attr == "create" + and isinstance(func.value, ast.Attribute) + and func.value.attr == "identity" + and isinstance(func.value.value, ast.Attribute) + and func.value.value.attr == "api" + ): + return True + + return False + + +def find_author_definitions( + sources: dict[Path, str], + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int, str]]: + """Find author definition locations using AST analysis. + + Args: + sources: Raw Python source content keyed by relative path (used for + line-text extraction in findings). + trees: Pre-parsed AST modules (e.g. ``ctx.python_trees``). Passing + the cached property avoids redundant parsing across checks. + + Returns: + List of (file_path, line_number, matched_line_text). 
+ """ + identity_imported = _has_identity_import(trees) + + hits: list[tuple[Path, int, str]] = [] + for file_path, tree in trees.items(): + content_lines = sources[file_path].splitlines() + for node in ast.walk(tree): + if isinstance(node, ast.Call) and _is_author_call(node, identity_imported): + line_text = ( + content_lines[node.lineno - 1].strip() + if node.lineno <= len(content_lines) + else "" + ) + hits.append((file_path, node.lineno, line_text)) + + return hits + + +@CheckRegistry.register( + code="VC301", + name="author-defined", + description="Connector must define an author identity", + severity=Severity.ERROR, + applicable_types={ + ConnectorType.INTERNAL_ENRICHMENT, + ConnectorType.EXTERNAL_IMPORT, + ConnectorType.INTERNAL_IMPORT_FILE, + }, +) +def check_author_defined(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that the connector defines an author identity somewhere in its source.""" + sources = ctx.python_sources + + if not sources: + return [no_python_sources_finding()] + + hits = find_author_definitions(sources, ctx.python_trees) + + if hits: + file_path, line, _ = hits[0] + return [ + CheckFinding( + message="Author identity defined", + severity=Severity.INFO, + file_path=file_path, + line=line, + ), + ] + + return [ + CheckFinding( + message="No author identity definition found in connector source", + severity=Severity.ERROR, + suggestion=( + "Define an author using one of: " + "stix2.Identity(name=..., identity_class='organization'), " + "OrganizationAuthor(name=...), " + "or self.helper.api.identity.create(type='Organization', name=...)" + ), + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc302_author_referenced.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc302_author_referenced.py new file mode 100644 index 00000000000..567b9c2f423 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc302_author_referenced.py @@ -0,0 +1,117 @@ +"""VC302 — Author must 
be referenced on STIX entities (created_by_ref). + +Uses AST to detect ``author=`` as a keyword argument in function calls +(avoids false positives from variable assignments like ``author = "John"``). +Also detects ``created_by_ref=`` and ``x_opencti_created_by_ref`` via AST. +""" + +import ast +from pathlib import Path + +from connector_linter.checks.vc3xx_code.vc301_author_defined import ( + find_author_definitions, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Keyword argument names that attach an author identity to STIX entities: +# +# created_by_ref — standard STIX 2.1 field (SDOs) +# author — connectors-sdk model parameter +# x_opencti_created_by_ref — OpenCTI custom property (used on observables/SCOs) +# --------------------------------------------------------------------------- +_AUTHOR_KWARGS = {"created_by_ref", "author", "x_opencti_created_by_ref"} + + +def _find_author_references( + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int, str]]: + """Find keyword arguments that reference an author on STIX entities. + + Detects ``created_by_ref=``, ``author=``, and ``x_opencti_created_by_ref`` + as keyword arguments in function/constructor calls — not plain assignments. 
+ """ + hits: list[tuple[Path, int, str]] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + # Only check Call nodes — we want keyword arguments in function/ + # constructor calls, not standalone assignments like `author = "John"` + if not isinstance(node, ast.Call): + continue + for kw in node.keywords: + if kw.arg in _AUTHOR_KWARGS: + # Use the keyword node's own lineno when available; + # fall back to the call node's lineno otherwise + hits.append( + ( + file_path, + getattr(kw, "lineno", node.lineno) or node.lineno, + kw.arg, + ), + ) + return hits + + +@CheckRegistry.register( + code="VC302", + name="author-referenced-on-entities", + description="Author must be referenced on STIX entities (created_by_ref)", + severity=Severity.ERROR, + applicable_types={ + ConnectorType.INTERNAL_ENRICHMENT, + ConnectorType.EXTERNAL_IMPORT, + ConnectorType.INTERNAL_IMPORT_FILE, + }, +) +def check_author_referenced(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that created_by_ref or author= is used to attach author to entities.""" + sources = ctx.python_sources + + if not sources: + return [no_python_sources_finding()] + + # Dependency: an author must be defined first (VC301) before it can be referenced. + # If no author definition exists, we fail with a clear message pointing to VC301. 
+ author_hits = find_author_definitions(sources, ctx.python_trees) + if not author_hits: + return [ + CheckFinding( + message="No author defined — cannot reference author on entities", + severity=Severity.ERROR, + suggestion="Define an author first (see VC301), then use created_by_ref= on STIX objects", + ), + ] + + # Author exists — now check if it's actually referenced on STIX entities (AST-based) + trees = ctx.python_trees + ref_hits = _find_author_references(trees) + + if ref_hits: + file_path, line, _kwarg = ref_hits[0] + return [ + CheckFinding( + message=f"Author referenced on entities ({len(ref_hits)} occurrence(s) found)", + severity=Severity.INFO, + file_path=file_path, + line=line, + ), + ] + + return [ + CheckFinding( + message="Author is defined but never referenced on STIX entities", + severity=Severity.ERROR, + suggestion=( + "Use created_by_ref=self.author.id on SDOs, " + "or x_opencti_created_by_ref in custom_properties for observables, " + "or author= parameter when using connectors-sdk models" + ), + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py new file mode 100644 index 00000000000..f0b8f35e027 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py @@ -0,0 +1,392 @@ +"""VC303 — CONNECTOR_TYPE must be defined in application code, not read from env.""" + +import ast +from dataclasses import dataclass +from pathlib import Path +from typing import cast + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Valid connector type strings — derived from ConnectorType enum so this set +# stays automatically in sync when new types are added. 
@CheckRegistry.register(
    code="VC303",
    name="connector-type-hardcoded",
    description="CONNECTOR_TYPE must be defined in application code, not read from env",
    severity=Severity.ERROR,
)
def check_connector_type_hardcoded(ctx: ConnectorContext) -> list[CheckFinding]:
    """Check that the connector type is fixed in code rather than read from env.

    Every parsed module is walked once and the first occurrence of each of four
    patterns is remembered:

    * env read   — anti-pattern (``os.environ[...]``, ``get_config_variable``…)
    * SDK base   — class inheriting one of ``_SDK_BASE_CONFIG_CLASSES``
    * hardcoded  — ``config["connector"]["type"] = "<TYPE>"`` or the dict form
    * pydantic   — ``type: Literal["<TYPE>"]`` or ``type = Field(default=...)``

    The env read wins over everything else (ERROR). Otherwise the first
    available of SDK (INFO), hardcoded (WARNING), pydantic (WARNING) is
    reported; when nothing matched an ERROR is returned.

    Args:
        ctx: Connector context exposing ``python_sources`` and ``python_trees``.

    Returns:
        A single-element list holding the finding for this check.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    modules = ctx.python_trees
    if not modules:
        return [
            CheckFinding(
                message="No parseable Python source files found in src/",
                severity=Severity.ERROR,
                suggestion="Fix syntax errors in source files under src/",
            ),
        ]

    from_env: _Hit | None = None
    via_sdk: _Hit | None = None
    literal_assign: _Hit | None = None
    via_pydantic: _Hit | None = None

    for path, module in modules.items():
        for current in ast.walk(module):
            # Anti-pattern: the type is looked up in the environment/config.
            if from_env is None and _reads_connector_type_from_env(current):
                located = cast("ast.Call | ast.Subscript", current)
                from_env = _Hit(file_path=path, line=located.lineno)

            # Preferred: a class deriving from an SDK Base*ConnectorConfig.
            if via_sdk is None and isinstance(current, ast.ClassDef):
                if any(
                    _base_name(parent) in _SDK_BASE_CONFIG_CLASSES
                    for parent in current.bases
                ):
                    via_sdk = _Hit(file_path=path, line=current.lineno)

            # Legacy: config["connector"]["type"] = "EXTERNAL_IMPORT"
            if literal_assign is None and isinstance(current, ast.Assign):
                targets_type = any(
                    _is_connector_type_target(target) for target in current.targets
                )
                if targets_type and _is_connector_type_value(current.value):
                    literal_assign = _Hit(file_path=path, line=current.lineno)

            # Legacy, dict-literal flavour: {"connector": {"type": "EXTERNAL_IMPORT"}}
            if literal_assign is None and isinstance(current, ast.Dict):
                type_line = _find_type_in_connector_dict(current)
                if type_line is not None:
                    literal_assign = _Hit(file_path=path, line=type_line)

            # Custom Pydantic model: annotated `type` field with Literal/Field.
            if via_pydantic is None and isinstance(current, ast.AnnAssign):
                field = current.target
                if (
                    isinstance(field, ast.Name)
                    and field.id == "type"
                    and (
                        _is_literal_type_annotation(current.annotation)
                        or _is_field_default(current.value)
                    )
                ):
                    via_pydantic = _Hit(file_path=path, line=current.lineno)

    # The env read is checked first because it overrides every accepted form.
    if from_env:
        return [
            CheckFinding(
                message="CONNECTOR_TYPE is read from environment",
                severity=Severity.ERROR,
                file_path=from_env.file_path,
                line=from_env.line,
                suggestion=(
                    "Hardcode the connector type instead of reading from env. "
                    "Use connectors-sdk (e.g. BaseExternalImportConnectorConfig) "
                    'or set config["connector"]["type"] = "EXTERNAL_IMPORT" directly'
                ),
            ),
        ]

    # Accepted forms in priority order: SDK, then pycti-style, then Pydantic.
    accepted = (
        (via_sdk, "Connector type defined via connectors-sdk", Severity.INFO),
        (literal_assign, "Connector type hardcoded", Severity.WARNING),
        (via_pydantic, "Connector type hardcoded via Pydantic field", Severity.WARNING),
    )
    for hit, text, level in accepted:
        if hit:
            return [
                CheckFinding(
                    message=text,
                    severity=level,
                    file_path=hit.file_path,
                    line=hit.line,
                ),
            ]

    # Nothing matched at all.
    return [
        CheckFinding(
            message="No CONNECTOR_TYPE definition found in application code",
            severity=Severity.ERROR,
            suggestion=(
                "Hardcode the connector type in code. "
                "Best: use connectors-sdk (e.g. BaseExternalImportConnectorConfig). "
                'Or: set config["connector"]["type"] = "EXTERNAL_IMPORT" in main.py'
            ),
        ),
    ]
+ + Handles both direct references (Name) and qualified (Attribute): + BaseSettings → "BaseSettings" + connectors_sdk.BaseXxx → "BaseXxx" + """ + if isinstance(base, ast.Name): + return base.id + if isinstance(base, ast.Attribute): + return base.attr + return "" + + +def _constant_str(node: ast.expr | None) -> str | None: + """Extract a string constant from an AST node, or None if not a string literal.""" + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value + return None + + +def _subscript_key(node: ast.Subscript) -> str | None: + """Extract the string key from a subscript expression like d["key"].""" + slice_node = node.slice + if isinstance(slice_node, ast.Constant) and isinstance(slice_node.value, str): + return slice_node.value + return None + + +def _is_connector_type_target(node: ast.expr) -> bool: + """Check if node is the assignment target config["connector"]["type"]. + + Matches the nested subscript pattern: outer["type"] on inner["connector"]. + """ + if not isinstance(node, ast.Subscript): + return False + if _subscript_key(node) != "type": + return False + inner = node.value + return isinstance(inner, ast.Subscript) and _subscript_key(inner) == "connector" + + +def _is_connector_type_value(node: ast.expr) -> bool: + """Check if node is a string literal matching a valid connector type.""" + value = _constant_str(node) + return value in _CONNECTOR_TYPES if value else False + + +def _find_type_in_connector_dict(node: ast.Dict) -> int | None: + """Find ``"type": "EXTERNAL_IMPORT"`` inside a ``"connector"`` dict literal. + + Matches the pattern:: + + { + "connector": { + "type": "EXTERNAL_IMPORT", # ← returns this line + } + } + + Returns the line number of the ``"type"`` key, or None if not found. 
def _is_literal_type_annotation(node: ast.expr) -> bool:
    """Check if node is a ``Literal`` annotation containing a valid connector type.

    Matches:
        Literal["EXTERNAL_IMPORT"]          → single-value Literal
        Literal["EXTERNAL_IMPORT", ...]     → multi-value Literal (tuple slice)
        typing.Literal["EXTERNAL_IMPORT"]   → qualified access (any module prefix)

    The qualified form is accepted so that ``import typing`` /
    ``import typing_extensions as te`` style code is recognised, in line with
    how ``_is_field_default`` treats ``Field`` and ``pydantic.Field`` alike.

    Args:
        node: The annotation expression of an annotated assignment.

    Returns:
        True when at least one string member of the Literal is a known
        connector type, False otherwise.
    """
    if not isinstance(node, ast.Subscript):
        return False

    # Resolve the unqualified name of the subscripted object.
    origin = node.value
    if isinstance(origin, ast.Name):
        origin_name = origin.id
    elif isinstance(origin, ast.Attribute):
        origin_name = origin.attr
    else:
        return False
    if origin_name != "Literal":
        return False

    # Literal["A"] has a single Constant slice; Literal["A", "B"] has a Tuple.
    slice_node = node.slice
    members = slice_node.elts if isinstance(slice_node, ast.Tuple) else [slice_node]
    return any(
        isinstance(member, ast.Constant)
        and isinstance(member.value, str)
        and member.value in _CONNECTOR_TYPES
        for member in members
    )
def _reads_connector_type_from_env(node: ast.AST) -> bool:
    """Tell whether an AST node reads ``CONNECTOR_TYPE`` from the environment.

    Recognised shapes:
        os.environ["CONNECTOR_TYPE"] / environ["CONNECTOR_TYPE"]
        os.environ.get("CONNECTOR_TYPE") / environ.get("CONNECTOR_TYPE")
        getenv("CONNECTOR_TYPE") / os.getenv("CONNECTOR_TYPE")
        get_config_variable("CONNECTOR_TYPE", ...) — bare or as a method call

    Any node that is neither a Subscript nor a Call yields False.
    """

    def _is_environ(expr: ast.expr) -> bool:
        # Either `os.environ` (attribute on the name `os`) or a bare `environ`.
        if isinstance(expr, ast.Attribute):
            return (
                isinstance(expr.value, ast.Name)
                and expr.value.id == "os"
                and expr.attr == "environ"
            )
        return isinstance(expr, ast.Name) and expr.id == "environ"

    # --- Subscript form: <environ>["CONNECTOR_TYPE"] ---
    if isinstance(node, ast.Subscript):
        return _subscript_key(node) == "CONNECTOR_TYPE" and _is_environ(node.value)

    if not isinstance(node, ast.Call):
        return False

    callee = node.func

    # --- Bare calls: get_config_variable(...) or getenv(...) ---
    if isinstance(callee, ast.Name):
        return _is_call_with_connector_type_arg(
            node, {"get_config_variable", "getenv"}
        )

    if not isinstance(callee, ast.Attribute):
        return False

    # --- Method call on any receiver: <x>.get_config_variable("CONNECTOR_TYPE") ---
    # Arbitrary `.getenv` receivers are deliberately not accepted here; only
    # `os.getenv` is, further down.
    if _is_call_with_connector_type_arg(node, {"get_config_variable"}):
        return True

    # Every remaining pattern needs a first positional argument to inspect.
    if not node.args:
        return False
    asks_for_type = _constant_str(node.args[0]) == "CONNECTOR_TYPE"

    # --- os.getenv("CONNECTOR_TYPE"): the receiver must be the name `os` ---
    if (
        callee.attr == "getenv"
        and isinstance(callee.value, ast.Name)
        and callee.value.id == "os"
        and asks_for_type
    ):
        return True

    # --- <environ>.get("CONNECTOR_TYPE") ---
    if callee.attr == "get":
        return asks_for_type and _is_environ(callee.value)

    return False
def _find_object_marking_access(
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int]]:
    """Locate every ``objectMarking`` access in the parsed modules.

    Two node kinds count as an access:
      - Constant:  entity["objectMarking"]   (the string literal itself)
      - Attribute: entity.objectMarking      (attribute access)

    Args:
        trees: Parsed modules keyed by file path.

    Returns:
        ``(file_path, line)`` pairs, grouped by file in mapping order and, within
        a file, in ``ast.walk`` traversal order.
    """

    def _touches_marking(candidate: ast.AST) -> bool:
        if isinstance(candidate, ast.Constant):
            return candidate.value == "objectMarking"
        if isinstance(candidate, ast.Attribute):
            return candidate.attr == "objectMarking"
        return False

    return [
        (path, candidate.lineno)
        for path, module in trees.items()
        for candidate in ast.walk(module)
        if _touches_marking(candidate)
    ]
Nothing found → FAIL (no TLP handling at all) + # --------------------------------------------------------------------------- + + # Tier 1: check for check_max_tlp call (AST-based) + tlp_check_hits = _find_check_max_tlp_calls(trees) + + if tlp_check_hits: + file_path, line = tlp_check_hits[0] + return [ + CheckFinding( + message="check_max_tlp() found", + severity=Severity.INFO, + file_path=file_path, + line=line, + ), + ] + + # Tier 2: check_max_tlp not found — check if there's at least TLP extraction + extract_hits = _find_object_marking_access(trees) + + if extract_hits: + file_path, line = extract_hits[0] + return [ + CheckFinding( + message=( + f"TLP extraction found in {file_path}:{line} " + "but check_max_tlp is not called" + ), + severity=Severity.WARNING, + file_path=file_path, + line=line, + suggestion=( + "Add self.helper.check_max_tlp(self.tlp, self.config.max_tlp) " + "to validate TLP before processing the entity" + ), + ), + ] + + return [ + CheckFinding( + message="No TLP marking check found in connector source", + severity=Severity.ERROR, + suggestion=( + 'Implement TLP checking: extract TLP from opencti_entity["objectMarking"], ' + "then call self.helper.check_max_tlp(entity_tlp, max_tlp) " + "and reject processing if it returns False" + ), + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc305_sdk_base_settings.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc305_sdk_base_settings.py new file mode 100644 index 00000000000..13370b4090a --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc305_sdk_base_settings.py @@ -0,0 +1,156 @@ +"""VC305 — Connector must implement Base Settings from connectors-sdk.""" + +import re + +from connector_linter.checks.vc3xx_code._helpers import ( + find_classes, + find_imports, + find_pattern_locations, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, + no_python_sources_finding, +) +from 
@CheckRegistry.register(
    code="VC305",
    name="sdk-base-settings",
    description="Connector must implement Base Settings from connectors-sdk",
    severity=Severity.ERROR,
)
def check_sdk_base_settings(ctx: ConnectorContext) -> list[CheckFinding]:
    """Check how the connector loads its settings.

    Evaluated in this order, first match wins:

    1. ``BaseConnectorSettings`` imported from ``connectors_sdk`` AND subclassed
       → INFO (an import without a subclass is not enough).
    2. Any ``get_config_variable(`` text in the sources → ERROR (legacy pycti).
    3. ``BaseSettings`` imported from ``pydantic_settings`` → WARNING, located at
       the subclass when one exists, otherwise at the import.
    4. Nothing of the above → ERROR.

    Args:
        ctx: Connector context exposing ``python_sources`` and ``python_trees``.

    Returns:
        A single-element list holding the finding for this check.
    """
    py_sources = ctx.python_sources
    if not py_sources:
        return [no_python_sources_finding()]

    modules = ctx.python_trees

    # Tier 1 — connectors-sdk settings: both the import and a subclass are needed.
    sdk_import_hits = find_imports(
        modules,
        module_pattern=r"^connectors_sdk",
        name_pattern=r"^BaseConnectorSettings$",
    )
    sdk_subclasses = find_classes(modules, base_name="BaseConnectorSettings")
    if sdk_import_hits and sdk_subclasses:
        settings_cls = sdk_subclasses[0]
        return [
            CheckFinding(
                message="BaseConnectorSettings implemented",
                severity=Severity.INFO,
                file_path=settings_cls.file_path,
                line=settings_cls.line,
            ),
        ]

    # Tier 2 — legacy get_config_variable(), detected by text pattern.
    legacy_calls = find_pattern_locations(py_sources, _LEGACY_CONFIG_PATTERNS)
    if legacy_calls:
        legacy_path, legacy_line, _ = legacy_calls[0]
        return [
            CheckFinding(
                message=(
                    f"Legacy get_config_variable() found in {legacy_path}:{legacy_line} "
                    f"({len(legacy_calls)} call(s) total)"
                ),
                severity=Severity.ERROR,
                file_path=legacy_path,
                line=legacy_line,
                suggestion=(
                    "Replace get_config_variable() calls with connectors-sdk "
                    "BaseConnectorSettings. Create a settings.py with a class "
                    "inheriting BaseConnectorSettings and use typed Pydantic fields "
                    "for configuration (see connectors-sdk documentation)"
                ),
            ),
        ]

    # Tier 3 — custom pydantic_settings.BaseSettings: acceptable, not ideal.
    pydantic_import_hits = find_imports(
        modules,
        module_pattern=r"^pydantic_settings$",
        name_pattern=r"^BaseSettings$",
    )
    if pydantic_import_hits:
        pydantic_subclasses = find_classes(modules, base_name="BaseSettings")
        # Prefer pointing at the subclass; fall back to the import line.
        if pydantic_subclasses:
            where, verb = pydantic_subclasses[0], "found"
        else:
            where, verb = pydantic_import_hits[0], "imported"
        return [
            CheckFinding(
                message=(
                    f"Custom pydantic BaseSettings {verb} in {where.file_path}:{where.line} "
                    "instead of connectors-sdk BaseConnectorSettings"
                ),
                severity=Severity.WARNING,
                file_path=where.file_path,
                line=where.line,
                suggestion=(
                    "Consider migrating to connectors-sdk BaseConnectorSettings "
                    "which provides built-in config loading (env, YAML, .env), "
                    "deprecation management, and JSON schema generation"
                ),
            ),
        ]

    # Tier 4 — no recognisable settings mechanism.
    return [
        CheckFinding(
            message="No settings implementation found in connector source",
            severity=Severity.ERROR,
            suggestion=(
                "Create a settings.py file with a class inheriting from "
                "connectors_sdk.BaseConnectorSettings. Use the appropriate "
                "typed connector config (e.g. BaseExternalImportConnectorConfig) "
                "and define connector-specific settings with BaseConfigModel"
            ),
        ),
    ]
Check if the value is "error"/"err". + # 2. SDK base config — connector inherits from Base*ConnectorConfig which + # already defaults log_level to "error" (no override needed). + # 3. Nothing found — no log_level config at all (PASS with suggestion). + # --------------------------------------------------------------------------- + + # Path 1: look for explicit log_level field default in any class + field_defaults = find_field_defaults(trees, field_name="log_level") + + if field_defaults: + fd = field_defaults[0] + if fd.default_value and fd.default_value in _ERROR_VALUES: + return [ + CheckFinding( + message=f"Log level defaults to '{fd.default_value}'", + severity=Severity.INFO, + file_path=fd.file_path, + line=fd.line, + ), + ] + display_val = fd.default_value or "unknown" + return [ + CheckFinding( + message=( + f"Log level defaults to '{display_val}' in {fd.file_path}:{fd.line} " + "(should be 'error')" + ), + severity=Severity.WARNING, + file_path=fd.file_path, + line=fd.line, + suggestion=( + "Set the default log level to 'error'. " + "DEBUG logs are useful but should require explicit opt-in. 
" + "Use connectors-sdk BaseConnectorSettings which defaults to 'error'" + ), + ), + ] + + # Path 2: no explicit log_level override — check if using SDK base + # (inherits "error" as default, so no override is needed) + sdk_imports = find_imports( + trees, + module_pattern=r"^connectors_sdk", + name_pattern=r"^Base(ExternalImport|InternalEnrichment|Stream|InternalExportFile|InternalImportFile)ConnectorConfig$", + ) + if sdk_imports: + imp = sdk_imports[0] + return [ + CheckFinding( + message="Log level inherited from SDK (defaults to 'error')", + severity=Severity.INFO, + file_path=imp.file_path, + line=imp.line, + ), + ] + + # Path 3: no log_level configuration found at all + return [ + CheckFinding( + message="No log_level default configuration found", + severity=Severity.WARNING, + suggestion=( + "Use connectors-sdk BaseConnectorSettings which defaults log_level to 'error'. " + "If using custom config, set log_level default to 'error'" + ), + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc307_except_logging.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc307_except_logging.py new file mode 100644 index 00000000000..86c0247b359 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc307_except_logging.py @@ -0,0 +1,109 @@ +"""VC307 — Except blocks should use error/warning logging, not debug/info.""" + +from connector_linter.checks.vc3xx_code._helpers import ( + find_calls_in_stmts, + find_except_blocks, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Logging level separation: +# +# LOW_LEVEL_METHODS — debug/info: informational, not appropriate as the +# ONLY log in an except block (errors should be loud) +# HIGH_LEVEL_METHODS — error/warning/critical/exception: appropriate for +# 
@CheckRegistry.register(
    code="VC307",
    name="except-logging-level",
    description="Except blocks should use error/warning logging, not debug/info",
    severity=Severity.WARNING,
)
def check_except_logging_level(ctx: ConnectorContext) -> list[CheckFinding]:
    """Flag except blocks whose only logging is at debug/info level.

    A block is skipped when it catches nothing but exempt exceptions
    (``_EXEMPT_EXCEPTIONS``), when it contains no logging call at all, or when
    at least one high-level call (``_HIGH_LEVEL_METHODS``) is present. Each
    remaining block yields one WARNING located at the except block.

    Args:
        ctx: Connector context exposing ``python_sources`` and ``python_trees``.

    Returns:
        One WARNING per offending except block, or a single INFO finding when
        there is none.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    handlers = find_except_blocks(ctx.python_trees)
    tracked_methods = _LOW_LEVEL_METHODS | _HIGH_LEVEL_METHODS
    findings: list[CheckFinding] = []

    for handler in handlers:
        # Handlers dedicated to KeyboardInterrupt/SystemExit are exempt.
        caught = handler.exception_types
        if caught and set(caught).issubset(_EXEMPT_EXCEPTIONS):
            continue

        # Logging calls are searched in this handler's body only.
        logged = find_calls_in_stmts(
            handler.body,
            func_names=tracked_methods,
            file_path=handler.file_path,
        )
        if not logged:
            # Silent except blocks are outside the scope of this check.
            continue

        # A high-level call makes any debug/info call merely supplementary.
        if any(entry.func_name in _HIGH_LEVEL_METHODS for entry in logged):
            continue

        first_low = next(
            (entry for entry in logged if entry.func_name in _LOW_LEVEL_METHODS),
            None,
        )
        if first_low is None:
            continue

        findings.append(
            CheckFinding(
                message=(
                    f"Except block at {handler.file_path}:{handler.line} uses "
                    f"only {first_low.func_name}() logging (line {first_low.line})"
                ),
                severity=Severity.WARNING,
                file_path=handler.file_path,
                line=handler.line,
                suggestion=(
                    "Use logger.error() or logger.warning() in except blocks. "
                    "debug/info can be used as supplementary logs alongside "
                    "error/warning, but should not be the only log level"
                ),
            ),
        )

    return findings or [
        CheckFinding(
            message="All except blocks use appropriate logging levels",
            severity=Severity.INFO,
        ),
    ]
def _is_main_guard(node: ast.If) -> bool:
    """Tell whether an ``if`` statement is the ``__main__`` entry-point guard.

    Accepts a single ``==`` comparison between the name ``__name__`` and the
    constant ``"__main__"``, written in either order:
        __name__ == "__main__"
        "__main__" == __name__
    """
    condition = node.test
    if not isinstance(condition, ast.Compare) or len(condition.ops) != 1:
        return False
    if not isinstance(condition.ops[0], ast.Eq):
        return False

    lhs = condition.left
    rhs = condition.comparators[0]
    # Try both orientations: (name, literal) and (literal, name).
    for name_side, literal_side in ((lhs, rhs), (rhs, lhs)):
        if (
            isinstance(name_side, ast.Name)
            and name_side.id == "__name__"
            and isinstance(literal_side, ast.Constant)
            and literal_side.value == "__main__"
        ):
            return True
    return False
+ + Verifies the receiver contains "traceback" to distinguish from unrelated + print_exc-like functions. Also accepts bare print_exc() calls (valid when + `from traceback import print_exc` is used). + """ + for handler in handlers: + calls = find_calls_in_stmts( + handler.body, + func_names={"print_exc"}, + file_path=file_path, + ) + for call in calls: + # Check receiver for "traceback" module (e.g. traceback.print_exc()) + if call.receiver and "traceback" in call.receiver: + return True + # Bare print_exc() — valid if `from traceback import print_exc` + if call.receiver is None: + return True + return False + + +@CheckRegistry.register( + code="VC308", + name="main-traceback", + description="Main entry point must use traceback for error handling", + severity=Severity.ERROR, +) +def check_main_traceback(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that main.py uses if __name__ guard with try/except and traceback.""" + sources = ctx.python_sources + + if not sources: + return [no_python_sources_finding()] + + # Find main.py + main_file = None + main_content = None + for file_path, content in sources.items(): + if file_path.name == "main.py": + main_file = file_path + main_content = content + break + + if main_file is None or main_content is None: + return [ + CheckFinding( + message="No main.py found in src/", + severity=Severity.ERROR, + suggestion="Connector must have a main.py entry point under src/", + ), + ] + + has_import, has_guard, has_try_tb, guard_line = _check_main_structure( + main_content, + main_file, + ) + + # Accumulate all issues found — multiple problems may coexist + issues: list[str] = [] + if not has_import: + issues.append("missing `import traceback`") + if not has_guard: + issues.append('missing `if __name__ == "__main__":` guard') + if has_guard and not has_try_tb: + issues.append("missing `try/except` with `traceback.print_exc()` in main guard") + + if not issues: + return [ + CheckFinding( + message=f"Main entry point has proper 
@CheckRegistry.register(
    code="VC309",
    name="absolute-imports-only",
    description="Connector must use only absolute imports, no relative imports",
    severity=Severity.ERROR,
)
def check_absolute_imports_only(ctx: ConnectorContext) -> list[CheckFinding]:
    """Flag every relative ``from . import ...`` style import in the connector.

    Returns:
        * the shared "no Python sources" finding when ``ctx.python_sources``
          is empty;
        * one ERROR finding per relative import, carrying its file and line;
        * a single INFO finding when no relative import exists.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    findings: list[CheckFinding] = []
    for file_path, tree in ctx.python_trees.items():
        for node in ast.walk(tree):
            if not isinstance(node, ast.ImportFrom):
                continue
            # ImportFrom.level counts the leading dots: 0 means absolute,
            # 1 is `from . import X`, 2 is `from .. import X`, and so on.
            level = node.level or 0
            if level <= 0:
                continue

            # Rebuild the statement text so the finding shows what was written.
            imported = ", ".join(alias.name for alias in node.names)
            stmt = f"from {'.' * level}{node.module or ''} import {imported}"
            findings.append(
                CheckFinding(
                    message=f"Relative import: {stmt}",
                    severity=Severity.ERROR,
                    file_path=file_path,
                    line=node.lineno,
                    suggestion="Replace with an absolute import",
                ),
            )

    if findings:
        return findings

    return [
        CheckFinding(
            message="All imports are absolute",
            severity=Severity.INFO,
        ),
    ]
+ +Adding default external references to all entities can trigger enrichment +connectors and create unnecessary ingestion, potentially causing platform +timeouts. External references using ``self.external_reference(s)`` should +only appear on Identity (Organization) objects. Dynamic per-entity external +references created as local variables are not flagged by this check. +""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Identity type names are EXEMPT from this check. +# +# External references on Identity/Organization objects are expected (they +# describe the connector author). Only non-Identity types are flagged. +# Matching is case-insensitive. +# --------------------------------------------------------------------------- +_IDENTITY_NAMES = {"identity", "organizationauthor"} + + +def _get_call_type_name(call_node: ast.Call) -> str | None: + """Extract the STIX type name from a Call node. + + Distinguishes two call patterns: + - ast.Attribute: stix2.Identity(...) → returns "Identity" (the attr) + - ast.Name: Identity(...) → returns "Identity" (the id) + Returns None for complex expressions (e.g. pycti.Identity.generate_id()). + """ + func = call_node.func + if isinstance(func, ast.Attribute): + # stix2.Identity(...) → attr = "Identity" + return func.attr + if isinstance(func, ast.Name): + # Identity(...) + return func.id + return None + + +def _references_self_ext_ref(node: ast.expr) -> bool: + """Check if an AST expression references self.external_reference(s). + + Detects the pattern where a connector stores its external reference as + an instance attribute and spreads it to all STIX objects. 
Handles: + - self.external_reference (singular) + - self.external_references (plural) + - [self.external_reference] (wrapped in a list literal) + - [self.external_references] + """ + if isinstance(node, ast.Attribute): + if ( + isinstance(node.value, ast.Name) + and node.value.id == "self" + and node.attr in ("external_reference", "external_references") + ): + return True + elif isinstance(node, ast.List): + return any(_references_self_ext_ref(elt) for elt in node.elts) + return False + + +def _check_custom_properties_dict(node: ast.expr) -> bool: + """Check if a custom_properties dict contains x_opencti_external_references + referencing self.external_reference(s). + + Detects the pattern where external references are smuggled through + custom_properties instead of the standard external_references kwarg: + custom_properties={"x_opencti_external_references": self.external_references} + """ + if not isinstance(node, ast.Dict): + return False + for key, value in zip(node.keys, node.values, strict=False): + if ( + key is not None + and isinstance(key, ast.Constant) + and key.value == "x_opencti_external_references" + and value is not None + and _references_self_ext_ref(value) + ): + return True + return False + + +def _is_identity_type(type_name: str) -> bool: + """Check if a STIX type name refers to an Identity object.""" + return type_name.lower() in _IDENTITY_NAMES + + +def _find_violations(tree: ast.Module, file_path: Path) -> list[tuple[Path, int, str]]: + """Find all calls where self.external_reference(s) is used on non-Identity objects. + + Detection flow: + 1. Walk all Call nodes in the AST + 2. Skip calls to Identity-type constructors (exempt) + 3. Check keyword arguments for: + a. external_references=self.external_reference(s) (direct kwarg) + b. custom_properties={...x_opencti_external_references: self.ext_ref...} + + Returns list of (file_path, line_no, description). 
@CheckRegistry.register(
    code="VC310",
    name="external-references-not-default",
    description=(
        "External references must not be added by default to all entities; "
        "only add on Identity (Organization)"
    ),
    severity=Severity.ERROR,
)
def check_external_references_not_default(ctx: ConnectorContext) -> list[CheckFinding]:
    """Report ``self.external_reference(s)`` spread onto non-Identity calls.

    Returns:
        * the shared "no Python sources" finding when ``ctx.python_sources``
          is empty;
        * one ERROR finding per violation reported by ``_find_violations``;
        * a single INFO finding when no violation exists.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    findings = [
        CheckFinding(
            message=description,
            severity=Severity.ERROR,
            file_path=hit_path,
            line=line_no,
            suggestion=(
                "Remove default external_references from non-Identity objects. "
                "Only add them on Identity (Organization). See issue #4210."
            ),
        )
        for file_path, tree in ctx.python_trees.items()
        for hit_path, line_no, description in _find_violations(tree, file_path)
    ]
    if findings:
        return findings

    return [
        CheckFinding(
            message="No default external references spread to non-Identity objects",
            severity=Severity.INFO,
        ),
    ]
+# --------------------------------------------------------------------------- +_TLP_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"\bTLP_WHITE\b"), + re.compile(r"\bTLP_GREEN\b"), + re.compile(r"\bTLP_AMBER\b"), + re.compile(r"\bTLP_RED\b"), + re.compile(r"\bTLP_CLEAR\b"), + re.compile(r"\bTLPMarking\b"), + re.compile(r"\bobject_marking_refs\b"), + re.compile(r"\bTLP_MARKING_DEFINITION_MAPPING\b"), + re.compile(r"marking-definition--"), + re.compile(r"TLP:(?:WHITE|CLEAR|GREEN|AMBER|RED|AMBER\+STRICT)", re.IGNORECASE), +] + +# --------------------------------------------------------------------------- +# Pattern list 2: Paid/commercial TLP levels (TLP:AMBER, TLP:RED). +# +# These are the restrictive TLP levels appropriate for commercial threat +# intelligence sources (data that shouldn't be freely shared). +# --------------------------------------------------------------------------- +_PAID_TLP_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"\bTLP_AMBER\b"), + re.compile(r"\bTLP_RED\b"), + re.compile(r"TLP:AMBER", re.IGNORECASE), + re.compile(r"TLP:RED", re.IGNORECASE), + re.compile(r"AMBER\+STRICT", re.IGNORECASE), + re.compile(r"""['"]amber['"]""", re.IGNORECASE), + re.compile(r"""['"]red['"]""", re.IGNORECASE), + re.compile(r"""['"]amber\+strict['"]""", re.IGNORECASE), +] + +# --------------------------------------------------------------------------- +# Pattern list 3: Free/open-source TLP levels (TLP:CLEAR, TLP:WHITE). +# +# These are the permissive TLP levels appropriate for open-source threat +# intelligence sources (freely shareable data). 
+# --------------------------------------------------------------------------- +_FREE_TLP_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"\bTLP_WHITE\b"), + re.compile(r"\bTLP_CLEAR\b"), + re.compile(r"TLP:WHITE", re.IGNORECASE), + re.compile(r"TLP:CLEAR", re.IGNORECASE), + re.compile(r"""['"]white['"]""", re.IGNORECASE), + re.compile(r"""['"]clear['"]""", re.IGNORECASE), +] + + +def _has_tlp_usage(sources: dict[Path, str]) -> bool: + """Check if any TLP-related pattern exists in the source files. + + Uses an any-match approach: returns True as soon as any pattern matches + in any source file. Doesn't need to find all matches. + """ + for content in sources.values(): + for pattern in _TLP_PATTERNS: + if pattern.search(content): + return True + return False + + +def _detect_tlp_levels(sources: dict[Path, str]) -> tuple[bool, bool]: + """Detect which TLP levels are used in source files. + + Distinguishes paid (AMBER/RED) from free (CLEAR/WHITE) TLP levels + to determine if the level matches the connector's source type. + Both can be True if the connector uses multiple TLP levels. + + Returns (has_paid_level, has_free_level). + """ + has_paid = False + has_free = False + for content in sources.values(): + for pattern in _PAID_TLP_PATTERNS: + if pattern.search(content): + has_paid = True + for pattern in _FREE_TLP_PATTERNS: + if pattern.search(content): + has_free = True + return has_paid, has_free + + +def _get_source_type(ctx: ConnectorContext) -> str | None: + """Infer source type from connector_manifest.json use_cases. + + Uses a manifest-based heuristic: scans the use_cases list for keywords + like "commercial", "paid", "open source", "free". This is not 100% + reliable — connectors may not declare use_cases, or the wording may + not match. Returns None when undetermined. + + Returns "commercial", "open_source", or None if undetermined. 
@CheckRegistry.register(
    code="VC311",
    name="tlp-markings-on-entities",
    description="Connector should use TLP markings on entities with appropriate level",
    severity=Severity.WARNING,
    applicable_types={
        ConnectorType.INTERNAL_ENRICHMENT,
        ConnectorType.EXTERNAL_IMPORT,
    },
)
def check_tlp_markings(ctx: ConnectorContext) -> list[CheckFinding]:
    """Verify TLP markings are present and consistent with the source type.

    The check has two stages:

    A. Any TLP-related pattern must appear in the sources; otherwise a single
       WARNING is returned and stage B is skipped.
    B. When ``_get_source_type`` can classify the connector, a WARNING is
       added if a commercial source shows only free levels, or an open-source
       one shows only paid levels. The warning points at the first matching
       line of the unexpected level.

    Returns:
        An ERROR finding when there are no Python sources; otherwise the
        stage A finding, followed by at most one stage B warning.
    """
    sources = ctx.python_sources
    if not sources:
        return [
            CheckFinding(
                message="No Python source files found in src/",
                severity=Severity.ERROR,
            ),
        ]

    # Stage A: is TLP used at all?
    if not _has_tlp_usage(sources):
        return [
            CheckFinding(
                message="No TLP marking usage detected in source code",
                severity=Severity.WARNING,
                suggestion=(
                    "Use TLP markings on STIX entities via object_marking_refs. "
                    "Paid sources should use TLP:AMBER or TLP:RED, "
                    "free sources should use TLP:CLEAR."
                ),
            ),
        ]

    results: list[CheckFinding] = [
        CheckFinding(
            message="TLP markings are used on entities",
            severity=Severity.INFO,
        ),
    ]

    # Stage B: does the level fit the declared source type?
    source_type = _get_source_type(ctx)
    if source_type is None:
        return results

    has_paid_level, has_free_level = _detect_tlp_levels(sources)

    if source_type == "commercial" and has_free_level and not has_paid_level:
        unexpected_patterns = _FREE_TLP_PATTERNS
        message = (
            "Commercial Threat Intel source uses TLP:CLEAR/WHITE "
            "— expected TLP:AMBER or TLP:RED"
        )
        suggestion = (
            "Paid/commercial sources should typically use TLP:AMBER "
            "or TLP:RED markings, not TLP:CLEAR/WHITE."
        )
    elif source_type == "open_source" and has_paid_level and not has_free_level:
        unexpected_patterns = _PAID_TLP_PATTERNS
        message = (
            "Open Source Threat Intel source uses TLP:AMBER/RED "
            "— expected TLP:CLEAR"
        )
        suggestion = (
            "Open source/free sources should typically use TLP:CLEAR "
            "(TLP:WHITE) markings, not TLP:AMBER/RED."
        )
    else:
        # Levels are mixed or already consistent: nothing more to report.
        return results

    locations = find_pattern_locations(sources, unexpected_patterns)
    if locations:
        hit_path, hit_line = locations[0][0], locations[0][1]
    else:
        hit_path, hit_line = None, None

    results.append(
        CheckFinding(
            message=message,
            severity=Severity.WARNING,
            file_path=hit_path,
            line=hit_line,
            suggestion=suggestion,
        ),
    )
    return results
+""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + + +def _find_send_bundle_calls( + tree: ast.Module, + file_path: Path, +) -> list[tuple[Path, int, bool]]: + """Find all send_stix2_bundle calls and whether cleanup_inconsistent_bundle=True. + + Matches two call patterns: + 1. self.helper.send_stix2_bundle(...) — Attribute node (method call) + 2. send_stix2_bundle(...) — Name node (bare function call) + + For each call, checks if cleanup_inconsistent_bundle=True is passed as + a keyword argument. Only checks for the literal True value (not variables + or expressions). + + Returns list of (file_path, line_no, has_cleanup_true). + """ + results: list[tuple[Path, int, bool]] = [] + + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + + # Match *.send_stix2_bundle(...) or send_stix2_bundle(...) + func_name = None + if ( + isinstance(node.func, ast.Attribute) + and node.func.attr == "send_stix2_bundle" + ) or (isinstance(node.func, ast.Name) and node.func.id == "send_stix2_bundle"): + func_name = "send_stix2_bundle" + + if func_name is None: + continue + + has_cleanup_true = False + for kw in node.keywords: + if kw.arg == "cleanup_inconsistent_bundle": + if isinstance(kw.value, ast.Constant) and kw.value.value is True: + has_cleanup_true = True + + results.append((file_path, node.lineno, has_cleanup_true)) + + return results + + +@CheckRegistry.register( + code="VC312", + name="cleanup-inconsistent-bundle", + description="send_stix2_bundle must use cleanup_inconsistent_bundle=True", + severity=Severity.ERROR, +) +def check_cleanup_inconsistent_bundle(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that all send_stix2_bundle calls pass cleanup_inconsistent_bundle=True.""" + sources = ctx.python_sources + + if not sources: + return [no_python_sources_finding()] + + 
trees = ctx.python_trees + + all_calls: list[tuple[Path, int, bool]] = [] + for file_path, tree in trees.items(): + all_calls.extend(_find_send_bundle_calls(tree, file_path)) + + # No send_stix2_bundle calls found → return empty (check doesn't apply). + # Returning [] instead of FAIL because not all connectors use send_stix2_bundle + # (e.g. stream connectors, or SDK-based connectors that handle bundles internally). + if not all_calls: + return [] + + results: list[CheckFinding] = [] + for file_path, line_no, has_cleanup_true in all_calls: + if has_cleanup_true: + results.append( + CheckFinding( + message="send_stix2_bundle uses cleanup_inconsistent_bundle=True", + severity=Severity.INFO, + file_path=file_path, + line=line_no, + ), + ) + else: + results.append( + CheckFinding( + message="send_stix2_bundle missing cleanup_inconsistent_bundle=True", + severity=Severity.ERROR, + file_path=file_path, + line=line_no, + suggestion=( + "Add cleanup_inconsistent_bundle=True to send_stix2_bundle() " + "to avoid MISSING_REFERENCE_ERROR. Also ensure author and " + "markings, and any related objects are included in the bundle." + ), + ), + ) + + return results diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py new file mode 100644 index 00000000000..5121171f4a5 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py @@ -0,0 +1,222 @@ +"""VC313 — STIX objects must use pycti.XXX.generate_id() for deterministic IDs. + +When creating stix2 SDO/SRO objects, the ``id`` parameter must be explicitly +set using ``pycti.XXX.generate_id(...)`` to ensure deterministic deduplication +in OpenCTI. If the connectors-sdk is used, IDs are handled automatically. 
+ +SCOs (observables like IPv4Address, DomainName) are exempt — stix2 generates +deterministic IDs for SCOs from their contributing properties (per STIX 2.1 +spec section 2.9), so no explicit id= is needed. +""" + +import ast +from pathlib import Path + +from connector_linter.checks.vc3xx_code._helpers import ( + find_imports, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# STIX Domain Objects (SDOs) — these represent higher-level threat intel +# concepts and REQUIRE explicit id= via pycti.XXX.generate_id() because +# stix2 would otherwise generate random UUIDs, breaking deduplication. +# --------------------------------------------------------------------------- +_STIX_SDO_TYPES = frozenset( + { + "AttackPattern", + "Campaign", + "CourseOfAction", + "Grouping", + "Identity", + "Indicator", + "Infrastructure", + "IntrusionSet", + "Location", + "Malware", + "MalwareAnalysis", + "Note", + "ObservedData", + "Opinion", + "Report", + "ThreatActor", + "Tool", + "Vulnerability", + }, +) + +# --------------------------------------------------------------------------- +# STIX Relationship Objects (SROs) — also require explicit id= for the +# same deduplication reason as SDOs. +# --------------------------------------------------------------------------- +_STIX_SRO_TYPES = frozenset({"Relationship", "Sighting"}) + +# Combined set for checking +_STIX_TYPES_NEEDING_ID = _STIX_SDO_TYPES | _STIX_SRO_TYPES + +# --------------------------------------------------------------------------- +# Custom OpenCTI types — these are OpenCTI-specific extensions that also +# need explicit id= for deterministic deduplication (not part of STIX 2.1 +# but follow the same pattern). 
+# --------------------------------------------------------------------------- +_CUSTOM_OCTI_TYPES = frozenset( + { + "CustomObjectCaseIncident", + "CustomObjectTask", + "CustomObjectChannel", + "CustomObservableCryptocurrencyWallet", + "CustomObservableHostname", + "CustomObservableMediaContent", + "CustomObservableUserAgent", + }, +) + +_ALL_TYPES_NEEDING_ID = _STIX_TYPES_NEEDING_ID | _CUSTOM_OCTI_TYPES + + +def _get_stix2_imported_names(trees: dict[Path, ast.Module]) -> dict[Path, set[str]]: + """Get a mapping of file_path → set of stix2 type names imported. + + Tracks per-file imports so we only flag bare Name calls (e.g. Identity(...)) + when that name was actually imported from stix2 in the same file. Handles + both `from stix2 import XXX` and `from stix2.v21 import XXX`. + + Alias handling: if `from stix2 import Identity as Id`, tracks "Id" + (the alias) as the name to match against Call nodes. + """ + result: dict[Path, set[str]] = {} + for file_path, tree in trees.items(): + names: set[str] = set() + for node in ast.walk(tree): + if ( + isinstance(node, ast.ImportFrom) + and node.module + and (node.module == "stix2" or node.module.startswith("stix2.")) + ): + for alias in node.names: + if alias.name in _ALL_TYPES_NEEDING_ID: + imported_name = alias.asname or alias.name + names.add(imported_name) + result[file_path] = names + return result + + +def _find_stix_calls_without_id( + tree: ast.Module, + file_path: Path, + imported_names: set[str], +) -> list[tuple[Path, int, str]]: + """Find stix2 constructor calls that lack an explicit id= parameter. + + Checks three call patterns: + 1. stix2.XXX(...) — qualified call (e.g. stix2.Identity(...)) + 2. stix2.v21.XXX(...) — fully-qualified call (e.g. stix2.v21.Identity(...)) + 3. XXX(...) — bare call (Name node), only matched if XXX was + imported from stix2 in this file (via imported_names) + + For each matching call, checks if id= is present as a keyword argument. 
@CheckRegistry.register(
    code="VC313",
    name="pycti-generate-id",
    description="STIX SDO/SRO objects must use pycti.XXX.generate_id() for deterministic IDs",
    severity=Severity.ERROR,
)
def check_pycti_generate_id(ctx: ConnectorContext) -> list[CheckFinding]:
    """Require an explicit ``id=`` on every detected STIX SDO/SRO constructor.

    Returns:
        * the shared "no Python sources" finding when ``ctx.python_sources``
          is empty;
        * one ERROR finding per constructor call reported by
          ``_find_stix_calls_without_id``;
        * a single INFO finding when no such call exists; its message gains a
          note when the connector imports ``connectors_sdk``.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    trees = ctx.python_trees
    # Bare calls such as Identity(...) are matched only against names that
    # the same file imported from stix2.
    names_by_file = _get_stix2_imported_names(trees)

    findings: list[CheckFinding] = []
    for file_path, tree in trees.items():
        local_names = names_by_file.get(file_path, set())
        for hit_path, line_no, type_name in _find_stix_calls_without_id(
            tree,
            file_path,
            local_names,
        ):
            findings.append(
                CheckFinding(
                    message=f"stix2.{type_name}() missing explicit id= parameter",
                    severity=Severity.ERROR,
                    file_path=hit_path,
                    line=line_no,
                    suggestion=(
                        f"Use id=pycti.{type_name}.generate_id(...) for deterministic "
                        f"deduplication. See https://docs.opencti.io/latest/usage/deduplication"
                    ),
                ),
            )

    if findings:
        return findings

    # Clean result: mention the SDK when present, since raw stix2 calls were
    # still scanned above but SDK models are not.
    uses_sdk = bool(find_imports(trees, module_pattern=r"connectors_sdk"))
    detail = " (connectors-sdk handles IDs for SDK models)" if uses_sdk else ""
    return [
        CheckFinding(
            message=f"All STIX SDO/SRO objects use explicit id= parameter{detail}",
            severity=Severity.INFO,
        ),
    ]
def _find_schedule_calls(
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int, str]]:
    """Locate ``*.schedule_process(...)`` and ``*.schedule_iso(...)`` calls.

    Only the method name is matched; the receiver expression is not checked.

    Args:
        trees: Parsed modules keyed by file path.

    Returns:
        ``(file_path, line_number, method_name)`` for every matching call.
    """
    scheduler_methods = ("schedule_process", "schedule_iso")
    return [
        (path, call.lineno, call.func.attr)
        for path, module in trees.items()
        for call in ast.walk(module)
        if isinstance(call, ast.Call)
        and isinstance(call.func, ast.Attribute)
        and call.func.attr in scheduler_methods
    ]


def _find_while_true_loops(
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int]]:
    """Locate ``while True:`` loops.

    Only a loop whose test is the literal constant ``True`` is reported;
    spellings such as ``while 1:`` are not matched.

    Args:
        trees: Parsed modules keyed by file path.

    Returns:
        ``(file_path, line_number)`` for every matching loop.
    """
    return [
        (path, loop.lineno)
        for path, module in trees.items()
        for loop in ast.walk(module)
        if isinstance(loop, ast.While)
        and isinstance(loop.test, ast.Constant)
        and loop.test.value is True
    ]


@CheckRegistry.register(
    code="VC314",
    name="auto-backpressure",
    description="External-import connectors must use schedule_process or schedule_iso",
    severity=Severity.ERROR,
    applicable_types={ConnectorType.EXTERNAL_IMPORT},
)
def check_auto_backpressure(ctx: ConnectorContext) -> list[CheckFinding]:
    """Verify that scheduling goes through schedule_process/schedule_iso.

    Outcomes, in priority order:
      1. A scheduler call exists: one INFO finding at the first call.
      2. Otherwise, one WARNING finding per ``while True`` loop.
      3. Otherwise, a single generic ERROR finding.

    Args:
        ctx: Connector under analysis.

    Returns:
        The findings described above, or the generic "no python sources"
        finding when the connector has no Python sources.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    trees = ctx.python_trees

    scheduled = _find_schedule_calls(trees)
    if scheduled:
        path, lineno, method = scheduled[0]
        return [
            CheckFinding(
                message=f"Connector uses {method} for scheduling",
                severity=Severity.INFO,
                file_path=path,
                line=lineno,
            ),
        ]

    # No scheduler anywhere: point at each manual loop, if any.
    # NOTE(review): these are WARNING while the check is registered as ERROR,
    # so a connector with a manual loop and no scheduler gets no ERROR
    # finding. Confirm this leniency is intended.
    manual_loops = [
        CheckFinding(
            message="Uses while True loop instead of scheduler",
            severity=Severity.WARNING,
            file_path=path,
            line=lineno,
            suggestion=(
                "Replace manual while True / time.sleep loop with "
                "self.helper.schedule_iso(message_callback=..., "
                "duration_period=...) or schedule_process(). See PR #4227."
            ),
        )
        for path, lineno in _find_while_true_loops(trees)
    ]
    if manual_loops:
        return manual_loops

    # Neither a scheduler nor a recognisable manual loop.
    return [
        CheckFinding(
            message="No schedule_process or schedule_iso call found",
            severity=Severity.ERROR,
            suggestion=(
                "Use self.helper.schedule_iso(message_callback=self.process_message, "
                "duration_period=self.config.connector.duration_period) for scheduling."
            ),
        ),
    ]
# Registered for EXTERNAL_IMPORT connectors only.
@CheckRegistry.register(
    code="VC315",
    name="work-initiated",
    description="Connector must call initiate_work before processing",
    severity=Severity.ERROR,
    applicable_types={ConnectorType.EXTERNAL_IMPORT},
)
def check_work_initiated(ctx: ConnectorContext) -> list[CheckFinding]:
    """Verify that ``.initiate_work(`` appears somewhere in the sources.

    This is a text-level presence check (regex over source lines), not an
    AST analysis.

    Args:
        ctx: Connector under analysis.

    Returns:
        The generic "no python sources" finding, an INFO finding located at
        the first match, or an ERROR finding when nothing matches.
    """
    sources = ctx.python_sources
    if not sources:
        return [no_python_sources_finding()]

    matches = find_pattern_locations(sources, [_INITIATE_WORK])
    if not matches:
        return [
            CheckFinding(
                message="No initiate_work call found",
                severity=Severity.ERROR,
                suggestion=(
                    "Add self.helper.api.work.initiate_work("
                    "self.helper.connect_id, friendly_name) "
                    "before processing to track work lifecycle."
                ),
            ),
        ]

    # Report the first hit only; one location is enough to show presence.
    path, lineno, _matched_line = matches[0]
    return [
        CheckFinding(
            message="Connector calls initiate_work to track work lifecycle",
            severity=Severity.INFO,
            file_path=path,
            line=lineno,
        ),
    ]
def _find_to_processed_calls(
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int, bool]]:
    """Locate ``*.to_processed(...)`` method calls.

    Only attribute-style calls are matched (e.g.
    ``self.helper.api.work.to_processed(...)``); a bare ``to_processed(...)``
    is ignored.

    Args:
        trees: Parsed modules keyed by file path.

    Returns:
        ``(file_path, line_number, has_in_error)`` per call, where
        ``has_in_error`` tells whether an ``in_error=`` keyword is passed
        (whatever its value).
    """
    return [
        (
            path,
            call.lineno,
            any(kw.arg == "in_error" for kw in call.keywords),
        )
        for path, module in trees.items()
        for call in ast.walk(module)
        if isinstance(call, ast.Call)
        and isinstance(call.func, ast.Attribute)
        and call.func.attr == "to_processed"
    ]


@CheckRegistry.register(
    code="VC316",
    name="work-closed",
    description="Connector must close work with to_processed after processing",
    severity=Severity.ERROR,
    applicable_types={ConnectorType.EXTERNAL_IMPORT},
)
def check_work_closed(ctx: ConnectorContext) -> list[CheckFinding]:
    """Verify that work is closed via ``to_processed``.

    Two sub-checks:
      1. At least one ``to_processed`` call exists (ERROR when missing).
      2. At least one of those calls passes ``in_error=`` (WARNING when
         none does; not blocking).

    Args:
        ctx: Connector under analysis.

    Returns:
        The generic "no python sources" finding, a single ERROR finding when
        no call exists, or an INFO finding optionally followed by a WARNING.
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    calls = _find_to_processed_calls(ctx.python_trees)
    if not calls:
        return [
            CheckFinding(
                message="No to_processed call found — work is never closed",
                severity=Severity.ERROR,
                suggestion=(
                    "Add self.helper.api.work.to_processed(work_id, message) "
                    "to close work after processing. Use in_error=True on "
                    "exception or interruption."
                ),
            ),
        ]

    # Both findings are anchored on the first call site.
    first_path, first_line, _ = calls[0]

    # Sub-check 1: the call exists.
    results: list[CheckFinding] = [
        CheckFinding(
            message="Connector closes work with to_processed",
            severity=Severity.INFO,
            file_path=first_path,
            line=first_line,
        ),
    ]

    # Sub-check 2: at least one call signals errors through in_error=.
    if not any(has_in_error for _, _, has_in_error in calls):
        results.append(
            CheckFinding(
                message="to_processed never uses in_error=True for error handling",
                severity=Severity.WARNING,
                # Use the tree key as-is, like the INFO finding above, so both
                # findings for the same call site report the same path (the
                # previous code prefixed this one with ctx.path).
                file_path=first_path,
                line=first_line,
                suggestion=(
                    "On exception or CTRL+C, close work with "
                    "to_processed(work_id, message, in_error=True) "
                    "to properly signal errors to the platform."
                ),
            ),
        )

    return results
class _InitiateWorkVisitor(ast.NodeVisitor):
    """Record every ``*.initiate_work(...)`` call and whether an ``if`` encloses it.

    "Enclosed" means the call is found anywhere beneath an ``ast.If`` node
    (test expression, body or ``else`` branch). This is a heuristic: the
    condition itself is never analysed.
    """

    def __init__(self) -> None:
        # One (line_number, is_conditional) entry per call, in visit order.
        self.calls: list[tuple[int, bool]] = []
        # Number of ``if`` statements currently enclosing the visited node.
        self._if_depth = 0

    def visit_If(self, node: ast.If) -> None:
        """Visit the children of *node* with the ``if`` nesting depth raised by one."""
        self._if_depth += 1
        self.generic_visit(node)
        self._if_depth -= 1

    def visit_Call(self, node: ast.Call) -> None:
        """Record *node* when it is an ``initiate_work`` call, then keep descending."""
        if self._is_initiate_work(node):
            inside_if = self._if_depth > 0
            self.calls.append((node.lineno, inside_if))
        # Nested calls (arguments, chained receivers) still need visiting.
        self.generic_visit(node)

    @staticmethod
    def _is_initiate_work(node: ast.Call) -> bool:
        """Return True for ``<anything>.initiate_work(...)``.

        Only the attribute name is compared; the receiver is not inspected.
        """
        callee = node.func
        if not isinstance(callee, ast.Attribute):
            return False
        return callee.attr == "initiate_work"


@CheckRegistry.register(
    code="VC317",
    name="initiate-work-conditional",
    description=(
        "initiate_work should only be called when data is available "
        "(never create empty jobs)"
    ),
    severity=Severity.WARNING,
    applicable_types={ConnectorType.EXTERNAL_IMPORT, ConnectorType.INTERNAL_ENRICHMENT},
)
def check_initiate_work_conditional(ctx: ConnectorContext) -> list[CheckFinding]:
    """Warn when ``initiate_work`` is called outside of any ``if`` block.

    Args:
        ctx: Connector under analysis.

    Returns:
        An empty list when there are no sources, no parsed trees, or no
        ``initiate_work`` call at all; a WARNING located at the first
        unguarded call when one exists; otherwise an INFO located at the
        first guarded call.
    """
    if not ctx.python_sources:
        return []

    trees = ctx.python_trees
    if not trees:
        return []

    # Flatten every call site into (file, line, guarded), preserving the
    # file iteration order and, within a file, the visit order.
    call_sites: list[tuple[Path, int, bool]] = []
    for path, module in trees.items():
        collector = _InitiateWorkVisitor()
        collector.visit(module)
        call_sites.extend((path, lineno, guarded) for lineno, guarded in collector.calls)

    if not call_sites:
        # No call anywhere: stay silent so a missing call is not reported twice
        # (the original comment attributes that report to VC315).
        return []

    first_unguarded = next(
        ((path, lineno) for path, lineno, guarded in call_sites if not guarded),
        None,
    )

    if first_unguarded is not None:
        # A single unguarded call is enough to warn, even if others are guarded.
        path, lineno = first_unguarded
        return [
            CheckFinding(
                message=(
                    "initiate_work is called unconditionally — "
                    "may create empty jobs when no data is available"
                ),
                severity=Severity.WARNING,
                file_path=path,
                line=lineno,
                suggestion=(
                    "Guard initiate_work with a condition that checks data "
                    "availability first. Only create a work when the bundle "
                    "will contain objects. Track "
                    "'last_run_end_datetime_with_ingested_data' in state."
                ),
            ),
        ]

    # Every recorded call is guarded; report the first one.
    path, lineno, _ = call_sites[0]
    return [
        CheckFinding(
            message="initiate_work is guarded by a condition",
            severity=Severity.INFO,
            file_path=path,
            line=lineno,
        ),
    ]
+ +Uses AST to verify the call is ``self.helper.listen()`` specifically, +avoiding false positives from other ``.listen()`` calls (e.g. sockets). + +Scope: INTERNAL_ENRICHMENT only. +""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Scope: INTERNAL_ENRICHMENT only. +# +# Enrichment connectors are event-driven — they react to platform events +# (e.g. "enrich this indicator") by registering a callback via +# self.helper.listen(message_callback=self.process_message). +# +# This is distinct from VC323 which checks helper.listen_stream() for +# STREAM connectors. listen() and listen_stream() are different methods: +# - listen() → enrichment, receives entity-level events +# - listen_stream() → stream, receives the full event stream +# --------------------------------------------------------------------------- + + +def _find_helper_listen_calls( + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int]]: + """Find ``*.helper.listen(...)`` calls using AST.""" + hits: list[tuple[Path, int]] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + func = node.func + # First level: the called method must be .listen() + if not isinstance(func, ast.Attribute) or func.attr != "listen": + continue + # --------------------------------------------------------------------------- + # Second level: verify the receiver is "helper" to avoid false positives + # + # Two patterns are accepted: + # 1. self.helper.listen(...) — Attribute chain: receiver is + # an ast.Attribute with attr == "helper" + # 2. helper.listen(...) — bare name: receiver is + # an ast.Name with id == "helper" + # + # This prevents matching unrelated .listen() calls (e.g. sockets). 
+ # --------------------------------------------------------------------------- + receiver = func.value + if (isinstance(receiver, ast.Attribute) and receiver.attr == "helper") or ( + isinstance(receiver, ast.Name) and receiver.id == "helper" + ): + hits.append((file_path, node.lineno)) + return hits + + +@CheckRegistry.register( + code="VC318", + name="helper-listen", + description="Internal-enrichment connectors must use helper.listen()", + severity=Severity.ERROR, + applicable_types={ConnectorType.INTERNAL_ENRICHMENT}, +) +def check_helper_listen(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that the connector uses helper.listen().""" + sources = ctx.python_sources + if not sources: + return [no_python_sources_finding()] + + trees = ctx.python_trees + locations = _find_helper_listen_calls(trees) + + if not locations: + return [ + CheckFinding( + message="helper.listen() not found — enrichment connector is not event-driven", + severity=Severity.ERROR, + suggestion=( + "Add self.helper.listen(message_callback=self.process_message) " + "in the run method to listen for platform events." + ), + ), + ] + + first = locations[0] + return [ + CheckFinding( + message="Connector uses helper.listen() for event-driven processing", + severity=Severity.INFO, + file_path=first[0], + line=first[1], + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc319_scope_fallback.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc319_scope_fallback.py new file mode 100644 index 00000000000..c51936d4eb6 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc319_scope_fallback.py @@ -0,0 +1,126 @@ +"""VC319 — Enrichment connector must return original bundle when not in scope. + +When an enrichment connector is triggered by a playbook for an entity type +outside its scope, it **must** send the original bundle back unchanged so the +playbook can continue. 
Detection relies on the ``event_type`` field in data: +if absent, the trigger came from a playbook. + +Scope: INTERNAL_ENRICHMENT only. +Severity: WARNING (best practice, not yet universally adopted). +""" + +import re + +from connector_linter.checks.vc3xx_code._helpers import ( + find_pattern_locations, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Regex: detect access to the "event_type" key in source code. +# +# Matches string literals 'event_type' or "event_type". When an enrichment +# connector is triggered by a playbook, the data dict does NOT contain an +# event_type field. Checking for its presence lets the connector +# distinguish between "manual enrichment" (has event_type) and "playbook +# trigger" (no event_type). +# --------------------------------------------------------------------------- +_EVENT_TYPE_PATTERN = re.compile(r"""['"](event_type)['"]""") + +# --------------------------------------------------------------------------- +# Regex: detect access to data['stix_objects'] or data["stix_objects"]. +# +# The stix_objects field contains the original STIX bundle that was passed +# to the enrichment connector. Reading it is a prerequisite for returning +# the bundle unchanged when the entity is out of scope. 
@CheckRegistry.register(
    code="VC319",
    name="scope-fallback-bundle",
    description=(
        "Enrichment connector must return original bundle when entity is "
        "not in scope (playbook compatibility)"
    ),
    severity=Severity.WARNING,
    applicable_types={ConnectorType.INTERNAL_ENRICHMENT},
)
def check_scope_fallback(ctx: ConnectorContext) -> list[CheckFinding]:
    """Warn when out-of-scope playbook triggers do not appear to be handled.

    Two textual signals are looked for in the sources:
      * an access to ``data['stix_objects']`` (needed to hand the original
        bundle back), and
      * a quoted ``event_type`` key (used to tell playbook triggers apart).

    Args:
        ctx: Connector under analysis.

    Returns:
        An empty list when there are no sources; a single WARNING when
        ``data['stix_objects']`` is never read (the ``event_type`` signal is
        then not reported); otherwise a WARNING when ``event_type`` never
        appears, or an INFO located at its first occurrence.
    """
    sources = ctx.python_sources
    if not sources:
        return []

    event_type_hits = find_pattern_locations(sources, [_EVENT_TYPE_PATTERN])
    bundle_hits = find_pattern_locations(sources, [_STIX_OBJECTS_DATA])

    # Without reading stix_objects the original bundle cannot be returned at
    # all, so this takes precedence over the event_type signal.
    if not bundle_hits:
        return [
            CheckFinding(
                message=(
                    "data['stix_objects'] is never read — original bundle "
                    "cannot be returned for out-of-scope playbook triggers"
                ),
                severity=Severity.WARNING,
                suggestion=(
                    "Read data['stix_objects'] and send the original bundle "
                    "back unchanged when the entity is not in scope. "
                    "Check data.get('event_type') to detect playbook triggers."
                ),
            ),
        ]

    if event_type_hits:
        path, lineno, _matched_line = event_type_hits[0]
        outcome = CheckFinding(
            message="Connector handles event_type for playbook compatibility",
            severity=Severity.INFO,
            file_path=path,
            line=lineno,
        )
    else:
        outcome = CheckFinding(
            message=(
                "No event_type check found — out-of-scope playbook "
                "triggers may not return the original bundle"
            ),
            severity=Severity.WARNING,
            suggestion=(
                "Use data.get('event_type') to detect playbook triggers. "
                "When not in scope and no event_type, send back the "
                "original stix_objects bundle unchanged."
            ),
        )

    return [outcome]
Reject (raise) when invalid + +Scope: INTERNAL_ENRICHMENT only. +Severity: ERROR. +""" + +import ast +from typing import TYPE_CHECKING, cast + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +if TYPE_CHECKING: + from pathlib import Path + +# Only enrichment connectors need TLP access control — they receive +# individual entities from the platform and must not leak sensitive data. + + +# --------------------------------------------------------------------------- +# Step 1 helper: detect objectMarking access +# +# The TLP marking is stored in the entity's "objectMarking" field. +# Two AST patterns are matched: +# - ast.Constant with value "objectMarking" → dict subscript access +# e.g. entity["objectMarking"] +# - ast.Attribute with attr "objectMarking" → attribute access +# e.g. entity.objectMarking +# --------------------------------------------------------------------------- +def _has_object_marking_access(tree: ast.Module) -> int | None: + """Check if objectMarking is accessed (subscript or attribute).""" + for node in ast.walk(tree): + if isinstance(node, ast.Constant) and node.value == "objectMarking": + return node.lineno + if isinstance(node, ast.Attribute) and node.attr == "objectMarking": + return node.lineno + return None + + +# --------------------------------------------------------------------------- +# Step 2 helper: detect check_max_tlp() call +# +# Two call patterns are matched: +# - self.helper.check_max_tlp(...) → Attribute node with attr "check_max_tlp" +# - check_max_tlp(...) 
def _is_check_max_tlp_call(node: ast.AST) -> bool:
    """Tell whether *node* is a ``check_max_tlp(...)`` or ``*.check_max_tlp(...)`` call."""
    if not isinstance(node, ast.Call):
        return False
    callee = node.func
    if isinstance(callee, ast.Attribute):
        return callee.attr == "check_max_tlp"
    return isinstance(callee, ast.Name) and callee.id == "check_max_tlp"


def _has_check_max_tlp(tree: ast.Module) -> int | None:
    """Return the line of the first ``check_max_tlp`` call, if any.

    Both the method form (``self.helper.check_max_tlp(...)``) and the bare
    function form (``check_max_tlp(...)``) are recognised.

    Args:
        tree: Parsed module to inspect.

    Returns:
        Line number of the first matching call in ``ast.walk`` order, or
        ``None`` when the module contains none.
    """
    return next(
        (node.lineno for node in ast.walk(tree) if _is_check_max_tlp_call(node)),
        None,
    )


def _has_raise_after_tlp_check(tree: ast.Module) -> int | None:
    """Find a function that both calls ``check_max_tlp`` and raises.

    The two only need to live in the same (sync or async) function,
    nested definitions included. Their relative order and whether the
    ``raise`` depends on the check's result are not verified.

    Args:
        tree: Parsed module to inspect.

    Returns:
        For the first qualifying function in ``ast.walk`` order, the line of
        the last ``raise`` met while walking that function; ``None`` when no
        function qualifies.
    """
    for func_def in ast.walk(tree):
        if not isinstance(func_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        inner_nodes = list(ast.walk(func_def))
        raise_lines = [n.lineno for n in inner_nodes if isinstance(n, ast.Raise)]
        if raise_lines and any(_is_check_max_tlp_call(n) for n in inner_nodes):
            return raise_lines[-1]
    return None


@CheckRegistry.register(
    code="VC320",
    name="tlp-access-control",
    description=(
        "Enrichment connector must enforce TLP access control: "
        "extract objectMarking, check_max_tlp, reject if invalid"
    ),
    severity=Severity.ERROR,
    applicable_types={ConnectorType.INTERNAL_ENRICHMENT},
)
def check_tlp_access_control(ctx: ConnectorContext) -> list[CheckFinding]:
    """Verify the three-step TLP access-control flow.

    Steps, each searched independently across all files:
      1. ``objectMarking`` is accessed (ERROR when missing).
      2. ``check_max_tlp`` is called (ERROR when missing).
      3. A function containing ``check_max_tlp`` also raises (WARNING when
         missing, INFO when the whole flow is present).

    Args:
        ctx: Connector under analysis.

    Returns:
        The generic "no python sources" finding, or exactly one finding
        describing the first step that fails (or overall success).
    """
    if not ctx.python_sources:
        return [no_python_sources_finding()]

    trees = ctx.python_trees

    def first_hit(finder) -> "tuple[Path | None, int | None]":
        # First (file, line) for which *finder* reports a line, in file order.
        for path, module in trees.items():
            found_line = finder(module)
            if found_line is not None:
                return path, found_line
        return None, None

    # Step 1: objectMarking access
    marking_file, marking_line = first_hit(_has_object_marking_access)
    if marking_line is None:
        return [
            CheckFinding(
                message=(
                    "objectMarking is never accessed — TLP of incoming "
                    "entities is not extracted"
                ),
                severity=Severity.ERROR,
                suggestion=(
                    "Extract TLP from opencti_entity['objectMarking'], "
                    "call self.helper.check_max_tlp(tlp, max_tlp), "
                    "and raise if the TLP exceeds the configured maximum. "
                    "This prevents leaking paid TLP:RED data on a TLP:CLEAR platform."
                ),
            ),
        ]
    # A line was found, so the file is set too; the cast only informs mypy.
    marking_file = cast("Path", marking_file)

    # Step 2: check_max_tlp call
    check_file, check_line = first_hit(_has_check_max_tlp)
    if check_line is None:
        return [
            CheckFinding(
                message=(
                    "objectMarking is read but check_max_tlp is never "
                    "called — TLP is not enforced"
                ),
                severity=Severity.ERROR,
                file_path=marking_file,
                line=marking_line,
                suggestion=(
                    "After extracting TLP from objectMarking, call "
                    "self.helper.check_max_tlp(tlp, max_tlp) and raise "
                    "an error if the entity's TLP exceeds the maximum."
                ),
            ),
        ]
    check_file = cast("Path", check_file)  # same reasoning as above

    # Step 3: reject (raise) when invalid. Only presence matters; both
    # outcomes are reported at the check_max_tlp call site.
    _, reject_line = first_hit(_has_raise_after_tlp_check)
    if reject_line is not None:
        outcome = CheckFinding(
            message="TLP access control is enforced (objectMarking + check_max_tlp + reject)",
            severity=Severity.INFO,
            file_path=check_file,
            line=check_line,
        )
    else:
        outcome = CheckFinding(
            message=(
                "check_max_tlp is called but no raise found — "
                "invalid TLP may not be rejected"
            ),
            severity=Severity.WARNING,
            file_path=check_file,
            line=check_line,
            suggestion=(
                "After check_max_tlp returns False, raise an error "
                "to prevent processing entities with TLP exceeding max."
            ),
        )

    return [outcome]
+""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# Only enrichment connectors participate in playbook automation. +# External-import connectors are scheduled, not triggered by playbooks. + + +# --------------------------------------------------------------------------- +# AST helper: find playbook_compatible= keyword argument +# +# Walks every Call node and inspects its keyword arguments. When a keyword +# named "playbook_compatible" is found, we check whether its value is the +# literal True (ast.Constant with value True). +# +# Returns tuples of (file, line, is_true) — is_true distinguishes between +# playbook_compatible=True (correct) and playbook_compatible=False or +# playbook_compatible=some_var (incorrect / ambiguous). +# --------------------------------------------------------------------------- +def _find_playbook_compatible_kwarg( + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int, bool]]: + """Find playbook_compatible= keyword args. Returns (file, line, is_true).""" + hits: list[tuple[Path, int, bool]] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + for kw in node.keywords: + if kw.arg == "playbook_compatible": + # Check if the value is literally True (ast.Constant) + is_true = ( + isinstance(kw.value, ast.Constant) and kw.value.value is True + ) + hits.append((file_path, node.lineno, is_true)) + return hits + + +# --------------------------------------------------------------------------- +# AST helper: find send_stix2_bundle() calls +# +# Playbook-compatible connectors must send bundles so the playbook pipeline +# can forward enriched data to the next step. This detects *.send_stix2_bundle() +# method calls (Attribute node with attr "send_stix2_bundle"). 
+# --------------------------------------------------------------------------- +def _find_send_bundle_calls( + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int]]: + """Find send_stix2_bundle() calls via AST.""" + hits: list[tuple[Path, int]] = [] + for file_path, tree in trees.items(): + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + func = node.func + if isinstance(func, ast.Attribute) and func.attr == "send_stix2_bundle": + hits.append((file_path, node.lineno)) + return hits + + +@CheckRegistry.register( + code="VC321", + name="playbook-compatible", + description="Enrichment connector must be playbook-compatible", + severity=Severity.ERROR, + applicable_types={ConnectorType.INTERNAL_ENRICHMENT}, +) +def check_playbook_compatible(ctx: ConnectorContext) -> list[CheckFinding]: + """Verify playbook compatibility.""" + sources = ctx.python_sources + if not sources: + return [no_python_sources_finding()] + + trees = ctx.python_trees + results: list[CheckFinding] = [] + + # --------------------------------------------------------------------------- + # Sub-check A: playbook_compatible=True keyword argument + # + # Three outcomes: + # 1. Found with value True → PASS + # 2. Found with value != True (False, variable, etc.) → FAIL + # 3. 
Not found at all → FAIL (missing from OpenCTIConnectorHelper call) + # --------------------------------------------------------------------------- + pb_hits = _find_playbook_compatible_kwarg(trees) + + if pb_hits: + first = pb_hits[0] + if first[2]: # is_true + results.append( + CheckFinding( + message="playbook_compatible=True is set", + severity=Severity.INFO, + file_path=first[0], + line=first[1], + ), + ) + else: + results.append( + CheckFinding( + message=( + "playbook_compatible is set but not to True — " + "playbook automation will not work" + ), + severity=Severity.ERROR, + file_path=first[0], + line=first[1], + suggestion="Set playbook_compatible=True in OpenCTIConnectorHelper().", + ), + ) + else: + results.append( + CheckFinding( + message="playbook_compatible is not set in OpenCTIConnectorHelper()", + severity=Severity.ERROR, + suggestion=( + "Add playbook_compatible=True to your " + "OpenCTIConnectorHelper() call. Ensure the connector " + "sends a bundle back in all paths (success, not-in-scope, error)." + ), + ), + ) + + # --------------------------------------------------------------------------- + # Sub-check B: send_stix2_bundle() is called + # + # Severity is WARNING (not ERROR) because the bundle-based flow is the + # ideal pattern for playbook compatibility, but some connectors may use + # alternative methods. Missing send_stix2_bundle is a strong signal + # the connector won't forward enriched data through playbooks. + # --------------------------------------------------------------------------- + bundle_hits = _find_send_bundle_calls(trees) + if not bundle_hits: + results.append( + CheckFinding( + message=( + "send_stix2_bundle is never called — playbook cannot " + "forward the enriched bundle" + ), + severity=Severity.WARNING, + suggestion=( + "Use self.helper.send_stix2_bundle() to send enriched " + "data. The bundle must be sent in all paths: success, " + "not-in-scope (original bundle), and error (original bundle)." 
+ ), + ), + ) + + return results diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc322_former_bundle.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc322_former_bundle.py new file mode 100644 index 00000000000..c182c6cfb70 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc322_former_bundle.py @@ -0,0 +1,90 @@ +"""VC322 — Enrichment connector must read the former bundle from data. + +For playbook compatibility the connector must read ``data["stix_objects"]`` +(the original bundle) early in ``process_message`` so it can be returned +in **every** code path: success, not-in-scope, and error. + +Scope: INTERNAL_ENRICHMENT only. +Severity: ERROR. +""" + +import re + +from connector_linter.checks.vc3xx_code._helpers import ( + find_pattern_locations, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Regex: detect access to data['stix_objects'] or data["stix_objects"] +# +# Matches patterns like: +# data['stix_objects'] +# data["stix_objects"] +# data [ 'stix_objects' ] (with optional whitespace) +# +# The stix_objects field in the incoming data dict contains the original +# STIX bundle that the platform sent to the enrichment connector. +# Reading it early in process_message is essential so it can be returned +# unchanged in all 3 code paths: +# 1. Success — enriched objects appended to original bundle +# 2. Not-in-scope — original bundle returned as-is for playbook continuity +# 3. Error — original bundle returned so the playbook is not broken +# --------------------------------------------------------------------------- +_STIX_OBJECTS_DATA = re.compile(r"""data\s*\[\s*['"]stix_objects['"]\s*\]""") + +# Only enrichment connectors receive data['stix_objects'] from the platform. 
+# External-import connectors fetch data themselves and don't receive bundles. + + +@CheckRegistry.register( + code="VC322", + name="former-bundle-read", + description=( + "Enrichment connector must read data['stix_objects'] " + "(former bundle) for playbook compatibility" + ), + severity=Severity.ERROR, + applicable_types={ConnectorType.INTERNAL_ENRICHMENT}, +) +def check_former_bundle(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that data['stix_objects'] is read.""" + sources = ctx.python_sources + if not sources: + return [no_python_sources_finding()] + + locations = find_pattern_locations(sources, [_STIX_OBJECTS_DATA]) + + if not locations: + return [ + CheckFinding( + message=( + "data['stix_objects'] is never read — the original " + "bundle cannot be returned on error or not-in-scope" + ), + severity=Severity.ERROR, + suggestion=( + "Read the former bundle with " + "stix_objects = data['stix_objects'] early in " + "process_message. Send it back unchanged on error " + "or when entity is not in scope (playbook compatibility)." + ), + ), + ] + + first = locations[0] + return [ + CheckFinding( + message="Former bundle is read from data['stix_objects']", + severity=Severity.INFO, + file_path=first[0], + line=first[1], + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc323_helper_listen_stream.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc323_helper_listen_stream.py new file mode 100644 index 00000000000..d0d09cb0916 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc323_helper_listen_stream.py @@ -0,0 +1,78 @@ +"""VC323 — Stream connectors must use helper.listen_stream(). + +Stream connectors receive live events from the platform via +``self.helper.listen_stream(message_callback=self.process_message)``. + +Scope: STREAM only. 
+""" + +import re + +from connector_linter.checks.vc3xx_code._helpers import ( + find_pattern_locations, +) +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + ConnectorType, + Severity, + no_python_sources_finding, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Regex: detect .listen_stream( calls in source code +# +# Matches the method call pattern: +# .listen_stream( — with optional whitespace before the paren +# +# This is the stream-specific counterpart to VC318's helper.listen(): +# - VC318 checks helper.listen() → enrichment connectors (event-driven) +# - VC323 checks helper.listen_stream() → stream connectors (live event stream) +# +# Stream connectors receive the full event stream from the OpenCTI platform +# (create, update, delete events) via listen_stream, whereas enrichment +# connectors receive individual entity enrichment requests via listen. +# --------------------------------------------------------------------------- +_LISTEN_STREAM = re.compile(r"""\.listen_stream\s*\(""") + +# Only STREAM type connectors use listen_stream. +# Enrichment connectors use helper.listen() (checked by VC318). 
+ + +@CheckRegistry.register( + code="VC323", + name="helper-listen-stream", + description="Stream connectors must use helper.listen_stream()", + severity=Severity.ERROR, + applicable_types={ConnectorType.STREAM}, +) +def check_helper_listen_stream(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that the connector uses helper.listen_stream().""" + sources = ctx.python_sources + if not sources: + return [no_python_sources_finding()] + + locations = find_pattern_locations(sources, [_LISTEN_STREAM]) + + if not locations: + return [ + CheckFinding( + message="helper.listen_stream() not found — stream connector is not receiving events", + severity=Severity.ERROR, + suggestion=( + "Add self.helper.listen_stream(message_callback=self.process_message) " + "in the run method to receive live events from the platform." + ), + ), + ] + + first = locations[0] + return [ + CheckFinding( + message="Connector uses helper.listen_stream() for event processing", + severity=Severity.INFO, + file_path=first[0], + line=first[1], + ), + ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc324_relationship_start_stop.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc324_relationship_start_stop.py new file mode 100644 index 00000000000..a154f21d370 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc324_relationship_start_stop.py @@ -0,0 +1,203 @@ +"""VC324 — Relationship objects should not set both start_time and stop_time. + +When a STIX Relationship object includes both ``start_time`` and ``stop_time``, +OpenCTI's deduplication uses the time window as part of the relationship's +identity. This means that different time ranges between the same source and +target create **distinct** relationship objects rather than merging into one. 
+ +Connectors that set wide or varying time spans can therefore produce many +duplicate-looking relationships in the platform, inflating the knowledge graph +and causing confusion during analysis. + +**Recommendation:** + +- If temporal context matters, use only ``start_time`` (omit ``stop_time``). +- If the relationship is not time-bound, omit both properties entirely. +- If you absolutely need both, ensure the time window is narrow and intentional. + +This check detects direct ``Relationship()`` / ``stix2.Relationship()`` / +``stix2.v21.Relationship()`` constructor calls that pass both ``start_time=`` +and ``stop_time=`` as keyword arguments. + +**Limitations:** + +- Does not detect ``start_time`` / ``stop_time`` set inside a + ``custom_properties`` dict. +- Does not resolve aliases (e.g. ``Rel = stix2.Relationship``). + +Severity: WARNING (the pattern is valid STIX 2.1 but can cause unintended +relationship duplication). +Scope: Common (all connector types). + +Reference: + STIX 2.1 §5.1 — Relationship Object + https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html#_2i4bto1y4jwa +""" + +import ast +from pathlib import Path + +from connector_linter.models import ( + CheckFinding, + ConnectorContext, + Severity, +) +from connector_linter.registry import CheckRegistry + +# --------------------------------------------------------------------------- +# Names that identify a Relationship constructor call. +# +# We check for: +# - Relationship(...) → bare name (imported from stix2) +# - stix2.Relationship(...) → qualified name +# - stix2.v21.Relationship(...) → fully-qualified name (less common) +# --------------------------------------------------------------------------- +_RELATIONSHIP_NAMES = {"Relationship"} + + +def _is_relationship_call(node: ast.Call) -> bool: + """Return True if ``node`` is a call to a Relationship constructor. + + Matches three patterns: + + 1. ``Relationship(...)`` — bare imported name + 2. 
``stix2.Relationship(...)`` — module-qualified + 3. ``stix2.v21.Relationship(...)`` — fully-qualified (rare) + """ + func = node.func + + # Pattern 1: Relationship(...) + if isinstance(func, ast.Name) and func.id in _RELATIONSHIP_NAMES: + return True + + # Pattern 2: stix2.Relationship(...) + if ( + isinstance(func, ast.Attribute) + and func.attr in _RELATIONSHIP_NAMES + and isinstance(func.value, ast.Name) + and func.value.id == "stix2" + ): + return True + + # Pattern 3: stix2.v21.Relationship(...) + if ( + isinstance(func, ast.Attribute) + and func.attr in _RELATIONSHIP_NAMES + and isinstance(func.value, ast.Attribute) + and func.value.attr == "v21" + and isinstance(func.value.value, ast.Name) + and func.value.value.id == "stix2" + ): + return True + + return False + + +def _has_both_start_stop_kwargs(node: ast.Call) -> bool: + """Check if a Call node has both ``start_time=`` and ``stop_time=`` kwargs. + + This covers the most common pattern: + + Relationship( + ..., + start_time="2020-01-01T00:00:00Z", + stop_time="2024-01-01T00:00:00Z", + ) + """ + kwarg_names = {kw.arg for kw in node.keywords if kw.arg is not None} + return "start_time" in kwarg_names and "stop_time" in kwarg_names + + +def _find_relationship_with_start_stop( + trees: dict[Path, ast.Module], +) -> list[tuple[Path, int]]: + """Walk all ASTs to find Relationship() calls with both start_time and stop_time. + + Returns a list of (file_path, line_number) for each violation found. 
+ """ + hits: list[tuple[Path, int]] = [] + + for file_path, tree in trees.items(): + for node in ast.walk(tree): + # We only care about function/constructor calls + if not isinstance(node, ast.Call): + continue + + # Skip calls that are not Relationship constructors + if not _is_relationship_call(node): + continue + + # Check if both start_time= and stop_time= are present + if _has_both_start_stop_kwargs(node): + hits.append((file_path, node.lineno)) + + return hits + + +# --------------------------------------------------------------------------- +# Registry entry +# --------------------------------------------------------------------------- + + +@CheckRegistry.register( + code="VC324", + name="relationship-start-stop-time", + description=( + "Relationship should not set both start_time and stop_time " + "(can overload Redis with time-bucketed duplicates)" + ), + severity=Severity.WARNING, +) +def check_relationship_start_stop_time(ctx: ConnectorContext) -> list[CheckFinding]: + """Warn when Relationship objects use both start_time and stop_time. + + Setting both properties causes OpenCTI to create a separate relationship + for each time bucket in the [start_time, stop_time] range. For long spans, + this can generate thousands of Redis entries and degrade platform performance. 
+ """ + sources = ctx.python_sources + + if not sources: + return [ + CheckFinding( + message="No Python source files found in src/", + severity=Severity.ERROR, + ), + ] + + trees = ctx.python_trees + + # Find all Relationship() calls with both start_time and stop_time + hits = _find_relationship_with_start_stop(trees) + + if not hits: + return [ + CheckFinding( + message="No Relationship with both start_time and stop_time ✓", + severity=Severity.INFO, + ), + ] + + # Report each occurrence as a warning + results: list[CheckFinding] = [] + for file_path, line_no in hits: + results.append( + CheckFinding( + message=( + f"{file_path}:{line_no}: Relationship() sets both " + f"start_time and stop_time" + ), + severity=Severity.WARNING, # WARNING-level: advisory, not a blocker + file_path=file_path, + line=line_no, + suggestion=( + "Setting both start_time and stop_time on a Relationship " + "causes OpenCTI to create one object per time bucket in " + "that range — potentially thousands of Redis entries. " + "Use only start_time (omit stop_time) or remove both " + "if the relationship is not time-bound." + ), + ), + ) + + return results diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc325_minimal_settings_tests.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc325_minimal_settings_tests.py new file mode 100644 index 00000000000..7076ec16f41 --- /dev/null +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc325_minimal_settings_tests.py @@ -0,0 +1,247 @@ +"""VC325 — Connector must have minimal settings tests. + +Verified connectors must include at minimum a settings test that: + +1. Exercises ``ConnectorSettings`` with **valid** input (all required fields + present) and asserts the object is created successfully. +2. Exercises ``ConnectorSettings`` with **invalid** input (missing a required + field) and asserts ``ConfigValidationError`` is raised. 
+ +This baseline matches the pattern established in the connector templates +(``templates//tests/tests_connector/test_settings.py``). + +Detection is AST-based: +- A settings test file is one whose name contains "settings" or that + contains an ``ImportFrom`` of ``ConnectorSettings``. +- Valid-input coverage is detected by finding an ``Assign`` or ``AnnAssign`` + whose right-hand side is a ``Call`` to a class whose name ends in + ``"Settings"`` — i.e., ``settings = FakeConnectorSettings()``. +- Error-input coverage is detected by finding a ``pytest.raises(...)`` call. + +Missing a settings test file is a WARNING (tests exist, just not for +settings). Missing valid-input or error-input coverage inside an existing +settings test is an ERROR. + +Scope: Common (all connector types). +""" + +import ast +from pathlib import Path + +from connector_linter.models import CheckFinding, ConnectorContext, Severity +from connector_linter.registry import CheckRegistry + + +def _read_test_files(ctx: ConnectorContext) -> dict[Path, str]: + """Read all ``test_*.py`` files from the connector's ``tests/`` directory.""" + sources: dict[Path, str] = {} + tests_dir = ctx.path / "tests" + if not tests_dir.exists(): + return sources + for py_file in tests_dir.rglob("test_*.py"): + rel_path = py_file.relative_to(ctx.path) + try: + sources[rel_path] = py_file.read_text(encoding="utf-8", errors="replace") + except OSError: + continue + return sources + + +def _parse_test_files(sources: dict[Path, str]) -> dict[Path, ast.Module]: + """Parse test source files into AST modules, skipping files with syntax errors.""" + trees: dict[Path, ast.Module] = {} + for file_path, content in sources.items(): + try: + trees[file_path] = ast.parse(content, filename=str(file_path)) + except SyntaxError: + continue + return trees + + +def _imports_connector_settings(tree: ast.Module) -> bool: + """Return ``True`` if the module imports ``ConnectorSettings``.""" + for node in ast.walk(tree): + if isinstance(node, 
ast.ImportFrom): + for alias in node.names: + if alias.name == "ConnectorSettings": + return True + return False + + +class _SettingsCallFinder(ast.NodeVisitor): + """Locate ``*Settings()`` or ``*Loader()`` calls that are **not** inside + a ``pytest.raises(...)`` block. + + Handles all forms observed in the connector codebase: + + * Direct assignment: ``settings = FakeConnectorSettings()`` + * Method chaining: ``config = ConnectorSettings().model_dump()`` + * Bare expression: ``ConnectorSettings()`` (mokn-style smoke test) + * Config-loader name: ``settings = FakeConfigLoader(**d)`` + """ + + def __init__(self) -> None: + self._inside_raises_depth: int = 0 + self.found: bool = False + + def visit_With(self, node: ast.With) -> None: + is_raises = any( + isinstance(item.context_expr, ast.Call) + and isinstance(item.context_expr.func, ast.Attribute) + and item.context_expr.func.attr == "raises" + and isinstance(item.context_expr.func.value, ast.Name) + and item.context_expr.func.value.id == "pytest" + for item in node.items + ) + if is_raises: + self._inside_raises_depth += 1 + self.generic_visit(node) + if is_raises: + self._inside_raises_depth -= 1 + + def visit_Call(self, node: ast.Call) -> None: + if ( + self._inside_raises_depth == 0 + and isinstance(node.func, ast.Name) + and (node.func.id.endswith("Settings") or node.func.id.endswith("Loader")) + ): + self.found = True + self.generic_visit(node) + + +def _has_settings_valid_call(tree: ast.Module) -> bool: + """Return ``True`` if the module calls a ``*Settings()`` or ``*Loader()`` + class outside of a ``pytest.raises(...)`` block.""" + finder = _SettingsCallFinder() + finder.visit(tree) + return finder.found + + +def _has_pytest_raises(tree: ast.Module) -> bool: + """Return ``True`` if the module calls ``pytest.raises(...)``.""" + for node in ast.walk(tree): + if ( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr == "raises" + and isinstance(node.func.value, ast.Name) 
+ and node.func.value.id == "pytest" + ): + return True + return False + + +@CheckRegistry.register( + code="VC325", + name="minimal-settings-tests", + description="Connector must have minimal settings tests covering valid and invalid inputs", + severity=Severity.ERROR, +) +def check_minimal_settings_tests(ctx: ConnectorContext) -> list[CheckFinding]: + """Check that the connector has minimal settings tests.""" + if not ctx.has_tests: + return [ + CheckFinding( + message="No tests/ directory found", + severity=Severity.ERROR, + suggestion=( + "Create a tests/ directory with at minimum a " + "tests_connector/test_settings.py that verifies ConnectorSettings " + "accepts valid input and raises ConfigValidationError for missing " + "required fields (see templates//tests/ for the expected pattern)" + ), + ) + ] + + test_sources = _read_test_files(ctx) + if not test_sources: + return [ + CheckFinding( + message="No test files (test_*.py) found in tests/", + severity=Severity.ERROR, + suggestion=( + "Add tests/tests_connector/test_settings.py that verifies " + "ConnectorSettings accepts valid input and raises " + "ConfigValidationError for missing required fields" + ), + ) + ] + + trees = _parse_test_files(test_sources) + + # A settings test file is identified by filename or by importing ConnectorSettings. + settings_test_trees = { + path: tree + for path, tree in trees.items() + if "settings" in path.name.lower() or _imports_connector_settings(tree) + } + + if not settings_test_trees: + return [ + CheckFinding( + message=( + "No settings test file found in tests/ " + "(expected a file named test_settings*.py or one that imports ConnectorSettings)" + ), + # Advisory: the connector has other tests, but settings are not covered yet. 
+ severity=Severity.WARNING, + suggestion=( + "Create tests/tests_connector/test_settings.py that imports " + "ConnectorSettings and tests both valid input and " + "missing required fields (raises ConfigValidationError)" + ), + ) + ] + + has_valid_test = any( + _has_settings_valid_call(tree) for tree in settings_test_trees.values() + ) + has_error_test = any( + _has_pytest_raises(tree) for tree in settings_test_trees.values() + ) + + results: list[CheckFinding] = [] + settings_paths = ", ".join(str(p) for p in settings_test_trees) + + if not has_valid_test: + results.append( + CheckFinding( + message=( + f"Settings test(s) ({settings_paths}) do not cover valid input " + "(no *Settings() or *Loader() call found outside pytest.raises)" + ), + severity=Severity.ERROR, + suggestion=( + "Add a parametrized test that instantiates ConnectorSettings (or a " + "fake subclass) with a valid config dict and asserts the settings " + "object loads correctly " + "(see templates//tests/tests_connector/test_settings.py)" + ), + ) + ) + + if not has_error_test: + results.append( + CheckFinding( + message=( + f"Settings test(s) ({settings_paths}) do not cover missing required fields " + "(no pytest.raises(...) 
call found)" + ), + severity=Severity.WARNING, + suggestion=( + "Add a test using pytest.raises(ConfigValidationError) that verifies " + "required settings raise an error when missing " + "(see templates//tests/tests_connector/test_settings.py)" + ), + ) + ) + + if results: + return results + + return [ + CheckFinding( + message=f"Settings tests cover both valid and invalid inputs ✓ ({settings_paths})", + severity=Severity.INFO, + ) + ] From 9a8f9af5d7e4486fc7fd5daf3e0dc16340afcc4e Mon Sep 17 00:00:00 2001 From: Hugo DUPRAS Date: Wed, 13 May 2026 11:35:30 +0200 Subject: [PATCH 3/3] fix: Misc fixes --- .../checks/vc3xx_code/_helpers.py | 4 +--- .../checks/vc3xx_code/vc303_connector_type.py | 2 ++ .../vc3xx_code/vc304_markings_checked.py | 2 +- .../vc3xx_code/vc306_log_level_default.py | 4 ++-- .../vc3xx_code/vc313_pycti_generate_id.py | 21 +------------------ .../vc3xx_code/vc314_auto_backpressure.py | 2 +- 6 files changed, 8 insertions(+), 27 deletions(-) diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py index 9f8536244c3..bad6ee75e7a 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/_helpers.py @@ -328,9 +328,7 @@ def find_field_defaults( ( b.id if isinstance(b, ast.Name) - else b.attr - if isinstance(b, ast.Attribute) - else "" + else b.attr if isinstance(b, ast.Attribute) else "" ) for b in node.bases ] diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py index f0b8f35e027..5f409d0461e 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc303_connector_type.py @@ -158,6 +158,7 @@ def check_connector_type_hardcoded(ctx: 
ConnectorContext) -> list[CheckFinding]: CheckFinding( message="Connector type hardcoded", severity=Severity.WARNING, + suggestion="Consider using connectors-sdk base config classes to set the connector type via inheritance.", file_path=file_path, line=line, ), @@ -170,6 +171,7 @@ def check_connector_type_hardcoded(ctx: ConnectorContext) -> list[CheckFinding]: CheckFinding( message="Connector type hardcoded via Pydantic field", severity=Severity.WARNING, + suggestion="Consider using connectors-sdk base config classes to set the connector type via inheritance.", file_path=file_path, line=line, ), diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc304_markings_checked.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc304_markings_checked.py index aee14233d75..5f52aed4832 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc304_markings_checked.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc304_markings_checked.py @@ -110,7 +110,7 @@ def check_markings_checked(ctx: ConnectorContext) -> list[CheckFinding]: f"TLP extraction found in {file_path}:{line} " "but check_max_tlp is not called" ), - severity=Severity.WARNING, + severity=Severity.ERROR, file_path=file_path, line=line, suggestion=( diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc306_log_level_default.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc306_log_level_default.py index 4ccd828d251..bbd032a847c 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc306_log_level_default.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc306_log_level_default.py @@ -21,7 +21,7 @@ # Accepted values for the "error" log level. # Both "error" and "err" are valid — some logging frameworks use the short form. 
# --------------------------------------------------------------------------- -_ERROR_VALUES = {"error", "err"} +_ERROR_VALUES = "error" @CheckRegistry.register( @@ -54,7 +54,7 @@ def check_log_level_default(ctx: ConnectorContext) -> list[CheckFinding]: if field_defaults: fd = field_defaults[0] - if fd.default_value and fd.default_value in _ERROR_VALUES: + if fd.default_value and fd.default_value == _ERROR_VALUES: return [ CheckFinding( message=f"Log level defaults to '{fd.default_value}'", diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py index 5121171f4a5..d81a9b260ea 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc313_pycti_generate_id.py @@ -58,26 +58,7 @@ _STIX_SRO_TYPES = frozenset({"Relationship", "Sighting"}) # Combined set for checking -_STIX_TYPES_NEEDING_ID = _STIX_SDO_TYPES | _STIX_SRO_TYPES - -# --------------------------------------------------------------------------- -# Custom OpenCTI types — these are OpenCTI-specific extensions that also -# need explicit id= for deterministic deduplication (not part of STIX 2.1 -# but follow the same pattern). 
-# --------------------------------------------------------------------------- -_CUSTOM_OCTI_TYPES = frozenset( - { - "CustomObjectCaseIncident", - "CustomObjectTask", - "CustomObjectChannel", - "CustomObservableCryptocurrencyWallet", - "CustomObservableHostname", - "CustomObservableMediaContent", - "CustomObservableUserAgent", - }, -) - -_ALL_TYPES_NEEDING_ID = _STIX_TYPES_NEEDING_ID | _CUSTOM_OCTI_TYPES +_ALL_TYPES_NEEDING_ID = _STIX_SDO_TYPES | _STIX_SRO_TYPES def _get_stix2_imported_names(trees: dict[Path, ast.Module]) -> dict[Path, set[str]]: diff --git a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc314_auto_backpressure.py b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc314_auto_backpressure.py index f180cf3be10..5149fa6cdf0 100644 --- a/shared/connector_linter/connector_linter/checks/vc3xx_code/vc314_auto_backpressure.py +++ b/shared/connector_linter/connector_linter/checks/vc3xx_code/vc314_auto_backpressure.py @@ -111,7 +111,7 @@ def check_auto_backpressure(ctx: ConnectorContext) -> list[CheckFinding]: results.append( CheckFinding( message="Uses while True loop instead of scheduler", - severity=Severity.WARNING, + severity=Severity.ERROR, file_path=file_path, line=line_no, suggestion=(