11"""Data models for the connector linter."""
22
3+ import ast
34import json
45from dataclasses import dataclass , field
56from enum import StrEnum
7+ from functools import cached_property
68from pathlib import Path
79from typing import Any
810
911
12+ class ConnectorType (StrEnum ):
13+ """Known OpenCTI connector types."""
14+
15+ EXTERNAL_IMPORT = "EXTERNAL_IMPORT"
16+ INTERNAL_ENRICHMENT = "INTERNAL_ENRICHMENT"
17+ INTERNAL_EXPORT_FILE = "INTERNAL_EXPORT_FILE"
18+ INTERNAL_IMPORT_FILE = "INTERNAL_IMPORT_FILE"
19+ STREAM = "STREAM"
20+
21+ @property
22+ def label (self ) -> str :
23+ """Human-readable label derived from the value.
24+
25+ Examples: EXTERNAL_IMPORT → 'External Import', STREAM → 'Stream'.
26+ """
27+ return self .value .replace ("_" , " " ).title ()
28+
29+
1030class Severity (StrEnum ):
1131 """Severity levels for check results."""
1232
@@ -18,6 +38,23 @@ def symbol(self) -> str:
1838 """Get a short symbol for the severity level."""
1939 return {"error" : "E" , "warning" : "W" , "info" : "I" }[self .value ]
2040
41+ def rank (self ) -> int :
42+ """Numeric rank for ordering (INFO=0, WARNING=1, ERROR=2)."""
43+ return {"info" : 0 , "warning" : 1 , "error" : 2 }[self .value ]
44+
45+
46+ # Shared severity → display mappings. Keyed by Severity enum for direct lookup.
47+ SEVERITY_EMOJI : dict ["Severity" , str ] = {
48+ Severity .ERROR : "🔴" ,
49+ Severity .WARNING : "🟡" ,
50+ Severity .INFO : "🔵" ,
51+ }
52+ SEVERITY_COLOR : dict ["Severity" , str ] = {
53+ Severity .ERROR : "red" ,
54+ Severity .WARNING : "yellow" ,
55+ Severity .INFO : "cyan" ,
56+ }
57+
2158
2259@dataclass
2360class CheckFinding :
@@ -48,12 +85,30 @@ class CheckResult:
4885 suggestion : str | None = None
4986
5087
88+ def no_python_sources_finding (suggestion : str | None = None ) -> "CheckFinding" :
89+ """Standard finding for checks that require Python source files but find none."""
90+ return CheckFinding (
91+ message = "No Python source files found in src/" ,
92+ severity = Severity .ERROR ,
93+ suggestion = suggestion or "Connector must have Python source files under src/" ,
94+ )
95+
96+
97+ _DIR_TO_CONNECTOR_TYPE : dict [str , ConnectorType ] = {
98+ "external-import" : ConnectorType .EXTERNAL_IMPORT ,
99+ "internal-enrichment" : ConnectorType .INTERNAL_ENRICHMENT ,
100+ "internal-export-file" : ConnectorType .INTERNAL_EXPORT_FILE ,
101+ "internal-import-file" : ConnectorType .INTERNAL_IMPORT_FILE ,
102+ "stream" : ConnectorType .STREAM ,
103+ }
104+
105+
51106@dataclass
52107class ConnectorContext :
53108 """Contextual data about a connector, loaded once and shared across checks."""
54109
55110 path : Path
56- connector_type : str | None = None
111+ connector_type : ConnectorType | None = None
57112 manifest : dict [str , Any ] = field (default_factory = dict )
58113 config_schema : dict [str , Any ] = field (default_factory = dict )
59114 has_tests : bool = False
@@ -63,34 +118,65 @@ class ConnectorContext:
63118 src_files : list [Path ] = field (default_factory = list )
64119 all_files : list [Path ] = field (default_factory = list )
65120
121+ @cached_property
122+ def python_sources (self ) -> dict [Path , str ]:
123+ """All Python source files under src/, keyed by path relative to connector root.
124+
125+ Computed once and cached for the lifetime of this context.
126+ Uses src_files populated at load time to avoid re-scanning the filesystem.
127+ """
128+ sources : dict [Path , str ] = {}
129+ for rel_path in self .src_files :
130+ abs_path = self .path / rel_path
131+ try :
132+ sources [rel_path ] = abs_path .read_text (
133+ encoding = "utf-8" , errors = "replace"
134+ )
135+ except OSError :
136+ continue
137+ return sources
138+
139+ @cached_property
140+ def python_trees (self ) -> dict [Path , ast .Module ]:
141+ """Parsed AST modules for all Python source files.
142+
143+ Computed once and cached for the lifetime of this context.
144+ Files with syntax errors are silently skipped.
145+ """
146+ trees : dict [Path , ast .Module ] = {}
147+ for file_path , content in self .python_sources .items ():
148+ try :
149+ trees [file_path ] = ast .parse (content , filename = str (file_path ))
150+ except SyntaxError :
151+ continue
152+ return trees
153+
66154 @classmethod
67155 def load (cls , connector_path : Path ) -> "ConnectorContext" :
68156 """Load connector context from its directory."""
69157 ctx = cls (path = connector_path .resolve ())
70158
71159 # Detect connector type from parent directory name
72- parent_name = ctx .path .parent .name
73- type_mapping = {
74- "external-import" : "EXTERNAL_IMPORT" ,
75- "internal-enrichment" : "INTERNAL_ENRICHMENT" ,
76- "internal-export-file" : "INTERNAL_EXPORT_FILE" ,
77- "internal-import-file" : "INTERNAL_IMPORT_FILE" ,
78- "stream" : "STREAM" ,
79- }
80- ctx .connector_type = type_mapping .get (parent_name )
160+ ctx .connector_type = _DIR_TO_CONNECTOR_TYPE .get (ctx .path .parent .name )
81161 # Fallback only for template layout: templates/<connector-kind>
82- if ctx .connector_type is None and parent_name == "templates" :
83- ctx .connector_type = type_mapping .get (ctx .path .name )
162+ if ctx .connector_type is None and ctx . path . parent . name == "templates" :
163+ ctx .connector_type = _DIR_TO_CONNECTOR_TYPE .get (ctx .path .name )
84164
85165 # Load manifest
86166 manifest_path = ctx .path / "__metadata__" / "connector_manifest.json"
87167 if manifest_path .exists ():
88- with manifest_path .open () as f :
89- ctx .manifest = json .load (f )
168+ try :
169+ with manifest_path .open () as f :
170+ ctx .manifest = json .load (f )
171+ except (json .JSONDecodeError , OSError ):
172+ pass # malformed or unreadable — checks that need it will report missing fields
90173
91174 # Fallback: use container_type from manifest
92175 if ctx .connector_type is None and ctx .manifest .get ("container_type" ):
93- ctx .connector_type = ctx .manifest ["container_type" ]
176+ try :
177+ ctx .connector_type = ConnectorType (ctx .manifest ["container_type" ])
178+ except ValueError :
179+ pass # unknown type string — leave as None
94180
95181 if ctx .connector_type is None :
96182 raise ValueError (
@@ -105,8 +191,11 @@ def load(cls, connector_path: Path) -> "ConnectorContext":
105191 # Load config schema
106192 schema_path = ctx .path / "__metadata__" / "connector_config_schema.json"
107193 if schema_path .exists ():
108- with schema_path .open () as f :
109- ctx .config_schema = json .load (f )
194+ try :
195+ with schema_path .open () as f :
196+ ctx .config_schema = json .load (f )
197+ except (json .JSONDecodeError , OSError ):
198+ pass # malformed or unreadable — leave as empty dict
110199
111200 # Detect structural elements
112201 ctx .has_metadata_dir = (ctx .path / "__metadata__" ).is_dir ()
0 commit comments