Skip to content
This repository was archived by the owner on Nov 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b042c5f
fix: update path to build_zipapp script in release workflow
sturrent Oct 17, 2025
ba4c2f6
chore: sync develop with main branch
sturrent Oct 27, 2025
57fbfba
chore: ignore GitHub Copilot instructions directory
sturrent Oct 27, 2025
f78d802
fix: backport critical bug fixes from CLI integration
sturrent Oct 27, 2025
b290214
fix: implement _get_cluster_vnets_with_dns() to detect missing VNet l…
sturrent Oct 27, 2025
da03064
feat: add permission-related finding codes
sturrent Oct 27, 2025
40a205f
style: improve code quality to Pylint 9.99/10
sturrent Oct 27, 2025
35acb99
feat: add permission handling infrastructure (Step 1-2 partial)
sturrent Oct 27, 2025
6524bcf
feat: complete permission handling for all analyzers (Step 2)
sturrent Oct 27, 2025
2a209e1
fix: resolve code quality issues and test failures
sturrent Oct 27, 2025
2a4edf8
fix: remove duplicate NSG findings from misconfiguration analyzer
sturrent Oct 27, 2025
853d92f
fix: remove duplicate NSG analysis log message
sturrent Oct 27, 2025
531aa34
fix: catch AzureCLIError in execute_with_permission_check
sturrent Oct 27, 2025
268e243
feat: display permission findings in diagnostic report
sturrent Oct 27, 2025
496e6c4
refactor: suppress verbose error logging for permission issues
sturrent Oct 27, 2025
8ab7607
fix: prevent false 'No outbound IPs' finding when permissions lacking
sturrent Oct 27, 2025
73dcb63
fix: show outbound configuration in detailed report with permission m…
sturrent Oct 27, 2025
be15332
style: apply Black formatting to all modified files
sturrent Oct 27, 2025
2458ab9
fix: handle VMSS permission errors gracefully in probe tests
sturrent Oct 27, 2025
0f9395c
feat: add permission finding for probe test failures
sturrent Oct 27, 2025
99d09d8
fix: improve connectivity test permission error detection and recomme…
sturrent Oct 27, 2025
aabb6c6
feat: add connectivity test summary to reports
sturrent Oct 28, 2025
cdec1e1
chore: bump version to 1.2.0
sturrent Oct 28, 2025
297768b
fix: remove f-string without placeholders (flake8 F541)
sturrent Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ venv.bak/
*~
.DS_Store

# GitHub Copilot instructions (internal development docs)
.github/instructions/

# AKS Diagnostics specific
aks-net-diagnostics_*.json
.aks_cache/
Expand Down
47 changes: 47 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,53 @@ All notable changes to the AKS Network Diagnostics tool will be documented in th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres on [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.0] - 2025-10-28

### Added
- **Permission Handling Feature**: Graceful handling of insufficient Azure RBAC permissions
- New finding codes: `PERMISSION_INSUFFICIENT_VNET`, `PERMISSION_INSUFFICIENT_VMSS`, `PERMISSION_INSUFFICIENT_LB`, `PERMISSION_INSUFFICIENT_PROBE_TEST`
- Permission error detection in all Azure CLI operations
- Specific permission recommendations with example `az role assignment` commands
- Tool continues analysis with partial data when permissions are insufficient
- Permission findings displayed in separate section of report
- **Connectivity Test Summary**: Added connectivity test results to report summary
- Shows total tests, passed, failed, and could not execute counts
- Consistent formatting between summary and detailed reports
- Clear distinction between test failures (ran but failed) and permission errors (could not execute)
- **Enhanced Error Detection**: Improved connectivity test error analysis
- Detects `AuthorizationFailed` patterns in test failures
- Identifies missing `runCommand/action` permission
- Recommends `Virtual Machine Contributor` role on MC resource group

### Changed
- **Connectivity Tester**: Updated VMSS operations to use permission-aware execution
- VMSS list operations now use `execute_with_permission_check()`
- Handles permission errors gracefully without crashing
- Creates permission findings when VMSS operations fail
- **Report Generator**: Enhanced connectivity test reporting
- Added connectivity tests section to both summary and detailed reports
- Improved labels: "Tests Failed" vs "Could Not Execute"
- Shows execution status for skipped tests (permission_denied, cluster stopped)

### Fixed
- **Probe Test Crash**: Fixed crash when using `--probe-test` flag with limited permissions
- Tool now handles permission errors gracefully
- Creates appropriate permission findings instead of crashing
- Provides actionable guidance on required permissions

### Documentation
- **Permission Handling Plan**: Added comprehensive plan for permission handling implementation
- Documented 15 commits for permission handling feature
- Added test scenarios and validation checklist
- Documented required permissions for all operations

