forked from Azure/azure-cli-extensions
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_stream_utils.py
More file actions
221 lines (185 loc) · 7.93 KB
/
_stream_utils.py
File metadata and controls
221 lines (185 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# pylint: disable=wrong-import-order
import time
import colorama # pylint: disable=import-error
from io import BytesIO
from random import uniform
from knack.util import CLIError
from knack.log import get_logger
from azure.core.exceptions import HttpResponseError
from azure.cli.core.profiles import ResourceType, get_sdk
from azure.common import AzureHttpError
logger = get_logger(__name__)
DEFAULT_CHUNK_SIZE = 1024 * 4
DEFAULT_LOG_TIMEOUT_IN_SEC = 60 * 30 # 30 minutes
def stream_logs(cmd,
                client,
                resource_group,
                service,
                app,
                deployment,
                no_format=False,
                raise_error_on_failure=True,
                logger_level_func=logger.warning):
    '''
    Look up the log file URL of a deployment and stream the blob behind it.

    The URL is requested through client.get_log_file_url(); a blob client is then built from
    that URL and handed to _stream_logs together with the module's default chunk size and
    log timeout.

    :param cmd: CLI command object; only cmd.cli_ctx is used, to resolve the BlobClient class.
    :param client: object exposing get_log_file_url(resource_group_name, service_name,
        app_name, deployment_name).
    :param resource_group: forwarded as resource_group_name.
    :param service: forwarded as service_name; also named in the failure message.
    :param app: forwarded as app_name.
    :param deployment: forwarded as deployment_name.
    :param no_format: forwarded unchanged to _stream_logs.
    :param raise_error_on_failure: forwarded unchanged to _stream_logs.
    :param logger_level_func: forwarded unchanged to _stream_logs.
    :raises CLIError: when the URL lookup raises AttributeError or HttpResponseError, or when
        the returned URL is empty.
    '''
    failure_text = "Could not get logs for Service: {}".format(service)

    try:
        url_response = client.get_log_file_url(
            resource_group_name=resource_group,
            service_name=service,
            app_name=app,
            deployment_name=deployment)
        # A response without a usable "url" attribute surfaces as AttributeError below.
        sas_url = url_response.url
    except (AttributeError, HttpResponseError) as ex:
        logger.warning("%s Exception: %s", failure_text, ex)
        raise CLIError(failure_text)

    if not sas_url:
        logger.warning("%s Empty SAS URL.", failure_text)
        raise CLIError(failure_text)

    # The blob client class is resolved through the CLI profile rather than imported directly.
    blob_client_cls = get_sdk(cmd.cli_ctx, ResourceType.DATA_STORAGE_BLOB, '_blob_client#BlobClient')
    _stream_logs(no_format=no_format,
                 byte_size=DEFAULT_CHUNK_SIZE,
                 timeout_in_seconds=DEFAULT_LOG_TIMEOUT_IN_SEC,
                 blob_service=blob_client_cls.from_blob_url(sas_url),
                 raise_error_on_failure=raise_error_on_failure,
                 logger_level_func=logger_level_func)
def _stream_logs(no_format,  # pylint: disable=too-many-locals, too-many-statements, too-many-branches
                 byte_size,
                 timeout_in_seconds,
                 blob_service,
                 raise_error_on_failure,
                 logger_level_func):
    '''
    Poll a log blob and forward its text to logger_level_func until the blob is marked complete.

    The blob is read in ranges of at most byte_size bytes. After each read, text is emitted up
    to the last line break found in the newly read bytes; the remainder stays buffered until
    more data arrives. While the blob is not complete and no unread data is available, the
    loop sleeps with a growing, jittered delay.

    :param no_format: when falsy, colorama.init() is called before streaming.
    :param byte_size: maximum number of bytes requested per download_blob call.
    :param timeout_in_seconds: give up once the sleep time accumulated since data was last
        available exceeds this value.
    :param blob_service: blob client; must provide exists(), get_blob_properties() and
        download_blob(offset=..., length=..., max_concurrency=...).
    :param raise_error_on_failure: when truthy, raise CLIError if the final status is
        'internalerror', 'failed', 'timedout' or 'canceled'.
    :param logger_level_func: callable taking one str; receives every piece of decoded log
        text and the final status line.
    :raises CLIError: on an AzureHttpError whose status_code is not 404, on any other
        exception raised while refreshing the blob properties inside the loop, or for a
        failing final status (see raise_error_on_failure).
    :return: None. On timeout, or on a KeyboardInterrupt raised while downloading or
        refreshing properties, the buffered text is flushed and the function returns early
        without reporting a status.
    '''
    if not no_format:
        colorama.init()

    stream = BytesIO()
    metadata = {}
    start = 0
    available = 0
    sleep_time = 1
    max_sleep_time = 15
    num_fails = 0
    num_fails_for_backoff = 3
    consecutive_sleep_in_sec = 0
    blob_exists = False

    def safe_get_blob_properties():
        '''
        In recent storage SDKs, get_blob_properties outputs error logs on BlobNotFound (and also
        raises AzureHttpError(404)). There is no way to suppress that error logging from the
        call site. However, in our scenario such a BlobNotFound error is expected before the
        build actually kicks off. To get rid of the error logging, we only call
        get_blob_properties once the blob has been created.
        '''
        nonlocal blob_exists
        if not blob_exists:
            blob_exists = blob_service.exists()
        if blob_exists:
            return blob_service.get_blob_properties()
        return None

    def flush_remaining():
        '''
        Emit whatever is still buffered.

        Always reads the live buffer: a previously captured byte string may be unbound (nothing
        downloaded yet) or may still contain text that has already been emitted.
        '''
        remaining = stream.getvalue()
        if remaining:
            logger_level_func(remaining.decode('utf-8', errors='ignore'))

    # Try to get the initial properties so there's no waiting.
    # If the storage call fails, we'll just sleep and try again after.
    try:
        props = safe_get_blob_properties()
        if props:
            metadata = props.metadata
            available = props.size
    except (AttributeError, AzureHttpError):
        pass

    while (_blob_is_not_complete(metadata) or start < available):
        while start < available:
            # Unread data is available: reset the polling backoff and the timeout counter.
            sleep_time = 1
            num_fails = 0
            consecutive_sleep_in_sec = 0

            try:
                old_byte_size = len(stream.getvalue())
                downloader = blob_service.download_blob(offset=start, length=byte_size, max_concurrency=1)
                downloader.readinto(stream)

                curr_bytes = stream.getvalue()
                new_byte_size = len(curr_bytes)
                amount_read = new_byte_size - old_byte_size
                start += amount_read

                # Only scan what's newly read. If nothing is read, default to 0.
                # The scan runs backwards so the LAST line break wins; index 0 is never examined.
                min_scan_range = max(new_byte_size - amount_read - 1, 0)
                for i in range(new_byte_size - 1, min_scan_range, -1):
                    if curr_bytes[i - 1:i + 1] == b'\r\n':
                        flush = curr_bytes[:i]  # keeps the \r, drops the trailing \n
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        logger_level_func(flush.decode('utf-8', errors='ignore'))
                        break
                    if curr_bytes[i:i + 1] == b'\n':
                        flush = curr_bytes[:i + 1]  # a lone \n is included in the emitted text
                        stream = BytesIO()
                        stream.write(curr_bytes[i + 1:])
                        logger_level_func(flush.decode('utf-8', errors='ignore'))
                        break
            except AzureHttpError as ae:
                # NOTE(review): a 404 is swallowed here and the read is retried immediately,
                # without sleeping.
                if ae.status_code != 404:
                    raise CLIError(ae)
            except KeyboardInterrupt:
                flush_remaining()
                return

        try:
            props = safe_get_blob_properties()
            if props:
                metadata = props.metadata
                available = props.size
        except AzureHttpError as ae:
            if ae.status_code != 404:
                raise CLIError(ae)
        except KeyboardInterrupt:
            # Flush from the live buffer; see flush_remaining for why a cached copy is unsafe.
            flush_remaining()
            return
        except Exception as err:
            raise CLIError(err)

        if consecutive_sleep_in_sec > timeout_in_seconds:
            # Flush anything remaining in the buffer - this would be the case
            # if the file has expired and we weren't able to detect any \r\n
            flush_remaining()
            return

        # If no new data available but not complete, sleep before trying to process additional data.
        if (_blob_is_not_complete(metadata) and start >= available):
            num_fails += 1

            # Every num_fails_for_backoff empty polls the base delay doubles, up to max_sleep_time.
            if num_fails >= num_fails_for_backoff:
                num_fails = 0
                sleep_time = min(sleep_time * 2, max_sleep_time)

            rnd = uniform(1, 2)  # random jitter between 1 and 2 seconds
            total_sleep_time = sleep_time + rnd
            consecutive_sleep_in_sec += total_sleep_time
            time.sleep(total_sleep_time)

    # One final check to see if there's anything in the buffer to flush
    # E.g., metadata has been set and start == available, but the log file
    # didn't end in \r\n, so we were unable to flush out the final contents.
    flush_remaining()

    build_status = _get_run_status(metadata).lower()
    logger_level_func("Log status was: {}".format(build_status))

    if raise_error_on_failure:
        if build_status in ('internalerror', 'failed'):
            raise CLIError("Run failed")
        if build_status == 'timedout':
            raise CLIError("Run timed out")
        if build_status == 'canceled':
            raise CLIError("Run was canceled")
def _blob_is_not_complete(metadata):
    '''
    Tell whether the blob metadata still lacks the completion marker.

    :param metadata: mapping of metadata names to values; may be None or empty.
    :return: False when some key equals '__complete_status' ignoring case, True otherwise
        (including when metadata is None or empty).
    '''
    if metadata:
        has_marker = any(name.lower() == '__complete_status' for name in metadata)
        return not has_marker
    return True
def _get_run_status(metadata):
    '''
    Read the run status stored under the completion marker of the blob metadata.

    :param metadata: mapping of metadata names to values; may be None.
    :return: the value of the first key equal to '__complete_status' ignoring case, or
        'inprogress' when metadata is None or holds no such key.
    '''
    if metadata is not None:
        marker_names = (name for name in metadata if name.lower() == '__complete_status')
        marker = next(marker_names, None)
        if marker is not None:
            return metadata[marker]
    return 'inprogress'