Skip to content

Commit e0580f5

Browse files
committed
Added withFmi method for cca app
1 parent d7e0e11 commit e0580f5

File tree

3 files changed

+278
-0
lines changed

3 files changed

+278
-0
lines changed

msal/application.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2491,6 +2491,32 @@ def remove_tokens_for_client(self):
24912491
self.token_cache.remove_at(at)
24922492
# acquire_token_for_client() obtains no RTs, so we have no RT to remove
24932493

2494+
def acquire_token_for_client_with_fmi_path(self, scopes, fmi_path, claims_challenge=None, **kwargs):
2495+
"""Acquires token for the current confidential client with a Federated Managed Identity (FMI) path.
2496+
2497+
This is a convenience wrapper around :func:`~acquire_token_for_client`
2498+
that attaches the ``fmi_path`` parameter to the token request body.
2499+
2500+
:param list[str] scopes: (Required)
2501+
Scopes requested to access a protected API (a resource).
2502+
:param str fmi_path: (Required)
2503+
The Federated Managed Identity path to attach to the request.
2504+
:param claims_challenge:
2505+
The claims_challenge parameter requests specific claims requested by the resource provider
2506+
in the form of a claims_challenge directive in the www-authenticate header to be
2507+
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2508+
It is a string of a JSON object which contains lists of claims being requested from these locations.
2509+
2510+
:return: A dict representing the json response from Microsoft Entra:
2511+
2512+
- A successful response would contain "access_token" key,
2513+
- an error response would contain "error" and usually "error_description".
2514+
"""
2515+
data = kwargs.pop("data", {})
2516+
data["fmi_path"] = fmi_path
2517+
return self.acquire_token_for_client(
2518+
scopes, claims_challenge=claims_challenge, data=data, **kwargs)
2519+
24942520
def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
24952521
"""Acquires token using on-behalf-of (OBO) flow.
24962522

tests/test_application.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,105 @@ def test_organizations_authority_should_emit_warning(self):
707707
authority="https://login.microsoftonline.com/organizations")
708708

709709

710+
@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK)
711+
class TestAcquireTokenForClientWithFmiPath(unittest.TestCase):
712+
"""Test that acquire_token_for_client_with_fmi_path attaches fmi_path to HTTP body."""
713+
714+
def test_fmi_path_is_included_in_request_body(self):
715+
app = ConfidentialClientApplication(
716+
"client_id", client_credential="secret",
717+
authority="https://login.microsoftonline.com/my_tenant")
718+
fmi_path = "SomeFmiPath/FmiCredentialPath"
719+
captured_data = {}
720+
721+
def mock_post(url, headers=None, data=None, *args, **kwargs):
722+
captured_data.update(data or {})
723+
return MinimalResponse(
724+
status_code=200, text=json.dumps({
725+
"access_token": "an AT",
726+
"expires_in": 3600,
727+
}))
728+
729+
result = app.acquire_token_for_client_with_fmi_path(
730+
["scope"], fmi_path, post=mock_post)
731+
self.assertIn("access_token", result)
732+
self.assertIn("fmi_path", captured_data,
733+
"fmi_path should be present in the HTTP request body")
734+
self.assertEqual(fmi_path, captured_data["fmi_path"],
735+
"fmi_path value should match the input")
736+
737+
def test_fmi_path_coexists_with_other_data(self):
738+
app = ConfidentialClientApplication(
739+
"client_id", client_credential="secret",
740+
authority="https://login.microsoftonline.com/my_tenant")
741+
fmi_path = "another/fmi/path"
742+
captured_data = {}
743+
744+
def mock_post(url, headers=None, data=None, *args, **kwargs):
745+
captured_data.update(data or {})
746+
return MinimalResponse(
747+
status_code=200, text=json.dumps({
748+
"access_token": "an AT",
749+
"expires_in": 3600,
750+
}))
751+
752+
result = app.acquire_token_for_client_with_fmi_path(
753+
["scope"], fmi_path, post=mock_post)
754+
self.assertIn("access_token", result)
755+
self.assertEqual(fmi_path, captured_data["fmi_path"])
756+
self.assertEqual("client_credentials", captured_data.get("grant_type"))
757+
758+
def test_fmi_path_preserves_existing_data_params(self):
759+
app = ConfidentialClientApplication(
760+
"client_id", client_credential="secret",
761+
authority="https://login.microsoftonline.com/my_tenant")
762+
fmi_path = "my/fmi/path"
763+
captured_data = {}
764+
765+
def mock_post(url, headers=None, data=None, *args, **kwargs):
766+
captured_data.update(data or {})
767+
return MinimalResponse(
768+
status_code=200, text=json.dumps({
769+
"access_token": "an AT",
770+
"expires_in": 3600,
771+
}))
772+
773+
result = app.acquire_token_for_client_with_fmi_path(
774+
["scope"], fmi_path,
775+
data={"extra_key": "extra_value"},
776+
post=mock_post)
777+
self.assertIn("access_token", result)
778+
self.assertEqual(fmi_path, captured_data["fmi_path"])
779+
self.assertEqual("extra_value", captured_data.get("extra_key"),
780+
"Pre-existing data params should be preserved")
781+
782+
def test_cached_token_is_returned_on_second_call(self):
783+
app = ConfidentialClientApplication(
784+
"client_id", client_credential="secret",
785+
authority="https://login.microsoftonline.com/my_tenant")
786+
fmi_path = "SomeFmiPath/FmiCredentialPath"
787+
call_count = [0]
788+
789+
def mock_post(url, headers=None, data=None, *args, **kwargs):
790+
call_count[0] += 1
791+
return MinimalResponse(
792+
status_code=200, text=json.dumps({
793+
"access_token": "an AT",
794+
"expires_in": 3600,
795+
}))
796+
797+
result1 = app.acquire_token_for_client_with_fmi_path(
798+
["scope"], fmi_path, post=mock_post)
799+
self.assertIn("access_token", result1)
800+
self.assertEqual(result1[app._TOKEN_SOURCE], app._TOKEN_SOURCE_IDP)
801+
802+
result2 = app.acquire_token_for_client_with_fmi_path(
803+
["scope"], fmi_path, post=mock_post)
804+
self.assertIn("access_token", result2)
805+
self.assertEqual(result2[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE,
806+
"Second call should return token from cache")
807+
808+
710809
@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK)
711810
class TestRemoveTokensForClient(unittest.TestCase):
712811
def test_remove_tokens_for_client_should_remove_client_tokens_only(self):

tests/test_fmi_e2e.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"""End-to-end tests for Federated Managed Identity (FMI) functionality.
2+
3+
These tests verify:
4+
1. Tokens can be acquired using certificate authentication with FMI path
5+
2. Tokens are properly cached and returned from cache on subsequent calls
6+
3. Tokens can be acquired using an assertion callback (RMA pattern) with FMI path
7+
8+
"""
9+
10+
import logging
11+
import os
12+
import sys
13+
import unittest
14+
15+
import msal
16+
from tests.http_client import MinimalHttpClient
17+
from tests.lab_config import get_client_certificate
18+
from tests.test_e2e import LabBasedTestCase
19+
20+
logger = logging.getLogger(__name__)
21+
logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO)
22+
23+
# Test configuration
24+
_FMI_TENANT_ID = "f645ad92-e38d-4d1a-b510-d1b09a74a8ca"
25+
_FMI_CLIENT_ID = "4df2cbbb-8612-49c1-87c8-f334d6d065ad"
26+
_FMI_SCOPE = "3091264c-7afb-45d4-b527-39737ee86187/.default"
27+
_FMI_PATH = "SomeFmiPath/FmiCredentialPath"
28+
_FMI_CLIENT_ID_URN = "urn:microsoft:identity:fmi"
29+
_FMI_SCOPE_FOR_RMA = "api://AzureFMITokenExchange/.default"
30+
_AUTHORITY_URL = "https://login.microsoftonline.com/" + _FMI_TENANT_ID
31+
32+
33+
def _get_fmi_credential_from_rma():
34+
"""Acquire an FMI token from RMA service using certificate credentials.
35+
36+
This mirrors the Go function GetFmiCredentialFromRma:
37+
1. Create a confidential client with certificate credential
38+
2. Acquire a token for the FMI scope with the FMI path
39+
3. Return the access token as an assertion string
40+
"""
41+
42+
app = msal.ConfidentialClientApplication(
43+
_FMI_CLIENT_ID,
44+
client_credential=get_client_certificate(),
45+
authority=_AUTHORITY_URL,
46+
http_client=MinimalHttpClient(),
47+
)
48+
result = app.acquire_token_for_client_with_fmi_path(
49+
[_FMI_SCOPE_FOR_RMA], _FMI_PATH)
50+
if "access_token" not in result:
51+
raise RuntimeError(
52+
"Failed to acquire FMI token from RMA: {}: {}".format(
53+
result.get("error"), result.get("error_description")))
54+
return result["access_token"]
55+
56+
57+
class TestFMIBasicFunctionality(LabBasedTestCase):
58+
"""Test basic FMI token acquisition with certificate credential.
59+
60+
Mirrors TestFMIBasicFunctionality from Go:
61+
1. Acquire token by credential with FMI path
62+
2. Verify silent (cached) token acquisition works
63+
3. Validate tokens match (proving cache was used)
64+
"""
65+
66+
def test_acquire_and_cache_with_fmi_path(self):
67+
app = msal.ConfidentialClientApplication(
68+
_FMI_CLIENT_ID,
69+
client_credential=get_client_certificate(),
70+
authority=_AUTHORITY_URL,
71+
http_client=MinimalHttpClient(),
72+
)
73+
scopes = [_FMI_SCOPE]
74+
75+
# 1. Acquire token by credential with FMI path
76+
result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
77+
self.assertIn("access_token", result,
78+
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
79+
result.get("error"), result.get("error_description")))
80+
self.assertNotEqual("", result["access_token"],
81+
"acquire_token_for_client_with_fmi_path() returned empty access token")
82+
83+
first_token = result["access_token"]
84+
85+
# 2. Verify silent token acquisition works (should retrieve from cache)
86+
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
87+
self.assertIn("access_token", cache_result,
88+
"Second call failed: {}: {}".format(
89+
cache_result.get("error"), cache_result.get("error_description")))
90+
self.assertNotEqual("", cache_result["access_token"],
91+
"Second call returned empty access token")
92+
self.assertEqual(
93+
cache_result.get("token_source"), "cache",
94+
"Second call should return token from cache")
95+
96+
# 3. Validate tokens match (proving cache was used)
97+
self.assertEqual(first_token, cache_result["access_token"],
98+
"Token comparison failed - tokens don't match, "
99+
"cache might not be working correctly")
100+
101+
class TestFMIIntegration(LabBasedTestCase):
102+
"""Test FMI with assertion callback (RMA pattern).
103+
104+
Mirrors TestFMIIntegration from Go:
105+
1. Get credentials from RMA via assertion callback
106+
2. Acquire token by credential with FMI path
107+
3. Verify cached token acquisition works
108+
4. Compare tokens to verify cache was used
109+
"""
110+
111+
def test_acquire_with_assertion_callback_and_fmi_path(self):
112+
# Create credential from assertion callback (mirrors Go's NewCredFromAssertionCallback)
113+
client_credential = {
114+
"client_assertion": lambda: _get_fmi_credential_from_rma(),
115+
}
116+
117+
app = msal.ConfidentialClientApplication(
118+
_FMI_CLIENT_ID_URN,
119+
client_credential=client_credential,
120+
authority=_AUTHORITY_URL,
121+
http_client=MinimalHttpClient(),
122+
)
123+
scopes = [_FMI_SCOPE]
124+
fmi_path = "SomeFmiPath/Path"
125+
126+
# 1. Acquire token by credential with FMI path
127+
result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
128+
self.assertIn("access_token", result,
129+
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
130+
result.get("error"), result.get("error_description")))
131+
self.assertNotEqual("", result["access_token"],
132+
"acquire_token_for_client_with_fmi_path() returned empty access token")
133+
first_token = result["access_token"]
134+
135+
# 2. Verify cached token acquisition works
136+
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
137+
self.assertIn("access_token", cache_result,
138+
"Second call failed: {}: {}".format(
139+
cache_result.get("error"), cache_result.get("error_description")))
140+
self.assertNotEqual("", cache_result["access_token"],
141+
"Second call returned empty access token")
142+
self.assertEqual(
143+
cache_result.get("token_source"), "cache",
144+
"Second call should return token from cache")
145+
146+
# 3. Compare tokens to verify cache was used
147+
self.assertEqual(first_token, cache_result["access_token"],
148+
"Token comparison failed - tokens don't match, "
149+
"cache might not be working correctly")
150+
151+
152+
if __name__ == "__main__":
153+
unittest.main()

0 commit comments

Comments
 (0)