Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions msal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SystemAssignedManagedIdentity, UserAssignedManagedIdentity,
ManagedIdentityClient,
ManagedIdentityError,
MsiV2Error,
ArcPlatformNotSupportedError,
)

Expand Down
51 changes: 44 additions & 7 deletions msal/managed_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class ManagedIdentityError(ValueError):
pass


class MsiV2Error(ManagedIdentityError):
"""Raised when the MSI v2 (mTLS PoP) flow fails."""
pass


class ManagedIdentity(UserDict):
"""Feed an instance of this class to :class:`msal.ManagedIdentityClient`
to acquire token for the specified managed identity.
Expand Down Expand Up @@ -166,6 +171,7 @@ def __init__(
token_cache=None,
http_cache=None,
client_capabilities: Optional[List[str]] = None,
msi_v2_enabled: Optional[bool] = None,
):
"""Create a managed identity client.

Expand Down Expand Up @@ -207,6 +213,17 @@ def __init__(
Client capability in Managed Identity is relayed as-is
via ``xms_cc`` parameter on the wire.

:param bool msi_v2_enabled: (optional)
Enable MSI v2 (mTLS PoP) token acquisition.
When True (or when the ``MSAL_ENABLE_MSI_V2`` environment variable
is set to a truthy value), the client will attempt to acquire tokens
using the MSI v2 flow (IMDS /issuecredential + mTLS PoP).
If the MSI v2 flow fails, it automatically falls back to MSI v1.
MSI v2 only applies to Azure VM (IMDS) environments; it is ignored
in other managed identity environments (App Service, Service Fabric,
Azure Arc, etc.).
Defaults to None (disabled unless the env var is set).

Recipe 1: Hard code a managed identity for your app::

import msal, requests
Expand Down Expand Up @@ -253,6 +270,11 @@ def __init__(
)
self._token_cache = token_cache or TokenCache()
self._client_capabilities = client_capabilities
# MSI v2 is enabled by the constructor param or the MSAL_ENABLE_MSI_V2 env var
if msi_v2_enabled is None:
env_val = os.environ.get("MSAL_ENABLE_MSI_V2", "").lower()
msi_v2_enabled = env_val in ("1", "true", "yes")
self._msi_v2_enabled = msi_v2_enabled

def acquire_token_for_client(
self,
Expand Down Expand Up @@ -326,13 +348,28 @@ def acquire_token_for_client(
break # With a fallback in hand, we break here to go refresh
return access_token_from_cache # It is still good as new
try:
result = _obtain_token(
self._http_client, self._managed_identity, resource,
access_token_sha256_to_refresh=hashlib.sha256(
access_token_to_refresh.encode("utf-8")).hexdigest()
if access_token_to_refresh else None,
client_capabilities=self._client_capabilities,
)
result = None
if self._msi_v2_enabled:
try:
from .msi_v2 import obtain_token as _obtain_token_v2
result = _obtain_token_v2(
self._http_client, self._managed_identity, resource)
logger.debug("MSI v2 token acquisition succeeded")
except MsiV2Error as exc:
logger.warning(
"MSI v2 flow failed, falling back to MSI v1: %s", exc)
except Exception as exc: # pylint: disable=broad-except
logger.warning(
"MSI v2 encountered unexpected error, "
"falling back to MSI v1: %s", exc)
if result is None:
result = _obtain_token(
self._http_client, self._managed_identity, resource,
access_token_sha256_to_refresh=hashlib.sha256(
access_token_to_refresh.encode("utf-8")).hexdigest()
if access_token_to_refresh else None,
client_capabilities=self._client_capabilities,
)
if "access_token" in result:
expires_in = result.get("expires_in", 3600)
if "refresh_in" not in result and expires_in >= 7200:
Expand Down
Loading
Loading