-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmcp_server.py
More file actions
250 lines (191 loc) · 7.78 KB
/
Copy pathmcp_server.py
File metadata and controls
250 lines (191 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
"""
TokenMesh — Cloud Security MCP Agent
Exposes all Azure security analysis tools as MCP (Model Context Protocol)
tools so Claude Desktop (or any MCP client) can call them directly.
"""
import json
from typing import Optional
from mcp.server.fastmcp import FastMCP
from azure_auth import get_subscription_id
from tools.identity import list_users, list_groups, list_service_principals
from tools.rbac import (
list_role_assignments,
summarize_high_privilege_assignments,
list_subscription_scoped_assignments,
)
from tools.storage import (
list_storage_accounts,
check_storage_public_access,
check_storage_hardening,
)
from tools.intelligence import get_high_privileged_identities
from tools.backdoor import detect_backdoors
from tools.report import generate_pdf_report
# Shared FastMCP server instance named "TokenMesh". Every function below that
# is decorated with @mcp.tool() is registered on this object, and the
# __main__ guard at the bottom of the file starts it with mcp.run().
mcp = FastMCP("TokenMesh")
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _sub_id(subscription_id: Optional[str] = None) -> str:
    """Return the subscription ID to use for a tool call.

    Thin wrapper that hands the (possibly missing) value to
    ``azure_auth.get_subscription_id`` and returns whatever that helper
    resolves.

    Args:
        subscription_id: Explicit subscription ID, or None to let the helper
            fall back to its own source (per the tool docstrings in this
            file, the AZURE_SUBSCRIPTION_ID env var).

    Returns:
        The resolved subscription ID.
    """
    resolved = get_subscription_id(subscription_id)
    return resolved
def _json_result(data: dict) -> str:
"""Return a pretty-printed JSON string for MCP text responses."""
return json.dumps(data, indent=2, default=str)
# Most recent tool result. mcp_generate_pdf_report reads this to build the
# PDF, so it always reflects whichever tool was called last.
_last_result: dict = {}


def _store(result: dict) -> str:
    """Record *result* as the latest analysis output and return it as JSON.

    Args:
        result: The value returned by one of the analysis tools.

    Returns:
        The same result rendered by ``_json_result``.
    """
    # Rebind the module-level name (not a local) before serializing, so the
    # result is remembered even if serialization fails afterwards.
    global _last_result
    _last_result = result
    return _json_result(result)
# ---------------------------------------------------------------------------
# Identity tools
# ---------------------------------------------------------------------------
@mcp.tool()
def mcp_list_users(
    subscription_id: Optional[str] = None,
    top: int = 100,
) -> str:
    """List Microsoft Entra ID users and return them as JSON.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.
        top: Number of users to fetch (max 999).

    Returns:
        Pretty-printed JSON text of the result.
    """
    return _store(list_users(subscription_id=subscription_id, top=top))
@mcp.tool()
def mcp_list_groups(
    subscription_id: Optional[str] = None,
    top: int = 100,
) -> str:
    """List Microsoft Entra ID groups and return them as JSON.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.
        top: Number of groups to fetch (max 999).

    Returns:
        Pretty-printed JSON text of the result.
    """
    return _store(list_groups(subscription_id=subscription_id, top=top))
@mcp.tool()
def mcp_list_service_principals(
    subscription_id: Optional[str] = None,
    top: int = 100,
) -> str:
    """List Microsoft Entra ID service principals (app registrations) as JSON.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.
        top: Number of service principals to fetch (max 999).

    Returns:
        Pretty-printed JSON text of the result.
    """
    principals = list_service_principals(
        subscription_id=subscription_id,
        top=top,
    )
    return _store(principals)
# ---------------------------------------------------------------------------
# RBAC tools
# ---------------------------------------------------------------------------
@mcp.tool()
def mcp_list_role_assignments(
    subscription_id: Optional[str] = None,
    scope: Optional[str] = None,
) -> str:
    """List Azure RBAC role assignments at subscription or custom scope.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.
        scope: Optional Azure scope (e.g. /subscriptions/<id> or a resource
            group path).

    Returns:
        Pretty-printed JSON text of the result.
    """
    assignments = list_role_assignments(
        subscription_id=subscription_id,
        scope=scope,
    )
    return _store(assignments)
@mcp.tool()
def mcp_summarize_high_privilege_assignments(
    subscription_id: Optional[str] = None,
) -> str:
    """Summarize high-privilege Azure RBAC roles (Owner, Contributor, User Access Administrator).

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the summary.
    """
    return _store(
        summarize_high_privilege_assignments(subscription_id=subscription_id)
    )
@mcp.tool()
def mcp_list_subscription_scoped_assignments(
    subscription_id: Optional[str] = None,
) -> str:
    """List all RBAC role assignments scoped to the subscription.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the result.
    """
    return _store(
        list_subscription_scoped_assignments(subscription_id=subscription_id)
    )
# ---------------------------------------------------------------------------
# Storage tools
# ---------------------------------------------------------------------------
@mcp.tool()
def mcp_list_storage_accounts(
    subscription_id: Optional[str] = None,
) -> str:
    """List all Azure storage accounts in the subscription.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the result.
    """
    return _store(list_storage_accounts(subscription_id=subscription_id))
@mcp.tool()
def mcp_check_storage_public_access(
    subscription_id: Optional[str] = None,
) -> str:
    """Find storage accounts that allow public blob access (security risk).

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the findings.
    """
    findings = check_storage_public_access(subscription_id=subscription_id)
    return _store(findings)
@mcp.tool()
def mcp_check_storage_hardening(
    subscription_id: Optional[str] = None,
) -> str:
    """Check storage accounts for misconfigurations: public access, weak TLS, HTTP usage.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the findings.
    """
    findings = check_storage_hardening(subscription_id=subscription_id)
    return _store(findings)
# ---------------------------------------------------------------------------
# Intelligence & Backdoor detection
# ---------------------------------------------------------------------------
@mcp.tool()
def mcp_get_high_privileged_identities(
    subscription_id: Optional[str] = None,
) -> str:
    """Get high-privileged identities with Azure RBAC + Entra roles, risk levels, evidence, and attack paths.

    This is the core intelligence tool that enriches RBAC data with Entra
    directory roles, resolves principal names, and generates attack path
    analysis. The result is also kept as the latest analysis output for PDF
    reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the analysis.
    """
    # Resolve the subscription here so the analysis receives a concrete ID.
    resolved_id = _sub_id(subscription_id)
    return _store(get_high_privileged_identities(subscription_id=resolved_id))
@mcp.tool()
def mcp_detect_backdoors(
    subscription_id: Optional[str] = None,
) -> str:
    """Detect potential backdoors: high-privilege service principals, unresolved identities, public storage.

    The result is also kept as the latest analysis output for PDF reporting.

    Args:
        subscription_id: Azure subscription ID. Uses AZURE_SUBSCRIPTION_ID
            env var if omitted.

    Returns:
        Pretty-printed JSON text of the findings.
    """
    # Resolve the subscription here so the detector receives a concrete ID.
    resolved_id = _sub_id(subscription_id)
    findings = detect_backdoors(subscription_id=resolved_id)
    return _store(findings)
# ---------------------------------------------------------------------------
# Report generation
# ---------------------------------------------------------------------------
@mcp.tool()
def mcp_generate_pdf_report() -> str:
    """Generate a PDF security report from the most recent analysis results.

    Call one of the analysis tools first (e.g.
    mcp_get_high_privileged_identities) before calling this tool, so there is
    data to include in the report.

    Returns:
        Pretty-printed JSON text of whatever ``generate_pdf_report`` returns,
        or a compact JSON object with an "error" key when no tool result has
        been stored yet.
    """
    if _last_result:
        report = generate_pdf_report(_last_result)
        return _json_result(report)
    # Nothing stored (or the last stored result was empty): report the
    # problem as JSON rather than raising.
    return json.dumps({"error": "No analysis data available yet. Run an analysis tool first."})
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
# Start the MCP server only when this file is executed as a script, so that
# importing the module (e.g. to reuse the tool functions) has no side effect
# beyond registering the tools.
if __name__ == "__main__":
    mcp.run()