Skip to content

Commit 0b4cb8b

Browse files
committed
optimize cleanup function.
1 parent b68b57a commit 0b4cb8b

12 files changed

Lines changed: 206 additions & 55 deletions

File tree

docs/docs_en/api/mas.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ oxygent/mas.py
4343
| `init_all_oxy()` | Yes | `None` | Initialize all registered Oxy objects |
4444
| `batch_init_oxy()` | Yes | `None` | Batch initialize oxy objects of specified types |
4545
| `create_vearch_table()` | Yes | `None` | Create Vearch tables for tools |
46-
| `cleanup_servers()` | Yes | `None` | Gracefully shut down remote servers/clients |
46+
| `cleanup_all()` | Yes | `None` | Gracefully release resources held by all registered Oxy components |
4747
| `add_oxy()` | No | `None` | Register a single Oxy object |
4848
| `add_oxy_list()` | No | `None` | Register a list of Oxy objects |
4949
| `call()` | Yes | `Any` | Invoke an Oxy component directly and return its output |

docs/docs_zh/api/mas.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ oxygent/mas.py
4343
| `init_all_oxy()` || `None` | 初始化所有已注册的 Oxy 对象 |
4444
| `batch_init_oxy()` || `None` | 批量初始化指定类型的 Oxy 对象 |
4545
| `create_vearch_table()` || `None` | 为工具创建 Vearch 表 |
46-
| `cleanup_servers()` || `None` | 优雅地关闭远程服务器/客户端 |
46+
| `cleanup_all()` || `None` | 优雅地释放所有已注册 Oxy 组件持有的资源 |
4747
| `add_oxy()` || `None` | 注册单个 Oxy 对象 |
4848
| `add_oxy_list()` || `None` | 注册一组 Oxy 对象 |
4949
| `call()` || `Any` | 直接调用一个 Oxy 组件并返回其输出 |

oxygent/databases/db_es/jes_es.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ async def delete(self, index_name: str, doc_id: str) -> dict[str, Any]:
124124

125125
async def close(self) -> None:
126126
"""Close the Elasticsearch connection."""
127-
return await self._run_sync(self.client.close)
127+
if self.client is not None:
128+
return await self._run_sync(self.client.close)
128129

129130

130131
async def main() -> None:

oxygent/embedding_cache.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,16 @@ def load(self) -> dict[str, Any]:
123123
"""Load the on-disk cache if it exists; otherwise, return an empty dict."""
124124
if not os.path.exists(self.file):
125125
return dict()
126-
with open(self.file, "rb") as f:
127-
return pickle.load(f)
126+
try:
127+
with open(self.file, "rb") as f:
128+
return pickle.load(f)
129+
except Exception as e:
130+
logger.warning(
131+
f"Failed to load embedding cache from {self.file}, "
132+
f"starting with empty cache: {e}",
133+
exc_info=True,
134+
)
135+
return dict()
128136

129137
# TODO: save embeddings
130138
def save(self) -> None:

