Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
internet-search:
max_results_per_provider: 5
summary-length: 150
api-key:
exa: "https://dashboard.exa.ai/home -- 登录获取api key"
tavily: "https://app.tavily.com/home -- 登录获取api key"
Expand Down
268 changes: 179 additions & 89 deletions app-builder/plugins/fit_py_internet_search/src/internet_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# ======================================================================================================================
import json
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence
from linkup import LinkupClient
Expand Down Expand Up @@ -69,12 +71,158 @@ def _get_max_results_per_provider() -> int:
pass


@value('internet-search.summary-length')
def _get_max_summary_length() -> int:
pass


def _truncate(text: str, max_chars: int) -> str:
if len(text) <= max_chars:
return text
return text[: max_chars - 1].rstrip() + "…"


def _extract_summary(text: str, max_sentences: int = 4) -> str:
"""
从文本中提取前几句话作为摘要

Args:
text: 原始文本
max_sentences: 最多保留的句子数,默认为4句

Returns:
摘要文本
"""
if not text:
return ""

# 使用正则表达式匹配句子结束符号
sentences = re.split(r'([。!?\.!?]+["\'»\)]?\s*)', text)

# 重新组合句子(将分隔符和句子内容合并)
combined_sentences = []
for i in range(0, len(sentences) - 1, 2):
sentence = sentences[i]
separator = sentences[i + 1] if i + 1 < len(sentences) else ""
combined = (sentence + separator).strip()
if combined:
combined_sentences.append(combined)

# 如果最后一个元素没有分隔符
if len(sentences) % 2 == 1 and sentences[-1].strip():
combined_sentences.append(sentences[-1].strip())

# 取前 max_sentences 句
if len(combined_sentences) <= max_sentences:
summary = " ".join(combined_sentences)
else:
summary = " ".join(combined_sentences[:max_sentences])

# 确保摘要不会过长(最多150字符)
if len(summary) > _get_max_summary_length():
summary = summary[:(_get_max_summary_length() - 3)].rstrip() + "..."

return summary


