Skip to content

Commit 4d12e30

Browse files
committed
feat(connector-linter): add VC3xx code checks
1 parent de18174 commit 4d12e30

27 files changed

Lines changed: 4057 additions & 1 deletion
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import ast
2+
from pathlib import Path
3+
4+
from connector_linter.models import ConnectorContext
5+
6+
# ---------------------------------------------------------------------------
7+
# Source reading
8+
# ---------------------------------------------------------------------------
9+
10+
11+
def read_all_python_sources(ctx: ConnectorContext) -> dict[Path, str]:
    """Read all Python source files from the connector's ``src/`` directory.

    Args:
        ctx: Connector context; only ``ctx.path`` (the connector root) is used.

    Returns:
        Mapping of file path, relative to the connector root, to decoded file
        content. Entries are inserted in sorted path order so that callers
        reporting "the first hit" produce the same result on every platform.
        Empty when ``<connector>/src`` is not a directory. Files that cannot
        be read are omitted.
    """
    sources: dict[Path, str] = {}
    # Convention: all connector Python code lives under <connector>/src/
    src_dir = ctx.path / "src"
    if not src_dir.is_dir():
        return sources
    # rglob() yields entries in arbitrary (filesystem-dependent) order;
    # sorting keeps the resulting dict — and every report built from it — stable.
    for py_file in sorted(src_dir.rglob("*.py")):
        # Key by relative path (from connector root) for portable reporting
        rel_path = py_file.relative_to(ctx.path)
        try:
            # errors="replace" avoids UnicodeDecodeError on malformed files
            sources[rel_path] = py_file.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Skip unreadable entries (permissions, broken symlinks,
            # directories that happen to be named "*.py", etc.)
            continue
    return sources
28+
29+
30+
# ---------------------------------------------------------------------------
31+
# AST helpers — structural analysis of Python source
32+
# ---------------------------------------------------------------------------
33+
34+
35+
def parse_sources(sources: dict[Path, str]) -> dict[Path, ast.Module]:
    """Parse all source files into AST modules.

    Args:
        sources: Python source text keyed by file path.

    Returns:
        Parsed modules keyed by the same paths, in the iteration order of
        ``sources``. Files that cannot be parsed are silently omitted.
    """
    trees: dict[Path, ast.Module] = {}
    for file_path, content in sources.items():
        try:
            trees[file_path] = ast.parse(content, filename=str(file_path))
        except (SyntaxError, ValueError):
            # Silently skip files that cannot be parsed — they can't be
            # analyzed structurally, but other checks (regex-based) may still
            # find issues. ValueError covers source containing null bytes,
            # which interpreters before 3.12 report that way instead of
            # raising SyntaxError.
            continue
    return trees
Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,27 @@
1-
"""VC3xx — Code quality checks."""
1+
"""VC3xx — Code quality checks.
2+
3+
VC301: Connector must define an author identity.
4+
VC302: Author must be referenced on STIX entities (created_by_ref).
5+
VC303: CONNECTOR_TYPE must be defined in application code, not read from env.
6+
VC304: Ensure TLP markings are checked (check_max_tlp).
7+
VC305: Connector must implement Base Settings from connectors-sdk.
8+
VC306: Connector log level should default to 'error'.
9+
VC307: Except blocks should use error/warning logging, not debug/info.
10+
VC308: Main entry point must use traceback for error handling.
11+
VC309: Connector must use only absolute imports, no relative imports.
12+
VC310: External references must not be added by default to non-Identity objects.
13+
VC311: Connector should use TLP markings on entities with appropriate level.
14+
VC312: send_stix2_bundle must use cleanup_inconsistent_bundle=True.
15+
VC313: STIX SDO/SRO objects must use pycti.XXX.generate_id() for deterministic IDs.
16+
VC314: External-import connectors must use schedule_process or schedule_iso.
17+
VC315: Connector must call initiate_work before processing.
18+
VC316: Connector must close work with to_processed after processing.
19+
VC317: initiate_work should only be called when data is available.
20+
VC318: Internal-enrichment connectors must use helper.listen().
21+
VC319: Enrichment connector must return original bundle when not in scope.
22+
VC320: Enrichment connector must enforce TLP access control.
23+
VC321: Enrichment connector must be playbook-compatible.
24+
VC322: Enrichment connector must read data['stix_objects'] (former bundle).
25+
VC323: Stream connectors must use helper.listen_stream().
26+
VC324: Relationship should not set both start_time and stop_time.
27+
"""
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""VC301 — Connector must define an author identity.
2+
3+
Uses AST to detect author identity definitions by looking for constructor
4+
calls (``Identity(...)``, ``OrganizationAuthor(...)``, ``stix2.Identity(...)``)
5+
and API calls (``helper.api.identity.create(...)``).
6+
7+
Import validation for bare ``Identity(...)`` calls is also AST-based:
8+
the call is only counted when ``Identity`` is imported from ``stix2``
9+
or ``pycti`` (not some unrelated class).
10+
"""
11+
12+
import ast
13+
from pathlib import Path
14+
15+
from connector_linter.models import (
16+
CheckFinding,
17+
ConnectorContext,
18+
ConnectorType,
19+
Severity,
20+
no_python_sources_finding,
21+
)
22+
from connector_linter.registry import CheckRegistry
23+
24+
# ---------------------------------------------------------------------------
25+
# Author-definition patterns (AST-based)
26+
# ---------------------------------------------------------------------------
27+
28+
# Function/constructor names that unambiguously define an author
29+
# Matched against the name of bare calls (``ast.Name.id``) in ``_is_author_call``.
_UNAMBIGUOUS_AUTHOR_CALLS: set[str] = {"OrganizationAuthor"}
30+
31+
# Import modules that confirm bare Identity() comes from stix2/pycti
32+
# Compared with the top-level package name of ``import`` / ``from ... import`` statements.
_IDENTITY_MODULES: set[str] = {"stix2", "pycti"}
33+
34+
35+
def _has_identity_import(trees: dict[Path, ast.Module]) -> bool:
    """Report whether any parsed file brings stix2/pycti ``Identity`` into reach.

    Two import shapes count:

    * ``from stix2[.xxx] import Identity`` / ``from pycti[.xxx] import Identity``
    * a plain ``import stix2`` / ``import pycti`` (or a submodule of either),
      even though it does not name ``Identity`` itself.

    The answer is global: one matching import in any file is enough.
    """

    def _top_package(dotted_name: str) -> str:
        # "stix2.v21" -> "stix2"
        return dotted_name.split(".")[0]

    for module_ast in trees.values():
        for stmt in ast.walk(module_ast):
            if isinstance(stmt, ast.ImportFrom) and stmt.module:
                if _top_package(stmt.module) not in _IDENTITY_MODULES:
                    continue
                if any(imported.name == "Identity" for imported in stmt.names):
                    return True
            elif isinstance(stmt, ast.Import):
                if any(
                    _top_package(imported.name) in _IDENTITY_MODULES
                    for imported in stmt.names
                ):
                    return True
    return False
52+
53+
54+
def _is_author_call(node: ast.Call, identity_imported: bool) -> bool:
    """Decide whether a call expression defines an author identity.

    Accepted call shapes:
      * ``OrganizationAuthor(...)``    — connectors-sdk helper, always accepted
      * ``Identity(...)``              — bare name, accepted only when
        ``identity_imported`` is true
      * ``stix2.Identity(...)``        — constructor qualified with ``stix2``
      * ``<expr>.api.identity.create(...)`` — legacy pycti API call

    Args:
        node: The call node to classify.
        identity_imported: Whether a stix2/pycti import was seen, which is
            what legitimises a bare ``Identity(...)`` call.

    Returns:
        True when the call matches one of the shapes above.
    """
    callee = node.func

    # Bare names: OrganizationAuthor(...) / Identity(...)
    if isinstance(callee, ast.Name):
        called_name = callee.id
        if called_name in _UNAMBIGUOUS_AUTHOR_CALLS:
            return True
        return called_name == "Identity" and bool(identity_imported)

    # Everything else we recognise is a dotted (attribute) call.
    if not isinstance(callee, ast.Attribute):
        return False

    owner = callee.value

    # stix2.Identity(...)
    if callee.attr == "Identity":
        return isinstance(owner, ast.Name) and owner.id == "stix2"

    # <expr>.api.identity.create(...)
    return (
        callee.attr == "create"
        and isinstance(owner, ast.Attribute)
        and owner.attr == "identity"
        and isinstance(owner.value, ast.Attribute)
        and owner.value.attr == "api"
    )
90+
91+
92+
def find_author_definitions(
    sources: dict[Path, str],
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int, str]]:
    """Find author definition locations using AST analysis.

    Args:
        sources: Raw Python source content keyed by relative path (used for
            line-text extraction in findings).
        trees: Pre-parsed AST modules (e.g. ``ctx.python_trees``). Passing
            the cached property avoids redundant parsing across checks.

    Returns:
        List of (file_path, line_number, matched_line_text). Files appear in
        the iteration order of ``trees``; within a file, hits are ordered by
        source position, so the first entry for a file is its earliest
        definition. ``matched_line_text`` is ``""`` when the source text for
        the file is missing or shorter than the reported line.
    """
    identity_imported = _has_identity_import(trees)

    hits: list[tuple[Path, int, str]] = []
    for file_path, tree in trees.items():
        # Tolerate a tree without a matching source entry: the location is
        # still reported, only the line text is left empty.
        content_lines = sources.get(file_path, "").splitlines()

        # ast.walk() makes no ordering promise, so order the matches by
        # position before reporting them.
        author_calls = sorted(
            (
                node
                for node in ast.walk(tree)
                if isinstance(node, ast.Call)
                and _is_author_call(node, identity_imported)
            ),
            key=lambda call: (call.lineno, call.col_offset),
        )

        for call in author_calls:
            line_text = (
                content_lines[call.lineno - 1].strip()
                if call.lineno <= len(content_lines)
                else ""
            )
            hits.append((file_path, call.lineno, line_text))

    return hits
122+
123+
124+
@CheckRegistry.register(
    code="VC301",
    name="author-defined",
    description="Connector must define an author identity",
    severity=Severity.ERROR,
    applicable_types={
        ConnectorType.INTERNAL_ENRICHMENT,
        ConnectorType.EXTERNAL_IMPORT,
        ConnectorType.INTERNAL_IMPORT_FILE,
    },
)
def check_author_defined(ctx: ConnectorContext) -> list[CheckFinding]:
    """VC301: verify that the connector source defines an author identity.

    Returns a single-element list: the "no Python sources" finding when the
    connector has no sources, an INFO finding pointing at the first author
    definition when one exists, or an ERROR finding with a remediation hint
    otherwise.
    """
    python_sources = ctx.python_sources
    if not python_sources:
        return [no_python_sources_finding()]

    definitions = find_author_definitions(python_sources, ctx.python_trees)

    # Failure path first: nothing in the source looks like an author.
    if not definitions:
        remediation = (
            "Define an author using one of: "
            "stix2.Identity(name=..., identity_class='organization'), "
            "OrganizationAuthor(name=...), "
            "or self.helper.api.identity.create(type='Organization', name=...)"
        )
        return [
            CheckFinding(
                message="No author identity definition found in connector source",
                severity=Severity.ERROR,
                suggestion=remediation,
            ),
        ]

    # Success: report where the first definition was found.
    first_file, first_line, _line_text = definitions[0]
    return [
        CheckFinding(
            message="Author identity defined",
            severity=Severity.INFO,
            file_path=first_file,
            line=first_line,
        ),
    ]
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
"""VC302 — Author must be referenced on STIX entities (created_by_ref).
2+
3+
Uses AST to detect ``author=`` as a keyword argument in function calls
4+
(avoids false positives from variable assignments like ``author = "John"``).
5+
Also detects ``created_by_ref=`` and ``x_opencti_created_by_ref`` via AST.
6+
"""
7+
8+
import ast
9+
from pathlib import Path
10+
11+
from connector_linter.checks.vc3xx_code.vc301_author_defined import (
12+
find_author_definitions,
13+
)
14+
from connector_linter.models import (
15+
CheckFinding,
16+
ConnectorContext,
17+
ConnectorType,
18+
Severity,
19+
no_python_sources_finding,
20+
)
21+
from connector_linter.registry import CheckRegistry
22+
23+
# ---------------------------------------------------------------------------
24+
# Keyword argument names that attach an author identity to STIX entities:
25+
#
26+
# created_by_ref — standard STIX 2.1 field (SDOs)
27+
# author — connectors-sdk model parameter
28+
# x_opencti_created_by_ref — OpenCTI custom property (used on observables/SCOs)
29+
# ---------------------------------------------------------------------------
30+
# Compared with ``ast.keyword.arg`` of every call visited by ``_find_author_references``.
_AUTHOR_KWARGS: set[str] = {"created_by_ref", "author", "x_opencti_created_by_ref"}
31+
32+
33+
def _find_author_references(
    trees: dict[Path, ast.Module],
) -> list[tuple[Path, int, str]]:
    """Find places where an author is attached to STIX entities.

    Three syntactic forms are recognised:

    * keyword arguments in function/constructor calls —
      ``created_by_ref=``, ``author=``, ``x_opencti_created_by_ref=``
      (plain assignments such as ``author = "John"`` are not calls and
      therefore never match);
    * string keys in dict literals, e.g.
      ``custom_properties={"x_opencti_created_by_ref": author.id}``;
    * subscript assignments, e.g.
      ``props["x_opencti_created_by_ref"] = author.id``.

    For the two dict-based forms only ``created_by_ref`` and
    ``x_opencti_created_by_ref`` are accepted: ``"author"`` is too generic a
    dictionary key to be taken as evidence of a STIX author reference.

    Args:
        trees: Parsed modules keyed by file path.

    Returns:
        List of (file_path, line_number, matched_name), in AST traversal order.
    """
    # Names specific enough to be trusted when they appear as dict keys.
    dict_key_names = _AUTHOR_KWARGS - {"author"}

    def _is_author_key(expr: object) -> bool:
        # Dict literals carry ``None`` in ``keys`` for ``**mapping`` entries;
        # the isinstance check rejects those as well as non-string constants.
        return (
            isinstance(expr, ast.Constant)
            and isinstance(expr.value, str)
            and expr.value in dict_key_names
        )

    hits: list[tuple[Path, int, str]] = []
    for file_path, tree in trees.items():
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                for kw in node.keywords:
                    # kw.arg is None for ``**kwargs`` — never in the set
                    if kw.arg in _AUTHOR_KWARGS:
                        # Use the keyword node's own lineno when available;
                        # fall back to the call node's lineno otherwise
                        hits.append(
                            (
                                file_path,
                                getattr(kw, "lineno", node.lineno) or node.lineno,
                                kw.arg,
                            ),
                        )
            elif isinstance(node, ast.Dict):
                for key in node.keys:
                    if _is_author_key(key):
                        hits.append((file_path, key.lineno, key.value))
            elif isinstance(node, ast.Subscript) and isinstance(node.ctx, ast.Store):
                # Only assignments count; reading obj["created_by_ref"] does
                # not attach an author to anything.
                if _is_author_key(node.slice):
                    hits.append((file_path, node.lineno, node.slice.value))
    return hits
60+
61+
62+
@CheckRegistry.register(
    code="VC302",
    name="author-referenced-on-entities",
    description="Author must be referenced on STIX entities (created_by_ref)",
    severity=Severity.ERROR,
    applicable_types={
        ConnectorType.INTERNAL_ENRICHMENT,
        ConnectorType.EXTERNAL_IMPORT,
        ConnectorType.INTERNAL_IMPORT_FILE,
    },
)
def check_author_referenced(ctx: ConnectorContext) -> list[CheckFinding]:
    """VC302: verify that a defined author is actually attached to entities.

    Returns a single-element list: the "no Python sources" finding when the
    connector has no sources, an ERROR when no author is defined at all
    (VC301 must pass first), an ERROR when an author exists but is never
    referenced, or an INFO finding located at the first reference found.
    """
    python_sources = ctx.python_sources
    if not python_sources:
        return [no_python_sources_finding()]

    parsed_modules = ctx.python_trees

    # Prerequisite (VC301): there has to be an author before it can be referenced.
    if not find_author_definitions(python_sources, parsed_modules):
        return [
            CheckFinding(
                message="No author defined — cannot reference author on entities",
                severity=Severity.ERROR,
                suggestion="Define an author first (see VC301), then use created_by_ref= on STIX objects",
            ),
        ]

    # An author exists — look for it being attached to STIX entities.
    references = _find_author_references(parsed_modules)
    if not references:
        how_to_fix = (
            "Use created_by_ref=self.author.id on SDOs, "
            "or x_opencti_created_by_ref in custom_properties for observables, "
            "or author= parameter when using connectors-sdk models"
        )
        return [
            CheckFinding(
                message="Author is defined but never referenced on STIX entities",
                severity=Severity.ERROR,
                suggestion=how_to_fix,
            ),
        ]

    first_file, first_line, _matched_name = references[0]
    return [
        CheckFinding(
            message=f"Author referenced on entities ({len(references)} occurrence(s) found)",
            severity=Severity.INFO,
            file_path=first_file,
            line=first_line,
        ),
    ]

0 commit comments

Comments
 (0)