oxygent/live_prompt/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
"""
55

66
# Core ES-based prompt management
7-
from .manager import PromptManager, get_dynamic_prompt, get_prompt_manager
7+
from .manager import (
8+
PromptManager,
9+
close_prompt_manager,
10+
get_dynamic_prompt,
11+
get_prompt_manager,
12+
)
813

914
# Prompt optimization
1015
from .optimizer import PromptOptimizer, get_prompt_optimizer
@@ -30,6 +35,7 @@
3035
# Core ES-based prompt management
3136
"get_prompt_manager",
3237
"get_dynamic_prompt",
38+
"close_prompt_manager",
3339
"PromptManager",
3440
# Hot-reload functionality
3541
"setup_dynamic_agents",

oxygent/live_prompt/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ async def close_prompt_manager() -> None:
740740
logger.info("Version sync stopped during shutdown")
741741

742742
await prompt_manager.close()
743-
logger.info("Prompt manager closed successfully")
743+
logger.debug("Prompt manager closed successfully")
744744
except Exception as e:
745745
logger.error(f"Error closing prompt manager: {e}", exc_info=True)
746746
finally:

oxygent/mas.py

Lines changed: 92 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
from .oxy.base_flow import BaseFlow
4242
from .oxy.base_tool import BaseTool
4343
from .oxy.llms.base_llm import BaseLLM
44-
from .oxy.mcp_tools.base_mcp_client import BaseMCPClient
4544
from .routes import router
4645
from .schemas import OxyRequest, OxyResponse, SSEMessage, WebResponse
4746
from .utils.common_utils import (
@@ -213,17 +212,41 @@ async def __aenter__(self) -> "MAS":
213212
return self
214213

215214
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
215+
async def _safe(coro, label: str) -> None:
216+
try:
217+
await coro
218+
logger.info(f"✓ {label} shutdown completed.")
219+
except Exception as e:
220+
logger.warning(f"Error during shutdown ({label}): {e}", exc_info=True)
221+
222+
logger.info("=" * 64)
223+
logger.info("🔒 OxyGent MAS Application Shutdown Initiated")
224+
logger.info("=" * 64)
225+
226+
# 1. Wait for background tasks
216227
all_tasks = [t for tasks in self.background_tasks.values() for t in tasks]
217228
if all_tasks:
218-
await asyncio.gather(*all_tasks)
229+
await _safe(
230+
asyncio.gather(*all_tasks, return_exceptions=True),
231+
"background tasks",
232+
)
233+
234+
# 2. Tear down resources — each step runs regardless of prior failures
235+
from .live_prompt import close_prompt_manager
236+
237+
await _safe(close_prompt_manager(), "prompt manager")
238+
239+
if self.es_client:
240+
await _safe(self.es_client.close(), "ES client")
241+
242+
if self.redis_client:
243+
await _safe(self.redis_client.close(), "Redis client")
244+
245+
await _safe(self.cleanup_all(), "oxy cleanup")
246+
219247
logger.info("=" * 64)
220248
logger.info("🪂 OxyGent MAS Application Exit")
221249
logger.info("=" * 64)
222-
if self.es_client:
223-
await self.es_client.close()
224-
if self.redis_client:
225-
await self.redis_client.close()
226-
await self.cleanup_servers()
227250

228251
def add_background_task(self, trace_id: str, task: asyncio.Task) -> None:
229252
"""Register a background task under the given trace_id."""
@@ -341,24 +364,21 @@ async def init(self) -> None:
341364
logger.warning(f"Failed to setup dynamic agents: {e}", exc_info=True)
342365
logger.info("=" * 64)
343366

344-
async def cleanup_servers(self) -> None:
345-
"""Gracefully shut down remote servers/clients.
367+
async def cleanup_all(self) -> None:
368+
"""Gracefully release resources held by all registered Oxy components.
346369
347-
The method concurrently calls ``cleanup()`` on every
348-
:class:`BaseMCPClient` that has been registered. It is automatically
349-
invoked by :func:`__aexit__`.
370+
Calls ``cleanup()`` on every registered :class:`Oxy` instance.
371+
Each cleanup is invoked individually so that a failure in one
372+
component does not prevent others from releasing their resources.
350373
"""
351-
cleanup_tasks = []
352-
for oxy in self.oxy_name_to_oxy.values():
353-
if not isinstance(oxy, BaseMCPClient):
354-
continue
355-
cleanup_tasks.append(asyncio.create_task(oxy.cleanup()))
356-
357-
if cleanup_tasks:
374+
for oxy_name, oxy in self.oxy_name_to_oxy.items():
358375
try:
359-
await asyncio.gather(*cleanup_tasks, return_exceptions=False)
376+
await oxy.cleanup()
360377
except Exception as e:
361-
logger.warning(f"Warning during final cleanup: {e}", exc_info=True)
378+
logger.warning(
379+
f"Error during cleanup of oxy '{oxy_name}': {e}",
380+
exc_info=True,
381+
)
362382

363383
async def init_db(self) -> None:
364384
"""Es --- (table_name: key)
@@ -635,21 +655,59 @@ async def batch_init_oxy(self, *class_type: type) -> None:
635655
class_types: List of class types to initialize (e.g., BaseLLM, BaseTool, BaseAgent).
636656
637657
NOTE:
638-
Initialize all oxy objects of the specified class types,
658+
Initialize all oxy objects of the specified class types.
659+
If any init fails, already-initialized oxy objects of the same
660+
batch are cleaned up before the exception propagates.
639661
"""
640-
tasks = []
641-
for oxy_name in list(self.oxy_name_to_oxy.keys()):
642-
oxy = self.oxy_name_to_oxy[oxy_name]
643-
if not isinstance(oxy, class_type):
644-
continue
662+
targets = [
663+
(name, oxy)
664+
for name, oxy in self.oxy_name_to_oxy.items()
665+
if isinstance(oxy, class_type)
666+
]
667+
for _name, oxy in targets:
645668
oxy.set_mas(self)
646-
task = oxy.init()
647-
if Config.get_tool_is_concurrent_init():
648-
tasks.append(task)
649-
else:
650-
await task
651-
if tasks:
652-
await asyncio.gather(*tasks)
669+
670+
initialized_names: list[str] = []
671+
if Config.get_tool_is_concurrent_init():
672+
results = await asyncio.gather(
673+
*(oxy.init() for _name, oxy in targets),
674+
return_exceptions=True,
675+
)
676+
first_error = None
677+
for (name, _oxy), result in zip(targets, results):
678+
if isinstance(result, Exception):
679+
logger.error(
680+
f"Failed to initialize oxy '{name}': {result}", exc_info=result
681+
)
682+
if first_error is None:
683+
first_error = result
684+
else:
685+
initialized_names.append(name)
686+
if first_error is not None:
687+
for name in initialized_names:
688+
try:
689+
await self.oxy_name_to_oxy[name].cleanup()
690+
except Exception as ce:
691+
logger.warning(
692+
f"Error during rollback cleanup of oxy '{name}': {ce}",
693+
exc_info=True,
694+
)
695+
raise first_error
696+
else:
697+
try:
698+
for name, oxy in targets:
699+
await oxy.init()
700+
initialized_names.append(name)
701+
except Exception:
702+
for name in initialized_names:
703+
try:
704+
await self.oxy_name_to_oxy[name].cleanup()
705+
except Exception as ce:
706+
logger.warning(
707+
f"Error during rollback cleanup of oxy '{name}': {ce}",
708+
exc_info=True,
709+
)
710+
raise
653711

