forked from aws/aws-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherrorhandler.py
More file actions
355 lines (283 loc) · 11.6 KB
/
errorhandler.py
File metadata and controls
355 lines (283 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import argparse
import logging
import signal
from botocore.exceptions import (
ClientError,
NoCredentialsError,
NoRegionError,
)
from botocore.exceptions import (
ParamValidationError as BotocoreParamValidationError,
)
from awscli.argparser import USAGE, ArgParseException
from awscli.argprocess import ParamError, ParamSyntaxError
from awscli.arguments import UnknownArgumentError
from awscli.autoprompt.factory import PrompterKeyboardInterrupt
from awscli.constants import (
CLIENT_ERROR_RC,
CONFIGURATION_ERROR_RC,
GENERAL_ERROR_RC,
PARAM_VALIDATION_ERROR_RC,
)
from awscli.customizations.exceptions import (
ConfigurationError,
ParamValidationError,
)
from awscli.formatter import get_formatter
from awscli.utils import PagerInitializationException
LOG = logging.getLogger(__name__)
VALID_ERROR_FORMATS = ['legacy', 'json', 'yaml', 'text', 'table', 'enhanced']
# Maximum number of items to display inline for collections
MAX_INLINE_ITEMS = 5
class EnhancedErrorFormatter:
def format_error(self, error_info, formatted_message, stream):
stream.write(f'{formatted_message}\n')
additional_fields = self._get_additional_fields(error_info)
if not additional_fields:
return
stream.write('\nAdditional error details:\n')
for key, value in additional_fields.items():
if self._is_simple_value(value):
stream.write(f'{key}: {value}\n')
elif self._is_small_collection(value):
stream.write(f'{key}: {self._format_inline(value)}\n')
else:
stream.write(
f'{key}: <complex value>\n'
f'(Use --cli-error-format with json or yaml '
f'to see full details)\n'
)
def _is_simple_value(self, value):
return isinstance(value, (str, int, float, bool, type(None)))
def _is_small_collection(self, value):
if isinstance(value, list):
return len(value) < MAX_INLINE_ITEMS and all(
self._is_simple_value(item) for item in value
)
elif isinstance(value, dict):
return len(value) < MAX_INLINE_ITEMS and all(
self._is_simple_value(v) for v in value.values()
)
return False
def _format_inline(self, value):
if isinstance(value, list):
return f"[{', '.join(str(item) for item in value)}]"
elif isinstance(value, dict):
items = ', '.join(f'{k}: {v}' for k, v in value.items())
return f'{{{items}}}'
return str(value)
def _get_additional_fields(self, error_info):
standard_keys = {'Code', 'Message'}
return {k: v for k, v in error_info.items() if k not in standard_keys}
def construct_entry_point_handlers_chain():
handlers = [
ParamValidationErrorsHandler(),
PrompterInterruptExceptionHandler(),
InterruptExceptionHandler(),
GeneralExceptionHandler(),
]
return ChainedExceptionHandler(exception_handlers=handlers)
def construct_cli_error_handlers_chain(session=None):
handlers = [
ParamValidationErrorsHandler(),
UnknownArgumentErrorHandler(),
ConfigurationErrorHandler(),
NoRegionErrorHandler(),
NoCredentialsErrorHandler(),
PagerErrorHandler(),
InterruptExceptionHandler(),
ClientErrorHandler(session),
GeneralExceptionHandler(),
]
return ChainedExceptionHandler(exception_handlers=handlers)
class BaseExceptionHandler:
def handle_exception(self, exception, stdout, stderr):
raise NotImplementedError('handle_exception')
class FilteredExceptionHandler(BaseExceptionHandler):
EXCEPTIONS_TO_HANDLE = ()
MESSAGE = '%s'
def handle_exception(self, exception, stdout, stderr, **kwargs):
if isinstance(exception, self.EXCEPTIONS_TO_HANDLE):
return_val = self._do_handle_exception(
exception, stdout, stderr, **kwargs
)
if return_val is not None:
return return_val
def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
stderr.write("\n")
stderr.write(self.MESSAGE % exception)
stderr.write("\n")
return self.RC
class ParamValidationErrorsHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = (
ParamError,
ParamSyntaxError,
ArgParseException,
ParamValidationError,
BotocoreParamValidationError,
)
RC = PARAM_VALIDATION_ERROR_RC
class SilenceParamValidationMsgErrorHandler(ParamValidationErrorsHandler):
def _do_handle_exception(self, exception, stdout, stderr):
return self.RC
class ClientErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = ClientError
RC = CLIENT_ERROR_RC
def __init__(self, session=None):
self._session = session
self._enhanced_formatter = None
def _do_handle_exception(self, exception, stdout, stderr, **kwargs):
parsed_globals = kwargs.get('parsed_globals')
displayed_structured = False
if self._session:
displayed_structured = self._try_display_structured_error(
exception, stderr, parsed_globals
)
if not displayed_structured:
return super()._do_handle_exception(
exception, stdout, stderr, **kwargs
)
return self.RC
def _resolve_error_format(self, parsed_globals):
if parsed_globals:
error_format = getattr(parsed_globals, 'cli_error_format', None)
if error_format:
return error_format.lower()
try:
error_format = self._session.get_config_variable(
'cli_error_format'
)
if error_format:
return error_format.lower()
except (KeyError, AttributeError) as e:
LOG.debug(
'Failed to get cli_error_format from config: %s', e
)
return 'enhanced'
def _try_display_structured_error(
self, exception, stderr, parsed_globals=None
):
try:
error_response = self._extract_error_response(exception)
if not error_response or 'Error' not in error_response:
return False
error_info = error_response['Error']
error_format = self._resolve_error_format(parsed_globals)
if error_format not in VALID_ERROR_FORMATS:
LOG.warning(
f"Invalid cli_error_format: '{error_format}'. "
f"Using 'enhanced' format."
)
error_format = 'enhanced'
if error_format == 'legacy':
return False
formatted_message = str(exception)
if error_format == 'enhanced':
if self._enhanced_formatter is None:
self._enhanced_formatter = EnhancedErrorFormatter()
self._enhanced_formatter.format_error(
error_info, formatted_message, stderr
)
return True
temp_parsed_globals = argparse.Namespace()
temp_parsed_globals.output = error_format
temp_parsed_globals.query = None
temp_parsed_globals.color = (
getattr(parsed_globals, 'color', 'auto')
if parsed_globals
else 'auto'
)
formatter = get_formatter(error_format, temp_parsed_globals)
formatter('error', error_info, stderr)
return True
except Exception as e:
LOG.debug(
'Failed to display structured error: %s', e, exc_info=True
)
return False
@staticmethod
def _extract_error_response(exception):
if not isinstance(exception, ClientError):
return None
if hasattr(exception, 'response') and 'Error' in exception.response:
error_dict = dict(exception.response['Error'])
# AWS services return modeled error fields
# at the top level of the error response,
# not nested under an Error key. Botocore preserves this structure.
# Include these fields to provide complete error information.
# Exclude response metadata and avoid duplicates.
excluded_keys = {'Error', 'ResponseMetadata', 'Code', 'Message'}
for key, value in exception.response.items():
if key not in excluded_keys and key not in error_dict:
error_dict[key] = value
return {'Error': error_dict}
return None
class ConfigurationErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = ConfigurationError
RC = CONFIGURATION_ERROR_RC
class NoRegionErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = NoRegionError
RC = CONFIGURATION_ERROR_RC
MESSAGE = (
'%s You can also configure your region by running "aws configure".'
)
class NoCredentialsErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = NoCredentialsError
RC = CONFIGURATION_ERROR_RC
MESSAGE = '%s. You can configure credentials by running "aws login".'
class PagerErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = PagerInitializationException
RC = CONFIGURATION_ERROR_RC
MESSAGE = (
'Unable to redirect output to pager. Received the '
'following error when opening pager:\n%s\n\n'
'Learn more about configuring the output pager by running '
'"aws help config-vars".'
)
class UnknownArgumentErrorHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = UnknownArgumentError
RC = PARAM_VALIDATION_ERROR_RC
def _do_handle_exception(self, exception, stdout, stderr):
stderr.write("\n")
stderr.write(f'usage: {USAGE}\n{exception}\n')
stderr.write("\n")
return self.RC
class InterruptExceptionHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = KeyboardInterrupt
RC = 128 + signal.SIGINT
def _do_handle_exception(self, exception, stdout, stderr):
stdout.write("\n")
return self.RC
class PrompterInterruptExceptionHandler(InterruptExceptionHandler):
EXCEPTIONS_TO_HANDLE = PrompterKeyboardInterrupt
def _do_handle_exception(self, exception, stdout, stderr):
stderr.write(f'{exception}')
stderr.write("\n")
return self.RC
class GeneralExceptionHandler(FilteredExceptionHandler):
EXCEPTIONS_TO_HANDLE = Exception
RC = GENERAL_ERROR_RC
class ChainedExceptionHandler(BaseExceptionHandler):
def __init__(self, exception_handlers):
self._exception_handlers = exception_handlers
def inject_handler(self, position, handler):
self._exception_handlers.insert(position, handler)
def handle_exception(self, exception, stdout, stderr, **kwargs):
for handler in self._exception_handlers:
return_value = handler.handle_exception(
exception, stdout, stderr, **kwargs
)
if return_value is not None:
return return_value