|
| 1 | +"""Realtime (WebSocket) support for the UiPath OpenAI clients. |
| 2 | +
|
| 3 | +The OpenAI Realtime API speaks over a WebSocket rather than HTTP, so it does not |
| 4 | +go through the httpx routing used for completions/embeddings. Instead, |
| 5 | +``UiPathOpenAI`` / ``UiPathAsyncOpenAI`` expose ``client.realtime.connect()`` — |
| 6 | +exactly like the stock OpenAI SDK — by swapping in the resource wrappers defined |
| 7 | +here. On connect these wrappers: |
| 8 | +
|
| 9 | +- point the client's ``websocket_base_url`` at the gateway passthrough realtime |
| 10 | + path (``.../vendor/<vendor>/model/<model>``); the SDK appends ``/realtime``, |
| 11 | +- set the S2S bearer token as ``api_key`` (the SDK sends it as |
| 12 | + ``Authorization: Bearer`` on the WebSocket upgrade), |
| 13 | +- inject the ``X-UiPath-*`` routing headers on the upgrade request. |
| 14 | +
|
| 15 | +Completions/embeddings are unaffected: their auth comes from the httpx auth |
| 16 | +pipeline, and the realtime URL is only built when ``.realtime`` is accessed. |
| 17 | +
|
| 18 | +Example: |
| 19 | + >>> import asyncio |
| 20 | + >>> from uipath.llm_client.clients.openai import UiPathAsyncOpenAI |
| 21 | + >>> |
| 22 | + >>> async def main(): |
| 23 | + ... client = UiPathAsyncOpenAI(model_name="gpt-realtime") |
| 24 | + ... async with client.realtime.connect() as conn: |
| 25 | + ... await conn.session.update( |
| 26 | + ... session={"type": "realtime", "output_modalities": ["text"]} |
| 27 | + ... ) |
| 28 | + ... await conn.conversation.item.create( |
| 29 | + ... item={ |
| 30 | + ... "type": "message", |
| 31 | + ... "role": "user", |
| 32 | + ... "content": [{"type": "input_text", "text": "Say hello!"}], |
| 33 | + ... } |
| 34 | + ... ) |
| 35 | + ... await conn.response.create() |
| 36 | + ... async for event in conn: |
| 37 | + ... if event.type == "response.output_text.delta": |
| 38 | + ... print(event.delta, end="") |
| 39 | + ... elif event.type == "response.done": |
| 40 | + ... break |
| 41 | + >>> asyncio.run(main()) |
| 42 | +""" |
| 43 | + |
| 44 | +import re |
| 45 | + |
| 46 | +from typing_extensions import override |
| 47 | + |
| 48 | +from uipath.llm_client.settings import UiPathAPIConfig, UiPathBaseSettings |
| 49 | +from uipath.llm_client.settings.constants import RoutingMode |
| 50 | +from uipath.llm_client.settings.llmgateway.auth import LLMGatewayS2SAuth |
| 51 | + |
| 52 | +try: |
| 53 | + from openai import AsyncOpenAI, OpenAI |
| 54 | + from openai._types import Headers, Omit, Query, omit |
| 55 | + from openai.resources.realtime import AsyncRealtime, Realtime |
| 56 | + from openai.resources.realtime.realtime import ( |
| 57 | + AsyncRealtimeConnectionManager, |
| 58 | + RealtimeConnectionManager, |
| 59 | + ) |
| 60 | + from openai.types.websocket_connection_options import WebSocketConnectionOptions |
| 61 | +except ImportError as e: |
| 62 | + raise ImportError( |
| 63 | + "The 'openai' extra is required for realtime support. " |
| 64 | + "Install it with: uv add uipath-llm-client[openai]" |
| 65 | + ) from e |
| 66 | + |
| 67 | +# The gateway expects the native-OpenAI realtime vendor segment in the path. |
| 68 | +DEFAULT_REALTIME_VENDOR = "nativeopenai" |
| 69 | +# Passthrough api_type segment for the realtime endpoint (not a normalized ApiType). |
| 70 | +REALTIME_API_TYPE = "realtime" |
| 71 | + |
| 72 | + |
| 73 | +def _realtime_api_config(vendor_type: str) -> UiPathAPIConfig: |
| 74 | + return UiPathAPIConfig( |
| 75 | + api_type=REALTIME_API_TYPE, |
| 76 | + routing_mode=RoutingMode.PASSTHROUGH, |
| 77 | + vendor_type=vendor_type, |
| 78 | + ) |
| 79 | + |
| 80 | + |
| 81 | +def build_realtime_ws_base_url( |
| 82 | + settings: UiPathBaseSettings, |
| 83 | + *, |
| 84 | + model_name: str, |
| 85 | + vendor_type: str = DEFAULT_REALTIME_VENDOR, |
| 86 | +) -> str: |
| 87 | + """Build the ``websocket_base_url`` to hand to the OpenAI SDK. |
| 88 | +
|
| 89 | + The SDK appends ``/realtime`` to ``websocket_base_url`` when connecting, so |
| 90 | + this strips the trailing ``/realtime`` produced by ``build_base_url`` and |
| 91 | + converts the scheme to ``wss``/``ws``. |
| 92 | + """ |
| 93 | + url = settings.build_base_url( |
| 94 | + model_name=model_name, api_config=_realtime_api_config(vendor_type) |
| 95 | + ) |
| 96 | + suffix = f"/{REALTIME_API_TYPE}" |
| 97 | + if url.endswith(suffix): |
| 98 | + url = url[: -len(suffix)] |
| 99 | + # Collapse accidental double slashes (e.g. a trailing slash in base_url), |
| 100 | + # leaving the scheme's own "//" intact. |
| 101 | + scheme, sep, rest = url.partition("://") |
| 102 | + if sep: |
| 103 | + url = f"{scheme}://{re.sub(r'/{2,}', '/', rest)}" |
| 104 | + if url.startswith("https://"): |
| 105 | + return "wss://" + url[len("https://") :] |
| 106 | + if url.startswith("http://"): |
| 107 | + return "ws://" + url[len("http://") :] |
| 108 | + return url |
| 109 | + |
| 110 | + |
| 111 | +def _prepare_connection( |
| 112 | + client: "OpenAI | AsyncOpenAI", |
| 113 | + settings: UiPathBaseSettings, |
| 114 | + *, |
| 115 | + model: str, |
| 116 | + vendor_type: str, |
| 117 | + extra_headers: Headers, |
| 118 | +) -> Headers: |
| 119 | + """Configure ``client`` for a gateway realtime connection to ``model``. |
| 120 | +
|
| 121 | + Sets ``websocket_base_url`` and the S2S bearer token (read straight from the |
| 122 | + gateway auth handler), and returns the ``X-UiPath-*`` routing headers merged |
| 123 | + with ``extra_headers`` for the WebSocket upgrade. |
| 124 | + """ |
| 125 | + client.websocket_base_url = build_realtime_ws_base_url( |
| 126 | + settings, model_name=model, vendor_type=vendor_type |
| 127 | + ) |
| 128 | + auth = settings.build_auth_pipeline() |
| 129 | + if isinstance(auth, LLMGatewayS2SAuth) and auth.access_token: |
| 130 | + client.api_key = auth.access_token |
| 131 | + merged: dict[str, object] = { |
| 132 | + **settings.build_auth_headers( |
| 133 | + model_name=model, api_config=_realtime_api_config(vendor_type) |
| 134 | + ) |
| 135 | + } |
| 136 | + if extra_headers: |
| 137 | + merged.update(extra_headers) |
| 138 | + return merged # type: ignore[return-value] |
| 139 | + |
| 140 | + |
| 141 | +class _UiPathRealtime(Realtime): |
| 142 | + """``Realtime`` resource that routes ``connect()`` through the gateway.""" |
| 143 | + |
| 144 | + def __init__( |
| 145 | + self, client: OpenAI, *, settings: UiPathBaseSettings, model: str, vendor_type: str |
| 146 | + ) -> None: |
| 147 | + super().__init__(client) |
| 148 | + self._uipath_settings = settings |
| 149 | + self._uipath_model = model |
| 150 | + self._uipath_vendor = vendor_type |
| 151 | + |
| 152 | + @override |
| 153 | + def connect( |
| 154 | + self, |
| 155 | + *, |
| 156 | + call_id: str | Omit = omit, |
| 157 | + model: str | Omit = omit, |
| 158 | + extra_query: Query = {}, |
| 159 | + extra_headers: Headers = {}, |
| 160 | + websocket_connection_options: WebSocketConnectionOptions = {}, |
| 161 | + ) -> RealtimeConnectionManager: |
| 162 | + resolved_model: str = self._uipath_model if model is omit else model # type: ignore[assignment] |
| 163 | + merged_headers = _prepare_connection( |
| 164 | + self._client, |
| 165 | + self._uipath_settings, |
| 166 | + model=resolved_model, |
| 167 | + vendor_type=self._uipath_vendor, |
| 168 | + extra_headers=extra_headers, |
| 169 | + ) |
| 170 | + return super().connect( |
| 171 | + call_id=call_id, |
| 172 | + model=resolved_model, |
| 173 | + extra_query=extra_query, |
| 174 | + extra_headers=merged_headers, |
| 175 | + websocket_connection_options=websocket_connection_options, |
| 176 | + ) |
| 177 | + |
| 178 | + |
| 179 | +class _UiPathAsyncRealtime(AsyncRealtime): |
| 180 | + """``AsyncRealtime`` resource that routes ``connect()`` through the gateway.""" |
| 181 | + |
| 182 | + def __init__( |
| 183 | + self, client: AsyncOpenAI, *, settings: UiPathBaseSettings, model: str, vendor_type: str |
| 184 | + ) -> None: |
| 185 | + super().__init__(client) |
| 186 | + self._uipath_settings = settings |
| 187 | + self._uipath_model = model |
| 188 | + self._uipath_vendor = vendor_type |
| 189 | + |
| 190 | + @override |
| 191 | + def connect( |
| 192 | + self, |
| 193 | + *, |
| 194 | + call_id: str | Omit = omit, |
| 195 | + model: str | Omit = omit, |
| 196 | + extra_query: Query = {}, |
| 197 | + extra_headers: Headers = {}, |
| 198 | + websocket_connection_options: WebSocketConnectionOptions = {}, |
| 199 | + ) -> AsyncRealtimeConnectionManager: |
| 200 | + resolved_model: str = self._uipath_model if model is omit else model # type: ignore[assignment] |
| 201 | + merged_headers = _prepare_connection( |
| 202 | + self._client, |
| 203 | + self._uipath_settings, |
| 204 | + model=resolved_model, |
| 205 | + vendor_type=self._uipath_vendor, |
| 206 | + extra_headers=extra_headers, |
| 207 | + ) |
| 208 | + return super().connect( |
| 209 | + call_id=call_id, |
| 210 | + model=resolved_model, |
| 211 | + extra_query=extra_query, |
| 212 | + extra_headers=merged_headers, |
| 213 | + websocket_connection_options=websocket_connection_options, |
| 214 | + ) |
0 commit comments