654712
async def init_all_oxy(self) -> None:
655713
"""Initializing all tools and agents assign values of agent.tools to each

oxygent/oxy/agents/a2a_client_agent.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,21 @@ async def init(self) -> None:
239239
extra={"agent": self.name, "card_url": getattr(self._card, "url", "")},
240240
)
241241

242+
async def cleanup(self) -> None:
243+
"""Close the HTTP client and release connection pool resources."""
244+
if self._http_client is not None:
245+
try:
246+
await self._http_client.aclose()
247+
except Exception as e:
248+
logger.warning(
249+
f"Error closing HTTP client for A2AClientAgent '{self.name}': {e}",
250+
exc_info=True,
251+
)
252+
finally:
253+
self._http_client = None
254+
self._client = None
255+
self._card = None
256+
242257
@staticmethod
243258
def _task_state(task: Task | None) -> str:
244259
if not task or not task.status:

oxygent/oxy/agents/shell_use_agent.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,46 @@ async def init(self) -> None:
3232

3333
client = paramiko.SSHClient()
3434
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
35-
client.connect(**self.auth_info)
35+
try:
36+
client.connect(**self.auth_info)
37+
except Exception as e:
38+
logger.error(
39+
f"Failed to establish SSH connection for '{self.name}': {e}",
40+
exc_info=True,
41+
)
42+
client.close()
43+
raise
44+
3645
ssh_channel = client.invoke_shell()
3746

3847
await asyncio.sleep(1)
3948
if ssh_channel.recv_ready():
4049
output = ssh_channel.recv(4096).decode()
4150
self.mas.global_data["hello_terminal"] = clean_ansi_codes(output)
51+
self.mas.global_data["ssh_client"] = client
4252
self.mas.global_data["ssh_channel"] = ssh_channel
4353

54+
async def cleanup(self) -> None:
55+
"""Close the SSH channel and client connection."""
56+
channel = self.mas.global_data.pop("ssh_channel", None) if self.mas else None
57+
client = self.mas.global_data.pop("ssh_client", None) if self.mas else None
58+
try:
59+
if channel is not None:
60+
channel.close()
61+
except Exception as e:
62+
logger.warning(
63+
f"Error closing SSH channel for '{self.name}': {e}",
64+
exc_info=True,
65+
)
66+
try:
67+
if client is not None:
68+
client.close()
69+
except Exception as e:
70+
logger.warning(
71+
f"Error closing SSH client for '{self.name}': {e}",
72+
exc_info=True,
73+
)
74+
4475
def _parse_llm_response(
4576
self, ori_response: str, oxy_request: OxyRequest = None
4677
) -> LLMResponse:

oxygent/oxy/agents/sse_oxy_agent.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,30 @@ class SSEOxyGent(RemoteAgent):
2424
async def init(self) -> None:
2525
"""Initialize the SSE remote agent connection."""
2626
await super().init()
27-
28-
async with httpx.AsyncClient() as client:
29-
response = await client.get(build_url(self.server_url, "/get_organization"))
30-
self.org = response.json()["data"]["organization"]
31-
32-
if self.desc == "":
27+
try:
28+
async with httpx.AsyncClient() as client:
3329
response = await client.get(
34-
build_url(self.server_url, "/get_description")
30+
build_url(self.server_url, "/get_organization")
3531
)
36-
if response.json().get("code") == 200:
37-
self.desc = response.json()["data"]["description"]
38-
self._set_desc_for_llm()
32+
self.org = response.json()["data"]["organization"]
33+
34+
if self.desc == "":
35+
response = await client.get(
36+
build_url(self.server_url, "/get_description")
37+
)
38+
if response.json().get("code") == 200:
39+
self.desc = response.json()["data"]["description"]
40+
self._set_desc_for_llm()
41+
except Exception as e:
42+
logger.error(
43+
f"Failed to initialize SSEOxyGent '{self.name}' "
44+
f"(url={self.server_url}): {e}",
45+
exc_info=True,
46+
)
47+
raise RuntimeError(
48+
f"Failed to initialize SSEOxyGent '{self.name}' "
49+
f"at {self.server_url}: {e}"
50+
) from e
3951

4052
async def _execute(self, oxy_request: OxyRequest) -> OxyResponse:
4153
"""Forward the request to the remote SSE endpoint and stream the response."""

0 commit comments

Comments
 (0)