Skip to content

Commit 32facd2

Browse files
chore(closes OPEN-8686): upload attachments in a non-blocking way
1 parent d0df24c commit 32facd2

File tree

1 file changed

+129
-47
lines changed

1 file changed

+129
-47
lines changed

src/openlayer/lib/tracing/tracer.py

Lines changed: 129 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Module with the logic to create and manage traces and steps."""
22

33
@@ -66,6 +68,34 @@
6668
# Attachment upload configuration
6769
_configured_attachment_upload_enabled: bool = False
6870

71+
# Background publishing configuration
_configured_background_publish_enabled: bool = True


# Background executor for async trace publishing (created lazily).
_background_executor: Optional[ThreadPoolExecutor] = None
# Guards lazy creation of the executor: without it, two threads completing
# traces concurrently could each build a pool (leaking one set of worker
# threads) and register the atexit cleanup hook twice.
_background_executor_lock = threading.Lock()


def _get_background_executor() -> ThreadPoolExecutor:
    """Get or create the shared background executor for trace publishing.

    Returns:
        The process-wide ``ThreadPoolExecutor`` (4 workers, threads named
        ``openlayer-tracer-*``) used to upload attachments and publish
        traces off the caller's thread. Created lazily on first use; an
        ``atexit`` hook is registered exactly once so pending publish
        tasks are drained at interpreter shutdown.
    """
    global _background_executor
    if _background_executor is None:
        with _background_executor_lock:
            # Double-checked locking: re-test under the lock so only the
            # first thread creates the pool and registers the exit hook.
            if _background_executor is None:
                _background_executor = ThreadPoolExecutor(
                    max_workers=4, thread_name_prefix="openlayer-tracer"
                )
                # Register cleanup on exit
                atexit.register(_shutdown_background_executor)
    return _background_executor


def _shutdown_background_executor() -> None:
    """Shutdown the background executor gracefully.

    Blocks until every already-submitted publish task has finished, so
    traces queued for background publishing are not dropped at interpreter
    exit. Safe to call when no executor was ever created (no-op), and
    resets the global so a later call can create a fresh pool.
    """
    global _background_executor
    if _background_executor is not None:
        logger.debug("Shutting down background executor, waiting for pending tasks...")
        _background_executor.shutdown(wait=True)
        _background_executor = None
        logger.debug("Background executor shutdown complete")
98+
6999

70100
def configure(
71101
api_key: Optional[str] = None,
@@ -78,6 +108,7 @@ def configure(
78108
offline_buffer_path: Optional[str] = None,
79109
max_buffer_size: Optional[int] = None,
80110
attachment_upload_enabled: bool = False,
111+
background_publish_enabled: bool = True,
81112
) -> None:
82113
"""Configure the Openlayer tracer with custom settings.
83114
@@ -101,6 +132,10 @@ def configure(
101132
attachment_upload_enabled: Enable uploading of attachments (images, audio, etc.) to
102133
Openlayer storage. When enabled, attachments on steps will be uploaded during
103134
trace completion. Defaults to False.
135+
background_publish_enabled: Enable background publishing of traces. When enabled,
136+
attachment uploads and trace publishing happen in a background thread, allowing
137+
the main thread to return immediately. When disabled, tracing is synchronous.
138+
Defaults to True.
104139
105140
Examples:
106141
>>> import openlayer.lib.tracing.tracer as tracer
@@ -131,7 +166,7 @@ def configure(
131166
"""
132167
global _configured_api_key, _configured_pipeline_id, _configured_base_url, _configured_timeout, _configured_max_retries, _client
133168
global _configured_on_flush_failure, _configured_offline_buffer_enabled, _configured_offline_buffer_path, _configured_max_buffer_size, _offline_buffer
134-
global _configured_attachment_upload_enabled
169+
global _configured_attachment_upload_enabled, _configured_background_publish_enabled
135170

136171
_configured_api_key = api_key
137172
_configured_pipeline_id = inference_pipeline_id
@@ -143,6 +178,7 @@ def configure(
143178
_configured_offline_buffer_path = offline_buffer_path
144179
_configured_max_buffer_size = max_buffer_size
145180
_configured_attachment_upload_enabled = attachment_upload_enabled
181+
_configured_background_publish_enabled = background_publish_enabled
146182

147183
# Reset the client and buffer so they get recreated with new configuration
148184
_client = None
@@ -1498,18 +1534,73 @@ def _handle_trace_completion(
14981534
)
14991535
return
15001536

1537+
# Get current step prompt before potentially losing context
1538+
current_step = get_current_step()
1539+
prompt = None
1540+
if isinstance(current_step, steps.ChatCompletionStep):
1541+
prompt = current_step.inputs.get("prompt")
1542+
1543+
# Resolve inference_pipeline_id now (while we have access to config)
1544+
resolved_pipeline_id = (
1545+
inference_pipeline_id
1546+
or _configured_pipeline_id
1547+
or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID")
1548+
)
1549+
1550+
if _publish:
1551+
if _configured_background_publish_enabled:
1552+
# Submit to background thread pool
1553+
executor = _get_background_executor()
1554+
executor.submit(
1555+
_upload_and_publish_trace,
1556+
current_trace,
1557+
resolved_pipeline_id,
1558+
prompt,
1559+
on_flush_failure,
1560+
)
1561+
logger.debug("Trace submitted to background executor for publishing")
1562+
else:
1563+
# Run synchronously
1564+
_upload_and_publish_trace(
1565+
current_trace,
1566+
resolved_pipeline_id,
1567+
prompt,
1568+
on_flush_failure,
1569+
)
1570+
else:
1571+
logger.debug("Ending step %s", step_name)
1572+
1573+
1574+
def _upload_and_publish_trace(
1575+
trace: "traces.Trace",
1576+
inference_pipeline_id: Optional[str],
1577+
prompt: Optional[Any],
1578+
on_flush_failure: Optional[OnFlushFailureCallback],
1579+
) -> None:
1580+
"""Upload attachments and publish trace data to Openlayer.
1581+
1582+
This function can run either synchronously or in a background thread,
1583+
depending on the background_publish_enabled configuration.
1584+
1585+
Args:
1586+
trace: The trace to upload and publish.
1587+
inference_pipeline_id: The pipeline ID to publish to.
1588+
prompt: The prompt from the ChatCompletionStep, if applicable.
1589+
on_flush_failure: Optional callback for handling failures.
1590+
"""
1591+
try:
15011592
# Upload attachments before processing trace data
15021593
if _configured_attachment_upload_enabled:
15031594
try:
15041595
from .attachment_uploader import upload_trace_attachments
15051596

1506-
upload_count = upload_trace_attachments(current_trace)
1597+
upload_count = upload_trace_attachments(trace)
15071598
if upload_count > 0:
15081599
logger.debug("Uploaded %d attachments for trace", upload_count)
15091600
except Exception as e:
15101601
logger.error("Failed to upload trace attachments: %s", e)
15111602

1512-
trace_data, input_variable_names = post_process_trace(current_trace)
1603+
trace_data, input_variable_names = post_process_trace(trace)
15131604

15141605
config = dict(
15151606
ConfigLlmData(
@@ -1533,53 +1624,44 @@ def _handle_trace_completion(
15331624
if "context" in trace_data:
15341625
config.update({"context_column_name": "context"})
15351626

1536-
if isinstance(get_current_step(), steps.ChatCompletionStep):
1537-
config.update(
1538-
{
1539-
"prompt": get_current_step().inputs.get("prompt"),
1540-
}
1541-
)
1542-
if _publish:
1543-
# Use provided pipeline_id, or fall back to configured default,
1544-
# or finally to environment variable
1545-
inference_pipeline_id = (
1546-
inference_pipeline_id
1547-
or _configured_pipeline_id
1548-
or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID")
1549-
)
1550-
client = _get_client()
1627+
if prompt is not None:
1628+
config.update({"prompt": prompt})
15511629

1552-
if client:
1553-
try:
1554-
response = client.inference_pipelines.data.stream(
1555-
inference_pipeline_id=inference_pipeline_id,
1556-
rows=[trace_data],
1557-
config=config,
1558-
)
1559-
print(
1560-
"Successfully streamed data to Openlayer. Response:",
1561-
response.to_json(),
1562-
)
1630+
client = _get_client()
15631631

1564-
except Exception as err: # pylint: disable=broad-except
1565-
logger.error(traceback.format_exc())
1566-
logger.error(
1567-
"Could not stream data to Openlayer (pipeline_id: %s, base_url: %s) Error: %s",
1568-
inference_pipeline_id,
1569-
client.base_url if client else "N/A",
1570-
err,
1571-
)
1632+
if client:
1633+
try:
1634+
response = client.inference_pipelines.data.stream(
1635+
inference_pipeline_id=inference_pipeline_id,
1636+
rows=[trace_data],
1637+
config=config,
1638+
)
1639+
logger.info(
1640+
"Successfully streamed data to Openlayer. Response: %s",
1641+
response.to_json(),
1642+
)
15721643

1573-
# Handle failure callback and offline buffering
1574-
_handle_streaming_failure(
1575-
trace_data=trace_data,
1576-
config=config,
1577-
inference_pipeline_id=inference_pipeline_id,
1578-
error=err,
1579-
on_flush_failure=on_flush_failure,
1580-
)
1581-
else:
1582-
logger.debug("Ending step %s", step_name)
1644+
except Exception as err: # pylint: disable=broad-except
1645+
logger.error(traceback.format_exc())
1646+
logger.error(
1647+
"Could not stream data to Openlayer (pipeline_id: %s, base_url: %s) Error: %s",
1648+
inference_pipeline_id,
1649+
client.base_url if client else "N/A",
1650+
err,
1651+
)
1652+
1653+
# Handle failure callback and offline buffering
1654+
_handle_streaming_failure(
1655+
trace_data=trace_data,
1656+
config=config,
1657+
inference_pipeline_id=inference_pipeline_id,
1658+
error=err,
1659+
on_flush_failure=on_flush_failure,
1660+
)
1661+
1662+
except Exception as e:
1663+
logger.error("Error in background trace publishing: %s", e)
1664+
logger.error(traceback.format_exc())
15831665

15841666

15851667
def _handle_streaming_failure(

0 commit comments

Comments
 (0)