Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/azure-cli/azure/cli/command_modules/acr/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
- name: Gets health state of the environment, without stopping on first error.
text: >
az acr check-health --ignore-errors
- name: Verify that blobs within a given image, identified as 'name:tag' or 'name@digest', can be downloaded from the repository.
text: >
az acr check-health -n MyRegistry -t hello-world@sha256:123
"""

helps['acr check-name'] = """
Expand Down
1 change: 1 addition & 0 deletions src/azure-cli/azure/cli/command_modules/acr/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ def load_arguments(self, _): # pylint: disable=too-many-statements
c.argument('ignore_errors', options_list=['--ignore-errors'], help='Provide all health checks, even if errors are found', action='store_true', required=False)
c.argument('vnet', options_list=['--vnet'],
help="Virtual network ID so to run this command inside a VNET to verify the DNS routing to private endpoints", required=False)
c.argument('image', arg_type=image_by_tag_or_digest_type, required=False)

with self.argument_context('acr scope-map') as c:
c.argument('registry_name', options_list=['--registry', '-r'])
Expand Down
245 changes: 204 additions & 41 deletions src/azure-cli/azure/cli/command_modules/acr/check_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------

import re

from knack.util import CLIError
from knack.log import get_logger
from .custom import get_docker_command
Expand All @@ -27,7 +28,6 @@
NOTARY_VERSION_REGEX = re.compile(r'Version:\s+([.\d]+)')
DOCKER_PULL_WRONG_PLATFORM = 'cannot be used on this platform'


# Utilities functions
def print_pass(message):
logger.warning("%s : OK", str(message))
Expand Down Expand Up @@ -71,12 +71,99 @@ def _subprocess_communicate(command_parts, shell=False):
)
return output, warning, stderr, succeeded

# Get blob identified by digest - GET {url}/v2/{name}/blobs/{digest}
def _get_blob(login_server, repository_name, digest):
Comment thread
SakoPak marked this conversation as resolved.
return 'https://{}/v2/{}/blobs/{}'.format(login_server, repository_name, digest)

# Validate if image layer can be pulled
def _validate_image_layers(repository_name,
digest,
login_server,
username,
password):

import requests
from ._docker_utils import get_manifest_authorization_header, parse_error_message

# Get manifest
# GET {url}/v2/{name}/manifests/{reference}
manifest_url = 'https://{}/v2/{}/manifests/{}'.format(login_server, repository_name, digest)

manifest_response = requests.get(
manifest_url,
headers=get_manifest_authorization_header(username, password))

manifest = manifest_response.json()

# Pull the smallest blob layer in the image to test if the image can be pulled
# First, check if the manifest is a Docker Manifest List
Comment thread
SakoPak marked this conversation as resolved.
if manifest['mediaType'] == 'application/vnd.docker.distribution.manifest.list.v2+json':
Comment thread
SakoPak marked this conversation as resolved.
Comment thread
SakoPak marked this conversation as resolved.
smallest_blobs = []

# Iterate over each manifest in the list
for manifest_item in manifest['manifests']:
# Get the manifest for the image
image_manifest_response = requests.get(
'https://{}/v2/{}/manifests/{}'.format(login_server, repository_name, manifest_item['digest']),
headers=get_manifest_authorization_header(username, password))

image_manifest = image_manifest_response.json()

# Find the smallest blob in the image manifest
smallest_blob = min(image_manifest['layers'], key=lambda layer: layer['size'])
smallest_blobs.append(smallest_blob)

# Find the smallest blob across all image manifests
smallest_blob = min(smallest_blobs, key=lambda blob: blob['size'])

else:
# If manifest is not a Docker Manifest List, find the smallest blob in the given single manifest
smallest_blob = min(manifest['layers'], key=lambda layer: layer['size'])

