-
Notifications
You must be signed in to change notification settings - Fork 63
Expand file tree
/
Copy pathcodex.py
More file actions
311 lines (261 loc) · 11.7 KB
/
codex.py
File metadata and controls
311 lines (261 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
"""Codex CLI IDE integration for AI guardrails."""
import json
import os
import sys
from pathlib import Path
from typing import ClassVar, Optional
import tomli_w
if sys.version_info >= (3, 11):
import tomllib
else: # pragma: no cover - py<3.11 fallback
import tomli as tomllib
from cycode.cli.apps.ai_guardrails.consts import CYCODE_SCAN_PROMPT_COMMAND, CYCODE_SESSION_START_COMMAND
from cycode.cli.apps.ai_guardrails.ides._plugin_utils import load_plugin_json, walk_enabled_plugins
from cycode.cli.apps.ai_guardrails.ides.base import IDE, DecisionAction, HookDecision
from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload
from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType
from cycode.cli.utils.jwt_utils import decode_jwt_unverified
from cycode.logger import get_logger
# Module-level logger shared by the helpers in this file.
logger = get_logger('AI Guardrails Codex')

# Names of the directory and files that make up Codex's on-disk layout.
_CONFIG_DIR_NAME = '.codex'
_HOOKS_FILE_NAME = 'hooks.json'
_CONFIG_TOML_NAME = 'config.toml'
_AUTH_JSON_NAME = 'auth.json'

# Environment variable that overrides the default Codex home directory.
_CODEX_HOME_ENV_VAR = 'CODEX_HOME'

# Hook events exposed by this integration, written as ``<event>`` or ``<event>:<qualifier>``.
_HOOK_EVENTS = ('UserPromptSubmit', 'PreToolUse:mcp')
# Bare event names: the part of each entry before the first ':'.
_CODEX_EVENT_NAMES = frozenset(e.split(':', 1)[0] for e in _HOOK_EVENTS)

# Shell commands used for the hooks; both are the shared commands with ``--ide codex`` appended.
_SCAN_COMMAND = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide codex'
_SESSION_START_COMMAND = f'{CYCODE_SESSION_START_COMMAND} --ide codex'
def _codex_home() -> Path:
    """Return the directory Codex uses as its user-scope home.

    A non-empty ``$CODEX_HOME`` takes precedence (Codex's documented
    override); otherwise the result is ``~/.codex``.
    """
    custom_home = os.environ.get(_CODEX_HOME_ENV_VAR)
    return Path(custom_home) if custom_home else Path.home() / _CONFIG_DIR_NAME
def _codex_config_toml_path(scope: str, repo_path: Optional[Path] = None) -> Path:
    """Locate Codex's ``config.toml`` for ``scope``.

    Repo scope with a repo path maps to ``<repo_path>/.codex/config.toml``;
    every other combination maps to the file inside the Codex home directory.
    """
    if scope != 'repo' or not repo_path:
        return _codex_home() / _CONFIG_TOML_NAME
    return repo_path / _CONFIG_DIR_NAME / _CONFIG_TOML_NAME
def _load_codex_config(config_path: Optional[Path] = None) -> Optional[dict]:
"""Load and parse Codex's ``config.toml``. Returns None on missing/invalid."""
path = config_path or (_codex_home() / _CONFIG_TOML_NAME)
if not path.exists():
logger.debug('Codex config file not found, %s', {'path': str(path)})
return None
try:
with path.open('rb') as f:
return tomllib.load(f)
except Exception as e:
logger.debug('Failed to load Codex config file, %s', {'path': str(path)}, exc_info=e)
return None
def _email_from_auth(auth_path: Optional[Path] = None) -> Optional[str]:
"""Best-effort extraction of the signed-in Codex user's email.
Reads ``~/.codex/auth.json`` and decodes the JWT in ``tokens.id_token``
to pull the ``email`` claim. Returns None if auth.json is missing
(``OPENAI_API_KEY``-only setups, OS keychain credentials) or unreadable.
"""
path = auth_path or (_codex_home() / _AUTH_JSON_NAME)
if not path.exists():
logger.debug('Codex auth file not found, %s', {'path': str(path)})
return None
try:
auth = json.loads(path.read_text(encoding='utf-8'))
except (OSError, json.JSONDecodeError) as e:
logger.debug('Failed to load Codex auth file, %s', {'path': str(path)}, exc_info=e)
return None
token = (auth.get('tokens') or {}).get('id_token')
if not token:
return None
claims = decode_jwt_unverified(token)
if not claims:
return None
return claims.get('email')
def _resolve_codex_plugin_dir(plugin_name: str, marketplace: str) -> Optional[Path]:
    """Locate ``<codex home>/plugins/cache/<marketplace>/<plugin>/<hash>/``.

    The final path segment is a content hash. When several hash directories
    are cached, the one with the newest modification time is returned.
    Returns ``None`` if the plugin's cache directory is absent or holds no
    subdirectories.
    """
    cache_root = _codex_home() / 'plugins' / 'cache' / marketplace / plugin_name
    if not cache_root.is_dir():
        return None

    def _modified_at(entry: Path) -> float:
        return entry.stat().st_mtime

    hash_dirs = (entry for entry in cache_root.iterdir() if entry.is_dir())
    return max(hash_dirs, key=_modified_at, default=None)
def _read_codex_plugin(plugin_dir: Path) -> tuple[dict, dict]:
    """Read one Codex plugin's manifest + MCP servers.

    Codex's manifest references the MCP file via a path string in the
    ``mcpServers`` field (default ``./.mcp.json``); the target file is either
    a bare ``{name: cfg}`` map or wrapped in ``{"mcpServers": {...}}``.

    Args:
        plugin_dir: Plugin root that contains ``.codex-plugin/plugin.json``.

    Returns:
        ``(entry, servers)``. ``entry`` carries the manifest's ``name``,
        ``version`` and ``description`` when present, plus
        ``mcp_server_names`` and ``mcp_config_file`` (the MCP document as a
        JSON string) when servers were found. ``servers`` maps server name to
        its config. Both are empty when the manifest is missing or is not a
        JSON object.
    """
    manifest = load_plugin_json(plugin_dir / '.codex-plugin' / 'plugin.json')
    if not manifest or not isinstance(manifest, dict):
        return {}, {}

    entry: dict = {field: manifest[field] for field in ('name', 'version', 'description') if field in manifest}

    mcp_ref = manifest.get('mcpServers')
    if not mcp_ref or not isinstance(mcp_ref, str):
        # Only a path string can be resolved against the plugin directory;
        # any other JSON type would make the path join below raise TypeError.
        return entry, {}

    mcp_doc = load_plugin_json(plugin_dir / mcp_ref)
    if not isinstance(mcp_doc, dict):
        # Missing/unreadable file, or a JSON document that is not an object.
        mcp_doc = {}

    # Accept both the wrapped form and the bare ``{name: cfg}`` map.
    servers = mcp_doc.get('mcpServers', mcp_doc)
    if not isinstance(servers, dict):
        servers = {}
    if servers:
        entry['mcp_server_names'] = list(servers)
        entry['mcp_config_file'] = json.dumps(mcp_doc)
    return entry, servers
def _resolve_codex_plugins(config: dict) -> tuple[dict, dict]:
    """Resolve the enabled ``[plugins."<plugin>@<marketplace>"]`` entries of ``config``.

    An entry counts as enabled when its settings are a table whose
    ``enabled`` value is truthy. Directory lookup and manifest reading are
    delegated to the Codex-specific helpers in this module.
    """

    def _is_enabled(settings: object) -> bool:
        return isinstance(settings, dict) and bool(settings.get('enabled'))

    plugin_table = config.get('plugins') or {}
    return walk_enabled_plugins(
        plugin_entries=plugin_table,
        is_enabled=_is_enabled,
        locate_dir=_resolve_codex_plugin_dir,
        read_plugin=_read_codex_plugin,
    )
def _enable_codex_hooks_feature(scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]:
    """Set ``[features] hooks = true`` in Codex's ``config.toml``.

    Codex's hook scripts are gated behind this feature flag. Existing keys are
    preserved and the file (+ parent dir) is created when missing. When the
    flag is already ``true`` the file is left untouched.

    Args:
        scope: ``'repo'`` targets ``<repo_path>/.codex/config.toml`` (when
            ``repo_path`` is given); anything else targets the Codex home.
        repo_path: Repository root used for repo scope.

    Returns:
        ``(success, message)``. ``success`` is ``False`` when the existing
        file cannot be parsed or the updated file cannot be written; the
        message is human-readable and names the config path.
    """
    config_path = _codex_config_toml_path(scope, repo_path)
    config: dict = {}
    if config_path.exists():
        try:
            with config_path.open('rb') as f:
                config = tomllib.load(f)
        except Exception as e:
            logger.error('Failed to parse Codex config.toml, %s', {'path': str(config_path)}, exc_info=e)
            return False, f'Failed to parse existing Codex config at {config_path}'

    features = config.get('features')
    if not isinstance(features, dict):
        features = {}
    if features.get('hooks') is True:
        # Nothing to change. Round-tripping through a dict would only strip
        # the comments and formatting of a hand-maintained config file.
        return True, f'Hooks feature already enabled in {config_path}'
    features['hooks'] = True
    config['features'] = features

    try:
        # Serialise before touching the file: opening with 'wb' truncates it,
        # so a serialisation error after the open would leave it empty.
        serialized = tomli_w.dumps(config).encode('utf-8')
        config_path.parent.mkdir(parents=True, exist_ok=True)
        config_path.write_bytes(serialized)
    except Exception as e:
        logger.error('Failed to write Codex config.toml, %s', {'path': str(config_path)}, exc_info=e)
        return False, f'Failed to write Codex config at {config_path}'
    return True, f'Enabled hooks feature in {config_path}'
class Codex(IDE):
    """Codex CLI integration for AI guardrails.

    Describes where Codex's hooks file lives, renders the hook configuration,
    and converts between Codex hook payloads/responses and the shared
    ``AIHookPayload`` / ``HookDecision`` types.
    """

    name: ClassVar[str] = 'codex'
    display_name: ClassVar[str] = 'Codex'
    hook_events: ClassVar[list[str]] = list(_HOOK_EVENTS)

    def settings_path(self, scope: str, repo_path: Optional[Path] = None) -> Path:
        """Return the ``hooks.json`` path: repo-local for repo scope, else inside the Codex home."""
        if scope == 'repo' and repo_path:
            return repo_path / _CONFIG_DIR_NAME / _HOOKS_FILE_NAME
        return _codex_home() / _HOOKS_FILE_NAME

    def render_hooks_config(self, async_mode: bool = False) -> dict:
        """Build the hooks configuration for Codex.

        Args:
            async_mode: When true, the scan command is suffixed with `` &`` so
                the shell runs it in the background.

        Returns:
            A dict with a ``hooks`` mapping for ``SessionStart``,
            ``UserPromptSubmit`` and ``PreToolUse`` (MCP tools only).
        """
        # Codex's TOML `async: true` flag is unimplemented; shell-background via
        # `&` is the working mechanism. SessionStart stays sync so the
        # conversation context is registered before any scan hook fires.
        bg = ' &' if async_mode else ''
        scan_cmd = f'{_SCAN_COMMAND}{bg}'
        return {
            'hooks': {
                'SessionStart': [
                    {
                        'matcher': 'startup|clear',
                        'hooks': [{'type': 'command', 'command': _SESSION_START_COMMAND}],
                    }
                ],
                'UserPromptSubmit': [
                    {
                        'hooks': [{'type': 'command', 'command': scan_cmd}],
                    }
                ],
                'PreToolUse': [
                    {
                        'matcher': 'mcp__.*',
                        'hooks': [{'type': 'command', 'command': scan_cmd}],
                    },
                ],
            },
        }

    def post_install(self, scope: str, repo_path: Optional[Path] = None) -> tuple[bool, str]:
        """Enable Codex's hooks feature flag for ``scope``; returns ``(success, message)``."""
        return _enable_codex_hooks_feature(scope, repo_path)

    def matches_payload(self, raw_payload: dict) -> bool:
        """Return True when the payload's ``hook_event_name`` is one of the Codex event names."""
        return raw_payload.get('hook_event_name', '') in _CODEX_EVENT_NAMES

    def parse_hook_payload(self, raw_payload: dict) -> AIHookPayload:
        """Convert a raw Codex hook payload into an ``AIHookPayload``.

        ``UserPromptSubmit`` maps to the prompt event and ``PreToolUse`` on an
        ``mcp__``-prefixed tool maps to the MCP execution event; any other
        event name is passed through unchanged. For MCP tools the name is
        split as ``mcp__<server>__<tool>`` and ``tool_input`` becomes the MCP
        arguments.
        """
        hook_event_name = raw_payload.get('hook_event_name', '')
        # A missing key, an explicit JSON null, or any non-string value is
        # treated as "no tool" so the ``startswith`` checks below cannot raise.
        raw_tool_name = raw_payload.get('tool_name')
        tool_name = raw_tool_name if isinstance(raw_tool_name, str) else ''
        tool_input = raw_payload.get('tool_input')
        is_mcp_tool = tool_name.startswith('mcp__')

        if hook_event_name == 'UserPromptSubmit':
            canonical_event: AiHookEventType | str = AiHookEventType.PROMPT
        elif hook_event_name == 'PreToolUse' and is_mcp_tool:
            canonical_event = AiHookEventType.MCP_EXECUTION
        else:
            canonical_event = hook_event_name

        mcp_server_name = None
        mcp_tool_name = None
        mcp_arguments = None
        if is_mcp_tool:
            # maxsplit=2 keeps a tool name that itself contains '__' intact
            # instead of cutting it at the first inner separator. The prefix
            # check guarantees at least two parts.
            parts = tool_name.split('__', 2)
            mcp_server_name = parts[1]
            if len(parts) >= 3:
                mcp_tool_name = parts[2]
            mcp_arguments = tool_input

        return AIHookPayload(
            event_name=canonical_event,
            conversation_id=raw_payload.get('session_id'),
            generation_id=raw_payload.get('turn_id'),
            ide_user_email=_email_from_auth(),
            model=raw_payload.get('model'),
            ide_provider=self.name,
            prompt=raw_payload.get('prompt', ''),
            mcp_server_name=mcp_server_name,
            mcp_tool_name=mcp_tool_name,
            mcp_arguments=mcp_arguments,
        )

    def build_hook_response(self, decision: HookDecision) -> dict:
        """Translate a ``HookDecision`` into the response dict for the hook."""
        # Codex accepts the same hook response shapes as Claude Code:
        # - PROMPT: empty for allow, {"decision": "block", "reason": ...} for deny
        # - PreToolUse: hookSpecificOutput.permissionDecision
        if decision.event_type == AiHookEventType.PROMPT:
            if decision.action == DecisionAction.ALLOW:
                return {}
            return {'decision': 'block', 'reason': decision.user_message or ''}

        if decision.action == DecisionAction.ALLOW:
            return {
                'hookSpecificOutput': {
                    'hookEventName': 'PreToolUse',
                    'permissionDecision': 'allow',
                }
            }
        return {
            'hookSpecificOutput': {
                'hookEventName': 'PreToolUse',
                'permissionDecision': decision.action.value,  # 'deny' or 'ask'
                'permissionDecisionReason': decision.user_message or '',
            }
        }

    def build_session_payload(self, raw_payload: dict) -> AIHookPayload:
        """Build the session-level ``AIHookPayload`` from a raw payload."""
        return AIHookPayload(
            conversation_id=raw_payload.get('session_id'),
            ide_user_email=_email_from_auth(),
            model=raw_payload.get('model'),
            ide_provider=self.name,
            ide_version=raw_payload.get('codex_version'),
            source=raw_payload.get('source'),
        )

    def get_user_email(self) -> Optional[str]:
        """Return the signed-in Codex user's email, or None when it cannot be determined."""
        return _email_from_auth()

    def get_session_context(self) -> tuple[dict, dict]:
        """Return ``(mcp_servers, enriched_plugins)`` from Codex's config; both empty without a config."""
        config = _load_codex_config()
        if not config:
            return {}, {}
        # Codex stores MCP servers under `[mcp_servers.<name>]`. Plugin-contributed
        # servers (via `[plugins."<plugin>@<marketplace>"]`) merge on top.
        mcp_servers: dict = dict(config.get('mcp_servers') or {})
        plugin_mcp, enriched_plugins = _resolve_codex_plugins(config)
        mcp_servers.update(plugin_mcp)
        return mcp_servers, enriched_plugins