diff --git a/config.yaml.full b/config.yaml.full index fd18f751..df7aac0a 100644 --- a/config.yaml.full +++ b/config.yaml.full @@ -151,7 +151,11 @@ database: region: cn-beijing # default Volcengine TOS Vector region bucket: account_id: - + +# Dynamic config +nacos: + endpoint: + password: # [optional] for prompt optimization in cli/app diff --git a/veadk/agent.py b/veadk/agent.py index 7f4745d6..0d5592b7 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -24,6 +24,8 @@ if not os.getenv("LITELLM_LOCAL_MODEL_COST_MAP"): os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True" +import uuid + from google.adk.agents import LlmAgent, RunConfig from google.adk.agents.base_agent import BaseAgent from google.adk.agents.context_cache_config import ContextCacheConfig @@ -89,6 +91,7 @@ class Agent(LlmAgent): model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow") + id: str = Field(default_factory=lambda: str(uuid.uuid4()).split("-")[0]) name: str = DEFAULT_AGENT_NAME description: str = DEFAULT_DESCRIPTION instruction: Union[str, InstructionProvider] = DEFAULT_INSTRUCTION @@ -278,7 +281,13 @@ def model_post_init(self, __context: Any) -> None: logger.info(f"{self.__class__.__name__} `{self.name}` init done.") logger.debug( - f"Agent: {self.model_dump(include={'name', 'model_name', 'model_api_base', 'tools'})}" + f"Agent: {self.model_dump(include={'id', 'name', 'model_name', 'model_api_base', 'tools'})}" + ) + + def update_model(self, model_name: str): + logger.info(f"Updating model to {model_name}") + self.model = self.model.model_copy( + update={"model": f"{self.model_provider}/{model_name}"} ) async def _run( diff --git a/veadk/auth/veauth/mse_veauth.py b/veadk/auth/veauth/mse_veauth.py new file mode 100644 index 00000000..5082fc28 --- /dev/null +++ b/veadk/auth/veauth/mse_veauth.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from veadk.auth.veauth.utils import get_credential_from_vefaas_iam +from veadk.configs.database_configs import MSENacosConfig +from veadk.utils.logger import get_logger +from veadk.utils.volcengine_sign import ve_request + +logger = get_logger(__name__) + + +def get_instance_id_by_name(instance_name: str, region: str, project_name: str) -> str: + logger.info(f"Fetching MSE Nacos instance id by instance name {instance_name} ...") + + access_key = os.getenv("VOLCENGINE_ACCESS_KEY") + secret_key = os.getenv("VOLCENGINE_SECRET_KEY") + session_token = "" + + if not (access_key and secret_key): + # try to get from vefaas iam + cred = get_credential_from_vefaas_iam() + access_key = cred.access_key_id + secret_key = cred.secret_access_key + session_token = cred.session_token + + res = ve_request( + request_body={ + "Filter": {"Status": [], "ProjectName": project_name}, + "PageNumber": 1, + "PageSize": 10, + "ProjectName": project_name, + }, + header={"X-Security-Token": session_token}, + action="ListNacosRegistries", + ak=access_key, + sk=secret_key, + service="mse", + version="2022-01-01", + region=region, + host="open.volcengineapi.com", + ) + + try: + for item in res["Result"]["Items"]: + if item["Name"] == instance_name: + logger.info( + f"Found MSE Nacos instance id {item['Id']} by instance name {instance_name}" + ) + return item["Id"] + raise ValueError(f"Id by instance name {instance_name} not found: {res}") + except Exception as e: + logger.error( + f"Failed to get MSE Nacos instance id by name {instance_name}: {e}, response: {res}" + ) + raise e + + +def get_mse_cridential( + instance_name: str, + region: str = "cn-beijing", + project_name: str = "default", +) -> MSENacosConfig: + logger.info("Fetching MSE Nacos token...") + + access_key = os.getenv("VOLCENGINE_ACCESS_KEY") + secret_key = os.getenv("VOLCENGINE_SECRET_KEY") + session_token = "" + + if not (access_key and secret_key): + # try to get from vefaas iam + cred = get_credential_from_vefaas_iam() + access_key = cred.access_key_id + secret_key = cred.secret_access_key + session_token = cred.session_token + + instance_id = get_instance_id_by_name( + instance_name=instance_name, region=region, project_name=project_name + ) + + res = ve_request( + request_body={ + "Id": instance_id, + "ProjectName": project_name, + }, + header={"X-Security-Token": session_token}, + action="GetNacosRegistry", + ak=access_key, + sk=secret_key, + service="mse", + version="2022-01-01", + region=region, + host="open.volcengineapi.com", + ) + + try: + logger.info( + f"Successfully fetched MSE Nacos endpoint {res['Result']['NacosRegistry']['PublicAddress']} and corresponding password." + ) + return MSENacosConfig( + endpoint=res["Result"]["NacosRegistry"]["PublicAddress"], + password=res["Result"]["NacosRegistry"]["InitialPassword"], + ) + except Exception as e: + logger.error(f"Failed to get MSE Nacos token: {e}, response: {res}") + raise e diff --git a/veadk/configs/database_configs.py b/veadk/configs/database_configs.py index 3292f3bd..89fbbbe7 100644 --- a/veadk/configs/database_configs.py +++ b/veadk/configs/database_configs.py @@ -178,3 +178,13 @@ class TOSVectorConfig(BaseSettings): user_agent_soft_version: str | None = None user_agent_customized_key_values: dict[str, str] | None = None + + +class MSENacosConfig(BaseSettings): + model_config = SettingsConfigDict(env_prefix="NACOS_") + + endpoint: str + port: str = "8848" # hard coding by Volcengine MSE Nacos service + + username: str = "nacos" # hard coding by Volcengine MSE Nacos service + password: str diff --git a/veadk/configs/dynamic_config_manager.py b/veadk/configs/dynamic_config_manager.py new file mode 100644 index 00000000..2c67777e --- /dev/null +++ b/veadk/configs/dynamic_config_manager.py @@ -0,0 +1,159 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +from v2.nacos import ClientConfig, NacosConfigService +from v2.nacos.config.model.config_param import ConfigParam + +from veadk.agent import Agent +from veadk.auth.veauth.mse_veauth import get_mse_cridential +from veadk.consts import DEFAULT_NACOS_GROUP, DEFAULT_NACOS_INSTANCE_NAME +from veadk.utils.logger import get_logger + +logger = get_logger(__name__) + + +class DynamicConfigManager: + """ + DynamicConfigManager is responsible for creating and publishing dynamic config to nacos. + """ + + def __init__( + self, + agents: list[Agent] | Agent, + ): + """ + Initialize DynamicConfigManager with agents and app_name. + + Args: + agents (list[Agent] | Agent): The agent(s) to be included in the dynamic config. + """ + if isinstance(agents, list): + self.agents = agents + else: + self.agents = [agents] + + logger.debug(f"DynamicConfigManager init with {len(self.agents)} agent(s).") + + async def create_config( + self, + configs: dict = {}, + instance_name: str = "", + group_id: str = "", + ): + if not instance_name: + logger.warning( + f"instance_name is not provided, use default value `{DEFAULT_NACOS_INSTANCE_NAME}`. This may lead to unexpected behavior such as configuration override." + ) + instance_name = DEFAULT_NACOS_INSTANCE_NAME + + if not group_id: + logger.warning( + f"group_id is not provided, use default value `{DEFAULT_NACOS_GROUP}`. This may lead to unexpected behavior such as configuration override." + ) + group_id = group_id or DEFAULT_NACOS_GROUP + + nacos_endpoint = os.getenv("NACOS_ENDPOINT") + nacos_port = os.getenv("NACOS_PORT", "8848") + nacos_username = os.getenv("NACOS_USERNAME", "nacos") + nacos_password = os.getenv("NACOS_PASSWORD") + + if not all([nacos_endpoint, nacos_port, nacos_username, nacos_password]): + logger.warning( + "fetch NACOS_ENDPOINT, NACOS_PORT, NACOS_USERNAME, and NACOS_PASSWORD from env failed, try to get by volcengine AK/SK." + ) + + nacos_credentials = get_mse_cridential(instance_name=instance_name) + nacos_endpoint = nacos_credentials.endpoint + nacos_port = nacos_credentials.port + nacos_username = nacos_credentials.username + nacos_password = nacos_credentials.password + + client_config = ClientConfig( + server_addresses=f"{nacos_endpoint}:{nacos_port}", + namespace_id="", + username=nacos_username, + password=nacos_password, + ) + + config_client = await NacosConfigService.create_config_service( + client_config=client_config + ) + + if not configs: + logger.info("user config_dict is empty, use default config instead.") + configs = { + "agent": [ + { + "id": agent.id, + "name": agent.name, + "description": agent.description, + "model_name": agent.model_name, + "instruction": agent.instruction, + } + for agent in self.agents + ] + } + response = await config_client.publish_config( + param=ConfigParam( + data_id="veadk", + group=group_id, + type="json", + content=json.dumps(configs), + ) + ) + assert response, "publish config to nacos failed" + logger.info("Publish config to nacos success") + + await config_client.add_listener( + data_id="veadk", + group="VEADK_GROUP", + listener=self.handle_config_update, + ) + logger.info("Add config listener to nacos success") + + return config_client + + def register_agent(self, agent: list[Agent] | Agent): + if isinstance(agent, list): + self.agents.extend(agent) + else: + self.agents.append(agent) + + def update_agent(self, configs: dict): + for agent in self.agents: + for config in configs["agent"]: + if agent.id == config["id"]: + logger.info(f"Update agent {agent.id} with config {config}") + name = config["name"] + description = config["description"] + model_name = config["model_name"] + instruction = config["instruction"] + + agent.name = name + agent.description = description + if model_name != agent.model_name: + agent.update_model(model_name=model_name) + agent.instruction = instruction + + async def handle_config_update(self, tenant, data_id, group, content) -> None: + logger.debug( + "listen, tenant:{} data_id:{} group:{} content:{}".format( + tenant, data_id, group, content + ) + ) + content = json.loads(content) + self.update_agent(content) diff --git a/veadk/consts.py b/veadk/consts.py index 209c81e3..c67ba39c 100644 --- a/veadk/consts.py +++ b/veadk/consts.py @@ -72,3 +72,6 @@ DEFAULT_IMAGE_GENERATE_MODEL_API_BASE = "https://ark.cn-beijing.volces.com/api/v3/" VEFAAS_IAM_CRIDENTIAL_PATH = "/var/run/secrets/iam/credential" + +DEFAULT_NACOS_GROUP = "VEADK_GROUP" +DEFAULT_NACOS_INSTANCE_NAME = "veadk"