|
2 | 2 |
|
3 | 3 | import concurrent.futures |
4 | 4 | import dataclasses |
| 5 | +import datetime |
| 6 | +import email.utils |
5 | 7 | import hashlib |
6 | 8 | import io |
7 | 9 | import json |
@@ -946,27 +948,33 @@ def _handle_upload_exception( |
946 | 948 | begin_offset = progress.get("begin_offset") |
947 | 949 | offset = progress.get("offset") |
948 | 950 |
|
949 | | - if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): |
950 | | - self.emitter.emit("upload_retrying", progress) |
| 951 | + LOG.warning( |
| 952 | + f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" |
| 953 | + ) |
951 | 954 |
|
952 | | - LOG.warning( |
953 | | - f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" |
954 | | - ) |
| 955 | + if retries <= constants.MAX_UPLOAD_RETRIES: |
| 956 | + retriable, retry_after_sec = _is_retriable_exception(ex) |
| 957 | + if retriable: |
| 958 | + self.emitter.emit("upload_retrying", progress) |
955 | 959 |
|
956 | | - # Keep things immutable here. Will increment retries in the caller |
957 | | - retries += 1 |
958 | | - if _is_immediate_retriable_exception(ex): |
959 | | - sleep_for = 0 |
960 | | - else: |
961 | | - sleep_for = min(2**retries, 16) |
962 | | - LOG.info( |
963 | | - f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})" |
964 | | - ) |
965 | | - if sleep_for: |
966 | | - time.sleep(sleep_for) |
967 | | - else: |
968 | | - self.emitter.emit("upload_failed", progress) |
969 | | - raise ex |
| 960 | + # Keep things immutable here. Will increment retries in the caller |
| 961 | + retries += 1 |
| 962 | + if _is_immediate_retriable_exception(ex): |
| 963 | + sleep_for = 0 |
| 964 | + else: |
| 965 | + sleep_for = min(2**retries, 16) |
| 966 | + sleep_for += retry_after_sec |
| 967 | + |
| 968 | + LOG.info( |
| 969 | + f"Retrying in {sleep_for} seconds ({retries}/{constants.MAX_UPLOAD_RETRIES})" |
| 970 | + ) |
| 971 | + if sleep_for: |
| 972 | + time.sleep(sleep_for) |
| 973 | + |
| 974 | + return |
| 975 | + |
| 976 | + self.emitter.emit("upload_failed", progress) |
| 977 | + raise ex |
970 | 978 |
|
971 | 979 | @classmethod |
972 | 980 | def _upload_name(cls, progress: UploaderProgress): |
@@ -1083,23 +1091,188 @@ def _is_immediate_retriable_exception(ex: BaseException) -> bool: |
1083 | 1091 | return False |
1084 | 1092 |
|
1085 | 1093 |
|
def _is_retriable_exception(ex: BaseException) -> tuple[bool, int]:
    """
    Determine if an exception should be retried and how long to wait.

    Decision rules, in order:
      - ``requests.ConnectionError`` / ``requests.Timeout``: retry, no extra delay.
      - HTTP 429: always retry. The delay is the JSON ``backoff`` field
        (milliseconds) if present and valid, otherwise the Retry-After header,
        otherwise a 10 second default.
      - Other HTTP 4xx: retry only if the JSON body says so
        (``debug_info.retriable``), except that a ``debug_info.type`` of
        ``RequestRateLimitedError`` is always retried with the same delay
        rules as HTTP 429.
      - HTTP 5xx: retry, honoring the Retry-After header when present.
      - Anything else: do not retry.

    A response body that is not valid JSON, or that is valid JSON but not an
    object, is treated as carrying no retry information.

    Args:
        ex: Exception to check for retryability

    Returns:
        Tuple of (retriable, retry_after_sec) where:
        - retriable: True if the exception should be retried
        - retry_after_sec: Seconds to wait before retry (>= 0)

    Examples:
        >>> resp = requests.Response()
        >>> resp._content = b"foo"
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (False, 0)
        >>> resp._content = b'{"debug_info": "oops"}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (False, 0)
        >>> resp._content = b'[]'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (False, 0)
        >>> resp._content = b'{"backoff": 13000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 13)
        >>> resp._content = b'{"backoff": "foo", "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b'{"debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 400
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b"foo"
        >>> resp.status_code = 429
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 10)
        >>> resp._content = b"foo"
        >>> resp.headers = {"Retry-After": "3"}
        >>> resp.status_code = 429
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 3)
        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.status_code = 429
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 12)
        >>> resp._content = b'{"backoff": 12000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}'
        >>> resp.headers = {"Retry-After": "1"}
        >>> resp.status_code = 503
        >>> ex = requests.HTTPError("error", response=resp)
        >>> _is_retriable_exception(ex)
        (True, 1)
    """

    DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC = 10

    if isinstance(ex, (requests.ConnectionError, requests.Timeout)):
        return True, 0

    if not (
        isinstance(ex, requests.HTTPError)
        and isinstance(ex.response, requests.Response)
    ):
        return False, 0

    resp = ex.response
    status_code = resp.status_code

    # Always retry with some delay
    if status_code == 429:
        return True, _rate_limit_retry_after_sec(
            resp, _json_object_or_none(resp), DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
        )

    if 400 <= status_code < 500:
        data = _json_object_or_none(resp)
        if data is None:
            return False, (_parse_retry_after_from_header(resp) or 0)

        debug_info = data.get("debug_info")
        if not isinstance(debug_info, dict):
            # Missing or malformed debug_info: nothing tells us to retry
            return False, 0

        # The server may respond 429 RequestRateLimitedError but with retryable=False
        # We should retry for this case regardless
        # e.g. HTTP 429 {"backoff": 10000, "debug_info": {"retriable": false, "type": "RequestRateLimitedError", "message": "Request rate limit has been exceeded"}}
        if debug_info.get("type") == "RequestRateLimitedError":
            return True, _rate_limit_retry_after_sec(
                resp, data, DEFAULT_RETRY_AFTER_RATE_LIMIT_SEC
            )

        return bool(debug_info.get("retriable", False)), 0

    if 500 <= status_code < 600:
        return True, (_parse_retry_after_from_header(resp) or 0)

    return False, 0


