Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions msal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SystemAssignedManagedIdentity, UserAssignedManagedIdentity,
ManagedIdentityClient,
ManagedIdentityError,
MsiV2Error,
ArcPlatformNotSupportedError,
)

Expand Down
47 changes: 46 additions & 1 deletion msal/managed_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class ManagedIdentityError(ValueError):
pass


class MsiV2Error(ManagedIdentityError):
"""Raised when the MSI v2 (mTLS PoP) flow fails."""
pass


class ManagedIdentity(UserDict):
"""Feed an instance of this class to :class:`msal.ManagedIdentityClient`
to acquire token for the specified managed identity.
Expand Down Expand Up @@ -259,6 +264,8 @@ def acquire_token_for_client(
*,
resource: str, # If/when we support scope, resource will become optional
claims_challenge: Optional[str] = None,
mtls_proof_of_possession: bool = False,
with_attestation_support: bool = False,
):
"""Acquire token for the managed identity.

Expand All @@ -278,6 +285,23 @@ def acquire_token_for_client(
even if the app developer did not opt in for the "CP1" client capability.
Upon receiving a `claims_challenge`, MSAL will attempt to acquire a new token.

:param bool mtls_proof_of_possession: (optional)
When True, use the MSI v2 (mTLS Proof-of-Possession) flow to acquire an
``mtls_pop`` token bound to a short-lived mTLS certificate issued by the
IMDS ``/issuecredential`` endpoint.
Without this flag the legacy IMDS v1 flow is used.
Defaults to False.

MSI v2 is used only when both ``mtls_proof_of_possession`` and
``with_attestation_support`` are True.

:param bool with_attestation_support: (optional)
When True (and ``mtls_proof_of_possession`` is also True), attempt
KeyGuard / platform attestation before credential issuance.
On Windows this leverages ``AttestationClientLib.dll`` when available;
on other platforms the parameter is silently ignored.
Defaults to False.

.. note::

Known issue: When an Azure VM has only one user-assigned managed identity,
Expand All @@ -292,6 +316,27 @@ def acquire_token_for_client(
client_id_in_cache = self._managed_identity.get(
ManagedIdentity.ID, "SYSTEM_ASSIGNED_MANAGED_IDENTITY")
now = time.time()
# MSI v2 is opt-in: use it only when BOTH mtls_proof_of_possession and
# with_attestation_support are explicitly requested by the caller.
# No auto-fallback: if MSI v2 is requested and fails, the error is raised.
use_msi_v2 = bool(mtls_proof_of_possession and with_attestation_support)

if with_attestation_support and not mtls_proof_of_possession:
raise ManagedIdentityError(
"attestation_requires_pop",
"with_attestation_support=True requires mtls_proof_of_possession=True (mTLS PoP)."
)

if use_msi_v2:
from .msi_v2 import obtain_token as _obtain_token_v2
result = _obtain_token_v2(
self._http_client, self._managed_identity, resource,
attestation_enabled=True,
)
if "access_token" in result and "error" not in result:
result[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP
return result

if True: # Attempt cache search even if receiving claims_challenge,
# because we want to locate the existing token (if any) and refresh it
matches = self._token_cache.search(
Expand Down Expand Up @@ -685,4 +730,4 @@ def _obtain_token_on_arc(http_client, endpoint, resource):
return {
"error": "invalid_request",
"error_description": response.text,
}
}
Loading
Loading