def _search_exa(query: str, api_key: str, max_results: int, max_snippet_chars: int) -> List[SearchItem]:
"""在 Exa 中搜索"""
items: List[SearchItem] = []
try:
exa_client = Exa(api_key=api_key)
res = exa_client.search_and_contents(
query,
text={"max_characters": 2000},
livecrawl="always",
num_results=max_results,
)
for i, r in enumerate(getattr(res, "results", [])[:max_results]):
text = _truncate(getattr(r, "text", "") or getattr(r, "content", "") or "", max_snippet_chars)
summary = _extract_summary(text) # 提取3-4句话作为摘要
items.append(
SearchItem(
id=getattr(r, "id", "") or f"exa_{i}",
text=summary,
score=12.0,
metadata={
"fileName": getattr(r, "title", "") or "",
"url": getattr(r, "url", "") or "",
"source": "exa",
"published_date": getattr(r, "published_date", None),
"summary": summary,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Exa tool: {str(e)}')
return items


def _search_tavily(query: str, api_key: str, max_results: int, max_snippet_chars: int) -> List[SearchItem]:
"""在 Tavily 中搜索"""
items: List[SearchItem] = []
try:
tavily_client = TavilyClient(api_key=api_key)
res = tavily_client.search(
query=query,
max_results=max_results,
include_images=False,
)
for i, r in enumerate(res.get("results", [])[:max_results]):
text = _truncate(r.get("content", "") or "", max_snippet_chars)
summary = _extract_summary(text) # 提取3-4句话作为摘要
items.append(
SearchItem(
id=r.get("id", "") or f"tavily_{i}",
text=summary,
score=12.0,
metadata={
"fileName": r.get("title", "") or "",
"url": r.get("url", "") or "",
"source": "tavily",
"published_date": r.get("published_date"),
"summary": summary,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Tavily tool: {str(e)}')
return items


def _search_linkup(query: str, api_key: str, max_results: int, max_snippet_chars: int) -> List[SearchItem]:
"""在 Linkup 中搜索"""
items: List[SearchItem] = []
try:
linkup_client = LinkupClient(api_key=api_key)
resp = linkup_client.search(
query=query,
depth="standard",
output_type="searchResults",
include_images=False,
)
for i, r in enumerate(getattr(resp, "results", [])[:max_results]):
text = _truncate(getattr(r, "content", "") or getattr(r, "text", "") or "", max_snippet_chars)
summary = _extract_summary(text) # 提取3-4句话作为摘要
items.append(
SearchItem(
id=getattr(r, "id", "") or f"linkup_{i}",
text=summary,
score=12.0,
metadata={
"fileName": getattr(r, "name", None) or getattr(r, "title", "") or "",
"url": getattr(r, "url", "") or "",
"source": "linkup",
"published_date": None,
"summary": summary,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Linkup tool: {str(e)}')
return items


def _internet_search(
query: str,
api_keys: Dict[str, str],
Expand All @@ -88,102 +236,44 @@ def _internet_search(
for name in ("exa", "tavily", "linkup"):
if api_keys.get(name):
selected.append(name)
items: List[SearchItem] = []
errors = [] # 记录失败的搜索工具

# Exa
# 准备并行搜索任务
search_tasks = []
if "exa" in selected and api_keys.get("exa"):
try:
exa_client = Exa(api_key=api_keys["exa"])
res = exa_client.search_and_contents(
query,
text={"max_characters": 2000},
livecrawl="always",
num_results=max_results_per_provider,
)
for i, r in enumerate(getattr(res, "results", [])[:max_results_per_provider]):
text = _truncate(getattr(r, "text", "") or getattr(r, "content", "") or "", max_snippet_chars)
items.append(
SearchItem(
id=getattr(r, "id", "") or f"exa_{i}",
text=text,
score=12.0, # 使用float确保序列化
metadata={
"fileName": getattr(r, "title", "") or "",
"url": getattr(r, "url", "") or "",
"source": "exa",
"published_date": getattr(r, "published_date", None),
"summary": text,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Exa tool: {str(e)}')
errors.append("exa")

# Tavily
search_tasks.append(("exa", _search_exa, api_keys["exa"]))
if "tavily" in selected and api_keys.get("tavily"):
try:
tavily_client = TavilyClient(api_key=api_keys["tavily"])
res = tavily_client.search(
query=query,
max_results=max_results_per_provider,
include_images=False,
)
for i, r in enumerate(res.get("results", [])[:max_results_per_provider]):
text = _truncate(r.get("content", "") or "", max_snippet_chars)
items.append(
SearchItem(
id=r.get("id", "") or f"tavily_{i}",
text=text,
score=12.0,
metadata={
"fileName": r.get("title", "") or "",
"url": r.get("url", "") or "",
"source": "tavily",
"published_date": r.get("published_date"),
"summary": text,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Tavily tool: {str(e)}')
errors.append("tavily")

# Linkup
search_tasks.append(("tavily", _search_tavily, api_keys["tavily"]))
if "linkup" in selected and api_keys.get("linkup"):
try:
linkup_client = LinkupClient(api_key=api_keys["linkup"])
resp = linkup_client.search(
query=query,
depth="standard",
output_type="searchResults",
include_images=False,
)
for i, r in enumerate(getattr(resp, "results", [])[:max_results_per_provider]):
text = _truncate(getattr(r, "content", "") or getattr(r, "text", "") or "", max_snippet_chars)
items.append(
SearchItem(
id=getattr(r, "id", "") or f"linkup_{i}",
text=text,
score=12.0,
metadata={
"fileName": getattr(r, "name", None) or getattr(r, "title", "") or "",
"url": getattr(r, "url", "") or "",
"source": "linkup",
"published_date": None,
"summary": text,
}
)
)
except Exception as e:
sys_plugin_logger.warning(f'Failed to search in Linkup tool: {str(e)}')
errors.append("linkup")

search_tasks.append(("linkup", _search_linkup, api_keys["linkup"]))

# 使用线程池并行执行搜索
items: List[SearchItem] = []
errors = []

with ThreadPoolExecutor(max_workers=len(search_tasks)) as executor:
# 提交所有搜索任务
future_to_provider = {
executor.submit(task_func, query, api_key, max_results_per_provider, max_snippet_chars): provider_name
for provider_name, task_func, api_key in search_tasks
}

# 收集结果
for future in as_completed(future_to_provider):
provider_name = future_to_provider[future]
try:
results = future.result()
if results:
items.extend(results)
else:
errors.append(provider_name)
except Exception as e:
sys_plugin_logger.error(f'Unexpected error in {provider_name} search: {str(e)}')
errors.append(provider_name)

# 如果所有搜索都失败了,才抛出异常
if not items and errors:
raise FitException(
InternalErrorCode.CLIENT_ERROR,
InternalErrorCode.CLIENT_ERROR,
f'All search tools failed: {", ".join(errors)}'
)

Expand Down