@@ -109,75 +109,121 @@ def validate_config(self, config: Dict[str, Any]) -> tuple[bool, str]:
109109
110110class PluginSecurity :
111111 """Security utilities for plugin system"""
112-
112+
113113 DANGEROUS_MODULES = {
114114 'os.system' , 'subprocess.Popen' , 'eval' , 'exec' ,
115115 '__import__' , 'compile'
116116 }
117-
117+
118118 ALLOWED_IMPORTS = {
119119 'json' , 'pathlib' , 'typing' , 'dataclasses' , 're' ,
120120 'datetime' , 'collections' , 'itertools' , 'functools'
121121 }
122-
122+
123123 @staticmethod
124124 def calculate_checksum (file_path : Path ) -> str :
125- """Calculate SHA256 checksum of a plugin file"""
125+ import hashlib
126126 sha256 = hashlib .sha256 ()
127127 with open (file_path , 'rb' ) as f :
128128 for chunk in iter (lambda : f .read (4096 ), b'' ):
129129 sha256 .update (chunk )
130130 return sha256 .hexdigest ()
131-
131+
132132 @staticmethod
133133 def validate_plugin_code (plugin_path : Path ) -> tuple [bool , str ]:
134134 """
135- Basic static analysis of plugin code for security.
136-
135+ Static analysis of plugin code for security.
137136 Returns:
138137 Tuple of (is_safe, message)
138+
139+ Design principle: fail-closed. Anything that cannot be statically
140+ resolved is treated as potentially dangerous rather than silently
141+ allowed.
139142 """
140-
141- fatal_calls = {
142- "eval" ,
143- "exec" ,
144- "compile" ,
145- "__import__" ,
146- "vars" ,
147- "getattr" ,
148- "os.system" ,
149- "os.popen" ,
143+
144+ # Any direct or aliased call to these names is an immediate rejection.
145+ fatal_calls : set [str ] = {
146+ # Code execution
147+ "eval" , "exec" , "compile" , "__import__" ,
148+ # Reflection/introspection
149+ "vars" , "getattr" ,
150+ # importlib — dynamic module loading (all public entry-points)
151+ "importlib.import_module" ,
152+ "importlib.util.spec_from_file_location" ,
153+ "importlib.util.spec_from_loader" ,
154+ "importlib.util.module_from_spec" ,
155+ # os — process execution: complete API surface
156+ "os.system" , "os.popen" ,
157+ "os.spawnl" , "os.spawnle" , "os.spawnlp" , "os.spawnlpe" ,
158+ "os.spawnv" , "os.spawnve" , "os.spawnvp" , "os.spawnvpe" ,
159+ "os.execl" , "os.execle" , "os.execlp" , "os.execlpe" ,
160+ "os.execv" , "os.execve" , "os.execvp" , "os.execvpe" ,
161+ "os.posix_spawn" , "os.posix_spawnp" ,
162+ # subprocess — complete API surface
150163 "subprocess.Popen" ,
151164 "subprocess.run" ,
152165 "subprocess.call" ,
153166 "subprocess.check_call" ,
154167 "subprocess.check_output" ,
168+ "subprocess.getoutput" ,
169+ "subprocess.getstatusoutput" ,
170+ # ctypes — direct native/OS calls
171+ "ctypes.CDLL" , "ctypes.cdll" , "ctypes.windll" , "ctypes.oledll" ,
172+ }
173+
174+ # Importing any of these (or sub-packages thereof) is an immediate rejection, because they enable dynamic execution that the call-level checks cannot fully enumerate.
175+ fatal_import_modules : set [str ] = {
176+ "importlib" , # dynamic module loading
177+ "importlib.util" ,
178+ "ctypes" , # native library access
179+ "cffi" , # native library access
180+ "types" , # raw bytecode construction
155181 }
156- warning_calls = {
157- "open" ,
158- "builtins.open" ,
182+
183+ # Subscript access (obj[key]) on these expressions is rejected because it exposes an arbitrary callable:
184+ # sys.modules['os'].system(...)
185+ # builtins.__dict__['exec'](...)
186+ fatal_subscript_bases : set [str ] = {
187+ "sys.modules" ,
188+ "__builtins__" ,
189+ "builtins.__dict__" ,
159190 }
160191
192+ # When the call target is of the form <unresolvable>.<attr>(), we check whether <attr> is one of these names. This catches the importlib.import_module('os').system(...) pattern.
193+ dangerous_opaque_attrs : set [str ] = {
194+ "system" , "popen" ,
195+ "spawnl" , "spawnle" , "spawnlp" , "spawnlpe" ,
196+ "spawnv" , "spawnve" , "spawnvp" , "spawnvpe" ,
197+ "execl" , "execle" , "execlp" , "execlpe" ,
198+ "execv" , "execve" , "execvp" , "execvpe" ,
199+ "posix_spawn" , "posix_spawnp" ,
200+ "Popen" , "run" , "call" , "check_call" , "check_output" ,
201+ "getoutput" , "getstatusoutput" ,
202+ "exec" , "eval" , "compile" ,
203+ "load_module" , "exec_module" , # importlib loader API
204+ }
205+
206+ warning_calls : set [str ] = {"open" , "builtins.open" }
161207 try :
162208 source = plugin_path .read_text ()
163209 tree = ast .parse (source , filename = str (plugin_path ))
164210 except Exception as exc :
165211 return False , f"Error validating plugin: { exc } "
166-
212+
167213 alias_map : Dict [str , str ] = {}
168214 detected_fatal : set [str ] = set ()
169215 detected_warnings : set [str ] = set ()
170-
216+
217+
171218 def register_alias (alias : str , target : str ) -> None :
172219 alias_map [alias ] = target
173-
220+
174221 def resolve_name (node : ast .AST ) -> Optional [str ]:
175222 if isinstance (node , ast .Name ):
176- target = alias_map .get (node .id , node .id )
177- return target
223+ return alias_map .get (node .id , node .id )
178224 if isinstance (node , ast .Attribute ):
179225 attrs : List [str ] = []
180- current = node
226+ current : ast . AST = node
181227 while isinstance (current , ast .Attribute ):
182228 attrs .append (current .attr )
183229 current = current .value
@@ -186,62 +232,105 @@ def resolve_name(node: ast.AST) -> Optional[str]:
186232 attrs .append (base )
187233 attrs .reverse ()
188234 return "." .join (attrs )
235+ return None
189236 if isinstance (node , ast .Call ):
190237 inner = resolve_name (node .func )
191238 if inner :
192239 return inner
193240 return None
241+
242+ def _normalise (name : str ) -> str :
243+ """Apply alias map to the leading component of a dotted name."""
244+ parts = name .split ("." )
245+ root = alias_map .get (parts [0 ], parts [0 ])
246+ return "." .join ([root , * parts [1 :]]) if len (parts ) > 1 else root
194247
195248 class Analyzer (ast .NodeVisitor ):
196249 def visit_Import (self , node : ast .Import ) -> None :
197250 for alias in node .names :
198- register_alias (alias .asname or alias .name , alias .name )
251+ mod = alias .name
252+ for blocked in fatal_import_modules :
253+ if mod == blocked or mod .startswith (blocked + "." ):
254+ detected_fatal .add (f"import { mod } " )
255+ register_alias (alias .asname or mod , mod )
199256 self .generic_visit (node )
200257
201258 def visit_ImportFrom (self , node : ast .ImportFrom ) -> None :
202259 module = node .module or ""
260+ for blocked in fatal_import_modules :
261+ if module == blocked or module .startswith (blocked + "." ):
262+ for alias in node .names :
263+ detected_fatal .add (f"from { module } import { alias .name } " )
203264 for alias in node .names :
204265 target = f"{ module } .{ alias .name } " if module else alias .name
205266 register_alias (alias .asname or alias .name , target )
206267 self .generic_visit (node )
207268
269+ def visit_Subscript (self , node : ast .Subscript ) -> None :
270+ """
271+ Flag dangerous subscript patterns:
272+ sys.modules['os'] → sys.modules[...]
273+ builtins.__dict__['exec'] → builtins.__dict__[...]
274+ """
275+ base_name = resolve_name (node .value )
276+ if base_name :
277+ normalised = _normalise (base_name )
278+ if (normalised in fatal_subscript_bases
279+ or base_name in fatal_subscript_bases ):
280+ detected_fatal .add (f"{ normalised } [...]" )
281+ self .generic_visit (node )
282+
208283 def visit_Call (self , node : ast .Call ) -> None :
209284 name = resolve_name (node .func )
210- if name :
285+
286+ if name is None :
287+ if isinstance (node .func , ast .Attribute ):
288+ attr = node .func .attr
289+ if attr in dangerous_opaque_attrs :
290+ detected_fatal .add (f"<opaque>.{ attr } ()" )
291+
292+ elif isinstance (node .func , ast .Subscript ):
293+ base_name = resolve_name (node .func .value )
294+ if base_name :
295+ normalised = _normalise (base_name )
296+ if (normalised in fatal_subscript_bases
297+ or base_name in fatal_subscript_bases ):
298+ detected_fatal .add (
299+ f"call_via_{ normalised } [...]"
300+ )
301+ else :
302+ detected_fatal .add ("<opaque_subscript_call>" )
303+
304+ else :
211305 simplified = name .replace ("builtins." , "" )
212-
213- # Handle alias that already resolved to dotted path
306+
214307 if simplified in fatal_calls :
215308 detected_fatal .add (simplified )
216309 elif simplified in warning_calls :
217310 detected_warnings .add (simplified )
218311 else :
219- # Check dotted paths by normalising alias root
220- parts = simplified .split ("." )
221- if parts :
222- root = alias_map .get (parts [0 ], parts [0 ])
223- normalised = "." .join ([root , * parts [1 :]]) if len (parts ) > 1 else root
224- normalised = normalised .replace ("builtins." , "" )
225-
226- if normalised in fatal_calls :
227- detected_fatal .add (normalised )
228- elif normalised in warning_calls :
229- detected_warnings .add (normalised )
230-
312+ normalised = _normalise (simplified ).replace (
313+ "builtins." , ""
314+ )
315+ if normalised in fatal_calls :
316+ detected_fatal .add (normalised )
317+ elif normalised in warning_calls :
318+ detected_warnings .add (normalised )
319+
231320 self .generic_visit (node )
232-
321+
233322 Analyzer ().visit (tree )
234-
323+
235324 if detected_fatal :
236325 ordered = ", " .join (sorted (detected_fatal ))
237326 return False , f"Plugin uses high-risk calls: { ordered } "
238-
327+
239328 if detected_warnings :
240329 ordered = ", " .join (sorted (detected_warnings ))
241330 return True , f"Plugin uses sensitive operations: { ordered } "
242-
331+
243332 return True , ""
244-
333+
245334 @staticmethod
246335 def verify_checksum (plugin_path : Path , expected_checksum : str ) -> bool :
247336 """Verify plugin file checksum"""
0 commit comments