Skip to content

Commit 68cfaaf

Browse files
committed
Add integration tests
1 parent a654737 commit 68cfaaf

1 file changed

Lines changed: 288 additions & 0 deletions

File tree

tests/test_agentic_e2e.py

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
"""End-to-end tests for agentic (agent identity) scenarios.
2+
3+
These tests verify the full agent identity flow using MSAL Python APIs:
4+
1. Assertion callback context propagation (fmi_path flows to callback)
5+
2. Agent app-only token acquisition using FMI-sourced client assertion (Leg 2)
6+
3. Full 3-leg flow: FMI → assertion → user_fic → user-scoped token
7+
4. Cache isolation between app-only and user-scoped tokens
8+
9+
Corresponds to:
10+
- .NET: Agentic.cs
11+
- Java: AgenticIT.java
12+
13+
Test configuration uses the same lab infrastructure as test_fmi_e2e.py.
14+
Requires LAB_APP_CLIENT_CERT_PFX_PATH environment variable.
15+
"""
16+
17+
import logging
18+
import os
19+
import sys
20+
import unittest
21+
22+
import msal
23+
from tests.http_client import MinimalHttpClient
24+
from tests.lab_config import get_client_certificate
25+
from tests.test_e2e import LabBasedTestCase
26+
27+
logger = logging.getLogger(__name__)
28+
logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO)
29+
30+
# =============================================================================
31+
# Test configuration — matches .NET/Java agentic test constants
32+
# =============================================================================
33+
_TENANT_ID = "10c419d4-4a50-45b2-aa4e-919fb84df24f"
34+
_BLUEPRINT_CLIENT_ID = "aab5089d-e764-47e3-9f28-cc11c2513821"
35+
_AGENT_APP_ID = "ab18ca07-d139-4840-8b3b-4be9610c6ed5"
36+
_RMA_CLIENT_ID = "3bf56293-fbb5-42bd-a407-248ba7431a8c"
37+
_USER_UPN = "agentuser1@id4slab1.onmicrosoft.com"
38+
_TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default"
39+
_FMI_EXCHANGE_SCOPE = "api://AzureFMITokenExchange/.default"
40+
_GRAPH_SCOPE = "https://graph.microsoft.com/.default"
41+
_FMI_PATH = "SomeFmiPath/FmiCredentialPath"
42+
_AUTHORITY = "https://login.microsoftonline.com/" + _TENANT_ID
43+
44+
45+
# =============================================================================
46+
# Helpers — mirror .NET GetAppCredentialAsync / Java acquireFmiCredentialForAgent
47+
# =============================================================================
48+
49+
def _acquire_fmi_credential_for_agent(agent_app_id):
50+
"""Leg 1: Blueprint app acquires FMI credential (T1) for the given agent.
51+
52+
Uses certificate authentication with SNI (sendX5C) and fmi_path set to
53+
the agent app ID — matching .NET and Java helper methods.
54+
"""
55+
blueprint_app = msal.ConfidentialClientApplication(
56+
_BLUEPRINT_CLIENT_ID,
57+
client_credential=get_client_certificate(),
58+
authority=_AUTHORITY,
59+
http_client=MinimalHttpClient(),
60+
)
61+
result = blueprint_app.acquire_token_for_client(
62+
[_TOKEN_EXCHANGE_SCOPE], fmi_path=agent_app_id)
63+
if "access_token" not in result:
64+
raise RuntimeError(
65+
"Leg 1 failed — could not acquire FMI credential: {}: {}".format(
66+
result.get("error"), result.get("error_description")))
67+
return result["access_token"]
68+
69+
70+
def _acquire_fmi_credential_from_rma():
71+
"""Acquire an FMI credential from the RMA app using certificate credentials.
72+
73+
Mirrors Java's acquireFmiCredentialFromRma and Python's test_fmi_e2e helper.
74+
Used for assertion callback context tests where the callback just needs to
75+
return a valid FMI token (not specifically for an agent app).
76+
"""
77+
rma_app = msal.ConfidentialClientApplication(
78+
_RMA_CLIENT_ID,
79+
client_credential=get_client_certificate(),
80+
authority=_AUTHORITY,
81+
http_client=MinimalHttpClient(),
82+
)
83+
result = rma_app.acquire_token_for_client(
84+
[_FMI_EXCHANGE_SCOPE], fmi_path=_FMI_PATH)
85+
if "access_token" not in result:
86+
raise RuntimeError(
87+
"RMA FMI credential acquisition failed: {}: {}".format(
88+
result.get("error"), result.get("error_description")))
89+
return result["access_token"]
90+
91+
92+
def _acquire_instance_token_for_agent():
93+
"""Leg 1 + Leg 2: Acquire an instance token (T2) for the agent app.
94+
95+
1. Blueprint → T1 (FMI credential via fmi_path)
96+
2. Agent uses T1 as client_assertion → T2 (instance token)
97+
98+
T2 is used as user_federated_identity_credential in Leg 3 (user_fic).
99+
"""
100+
t1 = _acquire_fmi_credential_for_agent(_AGENT_APP_ID)
101+
102+
agent_app = msal.ConfidentialClientApplication(
103+
_AGENT_APP_ID,
104+
client_credential={"client_assertion": t1},
105+
authority=_AUTHORITY,
106+
http_client=MinimalHttpClient(),
107+
)
108+
result = agent_app.acquire_token_for_client([_TOKEN_EXCHANGE_SCOPE])
109+
if "access_token" not in result:
110+
raise RuntimeError(
111+
"Leg 2 failed — could not acquire instance token: {}: {}".format(
112+
result.get("error"), result.get("error_description")))
113+
return result["access_token"]
114+
115+
116+
# =============================================================================
117+
# Tests
118+
# =============================================================================
119+
120+
class TestAssertionCallbackContext(LabBasedTestCase):
121+
"""Verify assertion callback receives correct context when fmi_path is set.
122+
123+
Corresponds to Java's assertionCallback_ReceivesFmiPathContext.
124+
"""
125+
126+
def test_assertion_callback_receives_fmi_path(self):
127+
captured_context = {}
128+
129+
def assertion_callback(context):
130+
captured_context.update(context)
131+
return _acquire_fmi_credential_from_rma()
132+
133+
app = msal.ConfidentialClientApplication(
134+
"urn:microsoft:identity:fmi",
135+
client_credential={"client_assertion": assertion_callback},
136+
authority=_AUTHORITY,
137+
http_client=MinimalHttpClient(),
138+
)
139+
140+
result = app.acquire_token_for_client(
141+
[_FMI_EXCHANGE_SCOPE], fmi_path=_AGENT_APP_ID)
142+
self.assertIn("access_token", result,
143+
"acquire_token_for_client failed: {}: {}".format(
144+
result.get("error"), result.get("error_description")))
145+
146+
# Verify context was passed to callback
147+
self.assertEqual(_AGENT_APP_ID, captured_context.get("fmi_path"),
148+
"fmi_path should flow to assertion callback context")
149+
self.assertEqual("urn:microsoft:identity:fmi", captured_context.get("client_id"),
150+
"client_id should be in assertion callback context")
151+
self.assertTrue(captured_context.get("token_endpoint"),
152+
"token_endpoint should be in assertion callback context")
153+
154+
155+
class TestAgentAppToken(LabBasedTestCase):
156+
"""Agent acquires app-only token for Graph using FMI-sourced assertion.
157+
158+
Corresponds to .NET's AgentGetsAppTokenForGraphTest and
159+
Java's agentGetsAppToken_UsingFmiAssertion.
160+
161+
Flow: Blueprint → T1 (assertion callback) → Agent CCA → app token
162+
"""
163+
164+
def test_agent_gets_app_token_for_graph(self):
165+
def assertion_provider(context):
166+
return _acquire_fmi_credential_for_agent(_AGENT_APP_ID)
167+
168+
agent_app = msal.ConfidentialClientApplication(
169+
_AGENT_APP_ID,
170+
client_credential={"client_assertion": assertion_provider},
171+
authority=_AUTHORITY,
172+
http_client=MinimalHttpClient(),
173+
)
174+
175+
result = agent_app.acquire_token_for_client([_GRAPH_SCOPE])
176+
self.assertIn("access_token", result,
177+
"Agent app token acquisition failed: {}: {}".format(
178+
result.get("error"), result.get("error_description")))
179+
self.assertTrue(result["access_token"],
180+
"Access token should not be empty")
181+
182+
183+
class TestAgentUserIdentity(LabBasedTestCase):
184+
"""Full 3-leg agent identity flow: FMI → assertion → user_fic → user token.
185+
186+
Corresponds to .NET's AgentUserIdentityGetsTokenForGraphTest and
187+
Java's agentUserIdentity_GetsTokenForGraph.
188+
189+
Flow:
190+
1. Blueprint → T1 (FMI credential)
191+
2. Agent uses T1 → T2 (instance token)
192+
3. Agent exchanges T2 via user_fic → user-scoped Graph token
193+
4. Verify token is cached and retrievable via acquire_token_silent
194+
"""
195+
196+
def test_agent_user_identity_gets_token_for_graph(self):
197+
# Get instance token (T2) for user_fic exchange
198+
t2 = _acquire_instance_token_for_agent()
199+
200+
# Build agent CCA with assertion callback
201+
def assertion_provider(context):
202+
return _acquire_fmi_credential_for_agent(_AGENT_APP_ID)
203+
204+
agent_app = msal.ConfidentialClientApplication(
205+
_AGENT_APP_ID,
206+
client_credential={"client_assertion": assertion_provider},
207+
authority=_AUTHORITY,
208+
http_client=MinimalHttpClient(),
209+
)
210+
211+
# Exchange T2 for user-scoped token via user_fic grant
212+
result = agent_app.acquire_token_by_user_federated_identity_credential(
213+
[_GRAPH_SCOPE], assertion=t2, username=_USER_UPN)
214+
self.assertIn("access_token", result,
215+
"user_fic token acquisition failed: {}: {}".format(
216+
result.get("error"), result.get("error_description")))
217+
self.assertTrue(result["access_token"],
218+
"Access token should not be empty")
219+
220+
# Verify account was created
221+
accounts = agent_app.get_accounts()
222+
self.assertTrue(len(accounts) > 0,
223+
"Account should be created from user_fic response")
224+
225+
# Verify silent retrieval works (token should be cached)
226+
account = accounts[0]
227+
silent_result = agent_app.acquire_token_silent(
228+
[_GRAPH_SCOPE], account=account)
229+
self.assertIsNotNone(silent_result,
230+
"acquire_token_silent should return cached token")
231+
self.assertIn("access_token", silent_result)
232+
self.assertEqual(result["access_token"], silent_result["access_token"],
233+
"Silent call should return the same cached token")
234+
235+
236+
class TestAgentCacheIsolation(LabBasedTestCase):
237+
"""App-only and user-scoped tokens are isolated in cache on the same CCA.
238+
239+
Corresponds to Java's agentCca_AppAndUserTokens_CacheIsolation.
240+
"""
241+
242+
def test_app_and_user_tokens_are_isolated(self):
243+
def assertion_provider(context):
244+
return _acquire_fmi_credential_for_agent(_AGENT_APP_ID)
245+
246+
agent_app = msal.ConfidentialClientApplication(
247+
_AGENT_APP_ID,
248+
client_credential={"client_assertion": assertion_provider},
249+
authority=_AUTHORITY,
250+
http_client=MinimalHttpClient(),
251+
)
252+
253+
# Acquire app-only token
254+
app_result = agent_app.acquire_token_for_client([_GRAPH_SCOPE])
255+
self.assertIn("access_token", app_result,
256+
"App token acquisition failed: {}: {}".format(
257+
app_result.get("error"), app_result.get("error_description")))
258+
259+
# Acquire user token via user_fic
260+
t2 = _acquire_instance_token_for_agent()
261+
user_result = agent_app.acquire_token_by_user_federated_identity_credential(
262+
[_GRAPH_SCOPE], assertion=t2, username=_USER_UPN)
263+
self.assertIn("access_token", user_result,
264+
"User token acquisition failed: {}: {}".format(
265+
user_result.get("error"), user_result.get("error_description")))
266+
267+
# Tokens should be different (app-scoped vs user-scoped)
268+
self.assertNotEqual(app_result["access_token"], user_result["access_token"],
269+
"App token and user token should be different")
270+
271+
# Verify both are independently retrievable
272+
# App token: second call should return from cache
273+
app_cached = agent_app.acquire_token_for_client([_GRAPH_SCOPE])
274+
self.assertEqual("cache", app_cached.get("token_source"),
275+
"App token should be returned from cache on second call")
276+
self.assertEqual(app_result["access_token"], app_cached["access_token"])
277+
278+
# User token: silent call should return from cache
279+
accounts = agent_app.get_accounts()
280+
self.assertTrue(len(accounts) > 0)
281+
user_cached = agent_app.acquire_token_silent(
282+
[_GRAPH_SCOPE], account=accounts[0])
283+
self.assertIsNotNone(user_cached)
284+
self.assertEqual(user_result["access_token"], user_cached["access_token"])
285+
286+
287+
if __name__ == "__main__":
288+
unittest.main()

0 commit comments

Comments
 (0)