-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathproxy_controller.py
More file actions
241 lines (207 loc) · 9.48 KB
/
Copy pathproxy_controller.py
File metadata and controls
241 lines (207 loc) · 9.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
from fastapi import APIRouter, Depends, HTTPException
from exa_py import Exa
from app.component.auth import key_must
from app.component.environment import env_not_empty
from app.model.mcp.proxy import ExaSearch
from typing import Any, cast
import requests
from utils import traceroot_wrapper as traceroot
logger = traceroot.get_logger("server_proxy_controller")
from app.model.user.key import Key
router = APIRouter(prefix="/proxy", tags=["Mcp Servers"])
@router.post("/exa")
@traceroot.trace()
def exa_search(search: ExaSearch, key: Key = Depends(key_must)):
"""Search using Exa API."""
EXA_API_KEY = env_not_empty("EXA_API_KEY")
secrets_to_redact = (EXA_API_KEY,)
def _redact_secret(text: str) -> str:
redacted = text
for secret in secrets_to_redact:
if secret:
redacted = redacted.replace(secret, "[REDACTED]")
return redacted
try:
# Validate input parameters
if search.num_results is not None and not 0 < search.num_results <= 100:
logger.warning("Invalid exa search parameter", extra={"param": "num_results", "value": search.num_results})
raise ValueError("num_results must be between 1 and 100")
if search.include_text is not None and len(search.include_text) > 0:
if len(search.include_text) > 1:
logger.warning("Invalid exa search parameter", extra={"param": "include_text", "reason": "more than 1 string"})
raise ValueError("include_text can only contain 1 string")
if len(search.include_text[0].split()) > 5:
logger.warning("Invalid exa search parameter", extra={"param": "include_text", "reason": "exceeds 5 words"})
raise ValueError("include_text string cannot be longer than 5 words")
if search.exclude_text is not None and len(search.exclude_text) > 0:
if len(search.exclude_text) > 1:
logger.warning("Invalid exa search parameter", extra={"param": "exclude_text", "reason": "more than 1 string"})
raise ValueError("exclude_text can only contain 1 string")
if len(search.exclude_text[0].split()) > 5:
logger.warning("Invalid exa search parameter", extra={"param": "exclude_text", "reason": "exceeds 5 words"})
raise ValueError("exclude_text string cannot be longer than 5 words")
exa = Exa(EXA_API_KEY)
# Call Exa API with direct parameters
if search.text:
results = cast(
dict[str, Any],
exa.search_and_contents(
query=search.query,
type=search.search_type,
category=search.category,
num_results=search.num_results,
include_text=search.include_text,
exclude_text=search.exclude_text,
use_autoprompt=search.use_autoprompt,
text=True,
),
)
else:
results = cast(
dict[str, Any],
exa.search(
query=search.query,
type=search.search_type,
category=search.category,
num_results=search.num_results,
include_text=search.include_text,
exclude_text=search.exclude_text,
use_autoprompt=search.use_autoprompt,
),
)
result_count = len(results.get("results", [])) if "results" in results else 0
logger.info("Exa search completed", extra={"query": search.query, "search_type": search.search_type, "result_count": result_count})
return results
except ValueError as e:
logger.warning("Exa search validation error", extra={"error": str(e)})
raise HTTPException(status_code=500, detail="Internal server error")
except Exception as e:
logger.error(
"Exa search failed",
extra={"query": search.query, "error_type": type(e).__name__, "error": _redact_secret(str(e))},
exc_info=False,
)
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/google")
@traceroot.trace()
def google_search(query: str, search_type: str = "web", key: Key = Depends(key_must)):
"""Search using Google Custom Search API."""
# https://developers.google.com/custom-search/v1/overview
GOOGLE_API_KEY = env_not_empty("GOOGLE_API_KEY")
# https://cse.google.com/cse/all
SEARCH_ENGINE_ID = env_not_empty("SEARCH_ENGINE_ID")
secrets_to_redact = (GOOGLE_API_KEY, SEARCH_ENGINE_ID)
def _redact_secret(text: str) -> str:
redacted = text
for secret in secrets_to_redact:
if secret and isinstance(redacted, str):
redacted = redacted.replace(secret, "[REDACTED]")
return redacted
def _redact_obj(obj):
"""Recursively redact secrets from all string fields in a dict/list structure."""
if isinstance(obj, dict):
return {k: _redact_obj(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [_redact_obj(item) for item in obj]
elif isinstance(obj, str):
return _redact_secret(obj)
else:
return obj
# Using the first page
start_page_idx = 1
# Different language may get different result
search_language = "en"
# How many pages to return
num_result_pages = 10
# Constructing the URL
# Doc: https://developers.google.com/custom-search/v1/using_rest
base_url = (
f"https://www.googleapis.com/customsearch/v1?"
f"key={GOOGLE_API_KEY}&cx={SEARCH_ENGINE_ID}&q={query}&start="
f"{start_page_idx}&lr={search_language}&num={num_result_pages}"
)
if search_type == "image":
url = base_url + "&searchType=image"
else:
url = base_url
responses = []
try:
# Make the GET request
result = requests.get(url)
data = result.json()
# Get the result items
if "items" in data:
search_items = data.get("items")
# Iterate over results found
for i, search_item in enumerate(search_items, start=1):
if search_type == "image":
# Process image search results
title = search_item.get("title")
image_url = search_item.get("link")
display_link = search_item.get("displayLink")
# Get context URL (page containing the image)
image_info = search_item.get("image", {})
context_url = image_info.get("contextLink", "")
# Get image dimensions if available
width = image_info.get("width")
height = image_info.get("height")
response = {
"result_id": i,
"title": title,
"image_url": image_url,
"display_link": display_link,
"context_url": context_url,
}
# Add dimensions if available
if width:
response["width"] = int(width)
if height:
response["height"] = int(height)
responses.append(response)
else:
# Process web search results
# Check metatags are present
if "pagemap" not in search_item:
continue
if "metatags" not in search_item["pagemap"]:
continue
if "og:description" in search_item["pagemap"]["metatags"][0]:
long_description = search_item["pagemap"]["metatags"][0]["og:description"]
else:
long_description = "N/A"
# Get the page title
title = search_item.get("title")
# Page snippet
snippet = search_item.get("snippet")
# Extract the page url
link = search_item.get("link")
response = {
"result_id": i,
"title": title,
"description": snippet,
"long_description": long_description,
"url": link,
}
responses.append(response)
logger.info("Google search completed", extra={"query": _redact_secret(query), "search_type": _redact_secret(search_type), "result_count": len(responses)})
else:
error_info = data.get("error", {})
sanitized_error = _redact_obj(error_info)
logger.error(
"Google search API error",
extra={"query": _redact_secret(query), "search_type": _redact_secret(search_type)},
)
raise HTTPException(status_code=500, detail="Internal server error")
except Exception as e:
logger.error(
"Google search failed",
extra={
"query": _redact_secret(query),
"search_type": _redact_secret(search_type),
"error_type": type(e).__name__,
"error": _redact_secret(str(e)),
},
exc_info=False,
)
raise HTTPException(status_code=500, detail="Internal server error")
return responses