### Known Issues
- **Azure CLI Bug**: Azure CLI v2.78.0 has a bug where `vmss run-command invoke` returns "This is a sample script" instead of actual command output
- Tracked in Azure CLI issue [#32286](https://github.com/Azure/azure-cli/issues/32286)
- Fix merged in PR [#32280](https://github.com/Azure/azure-cli/pull/32280)
- Workaround: Use Azure CLI v2.77 or wait for v2.78.1/v2.79.0
- Our tool handles this gracefully by reporting execution errors

## [1.1.2] - 2025-10-17

### Added
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Generated with `--json-report`, contains:
"resource_group": "my-rg",
"subscription": "xxx",
"generated": "2025-10-03T14:30:00Z",
"script_version": "1.1.2"
"script_version": "1.2.0"
},
"cluster_info": { "..." },
"findings": [
Expand Down Expand Up @@ -499,6 +499,6 @@ Built for Azure Kubernetes Service troubleshooting by the Azure community.

---

**Version**: 1.1.2
**Version**: 1.2.0
**Last Updated**: October 2025
**Maintained by**: [@sturrent](https://github.com/sturrent)
57 changes: 39 additions & 18 deletions aks-net-diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
AKS Network Diagnostics Script
Comprehensive read-only analysis of AKS cluster network configuration
Author: Azure Networking Diagnostics Generator
Version: 1.1.2
Version: 1.2.0
"""

import argparse
Expand Down Expand Up @@ -190,16 +190,16 @@ def check_prerequisites(self):
subprocess.run(
["az", "--version"], capture_output=True, check=True, timeout=AZURE_CLI_TIMEOUT, shell=IS_WINDOWS
)
except (subprocess.CalledProcessError, FileNotFoundError):
raise FileNotFoundError("Azure CLI is not installed or not in PATH")
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
raise FileNotFoundError("Azure CLI is not installed or not in PATH") from exc

# Check if logged in
try:
subprocess.run(
["az", "account", "show"], capture_output=True, check=True, timeout=AZURE_CLI_TIMEOUT, shell=IS_WINDOWS
)
except subprocess.CalledProcessError:
raise PermissionError("Not logged in to Azure. Run 'az login' first.")
except subprocess.CalledProcessError as exc:
raise PermissionError("Not logged in to Azure. Run 'az login' first.") from exc

# Set subscription if provided
if self.subscription:
Expand All @@ -212,8 +212,8 @@ def check_prerequisites(self):
shell=IS_WINDOWS,
)
self.logger.info(f"Using Azure subscription: {self.subscription}")
except subprocess.CalledProcessError:
raise ValueError(f"Failed to set subscription: {self.subscription}")
except subprocess.CalledProcessError as exc:
raise ValueError(f"Failed to set subscription: {self.subscription}") from exc
else:
# Get current subscription
current_sub = self.azure_cli_executor.execute(
Expand All @@ -233,20 +233,20 @@ def fetch_cluster_information(self):

def analyze_vnet_configuration(self):
"""Analyze VNet configuration using ClusterDataCollector"""
collector = ClusterDataCollector(self.azure_cli_executor, self.logger)
self.vnets_analysis = collector.collect_vnet_info(self.agent_pools)
self.cluster_data_collector = ClusterDataCollector(self.azure_cli_executor, self.logger)
self.vnets_analysis = self.cluster_data_collector.collect_vnet_info(self.agent_pools)

def analyze_outbound_connectivity(self):
"""Analyze outbound connectivity configuration using OutboundConnectivityAnalyzer"""
analyzer = OutboundConnectivityAnalyzer(
self.outbound_analyzer = OutboundConnectivityAnalyzer(
cluster_info=self.cluster_info,
agent_pools=self.agent_pools,
azure_cli=self.azure_cli_executor,
logger=self.logger,
)

self.outbound_analysis = analyzer.analyze(show_details=self.show_details)
self.outbound_ips = analyzer.get_outbound_ips()
self.outbound_analysis = self.outbound_analyzer.analyze(show_details=self.show_details)
self.outbound_ips = self.outbound_analyzer.get_outbound_ips()

def _analyze_node_subnet_udrs(self):
"""Analyze User Defined Routes on node subnets using RouteTableAnalyzer"""
Expand All @@ -260,8 +260,6 @@ def analyze_vmss_configuration(self):

def analyze_nsg_configuration(self):
"""Analyze Network Security Group configuration for AKS nodes using modular NSGAnalyzer"""
self.logger.info("Analyzing NSG configuration...")

try:
# Create NSG analyzer instance with the new modular component
nsg_analyzer = NSGAnalyzer(
Expand Down Expand Up @@ -346,8 +344,8 @@ def _get_current_client_ip(self):
import urllib.error
import urllib.request

response = urllib.request.urlopen("https://api.ipify.org", timeout=5)
return response.read().decode("utf-8").strip()
with urllib.request.urlopen("https://api.ipify.org", timeout=5) as response:
return response.read().decode("utf-8").strip()
except Exception:
return None

Expand All @@ -360,10 +358,10 @@ def check_api_connectivity(self):

def analyze_misconfigurations(self):
"""Analyze potential misconfigurations and failures using MisconfigurationAnalyzer"""
analyzer = MisconfigurationAnalyzer(self.azure_cli_executor, self.logger)
self.misconfiguration_analyzer = MisconfigurationAnalyzer(self.azure_cli_executor, self.logger)

# Run analysis and get findings
findings, cluster_stopped = analyzer.analyze(
findings, cluster_stopped = self.misconfiguration_analyzer.analyze(
cluster_info=self.cluster_info,
outbound_analysis=self.outbound_analysis,
outbound_ips=self.outbound_ips,
Expand All @@ -372,6 +370,7 @@ def analyze_misconfigurations(self):
nsg_analysis=self.nsg_analysis,
api_probe_results=self.api_probe_results,
vmss_analysis=self.vmss_analysis,
outbound_analyzer=self.outbound_analyzer,
)

# Store results
Expand Down Expand Up @@ -409,6 +408,27 @@ def generate_report(self):
if self.json_report:
report_gen.save_json_report(self.json_report, file_permissions=DEFAULT_FILE_PERMISSIONS)

def collect_permission_findings(self):
"""Collect permission-related findings from all analyzers"""
# Collect from cluster data collector
if hasattr(self, "cluster_data_collector") and hasattr(self.cluster_data_collector, "findings"):
for finding in self.cluster_data_collector.findings:
self.findings.append(finding.to_dict() if hasattr(finding, "to_dict") else finding)

# Collect from outbound analyzer
if hasattr(self, "outbound_analyzer") and hasattr(self.outbound_analyzer, "findings"):
for finding in self.outbound_analyzer.findings:
self.findings.append(finding.to_dict() if hasattr(finding, "to_dict") else finding)

# Collect from misconfiguration analyzer
if hasattr(self, "misconfiguration_analyzer") and hasattr(self.misconfiguration_analyzer, "findings"):
for finding in self.misconfiguration_analyzer.findings:
self.findings.append(finding.to_dict() if hasattr(finding, "to_dict") else finding)

# NSG and DNS analyzer findings are already collected in their respective methods
# Note: Permission findings are created by analyzers with specific context,
# so we don't need to duplicate them from azure_cli.permission_errors

def run(self):
"""Main execution method"""
self.parse_arguments()
Expand All @@ -426,6 +446,7 @@ def run(self):
self.analyze_api_server_access()
self.check_api_connectivity()
self.analyze_misconfigurations()
self.collect_permission_findings() # Collect all permission findings before reporting
self.generate_report()


Expand Down
7 changes: 2 additions & 5 deletions aks_diagnostics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
"""
AKS Network Diagnostics Package
Modular package for analyzing AKS cluster network configurations
"""
"""AKS Network Diagnostics - Comprehensive AKS network configuration analysis tool"""

__version__ = "1.1.2"
__version__ = "1.2.0"
__author__ = "Azure Networking Diagnostics Generator"

# Import only the modules that exist
Expand Down
2 changes: 1 addition & 1 deletion aks_diagnostics/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
- Python code uses semantic version without prefix (1.0.0, 2.1.0)
"""

__version__ = "1.1.2"
__version__ = "1.2.0"
__author__ = "Azure Networking Diagnostics Generator"
__description__ = "Comprehensive read-only analysis of AKS cluster network configuration"
127 changes: 122 additions & 5 deletions aks_diagnostics/azure_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AzureCLIExecutor:
def __init__(self):
"""Initialize Azure CLI executor"""
self.logger = logging.getLogger("aks_net_diagnostics.azure_cli")
self.permission_errors = [] # Track permission issues encountered

def execute(self, cmd: List[str], expect_json: bool = True, timeout: Optional[int] = None) -> Any:
"""
Expand Down Expand Up @@ -80,11 +81,16 @@ def execute(self, cmd: List[str], expect_json: bool = True, timeout: Optional[in
stderr_output = e.stderr.strip() if e.stderr else ""
stdout_output = e.stdout.strip() if e.stdout else ""

self.logger.error(f"Azure CLI command failed: {cmd_str}")
if stderr_output:
self.logger.error(f"Error: {stderr_output}")
elif stdout_output:
self.logger.error(f"Output: {stdout_output}")
# Check if this is a permission error - don't log it as ERROR since it will be handled gracefully
is_permission_error = self._is_authorization_error(stderr_output)

if not is_permission_error:
# Only log as ERROR if it's not a permission issue
self.logger.error(f"Azure CLI command failed: {cmd_str}")
if stderr_output:
self.logger.error(f"Error: {stderr_output}")
elif stdout_output:
self.logger.error(f"Output: {stdout_output}")

# Check for authentication errors
if "az login" in stderr_output.lower() or "authentication" in stderr_output.lower():
Expand Down Expand Up @@ -170,3 +176,114 @@ def get_current_subscription(self) -> str:
if isinstance(result, str) and result.strip():
return result.strip()
return ""

def _is_authorization_error(self, stderr: str) -> bool:
"""
Check if error is due to insufficient permissions

Args:
stderr: Standard error output from Azure CLI

Returns:
True if the error indicates authorization/permission failure
"""
error_lower = stderr.lower()

auth_patterns = [
"authorizationfailed",
"does not have authorization",
"insufficient privileges",
"forbidden",
"the client", # Common in "The client 'user@example.com' does not have authorization..."
"(401)", # HTTP 401 Unauthorized
"permission", # Generic permission errors
]

return any(pattern in error_lower for pattern in auth_patterns)

def _extract_permission_action(self, stderr: str, command: List[str]) -> str:
"""
Extract or infer the missing permission from error or command

Args:
stderr: Standard error output from Azure CLI
command: The Azure CLI command that failed (without 'az' prefix)

Returns:
Permission action string (e.g., "Microsoft.Network/virtualNetworks/read")
"""
# Try to extract from error message first
error_lower = stderr.lower()

# Common permission patterns in Azure CLI errors
if "microsoft.network/virtualnetworks/read" in error_lower:
return "Microsoft.Network/virtualNetworks/read"
elif "microsoft.compute/virtualmachinescalesets" in error_lower:
return "Microsoft.Compute/virtualMachineScaleSets/read"
elif "microsoft.network/networksecuritygroups" in error_lower:
return "Microsoft.Network/networkSecurityGroups/read"
elif "microsoft.network/loadbalancers" in error_lower:
return "Microsoft.Network/loadBalancers/read"
elif "microsoft.network/privatednszones" in error_lower:
return "Microsoft.Network/privateDnsZones/read"

# Infer from command if not in error message
cmd_str = " ".join(command).lower()

if "vnet" in cmd_str and ("show" in cmd_str or "list" in cmd_str):
return "Microsoft.Network/virtualNetworks/read"
elif "vmss" in cmd_str and ("show" in cmd_str or "list" in cmd_str):
return "Microsoft.Compute/virtualMachineScaleSets/read"
elif "nsg" in cmd_str and "show" in cmd_str:
return "Microsoft.Network/networkSecurityGroups/read"
elif ("lb" in cmd_str or "load-balancer" in cmd_str) and "show" in cmd_str:
return "Microsoft.Network/loadBalancers/read"
elif "private-dns" in cmd_str:
return "Microsoft.Network/privateDnsZones/read"
elif "aks" in cmd_str and ("show" in cmd_str or "list" in cmd_str):
return "Microsoft.ContainerService/managedClusters/read"

return "Unknown permission (check Azure Activity Log for details)"

def execute_with_permission_check(
self, cmd: List[str], context: str, expect_json: bool = True, timeout: Optional[int] = None
) -> Optional[Any]:
"""
Execute Azure CLI command with permission error handling.

Args:
cmd: Azure CLI command parts (without 'az' prefix)
context: Human-readable context for error messages (e.g., "retrieve VNet details")
expect_json: Whether to parse output as JSON
timeout: Optional custom timeout in seconds

Returns:
Command output or None if permission error occurred

Raises:
Other exceptions if not a permission error
"""
try:
return self.execute(cmd, expect_json=expect_json, timeout=timeout)
except AzureCLIError as e:
# Check if it's a permission error
stderr_output = getattr(e, "stderr", str(e))

if self._is_authorization_error(stderr_output):
# Permission error - log and track
action = self._extract_permission_action(stderr_output, cmd)

permission_error = {
"context": context,
"command": " ".join(["az"] + cmd),
"permission": action,
"error": stderr_output,
}
self.permission_errors.append(permission_error)

self.logger.warning(f"Insufficient permissions to {context}. Required: {action}")

return None # Graceful degradation

# Not a permission error, re-raise
raise
Loading