-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathruntime.py
More file actions
161 lines (130 loc) · 5.55 KB
/
runtime.py
File metadata and controls
161 lines (130 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
OMEN Engine Core — Plugin Runtime
Manages plugin discovery, loading, lifecycle, event routing, and fault isolation.
Reference: docs/loe-1-engine.md
"""
from __future__ import annotations
import importlib
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
# Module-level logger used by the registry and runtime classes defined below.
logger = logging.getLogger("omen.engine.core.runtime")
# ---------------------------------------------------------------------------
# Plugin Manifest
# ---------------------------------------------------------------------------
@dataclass
class PluginManifest:
    """Declarative metadata describing one plugin.

    ``plugin_id``, ``name``, ``version`` and ``description`` are required;
    every other field falls back to the default declared here when it is
    omitted.
    """

    plugin_id: str
    name: str
    version: str
    description: str
    capabilities: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)
    configuration_schema: dict[str, Any] = field(default_factory=dict)
    omen_sdk_version: str = ">=1.0.0"
    ai_assisted: bool = False
    classification: str = "UNCLASSIFIED"

    @classmethod
    def from_file(cls, path: Path) -> "PluginManifest":
        """Build a manifest from a JSON file.

        Keys in the file that do not correspond to a dataclass field are
        ignored, so manifests may carry extra metadata.

        Args:
            path: Location of the JSON manifest.

        Returns:
            A ``PluginManifest`` populated from the recognised keys.

        Raises:
            OSError: The file cannot be read.
            ValueError: The file is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or its top-level value is not an object.
            TypeError: A required field is missing from the manifest.
        """
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's default locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError(
                f"Plugin manifest {path} must contain a JSON object, "
                f"got {type(data).__name__}"
            )
        known_fields = cls.__dataclass_fields__
        return cls(**{key: value for key, value in data.items() if key in known_fields})
# ---------------------------------------------------------------------------
# Plugin Registry
# ---------------------------------------------------------------------------
class PluginRegistry:
    """
    Central registry of loaded plugins.
    In production this is backed by a signed manifest store.
    This stub uses an in-memory dict for development and testing.
    """

    def __init__(self) -> None:
        # Both maps are keyed by plugin id and are kept in step by
        # register() / unregister().
        self._plugins: dict[str, Any] = {}
        self._manifests: dict[str, PluginManifest] = {}

    def register(self, plugin: Any, manifest: PluginManifest) -> None:
        """Store *plugin* and its *manifest* under ``manifest.plugin_id``.

        Raises:
            ValueError: A plugin with the same id is already registered.
        """
        plugin_id = manifest.plugin_id
        if plugin_id in self._plugins:
            raise ValueError(f"Plugin already registered: {plugin_id}")
        self._plugins[plugin_id] = plugin
        self._manifests[plugin_id] = manifest
        logger.info("Registered plugin: %s v%s", plugin_id, manifest.version)

    def get(self, plugin_id: str) -> Any | None:
        """Return the plugin registered under *plugin_id*, or ``None``."""
        return self._plugins.get(plugin_id)

    def get_manifest(self, plugin_id: str) -> PluginManifest | None:
        """Return the manifest registered under *plugin_id*, or ``None``."""
        return self._manifests.get(plugin_id)

    def list_plugin_ids(self) -> list[str]:
        """Return the ids of all registered plugins in registration order."""
        return list(self._plugins)

    def unregister(self, plugin_id: str) -> None:
        """Remove *plugin_id* from the registry; unknown ids are a no-op."""
        was_registered = plugin_id in self._plugins or plugin_id in self._manifests
        self._plugins.pop(plugin_id, None)
        self._manifests.pop(plugin_id, None)
        if was_registered:
            logger.info("Unregistered plugin: %s", plugin_id)
        else:
            # Do not claim a removal that never happened.
            logger.debug("Unregister ignored; plugin not registered: %s", plugin_id)
# ---------------------------------------------------------------------------
# Plugin Runtime
# ---------------------------------------------------------------------------
class PluginRuntime:
    """
    Manages the lifecycle of all registered plugins.

    Plugins are looked up in the registry by id and are expected to expose
    ``on_start(config)``, ``on_stop()`` and ``on_event(event)``.

    TODO (Phase 1):
    - Add sandboxed execution with resource limits
    - Add plugin signature verification
    - Add capability enforcement at subscription time
    - Add fault isolation (exception in one plugin must not crash others)
    """

    def __init__(self, registry: PluginRegistry, event_bus: Any) -> None:
        self.registry = registry
        self.event_bus = event_bus
        # Ids of plugins whose on_start() completed and that have not been stopped.
        self._running: set[str] = set()

    def start_plugin(self, plugin_id: str, config: dict[str, Any]) -> None:
        """Start a registered plugin by calling its ``on_start(config)``.

        Starting a plugin that is already running logs a warning and returns.

        Raises:
            KeyError: No plugin is registered under *plugin_id*.
            Exception: Whatever ``on_start`` raises is logged and re-raised;
                the plugin is not marked as running in that case.
        """
        plugin = self.registry.get(plugin_id)
        if plugin is None:
            raise KeyError(f"Plugin not found: {plugin_id}")
        if plugin_id in self._running:
            logger.warning("Plugin already running: %s", plugin_id)
            return
        self._enforce_capabilities(self.registry.get_manifest(plugin_id))
        try:
            plugin.on_start(config)
        except Exception as exc:
            logger.error("Failed to start plugin %s: %s", plugin_id, exc, exc_info=True)
            raise
        else:
            self._running.add(plugin_id)
            logger.info("Started plugin: %s", plugin_id)

    def stop_plugin(self, plugin_id: str) -> None:
        """Stop a running plugin; ids that are not running are a no-op.

        Errors raised by ``on_stop`` are logged and swallowed. The plugin is
        always removed from the running set.
        """
        if plugin_id not in self._running:
            return
        plugin = self.registry.get(plugin_id)
        if plugin is None:
            # The plugin was unregistered while running. There is nothing to
            # call, but the id must not linger in the running set.
            self._running.discard(plugin_id)
            logger.warning(
                "Plugin %s is no longer registered; cleared stale running state",
                plugin_id,
            )
            return
        try:
            plugin.on_stop()
        except Exception as exc:
            logger.warning(
                "Error stopping plugin %s: %s", plugin_id, exc, exc_info=True
            )
        finally:
            self._running.discard(plugin_id)
            logger.info("Stopped plugin: %s", plugin_id)

    def dispatch_event(self, event: Any) -> list[Any]:
        """Dispatch an event to all running plugins. Returns all output events.

        Plugins are visited in sorted id order so dispatch is reproducible
        across processes. An exception from one plugin is logged and does not
        prevent delivery to the remaining plugins.
        """
        output_events: list[Any] = []
        # sorted() also snapshots the set, so handlers may start or stop
        # plugins during dispatch.
        for plugin_id in sorted(self._running):
            plugin = self.registry.get(plugin_id)
            if plugin is None:
                continue
            try:
                results = plugin.on_event(event)
                output_events.extend(results or [])
            except Exception as exc:
                logger.error(
                    "Plugin %s raised exception handling event %s: %s",
                    plugin_id, getattr(event, "event_type", "?"), exc,
                    exc_info=True,
                )
                # Fault isolation: continue to next plugin
        return output_events

    def running_plugins(self) -> list[str]:
        """Return the ids of running plugins, sorted for a stable order."""
        return sorted(self._running)

    def _enforce_capabilities(self, manifest: PluginManifest | None) -> None:
        """Warn when a manifest declares no capabilities; no-op without a manifest."""
        if manifest is None:
            return
        # TODO: Check declared capabilities against policy
        # For now, log a warning if no capabilities declared
        if not manifest.capabilities:
            logger.warning("Plugin %s declares no capabilities", manifest.plugin_id)