-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Expand file tree
/
Copy path_validators.py
More file actions
385 lines (293 loc) · 13.9 KB
/
_validators.py
File metadata and controls
385 lines (293 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import argparse
import base64
import binascii
from datetime import datetime
import re
from knack.util import CLIError
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.cli.core.commands.validators import validate_tags
# Encoding names for which a secret file is read in text mode.
secret_text_encoding_values = [
    'utf-8',
    'utf-16le',
    'utf-16be',
    'ascii',
]
# Encoding names for binary secret files (content is stored base64- or hex-encoded).
secret_binary_encoding_values = [
    'base64',
    'hex',
]
def _extract_version(item_id):
return item_id.split('/')[-1]
def _get_resource_group_from_vault_name(cli_ctx, vault_name):
    """
    Look up the resource group of a key vault by the vault's name.

    Iterates over everything the key vault management client's ``vaults.list()``
    yields and compares the ``name`` component of each vault ID with
    ``vault_name``, ignoring case. The first match wins.

    :param cli_ctx: CLI context passed to ``get_mgmt_service_client``
    :param str vault_name: name of the key vault
    :return: resource group name or None
    :rtype: str
    """
    from azure.cli.core.profiles import ResourceType
    from msrestazure.tools import parse_resource_id

    vaults_client = get_mgmt_service_client(cli_ctx, ResourceType.MGMT_KEYVAULT).vaults
    for candidate in vaults_client.list():
        parts = parse_resource_id(candidate.id)
        if 'name' not in parts:
            continue
        if parts['name'].lower() == vault_name.lower():
            return parts['resource_group']
    return None
# COMMAND NAMESPACE VALIDATORS
def process_certificate_cancel_namespace(namespace):
    """Set ``cancellation_requested`` to True on the parsed namespace."""
    setattr(namespace, 'cancellation_requested', True)
def process_secret_set_namespace(cmd, namespace):
    """
    Validate and normalize the arguments of the secret ``set`` command.

    Exactly one of ``namespace.value`` and ``namespace.file_path`` must be given.
    A ``SecretAttributes`` model is created on ``namespace.secret_attributes`` and
    filled from ``expires`` / ``disabled`` / ``not_before``. When a file is given
    its content is loaded according to ``namespace.encoding`` (default ``utf-8``):
    text encodings keep the text as-is, ``base64`` and ``hex`` store the encoded
    form of the raw bytes. The encoding is always recorded in the
    ``file-encoding`` tag, and ``namespace.value`` receives the final content.

    :param cmd: command object; ``cmd.get_models`` supplies ``SecretAttributes``
    :param namespace: parsed argument namespace, modified in place
    :raises CLIError: if both or neither of value/file are given, the file cannot
        be decoded, the encoding is not supported, or the encoded content does
        not round-trip back to what was read
    """
    validate_tags(namespace)

    content = namespace.value
    file_path = namespace.file_path
    encoding = namespace.encoding
    tags = namespace.tags or {}

    use_error = CLIError("incorrect usage: [Required] --value VALUE | --file PATH")

    # Exactly one of --value / --file is allowed.
    if (content and file_path) or (not content and not file_path):
        raise use_error

    from azure.cli.core.profiles import ResourceType
    SecretAttributes = cmd.get_models('SecretAttributes', resource_type=ResourceType.DATA_KEYVAULT)
    namespace.secret_attributes = SecretAttributes()
    if namespace.expires:
        namespace.secret_attributes.expires = namespace.expires
    if namespace.disabled:
        namespace.secret_attributes.enabled = not namespace.disabled
    if namespace.not_before:
        namespace.secret_attributes.not_before = namespace.not_before

    encoding = encoding or 'utf-8'
    if file_path:
        if encoding in secret_text_encoding_values:
            # NOTE(review): the file is opened without an explicit encoding, so it is
            # decoded with the platform default rather than `encoding` - confirm intent.
            with open(file_path, 'r') as f:
                try:
                    content = f.read()
                except UnicodeDecodeError:
                    raise CLIError("Unable to decode file '{}' with '{}' encoding.".format(
                        file_path, encoding))
                encoded_str = content
                encoded = content.encode(encoding)
                decoded = encoded.decode(encoding)
        elif encoding == 'base64':
            with open(file_path, 'rb') as f:
                content = f.read()
                try:
                    encoded = base64.encodebytes(content)
                except AttributeError:
                    # Older interpreters only provide the deprecated name.
                    encoded = base64.encodestring(content)  # pylint: disable=deprecated-method
                encoded_str = encoded.decode('utf-8')
                decoded = base64.b64decode(encoded_str)
        elif encoding == 'hex':
            with open(file_path, 'rb') as f:
                content = f.read()
                encoded = binascii.b2a_hex(content)
                encoded_str = encoded.decode('utf-8')
                decoded = binascii.unhexlify(encoded_str)
        else:
            # Without this branch `decoded` / `encoded_str` would be unbound below and
            # the command would die with UnboundLocalError instead of a usage error.
            raise CLIError("invalid encoding '{}'".format(encoding))

        # Sanity check: the encoded form must decode back to exactly what was read.
        if content != decoded:
            raise CLIError("invalid encoding '{}'".format(encoding))

        content = encoded_str

    tags.update({'file-encoding': encoding})
    namespace.tags = tags
    namespace.value = content
# PARAMETER NAMESPACE VALIDATORS
def get_attribute_validator(name, attribute_class, create=False):
    """
    Build a validator that collapses attribute arguments into one object.

    The returned validator removes ``disabled`` (when ``create`` is true) or
    ``enabled`` (otherwise), plus ``not_before`` and ``expires``, from the
    namespace and stores ``attribute_class(enabled=..., not_before=..., expires=...)``
    on the namespace as ``<name>_attributes``.

    :param str name: prefix of the attribute that receives the built object
    :param attribute_class: callable accepting ``enabled``, ``not_before`` and ``expires``
    :param bool create: read the inverted ``disabled`` flag instead of ``enabled``
    :return: validator function taking the parsed namespace
    """
    def validator(ns):
        values = ns.__dict__
        if create:
            enabled = not values.pop('disabled')
        else:
            enabled = values.pop('enabled')
        not_before = values.pop('not_before', None)
        expires = values.pop('expires', None)
        built = attribute_class(enabled=enabled, not_before=not_before, expires=expires)
        setattr(ns, '{}_attributes'.format(name), built)

    return validator
def validate_key_import_source(ns):
    """
    Check the combination of key import source arguments.

    Exactly one of ``byok_file`` / ``pem_file`` must be set, and ``pem_password``
    is only accepted together with ``pem_file``.

    :raises ValueError: on any invalid combination
    """
    has_byok = bool(ns.byok_file)
    has_pem = bool(ns.pem_file)
    has_password = bool(ns.pem_password)

    # Both set or both missing: either way it is not "exactly one".
    if has_byok == has_pem:
        raise ValueError('supply exactly one: --byok-file, --pem-file')
    if has_byok and has_password:
        raise ValueError('--byok-file cannot be used with --pem-password')
    if has_password and not has_pem:
        raise ValueError('--pem-password must be used with --pem-file')
def validate_key_type(ns):
    """
    Derive the final key type (``kty``) from ``kty``, ``curve`` and ``protection``.

    When ``kty`` is not given it defaults to ``EC`` if a curve was supplied and
    ``RSA`` otherwise. With ``protection == 'hsm'`` the ``-HSM`` suffix is
    appended if missing. With ``protection == 'software'`` a BYOK file or an
    ``-HSM`` key type is rejected. The result is stored back on ``ns.kty``.

    :param ns: parsed argument namespace; attributes that are absent are treated as None
    :raises CLIError: if software protection is combined with a BYOK file or an HSM key type
    """
    crv = getattr(ns, 'curve', None)
    kty = getattr(ns, 'kty', None) or ('EC' if crv else 'RSA')
    protection = getattr(ns, 'protection', None)

    if protection == 'hsm':
        if not kty.endswith('-HSM'):
            kty += '-HSM'
    elif protection == 'software':
        if getattr(ns, 'byok_file', None):
            raise CLIError('BYOK keys are hardware protected. Omit --protection')
        if kty.endswith('-HSM'):
            # The placeholder was previously never filled in, so the message
            # showed a literal "{}" instead of the offending key type.
            raise CLIError(
                'The key type {} is invalid for software protected keys. Omit --protection'.format(kty))

    setattr(ns, 'kty', kty)
def validate_policy_permissions(ns):
    """
    Require at least one of the key/secret/certificate/storage permission arguments.

    :raises argparse.ArgumentError: if all four permission arguments are empty
    """
    requested = (
        ns.key_permissions,
        ns.secret_permissions,
        ns.certificate_permissions,
        ns.storage_permissions,
    )
    if any(requested):
        return
    raise argparse.ArgumentError(
        None,
        'specify at least one: --key-permissions, --secret-permissions, '
        '--certificate-permissions --storage-permissions')
def validate_private_endpoint_connection_id(cmd, ns):
    """
    Resolve the vault, resource group and connection name of a private endpoint connection.

    When ``ns.connection_id`` is given it is parsed with ``parse_proxy_resource_id``
    and its components overwrite the individual arguments. A missing resource
    group is then looked up from the vault name. ``connection_id`` is removed
    from the namespace once validation succeeds.

    :raises CLIError: if the vault name, resource group or connection name is still missing
    """
    if ns.connection_id:
        from azure.cli.core.util import parse_proxy_resource_id
        id_parts = parse_proxy_resource_id(ns.connection_id)
        ns.resource_group_name = id_parts['resource_group']
        ns.vault_name = id_parts['name']
        ns.private_endpoint_connection_name = id_parts['child_name_1']

    if ns.vault_name and not ns.resource_group_name:
        ns.resource_group_name = _get_resource_group_from_vault_name(cmd.cli_ctx, ns.vault_name)

    required = (ns.vault_name, ns.resource_group_name, ns.private_endpoint_connection_name)
    for value in required:
        if not value:
            raise CLIError('incorrect usage: [--id ID | --name NAME --vault-name NAME]')

    delattr(ns, 'connection_id')
def validate_principal(ns):
    """
    Require exactly one of ``object_id`` / ``spn`` / ``upn`` and strip double quotes
    surrounding ``object_id``.

    :raises argparse.ArgumentError: if zero or more than one principal argument is set
    """
    supplied = [principal for principal in (ns.object_id, ns.spn, ns.upn) if principal]
    if len(supplied) != 1:
        raise argparse.ArgumentError(
            None, 'specify exactly one: --object-id, --spn, --upn')

    object_id = ns.object_id
    if object_id:
        ns.object_id = object_id.strip('"')
def validate_resource_group_name(cmd, ns):
    """
    Populate resource_group_name, if not provided

    The group is looked up from ``ns.vault_name``.

    :raises CLIError: if no resource group can be found for the vault name
    """
    if ns.resource_group_name:
        return

    vault_name = ns.vault_name
    resolved = _get_resource_group_from_vault_name(cmd.cli_ctx, vault_name)
    if not resolved:
        msg = "The Resource 'Microsoft.KeyVault/vaults/{}' not found within subscription."
        raise CLIError(msg.format(vault_name))
    ns.resource_group_name = resolved
def validate_deleted_vault_name(cmd, ns):
    """
    Validate a deleted vault name; populate or validate location and resource_group_name

    :raises CLIError: if no deleted vault matches, or the requested resource group
        differs from the one recorded in the deleted vault's ID
    """
    from azure.cli.core.profiles import ResourceType
    from msrestazure.tools import parse_resource_id

    vault_name = ns.vault_name
    deleted_vault = None
    vaults_client = get_mgmt_service_client(cmd.cli_ctx, ResourceType.MGMT_KEYVAULT).vaults

    if ns.location:
        # A known location allows a direct lookup instead of listing everything.
        deleted_vault = vaults_client.get_deleted(vault_name, ns.location)
        id_parts = parse_resource_id(deleted_vault.properties.vault_id)
    else:
        # No location: scan all deleted vaults for a case-insensitive name match.
        for candidate in vaults_client.list_deleted():
            id_parts = parse_resource_id(candidate.properties.vault_id)
            if id_parts['name'].lower() != vault_name.lower():
                continue
            deleted_vault = candidate
            ns.location = deleted_vault.properties.location
            break

    if not deleted_vault:
        raise CLIError('No deleted vault was found with name ' + ns.vault_name)

    original_group = id_parts['resource_group']
    ns.resource_group_name = getattr(ns, 'resource_group_name', None) or original_group

    # A deleted vault can only be recovered into the resource group it came from.
    if original_group != ns.resource_group_name:
        raise CLIError("The specified resource group does not match that of the deleted vault %s. The vault "
                       "must be recovered to the original resource group %s."
                       % (vault_name, original_group))
def validate_x509_certificate_chain(ns):
    """
    Replace ``ns.x509_certificates`` (a file path) with the decoded certificates it contains.

    Every ``BEGIN CERTIFICATE`` / ``END CERTIFICATE`` section of the file is
    base64-decoded, and the resulting byte strings are stored as a list in file order.
    """
    pem_section = r'-----BEGIN CERTIFICATE-----([^-]+)-----END CERTIFICATE-----'

    with open(ns.x509_certificates, 'r') as pem_file:
        pem_text = pem_file.read()

    ns.x509_certificates = [
        base64.b64decode(body.replace('\n', ''))
        for body in re.findall(pem_section, pem_text)
    ]
# ARGUMENT TYPES
def certificate_type(string):
    """
    Argument type that reads a certificate file and returns its raw bytes.

    ``~`` in the path is expanded before opening.

    :param str string: path to the certificate file
    :return: the file content exactly as read in binary mode
    :raises CLIError: if the file cannot be opened or read
    """
    import os
    try:
        expanded_path = os.path.expanduser(string)
        with open(expanded_path, 'rb') as cert_file:
            return cert_file.read()
    except (IOError, OSError) as e:
        raise CLIError("Unable to load certificate file '{}': {}.".format(string, e.strerror))
def datetime_type(string):
    """
    Argument type that parses a UTC datetime in one of the accepted formats.

    Examples: 2017-12-31T01:11:59Z, 2017-12-31T01:11Z, 2017-12-31T01Z or 2017-12-31.
    Formats are tried from most to least specific; the first that parses wins.

    :return: the parsed ``datetime`` (naive, as produced by ``strptime``)
    :raises ValueError: if the input matches none of the accepted formats
    """
    candidate_formats = (
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%dT%H:%MZ',
        '%Y-%m-%dT%HZ',
        '%Y-%m-%d',
    )
    for fmt in candidate_formats:
        try:
            parsed = datetime.strptime(string, fmt)
        except ValueError:
            continue  # fall through to the next, less specific format
        return parsed
    raise ValueError("Input '{}' not valid. Valid example: 2000-12-31T12:59:59Z".format(string))
def get_vault_base_url_type(cli_ctx):
    """
    Build an argument type that turns a vault name into its base URL.

    The key vault DNS suffix is read once from ``cli_ctx.cloud.suffixes.keyvault_dns``.

    :return: function mapping ``name`` to ``https://<name><suffix>``
    """
    dns_suffix = cli_ctx.cloud.suffixes.keyvault_dns

    def vault_base_url_type(name):
        return 'https://{vault}{dns}'.format(vault=name, dns=dns_suffix)

    return vault_base_url_type
def _construct_vnet(cmd, resource_group_name, vnet_name, subnet_name):
    """
    Build the resource ID of a subnet inside a virtual network.

    The subscription comes from ``get_subscription_id(cmd.cli_ctx)``.

    :return: the value produced by ``msrestazure.tools.resource_id``
    """
    from msrestazure.tools import resource_id
    from azure.cli.core.commands.client_factory import get_subscription_id

    id_parts = {
        'subscription': get_subscription_id(cmd.cli_ctx),
        'resource_group': resource_group_name,
        'namespace': 'Microsoft.Network',
        'type': 'virtualNetworks',
        'name': vnet_name,
        'child_type_1': 'subnets',
        'child_name_1': subnet_name,
    }
    return resource_id(**id_parts)
def validate_subnet(cmd, namespace):
    """
    Normalize ``namespace.subnet`` to a resource ID.

    Accepted inputs: a subnet that ``is_valid_resource_id`` accepts with no vnet
    name, nothing at all, or a subnet name together with a vnet name (expanded
    to a full ID in place). Anything else is a usage error.

    :raises CLIError: on any other subnet / vnet-name combination
    """
    from msrestazure.tools import is_valid_resource_id

    subnet = namespace.subnet
    subnet_is_id = is_valid_resource_id(subnet)
    vnet = namespace.vnet_name

    # Without a vnet name, an ID or an absent subnet needs no further work.
    if not vnet and (subnet_is_id or not subnet):
        return

    name_with_vnet = bool(subnet) and not subnet_is_id and bool(vnet)
    if not name_with_vnet:
        raise CLIError('incorrect usage: [--subnet ID | --subnet NAME --vnet-name NAME]')

    namespace.subnet = _construct_vnet(cmd, namespace.resource_group_name, vnet, subnet)
def validate_vault_id(entity_type):
    """
    Build a validator that accepts either an identifier or a vault plus entity name.

    When ``identifier`` is set on the namespace it is parsed with
    ``KeyVaultIdentifier`` (collection ``<entity_type>s``) and the entity name,
    ``vault_base_url`` and, if the namespace has it, the entity version are
    overwritten from it. The attribute prefix is ``entity_type`` with
    ``'deleted'`` removed.

    :param str entity_type: entity kind used for the collection and attribute names
    :return: validator function taking the parsed namespace
    """
    def _validate(ns):
        from azure.keyvault.key_vault_id import KeyVaultIdentifier

        prefix = entity_type.replace('deleted', '')
        name_attr = prefix + '_name'
        version_attr = prefix + '_version'

        name = getattr(ns, name_attr, None)
        vault = getattr(ns, 'vault_base_url', None)
        identifier = getattr(ns, 'identifier', None)

        if not identifier:
            if not (name and vault):
                raise CLIError('incorrect usage: --id ID | --vault-name VAULT --name NAME [--version VERSION]')
            return

        ident = KeyVaultIdentifier(uri=identifier, collection=entity_type + 's')
        setattr(ns, name_attr, ident.name)
        setattr(ns, 'vault_base_url', ident.vault)
        if hasattr(ns, version_attr):
            setattr(ns, version_attr, ident.version)

    return _validate
def validate_sas_definition_id(ns):
    """
    Accept either a SAS definition identifier or vault + account name + definition name.

    When ``identifier`` is set it is parsed with ``StorageSasDefinitionId`` and
    ``sas_definition_name``, ``storage_account_name`` and ``vault_base_url`` are
    overwritten from it.

    :raises CLIError: if there is no identifier and any of the three values is missing
    """
    from azure.keyvault import StorageSasDefinitionId

    acct_name = getattr(ns, 'storage_account_name', None)
    sas_name = getattr(ns, 'sas_definition_name', None)
    vault = getattr(ns, 'vault_base_url', None)
    identifier = getattr(ns, 'identifier', None)

    if not identifier:
        if not (acct_name and sas_name and vault):
            raise CLIError('incorrect usage: --id ID | --vault-name VAULT --account-name --name NAME')
        return

    ident = StorageSasDefinitionId(uri=identifier)
    ns.sas_definition_name = ident.sas_definition
    ns.storage_account_name = ident.account_name
    ns.vault_base_url = ident.vault
def validate_storage_account_id(ns):
    """
    Accept either a storage account identifier or vault + account name.

    When ``identifier`` is set it is parsed with ``StorageAccountId`` and
    ``storage_account_name`` and ``vault_base_url`` are overwritten from it.

    :raises CLIError: if there is no identifier and the account name or vault is missing
    """
    from azure.keyvault import StorageAccountId

    acct_name = getattr(ns, 'storage_account_name', None)
    vault = getattr(ns, 'vault_base_url', None)
    identifier = getattr(ns, 'identifier', None)

    if not identifier:
        if not (acct_name and vault):
            raise CLIError('incorrect usage: --id ID | --vault-name VAULT --name NAME')
        return

    ident = StorageAccountId(uri=identifier)
    ns.storage_account_name = ident.name
    ns.vault_base_url = ident.vault
def validate_storage_disabled_attribute(attr_arg_name, attr_type):
    """
    Build a validator that converts the ``disabled`` flag into an attributes object.

    The validator stores ``attr_type(enabled=not disabled)`` on the namespace
    under ``attr_arg_name``. A missing ``disabled`` attribute counts as not disabled.

    :param str attr_arg_name: namespace attribute that receives the built object
    :param attr_type: callable accepting an ``enabled`` keyword
    :return: validator function taking the parsed namespace
    """
    def _validate(ns):
        is_disabled = getattr(ns, 'disabled', None)
        is_enabled = not is_disabled
        setattr(ns, attr_arg_name, attr_type(enabled=is_enabled))

    return _validate