diff --git a/sdk/identity/azure-identity/azure/identity/_credentials/default.py b/sdk/identity/azure-identity/azure/identity/_credentials/default.py index d44f5580ba8d..2a6ebeb709d5 100644 --- a/sdk/identity/azure-identity/azure/identity/_credentials/default.py +++ b/sdk/identity/azure-identity/azure/identity/_credentials/default.py @@ -4,7 +4,7 @@ # ------------------------------------ import logging import os -from typing import List, Any, Optional, cast +from typing import Dict, List, Any, Optional, cast from azure.core.credentials import ( AccessToken, @@ -141,38 +141,51 @@ class DefaultAzureCredential(ChainedTokenCredential): :caption: Create a DefaultAzureCredential. """ - def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements, too-many-locals + def __init__( # pylint: disable=too-many-statements, too-many-locals + self, + *, + authority: Optional[str] = None, + exclude_environment_credential: Optional[bool] = None, + exclude_workload_identity_credential: Optional[bool] = None, + exclude_managed_identity_credential: Optional[bool] = None, + exclude_shared_token_cache_credential: Optional[bool] = None, + exclude_visual_studio_code_credential: Optional[bool] = None, + exclude_cli_credential: Optional[bool] = None, + exclude_developer_cli_credential: Optional[bool] = None, + exclude_powershell_credential: Optional[bool] = None, + exclude_interactive_browser_credential: Optional[bool] = None, + exclude_broker_credential: Optional[bool] = None, + managed_identity_client_id: Optional[str] = None, + workload_identity_client_id: Optional[str] = None, + workload_identity_tenant_id: Optional[str] = None, + interactive_browser_tenant_id: Optional[str] = None, + interactive_browser_client_id: Optional[str] = None, + broker_tenant_id: Optional[str] = None, + broker_client_id: Optional[str] = None, + shared_cache_username: Optional[str] = None, + shared_cache_tenant_id: Optional[str] = None, + visual_studio_code_tenant_id: Optional[str] = None, + process_timeout: int = 10, + require_envvar: bool = False, + **kwargs: Any, + ) -> None: if "tenant_id" in kwargs: raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.") - authority = kwargs.pop("authority", None) authority = normalize_authority(authority) if authority else get_default_authority() - vscode_tenant_id = kwargs.pop("visual_studio_code_tenant_id", None) - - interactive_browser_tenant_id = kwargs.pop( - "interactive_browser_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) - ) - - managed_identity_client_id = kwargs.pop( - "managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID) + interactive_browser_tenant_id = interactive_browser_tenant_id or os.environ.get( + EnvironmentVariables.AZURE_TENANT_ID ) - workload_identity_client_id = kwargs.pop("workload_identity_client_id", managed_identity_client_id) - workload_identity_tenant_id = kwargs.pop( - "workload_identity_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) + managed_identity_client_id = managed_identity_client_id or os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID) + workload_identity_client_id = workload_identity_client_id or managed_identity_client_id + workload_identity_tenant_id = workload_identity_tenant_id or os.environ.get( + EnvironmentVariables.AZURE_TENANT_ID ) - interactive_browser_client_id = kwargs.pop("interactive_browser_client_id", None) - - broker_tenant_id = kwargs.pop("broker_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID)) - broker_client_id = kwargs.pop("broker_client_id", None) + broker_tenant_id = broker_tenant_id or os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) + shared_cache_username = shared_cache_username or os.environ.get(EnvironmentVariables.AZURE_USERNAME) + shared_cache_tenant_id = shared_cache_tenant_id or os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) - shared_cache_username = kwargs.pop("shared_cache_username", os.environ.get(EnvironmentVariables.AZURE_USERNAME)) - shared_cache_tenant_id = kwargs.pop( - "shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) - ) - - process_timeout = kwargs.pop("process_timeout", 10) - require_envvar = kwargs.pop("require_envvar", False) token_credentials_env = os.environ.get(EnvironmentVariables.AZURE_TOKEN_CREDENTIALS, "").strip().lower() if require_envvar and not token_credentials_env: raise ValueError( @@ -181,85 +194,82 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement ) # Define credential configuration mapping - credential_config = { + credential_config: Dict[str, Any] = { "environment": { - "exclude_param": "exclude_environment_credential", "env_name": "environmentcredential", "default_exclude": False, }, "workload_identity": { - "exclude_param": "exclude_workload_identity_credential", "env_name": "workloadidentitycredential", "default_exclude": False, }, "managed_identity": { - "exclude_param": "exclude_managed_identity_credential", "env_name": "managedidentitycredential", "default_exclude": False, }, "shared_token_cache": { - "exclude_param": "exclude_shared_token_cache_credential", "default_exclude": False, }, "visual_studio_code": { - "exclude_param": "exclude_visual_studio_code_credential", "env_name": "visualstudiocodecredential", "default_exclude": False, }, "cli": { - "exclude_param": "exclude_cli_credential", "env_name": "azureclicredential", "default_exclude": False, }, "developer_cli": { - "exclude_param": "exclude_developer_cli_credential", "env_name": "azuredeveloperclicredential", "default_exclude": False, }, "powershell": { - "exclude_param": "exclude_powershell_credential", "env_name": "azurepowershellcredential", "default_exclude": False, }, "interactive_browser": { - "exclude_param": "exclude_interactive_browser_credential", "env_name": "interactivebrowsercredential", "default_exclude": True, }, "broker": { - "exclude_param": "exclude_broker_credential", "default_exclude": False, }, } - # Extract user-provided exclude flags and set defaults - exclude_flags = {} - user_excludes = {} - for cred_key, config in credential_config.items(): - param_name = cast(str, config["exclude_param"]) - user_excludes[cred_key] = kwargs.pop(param_name, None) - exclude_flags[cred_key] = config["default_exclude"] + # Build user-provided exclude flags and defaults + user_excludes = { + "environment": exclude_environment_credential, + "workload_identity": exclude_workload_identity_credential, + "managed_identity": exclude_managed_identity_credential, + "shared_token_cache": exclude_shared_token_cache_credential, + "visual_studio_code": exclude_visual_studio_code_credential, + "cli": exclude_cli_credential, + "developer_cli": exclude_developer_cli_credential, + "powershell": exclude_powershell_credential, + "interactive_browser": exclude_interactive_browser_credential, + "broker": exclude_broker_credential, + } + exclude_flags = {cred_key: config["default_exclude"] for cred_key, config in credential_config.items()} # Process AZURE_TOKEN_CREDENTIALS environment variable and apply user overrides exclude_flags = process_credential_exclusions(credential_config, exclude_flags, user_excludes) - # Extract individual exclude flags for backward compatibility - exclude_environment_credential = exclude_flags["environment"] - exclude_workload_identity_credential = exclude_flags["workload_identity"] - exclude_managed_identity_credential = exclude_flags["managed_identity"] - exclude_shared_token_cache_credential = exclude_flags["shared_token_cache"] - exclude_visual_studio_code_credential = exclude_flags["visual_studio_code"] - exclude_cli_credential = exclude_flags["cli"] - exclude_developer_cli_credential = exclude_flags["developer_cli"] - exclude_powershell_credential = exclude_flags["powershell"] - exclude_interactive_browser_credential = exclude_flags["interactive_browser"] - exclude_broker_credential = exclude_flags["broker"] + # Extract individual exclude flags for readability + _exclude_environment = exclude_flags["environment"] + _exclude_workload_identity = exclude_flags["workload_identity"] + _exclude_managed_identity = exclude_flags["managed_identity"] + _exclude_shared_token_cache = exclude_flags["shared_token_cache"] + _exclude_visual_studio_code = exclude_flags["visual_studio_code"] + _exclude_cli = exclude_flags["cli"] + _exclude_developer_cli = exclude_flags["developer_cli"] + _exclude_powershell = exclude_flags["powershell"] + _exclude_interactive_browser = exclude_flags["interactive_browser"] + _exclude_broker = exclude_flags["broker"] credentials: List[SupportsTokenInfo] = [] within_dac.set(True) - if not exclude_environment_credential: + if not _exclude_environment: credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs)) - if not exclude_workload_identity_credential: + if not _exclude_workload_identity: try: credentials.append( WorkloadIdentityCredential( @@ -271,30 +281,30 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement ) except ValueError as ex: credentials.append(FailedDACCredential("WorkloadIdentityCredential", error=str(ex))) - if not exclude_managed_identity_credential: + if not _exclude_managed_identity: credentials.append( ManagedIdentityCredential( client_id=managed_identity_client_id, - _exclude_workload_identity_credential=exclude_workload_identity_credential, + _exclude_workload_identity_credential=_exclude_workload_identity, _enable_imds_probe=token_credentials_env != "managedidentitycredential", **kwargs, ) ) - if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported(): + if not _exclude_shared_token_cache and SharedTokenCacheCredential.supported(): # username and/or tenant_id are only required when the cache contains tokens for multiple identities shared_cache = SharedTokenCacheCredential( username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs ) credentials.append(shared_cache) - if not exclude_visual_studio_code_credential: - credentials.append(VisualStudioCodeCredential(tenant_id=vscode_tenant_id)) - if not exclude_cli_credential: + if not _exclude_visual_studio_code: + credentials.append(VisualStudioCodeCredential(tenant_id=visual_studio_code_tenant_id)) + if not _exclude_cli: credentials.append(AzureCliCredential(process_timeout=process_timeout)) - if not exclude_powershell_credential: + if not _exclude_powershell: credentials.append(AzurePowerShellCredential(process_timeout=process_timeout)) - if not exclude_developer_cli_credential: + if not _exclude_developer_cli: credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout)) - if not exclude_interactive_browser_credential: + if not _exclude_interactive_browser: if interactive_browser_client_id: credentials.append( InteractiveBrowserCredential( @@ -303,7 +313,7 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement ) else: credentials.append(InteractiveBrowserCredential(tenant_id=interactive_browser_tenant_id, **kwargs)) - if not exclude_broker_credential: + if not _exclude_broker: broker_credential_args = {"tenant_id": broker_tenant_id, **kwargs} if broker_client_id: broker_credential_args["client_id"] = broker_client_id diff --git a/sdk/identity/azure-identity/azure/identity/_internal/utils.py b/sdk/identity/azure-identity/azure/identity/_internal/utils.py index 2ae4c845aaed..900eaacf5287 100644 --- a/sdk/identity/azure-identity/azure/identity/_internal/utils.py +++ b/sdk/identity/azure-identity/azure/identity/_internal/utils.py @@ -191,11 +191,11 @@ def process_credential_exclusions(credential_config: dict, exclude_flags: dict, any user-provided exclude overrides which take precedence over environment settings. :param credential_config: Configuration mapping for all available credentials, containing - exclude parameter names, environment names, and default exclude settings + environment names and default exclude settings :type credential_config: dict :param exclude_flags: Dictionary of exclude flags for each credential (will be modified) :type exclude_flags: dict - :param user_excludes: User-provided exclude overrides from constructor kwargs + :param user_excludes: User-provided exclude overrides (None means not specified) :type user_excludes: dict :return: Dictionary of final exclude flags for each credential diff --git a/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py b/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py index f9a377d7c452..6490cf106e5f 100644 --- a/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py +++ b/sdk/identity/azure-identity/azure/identity/aio/_credentials/default.py @@ -4,7 +4,7 @@ # ------------------------------------ import logging import os -from typing import List, Optional, Any, cast +from typing import Dict, List, Optional, Any, cast from azure.core.credentials import AccessToken, AccessTokenInfo, TokenRequestOptions from azure.core.credentials_async import AsyncTokenCredential, AsyncSupportsTokenInfo @@ -123,30 +123,42 @@ class DefaultAzureCredential(ChainedTokenCredential): :caption: Create a DefaultAzureCredential. """ - def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statements, too-many-locals + def __init__( # pylint: disable=too-many-statements, too-many-locals + self, + *, + authority: Optional[str] = None, + exclude_environment_credential: Optional[bool] = None, + exclude_workload_identity_credential: Optional[bool] = None, + exclude_managed_identity_credential: Optional[bool] = None, + exclude_shared_token_cache_credential: Optional[bool] = None, + exclude_visual_studio_code_credential: Optional[bool] = None, + exclude_cli_credential: Optional[bool] = None, + exclude_developer_cli_credential: Optional[bool] = None, + exclude_powershell_credential: Optional[bool] = None, + managed_identity_client_id: Optional[str] = None, + workload_identity_client_id: Optional[str] = None, + workload_identity_tenant_id: Optional[str] = None, + shared_cache_username: Optional[str] = None, + shared_cache_tenant_id: Optional[str] = None, + visual_studio_code_tenant_id: Optional[str] = None, + process_timeout: int = 10, + require_envvar: bool = False, + **kwargs: Any, + ) -> None: if "tenant_id" in kwargs: raise TypeError("'tenant_id' is not supported in DefaultAzureCredential.") - authority = kwargs.pop("authority", None) authority = normalize_authority(authority) if authority else get_default_authority() - vscode_tenant_id = kwargs.pop("visual_studio_code_tenant_id", None) + shared_cache_username = shared_cache_username or os.environ.get(EnvironmentVariables.AZURE_USERNAME) + shared_cache_tenant_id = shared_cache_tenant_id or os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) - shared_cache_username = kwargs.pop("shared_cache_username", os.environ.get(EnvironmentVariables.AZURE_USERNAME)) - shared_cache_tenant_id = kwargs.pop( - "shared_cache_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) + managed_identity_client_id = managed_identity_client_id or os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID) + workload_identity_client_id = workload_identity_client_id or managed_identity_client_id + workload_identity_tenant_id = workload_identity_tenant_id or os.environ.get( + EnvironmentVariables.AZURE_TENANT_ID ) - managed_identity_client_id = kwargs.pop( - "managed_identity_client_id", os.environ.get(EnvironmentVariables.AZURE_CLIENT_ID) - ) - workload_identity_client_id = kwargs.pop("workload_identity_client_id", managed_identity_client_id) - workload_identity_tenant_id = kwargs.pop( - "workload_identity_tenant_id", os.environ.get(EnvironmentVariables.AZURE_TENANT_ID) - ) - - process_timeout = kwargs.pop("process_timeout", 10) - require_envvar = kwargs.pop("require_envvar", False) token_credentials_env = os.environ.get(EnvironmentVariables.AZURE_TOKEN_CREDENTIALS, "").strip().lower() if require_envvar and not token_credentials_env: raise ValueError( @@ -155,74 +167,71 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement ) # Define credential configuration mapping (async version) - credential_config = { + credential_config: Dict[str, Any] = { "environment": { - "exclude_param": "exclude_environment_credential", "env_name": "environmentcredential", "default_exclude": False, }, "workload_identity": { - "exclude_param": "exclude_workload_identity_credential", "env_name": "workloadidentitycredential", "default_exclude": False, }, "managed_identity": { - "exclude_param": "exclude_managed_identity_credential", "env_name": "managedidentitycredential", "default_exclude": False, }, "shared_token_cache": { - "exclude_param": "exclude_shared_token_cache_credential", "default_exclude": False, }, "visual_studio_code": { - "exclude_param": "exclude_visual_studio_code_credential", "env_name": "visualstudiocodecredential", "default_exclude": False, }, "cli": { - "exclude_param": "exclude_cli_credential", "env_name": "azureclicredential", "default_exclude": False, }, "developer_cli": { - "exclude_param": "exclude_developer_cli_credential", "env_name": "azuredeveloperclicredential", "default_exclude": False, }, "powershell": { - "exclude_param": "exclude_powershell_credential", "env_name": "azurepowershellcredential", "default_exclude": False, }, } - # Extract user-provided exclude flags and set defaults - exclude_flags = {} - user_excludes = {} - for cred_key, config in credential_config.items(): - param_name = cast(str, config["exclude_param"]) - user_excludes[cred_key] = kwargs.pop(param_name, None) - exclude_flags[cred_key] = config["default_exclude"] + # Build user-provided exclude flags and defaults + user_excludes = { + "environment": exclude_environment_credential, + "workload_identity": exclude_workload_identity_credential, + "managed_identity": exclude_managed_identity_credential, + "shared_token_cache": exclude_shared_token_cache_credential, + "visual_studio_code": exclude_visual_studio_code_credential, + "cli": exclude_cli_credential, + "developer_cli": exclude_developer_cli_credential, + "powershell": exclude_powershell_credential, + } + exclude_flags = {cred_key: config["default_exclude"] for cred_key, config in credential_config.items()} # Process AZURE_TOKEN_CREDENTIALS environment variable and apply user overrides exclude_flags = process_credential_exclusions(credential_config, exclude_flags, user_excludes) - # Extract individual exclude flags for backward compatibility - exclude_environment_credential = exclude_flags["environment"] - exclude_workload_identity_credential = exclude_flags["workload_identity"] - exclude_managed_identity_credential = exclude_flags["managed_identity"] - exclude_shared_token_cache_credential = exclude_flags["shared_token_cache"] - exclude_visual_studio_code_credential = exclude_flags["visual_studio_code"] - exclude_cli_credential = exclude_flags["cli"] - exclude_developer_cli_credential = exclude_flags["developer_cli"] - exclude_powershell_credential = exclude_flags["powershell"] + # Extract individual exclude flags for readability + _exclude_environment = exclude_flags["environment"] + _exclude_workload_identity = exclude_flags["workload_identity"] + _exclude_managed_identity = exclude_flags["managed_identity"] + _exclude_shared_token_cache = exclude_flags["shared_token_cache"] + _exclude_visual_studio_code = exclude_flags["visual_studio_code"] + _exclude_cli = exclude_flags["cli"] + _exclude_developer_cli = exclude_flags["developer_cli"] + _exclude_powershell = exclude_flags["powershell"] credentials: List[AsyncSupportsTokenInfo] = [] within_dac.set(True) - if not exclude_environment_credential: + if not _exclude_environment: credentials.append(EnvironmentCredential(authority=authority, _within_dac=True, **kwargs)) - if not exclude_workload_identity_credential: + if not _exclude_workload_identity: try: credentials.append( WorkloadIdentityCredential( @@ -234,28 +243,28 @@ def __init__(self, **kwargs: Any) -> None: # pylint: disable=too-many-statement ) except ValueError as ex: credentials.append(AsyncFailedDACCredential("WorkloadIdentityCredential", error=str(ex))) - if not exclude_managed_identity_credential: + if not _exclude_managed_identity: credentials.append( ManagedIdentityCredential( client_id=managed_identity_client_id, - _exclude_workload_identity_credential=exclude_workload_identity_credential, + _exclude_workload_identity_credential=_exclude_workload_identity, _enable_imds_probe=token_credentials_env != "managedidentitycredential", **kwargs, ) ) - if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported(): + if not _exclude_shared_token_cache and SharedTokenCacheCredential.supported(): # username and/or tenant_id are only required when the cache contains tokens for multiple identities shared_cache = SharedTokenCacheCredential( username=shared_cache_username, tenant_id=shared_cache_tenant_id, authority=authority, **kwargs ) credentials.append(shared_cache) - if not exclude_visual_studio_code_credential: - credentials.append(VisualStudioCodeCredential(tenant_id=vscode_tenant_id)) - if not exclude_cli_credential: + if not _exclude_visual_studio_code: + credentials.append(VisualStudioCodeCredential(tenant_id=visual_studio_code_tenant_id)) + if not _exclude_cli: credentials.append(AzureCliCredential(process_timeout=process_timeout)) - if not exclude_powershell_credential: + if not _exclude_powershell: credentials.append(AzurePowerShellCredential(process_timeout=process_timeout)) - if not exclude_developer_cli_credential: + if not _exclude_developer_cli: credentials.append(AzureDeveloperCliCredential(process_timeout=process_timeout)) within_dac.set(False) super().__init__(*credentials)