1616The what-if service will use your configured credentials to access your subscription
1717and preview deployment changes under your permissions.
1818"""
19-
20- import requests
2119from typing import Dict , Any , Optional
22- from azure .identity import AzureCliCredential
23- from datetime import datetime , timezone
2420from knack .log import get_logger
2521
2622logger = get_logger (__name__ )
2723
28- # Configuration
29- FUNCTION_APP_URL = "https://azcli-script-insight.azurewebsites.net"
30-
3124
32- def get_azure_cli_access_token () -> Optional [str ]:
33- """
34- Get access token for the caller's subscription using AzureCliCredential
25+ def show_what_if (cli_ctx , azcli_script : str , subscription_id : Optional [str ] = None , no_pretty_print : bool = False ):
26+ from azure .cli .command_modules .resource ._formatters import format_what_if_operation_result
27+ from azure .cli .core ._profile import Profile
28+ import threading
29+ import time
30+ import sys
31+ import json
32+ from requests import Request , Session
3533
36- Returns:
37- Access token string if successful, None if failed
38- """
39- token_info = get_azure_cli_token_info ()
40- return token_info .get ("accessToken" ) if token_info else None
34+ payload = {
35+ "azcli_script" : azcli_script ,
36+ "subscription_id" : subscription_id
37+ }
4138
39+ request_completed = threading .Event ()
40+
41+ def rotating_progress ():
42+ """Simulate a rotating progress indicator, similar to the one displayed during long-running operations.
43+ """
44+ chars = ["|" , "\\ " , "/" , "-" ]
45+ idx = 0
46+ while not request_completed .is_set ():
47+ sys .stderr .write (f"\r { chars [idx % len (chars )]} Running" )
48+ sys .stderr .flush ()
49+ idx += 1
50+ time .sleep (0.2 )
51+ sys .stderr .write ("\r " + " " * 20 + "\r " )
52+ sys .stderr .flush ()
4253
43- def get_azure_cli_token_info () -> Optional [Dict [str , Any ]]:
44- """
45- Get complete token information using AzureCliCredential including expiration
46-
47- Returns:
48- Dictionary with token info including accessToken, expiresOn, etc., or None if failed
49- """
5054 try :
51- # Use AzureCliCredential for Azure CLI authentication
52- cli_credential = AzureCliCredential (process_timeout = 30 )
53-
54- # Get access token for Azure Resource Manager
55- token = cli_credential .get_token ("https://management.azure.com/.default" )
56-
57- token_info = {
58- "accessToken" : token .token ,
59- "expiresOn" : datetime .fromtimestamp (token .expires_on , tz = timezone .utc ).isoformat (),
60- "tokenType" : "Bearer"
61- }
62-
63- return token_info
64-
65- except Exception as e :
66- logger .warning (f"Error getting access token with AzureCliCredential: { str (e )} " )
67- return None
55+ FUNCTION_APP_URL = "https://azcli-script-insight.azurewebsites.net"
56+ resource = cli_ctx .cloud .endpoints .active_directory_resource_id
57+ profile = Profile (cli_ctx = cli_ctx )
58+
59+ try :
60+ token_result = profile .get_raw_token (resource , subscription = subscription_id )
61+ token_info , _ , _ = token_result
62+ token_type , token , _ = token_info
63+ except Exception as token_ex :
64+ request_completed .set ()
65+ raise CLIError (f"Failed to get authentication token: { token_ex } " )
66+
67+ headers_dict = {}
68+ headers_dict ['Authorization' ] = '{} {}' .format (token_type , token )
69+ headers_dict ['Content-Type' ] = 'application/json'
70+
71+ progress_thread = threading .Thread (target = rotating_progress )
72+ progress_thread .daemon = True
73+ progress_thread .start ()
74+
75+ session = Session ()
76+ req = Request (method = "POST" , url = f"{ FUNCTION_APP_URL } /api/what_if_preview" ,
77+ headers = headers_dict , data = json .dumps (payload ))
78+ prepared = session .prepare_request (req )
79+ response = session .send (prepared )
80+ request_completed .set ()
81+
82+ progress_thread .join (timeout = 0.5 )
83+
84+ except Exception as ex :
85+ request_completed .set ()
86+ if 'progress_thread' in locals ():
87+ progress_thread .join (timeout = 0.5 )
88+ raise CLIError (f"Failed to connect to the what-if service: { ex } " )
6889
69-
70- def what_if_preview (azcli_script : str , subscription_id : Optional [str ] = None ) -> Dict [str , Any ]:
71- """
72- Preview deployment changes using Azure what-if functionality
73-
74- Args:
75- function_app_url: Base URL of your Azure Function App
76- azcli_script: Azure CLI script to analyze
77- subscription_id: Optional fallback subscription ID if not in script
78-
79- Returns:
80- Dictionary with what-if preview result
81- """
82- url = f"{ FUNCTION_APP_URL .rstrip ('/' )} /api/what_if_preview"
83-
84- headers = {
85- 'Content-Type' : 'application/json' ,
86- 'Accept' : 'application/json'
87- }
88-
89- # Get access token from Azure CLI
90- access_token = get_azure_cli_access_token ()
91- if not access_token :
92- return {
93- "error" : "Failed to get access token from Azure CLI. Please ensure you are logged in with 'az login'" ,
94- "details" : "The what-if service requires client credentials to access your subscription. Please provide an access token." ,
95- "success" : False
96- }
97-
98- # Use Authorization header for access token
99- headers ['Authorization' ] = f'Bearer { access_token } '
100-
101- payload = {"azcli_script" : azcli_script }
102- if subscription_id :
103- payload ["subscription_id" ] = subscription_id
104-
10590 try :
106- response = requests .post (url , json = payload , headers = headers , timeout = 300 )
107- return response .json ()
108- except requests .RequestException as e :
109- raise e
91+ raw_results = response .json ()
92+ except ValueError as ex :
93+ raise CLIError (f"Failed to parse response from what-if service: { ex } " )
94+
95+ success = raw_results .get ('success' )
96+ if success is False :
97+ return raw_results
98+ elif success is True :
99+ what_if_result = raw_results .get ('what_if_result' , {})
100+ what_if_operation_result = _convert_json_to_what_if_result (what_if_result )
101+ if no_pretty_print :
102+ return what_if_result
103+ print (format_what_if_operation_result (what_if_operation_result , cli_ctx .enable_color ))
104+ return what_if_result
105+ else :
106+ raise CLIError (f"Unexpected response from what-if service, got: { raw_results } " )
107+
108+
109+ def _convert_json_to_what_if_result (what_if_json_result ):
110+ from azure .cli .command_modules .resource ._formatters import _change_type_to_weight , _property_change_type_to_weight
111+
112+ enum_keys = list (_change_type_to_weight .keys ())
113+ enum_mapping = {}
114+ for enum_obj in enum_keys :
115+ str_repr = str (enum_obj ).lower ()
116+ if 'create' in str_repr :
117+ enum_mapping ['Create' ] = enum_obj
118+ elif 'delete' in str_repr :
119+ enum_mapping ['Delete' ] = enum_obj
120+ elif 'modify' in str_repr :
121+ enum_mapping ['Modify' ] = enum_obj
122+ elif 'deploy' in str_repr :
123+ enum_mapping ['Deploy' ] = enum_obj
124+ elif 'no_change' in str_repr or 'nochange' in str_repr :
125+ enum_mapping ['NoChange' ] = enum_obj
126+ elif 'ignore' in str_repr :
127+ enum_mapping ['Ignore' ] = enum_obj
128+ elif 'unsupported' in str_repr :
129+ enum_mapping ['Unsupported' ] = enum_obj
130+ elif 'no_effect' in str_repr or 'noeffect' in str_repr :
131+ enum_mapping ['NoEffect' ] = enum_obj
132+
133+ property_enum_keys = list (_property_change_type_to_weight .keys ())
134+ property_enum_mapping = {}
135+ for enum_obj in property_enum_keys :
136+ str_repr = str (enum_obj ).lower ()
137+ if 'create' in str_repr :
138+ property_enum_mapping ['Create' ] = enum_obj
139+ elif 'delete' in str_repr :
140+ property_enum_mapping ['Delete' ] = enum_obj
141+ elif 'modify' in str_repr :
142+ property_enum_mapping ['Modify' ] = enum_obj
143+ elif 'array' in str_repr :
144+ property_enum_mapping ['Array' ] = enum_obj
145+ elif 'no_effect' in str_repr or 'noeffect' in str_repr :
146+ property_enum_mapping ['NoEffect' ] = enum_obj
147+
148+ class WhatIfOperationResult :
149+ def __init__ (self ):
150+ self .changes = []
151+ self .potential_changes = []
152+ self .diagnostics = []
153+
154+ class ResourceChange :
155+ def __init__ (self , change_data ):
156+ self .change_type = _map_change_type_string (change_data .get ('changeType' , 'Unknown' ))
157+ self .resource_id = change_data .get ('resourceId' , '' )
158+ self .before = change_data .get ('before' )
159+ self .after = change_data .get ('after' )
160+ self .delta = []
161+
162+ delta_data = change_data .get ('delta' , [])
163+ for property_data in delta_data :
164+ property_change = PropertyChange (property_data )
165+ self .delta .append (property_change )
166+
167+ class PropertyChange :
168+ def __init__ (self , change_data ):
169+ self .property_change_type = _map_property_change_type_string (
170+ change_data .get ('propertyChangeType' , 'NoEffect' ))
171+ self .path = change_data .get ('path' , '' )
172+ self .before = change_data .get ('before' )
173+ self .after = change_data .get ('after' )
174+ self .children = []
175+
176+ children_data = change_data .get ('children' , [])
177+ for child_data in children_data :
178+ child_property_change = PropertyChange (child_data )
179+ self .children .append (child_property_change )
180+
181+ def _map_change_type_string (change_type_str ):
182+ result = enum_mapping .get (change_type_str )
183+ return result
184+
185+ def _map_property_change_type_string (property_change_type_str ):
186+ result = property_enum_mapping .get (property_change_type_str )
187+ return result
188+
189+ result = WhatIfOperationResult ()
190+
191+ changes = what_if_json_result .get ('changes' , [])
192+ for change_data in changes :
193+ resource_change = ResourceChange (change_data )
194+ result .changes .append (resource_change )
195+
196+ potential_changes = what_if_json_result .get ('potential_changes' , [])
197+ for change_data in potential_changes :
198+ resource_change = ResourceChange (change_data )
199+ result .potential_changes .append (resource_change )
200+
201+ return result
0 commit comments