Skip to content

Commit 3a2bf2c

Browse files
refactor: simplify upload field/header merging, extract raw-output sanitizer helpers, count attachment uploads per step, and reject unsupported attach() data types
1 parent f33de76 commit 3a2bf2c

File tree

4 files changed

+76
-54
lines changed

4 files changed

+76
-54
lines changed

src/openlayer/lib/data/_upload.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def upload_bytes_multipart(
9797
data: Union[bytes, BinaryIO],
9898
object_name: str,
9999
content_type: str,
100-
fields: Optional[Dict] = None,
100+
fields: Dict[str, str] = {},
101101
) -> Response:
102102
"""Upload data using multipart POST (for S3 and local storage).
103103
@@ -115,8 +115,7 @@ def upload_bytes_multipart(
115115
if isinstance(data, bytes):
116116
data = io.BytesIO(data)
117117

118-
upload_fields = dict(fields) if fields else {}
119-
upload_fields["file"] = (object_name, data, content_type)
118+
upload_fields = {"file": (object_name, data, content_type), **fields}
120119

121120
encoder = MultipartEncoder(fields=upload_fields)
122121
headers = {"Content-Type": encoder.content_type}
@@ -136,7 +135,7 @@ def upload_bytes_put(
136135
url: str,
137136
data: Union[bytes, BinaryIO],
138137
content_type: str,
139-
extra_headers: Optional[Dict[str, str]] = None,
138+
extra_headers: Dict[str, str] = {},
140139
) -> Response:
141140
"""Upload data using PUT request (for GCS and Azure).
142141
@@ -149,9 +148,7 @@ def upload_bytes_put(
149148
Returns:
150149
The response from the upload request.
151150
"""
152-
headers = {"Content-Type": content_type}
153-
if extra_headers:
154-
headers.update(extra_headers)
151+
headers = {"Content-Type": content_type, **extra_headers}
155152

156153
response = requests.put(
157154
url,

src/openlayer/lib/integrations/openai_tracer.py

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,7 @@ def handle_non_streaming_create(
400400
processed_messages = extract_chat_completion_messages(kwargs["messages"])
401401

402402
# Check if response contains audio (to sanitize raw_output)
403-
has_audio = (
404-
hasattr(response.choices[0].message, "audio")
405-
and response.choices[0].message.audio is not None
406-
)
403+
has_audio = bool(getattr(response.choices[0].message, "audio", None))
407404

408405
# Sanitize raw_output to remove heavy base64 data already uploaded as attachments
409406
raw_output = response.model_dump()
@@ -481,11 +478,9 @@ def extract_chat_completion_messages(
481478

482479
# If content is a list, process each item
483480
if isinstance(content, list):
484-
normalized_content: List[ContentItem] = []
485-
for item in content:
486-
content_item = _normalize_content_item(item)
487-
normalized_content.append(content_item)
488-
481+
normalized_content: List[ContentItem] = [
482+
_normalize_content_item(item) for item in content
483+
]
489484
processed_msg = {"role": role, "content": normalized_content}
490485
if name:
491486
processed_msg["name"] = name
@@ -594,7 +589,8 @@ def _normalize_content_item(item: Dict[str, Any]) -> ContentItem:
594589

595590
else:
596591
# Unknown type, return as TextContent with string representation
597-
logger.debug("Unknown content item type: %s", item_type)
592+
# Log as warning so we can learn about new OpenAI content types
593+
logger.warning("Unknown content item type '%s', preserving as text", item_type)
598594
return TextContent(text=str(item))
599595

600596

@@ -874,9 +870,7 @@ def handle_responses_non_streaming_create(
874870
if hasattr(response, "model_dump"):
875871
raw_output = response.model_dump()
876872
if has_generated_images:
877-
raw_output = _sanitize_raw_output(
878-
raw_output, has_generated_images=True
879-
)
873+
raw_output = _sanitize_raw_output(raw_output, has_generated_images=True)
880874
else:
881875
raw_output = str(response)
882876

@@ -1114,6 +1108,10 @@ def parse_responses_output_data(
11141108
)
11151109
content_items.append(ImageContent(attachment=attachment))
11161110

1111+
else:
1112+
# Unknown output type - log for future support
1113+
logger.debug("Unknown Responses API output type: %s", output_type)
1114+
11171115
# Return appropriate format based on content
11181116
if len(content_items) > 1:
11191117
# Multimodal output - return list of ContentItems
@@ -1127,7 +1125,7 @@ def parse_responses_output_data(
11271125
return content_items
11281126

11291127
except Exception as e:
1130-
logger.debug("Could not parse Responses API output data: %s", e)
1128+
logger.warning("Could not parse Responses API output data: %s", e)
11311129

11321130
return None
11331131

@@ -1230,30 +1228,46 @@ def _sanitize_raw_output(
12301228

12311229
sanitized = copy.deepcopy(raw_output)
12321230

1233-
# Clear audio data from Chat Completions response
12341231
if has_audio:
1235-
try:
1236-
for choice in sanitized.get("choices", []):
1237-
message = choice.get("message", {})
1238-
if message and "audio" in message and message["audio"]:
1239-
if "data" in message["audio"]:
1240-
message["audio"]["data"] = "[UPLOADED_TO_STORAGE]"
1241-
except Exception as e:
1242-
logger.debug("Could not sanitize audio data from raw_output: %s", e)
1232+
_sanitize_audio_data(sanitized)
12431233

1244-
# Clear image data from Responses API
12451234
if has_generated_images:
1246-
try:
1247-
for output_item in sanitized.get("output", []):
1248-
if output_item.get("type") == "image_generation_call":
1249-
if "result" in output_item:
1250-
output_item["result"] = "[UPLOADED_TO_STORAGE]"
1251-
except Exception as e:
1252-
logger.debug("Could not sanitize image data from raw_output: %s", e)
1235+
_sanitize_image_data(sanitized)
12531236

12541237
return sanitized
12551238

12561239

1240+
def _sanitize_audio_data(sanitized: Dict[str, Any]) -> None:
1241+
"""Remove audio base64 data from Chat Completions response.
1242+
1243+
Args:
1244+
sanitized: The raw output dict to sanitize (modified in place)
1245+
"""
1246+
try:
1247+
for choice in sanitized.get("choices", []):
1248+
message = choice.get("message", {})
1249+
if message and "audio" in message and message["audio"]:
1250+
if "data" in message["audio"]:
1251+
message["audio"]["data"] = "[UPLOADED_TO_STORAGE]"
1252+
except Exception as e:
1253+
logger.debug("Could not sanitize audio data from raw_output: %s", e)
1254+
1255+
1256+
def _sanitize_image_data(sanitized: Dict[str, Any]) -> None:
1257+
"""Remove generated image base64 data from Responses API output.
1258+
1259+
Args:
1260+
sanitized: The raw output dict to sanitize (modified in place)
1261+
"""
1262+
try:
1263+
for output_item in sanitized.get("output", []):
1264+
if output_item.get("type") == "image_generation_call":
1265+
if "result" in output_item:
1266+
output_item["result"] = "[UPLOADED_TO_STORAGE]"
1267+
except Exception as e:
1268+
logger.debug("Could not sanitize image data from raw_output: %s", e)
1269+
1270+
12571271
def parse_non_streaming_output_data(
12581272
response: "openai.types.chat.chat_completion.ChatCompletion",
12591273
) -> Union[str, List[ContentItem], Dict[str, Any], None]:

src/openlayer/lib/tracing/attachment_uploader.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from .steps import Step
1414
from .traces import Trace
1515

16-
from .attachments import Attachment
1716
from ..data._upload import STORAGE, StorageType, upload_bytes
17+
from .attachments import Attachment
1818

1919
logger = logging.getLogger(__name__)
2020

@@ -59,16 +59,16 @@ class AttachmentUploader:
5959
S3, GCS, Azure, and local storage backends.
6060
"""
6161

62-
def __init__(self, client: "Openlayer", storage: Optional[StorageType] = None):
62+
def __init__(self, client: "Openlayer", storage: StorageType = STORAGE):
6363
"""Initialize the attachment uploader.
6464
6565
Args:
6666
client: The Openlayer client instance.
6767
storage: Storage type override. Defaults to the global STORAGE setting.
6868
"""
6969
self._client = client
70-
self._storage = storage or STORAGE
71-
self._upload_cache: Dict[str, str] = {} # checksum -> storage_uri
70+
self._storage = storage
71+
self._storage_uri_cache: Dict[str, str] = {} # checksum -> storage_uri
7272

7373
def upload_attachment(self, attachment: "Attachment") -> "Attachment":
7474
"""Upload a single attachment if needed.
@@ -104,8 +104,11 @@ def upload_attachment(self, attachment: "Attachment") -> "Attachment":
104104
return attachment
105105

106106
# Check cache by checksum for deduplication
107-
if attachment.checksum_md5 and attachment.checksum_md5 in self._upload_cache:
108-
attachment.storage_uri = self._upload_cache[attachment.checksum_md5]
107+
if (
108+
attachment.checksum_md5
109+
and attachment.checksum_md5 in self._storage_uri_cache
110+
):
111+
attachment.storage_uri = self._storage_uri_cache[attachment.checksum_md5]
109112
logger.debug(
110113
"Using cached storage_uri for attachment %s (checksum: %s)",
111114
attachment.name,
@@ -146,7 +149,9 @@ def upload_attachment(self, attachment: "Attachment") -> "Attachment":
146149

147150
# Cache for deduplication
148151
if attachment.checksum_md5:
149-
self._upload_cache[attachment.checksum_md5] = attachment.storage_uri
152+
self._storage_uri_cache[attachment.checksum_md5] = (
153+
attachment.storage_uri
154+
)
150155

151156
# Clear data after upload (no longer needed, avoid duplicating in JSON)
152157
attachment._pending_bytes = None
@@ -213,11 +218,11 @@ def upload_trace_attachments(self, trace: "Trace") -> int:
213218
Returns:
214219
The number of attachments uploaded.
215220
"""
216-
upload_count = 0
217221
seen_ids: set = set()
218222

219-
def process_step(step: "Step") -> None:
220-
nonlocal upload_count
223+
def process_step(step: "Step") -> int:
224+
"""Process a step and return the number of attachments uploaded."""
225+
step_upload_count = 0
221226

222227
# Collect attachments from all sources
223228
all_attachments: List[Attachment] = list(step.attachments)
@@ -233,14 +238,15 @@ def process_step(step: "Step") -> None:
233238
if not attachment.is_uploaded() and attachment.has_data():
234239
self.upload_attachment(attachment)
235240
if attachment.is_uploaded():
236-
upload_count += 1
241+
step_upload_count += 1
237242

238243
# Process nested steps recursively
239244
for nested_step in step.steps:
240-
process_step(nested_step)
245+
step_upload_count += process_step(nested_step)
246+
247+
return step_upload_count
241248

242-
for step in trace.steps:
243-
process_step(step)
249+
upload_count = sum(process_step(step) for step in trace.steps)
244250

245251
if upload_count > 0:
246252
logger.info("Uploaded %d attachment(s) for trace", upload_count)

src/openlayer/lib/tracing/steps.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,20 @@ def attach(
9393
name=name,
9494
media_type=media_type,
9595
)
96-
else:
97-
# File-like object
96+
elif hasattr(data, "read"):
97+
# File-like object (BinaryIO, BufferedReader, etc.)
9898
file_bytes = data.read()
9999
inferred_name = name or getattr(data, "name", None) or "attachment"
100100
attachment = Attachment.from_bytes(
101101
data=file_bytes,
102102
name=inferred_name,
103103
media_type=media_type or "application/octet-stream",
104104
)
105+
else:
106+
raise TypeError(
107+
f"Unsupported data type for attach(): {type(data).__name__}. "
108+
"Expected bytes, str, Path, file-like object, or Attachment."
109+
)
105110

106111
if metadata:
107112
attachment.metadata.update(metadata)

0 commit comments

Comments
 (0)