Skip to content

Commit 6e45183

Browse files
committed
[fit] 代码重构
1 parent b826482 commit 6e45183

8 files changed

Lines changed: 894 additions & 586 deletions

File tree

framework/fit/python/conf/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ https:
4242
key_file_password_encrypted: false # 私钥的密码是否被加密,仅当 cert_enabled 和 key_file_encrypted 为 true 时有意义
4343
registry-center:
4444
server:
45-
mode: 'DIRECT'
45+
mode: 'DIRECT' # DIRECT 表示直连,直接连接内存注册中心;PROXY 表示代理模式,通过本地代理服务连接 Nacos 注册中心
4646
addresses:
4747
- "localhost:8848"
4848
protocol: 2
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
"""
7+
Nacos registry service package.
8+
"""
9+
10+
from .nacos_registry_server import (
11+
register_fitables,
12+
unregister_fitables,
13+
query_fitable_addresses,
14+
subscribe_fit_service,
15+
unsubscribe_fitables,
16+
query_fitable_metas
17+
)
18+
19+
__all__ = [
20+
'register_fitables',
21+
'unregister_fitables',
22+
'query_fitable_addresses',
23+
'subscribe_fit_service',
24+
'unsubscribe_fitables',
25+
'query_fitable_metas'
26+
]
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
"""
7+
Async executor for Nacos operations.
8+
9+
This module provides an async executor for handling Nacos operations
10+
in a background thread with proper event loop management.
11+
"""
12+
import asyncio
13+
import atexit
14+
import threading
15+
from concurrent.futures import Future
16+
17+
from v2.nacos import NacosNamingService, RegisterInstanceParam, ListInstanceParam, \
18+
DeregisterInstanceParam, SubscribeServiceParam, ListServiceParam
19+
20+
from fitframework.api.logging import plugin_logger
21+
from .config import build_nacos_config
22+
23+
24+
class AsyncExecutor:
25+
"""Executor for handling asynchronous operations in a background thread."""
26+
27+
def __init__(self):
28+
self._loop = None
29+
self._thread = None
30+
self._started = False
31+
self._shutdown = False
32+
self._nacos_client = None
33+
self._init_complete = threading.Event()
34+
35+
def start(self):
36+
"""Start the background event loop thread."""
37+
if self._started:
38+
return
39+
40+
self._thread = threading.Thread(
41+
target=self._run_event_loop,
42+
daemon=True,
43+
name="NacosAsyncThread"
44+
)
45+
self._thread.start()
46+
47+
# Wait for initialization to complete
48+
if not self._init_complete.wait(timeout=10): # Max wait 10 seconds
49+
raise RuntimeError("Failed to initialize async executor within timeout")
50+
51+
self._started = True
52+
53+
def _run_event_loop(self):
54+
"""Run the event loop in the background thread."""
55+
try:
56+
self._loop = asyncio.new_event_loop()
57+
asyncio.set_event_loop(self._loop)
58+
59+
# Create Nacos client in this event loop
60+
async def init_nacos_client():
61+
try:
62+
config = build_nacos_config()
63+
self._nacos_client = await NacosNamingService.create_naming_service(config)
64+
plugin_logger.info("Nacos client initialized successfully")
65+
except Exception as e:
66+
plugin_logger.error(f"Failed to initialize Nacos client: {e}")
67+
raise
68+
finally:
69+
# Mark initialization complete
70+
self._init_complete.set()
71+
72+
self._loop.run_until_complete(init_nacos_client())
73+
74+
# Run event loop until shutdown
75+
self._loop.run_forever()
76+
except Exception as e:
77+
plugin_logger.error(f"Error in async executor event loop: {e}")
78+
self._init_complete.set() # Set even on failure to avoid infinite wait
79+
finally:
80+
try:
81+
if self._nacos_client:
82+
# Cleanup Nacos client if needed
83+
pass
84+
if self._loop:
85+
self._loop.close()
86+
except Exception as e:
87+
plugin_logger.error(f"Error during cleanup: {e}")
88+
89+
def run_coroutine(self, coro):
90+
"""
91+
Run a coroutine in the background event loop and return the result.
92+
93+
Args:
94+
coro: The coroutine to run.
95+
96+
Returns:
97+
The result of the coroutine.
98+
99+
Raises:
100+
RuntimeError: If the executor is not properly initialized.
101+
"""
102+
if not self._started:
103+
self.start()
104+
105+
if self._loop is None or self._nacos_client is None:
106+
raise RuntimeError("Async executor not properly initialized")
107+
108+
# Create a Future to get the result
109+
result_future = Future()
110+
111+
async def wrapped_coro():
112+
try:
113+
result = await coro
114+
result_future.set_result(result)
115+
except Exception as e:
116+
result_future.set_exception(e)
117+
118+
# Schedule the coroutine in the event loop
119+
self._loop.call_soon_threadsafe(asyncio.create_task, wrapped_coro())
120+
121+
# Wait for result
122+
return result_future.result(timeout=30) # 30 second timeout
123+
124+
def get_nacos_client(self):
125+
"""
126+
Get the Nacos client instance.
127+
128+
Returns:
129+
The Nacos client instance.
130+
"""
131+
if not self._started:
132+
self.start()
133+
return self._nacos_client
134+
135+
def shutdown(self):
136+
"""Shutdown the async executor."""
137+
if self._loop and not self._loop.is_closed():
138+
self._loop.call_soon_threadsafe(self._loop.stop)
139+
self._shutdown = True
140+
141+
142+
# Global async executor
143+
_async_executor = AsyncExecutor()
144+
145+
146+
def run_async_safely(coro):
147+
"""
148+
Run an async operation safely using the dedicated executor.
149+
150+
Args:
151+
coro: The coroutine to run.
152+
153+
Returns:
154+
The result of the coroutine.
155+
156+
Raises:
157+
Exception: If the async operation fails.
158+
"""
159+
try:
160+
return _async_executor.run_coroutine(coro)
161+
except Exception as e:
162+
plugin_logger.error(f"Error running async operation: {e}")
163+
raise
164+
165+
166+
# Async wrapper functions
167+
async def call_list_instances(param: ListInstanceParam):
168+
"""List instances."""
169+
client = _async_executor.get_nacos_client()
170+
return await client.list_instances(param)
171+
172+
173+
async def call_deregister_instance(param: DeregisterInstanceParam) -> bool:
174+
"""Deregister instance."""
175+
client = _async_executor.get_nacos_client()
176+
return await client.deregister_instance(param)
177+
178+
179+
async def call_subscribe(param: SubscribeServiceParam) -> None:
180+
"""Subscribe to service."""
181+
client = _async_executor.get_nacos_client()
182+
await client.subscribe(param)
183+
184+
185+
async def call_unsubscribe(param: SubscribeServiceParam) -> None:
186+
"""Unsubscribe from service."""
187+
client = _async_executor.get_nacos_client()
188+
await client.unsubscribe(param)
189+
190+
191+
async def call_list_services(param: ListServiceParam):
192+
"""List services."""
193+
client = _async_executor.get_nacos_client()
194+
return await client.list_services(param)
195+
196+
197+
async def call_register_instance(param: RegisterInstanceParam) -> None:
198+
"""Register instance."""
199+
client = _async_executor.get_nacos_client()
200+
await client.register_instance(param)
201+
202+
203+
def _cleanup_async_executor():
204+
"""Cleanup the async executor."""
205+
try:
206+
_async_executor.shutdown()
207+
except Exception as e:
208+
plugin_logger.error(f"Error during async executor cleanup: {e}")
209+
210+
211+
# Register cleanup function
212+
atexit.register(_cleanup_async_executor)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
"""
7+
Configuration module for Nacos registry server.
8+
"""
9+
from v2.nacos import ClientConfigBuilder
10+
11+
from fitframework import value
12+
from fitframework.utils import tools
13+
14+
15+
@value('registry-center.server.addresses', converter=tools.to_list)
16+
def get_registry_server_addresses() -> list:
17+
"""Get the list of registry server addresses."""
18+
pass
19+
20+
21+
@value('nacos.username', default_value=None)
22+
def get_nacos_username() -> str:
23+
"""
24+
Get the Nacos username.
25+
26+
Returns:
27+
Nacos username.
28+
"""
29+
pass
30+
31+
32+
@value('nacos.password', default_value=None)
33+
def get_nacos_password() -> str:
34+
"""
35+
Get the Nacos password.
36+
37+
Returns:
38+
Nacos password.
39+
"""
40+
pass
41+
42+
43+
@value('nacos.accessKey', default_value=None)
44+
def get_nacos_access_key() -> str:
45+
"""
46+
Get the Nacos access key.
47+
48+
Returns:
49+
Nacos access key.
50+
"""
51+
pass
52+
53+
54+
@value('nacos.secretKey', default_value=None)
55+
def get_nacos_secret_key() -> str:
56+
"""
57+
Get the Nacos secret key.
58+
59+
Returns:
60+
Nacos secret key.
61+
"""
62+
pass
63+
64+
65+
@value('nacos.namespace', default_value="")
66+
def get_nacos_namespace() -> str:
67+
"""
68+
Get the Nacos namespace.
69+
70+
Returns:
71+
Nacos namespace.
72+
"""
73+
pass
74+
75+
76+
@value('nacos.isEphemeral', default_value=True, converter=bool)
77+
def get_heartbeat_is_ephemeral() -> bool:
78+
"""
79+
Get whether the heartbeat is ephemeral.
80+
81+
Returns:
82+
Whether the heartbeat is ephemeral.
83+
"""
84+
pass
85+
86+
87+
@value('nacos.heartBeatInterval', default_value=5000, converter=int)
88+
def get_heartbeat_interval() -> int:
89+
"""
90+
Get the heartbeat interval in milliseconds.
91+
92+
Returns:
93+
Heartbeat interval in milliseconds.
94+
"""
95+
pass
96+
97+
98+
@value('nacos.heartBeatTimeout', default_value=15000, converter=int)
99+
def get_heartbeat_timeout() -> int:
100+
"""
101+
Get the heartbeat timeout in milliseconds.
102+
103+
Returns:
104+
Heartbeat timeout in milliseconds.
105+
"""
106+
pass
107+
108+
109+
@value('nacos.weight', default_value=1.0, converter=float)
110+
def get_heartbeat_weight() -> float:
111+
"""
112+
Get the heartbeat weight.
113+
114+
Returns:
115+
Heartbeat weight.
116+
"""
117+
pass
118+
119+
120+
def build_nacos_config():
121+
"""
122+
Build the Nacos client configuration.
123+
124+
Returns:
125+
Configured Nacos client config.
126+
"""
127+
return (ClientConfigBuilder()
128+
.server_address(get_registry_server_addresses()[0])
129+
.namespace_id(get_nacos_namespace() or 'local')
130+
.username(get_nacos_username())
131+
.password(get_nacos_password())
132+
.access_key(get_nacos_access_key())
133+
.secret_key(get_nacos_secret_key())
134+
.build())

0 commit comments

Comments
 (0)