Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 76 additions & 66 deletions sdk/identity/azure-identity/azure/identity/_credentials/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# ------------------------------------
import logging
import os
from typing import List, Any, Optional, cast
from typing import Dict, List, Any, Optional, cast

from azure.core.credentials import (
AccessToken,
Expand Down Expand Up @@ -141,38 +141,51 @@ class DefaultAzureCredential(ChainedTokenCredential):
:caption: Create a DefaultAzureCredential.
"""

def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements, too-many-locals
def __init__( # pylint: disable=too-many-statements, too-many-locals
self,
*,
authority: Optional[str] = None,
exclude_environment_credential: Optional[bool] = None,
exclude_workload_identity_credential: Optional[bool] = None,
exclude_managed_identity_credential: Optional[bool] = None,
exclude_shared_token_cache_credential: Optional[bool] = None,
exclude_visual_studio_code_credential: Optional[bool] = None,
exclude_cli_credential: Optional[bool] = None,
exclude_developer_cli_credential: Optional[bool] = None,
exclude_powershell_credential: Optional[bool] = None,
exclude_interactive_browser_credential: Optional[bool] = None,
exclude_broker_credential: Optional[bool] = None,
managed_identity_client_id: Optional[str] = None,
workload_identity_client_id: Optional[str] = None,
workload_identity_tenant_id: Optional[str] = None,
interactive_browser_tenant_id: Optional[str] = None,
interactive_browser_client_id: Optional[str] = None,
broker_tenant_id: Optional[str] = None,
broker_client_id: Optional[str] = None,
shared_cache_username: Optional[str] = None,
shared_cache_tenant_id: Optional[str] = None,
visual_studio_code_tenant_id: Optional[str] = None,
process_timeout: int = 10,
require_envvar: bool = False,
**kwargs: Any,
) -> None:
if "tenant_id" in kwargs:
raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.")

authority = kwargs.pop("authority", None)
authority = normalize_authority(authority) if authority else get_default_authority()

vscode_tenant_id = kwargs.pop("visual_studio_code_tenant_id", None)

interactive_browser_tenant_id = kwargs.pop(
"interactive_browser_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
)

managed_identity_client_id = kwargs.pop(
"managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID)
interactive_browser_tenant_id = interactive_browser_tenant_id or os.environ.get(
EnvironmentVariables.AZURE_TENANT_ID
)
workload_identity_client_id = kwargs.pop("workload_identity_client_id", managed_identity_client_id)
workload_identity_tenant_id = kwargs.pop(
"workload_identity_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
managed_identity_client_id = managed_identity_client_id or os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID)
workload_identity_client_id = workload_identity_client_id or managed_identity_client_id
workload_identity_tenant_id = workload_identity_tenant_id or os.environ.get(
EnvironmentVariables.AZURE_TENANT_ID
Comment on lines +177 to +183
Comment on lines +180 to +183
)
interactive_browser_client_id = kwargs.pop("interactive_browser_client_id", None)

broker_tenant_id = kwargs.pop("broker_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID))
broker_client_id = kwargs.pop("broker_client_id", None)
broker_tenant_id = broker_tenant_id or os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
shared_cache_username = shared_cache_username or os.environ.get(EnvironmentVariables.AZURE_USERNAME)
shared_cache_tenant_id = shared_cache_tenant_id or os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)

shared_cache_username = kwargs.pop("shared_cache_username", os.environ.get(EnvironmentVariables.AZURE_USERNAME))
shared_cache_tenant_id = kwargs.pop(
"shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)
)

process_timeout = kwargs.pop("process_timeout", 10)
require_envvar = kwargs.pop("require_envvar", False)
token_credentials_env = os.environ.get(EnvironmentVariables.AZURE_TOKEN_CREDENTIALS, "").strip().lower()
if require_envvar and not token_credentials_env:
raise ValueError(
Expand All @@ -181,85 +194,82 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
)

# Define credential configuration mapping
credential_config = {
credential_config: Dict[str, Any] = {
"environment": {
"exclude_param": "exclude_environment_credential",
"env_name": "environmentcredential",
"default_exclude": False,
},
"workload_identity": {
"exclude_param": "exclude_workload_identity_credential",
"env_name": "workloadidentitycredential",
"default_exclude": False,
},
"managed_identity": {
"exclude_param": "exclude_managed_identity_credential",
"env_name": "managedidentitycredential",
"default_exclude": False,
},
"shared_token_cache": {
"exclude_param": "exclude_shared_token_cache_credential",
"default_exclude": False,
},
"visual_studio_code": {
"exclude_param": "exclude_visual_studio_code_credential",
"env_name": "visualstudiocodecredential",
"default_exclude": False,
},
"cli": {
"exclude_param": "exclude_cli_credential",
"env_name": "azureclicredential",
"default_exclude": False,
},
"developer_cli": {
"exclude_param": "exclude_developer_cli_credential",
"env_name": "azuredeveloperclicredential",
"default_exclude": False,
},
"powershell": {
"exclude_param": "exclude_powershell_credential",
"env_name": "azurepowershellcredential",
"default_exclude": False,
},
"interactive_browser": {
"exclude_param": "exclude_interactive_browser_credential",
"env_name": "interactivebrowsercredential",
"default_exclude": True,
},
"broker": {
"exclude_param": "exclude_broker_credential",
"default_exclude": False,
},
}

# Extract user-provided exclude flags and set defaults
exclude_flags = {}
user_excludes = {}
for cred_key, config in credential_config.items():
param_name = cast(str, config["exclude_param"])
user_excludes[cred_key] = kwargs.pop(param_name, None)
exclude_flags[cred_key] = config["default_exclude"]
# Build user-provided exclude flags and defaults
user_excludes = {
"environment": exclude_environment_credential,
"workload_identity": exclude_workload_identity_credential,
"managed_identity": exclude_managed_identity_credential,
"shared_token_cache": exclude_shared_token_cache_credential,
"visual_studio_code": exclude_visual_studio_code_credential,
"cli": exclude_cli_credential,
"developer_cli": exclude_developer_cli_credential,
"powershell": exclude_powershell_credential,
"interactive_browser": exclude_interactive_browser_credential,
"broker": exclude_broker_credential,
}
exclude_flags = {cred_key: config["default_exclude"] for cred_key, config in credential_config.items()}

# Process AZURE_TOKEN_CREDENTIALS environment variable and apply user overrides
exclude_flags = process_credential_exclusions(credential_config, exclude_flags, user_excludes)

# Extract individual exclude flags for backward compatibility
exclude_environment_credential = exclude_flags["environment"]
exclude_workload_identity_credential = exclude_flags["workload_identity"]
exclude_managed_identity_credential = exclude_flags["managed_identity"]
exclude_shared_token_cache_credential = exclude_flags["shared_token_cache"]
exclude_visual_studio_code_credential = exclude_flags["visual_studio_code"]
exclude_cli_credential = exclude_flags["cli"]
exclude_developer_cli_credential = exclude_flags["developer_cli"]
exclude_powershell_credential = exclude_flags["powershell"]
exclude_interactive_browser_credential = exclude_flags["interactive_browser"]
exclude_broker_credential = exclude_flags["broker"]
# Extract individual exclude flags for readability
_exclude_environment = exclude_flags["environment"]
_exclude_workload_identity = exclude_flags["workload_identity"]
_exclude_managed_identity = exclude_flags["managed_identity"]
_exclude_shared_token_cache = exclude_flags["shared_token_cache"]
_exclude_visual_studio_code = exclude_flags["visual_studio_code"]
_exclude_cli = exclude_flags["cli"]
_exclude_developer_cli = exclude_flags["developer_cli"]
_exclude_powershell = exclude_flags["powershell"]
_exclude_interactive_browser = exclude_flags["interactive_browser"]
_exclude_broker = exclude_flags["broker"]

credentials: List[SupportsTokenInfo] = []
within_dac.set(True)
if not exclude_environment_credential:
if not _exclude_environment:
credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs))
if not exclude_workload_identity_credential:
if not _exclude_workload_identity:
try:
credentials.append(
WorkloadIdentityCredential(
Expand All @@ -271,30 +281,30 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
)
except ValueError as ex:
credentials.append(FailedDACCredential("WorkloadIdentityCredential", error=str(ex)))
if not exclude_managed_identity_credential:
if not _exclude_managed_identity:
credentials.append(
ManagedIdentityCredential(
client_id=managed_identity_client_id,
_exclude_workload_identity_credential=exclude_workload_identity_credential,
_exclude_workload_identity_credential=_exclude_workload_identity,
_enable_imds_probe=token_credentials_env != "managedidentitycredential",
**kwargs,
)
)
if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported():
if not _exclude_shared_token_cache and SharedTokenCacheCredential.supported():
# username and/or tenant_id are only required when the cache contains tokens for multiple identities
shared_cache = SharedTokenCacheCredential(
username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs
)
credentials.append(shared_cache)
if not exclude_visual_studio_code_credential:
credentials.append(VisualStudioCodeCredential(tenant_id=vscode_tenant_id))
if not exclude_cli_credential:
if not _exclude_visual_studio_code:
credentials.append(VisualStudioCodeCredential(tenant_id=visual_studio_code_tenant_id))
if not _exclude_cli:
credentials.append(AzureCliCredential(process_timeout=process_timeout))
if not exclude_powershell_credential:
if not _exclude_powershell:
credentials.append(AzurePowerShellCredential(process_timeout=process_timeout))
if not exclude_developer_cli_credential:
if not _exclude_developer_cli:
credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout))
if not exclude_interactive_browser_credential:
if not _exclude_interactive_browser:
if interactive_browser_client_id:
credentials.append(
InteractiveBrowserCredential(
Expand All @@ -303,7 +313,7 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement
)
else:
credentials.append(InteractiveBrowserCredential(tenant_id=interactive_browser_tenant_id, **kwargs))
if not exclude_broker_credential:
if not _exclude_broker:
broker_credential_args = {"tenant_id": broker_tenant_id, **kwargs}
if broker_client_id:
broker_credential_args["client_id"] = broker_client_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ def process_credential_exclusions(credential_config: dict, exclude_flags: dict,
any user-provided exclude overrides which take precedence over environment settings.

:param credential_config: Configuration mapping for all available credentials, containing
exclude parameter names, environment names, and default exclude settings
environment names and default exclude settings
:type credential_config: dict
:param exclude_flags: Dictionary of exclude flags for each credential (will be modified)
:type exclude_flags: dict
:param user_excludes: User-provided exclude overrides from constructor kwargs
:param user_excludes: User-provided exclude overrides (None means not specified)
:type user_excludes: dict

:return: Dictionary of final exclude flags for each credential
Expand Down
Loading