-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathhelm_values.py
More file actions
266 lines (222 loc) · 10.4 KB
/
Copy pathhelm_values.py
File metadata and controls
266 lines (222 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""Pure helm-values generation for the ``agentex-agent`` chart.
Maps an :class:`~agentex.config.agent_manifest.AgentManifest` plus an optional
:class:`~agentex.config.environment_config.AgentEnvironmentConfig` to the values
dict the ``agentex-agent`` helm chart consumes. Depends only on pydantic and the
stdlib, so it is safe to import from a slim REST-only install without the ADK
runtime — the same contract as the other ``agentex.config`` modules.
Filesystem-aware ACP module resolution stays in
``agentex.lib.cli.utils.path_utils``: callers that have the agent source tree on
disk should resolve the module themselves and pass ``acp_module``. Callers
without one (e.g. server-side deployers) get :func:`derive_acp_module`'s
pure-string derivation by default.
"""
from __future__ import annotations
import json
import base64
import logging
from typing import Any
from agentex.config.agent_manifest import AgentManifest
from agentex.config.environment_config import AgentAuthConfig, AgentEnvironmentConfig
logger = logging.getLogger(__name__)
TEMPORAL_WORKER_KEY = "temporal-worker"
AUTH_PRINCIPAL_ENV_VAR = "AUTH_PRINCIPAL_B64"
__all__ = [
"AUTH_PRINCIPAL_ENV_VAR",
"TEMPORAL_WORKER_KEY",
"build_acp_command",
"derive_acp_module",
"encode_principal_context",
"convert_env_vars_dict_to_list",
"merge_deployment_configs",
]
def convert_env_vars_dict_to_list(env_vars: dict[str, str]) -> list[dict[str, str]]:
"""Convert a dictionary of environment variables to a list of dictionaries"""
return [{"name": key, "value": value} for key, value in env_vars.items()]
def encode_principal_context(auth_config: AgentAuthConfig | None) -> str | None:
"""Base64-encode the auth principal as compact JSON, or None if unset."""
if auth_config is None:
return None
principal = auth_config.principal
if not principal:
return None
json_str = json.dumps(principal, separators=(",", ":"))
return base64.b64encode(json_str.encode("utf-8")).decode("utf-8")
def derive_acp_module(manifest: AgentManifest) -> str:
"""Derive the ACP module from the manifest by pure string transform.
Callers with the agent source tree on disk should prefer the filesystem-aware
resolution in ``agentex.lib.cli.utils.path_utils.calculate_docker_acp_module``
and pass its result to :func:`merge_deployment_configs` as ``acp_module``.
"""
if manifest.local_development and manifest.local_development.paths:
acp_path = manifest.local_development.paths.acp
if acp_path:
return acp_path.replace(".py", "").replace("/", ".")
return "project.acp"
def build_acp_command(acp_module: str) -> list[str]:
"""Build the uvicorn command that runs the agent's ACP server."""
return ["uvicorn", f"{acp_module}:acp", "--host", "0.0.0.0", "--port", "8000"]
def _deep_merge(base_dict: dict[str, Any], override_dict: dict[str, Any]) -> None:
"""Deep merge override_dict into base_dict"""
for key, value in override_dict.items():
if key in base_dict and isinstance(base_dict[key], dict) and isinstance(value, dict):
_deep_merge(base_dict[key], value)
else:
base_dict[key] = value
def merge_deployment_configs(
manifest: AgentManifest,
agent_env_config: AgentEnvironmentConfig | None,
*,
repository: str,
image_tag: str,
acp_module: str | None = None,
) -> dict[str, Any]:
"""Merge global deployment config with environment-specific overrides into helm values.
Args:
manifest: The agent manifest configuration.
agent_env_config: Environment-specific configuration (optional).
repository: Container image repository to deploy.
image_tag: Container image tag to deploy.
acp_module: Pre-resolved ACP module for the uvicorn command. Defaults to
:func:`derive_acp_module`'s pure-string derivation.
Returns:
Dictionary of helm values ready for deployment.
Raises:
ValueError: If deployment configuration is missing or invalid.
"""
agent_config = manifest.agent
if not manifest.deployment:
raise ValueError("No deployment configuration found in manifest")
if not repository or not image_tag:
raise ValueError("Repository and image tag are required")
# Start with global configuration
helm_values: dict[str, Any] = {
"global": {
"image": {
"repository": repository,
"tag": image_tag,
"pullPolicy": "IfNotPresent",
},
"agent": {
"name": manifest.agent.name,
"description": manifest.agent.description,
"acp_type": manifest.agent.acp_type,
},
},
"replicaCount": manifest.deployment.global_config.replicaCount,
"resources": {
"requests": {
"cpu": manifest.deployment.global_config.resources.requests.cpu,
"memory": manifest.deployment.global_config.resources.requests.memory,
},
"limits": {
"cpu": manifest.deployment.global_config.resources.limits.cpu,
"memory": manifest.deployment.global_config.resources.limits.memory,
},
},
# Enable autoscaling by default for production deployments
"autoscaling": {
"enabled": True,
"minReplicas": 1,
"maxReplicas": 10,
"targetCPUUtilizationPercentage": 50,
},
}
# Handle temporal configuration using new helper methods
if agent_config.is_temporal_agent():
temporal_config = agent_config.get_temporal_workflow_config()
if temporal_config:
helm_values[TEMPORAL_WORKER_KEY] = {
"enabled": True,
# Enable autoscaling for temporal workers as well
"autoscaling": {
"enabled": True,
"minReplicas": 1,
"maxReplicas": 10,
"targetCPUUtilizationPercentage": 50,
},
}
helm_values["global"]["workflow"] = {
"name": temporal_config.name,
"taskQueue": temporal_config.queue_name,
}
# Collect all environment variables with proper precedence
# Priority: manifest -> environments.yaml -> secrets (highest)
all_env_vars: dict[str, str] = {}
secret_env_vars: list[dict[str, str]] = []
# Start with agent_config env vars from manifest
if agent_config.env:
all_env_vars.update(agent_config.env)
# Override with environment config env vars if they exist
if agent_env_config and agent_env_config.helm_overrides and "env" in agent_env_config.helm_overrides:
env_overrides = agent_env_config.helm_overrides["env"]
if isinstance(env_overrides, list):
# Convert list format to dict for easier merging
env_override_dict: dict[str, str] = {}
for env_var in env_overrides:
if isinstance(env_var, dict) and "name" in env_var and "value" in env_var:
env_override_dict[str(env_var["name"])] = str(env_var["value"])
all_env_vars.update(env_override_dict)
# Handle credentials and check for conflicts
if agent_config.credentials:
for credential in agent_config.credentials:
# Handle both CredentialMapping objects and legacy dict format
if isinstance(credential, dict):
env_var_name = credential["env_var_name"]
secret_name = credential["secret_name"]
secret_key = credential["secret_key"]
else:
env_var_name = credential.env_var_name
secret_name = credential.secret_name
secret_key = credential.secret_key
# Check if the environment variable name conflicts with existing env vars
if env_var_name in all_env_vars:
logger.warning(
f"Environment variable '{env_var_name}' is defined in both "
f"env and secretEnvVars. The secret value will take precedence."
)
# Remove from regular env vars since secret takes precedence
del all_env_vars[env_var_name]
secret_env_vars.append(
{
"name": env_var_name,
"secretName": secret_name,
"secretKey": secret_key,
}
)
# Apply agent environment configuration overrides
if agent_env_config:
# Add auth principal env var if environment config is set
if agent_env_config.auth:
encoded_principal = encode_principal_context(agent_env_config.auth)
if encoded_principal:
all_env_vars[AUTH_PRINCIPAL_ENV_VAR] = encoded_principal
else:
raise ValueError(f"Auth principal unable to be encoded for agent_env_config: {agent_env_config}")
if agent_env_config.helm_overrides:
_deep_merge(helm_values, agent_env_config.helm_overrides)
# Set final environment variables
# Environment variable precedence: manifest -> environments.yaml -> secrets (highest)
if all_env_vars:
helm_values["env"] = convert_env_vars_dict_to_list(all_env_vars)
if secret_env_vars:
helm_values["secretEnvVars"] = secret_env_vars
# Set environment variables for temporal worker if enabled
if TEMPORAL_WORKER_KEY in helm_values:
if all_env_vars:
helm_values[TEMPORAL_WORKER_KEY]["env"] = convert_env_vars_dict_to_list(all_env_vars)
if secret_env_vars:
helm_values[TEMPORAL_WORKER_KEY]["secretEnvVars"] = secret_env_vars
# Handle image pull secrets
if manifest.deployment and manifest.deployment.imagePullSecrets:
pull_secrets = [pull_secret.model_dump() for pull_secret in manifest.deployment.imagePullSecrets]
helm_values["global"]["imagePullSecrets"] = pull_secrets
helm_values["imagePullSecrets"] = pull_secrets
# Add dynamic ACP command based on manifest configuration if command is not set in helm overrides
helm_overrides_command = (
agent_env_config and agent_env_config.helm_overrides and "command" in agent_env_config.helm_overrides
)
if not helm_overrides_command:
module = acp_module or derive_acp_module(manifest)
helm_values["command"] = build_acp_command(module)
logger.info(f"Using ACP command: uvicorn {module}:acp")
return helm_values