Skip to content

Commit d83635e

Browse files
committed
updated API for fmi
1 parent a6d3be3 commit d83635e

File tree

4 files changed

+69
-71
lines changed

4 files changed

+69
-71
lines changed

msal/application.py

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2427,7 +2427,7 @@ class ConfidentialClientApplication(ClientApplication): # server-side web app
24272427
except that ``allow_broker`` parameter shall remain ``None``.
24282428
"""
24292429

2430-
def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
2430+
def acquire_token_for_client(self, scopes, claims_challenge=None, fmi_path=None, **kwargs):
24312431
"""Acquires token for the current confidential client, not for an end user.
24322432
24332433
Since MSAL Python 1.23, it will automatically look for token from cache,
@@ -2440,7 +2440,17 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
24402440
in the form of a claims_challenge directive in the www-authenticate header to be
24412441
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
24422442
It is a string of a JSON object which contains lists of claims being requested from these locations.
2443-
2443+
:param str fmi_path:
2444+
Optional. The Federated Managed Identity (FMI) credential path.
2445+
When provided, it is sent as the ``fmi_path`` parameter in the
2446+
token request body, and the resulting token is cached separately
2447+
so that different FMI paths do not share cached tokens.
2448+
Example usage::
2449+
2450+
result = cca.acquire_token_for_client(
2451+
scopes=["api://resource/.default"],
2452+
fmi_path="SomeFmiPath/FmiCredentialPath",
2453+
)
24442454
:return: A dict representing the json response from Microsoft Entra:
24452455
24462456
- A successful response would contain "access_token" key,
@@ -2450,6 +2460,12 @@ def acquire_token_for_client(self, scopes, claims_challenge=None, **kwargs):
24502460
raise ValueError( # We choose to disallow force_refresh
24512461
"Historically, this method does not support force_refresh behavior. "
24522462
)
2463+
if fmi_path is not None:
2464+
if not isinstance(fmi_path, str):
2465+
raise ValueError(
2466+
"fmi_path must be a string, got {}".format(type(fmi_path).__name__))
2467+
kwargs["data"] = kwargs.get("data", {})
2468+
kwargs["data"]["fmi_path"] = fmi_path
24532469
return _clean_up(self._acquire_token_silent_with_error(
24542470
scopes, None, claims_challenge=claims_challenge, **kwargs))
24552471

@@ -2494,32 +2510,6 @@ def remove_tokens_for_client(self):
24942510
self.token_cache.remove_at(at)
24952511
# acquire_token_for_client() obtains no RTs, so we have no RT to remove
24962512

