Skip to content

Commit b86f6fa

Browse files
congxiao-wxxclaude
andcommitted
feat: add Config.from_request_headers to extract FC temporary credentials from HTTP headers
FC platform injects fresh STS credentials via x-fc-access-key-id, x-fc-access-key-secret, and x-fc-security-token headers on every request. This method extracts them so long-running agent services can use per-request credentials instead of potentially expired env vars. - Add Config.from_request_headers() classmethod supporting AgentRequest and Starlette Request - Add unit tests (8 cases) and e2e tests (5 cases covering OpenAI/AG-UI protocols) - Add example: server_with_header_credentials.py Change-Id: I59a5844735797a26a30fa04a7859e2e99d216d17 Co-developed-by: Claude <noreply@anthropic.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 333f548 commit b86f6fa

4 files changed

Lines changed: 375 additions & 1 deletion

File tree

agentrun/utils/config.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"""
66

77
import os
8-
from typing import Dict, Optional
8+
from typing import Any, Dict, Optional
99

1010
from dotenv import load_dotenv
1111

@@ -154,6 +154,40 @@ def __init__(
154154
self._bailian_endpoint = bailian_endpoint
155155
self._headers = headers or {}
156156

157+
@classmethod
158+
def from_request_headers(cls, request: Any) -> "Config":
159+
"""从 HTTP 请求头提取临时凭证构造 Config / Create Config from HTTP request headers
160+
161+
支持传入 AgentRequest(自动取 raw_request)或 Starlette Request。
162+
Accepts AgentRequest (auto-extracts raw_request) or Starlette Request.
163+
164+
提取以下 header:
165+
Extracts the following headers:
166+
- x-fc-access-key-id → access_key_id
167+
- x-fc-access-key-secret → access_key_secret
168+
- x-fc-security-token → security_token
169+
170+
Args:
171+
request: AgentRequest 或 Starlette Request 对象
172+
AgentRequest or Starlette Request object
173+
174+
Returns:
175+
Config: 包含请求头中凭证信息的配置对象
176+
Config with credentials from request headers
177+
178+
Note:
179+
如果需要与其他 Config 合并,请将此方法返回的 Config 放在 with_configs 的最后参数,
180+
以确保 header 凭证不被覆盖:
181+
``Config.with_configs(base_config, Config.from_request_headers(request))``
182+
"""
183+
raw = getattr(request, "raw_request", None) or request
184+
headers = getattr(raw, "headers", {})
185+
return cls(
186+
access_key_id=headers.get("x-fc-access-key-id", ""),
187+
access_key_secret=headers.get("x-fc-access-key-secret", ""),
188+
security_token=headers.get("x-fc-security-token", ""),
189+
)
190+
157191
@classmethod
158192
def with_configs(cls, *configs: Optional["Config"]) -> "Config":
159193
return cls().update(*configs)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""使用 HTTP 请求头临时凭证的 Agent Server 示例
2+
3+
本示例演示如何在 AgentRunServer 中从每次请求的 HTTP header 获取临时凭证,
4+
替代环境变量中可能过期的 token,确保 Agent 调用知识库等资源时凭证始终有效。
5+
6+
适用场景:
7+
- 部署在函数计算(FC)上的 Agent,FC 平台通过请求头注入临时凭证
8+
- 需要长期运行的 Agent 服务,环境变量中的 STS Token 会过期
9+
10+
FC 平台注入的请求头:
11+
- x-fc-access-key-id: 临时 AccessKey ID
12+
- x-fc-access-key-secret: 临时 AccessKey Secret
13+
- x-fc-security-token: 临时 Security Token
14+
15+
启动服务后测试:
16+
curl http://127.0.0.1:9000/openai/v1/chat/completions -X POST \\
17+
-H "Content-Type: application/json" \\
18+
-H "x-fc-access-key-id: <your-ak>" \\
19+
-H "x-fc-access-key-secret: <your-sk>" \\
20+
-H "x-fc-security-token: <your-token>" \\
21+
-d '{"messages": [{"role": "user", "content": "什么是Serverless?"}], "stream": true}'
22+
"""
23+
24+
import os
25+
26+
from agentrun.knowledgebase import KnowledgeBase
27+
from agentrun.server import AgentRequest, AgentRunServer
28+
from agentrun.server.model import ServerConfig
29+
from agentrun.utils.config import Config
30+
from agentrun.utils.log import logger
31+
32+
KNOWLEDGE_BASE_NAME = os.getenv("AGENTRUN_KNOWLEDGE_BASE", "my-knowledge-base")
33+
34+
35+
async def invoke_agent(request: AgentRequest):
36+
config = Config.from_request_headers(request)
37+
38+
user_query = request.messages[-1].content if request.messages else ""
39+
if not user_query:
40+
yield "请输入您的问题。"
41+
return
42+
43+
try:
44+
kb = KnowledgeBase.get_by_name(KNOWLEDGE_BASE_NAME, config=config)
45+
result = kb.retrieve(query=user_query, config=config)
46+
47+
nodes = result.get("nodes", result.get("data", []))
48+
if not nodes:
49+
yield f"未在知识库 {KNOWLEDGE_BASE_NAME} 中找到相关内容。"
50+
return
51+
52+
chunks = [
53+
node.get("content", node.get("text", ""))
54+
for node in nodes
55+
if node.get("content") or node.get("text")
56+
]
57+
context = "\n---\n".join(chunks)
58+
yield f"根据知识库检索结果:\n\n{context}"
59+
60+
except Exception as e:
61+
logger.error("知识库检索失败: %s", e)
62+
yield f"知识库检索失败: {e}"
63+
64+
65+
AgentRunServer(
66+
invoke_agent=invoke_agent,
67+
config=ServerConfig(cors_origins=["*"]),
68+
).start(port=9000)
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
"""
2+
E2E 测试: Config.from_request_headers 通过真实 HTTP 请求提取临时凭证
3+
4+
测试覆盖:
5+
- 通过 OpenAI 协议发送 x-fc-* header,invoke_agent 中使用 Config.from_request_headers 提取凭证
6+
- 通过 AG-UI 协议发送 x-fc-* header,验证同样可以提取
7+
- 部分 header 缺失时对应字段为空字符串
8+
- 无 x-fc-* header 时所有字段为空字符串
9+
"""
10+
11+
from agentrun.server import AgentRequest, AgentRunServer
12+
from agentrun.utils.config import Config
13+
14+
15+
def _make_client(invoke_agent):
16+
server = AgentRunServer(invoke_agent=invoke_agent)
17+
app = server.as_fastapi_app()
18+
from fastapi.testclient import TestClient
19+
20+
return TestClient(app)
21+
22+
23+
class TestHeaderCredentials:
24+
"""通过真实 HTTP 请求验证 Config.from_request_headers 的 E2E 行为"""
25+
26+
def test_openai_full_headers(self):
27+
"""OpenAI 协议: 三个 x-fc-* header 都存在时正确提取"""
28+
captured = {}
29+
30+
async def invoke_agent(request: AgentRequest):
31+
config = Config.from_request_headers(request)
32+
captured["ak"] = config.get_access_key_id()
33+
captured["sk"] = config.get_access_key_secret()
34+
captured["token"] = config.get_security_token()
35+
yield "ok"
36+
37+
client = _make_client(invoke_agent)
38+
response = client.post(
39+
"/openai/v1/chat/completions",
40+
json={
41+
"messages": [{"role": "user", "content": "test"}],
42+
"stream": True,
43+
},
44+
headers={
45+
"x-fc-access-key-id": "ak-e2e-test",
46+
"x-fc-access-key-secret": "sk-e2e-test",
47+
"x-fc-security-token": "token-e2e-test",
48+
},
49+
)
50+
assert response.status_code == 200
51+
assert captured["ak"] == "ak-e2e-test"
52+
assert captured["sk"] == "sk-e2e-test"
53+
assert captured["token"] == "token-e2e-test"
54+
55+
def test_agui_full_headers(self):
56+
"""AG-UI 协议: 三个 x-fc-* header 都存在时正确提取"""
57+
captured = {}
58+
59+
async def invoke_agent(request: AgentRequest):
60+
config = Config.from_request_headers(request)
61+
captured["ak"] = config.get_access_key_id()
62+
captured["sk"] = config.get_access_key_secret()
63+
captured["token"] = config.get_security_token()
64+
yield "ok"
65+
66+
client = _make_client(invoke_agent)
67+
response = client.post(
68+
"/ag-ui/agent",
69+
json={
70+
"messages": [{"role": "user", "content": "test"}],
71+
},
72+
headers={
73+
"x-fc-access-key-id": "ak-agui-test",
74+
"x-fc-access-key-secret": "sk-agui-test",
75+
"x-fc-security-token": "token-agui-test",
76+
},
77+
)
78+
assert response.status_code == 200
79+
assert captured["ak"] == "ak-agui-test"
80+
assert captured["sk"] == "sk-agui-test"
81+
assert captured["token"] == "token-agui-test"
82+
83+
def test_partial_headers(self):
84+
"""部分 x-fc-* header 缺失时,缺失字段为空字符串"""
85+
captured = {}
86+
87+
async def invoke_agent(request: AgentRequest):
88+
config = Config.from_request_headers(request)
89+
captured["ak"] = config.get_access_key_id()
90+
captured["sk"] = config.get_access_key_secret()
91+
captured["token"] = config.get_security_token()
92+
yield "ok"
93+
94+
client = _make_client(invoke_agent)
95+
response = client.post(
96+
"/openai/v1/chat/completions",
97+
json={
98+
"messages": [{"role": "user", "content": "test"}],
99+
"stream": True,
100+
},
101+
headers={
102+
"x-fc-access-key-id": "ak-partial",
103+
},
104+
)
105+
assert response.status_code == 200
106+
assert captured["ak"] == "ak-partial"
107+
assert captured["sk"] == ""
108+
assert captured["token"] == ""
109+
110+
def test_no_fc_headers(self):
111+
"""无 x-fc-* header 时所有字段为空字符串"""
112+
captured = {}
113+
114+
async def invoke_agent(request: AgentRequest):
115+
config = Config.from_request_headers(request)
116+
captured["ak"] = config.get_access_key_id()
117+
captured["sk"] = config.get_access_key_secret()
118+
captured["token"] = config.get_security_token()
119+
yield "ok"
120+
121+
client = _make_client(invoke_agent)
122+
response = client.post(
123+
"/openai/v1/chat/completions",
124+
json={
125+
"messages": [{"role": "user", "content": "test"}],
126+
"stream": True,
127+
},
128+
)
129+
assert response.status_code == 200
130+
assert captured["ak"] == ""
131+
assert captured["sk"] == ""
132+
assert captured["token"] == ""
133+
134+
def test_config_merge_with_base(self):
135+
"""从 header 提取的 Config 与 base Config 合并时 header 凭证优先"""
136+
captured = {}
137+
138+
async def invoke_agent(request: AgentRequest):
139+
base = Config(
140+
access_key_id="base-ak",
141+
access_key_secret="base-sk",
142+
security_token="base-token",
143+
)
144+
header_config = Config.from_request_headers(request)
145+
merged = Config.with_configs(base, header_config)
146+
captured["ak"] = merged.get_access_key_id()
147+
captured["sk"] = merged.get_access_key_secret()
148+
captured["token"] = merged.get_security_token()
149+
yield "ok"
150+
151+
client = _make_client(invoke_agent)
152+
response = client.post(
153+
"/openai/v1/chat/completions",
154+
json={
155+
"messages": [{"role": "user", "content": "test"}],
156+
"stream": True,
157+
},
158+
headers={
159+
"x-fc-access-key-id": "header-ak",
160+
"x-fc-access-key-secret": "header-sk",
161+
"x-fc-security-token": "header-token",
162+
},
163+
)
164+
assert response.status_code == 200
165+
assert captured["ak"] == "header-ak"
166+
assert captured["sk"] == "header-sk"
167+
assert captured["token"] == "header-token"

