Skip to content

Commit dd4bbb7

Browse files
msusaghaakonvt
andauthored
feat(files): Support automatic upload of files larger than 5GB (#2566)
Co-authored-by: Håkon V. Treider <haakonvt@gmail.com>
1 parent 02db7ca commit dd4bbb7

11 files changed

Lines changed: 550 additions & 96 deletions

File tree

cognite/client/_api/files.py

Lines changed: 132 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import copy
5+
import math
46
import warnings
57
from collections import defaultdict
68
from collections.abc import AsyncIterator, Sequence
79
from pathlib import Path
810
from typing import Any, BinaryIO, Literal, overload
911
from urllib.parse import urlparse
1012

13+
from typing_extensions import assert_never
14+
1115
from cognite.client._api_client import APIClient
12-
from cognite.client._constants import DEFAULT_LIMIT_READ
16+
from cognite.client._constants import (
17+
DEFAULT_LIMIT_READ,
18+
FILE_DEFAULT_MULTIPART_SIZE,
19+
FILE_MAX_MULTIPART_COUNT,
20+
FILE_MAX_MULTIPART_SIZE,
21+
FILE_MIN_MULTIPART_SIZE,
22+
)
1323
from cognite.client.data_classes import (
1424
FileMetadata,
1525
FileMetadataFilter,
@@ -28,14 +38,37 @@
2838
from cognite.client.utils._auxiliary import append_url_path, find_duplicates, unpack_items
2939
from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks
3040
from cognite.client.utils._identifier import Identifier, IdentifierSequence
31-
from cognite.client.utils._uploading import prepare_content_for_upload
41+
from cognite.client.utils._uploading import AsyncFileChunker, prepare_content_for_upload
3242
from cognite.client.utils._validation import process_asset_subtree_ids, process_data_set_ids
3343
from cognite.client.utils.useful_types import SequenceNotStr
3444

3545

3646
class FilesAPI(APIClient):
3747
_RESOURCE_PATH = "/files"
3848

49+
def _get_semaphore(
50+
self, operation: Literal["read", "write", "delete", "upload", "download", "open_files"]
51+
) -> asyncio.BoundedSemaphore:
52+
from cognite.client import global_config
53+
54+
project = self._cognite_client.config.project
55+
files = global_config.concurrency_settings.files
56+
match operation:
57+
case "read":
58+
return files._semaphore_factory("read", project)
59+
case "write":
60+
return files._semaphore_factory("write", project)
61+
case "upload":
62+
return files._semaphore_factory("upload", project)
63+
case "download":
64+
return files._semaphore_factory("download", project)
65+
case "delete":
66+
return files._semaphore_factory("delete", project)
67+
case "open_files":
68+
return files._semaphore_factory("open_files", "")
69+
case _:
70+
assert_never(operation)
71+
3972
@overload
4073
def __call__(
4174
self,
@@ -210,7 +243,7 @@ async def create(
210243
url_path=self._RESOURCE_PATH,
211244
json=file_metadata.dump(camel_case=True),
212245
params={"overwrite": overwrite},
213-
semaphore=self._get_semaphore("write"),
246+
semaphore=self._get_semaphore("upload"),
214247
)
215248
returned_file_metadata = res.json()
216249
upload_url = returned_file_metadata["uploadUrl"]
@@ -450,22 +483,33 @@ async def upload_content(
450483
external_id: str | None = None,
451484
instance_id: NodeId | None = None,
452485
) -> FileMetadata:
453-
"""`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_.
486+
"""`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_
487+
488+
Upload file content from a local file path to a file previously created (initiated) with only metadata.
489+
For files created with FilesAPI.create(), use `external_id`.
490+
For files created with data modeling API using CogniteFileApply, use `instance_id`.
491+
Supports upload of large files (>5 GB), using multipart upload.
454492
455493
Args:
456-
path (Path | str): Path to the file you wish to upload.
494+
path (Path | str): Local file path.
457495
external_id (str | None): The external ID provided by the client. Must be unique within the project.
458-
instance_id (NodeId | None): Instance ID of the file.
496+
instance_id (NodeId | None): Instance ID of the file (CogniteFile).
459497
Returns:
460498
FileMetadata: No description.
461499
"""
462500
path = Path(path)
463-
if path.is_file():
464-
with path.open("rb") as fh:
465-
return await self.upload_content_bytes(fh, external_id=external_id, instance_id=instance_id)
466-
elif path.is_dir():
501+
if path.is_dir():
467502
raise IsADirectoryError(path)
468-
raise FileNotFoundError(path)
503+
if not path.is_file():
504+
raise FileNotFoundError(path)
505+
506+
file_size = path.stat().st_size
507+
part_size, num_parts = self.calculate_part_size_and_count(file_size)
508+
session = await self.multipart_upload_content_session(
509+
parts=num_parts, external_id=external_id, instance_id=instance_id
510+
)
511+
await self._run_multipart_upload(session, path, part_size, file_size, num_parts)
512+
return session.file_metadata
469513

470514
async def upload(
471515
self,
@@ -486,7 +530,14 @@ async def upload(
486530
recursive: bool = False,
487531
overwrite: bool = False,
488532
) -> FileMetadata | FileMetadataList:
489-
"""`Upload a file <https://api-docs.cognite.com/20230101/tag/Files/operation/initFileUpload>`_.
533+
"""`Upload a file or directory <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>`_
534+
535+
Creates files in files API with metadata and uploads file content.
536+
537+
Note:
538+
If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.
539+
540+
Supports upload of large files (>5 GB), using multipart upload.
490541
491542
Args:
492543
path (Path | str): Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.
@@ -566,6 +617,7 @@ async def upload(
566617
source_modified_time=source_modified_time,
567618
security_categories=security_categories,
568619
)
620+
569621
path = Path(path)
570622
if path.is_file():
571623
if not name:
@@ -574,7 +626,6 @@ async def upload(
574626

575627
elif not path.is_dir():
576628
raise FileNotFoundError(path)
577-
578629
tasks: list[AsyncSDKTask] = []
579630
file_iter = path.rglob("*") if recursive else path.iterdir()
580631
for file in file_iter:
@@ -590,8 +641,61 @@ async def upload(
590641
async def _upload_file_from_path(
591642
self, file_metadata: FileMetadataWrite, path: Path, overwrite: bool
592643
) -> FileMetadata:
593-
with path.open("rb") as fh:
594-
return await self.upload_bytes(fh, overwrite=overwrite, **file_metadata.dump(camel_case=False))
644+
file_size = path.stat().st_size
645+
part_size, num_parts = self.calculate_part_size_and_count(file_size)
646+
session = await self.multipart_upload_session(
647+
parts=num_parts,
648+
overwrite=overwrite,
649+
**file_metadata.dump(camel_case=False),
650+
)
651+
await self._run_multipart_upload(session, path, part_size, file_size, num_parts)
652+
return session.file_metadata
653+
654+
async def _run_multipart_upload(
655+
self,
656+
session: FileMultipartUploadSession,
657+
path: Path,
658+
part_size: int,
659+
file_size: int,
660+
num_parts: int,
661+
) -> None:
662+
# Use a semaphore to limit the number of open files at the same time,
663+
# since each multipart upload will open the file for the duration of the upload,
664+
# and having too many open files can cause issues on some operating systems.
665+
open_files_semaphore = self._get_semaphore("open_files")
666+
667+
async def upload_part(part_no: int) -> None:
668+
async with open_files_semaphore:
669+
offset = part_no * part_size
670+
read_size = min(part_size, file_size - offset)
671+
with path.open("rb") as fh:
672+
await session.upload_part_async(part_no, AsyncFileChunker(fh, offset=offset, size=read_size))
673+
674+
async with session:
675+
await asyncio.gather(*(upload_part(i) for i in range(num_parts)))
676+
677+
def calculate_part_size_and_count(self, file_size: int) -> tuple[int, int]:
678+
"""Calculate part size and count for a multipart upload, for a given file size.
679+
680+
See <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>
681+
for more details on multipart upload and the constraints on part size and count.
682+
683+
Args:
684+
file_size (int): The total file size in bytes.
685+
686+
Returns:
687+
tuple[int, int]: A tuple of (part_size, num_parts).
688+
"""
689+
if file_size > FILE_MAX_MULTIPART_COUNT * FILE_MAX_MULTIPART_SIZE:
690+
raise ValueError(
691+
f"File size {file_size} exceeds the maximum supported size of {FILE_MAX_MULTIPART_COUNT * FILE_MAX_MULTIPART_SIZE} bytes for multipart upload."
692+
)
693+
if file_size < FILE_MIN_MULTIPART_SIZE:
694+
return FILE_MIN_MULTIPART_SIZE, 1
695+
uncapped_part_size = max(FILE_DEFAULT_MULTIPART_SIZE, math.ceil(file_size / FILE_MAX_MULTIPART_COUNT))
696+
part_size = min(uncapped_part_size, FILE_MAX_MULTIPART_SIZE)
697+
num_parts = math.ceil(file_size / part_size)
698+
return part_size, num_parts
595699

596700
async def upload_content_bytes(
597701
self,
@@ -633,7 +737,7 @@ async def upload_content_bytes(
633737
res = await self._post(
634738
url_path=f"{self._RESOURCE_PATH}/uploadlink",
635739
json={"items": identifiers.as_dicts()},
636-
semaphore=self._get_semaphore("write"),
740+
semaphore=self._get_semaphore("upload"),
637741
)
638742
except CogniteAPIError as e:
639743
if e.code == 403:
@@ -672,7 +776,7 @@ async def _upload_bytes(
672776
content=file_content,
673777
headers=headers,
674778
timeout=self._config.file_transfer_timeout,
675-
semaphore=self._get_semaphore("write"),
779+
semaphore=self._get_semaphore("upload"),
676780
)
677781
if not upload_response.is_success:
678782
raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code)
@@ -751,7 +855,7 @@ async def upload_bytes(
751855
url_path=self._RESOURCE_PATH,
752856
json=file_metadata.dump(camel_case=True),
753857
params={"overwrite": overwrite},
754-
semaphore=self._get_semaphore("write"),
858+
semaphore=self._get_semaphore("upload"),
755859
)
756860
except CogniteAPIError as e:
757861
if e.code == 403 and "insufficient access rights" in e.message:
@@ -850,7 +954,7 @@ async def multipart_upload_session(
850954
url_path=self._RESOURCE_PATH + "/initmultipartupload",
851955
json=file_metadata.dump(camel_case=True),
852956
params={"overwrite": overwrite, "parts": parts},
853-
semaphore=self._get_semaphore("write"),
957+
semaphore=self._get_semaphore("upload"),
854958
)
855959
except CogniteAPIError as e:
856960
if e.code == 403 and "insufficient access rights" in e.message:
@@ -919,7 +1023,7 @@ async def multipart_upload_content_session(
9191023
url_path=f"{self._RESOURCE_PATH}/multiuploadlink",
9201024
json={"items": identifiers.as_dicts()},
9211025
params={"parts": parts},
922-
semaphore=self._get_semaphore("write"),
1026+
semaphore=self._get_semaphore("upload"),
9231027
)
9241028
except CogniteAPIError as e:
9251029
if e.code == 403:
@@ -940,15 +1044,17 @@ async def multipart_upload_content_session(
9401044
FileMetadata._load(returned_file_metadata), upload_urls, upload_id, self._cognite_client
9411045
)
9421046

943-
async def _upload_multipart_part(self, upload_url: str, content: str | bytes | BinaryIO) -> None:
1047+
async def _upload_multipart_part(
1048+
self, upload_url: str, content: str | bytes | BinaryIO | AsyncIterator[bytes]
1049+
) -> None:
9441050
"""Upload part of a file to an upload URL returned from `multipart_upload_session`.
9451051
9461052
Note:
9471053
If `content` does not somehow expose its length, this method may not work on Azure or AWS.
9481054
9491055
Args:
9501056
upload_url (str): URL to upload file chunk to.
951-
content (str | bytes | BinaryIO): The content to upload.
1057+
content (str | bytes | BinaryIO | AsyncIterator[bytes]): The content to upload.
9521058
"""
9531059
headers = {"accept": "*/*"}
9541060
file_size, file_content = prepare_content_for_upload(content)
@@ -961,7 +1067,7 @@ async def _upload_multipart_part(self, upload_url: str, content: str | bytes | B
9611067
content=file_content,
9621068
headers=headers,
9631069
timeout=self._config.file_transfer_timeout,
964-
semaphore=self._get_semaphore("write"),
1070+
semaphore=self._get_semaphore("upload"),
9651071
)
9661072
if not upload_response.is_success:
9671073
raise CogniteFileUploadError(message=upload_response.text, code=upload_response.status_code)
@@ -975,7 +1081,7 @@ async def _complete_multipart_upload(self, session: FileMultipartUploadSession)
9751081
await self._post(
9761082
self._RESOURCE_PATH + "/completemultipartupload",
9771083
json={"id": session.file_metadata.id, "uploadId": session._upload_id},
978-
semaphore=self._get_semaphore("write"),
1084+
semaphore=self._get_semaphore("upload"),
9791085
)
9801086

9811087
async def retrieve_download_urls(
@@ -1201,7 +1307,7 @@ async def _download_file_to_path(self, download_link: str, path: Path) -> None:
12011307
full_url=download_link,
12021308
full_headers={"accept": "*/*"},
12031309
timeout=self._config.file_transfer_timeout,
1204-
semaphore=self._get_semaphore("read"),
1310+
semaphore=self._get_semaphore("download"),
12051311
)
12061312
with path.open("wb") as file:
12071313
async with stream as response:
@@ -1267,7 +1373,7 @@ async def _download_file(self, download_link: str) -> bytes:
12671373
full_url=download_link,
12681374
headers={"accept": "*/*"},
12691375
timeout=self._config.file_transfer_timeout,
1270-
semaphore=self._get_semaphore("read"),
1376+
semaphore=self._get_semaphore("download"),
12711377
)
12721378
return response.content
12731379

cognite/client/_constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,9 @@ def __bool__(self) -> Literal[False]:
3333
DATA_MODELING_DEFAULT_LIMIT_READ = 10
3434
DEFAULT_DATAPOINTS_CHUNK_SIZE = 100_000
3535
_RUNNING_IN_BROWSER = IN_BROWSER
36+
37+
# Files API constants
38+
FILE_MIN_MULTIPART_SIZE = 5 * 1024 * 1024 # 5 MiB
39+
FILE_MAX_MULTIPART_SIZE = 4000 * 1024 * 1024 # 4000 MiB
40+
FILE_DEFAULT_MULTIPART_SIZE = 50 * 1024 * 1024 # 50 MiB
41+
FILE_MAX_MULTIPART_COUNT = 250

cognite/client/_sync_api/files.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""
22
===============================================================================
3-
9775a0ca6d02913bc835831ac35662df
3+
1f615115f322d471c0ae15347e13f8c2
44
This file is auto-generated from the Async API modules, - do not edit manually!
55
===============================================================================
66
"""
@@ -12,7 +12,9 @@
1212
from typing import Any, BinaryIO, Literal, overload
1313

1414
from cognite.client import AsyncCogniteClient
15-
from cognite.client._constants import DEFAULT_LIMIT_READ
15+
from cognite.client._constants import (
16+
DEFAULT_LIMIT_READ,
17+
)
1618
from cognite.client._sync_api_client import SyncAPIClient
1719
from cognite.client.data_classes import (
1820
FileMetadata,
@@ -425,12 +427,17 @@ def upload_content(
425427
self, path: Path | str, external_id: str | None = None, instance_id: NodeId | None = None
426428
) -> FileMetadata:
427429
"""
428-
`Upload a file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_.
430+
`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_
431+
432+
Upload file content from a local file path to a file previously created (initiated) with only metadata.
433+
For files created with FilesAPI.create(), use `external_id`.
434+
For files created with data modeling API using CogniteFileApply, use `instance_id`.
435+
Supports upload of large files (>5 GB), using multipart upload.
429436
430437
Args:
431-
path (Path | str): Path to the file you wish to upload.
438+
path (Path | str): Local file path.
432439
external_id (str | None): The external ID provided by the client. Must be unique within the project.
433-
instance_id (NodeId | None): Instance ID of the file.
440+
instance_id (NodeId | None): Instance ID of the file (CogniteFile).
434441
Returns:
435442
FileMetadata: No description.
436443
"""
@@ -458,7 +465,14 @@ def upload(
458465
overwrite: bool = False,
459466
) -> FileMetadata | FileMetadataList:
460467
"""
461-
`Upload a file <https://api-docs.cognite.com/20230101/tag/Files/operation/initFileUpload>`_.
468+
`Upload a file or directory <https://api-docs.cognite.com/20230101/tag/Files/operation/initMultiPartUpload>`_
469+
470+
Creates files in files API with metadata and uploads file content.
471+
472+
Note:
473+
If path is a directory, this method will upload all files in that directory. Use `recursive=True` for subdirectories as well.
474+
475+
Supports upload of large files (>5 GB), using multipart upload.
462476
463477
Args:
464478
path (Path | str): Path to the file you wish to upload. If path is a directory, this method will upload all files in that directory.

0 commit comments

Comments
 (0)