|
9 | 9 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
10 | 10 | # License for the specific language governing permissions and limitations |
11 | 11 | # under the License. |
| 12 | +import io |
12 | 13 | import os |
13 | 14 | from urllib import parse |
14 | 15 | from urllib.parse import urlsplit |
@@ -383,21 +384,28 @@ def create_object(self, container, name, filename=None, data=None, |
383 | 384 | endpoint_override=endpoint, |
384 | 385 | requests_auth=self._get_req_auth(endpoint), |
385 | 386 | **headers) |
386 | | - |
387 | | - if data is not None: |
388 | | - self.log.debug( |
389 | | - "uploading data to %(endpoint)s", |
390 | | - {'endpoint': endpoint}) |
391 | | - return self._create( |
392 | | - _obj.Object, container=container, |
393 | | - name=name, data=data, |
394 | | - endpoint_override=endpoint, |
395 | | - requests_auth=self._get_req_auth(endpoint), |
396 | | - **headers) |
397 | | - |
398 | 387 | if segment_size: |
399 | 388 | segment_size = int(segment_size) |
400 | 389 | segment_size = self.get_object_segment_size(segment_size) |
| 390 | + |
| 391 | + if data is not None: |
| 392 | + data_size = self._try_get_size(data) |
| 393 | + |
| 394 | + if data_size is not None and data_size > segment_size: |
| 395 | + return self._upload_large_data( |
| 396 | + endpoint, data, name, headers, segment_size |
| 397 | + ) |
| 398 | + else: |
| 399 | + self.log.debug( |
| 400 | + "uploading data to %(endpoint)s", |
| 401 | + {'endpoint': endpoint}) |
| 402 | + return self._create( |
| 403 | + _obj.Object, container=container, |
| 404 | + name=name, data=data, |
| 405 | + endpoint_override=endpoint, |
| 406 | + requests_auth=self._get_req_auth(endpoint), |
| 407 | + **headers) |
| 408 | + |
401 | 409 | file_size = os.path.getsize(filename) |
402 | 410 |
|
403 | 411 | if generate_checksums and md5 is None: |
@@ -531,6 +539,80 @@ def _finish_large_object_upload(self, endpoint, headers, upload_id): |
531 | 539 | if retries == 0: |
532 | 540 | raise |
533 | 541 |
|
| 542 | + def _try_get_size(self, data): |
| 543 | + """Try to get the size of a data object if possible. |
| 544 | +
|
| 545 | + :param data: The data object passed to create_object. |
| 546 | + :returns: The size of the data if it can be determined, else None. |
| 547 | + """ |
| 548 | + if hasattr(data, 'fileno'): |
| 549 | + try: |
| 550 | + fileno = data.fileno() |
| 551 | + except io.UnsupportedOperation: |
| 552 | + return None |
| 553 | + try: |
| 554 | + st = os.fstat(fileno) |
| 555 | + return st.st_size |
| 556 | + except Exception: |
| 557 | + self.log.debug( |
| 558 | + "Cannot determine size of data with fileno %s", |
| 559 | + fileno, exc_info=True) |
| 560 | + return None |
| 561 | + if hasattr(data, 'len'): |
| 562 | + return data.len |
| 563 | + if hasattr(data, '__len__'): |
| 564 | + return len(data) |
| 565 | + try: |
| 566 | + pos = data.tell() |
| 567 | + data.seek(0, os.SEEK_END) |
| 568 | + size = data.tell() |
| 569 | + data.seek(pos, os.SEEK_SET) |
| 570 | + return size |
| 571 | + except Exception: |
| 572 | + pass |
| 573 | + return None |
| 574 | + |
| 575 | + def _upload_large_data(self, endpoint, data, name, headers, segment_size): |
| 576 | + """ |
| 577 | + If the object is big, we need to break it up into segments that |
| 578 | + are no larger than segment_size, upload each of them individually |
| 579 | + and then upload a manifest object. The segments can be uploaded in |
| 580 | + parallel, so we'll use the async feature of the TaskManager. |
| 581 | + """ |
| 582 | + upload_id = _obj.Object.initiate_multipart_upload( |
| 583 | + self, endpoint, name, |
| 584 | + requests_auth=self._get_req_auth(endpoint) |
| 585 | + ) |
| 586 | + url = f'{endpoint}/{name}' |
| 587 | + part_number = 1 |
| 588 | + |
| 589 | + try: |
| 590 | + while True: |
| 591 | + segment = data.read(segment_size) |
| 592 | + if not segment: |
| 593 | + break |
| 594 | + result = self.put( |
| 595 | + f'{url}?partNumber={part_number}&uploadId={upload_id}', |
| 596 | + headers=headers, data=segment, |
| 597 | + requests_auth=self._get_req_auth(endpoint) |
| 598 | + ) |
| 599 | + result.raise_for_status() |
| 600 | + part_number += 1 |
| 601 | + |
| 602 | + return self._finish_large_object_upload( |
| 603 | + url, headers, upload_id) |
| 604 | + except Exception: |
| 605 | + try: |
| 606 | + self.log.debug( |
| 607 | + "Failed to upload large data. Aborting %s", upload_id |
| 608 | + ) |
| 609 | + self._abort_multipart_upload(endpoint=url, upload_id=upload_id) |
| 610 | + except Exception: |
| 611 | + self.log.exception( |
| 612 | + "Failed to cleanup multipart upload %s:", upload_id |
| 613 | + ) |
| 614 | + raise |
| 615 | + |
534 | 616 | def _object_name_from_url(self, url): |
535 | 617 | '''Get container_name/object_name from the full URL called. |
536 | 618 | Remove the Swift endpoint from the front of the URL, and remove |
|
0 commit comments