tests/unittests/test_config.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Config.from_request_headers 单元测试"""
2+
3+
from agentrun.utils.config import Config
4+
5+
6+
class FakeRequest:
7+
"""模拟 Starlette Request"""
8+
9+
def __init__(self, headers: dict):
10+
self.headers = headers
11+
12+
13+
class FakeAgentRequest:
14+
"""模拟 AgentRequest(含 raw_request)"""
15+
16+
def __init__(self, headers: dict):
17+
self.raw_request = FakeRequest(headers)
18+
19+
20+
class TestFromRequestHeaders:
21+
22+
def test_full_headers(self):
23+
request = FakeRequest({
24+
"x-fc-access-key-id": "ak-123",
25+
"x-fc-access-key-secret": "sk-456",
26+
"x-fc-security-token": "token-789",
27+
})
28+
config = Config.from_request_headers(request)
29+
30+
assert config.get_access_key_id() == "ak-123"
31+
assert config.get_access_key_secret() == "sk-456"
32+
assert config.get_security_token() == "token-789"
33+
34+
def test_partial_headers_missing_token(self):
35+
request = FakeRequest({
36+
"x-fc-access-key-id": "ak-123",
37+
"x-fc-access-key-secret": "sk-456",
38+
})
39+
config = Config.from_request_headers(request)
40+
41+
assert config.get_access_key_id() == "ak-123"
42+
assert config.get_access_key_secret() == "sk-456"
43+
assert config.get_security_token() == ""
44+
45+
def test_partial_headers_only_key_id(self):
46+
request = FakeRequest({
47+
"x-fc-access-key-id": "ak-only",
48+
})
49+
config = Config.from_request_headers(request)
50+
51+
assert config.get_access_key_id() == "ak-only"
52+
assert config.get_access_key_secret() == ""
53+
assert config.get_security_token() == ""
54+
55+
def test_empty_headers(self):
56+
request = FakeRequest({})
57+
config = Config.from_request_headers(request)
58+
59+
assert config.get_access_key_id() == ""
60+
assert config.get_access_key_secret() == ""
61+
assert config.get_security_token() == ""
62+
63+
def test_agent_request_unwrap(self):
64+
agent_req = FakeAgentRequest({
65+
"x-fc-access-key-id": "ak-from-agent",
66+
"x-fc-access-key-secret": "sk-from-agent",
67+
"x-fc-security-token": "token-from-agent",
68+
})
69+
config = Config.from_request_headers(agent_req)
70+
71+
assert config.get_access_key_id() == "ak-from-agent"
72+
assert config.get_access_key_secret() == "sk-from-agent"
73+
assert config.get_security_token() == "token-from-agent"
74+
75+
def test_config_usable_as_resource_param(self):
76+
config = Config.from_request_headers(FakeRequest({
77+
"x-fc-access-key-id": "ak-new",
78+
"x-fc-access-key-secret": "sk-new",
79+
"x-fc-security-token": "token-new",
80+
}))
81+
assert config.get_access_key_id() == "ak-new"
82+
assert config.get_access_key_secret() == "sk-new"
83+
assert config.get_security_token() == "token-new"
84+
assert config.get_region_id() == "cn-hangzhou"
85+
86+
def test_agent_request_with_none_raw_request(self):
87+
agent_req = FakeAgentRequest({})
88+
agent_req.raw_request = None
89+
config = Config.from_request_headers(agent_req)
90+
assert config.get_access_key_id() == ""
91+
assert config.get_access_key_secret() == ""
92+
assert config.get_security_token() == ""
93+
94+
def test_extra_headers_ignored(self):
95+
request = FakeRequest({
96+
"x-fc-access-key-id": "ak-123",
97+
"x-fc-access-key-secret": "sk-456",
98+
"x-fc-security-token": "token-789",
99+
"authorization": "Bearer xxx",
100+
"content-type": "application/json",
101+
})
102+
config = Config.from_request_headers(request)
103+
assert config.get_access_key_id() == "ak-123"
104+
assert config.get_access_key_secret() == "sk-456"
105+
assert config.get_security_token() == "token-789"

0 commit comments

Comments
 (0)