2497-
def acquire_token_for_client_with_fmi_path(self, scopes, fmi_path, claims_challenge=None, **kwargs):
2498-
"""Acquires token for the current confidential client with a Federated Managed Identity (FMI) path.
2499-
2500-
This is a convenience wrapper around :func:`~acquire_token_for_client`
2501-
that attaches the ``fmi_path`` parameter to the token request body.
2502-
2503-
:param list[str] scopes: (Required)
2504-
Scopes requested to access a protected API (a resource).
2505-
:param str fmi_path: (Required)
2506-
The Federated Managed Identity path to attach to the request.
2507-
:param claims_challenge:
2508-
The claims_challenge parameter requests specific claims requested by the resource provider
2509-
in the form of a claims_challenge directive in the www-authenticate header to be
2510-
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
2511-
It is a string of a JSON object which contains lists of claims being requested from these locations.
2512-
2513-
:return: A dict representing the json response from Microsoft Entra:
2514-
2515-
- A successful response would contain "access_token" key,
2516-
- an error response would contain "error" and usually "error_description".
2517-
"""
2518-
data = kwargs.pop("data", {})
2519-
data["fmi_path"] = fmi_path
2520-
return self.acquire_token_for_client(
2521-
scopes, claims_challenge=claims_challenge, data=data, **kwargs)
2522-
25232513
def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
25242514
"""Acquires token using on-behalf-of (OBO) flow.
25252515

msal/token_cache.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
#
2121
# Excluded fields and reasons:
2222
# - "client_id" : Standard OAuth2 client identifier, same for every request
23-
# - "grant_type" : Standard OAuth2 grant type (e.g. jwt-bearer, refresh_token)
23+
# - "grant_type" : It is possible to combine grants to get tokens, e.g. obo + refresh_token, auth_code + refresh_token etc.
2424
# - "scope" : Already represented as "target" in the AT cache key
2525
# - "claims" : Handled separately; its presence forces a token refresh
26-
# - "username" : Standard ROPC grant parameter
27-
# - "password" : Standard ROPC grant parameter
26+
# - "username" : Standard ROPC grant parameter. Tokens are cached by user ID (subject or oid+tid) instead
27+
# - "password" : Standard ROPC grant parameter. Tokens are tied to credentials.
2828
# - "refresh_token" : Standard refresh grant parameter
2929
# - "code" : Standard authorization code grant parameter
3030
# - "redirect_uri" : Standard authorization code grant parameter

tests/test_application.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,15 @@ def test_organizations_authority_should_emit_warning(self):
709709

710710
@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK)
711711
class TestAcquireTokenForClientWithFmiPath(unittest.TestCase):
712-
"""Test that acquire_token_for_client_with_fmi_path attaches fmi_path to HTTP body."""
712+
"""Test that acquire_token_for_client(fmi_path=...) attaches fmi_path to HTTP body."""
713+
714+
def test_fmi_path_rejects_non_string_types(self):
715+
app = ConfidentialClientApplication(
716+
"client_id", client_credential="secret",
717+
authority="https://login.microsoftonline.com/my_tenant")
718+
for bad_value in [123, True, ["path"], {"path": "value"}, b"bytes"]:
719+
with self.assertRaises(ValueError, msg="fmi_path={!r} should raise".format(bad_value)):
720+
app.acquire_token_for_client(["scope"], fmi_path=bad_value)
713721

714722
def test_fmi_path_is_included_in_request_body(self):
715723
app = ConfidentialClientApplication(
@@ -726,8 +734,8 @@ def mock_post(url, headers=None, data=None, *args, **kwargs):
726734
"expires_in": 3600,
727735
}))
728736

729-
result = app.acquire_token_for_client_with_fmi_path(
730-
["scope"], fmi_path, post=mock_post)
737+
result = app.acquire_token_for_client(
738+
["scope"], fmi_path=fmi_path, post=mock_post)
731739
self.assertIn("access_token", result)
732740
self.assertIn("fmi_path", captured_data,
733741
"fmi_path should be present in the HTTP request body")
@@ -749,8 +757,8 @@ def mock_post(url, headers=None, data=None, *args, **kwargs):
749757
"expires_in": 3600,
750758
}))
751759

752-
result = app.acquire_token_for_client_with_fmi_path(
753-
["scope"], fmi_path, post=mock_post)
760+
result = app.acquire_token_for_client(
761+
["scope"], fmi_path=fmi_path, post=mock_post)
754762
self.assertIn("access_token", result)
755763
self.assertEqual(fmi_path, captured_data["fmi_path"])
756764
self.assertEqual("client_credentials", captured_data.get("grant_type"))
@@ -770,8 +778,8 @@ def mock_post(url, headers=None, data=None, *args, **kwargs):
770778
"expires_in": 3600,
771779
}))
772780

773-
result = app.acquire_token_for_client_with_fmi_path(
774-
["scope"], fmi_path,
781+
result = app.acquire_token_for_client(
782+
["scope"], fmi_path=fmi_path,
775783
data={"extra_key": "extra_value"},
776784
post=mock_post)
777785
self.assertIn("access_token", result)
@@ -794,13 +802,13 @@ def mock_post(url, headers=None, data=None, *args, **kwargs):
794802
"expires_in": 3600,
795803
}))
796804

797-
result1 = app.acquire_token_for_client_with_fmi_path(
798-
["scope"], fmi_path, post=mock_post)
805+
result1 = app.acquire_token_for_client(
806+
["scope"], fmi_path=fmi_path, post=mock_post)
799807
self.assertIn("access_token", result1)
800808
self.assertEqual(result1[app._TOKEN_SOURCE], app._TOKEN_SOURCE_IDP)
801809

802-
result2 = app.acquire_token_for_client_with_fmi_path(
803-
["scope"], fmi_path, post=mock_post)
810+
result2 = app.acquire_token_for_client(
811+
["scope"], fmi_path=fmi_path, post=mock_post)
804812
self.assertIn("access_token", result2)
805813
self.assertEqual(result2[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE,
806814
"Second call should return token from cache")
@@ -821,20 +829,20 @@ def mock_post(url, headers=None, data=None, *args, **kwargs):
821829
return mock_post
822830

823831
# Acquire token with path A
824-
result_a = app.acquire_token_for_client_with_fmi_path(
825-
["scope"], "PathA/credential", post=mock_post_factory("AT_for_path_A"))
832+
result_a = app.acquire_token_for_client(
833+
["scope"], fmi_path="PathA/credential", post=mock_post_factory("AT_for_path_A"))
826834
self.assertEqual("AT_for_path_A", result_a["access_token"])
827835

828836
# Acquire token with path B (should NOT get path A's cached token)
829-
result_b = app.acquire_token_for_client_with_fmi_path(
830-
["scope"], "PathB/credential", post=mock_post_factory("AT_for_path_B"))
837+
result_b = app.acquire_token_for_client(
838+
["scope"], fmi_path="PathB/credential", post=mock_post_factory("AT_for_path_B"))
831839
self.assertEqual("AT_for_path_B", result_b["access_token"])
832840
self.assertEqual(result_b[app._TOKEN_SOURCE], app._TOKEN_SOURCE_IDP,
833841
"Different FMI path should NOT return a cached token from another path")
834842

835843
# Verify path A still returns its own cached token
836-
result_a2 = app.acquire_token_for_client_with_fmi_path(
837-
["scope"], "PathA/credential", post=mock_post_factory("should_not_be_used"))
844+
result_a2 = app.acquire_token_for_client(
845+
["scope"], fmi_path="PathA/credential", post=mock_post_factory("should_not_be_used"))
838846
self.assertEqual("AT_for_path_A", result_a2["access_token"])
839847
self.assertEqual(result_a2[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE,
840848
"Same FMI path should return cached token")
@@ -846,8 +854,8 @@ def test_fmi_token_does_not_interfere_with_non_fmi_token(self):
846854
authority="https://login.microsoftonline.com/my_tenant")
847855

848856
# First, cache a token via FMI path
849-
app.acquire_token_for_client_with_fmi_path(
850-
["scope"], "some/fmi/path",
857+
app.acquire_token_for_client(
858+
["scope"], fmi_path="some/fmi/path",
851859
post=lambda url, **kwargs: MinimalResponse(
852860
status_code=200, text=json.dumps({
853861
"access_token": "FMI_AT", "expires_in": 3600})))

tests/test_fmi_e2e.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ def _get_fmi_credential_from_rma():
4545
authority=_AUTHORITY_URL,
4646
http_client=MinimalHttpClient(),
4747
)
48-
result = app.acquire_token_for_client_with_fmi_path(
49-
[_FMI_SCOPE_FOR_RMA], _FMI_PATH)
48+
result = app.acquire_token_for_client(
49+
[_FMI_SCOPE_FOR_RMA], fmi_path=_FMI_PATH)
5050
if "access_token" not in result:
5151
raise RuntimeError(
5252
"Failed to acquire FMI token from RMA: {}: {}".format(
@@ -73,17 +73,17 @@ def test_acquire_and_cache_with_fmi_path(self):
7373
scopes = [_FMI_SCOPE]
7474

7575
# 1. Acquire token by credential with FMI path
76-
result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
76+
result = app.acquire_token_for_client(scopes, fmi_path=_FMI_PATH)
7777
self.assertIn("access_token", result,
78-
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
78+
"acquire_token_for_client(fmi_path=...) failed: {}: {}".format(
7979
result.get("error"), result.get("error_description")))
8080
self.assertNotEqual("", result["access_token"],
81-
"acquire_token_for_client_with_fmi_path() returned empty access token")
81+
"acquire_token_for_client(fmi_path=...) returned empty access token")
8282

8383
first_token = result["access_token"]
8484

8585
# 2. Verify silent token acquisition works (should retrieve from cache)
86-
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
86+
cache_result = app.acquire_token_for_client(scopes, fmi_path=_FMI_PATH)
8787
self.assertIn("access_token", cache_result,
8888
"Second call failed: {}: {}".format(
8989
cache_result.get("error"), cache_result.get("error_description")))
@@ -124,16 +124,16 @@ def test_acquire_with_assertion_callback_and_fmi_path(self):
124124
fmi_path = "SomeFmiPath/Path"
125125

126126
# 1. Acquire token by credential with FMI path
127-
result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
127+
result = app.acquire_token_for_client(scopes, fmi_path=fmi_path)
128128
self.assertIn("access_token", result,
129-
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
129+
"acquire_token_for_client(fmi_path=...) failed: {}: {}".format(
130130
result.get("error"), result.get("error_description")))
131131
self.assertNotEqual("", result["access_token"],
132-
"acquire_token_for_client_with_fmi_path() returned empty access token")
132+
"acquire_token_for_client(fmi_path=...) returned empty access token")
133133
first_token = result["access_token"]
134134

135135
# 2. Verify cached token acquisition works
136-
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
136+
cache_result = app.acquire_token_for_client(scopes, fmi_path=fmi_path)
137137
self.assertIn("access_token", cache_result,
138138
"Second call failed: {}: {}".format(
139139
cache_result.get("error"), cache_result.get("error_description")))
@@ -166,15 +166,15 @@ def test_different_fmi_paths_are_cached_separately(self):
166166
scopes = [_FMI_SCOPE]
167167

168168
# Acquire token with path A
169-
result_a = app.acquire_token_for_client_with_fmi_path(
170-
scopes, "PathA/credential")
169+
result_a = app.acquire_token_for_client(
170+
scopes, fmi_path="PathA/credential")
171171
self.assertIn("access_token", result_a,
172172
"Path A acquisition failed: {}: {}".format(
173173
result_a.get("error"), result_a.get("error_description")))
174174

175175
# Acquire token with path B — should NOT get path A's cached token
176-
result_b = app.acquire_token_for_client_with_fmi_path(
177-
scopes, "PathB/credential")
176+
result_b = app.acquire_token_for_client(
177+
scopes, fmi_path="PathB/credential")
178178
self.assertIn("access_token", result_b,
179179
"Path B acquisition failed: {}: {}".format(
180180
result_b.get("error"), result_b.get("error_description")))
@@ -183,8 +183,8 @@ def test_different_fmi_paths_are_cached_separately(self):
183183
"Different FMI path should NOT return cached token from another path")
184184

185185
# Verify path A still returns its own cached token
186-
result_a2 = app.acquire_token_for_client_with_fmi_path(
187-
scopes, "PathA/credential")
186+
result_a2 = app.acquire_token_for_client(
187+
scopes, fmi_path="PathA/credential")
188188
self.assertIn("access_token", result_a2)
189189
self.assertEqual(
190190
result_a2.get("token_source"), "cache",
@@ -201,7 +201,7 @@ def test_fmi_token_does_not_interfere_with_non_fmi_token(self):
201201
scopes = [_FMI_SCOPE]
202202

203203
# Cache a token via FMI path
204-
fmi_result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
204+
fmi_result = app.acquire_token_for_client(scopes, fmi_path=_FMI_PATH)
205205
self.assertIn("access_token", fmi_result)
206206

207207
# Regular acquire_token_for_client should NOT get the FMI token
@@ -230,14 +230,14 @@ def test_two_fmi_paths_produce_separate_cache_entries(self):
230230
path_b = "PathBeta/Credential"
231231

232232
# 1. Acquire token with path A
233-
result_a = app.acquire_token_for_client_with_fmi_path(scopes, path_a)
233+
result_a = app.acquire_token_for_client(scopes, fmi_path=path_a)
234234
self.assertIn("access_token", result_a,
235235
"Path A acquisition failed: {}: {}".format(
236236
result_a.get("error"), result_a.get("error_description")))
237237
token_a = result_a["access_token"]
238238

239239
# 2. Acquire token with path B
240-
result_b = app.acquire_token_for_client_with_fmi_path(scopes, path_b)
240+
result_b = app.acquire_token_for_client(scopes, fmi_path=path_b)
241241
self.assertIn("access_token", result_b,
242242
"Path B acquisition failed: {}: {}".format(
243243
result_b.get("error"), result_b.get("error_description")))
@@ -268,12 +268,12 @@ def test_two_fmi_paths_produce_separate_cache_entries(self):
268268
"ext_cache_key values for different FMI paths must differ")
269269

270270
# 5. Verify each path still returns its own cached token
271-
cached_a = app.acquire_token_for_client_with_fmi_path(scopes, path_a)
271+
cached_a = app.acquire_token_for_client(scopes, fmi_path=path_a)
272272
self.assertEqual("cache", cached_a.get("token_source"))
273273
self.assertEqual(token_a, cached_a["access_token"],
274274
"Path A should return its own cached token")
275275

276-
cached_b = app.acquire_token_for_client_with_fmi_path(scopes, path_b)
276+
cached_b = app.acquire_token_for_client(scopes, fmi_path=path_b)
277277
self.assertEqual("cache", cached_b.get("token_source"))
278278
self.assertEqual(token_b, cached_b["access_token"],
279279
"Path B should return its own cached token")

0 commit comments

Comments
 (0)