-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathupload.py
More file actions
197 lines (158 loc) · 6.37 KB
/
Copy pathupload.py
File metadata and controls
197 lines (158 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from __future__ import annotations
import asyncio
import logging
import mimetypes
from pathlib import Path
from typing import TYPE_CHECKING, Any
from requests_toolbelt import MultipartEncoder
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.transport import WithRestClient
if TYPE_CHECKING:
from sift_client.transport.rest_transport import RestClient
# The standard ``mimetypes`` table does not know about Parquet by default, so
# both extensions used for Parquet files are registered explicitly here.
mimetypes.add_type(type="application/vnd.apache.parquet", ext=".pqt")
mimetypes.add_type(type="application/vnd.apache.parquet", ext=".parquet")

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class UploadLowLevelClient(LowLevelClientBase, WithRestClient):
    """Low-level client for file upload operations.

    This class provides a thin wrapper for uploading file attachments via REST API.

    Example:
        ```python
        from sift_client.client import SiftClient
        from sift_client._internal.low_level_wrappers.upload import UploadLowLevelClient

        # Initialize the REST client
        sift_client = SiftClient(
            rest_url="https://your-api.siftstack.com",
            grpc_url="https://your-grpc-api.siftstack.com",
            api_key="your-api-key",
        )

        # Create the upload client
        upload_client = UploadLowLevelClient(sift_client.rest_client)

        # Upload a file
        remote_file_id = await upload_client.upload_attachment(
            path="path/to/file.mp4",
            entity_id="run_12345",
            entity_type="runs",
            description="Video of test run",
        )
        ```
    """

    # REST endpoints for single and bulk uploads. Only UPLOAD_PATH is used by
    # the methods of this class.
    UPLOAD_PATH = "/api/v0/remote-files/upload"
    UPLOAD_BULK_PATH = "/api/v0/remote-files/upload:bulk"

    def __init__(self, rest_client: RestClient):
        """Initialize the UploadLowLevelClient.

        Args:
            rest_client: The REST client to use for making API calls.
        """
        super().__init__(rest_client)

    async def upload_attachment(
        self,
        path: str | Path,
        entity_id: str,
        entity_type: str,
        metadata: dict[str, Any] | None = None,
        description: str | None = None,
        organization_id: str | None = None,
    ) -> str:
        """Upload a file attachment to an entity.

        The MIME type is guessed from the file name. When it cannot be guessed,
        ``application/x-<extension>`` is used, or ``application/octet-stream``
        for a file without an extension.

        Args:
            path: Path to the file to upload.
            entity_id: The ID of the entity to attach the file to.
            entity_type: The type of entity (e.g., "runs", "annotations", "annotation_logs").
            metadata: Optional metadata for the file (e.g., video/image metadata).
                Values that are not natively JSON-serializable must expose an
                ``as_json()`` method.
            description: Optional description of the file.
            organization_id: Optional organization ID.

        Returns:
            The remote file ID of the uploaded file.

        Raises:
            ValueError: If the path doesn't point to a regular file, or if
                ``metadata`` contains a value that cannot be JSON-encoded.
            Exception: If the upload fails (non-200 response).
        """
        # Path() accepts str, Path and any other os.PathLike alike.
        file_path = Path(path)
        if not file_path.is_file():
            raise ValueError(f"Provided path, '{path}', does not point to a regular file.")

        file_name, mimetype, content_encoding = self._mime_and_content_type_from_path(file_path)

        if not mimetype:
            extension = file_path.suffix
            if extension:
                mimetype = f"application/x-{extension.lstrip('.')}"
            else:
                # Fallback to the generic 'binary data' MIME type.
                mimetype = "application/octet-stream"

        # Run the synchronous upload in the loop's default executor so the
        # event loop is not blocked. get_running_loop() is the preferred way
        # to obtain the loop from inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self._upload_file_sync,
            file_path,
            file_name,
            mimetype,
            content_encoding,
            entity_id,
            entity_type,
            metadata,
            description,
            organization_id,
        )

    def _upload_file_sync(
        self,
        path: Path,
        file_name: str,
        mimetype: str,
        content_encoding: str | None,
        entity_id: str,
        entity_type: str,
        metadata: dict[str, Any] | None,
        description: str | None,
        organization_id: str | None,
    ) -> str:
        """Synchronous helper to upload the file.

        This is called from an executor to avoid blocking the async event loop.

        Args:
            path: Path of the file to open and send.
            file_name: File name to report in the multipart ``file`` part.
            mimetype: MIME type to report for the file part.
            content_encoding: Optional content encoding; when set it is sent as
                a ``Content-Encoding`` header on the file part.
            entity_id: The ID of the entity to attach the file to.
            entity_type: The type of entity the file is attached to.
            metadata: Optional metadata, sent as a JSON string when non-empty.
            description: Optional description, sent when non-empty.
            organization_id: Optional organization ID, sent when non-empty.

        Returns:
            The ``remoteFile.remoteFileId`` value of the JSON response.

        Raises:
            ValueError: If ``metadata`` contains a value that cannot be JSON-encoded.
            Exception: If the response status code is not 200.
        """
        # The encoder is handed the open file handle (not its bytes), so the
        # request has to be sent while the ``with`` block is still active.
        with open(path, "rb") as file:
            # Field insertion order is kept the same as the multipart part order.
            form_fields: dict[str, Any] = {
                "entityId": entity_id,
                "entityType": entity_type,
            }

            if content_encoding:
                form_fields["file"] = (
                    file_name,
                    file,
                    mimetype,
                    {
                        "Content-Encoding": content_encoding,
                    },
                )
            else:
                form_fields["file"] = (file_name, file, mimetype)

            if metadata:
                import json

                form_fields["metadata"] = json.dumps(
                    metadata, default=self._metadata_json_default
                )

            if organization_id:
                form_fields["organizationId"] = organization_id

            if description:
                form_fields["description"] = description

            form_data = MultipartEncoder(fields=form_fields)

            # Use the RestClient to make the POST request. The Content-Type
            # header must come from the encoder because it carries the boundary.
            response = self._rest_client.post(
                endpoint=self.UPLOAD_PATH,
                data=form_data,  # type: ignore
                headers={
                    "Content-Type": form_data.content_type,
                },
            )

            if response.status_code != 200:
                raise Exception(
                    f"Request failed with status code {response.status_code} ({response.reason})."
                )

            response_data = response.json()
            remote_file_id = response_data.get("remoteFile", {}).get("remoteFileId")
            if not remote_file_id:
                # NOTE(review): the value is still returned as-is (possibly None)
                # for backward compatibility; the warning makes the gap visible.
                logger.warning(
                    "Upload of '%s' returned status 200 but no remote file ID.", file_name
                )
            return remote_file_id

    @staticmethod
    def _metadata_json_default(value: Any) -> Any:
        """Convert a metadata value that ``json`` cannot encode natively.

        Used as the ``default`` hook of ``json.dumps``.

        Args:
            value: The object the JSON encoder could not serialize.

        Returns:
            The result of ``value.as_json()``.

        Raises:
            ValueError: If ``value`` has no ``as_json`` attribute. Returning the
                object unchanged would only make the encoder fail with a
                misleading "Circular reference detected" error.
        """
        if hasattr(value, "as_json"):
            return value.as_json()
        raise ValueError(
            f"Metadata value of type '{type(value).__name__}' is not JSON serializable "
            "and does not provide an 'as_json()' method."
        )

    @staticmethod
    def _mime_and_content_type_from_path(path: Path) -> tuple[str, str | None, str | None]:
        """Determine the file name, MIME type and content encoding from a file path.

        Args:
            path: The file path to analyze.

        Returns:
            A tuple of (file_name, mime_type, content_encoding). ``mime_type`` and
            ``content_encoding`` are ``None`` when ``mimetypes`` cannot guess them.
        """
        file_name = path.name
        mime, encoding = mimetypes.guess_type(path)
        return file_name, mime, encoding