-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecurity_trimming.py
More file actions
264 lines (221 loc) · 8.28 KB
/
Copy pathsecurity_trimming.py
File metadata and controls
264 lines (221 loc) · 8.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""
Example: Row-Level Security in RAG with Azure AI Search
KB Section: 3. Technical Challenges - Security & Governance
KB Link: https://maree217.github.io/copilot-architect-kb#challenges
Description:
Implements row-level security for RAG systems using Azure AI Search metadata
filtering. Ensures users only retrieve documents they're authorized to access.
Security Requirements:
- User sees only documents from their authorized groups
- Filter applied at search time (not post-processing)
- Supports Azure AD group membership
- Zero unauthorized document leakage
Prerequisites:
- pip install azure-search-documents azure-identity python-dotenv
- Azure AI Search service
- AZURE_SEARCH_ENDPOINT and AZURE_SEARCH_KEY
Usage:
$ python security_trimming.py
"""
import os
from dotenv import load_dotenv
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
from typing import List, Dict
# Load environment variables
load_dotenv()
class SecureRAGRetriever:
"""
RAG retriever with row-level security using Azure AI Search.
Documents are indexed with 'allowed_groups' metadata. At query time,
filters documents based on user's group membership.
"""
def __init__(
self,
search_endpoint: str,
search_key: str,
index_name: str = "secure-documents"
):
"""
Initialize secure retriever.
Args:
search_endpoint: Azure AI Search endpoint
search_key: Azure AI Search admin key
index_name: Name of search index
"""
self.search_client = SearchClient(
endpoint=search_endpoint,
index_name=index_name,
credential=AzureKeyCredential(search_key)
)
def search_with_security_trimming(
self,
query: str,
user_groups: List[str],
top_k: int = 5
) -> List[Dict]:
"""
Search with security filtering based on user's group membership.
Args:
query: User's search query
user_groups: List of AD groups user belongs to
top_k: Number of results to return
Returns:
List of documents user is authorized to see
"""
# Build OData filter for user's groups
# Format: "allowed_groups/any(g: g in ('group1', 'group2'))"
group_filter = self._build_group_filter(user_groups)
print(f"🔍 Searching with security filter:")
print(f" Query: {query}")
print(f" User groups: {user_groups}")
print(f" Filter: {group_filter}")
# Execute search with security filter
results = self.search_client.search(
search_text=query,
filter=group_filter,
select=["id", "content", "title", "allowed_groups", "metadata"],
top=top_k
)
# Convert to list
documents = []
for result in results:
doc = {
"id": result.get("id"),
"title": result.get("title"),
"content": result.get("content"),
"allowed_groups": result.get("allowed_groups"),
"score": result.get("@search.score")
}
documents.append(doc)
return documents
def _build_group_filter(self, user_groups: List[str]) -> str:
"""
Build OData filter for group-based security.
Example output:
"allowed_groups/any(g: g in ('finance', 'legal'))"
"""
if not user_groups:
# No groups = no access
return "allowed_groups/any(g: g eq 'none')"
# Escape single quotes in group names
escaped_groups = [g.replace("'", "''") for g in user_groups]
# Build comma-separated list
groups_list = "', '".join(escaped_groups)
# OData filter syntax
return f"allowed_groups/any(g: g in ('{groups_list}'))"
def simulate_document_index():
"""Simulate documents with different security levels"""
return [
{
"id": "doc1",
"title": "Q4 Financial Report",
"content": "Revenue increased by 23%...",
"allowed_groups": ["finance", "executive"]
},
{
"id": "doc2",
"title": "Legal Compliance Update",
"content": "New regulations require...",
"allowed_groups": ["legal", "compliance", "executive"]
},
{
"id": "doc3",
"title": "Engineering Design Doc",
"content": "System architecture uses...",
"allowed_groups": ["engineering", "product"]
},
{
"id": "doc4",
"title": "Public Product Announcement",
"content": "We're excited to announce...",
"allowed_groups": ["all-employees"]
},
{
"id": "doc5",
"title": "M&A Confidential",
"content": "Acquisition target analysis...",
"allowed_groups": ["executive", "corporate-dev"]
}
]
def main():
"""Example usage with different user personas"""
# Simulate retriever (in production, connect to real Azure AI Search)
print("\n" + "="*60)
print("🔐 ROW-LEVEL SECURITY IN RAG")
print("="*60)
# Document corpus
documents = simulate_document_index()
print(f"\n📚 Document Corpus: {len(documents)} documents")
for doc in documents:
print(f" - {doc['title']}: {doc['allowed_groups']}")
# Test different user personas
personas = [
{
"name": "Alice (Finance Analyst)",
"groups": ["finance", "all-employees"],
"query": "financial report"
},
{
"name": "Bob (Legal Counsel)",
"groups": ["legal", "compliance", "all-employees"],
"query": "compliance"
},
{
"name": "Carol (CEO)",
"groups": ["executive", "finance", "legal", "all-employees"],
"query": "confidential"
},
{
"name": "Dave (Contractor)",
"groups": ["all-employees"],
"query": "announcement"
}
]
for persona in personas:
print("\n" + "-"*60)
print(f"👤 User: {persona['name']}")
print(f" Groups: {persona['groups']}")
print(f" Query: '{persona['query']}'")
# Filter documents based on user's groups
authorized_docs = [
doc for doc in documents
if any(group in doc['allowed_groups'] for group in persona['groups'])
]
print(f"\n✓ Authorized Documents: {len(authorized_docs)}")
for doc in authorized_docs:
print(f" - {doc['title']}")
# Show what they CANNOT see
unauthorized_docs = [
doc for doc in documents
if doc not in authorized_docs
]
if unauthorized_docs:
print(f"\n❌ Hidden Documents: {len(unauthorized_docs)}")
for doc in unauthorized_docs:
print(f" - {doc['title']} (requires: {doc['allowed_groups']})")
# Security analysis
print("\n" + "="*60)
print("🛡️ SECURITY ANALYSIS")
print("="*60)
print("\n✓ PROTECTION MECHANISMS:")
print(" 1. Filter at search time (not post-processing)")
print(" 2. Azure AI Search enforces security")
print(" 3. User never sees unauthorized document IDs")
print(" 4. Supports Azure AD group integration")
print("\n✓ PRODUCTION CONSIDERATIONS:")
print(" 1. Index documents with 'allowed_groups' metadata")
print(" 2. Populate groups from Azure AD at index time")
print(" 3. Get user's groups from Azure AD token at query time")
print(" 4. Apply filter on every search (never skip)")
print("\n✓ ALTERNATIVE APPROACHES:")
print(" 1. Pinecone Namespaces: Separate namespace per user/team")
print(" 2. Weaviate Multi-Tenancy: Built-in tenant isolation")
print(" 3. Document-Level Encryption: Encrypt with user keys")
print("\n⚠️ ANTI-PATTERNS (DO NOT DO):")
print(" 1. ❌ Post-filtering (retrieves unauthorized docs first)")
print(" 2. ❌ Client-side filtering (user can bypass)")
print(" 3. ❌ Single index without metadata (no security)")
print(" 4. ❌ Trusting user-provided group claims (verify server-side)")
if __name__ == "__main__":
main()