smallest_digest = smallest_blob['digest']
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One edge case to account for here is that smallest_blob could be empty, this is because manifests with no layers and Manifest lists with no sub-manifests are also technically allowed. In that situation you might be able to pull the manifest as if it is a regular blob (haven't tried it but it should work) or error and ask the user to use a different image.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok that's a great point, I will factor in this edge case and have it pull the manifest like a regular blob, thanks!


# Get blob
request_url = _get_blob(login_server, repository_name, smallest_digest)
logger.debug(add_timestamp("Sending a HTTP GET request to {}".format(request_url)))

res = requests.get(request_url, auth=(username, password))

# If blob layer cannot be pulled, return server error message;
# If server error not available, will return custom error message
if res.status_code >= 400:
raise CLIError(parse_error_message('Could not get the requested data.', res), res.status_code)
print_pass("Blobs in image '{}' can be pulled from registry '{}'".format(digest, login_server))

# Checks inputted image for possible matches
def _check_image_match(repository_name,
digest,
login_server,
username,
password):
from difflib import get_close_matches
from .manifest import _get_manifest_path
from .repository import _obtain_data_from_registry

image_digest_list = _obtain_data_from_registry(
Comment thread
SakoPak marked this conversation as resolved.
login_server=login_server,
path=_get_manifest_path(repository_name),
username=username,
password=password,
result_index='manifests',
orderby=None)

# Get a list of digests that have at least 80% similarity with the inputted image name
digests = [item['digest'] for item in image_digest_list]
matches = get_close_matches(digest, digests, n=1, cutoff=0.8)

# Give a suggestion to the user if a match is found
if matches:
raise CLIError("Image '{}' not found. Did you mean to input '{}'?".format(digest, matches[0]))

# Checks for the environment
# Checks docker command, docker daemon, docker version and docker pull
def _get_docker_status_and_version(ignore_errors, yes):
from ._errors import DOCKER_DAEMON_ERROR, DOCKER_PULL_ERROR, DOCKER_VERSION_ERROR

# Docker command and docker daemon check
docker_command, error = get_docker_command(is_diagnostics_context=True)
docker_daemon_available = True
Expand Down Expand Up @@ -129,7 +216,6 @@ def _get_cli_version():
from azure.cli.core import __version__ as core_version
logger.warning('Azure CLI version: %s', core_version)


# Get helm versions
def _get_helm_version(ignore_errors):
from ._errors import HELM_VERSION_ERROR
Expand Down Expand Up @@ -294,30 +380,13 @@ def _get_endpoint_and_token_status(cmd, login_server, ignore_errors):
print_pass("Fetch refresh token for registry '{}'".format(login_server))
print_pass("Fetch access token for registry '{}'".format(login_server))


def _check_registry_health(cmd, registry_name, ignore_errors):
def _check_registry_health(cmd,
registry,
Comment thread
SakoPak marked this conversation as resolved.
login_server,
ignore_errors):
from azure.cli.core.profiles import ResourceType
if registry_name is None:
Comment thread
SakoPak marked this conversation as resolved.
logger.warning("Registry name must be provided to check connectivity.")
return

registry = None
# Connectivity
try:
registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name)
login_server = registry.login_server.rstrip('/')
except CLIError:
from ._docker_utils import get_login_server_suffix
suffix = get_login_server_suffix(cmd.cli_ctx)

if not suffix:
from ._errors import LOGIN_SERVER_ERROR
_handle_error(LOGIN_SERVER_ERROR.format_error_message(registry_name), ignore_errors)
return

login_server = registry_name + suffix

status_validated = _get_registry_status(login_server, registry_name, ignore_errors)
status_validated = _get_registry_status(login_server, registry.name, ignore_errors)
if status_validated:
_get_endpoint_and_token_status(cmd, login_server, ignore_errors)

Expand All @@ -339,23 +408,20 @@ def _check_registry_health(cmd, registry_name, ignore_errors):
pass
if not valid_identity:
from ._errors import CMK_MANAGED_IDENTITY_ERROR
_handle_error(CMK_MANAGED_IDENTITY_ERROR.format_error_message(registry_name), ignore_errors)
_handle_error(CMK_MANAGED_IDENTITY_ERROR.format_error_message(registry.name), ignore_errors)


def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # pylint: disable=too-many-locals, too-many-statements
def _check_private_endpoint(cmd,
registry,
Comment thread
SakoPak marked this conversation as resolved.
vnet_of_private_endpoint): # pylint: disable=too-many-locals, too-many-statements
import socket
from msrestazure.tools import parse_resource_id, is_valid_resource_id, resource_id

if registry_name is None:
if registry.name is None:
raise CLIError("Registry name must be provided to verify DNS routings of its private endpoints")

registry = None

# retrieve registry
registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name)

if not registry.private_endpoint_connections:
raise CLIError('Registry "{}" doesn\'t have private endpoints to verify DNS routings.'.format(registry_name))
raise CLIError('Registry "{}" doesn\'t have private endpoints to verify DNS routings.'.format(registry.name))

if is_valid_resource_id(vnet_of_private_endpoint):
res = parse_resource_id(vnet_of_private_endpoint)
Expand Down Expand Up @@ -393,21 +459,21 @@ def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # py
if dns_config["privateLinkConnectionProperties"]["fqdns"][0] in dns_mappings:
err = ('Registry "{}" has more than one private endpoint in the vnet of "{}".'
' DNS routing will be unreliable')
raise CLIError(err.format(registry_name, vnet_of_private_endpoint))
raise CLIError(err.format(registry.name, vnet_of_private_endpoint))
dns_mappings[dns_config["privateLinkConnectionProperties"]["fqdns"][0]] = dns_config["privateIPAddress"]

