Skip to content

Commit 5ecc273

Browse files
feat(closes OPEN-8683): multimodal attachment support for the Python SDK
1 parent 0a1ae01 commit 5ecc273

File tree

7 files changed

+1154
-103
lines changed

7 files changed

+1154
-103
lines changed

src/openlayer/lib/data/_upload.py

Lines changed: 143 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
different storage backends.
55
"""
66

7+
import io
78
import os
89
from enum import Enum
9-
from typing import Optional
10+
from typing import BinaryIO, Dict, Optional, Union
1011

1112
import requests
1213
from requests.adapters import Response
@@ -35,6 +36,135 @@ class StorageType(Enum):
3536
VERIFY_REQUESTS = True
3637

3738

39+
# ----- Low-level upload functions (work with bytes or file-like objects) ---- #
def upload_bytes(
    storage: StorageType,
    url: str,
    data: Union[bytes, BinaryIO],
    object_name: str,
    content_type: str,
    fields: Optional[Dict] = None,
) -> Response:
    """Route an in-memory upload to the correct storage-backend transfer method.

    Convenience dispatcher: picks multipart POST or plain PUT depending on
    the storage backend.

    Args:
        storage: The storage backend type.
        url: The presigned URL to upload to.
        data: The payload, either raw bytes or a binary file-like object.
        object_name: The object name (used for multipart uploads).
        content_type: The MIME type of the payload.
        fields: Additional fields for multipart uploads (S3 policy fields).

    Returns:
        The response from the backend-specific upload call.
    """
    # PUT-style backends first (signed single-object URLs).
    if storage == StorageType.GCP:
        return upload_bytes_put(url=url, data=data, content_type=content_type)
    if storage == StorageType.AZURE:
        # Azure Blob Storage needs the blob-type header on the PUT.
        return upload_bytes_put(
            url=url,
            data=data,
            content_type=content_type,
            extra_headers={"x-ms-blob-type": "BlockBlob"},
        )

    # Remaining backends use multipart POST: S3 carries its signed policy
    # fields, local storage sends a plain multipart form (no extra fields).
    policy_fields = fields if storage == StorageType.AWS else None
    return upload_bytes_multipart(
        url=url,
        data=data,
        object_name=object_name,
        content_type=content_type,
        fields=policy_fields,
    )
93+
94+
95+
def upload_bytes_multipart(
    url: str,
    data: Union[bytes, BinaryIO],
    object_name: str,
    content_type: str,
    fields: Optional[Dict] = None,
) -> Response:
    """Send data as a multipart/form-data POST (for S3 and local storage).

    Args:
        url: The presigned URL to upload to.
        data: Raw bytes or a binary file-like object.
        object_name: File name reported in the multipart "file" field.
        content_type: The MIME type of the payload.
        fields: Extra form fields to include (e.g., S3 policy fields).

    Returns:
        The response from the POST request.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    # The encoder expects a file-like object; wrap raw bytes on the fly.
    stream = io.BytesIO(data) if isinstance(data, bytes) else data

    # Copy the caller's fields (never mutate their dict); the "file" part
    # is appended after any policy fields.
    form = {**fields} if fields else {}
    form["file"] = (object_name, stream, content_type)
    encoder = MultipartEncoder(fields=form)

    response = requests.post(
        url,
        data=encoder,
        headers={"Content-Type": encoder.content_type},
        verify=VERIFY_REQUESTS,
        timeout=REQUESTS_TIMEOUT,
    )
    response.raise_for_status()
    return response
133+
134+
135+
def upload_bytes_put(
    url: str,
    data: Union[bytes, BinaryIO],
    content_type: str,
    extra_headers: Optional[Dict[str, str]] = None,
) -> Response:
    """Send data with a plain HTTP PUT (for GCS and Azure).

    Args:
        url: The presigned URL to upload to.
        data: Raw bytes or a binary file-like object.
        content_type: The MIME type of the payload.
        extra_headers: Additional headers (e.g., x-ms-blob-type for Azure).

    Returns:
        The response from the PUT request.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    # extra_headers merge last so they may override Content-Type, matching
    # the dict.update() ordering callers rely on.
    request_headers = {"Content-Type": content_type, **(extra_headers or {})}

    response = requests.put(
        url,
        data=data,
        headers=request_headers,
        verify=VERIFY_REQUESTS,
        timeout=REQUESTS_TIMEOUT,
    )
    response.raise_for_status()
    return response
165+
166+
167+
# --- High-level Uploader class (file-based uploads with progress tracking) -- #
38168
class Uploader:
39169
"""Internal class to handle http requests"""
40170

@@ -105,7 +235,9 @@ def upload_blob_s3(
105235
fields = presigned_url_response.fields
106236
fields["file"] = (object_name, f, "application/x-tar")
107237
e = MultipartEncoder(fields=fields)
108-
m = MultipartEncoderMonitor(e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n))
238+
m = MultipartEncoderMonitor(
239+
e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)
240+
)
109241
headers = {"Content-Type": m.content_type}
110242
res = requests.post(
111243
presigned_url_response.url,
@@ -116,7 +248,9 @@ def upload_blob_s3(
116248
)
117249
return res
118250

119-
def upload_blob_gcs(self, file_path: str, presigned_url_response: PresignedURLCreateResponse):
251+
def upload_blob_gcs(
252+
self, file_path: str, presigned_url_response: PresignedURLCreateResponse
253+
):
120254
"""Generic method to upload data to Google Cloud Storage and create the
121255
appropriate resource in the backend.
122256
"""
@@ -137,7 +271,9 @@ def upload_blob_gcs(self, file_path: str, presigned_url_response: PresignedURLCr
137271
)
138272
return res
139273

140-
def upload_blob_azure(self, file_path: str, presigned_url_response: PresignedURLCreateResponse):
274+
def upload_blob_azure(
275+
self, file_path: str, presigned_url_response: PresignedURLCreateResponse
276+
):
141277
"""Generic method to upload data to Azure Blob Storage and create the
142278
appropriate resource in the backend.
143279
"""
@@ -180,7 +316,9 @@ def upload_blob_local(
180316
with open(file_path, "rb") as f:
181317
fields = {"file": (object_name, f, "application/x-tar")}
182318
e = MultipartEncoder(fields=fields)
183-
m = MultipartEncoderMonitor(e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n))
319+
m = MultipartEncoderMonitor(
320+
e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)
321+
)
184322
headers = {"Content-Type": m.content_type}
185323
res = requests.post(
186324
presigned_url_response.url,

src/openlayer/lib/tracing/__init__.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,32 @@
11
"""Openlayer tracing module."""
22

3+
from .attachments import Attachment
34
from .tracer import (
5+
configure,
6+
create_step,
7+
get_current_step,
8+
get_current_trace,
9+
log_attachment,
10+
log_context,
11+
log_output,
412
trace,
513
trace_async,
6-
update_current_trace,
714
update_current_step,
8-
log_context,
9-
log_output,
10-
configure,
11-
get_current_trace,
12-
get_current_step,
13-
create_step,
15+
update_current_trace,
1416
)
1517

16-
1718
__all__ = [
1819
# Core tracing functions
1920
"trace",
20-
"trace_async",
21+
"trace_async",
2122
"update_current_trace",
2223
"update_current_step",
2324
"log_context",
2425
"log_output",
2526
"configure",
2627
"get_current_trace",
27-
"get_current_step",
28+
"get_current_step",
2829
"create_step",
30+
"Attachment",
31+
"log_attachment",
2932
]
30-

0 commit comments

Comments
 (0)