-
Notifications
You must be signed in to change notification settings - Fork 3.4k
[ACR] Add new command to check if specific blob is available to pull from registry #27676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
9ed61b2
7e4f854
fe7a8ca
61a9b1e
b7be29d
66c416a
fe12b1a
2fdc6cd
8e23abf
8bbf0b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| # -------------------------------------------------------------------------------------------- | ||
|
|
||
| import re | ||
|
|
||
| from knack.util import CLIError | ||
| from knack.log import get_logger | ||
| from .custom import get_docker_command | ||
|
|
@@ -27,7 +28,6 @@ | |
| NOTARY_VERSION_REGEX = re.compile(r'Version:\s+([.\d]+)') | ||
| DOCKER_PULL_WRONG_PLATFORM = 'cannot be used on this platform' | ||
|
|
||
|
|
||
| # Utilities functions | ||
| def print_pass(message): | ||
| logger.warning("%s : OK", str(message)) | ||
|
|
@@ -71,12 +71,99 @@ def _subprocess_communicate(command_parts, shell=False): | |
| ) | ||
| return output, warning, stderr, succeeded | ||
|
|
||
| # Get blob identified by digest - GET {url}/v2/{name}/blobs/{digest} | ||
| def _get_blob(login_server, repository_name, digest): | ||
| return 'https://{}/v2/{}/blobs/{}'.format(login_server, repository_name, digest) | ||
|
|
||
| # Validate if image layer can be pulled | ||
| def _validate_image_layers(repository_name, | ||
| digest, | ||
| login_server, | ||
| username, | ||
| password): | ||
|
|
||
| import requests | ||
| from ._docker_utils import get_manifest_authorization_header, parse_error_message | ||
|
|
||
| # Get manifest | ||
| # GET {url}/v2/{name}/manifests/{reference} | ||
| manifest_url = 'https://{}/v2/{}/manifests/{}'.format(login_server, repository_name, digest) | ||
|
|
||
| manifest_response = requests.get( | ||
| manifest_url, | ||
| headers=get_manifest_authorization_header(username, password)) | ||
|
|
||
| manifest = manifest_response.json() | ||
|
|
||
| # Pull the smallest blob layer in the image to test if the image can be pulled | ||
| # First, check if the manifest is a Docker Manifest List | ||
|
SakoPak marked this conversation as resolved.
|
||
| if manifest['mediaType'] == 'application/vnd.docker.distribution.manifest.list.v2+json': | ||
|
SakoPak marked this conversation as resolved.
SakoPak marked this conversation as resolved.
|
||
| smallest_blobs = [] | ||
|
|
||
| # Iterate over each manifest in the list | ||
| for manifest_item in manifest['manifests']: | ||
| # Get the manifest for the image | ||
| image_manifest_response = requests.get( | ||
| 'https://{}/v2/{}/manifests/{}'.format(login_server, repository_name, manifest_item['digest']), | ||
| headers=get_manifest_authorization_header(username, password)) | ||
|
|
||
| image_manifest = image_manifest_response.json() | ||
|
|
||
| # Find the smallest blob in the image manifest | ||
| smallest_blob = min(image_manifest['layers'], key=lambda layer: layer['size']) | ||
| smallest_blobs.append(smallest_blob) | ||
|
|
||
| # Find the smallest blob across all image manifests | ||
| smallest_blob = min(smallest_blobs, key=lambda blob: blob['size']) | ||
|
|
||
| else: | ||
| # If manifest is not a Docker Manifest List, find the smallest blob in the given single manifest | ||
| smallest_blob = min(manifest['layers'], key=lambda layer: layer['size']) | ||
|
|
||
| smallest_digest = smallest_blob['digest'] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One edge case to account for here is that smallest_blob could be empty, this is because manifests with no layers and Manifest lists with no sub-manifests are also technically allowed. In that situation you might be able to pull the manifest as if it is a regular blob (haven't tried it but it should work) or error and ask the user to use a different image.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok that's a great point, I will factor in this edge case and have it pull the manifest like a regular blob, thanks! |
||
|
|
||
| # Get blob | ||
| request_url = _get_blob(login_server, repository_name, smallest_digest) | ||
| logger.debug(add_timestamp("Sending a HTTP GET request to {}".format(request_url))) | ||
|
|
||
| res = requests.get(request_url, auth=(username, password)) | ||
|
|
||
| # If blob layer cannot be pulled, return server error message; | ||
| # If server error not available, will return custom error message | ||
| if res.status_code >= 400: | ||
| raise CLIError(parse_error_message('Could not get the requested data.', res), res.status_code) | ||
| print_pass("Blobs in image '{}' can be pulled from registry '{}'".format(digest, login_server)) | ||
|
|
||
| # Checks inputted image for possible matches | ||
| def _check_image_match(repository_name, | ||
| digest, | ||
| login_server, | ||
| username, | ||
| password): | ||
| from difflib import get_close_matches | ||
| from .manifest import _get_manifest_path | ||
| from .repository import _obtain_data_from_registry | ||
|
|
||
| image_digest_list = _obtain_data_from_registry( | ||
|
SakoPak marked this conversation as resolved.
|
||
| login_server=login_server, | ||
| path=_get_manifest_path(repository_name), | ||
| username=username, | ||
| password=password, | ||
| result_index='manifests', | ||
| orderby=None) | ||
|
|
||
| # Get a list of digests that have at least 80% similarity with the inputted image name | ||
| digests = [item['digest'] for item in image_digest_list] | ||
| matches = get_close_matches(digest, digests, n=1, cutoff=0.8) | ||
|
|
||
| # Give a suggestion to the user if a match is found | ||
| if matches: | ||
| raise CLIError("Image '{}' not found. Did you mean to input '{}'?".format(digest, matches[0])) | ||
|
|
||
| # Checks for the environment | ||
| # Checks docker command, docker daemon, docker version and docker pull | ||
| def _get_docker_status_and_version(ignore_errors, yes): | ||
| from ._errors import DOCKER_DAEMON_ERROR, DOCKER_PULL_ERROR, DOCKER_VERSION_ERROR | ||
|
|
||
| # Docker command and docker daemon check | ||
| docker_command, error = get_docker_command(is_diagnostics_context=True) | ||
| docker_daemon_available = True | ||
|
|
@@ -129,7 +216,6 @@ def _get_cli_version(): | |
| from azure.cli.core import __version__ as core_version | ||
| logger.warning('Azure CLI version: %s', core_version) | ||
|
|
||
|
|
||
| # Get helm versions | ||
| def _get_helm_version(ignore_errors): | ||
| from ._errors import HELM_VERSION_ERROR | ||
|
|
@@ -294,30 +380,13 @@ def _get_endpoint_and_token_status(cmd, login_server, ignore_errors): | |
| print_pass("Fetch refresh token for registry '{}'".format(login_server)) | ||
| print_pass("Fetch access token for registry '{}'".format(login_server)) | ||
|
|
||
|
|
||
| def _check_registry_health(cmd, registry_name, ignore_errors): | ||
| def _check_registry_health(cmd, | ||
| registry, | ||
|
SakoPak marked this conversation as resolved.
|
||
| login_server, | ||
| ignore_errors): | ||
| from azure.cli.core.profiles import ResourceType | ||
| if registry_name is None: | ||
|
SakoPak marked this conversation as resolved.
|
||
| logger.warning("Registry name must be provided to check connectivity.") | ||
| return | ||
|
|
||
| registry = None | ||
| # Connectivity | ||
| try: | ||
| registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name) | ||
| login_server = registry.login_server.rstrip('/') | ||
| except CLIError: | ||
| from ._docker_utils import get_login_server_suffix | ||
| suffix = get_login_server_suffix(cmd.cli_ctx) | ||
|
|
||
| if not suffix: | ||
| from ._errors import LOGIN_SERVER_ERROR | ||
| _handle_error(LOGIN_SERVER_ERROR.format_error_message(registry_name), ignore_errors) | ||
| return | ||
|
|
||
| login_server = registry_name + suffix | ||
|
|
||
| status_validated = _get_registry_status(login_server, registry_name, ignore_errors) | ||
| status_validated = _get_registry_status(login_server, registry.name, ignore_errors) | ||
| if status_validated: | ||
| _get_endpoint_and_token_status(cmd, login_server, ignore_errors) | ||
|
|
||
|
|
@@ -339,23 +408,20 @@ def _check_registry_health(cmd, registry_name, ignore_errors): | |
| pass | ||
| if not valid_identity: | ||
| from ._errors import CMK_MANAGED_IDENTITY_ERROR | ||
| _handle_error(CMK_MANAGED_IDENTITY_ERROR.format_error_message(registry_name), ignore_errors) | ||
| _handle_error(CMK_MANAGED_IDENTITY_ERROR.format_error_message(registry.name), ignore_errors) | ||
|
|
||
|
|
||
| def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # pylint: disable=too-many-locals, too-many-statements | ||
| def _check_private_endpoint(cmd, | ||
| registry, | ||
|
SakoPak marked this conversation as resolved.
|
||
| vnet_of_private_endpoint): # pylint: disable=too-many-locals, too-many-statements | ||
| import socket | ||
| from msrestazure.tools import parse_resource_id, is_valid_resource_id, resource_id | ||
|
|
||
| if registry_name is None: | ||
| if registry.name is None: | ||
| raise CLIError("Registry name must be provided to verify DNS routings of its private endpoints") | ||
|
|
||
| registry = None | ||
|
|
||
| # retrieve registry | ||
| registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name) | ||
|
|
||
| if not registry.private_endpoint_connections: | ||
| raise CLIError('Registry "{}" doesn\'t have private endpoints to verify DNS routings.'.format(registry_name)) | ||
| raise CLIError('Registry "{}" doesn\'t have private endpoints to verify DNS routings.'.format(registry.name)) | ||
|
|
||
| if is_valid_resource_id(vnet_of_private_endpoint): | ||
| res = parse_resource_id(vnet_of_private_endpoint) | ||
|
|
@@ -393,21 +459,21 @@ def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # py | |
| if dns_config["privateLinkConnectionProperties"]["fqdns"][0] in dns_mappings: | ||
| err = ('Registry "{}" has more than one private endpoint in the vnet of "{}".' | ||
| ' DNS routing will be unreliable') | ||
| raise CLIError(err.format(registry_name, vnet_of_private_endpoint)) | ||
| raise CLIError(err.format(registry.name, vnet_of_private_endpoint)) | ||
| dns_mappings[dns_config["privateLinkConnectionProperties"]["fqdns"][0]] = dns_config["privateIPAddress"] | ||
|
|
||
| dns_ok = True | ||
| if not dns_mappings: | ||
| err = ('Registry "{}" doesn\'t have private endpoints in the vnet of "{}".' | ||
| ' Please make sure you provided correct vnet') | ||
| raise CLIError(err.format(registry_name, vnet_of_private_endpoint)) | ||
| raise CLIError(err.format(registry.name, vnet_of_private_endpoint)) | ||
|
|
||
| for fqdn in dns_mappings: | ||
| try: | ||
| result = socket.gethostbyname(fqdn) | ||
| if result != dns_mappings[fqdn]: | ||
| err = 'DNS routing to registry "%s" through private IP is incorrect. Expect: %s, Actual: %s' | ||
| logger.warning(err, registry_name, dns_mappings[fqdn], result) | ||
| logger.warning(err, registry.name, dns_mappings[fqdn], result) | ||
| dns_ok = False | ||
| except Exception as e: # pylint: disable=broad-except | ||
| logger.warning('Error resolving DNS for %s. Ex: %s', fqdn, e) | ||
|
|
@@ -418,28 +484,125 @@ def _check_private_endpoint(cmd, registry_name, vnet_of_private_endpoint): # py | |
| else: | ||
| raise CLIError('DNS routing verification failed') | ||
|
|
||
| # Validate --image input | ||
| # Validate if image layer can be pulled | ||
| def _validate_image_name(repository_name, | ||
|
SakoPak marked this conversation as resolved.
|
||
| digest, | ||
| login_server, | ||
| username, | ||
| password): | ||
| from .manifest import _get_v2_manifest_path | ||
|
|
||
| from ._docker_utils import ( | ||
| request_data_from_registry, | ||
| get_manifest_authorization_header, | ||
| RegistryException | ||
| ) | ||
| # First check if image exists in repository before validating image layers | ||
| try: | ||
| request_data_from_registry( | ||
| http_method='get', | ||
| login_server=login_server, | ||
| path=_get_v2_manifest_path(repository_name, digest), | ||
| username=username, | ||
| password=password, | ||
| json_payload=None, | ||
| file_payload=None, | ||
| manifest_headers=get_manifest_authorization_header(username, password), | ||
| params=digest) | ||
|
|
||
| # If image does not exist, check for possible matches | ||
| # This helps deliver a more useful error message in case of a typo or other user error | ||
| except RegistryException as e: | ||
| if e.status_code == 404: | ||
|
SakoPak marked this conversation as resolved.
|
||
| # If at least 80% similarity with the inputted image name is found, return a suggested image name to user | ||
| _check_image_match(repository_name, | ||
| digest, | ||
| login_server, | ||
| username, | ||
| password) | ||
| # otherwise, return the error message | ||
| raise CLIError("{} Please check if image was inputted correctly.".format(str(e))) | ||
|
|
||
| # If image exists, then validate image layers and check if blob can be pulled | ||
| _validate_image_layers(repository_name, | ||
| digest, | ||
| login_server, | ||
| username, | ||
| password) | ||
|
|
||
| # General command | ||
| def acr_check_health(cmd, # pylint: disable useless-return | ||
| registry_name=None, | ||
| image=None, | ||
| vnet=None, | ||
| ignore_errors=False, | ||
| yes=False, | ||
| registry_name=None): | ||
| ignore_errors=False, | ||
| resource_group_name=None, | ||
| username=None, | ||
| password=None): | ||
| from azure.cli.core.util import in_cloud_console | ||
| from .repository import get_access_credentials, get_image_digest | ||
| from ._utils import get_resource_group_name_by_registry_name | ||
|
|
||
| rg = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name) | ||
|
|
||
|
SakoPak marked this conversation as resolved.
|
||
| registry = None | ||
|
|
||
| # Connectivity | ||
| try: | ||
| registry, _ = get_registry_by_name(cmd.cli_ctx, registry_name, rg) | ||
| login_server = registry.login_server.rstrip('/') | ||
| except CLIError: | ||
| from ._docker_utils import get_login_server_suffix | ||
| # Checking the Azure Container Registry login server suffix in the current cloud | ||
| suffix = get_login_server_suffix(cmd.cli_ctx) | ||
| # If not found, return error | ||
| if not suffix: | ||
|
SakoPak marked this conversation as resolved.
|
||
| from ._errors import LOGIN_SERVER_ERROR | ||
| _handle_error(LOGIN_SERVER_ERROR.format_error_message(registry_name), ignore_errors) | ||
| return | ||
|
|
||
| in_cloud_console = in_cloud_console() | ||
| if in_cloud_console: | ||
| logger.warning("Environment checks are not supported in Azure Cloud Shell.") | ||
| else: | ||
| _get_docker_status_and_version(ignore_errors, yes) | ||
| _get_cli_version() | ||
|
|
||
| _check_registry_health(cmd, registry_name, ignore_errors) | ||
| _check_registry_health(cmd, | ||
| registry, | ||
| login_server, | ||
| ignore_errors) | ||
|
|
||
| # If --image is inputted, validate image | ||
| if image: | ||
| # Get repository name and digest | ||
| try: | ||
| repository_name, digest = get_image_digest(cmd, registry_name, image, username, password) | ||
| except CLIError as e: | ||
| raise CLIError("Could not find image '{}'. {}".format(image, e)) | ||
|
|
||
| # Get the access credentials for the registry | ||
| login_server, username, password = get_access_credentials( | ||
| cmd, | ||
| registry_name=registry_name, | ||
| tenant_suffix=None, | ||
| username=None, | ||
| password=None, | ||
| repository=repository_name, | ||
| permission='pull') | ||
|
|
||
| _validate_image_name(repository_name, | ||
| digest, | ||
| login_server, | ||
| username, | ||
| password) | ||
|
|
||
| if vnet: | ||
| _check_private_endpoint(cmd, registry_name, vnet) | ||
|
|
||
| if not in_cloud_console: | ||
| _get_helm_version(ignore_errors) | ||
| _get_notary_version(ignore_errors) | ||
|
|
||
| logger.warning(FAQ_MESSAGE) | ||
Uh oh!
There was an error while loading. Please reload this page.