Skip to content

Commit e834a60

Browse files
authored
feat: add llm request throttling (#1068)
1 parent 4877a0b commit e834a60

5 files changed

Lines changed: 519 additions & 21 deletions

File tree

src/uipath/_cli/cli_eval.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from uipath._events._event_bus import EventBus
2020
from uipath._utils._bindings import ResourceOverwritesContext
2121
from uipath.eval._helpers import auto_discover_entrypoint
22+
from uipath.platform.chat import set_llm_concurrency
2223
from uipath.platform.common import UiPathConfig
2324
from uipath.telemetry._track import flush_events
2425
from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter
@@ -106,6 +107,12 @@ def setup_reporting_prereq(no_report: bool) -> bool:
106107
type=click.Path(exists=False),
107108
help="File path where traces will be written in JSONL format",
108109
)
110+
@click.option(
111+
"--max-llm-concurrency",
112+
type=int,
113+
default=20,
114+
help="Maximum concurrent LLM requests (default: 20)",
115+
)
109116
def eval(
110117
entrypoint: str | None,
111118
eval_set: str | None,
@@ -118,6 +125,7 @@ def eval(
118125
report_coverage: bool,
119126
model_settings_id: str,
120127
trace_file: str | None,
128+
max_llm_concurrency: int,
121129
) -> None:
122130
"""Run an evaluation set against the agent.
123131
@@ -131,7 +139,11 @@ def eval(
131139
enable_mocker_cache: Enable caching for LLM mocker responses
132140
report_coverage: Report evaluation coverage
133141
model_settings_id: Model settings ID to override agent settings
142+
trace_file: File path where traces will be written in JSONL format
143+
max_llm_concurrency: Maximum concurrent LLM requests
134144
"""
145+
set_llm_concurrency(max_llm_concurrency)
146+
135147
should_register_progress_reporter = setup_reporting_prereq(no_report)
136148

137149
result = Middlewares.next(

src/uipath/platform/chat/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ToolParametersDefinition,
3131
ToolPropertyDefinition,
3232
)
33+
from .llm_throttle import get_llm_semaphore, set_llm_concurrency
3334

3435
__all__ = [
3536
# Conversations Service
@@ -39,6 +40,9 @@
3940
"EmbeddingModels",
4041
"UiPathLlmChatService",
4142
"UiPathOpenAIService",
43+
# LLM Throttling
44+
"get_llm_semaphore",
45+
"set_llm_concurrency",
4246
# LLM Gateway Models
4347
"ToolPropertyDefinition",
4448
"ToolParametersDefinition",

src/uipath/platform/chat/_llm_gateway_service.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
ToolChoice,
3232
ToolDefinition,
3333
)
34+
from .llm_throttle import get_llm_semaphore
3435

3536
# Common constants
3637
API_VERSION = "2024-10-21" # Standard API version for OpenAI-compatible endpoints
@@ -189,13 +190,14 @@ async def embeddings(
189190
)
190191
endpoint = Endpoint("/" + endpoint)
191192

192-
response = await self.request_async(
193-
"POST",
194-
endpoint,
195-
json={"input": input},
196-
params={"api-version": API_VERSION},
197-
headers=DEFAULT_LLM_HEADERS,
198-
)
193+
async with get_llm_semaphore():
194+
response = await self.request_async(
195+
"POST",
196+
endpoint,
197+
json={"input": input},
198+
params={"api-version": API_VERSION},
199+
headers=DEFAULT_LLM_HEADERS,
200+
)
199201

200202
return TextEmbedding.model_validate(response.json())
201203

@@ -315,13 +317,14 @@ class Country(BaseModel):
315317
# Use provided dictionary format directly
316318
request_body["response_format"] = response_format
317319

318-
response = await self.request_async(
319-
"POST",
320-
endpoint,
321-
json=request_body,
322-
params={"api-version": API_VERSION},
323-
headers=DEFAULT_LLM_HEADERS,
324-
)
320+
async with get_llm_semaphore():
321+
response = await self.request_async(
322+
"POST",
323+
endpoint,
324+
json=request_body,
325+
params={"api-version": API_VERSION},
326+
headers=DEFAULT_LLM_HEADERS,
327+
)
325328

326329
return ChatCompletion.model_validate(response.json())
327330

@@ -546,13 +549,14 @@ class Country(BaseModel):
546549
"X-UiPath-LlmGateway-NormalizedApi-ModelName": model,
547550
}
548551

549-
response = await self.request_async(
550-
"POST",
551-
endpoint,
552-
json=request_body,
553-
params={"api-version": NORMALIZED_API_VERSION},
554-
headers=headers,
555-
)
552+
async with get_llm_semaphore():
553+
response = await self.request_async(
554+
"POST",
555+
endpoint,
556+
json=request_body,
557+
params={"api-version": NORMALIZED_API_VERSION},
558+
headers=headers,
559+
)
556560

557561
return ChatCompletion.model_validate(response.json())
558562

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""LLM request throttling utilities.
2+
3+
This module provides concurrency control for LLM API requests to prevent
4+
overwhelming the system with simultaneous calls.
5+
"""
6+
7+
import asyncio
8+
9+
DEFAULT_LLM_CONCURRENCY = 20
10+
_llm_concurrency_limit: int = DEFAULT_LLM_CONCURRENCY
11+
_llm_semaphore: asyncio.Semaphore | None = None
12+
_llm_semaphore_loop: asyncio.AbstractEventLoop | None = None
13+
14+
15+
def get_llm_semaphore() -> asyncio.Semaphore:
16+
"""Get the LLM semaphore, creating with configured limit if not set.
17+
18+
The semaphore is recreated if called from a different event loop than
19+
the one it was originally created in. This prevents "bound to a different
20+
event loop" errors when using multiple asyncio.run() calls.
21+
"""
22+
global _llm_semaphore, _llm_semaphore_loop
23+
24+
loop = asyncio.get_running_loop()
25+
26+
# Recreate semaphore if it doesn't exist or if the event loop changed
27+
if _llm_semaphore is None or _llm_semaphore_loop is not loop:
28+
_llm_semaphore = asyncio.Semaphore(_llm_concurrency_limit)
29+
_llm_semaphore_loop = loop
30+
31+
return _llm_semaphore
32+
33+
34+
def set_llm_concurrency(limit: int) -> None:
35+
"""Set the max concurrent LLM requests. Call before making any LLM calls.
36+
37+
Args:
38+
limit: Maximum number of concurrent LLM requests allowed (must be > 0).
39+
40+
Raises:
41+
ValueError: If limit is less than 1.
42+
"""
43+
if limit < 1:
44+
raise ValueError("LLM concurrency limit must be at least 1")
45+
46+
global _llm_concurrency_limit, _llm_semaphore, _llm_semaphore_loop
47+
_llm_concurrency_limit = limit
48+
_llm_semaphore = None
49+
_llm_semaphore_loop = None

0 commit comments

Comments (0)