def _json_object_or_none(resp: requests.Response) -> dict[str, T.Any] | None:
    """Return the response body decoded as a JSON object, or None if it is not one."""
    try:
        data = resp.json()
    except requests.JSONDecodeError:
        return None
    # Valid JSON can still be a list/string/number, which has no fields to read
    return data if isinstance(data, dict) else None


def _rate_limit_retry_after_sec(
    resp: requests.Response,
    data: dict[str, T.Any] | None,
    default_sec: int,
) -> int:
    """Pick the delay for a rate-limited response: JSON backoff, then Retry-After, then default."""
    backoff_ms = _parse_backoff(data.get("backoff")) if data is not None else None
    if backoff_ms is not None:
        # Integer division avoids a float round-trip; negative values clamp to 0
        return max(0, backoff_ms // 1000)
    # NOTE: "or" also replaces an explicit Retry-After of 0 with the default,
    # so a rate-limited request is never retried without any delay
    return _parse_retry_after_from_header(resp) or default_sec
| 1211 | + |
| 1212 | + |
def _parse_backoff(backoff: T.Any) -> int | None:
    """
    Coerce a server-provided ``backoff`` value to an integer.

    Callers in this module treat the result as milliseconds.

    Args:
        backoff: Raw value taken from the decoded JSON body (any type)

    Returns:
        ``int(backoff)``, or None if the value is missing or cannot be
        converted (non-numeric string, container, NaN, infinity, ...).

    Examples:
        >>> _parse_backoff(13000)
        13000
        >>> _parse_backoff("12000")
        12000
        >>> _parse_backoff("foo")
        >>> _parse_backoff(None)
        >>> _parse_backoff(float("inf"))
    """
    if backoff is None:
        return None

    try:
        return int(backoff)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")); JSON decoders accept Infinity / 1e999
        return None
| 1222 | + |
| 1223 | + |
def _parse_retry_after_from_header(resp: requests.Response) -> int | None:
    """
    Parse the Retry-After header from an HTTP response.
    See https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Retry-After

    The header value is read either as an integer number of seconds or as an
    HTTP date. A date is converted to the number of whole seconds between now
    (UTC) and that date.

    Args:
        resp: HTTP response object with headers

    Returns:
        Number of seconds to wait (>= 0) or None if header missing/invalid.
        Negative delays and dates in the past yield 0. A date without a
        timezone is considered invalid and yields None.

    Examples:
        >>> resp = requests.Response()
        >>> resp.headers = {"Retry-After": "1"}
        >>> _parse_retry_after_from_header(resp)
        1
        >>> resp.headers = {"Retry-After": "-1"}
        >>> _parse_retry_after_from_header(resp)
        0
        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2015 07:28:00 GMT"}
        >>> _parse_retry_after_from_header(resp)
        0
        >>> resp.headers = {"Retry-After": "Wed, 21 Oct 2315 07:28:00"}
        >>> _parse_retry_after_from_header(resp)
    """

    value = resp.headers.get("Retry-After")
    if value is None:
        return None

    # Form 1: delay in seconds, e.g. "120"
    try:
        return max(0, int(value))
    except (ValueError, TypeError):
        pass

    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        dt = email.utils.parsedate_to_datetime(value)
    except (ValueError, TypeError):
        dt = None

    if dt is None:
        LOG.warning(f"Error parsing Retry-After: {value}")
        return None

    if dt.tzinfo is None:
        # parsedate_to_datetime returns a naive datetime when the value carries
        # no timezone; it cannot be compared against the timezone-aware UTC clock
        LOG.warning(f"Error parsing Retry-After (missing timezone): {value}")
        return None

    delta = dt - datetime.datetime.now(datetime.timezone.utc)
    return max(0, int(delta.total_seconds()))
1103 | 1276 |
|
1104 | 1277 |
|
1105 | 1278 | _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = { |
|
0 commit comments