Skip to content

Commit f7e74ab

Browse files
authored
feat(Data Modeling, Files): Extend DM Files with all upload methods (DM-3842) (#2665)
1 parent 9ffab8c commit f7e74ab

5 files changed

Lines changed: 479 additions & 32 deletions

File tree

cognite/client/_api/data_modeling/files.py

Lines changed: 194 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,30 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections.abc import Sequence
5+
from functools import cached_property
46
from pathlib import Path
57
from typing import TYPE_CHECKING, Any, NoReturn, overload
68

79
from cognite.client._api_client import APIClient
810
from cognite.client._constants import DEFAULT_LIMIT_READ
11+
from cognite.client.config import global_config
912
from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteFile
1013
from cognite.client.data_classes.data_modeling.ids import NodeId, ViewId
11-
from cognite.client.data_classes.data_modeling.instances import InstanceSort, Node, NodeList
14+
from cognite.client.data_classes.data_modeling.instances import InstanceSort, Node, NodeApply, NodeList
1215
from cognite.client.data_classes.data_modeling.views import View
16+
from cognite.client.data_classes.files import FileMetadata
1317
from cognite.client.data_classes.filters import Filter
18+
from cognite.client.exceptions import CogniteFileUploadError, CogniteNotFoundError
19+
from cognite.client.utils._retry import Backoff
1420
from cognite.client.utils.useful_types import SequenceNotStr
1521

1622
if TYPE_CHECKING:
23+
from collections.abc import Awaitable, Callable
24+
from typing import BinaryIO
25+
1726
from cognite.client import AsyncCogniteClient
27+
from cognite.client._api.data_modeling.instances import InstancesAPI
1828
from cognite.client.config import ClientConfig
1929

2030
COGNITE_FILE_VIEW_ID = CogniteFile.get_source()
@@ -45,6 +55,10 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
4555
super().__init__(config, api_version, cognite_client)
4656
self._files_api = cognite_client.files
4757

58+
@cached_property
def _instances_api(self) -> InstancesAPI:
    """Return the instances API of the owning client's data modeling API.

    Wrapped in ``cached_property``, so the attribute chain is only walked on first access.
    """
    data_modeling_api = self._cognite_client.data_modeling
    return data_modeling_api.instances
61+
4862
async def retrieve_download_urls(
4963
self,
5064
node_ids: NodeId | tuple[str, str] | Sequence[NodeId | tuple[str, str]],
@@ -160,17 +174,186 @@ async def download_bytes(self, node_id: NodeId | tuple[str, str]) -> bytes:
160174
"""
161175
return await self._files_api.download_bytes(instance_id=node_id)
162176

163-
async def upload(self, *args: Any, **kwargs: Any) -> NoReturn:
164-
raise NotImplementedError("This method is not implemented yet!")
177+
async def upload(self, path: Path, node: NodeApply) -> None:
    """`Create a file node and upload content in one step. <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_

    The node is created (or updated) via ``instances.apply``, then the file content is uploaded.

    If the upload reports the node as not found, the node is looked up through the CogniteFile view:
    a ``CogniteFileUploadError`` is raised when it is not a file node, otherwise the upload is retried.

    Args:
        path (Path): Path to the file to upload.
        node (NodeApply): The file node to apply before uploading.

    Examples:

        Create a file node and upload content:

            >>> from pathlib import Path
            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteFileApply
            >>> client = CogniteClient()
            >>> file_name = "Quarterly-Report.pdf"
            >>> client.data_modeling.files.upload(
            ...     Path(file_name),
            ...     CogniteFileApply(
            ...         space="my-space",
            ...         external_id="my-file",
            ...         name=file_name,
            ...         mime_type="application/pdf",
            ...     ),
            ... )
    """
    # The node must exist before content can be attached to it, so apply first.
    await self._instances_api.apply(nodes=node)
    node_id = node.as_id()
    # The upload is passed as a zero-argument callable so the helper can invoke it again on retry.
    await self._upload_to_newly_created_file_node(
        node_id, upload_fn=lambda: self._files_api.upload_content(path=path, instance_id=node_id)
    )
209+
210+
async def upload_bytes(self, content: str | bytes | BinaryIO, node: NodeApply) -> None:
    """Apply a file node, then upload in-memory content to it.

    The node is first written with ``instances.apply`` (creating or updating it); afterwards the
    given content is uploaded to that node.

    Args:
        content (str | bytes | BinaryIO): The content to upload.
        node (NodeApply): The file node to apply before uploading.

    Examples:

        Create a file node and upload bytes:

            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteFileApply
            >>> client = CogniteClient()
            >>> client.data_modeling.files.upload_bytes(
            ...     b"some important notes",
            ...     CogniteFileApply(
            ...         space="my-space",
            ...         external_id="my-file",
            ...         name="notes.txt",
            ...         mime_type="text/plain",
            ...     ),
            ... )
    """
    await self._instances_api.apply(nodes=node)
    target_id = node.as_id()

    def start_upload() -> Awaitable[FileMetadata]:
        # Each call produces a fresh awaitable, so the helper may invoke it more than once.
        return self._files_api.upload_content_bytes(content=content, instance_id=target_id)

    await self._upload_to_newly_created_file_node(target_id, upload_fn=start_upload)
241+
242+
async def _upload_to_newly_created_file_node(
    self, node_id: NodeId, upload_fn: Callable[[], Awaitable[FileMetadata]]
) -> None:
    """Run ``upload_fn`` for a node that was just applied, handling a not-found response.

    Args:
        node_id (NodeId): ID of the node that was applied right before this call.
        upload_fn (Callable[[], Awaitable[FileMetadata]]): Zero-argument callable performing the upload.
    """
    try:
        # The legacy FileMetadata result is deliberately discarded; this method returns None.
        await upload_fn()
    except CogniteNotFoundError as not_found:
        # A not-found for a node we just created has two possible causes. The retrieve endpoint is
        # immediately consistent, so use it to tell them apart:
        file_node = await self.retrieve(node_id, source=COGNITE_FILE_VIEW_ID)
        if file_node is None:
            raise CogniteFileUploadError(
                f"The file upload failed because the target {node_id=} is not a file node. "
                "Make sure to write through CogniteFile or an extension of it.",
                code=not_found.code,
            ) from not_found

        # The node -is- a file node, so this is only propagation delay towards the backend file
        # service: retry using the (user-configured) backoff settings.
        await self._upload_with_retry(node_id, upload_fn, not_found)
261+
262+
async def _upload_with_retry(
    self,
    node_id: NodeId,
    upload_fn: Callable[[], Awaitable[FileMetadata]],
    latest_error: CogniteNotFoundError,
) -> None:
    """Retry ``upload_fn`` with backoff while it keeps raising ``CogniteNotFoundError``.

    Args:
        node_id (NodeId): ID of the target node, used in the error message only.
        upload_fn (Callable[[], Awaitable[FileMetadata]]): Zero-argument callable performing the upload.
        latest_error (CogniteNotFoundError): The not-found error from the attempt made before this call.
    """
    wait_times = Backoff(max_wait=global_config.max_retry_backoff)
    for _attempt in range(global_config.max_retries):
        # Sleep before trying: the caller has already made one (failed) upload attempt.
        await asyncio.sleep(next(wait_times))
        try:
            await upload_fn()
        except CogniteNotFoundError as not_found:
            latest_error = not_found
        else:
            return

    # Count the caller's initial attempt in addition to the retries made here.
    total_attempts = global_config.max_retries + 1
    message = (
        f"The file upload failed to {node_id=} after {total_attempts} attempt(s): "
        "backend file propagation is taking longer than expected. "
        "Ensure no one has deleted the file node in the meantime, then try again shortly."
    )
    raise CogniteFileUploadError(message, code=latest_error.code) from latest_error
284+
285+
async def upload_content(self, path: Path, node_id: NodeId | tuple[str, str]) -> None:
    """`Upload content to an existing file node by instance ID. <https://api-docs.cognite.com/20230101/tag/Files/operation/getUploadLink>`_

    Args:
        path (Path): Path to the file to upload.
        node_id (NodeId | tuple[str, str]): Instance ID of the file node.

    Examples:

        Upload file content by instance ID:

            >>> from pathlib import Path
            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling import NodeId
            >>> client = CogniteClient()
            >>> client.data_modeling.files.upload_content(
            ...     Path("/path/to/file.txt"), NodeId("my-space", "my-file")
            ... )
    """
    # Pass the identifier through NodeId.load so the helpers always receive a NodeId.
    target_id = NodeId.load(node_id)

    def start_upload() -> Awaitable[FileMetadata]:
        # Each call produces a fresh awaitable, so the helper may invoke it more than once.
        return self._files_api.upload_content(path=path, instance_id=target_id)

    await self._upload_to_existing_node(target_id, upload_fn=start_upload)
308+
309+
async def _upload_to_existing_node(self, node_id: NodeId, upload_fn: Callable[[], Awaitable[FileMetadata]]) -> None:
    """Run ``upload_fn`` for a node this API did not create itself, handling a not-found response.

    Args:
        node_id (NodeId): ID of the node to upload to.
        upload_fn (Callable[[], Awaitable[FileMetadata]]): Zero-argument callable performing the upload.
    """
    try:
        # The legacy FileMetadata result is deliberately discarded; this method returns None.
        await upload_fn()
    except CogniteNotFoundError as not_found:
        # The node was not created here, so it may not exist at all. The retrieve endpoint is
        # immediately consistent, so use it to find out what kind of node (if any) we have.
        if await self.retrieve(node_id, source=COGNITE_FILE_VIEW_ID) is not None:
            # It -is- a file node, so this is only propagation delay towards the backend file
            # service: retry using the (user-configured) backoff settings.
            await self._upload_with_retry(node_id, upload_fn, not_found)
            return

        # Not visible as a file node: check, without any view, whether the node exists at all.
        node_exists = await self._instances_api.retrieve_nodes(nodes=node_id, sources=None)
        if not node_exists:
            err_msg = f"The file upload failed because the target {node_id=} does not exist."
        else:
            err_msg = (
                f"The file upload failed because the target {node_id=} exists but is not a file node. "
                "Make sure to write through CogniteFile or an extension of it."
            )
        raise CogniteFileUploadError(err_msg, code=not_found.code) from not_found
330+
331+
async def upload_content_bytes(
    self,
    content: str | bytes | BinaryIO,
    node_id: NodeId | tuple[str, str],
) -> None:
    """Upload bytes or string content to an existing file node by instance ID.

    Args:
        content (str | bytes | BinaryIO): The content to upload.
        node_id (NodeId | tuple[str, str]): Instance ID of the file node.

    Examples:

        Upload bytes to an existing file node by instance ID:

            >>> from cognite.client import CogniteClient
            >>> from cognite.client.data_classes.data_modeling import NodeId
            >>> client = CogniteClient()
            >>> client.data_modeling.files.upload_content_bytes(
            ...     b"some content", NodeId("my-space", "my-file")
            ... )
    """
    # Pass the identifier through NodeId.load so the helpers always receive a NodeId.
    target_id = NodeId.load(node_id)

    def start_upload() -> Awaitable[FileMetadata]:
        # Each call produces a fresh awaitable, so the helper may invoke it more than once.
        return self._files_api.upload_content_bytes(content=content, instance_id=target_id)

    await self._upload_to_existing_node(target_id, upload_fn=start_upload)
174357

175358
async def __call__(self) -> NoReturn:
    """Placeholder: calling this API object is not supported yet and always raises."""
    not_implemented = NotImplementedError("This method is not implemented yet!")
    raise not_implemented
@@ -237,7 +420,7 @@ async def retrieve(
237420
... )
238421
"""
239422
sources, strip = _resolve_source(source)
240-
result = await self._cognite_client.data_modeling.instances.retrieve_nodes(nodes=node_ids, sources=sources)
423+
result = await self._instances_api.retrieve_nodes(nodes=node_ids, sources=sources)
241424
if strip and result:
242425
for node in [result] if isinstance(result, Node) else result:
243426
node.drop_source(COGNITE_FILE_VIEW_ID)
@@ -291,7 +474,7 @@ async def list(
291474
... )
292475
"""
293476
sources, strip = _resolve_source(source)
294-
results = await self._cognite_client.data_modeling.instances.list(
477+
results = await self._instances_api.list(
295478
instance_type="node",
296479
sources=sources,
297480
space=space,

cognite/client/_api/files.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ async def upload_content(
488488
self,
489489
path: Path | str,
490490
external_id: str | None = None,
491-
instance_id: NodeId | None = None,
491+
instance_id: NodeId | tuple[str, str] | None = None,
492492
) -> FileMetadata:
493493
"""`Upload file content <https://api-docs.cognite.com/20230101/tag/Files/operation/getMultiPartUploadLink>`_
494494
@@ -500,7 +500,7 @@ async def upload_content(
500500
Args:
501501
path (Path | str): Local file path.
502502
external_id (str | None): The external ID provided by the client. Must be unique within the project.
503-
instance_id (NodeId | None): Instance ID of the file (CogniteFile).
503+
instance_id (NodeId | tuple[str, str] | None): Instance ID of the file (CogniteFile).
504504
Returns:
505505
FileMetadata: No description.
506506
"""
@@ -719,7 +719,7 @@ async def upload_content_bytes(
719719
self,
720720
content: str | bytes | BinaryIO,
721721
external_id: str | None = None,
722-
instance_id: NodeId | None = None,
722+
instance_id: NodeId | tuple[str, str] | None = None,
723723
) -> FileMetadata:
724724
"""Upload bytes or string (UTF-8 assumed).
725725
@@ -728,7 +728,7 @@ async def upload_content_bytes(
728728
Args:
729729
content (str | bytes | BinaryIO): The content to upload.
730730
external_id (str | None): The external ID provided by the client. Must be unique within the project.
731-
instance_id (NodeId | None): Instance ID of the file.
731+
instance_id (NodeId | tuple[str, str] | None): Instance ID of the file.
732732
733733
Returns:
734734
FileMetadata: No description.
@@ -1000,7 +1000,7 @@ async def multipart_upload_content_session(
10001000
self,
10011001
parts: int,
10021002
external_id: str | None = None,
1003-
instance_id: NodeId | None = None,
1003+
instance_id: NodeId | tuple[str, str] | None = None,
10041004
) -> FileMultipartUploadSession:
10051005
"""Begin uploading a file in multiple parts whose metadata is already created in CDF.
10061006
@@ -1017,7 +1017,7 @@ async def multipart_upload_content_session(
10171017
Args:
10181018
parts (int): The number of parts to upload, must be between 1 and 250.
10191019
external_id (str | None): The external ID provided by the client. Must be unique within the project.
1020-
instance_id (NodeId | None): Instance ID of the file.
1020+
instance_id (NodeId | tuple[str, str] | None): Instance ID of the file.
10211021
10221022
Returns:
10231023
FileMultipartUploadSession: Object containing metadata about the created file, and information needed to upload the file content. Use this object to manage the file upload, and `exit` it once all parts are uploaded.

0 commit comments

Comments
 (0)