dns_ok = True
if not dns_mappings:
err = ('Registry "{}" doesn\'t have private endpoints in the vnet of "{}".'
' Please make sure you provided correct vnet')
raise CLIError(err.format(registry_name, vnet_of_private_endpoint))
raise CLIError(err.format(registry.name, vnet_of_private_endpoint))

for fqdn in dns_mappings:
try:
result = socket.gethostbyname(fqdn)
if result != dns_mappings[fqdn]:
err = 'DNS routing to registry "%s" through private IP is incorrect. Expect: %s, Actual: %s'
logger.warning(err, registry_name, dns_mappings[fqdn], result)
logger.warning(err, registry.name, dns_mappings[fqdn], result)
dns_ok = False
except Exception as e: # pylint: disable=broad-except
logger.warning('Error resolving DNS for %s. Ex: %s', fqdn, e)
Expand All @@ -418,28 +484,125 @@ def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # py
else:
raise CLIError('DNS routing verification failed')

# Validate --image input
# Validate if image layer can be pulled
def _validate_image_name(repository_name,
Comment thread
SakoPak marked this conversation as resolved.
digest,
login_server,
username,
password):
from .manifest import _get_v2_manifest_path

from ._docker_utils import (
request_data_from_registry,
get_manifest_authorization_header,
RegistryException
)
# First check if image exists in repository before validating image layers
try:
request_data_from_registry(
http_method='get',
login_server=login_server,
path=_get_v2_manifest_path(repository_name, digest),
username=username,
password=password,
json_payload=None,
file_payload=None,
manifest_headers=get_manifest_authorization_header(username, password),
params=digest)

# If image does not exist, check for possible matches
# This helps deliver a more useful error message in case of a typo or other user error
except RegistryException as e:
if e.status_code == 404:
Comment thread
SakoPak marked this conversation as resolved.
# If at least 80% similarity with the inputted image name is found, return a suggested image name to user
_check_image_match(repository_name,
digest,
login_server,
username,
password)
# otherwise, return the error message
raise CLIError("{} Please check if image was inputted correctly.".format(str(e)))

# If image exists, then validate image layers and check if blob can be pulled
_validate_image_layers(repository_name,
digest,
login_server,
username,
password)

# General command
def acr_check_health(cmd, # pylint: disable useless-return
registry_name=None,
image=None,
vnet=None,
ignore_errors=False,
yes=False,
registry_name=None):
ignore_errors=False,
resource_group_name=None,
username=None,
password=None):
from azure.cli.core.util import in_cloud_console
from .repository import get_access_credentials, get_image_digest
from ._utils import get_resource_group_name_by_registry_name

rg = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)

Comment thread
SakoPak marked this conversation as resolved.
registry = None

# Connectivity
try:
registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name, rg)
login_server = registry.login_server.rstrip('/')
except CLIError:
from ._docker_utils import get_login_server_suffix
# Checking the Azure Container Registry login server suffix in the current cloud
suffix = get_login_server_suffix(cmd.cli_ctx)
# If not found, return error
if not suffix:
Comment thread
SakoPak marked this conversation as resolved.
from ._errors import LOGIN_SERVER_ERROR
_handle_error(LOGIN_SERVER_ERROR.format_error_message(registry_name), ignore_errors)
return

in_cloud_console = in_cloud_console()
if in_cloud_console:
logger.warning("Environment checks are not supported in Azure Cloud Shell.")
else:
_get_docker_status_and_version(ignore_errors, yes)
_get_cli_version()

_check_registry_health(cmd, registry_name, ignore_errors)
_check_registry_health(cmd,
registry,
login_server,
ignore_errors)

# If --image is inputted, validate image
if image:
# Get repository name and digest
try:
repository_name, digest = get_image_digest(cmd, registry_name, image, username, password)
except CLIError as e:
raise CLIError("Could not find image '{}'. {}".format(image, e))

# Get the access credentials for the registry
login_server, username, password = get_access_credentials(
cmd,
registry_name=registry_name,
tenant_suffix=None,
username=None,
password=None,
repository=repository_name,
permission='pull')

_validate_image_name(repository_name,
digest,
login_server,
username,
password)

if vnet:
_check_private_endpoint(cmd, registry_name, vnet)

if not in_cloud_console:
_get_helm_version(ignore_errors)
_get_notary_version(ignore_errors)

logger.warning(FAQ_MESSAGE)
Loading
Loading