README.md (7 changes: 6 additions & 1 deletion)
@@ -59,6 +59,7 @@ prompts = [

with LMClient(
    model="openai/gpt-4.1-mini",
    max_parallel_requests=32,
    rpm=500,
    tpm=100_000,
) as client:
@@ -84,6 +85,10 @@ One failing request does not abort the whole batch. Failed items are `None` in
`batch.results`; the exception is in `batch.errors[i]`. This is deliberate: a single
provider error should not wipe out a long experiment.
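
For example, a minimal sketch of inspecting a finished batch (illustrative only; the exact `generate_batch` call shape is assumed here, while `batch.results` and `batch.errors` come from the text above):

```python
from infermesh import LMClient

prompts = [
    "Summarize the plot of Hamlet in one sentence.",
    "Translate 'good morning' into French.",
]

with LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:
    batch = client.generate_batch(prompts)  # assumed call shape
    for i, (result, error) in enumerate(zip(batch.results, batch.errors)):
        if error is not None:
            print(f"prompt {i} failed: {error!r}")  # result is None for this item
        else:
            print(f"prompt {i} succeeded")
```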

For large Python batches, set `max_parallel_requests` explicitly. That enables
bounded in-flight scheduling for `generate_batch`; when it is unset, the method
may start work for the full batch up front.

This code works in Jupyter notebooks without any `asyncio` setup. The sync API runs a
background event loop so you do not have to.

@@ -197,7 +202,7 @@ import json
from infermesh import LMClient

with open("results.jsonl", "w") as out, \
LMClient(model="openai/gpt-4.1-mini") as client:
LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:

def save(index: int, result, error) -> None:
row = {"index": index}
docs/guide.md (9 changes: 7 additions & 2 deletions)
@@ -22,6 +22,7 @@ prompts = [

with LMClient(
    model="openai/gpt-4.1-mini",
    max_parallel_requests=32,
    rpm=500,
    tpm=100_000,
) as client:
@@ -48,6 +49,10 @@ By default, one failing request does not abort the whole batch. Failed items are
stored as `None` in `batch.results`, and the corresponding exception is stored
in `batch.errors[i]`.

For large Python batches, set `max_parallel_requests` explicitly. That enables
bounded in-flight scheduling for `generate_batch`; when it is unset, the method
may start work for the full batch up front.
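
Conceptually, bounded in-flight scheduling behaves like the sketch below. This is an illustration of the idea only, not infermesh internals, and `run_bounded`/`worker` are made-up names:

```python
import asyncio

async def run_bounded(items, worker, max_parallel=32):
    # At most ``max_parallel`` workers run at a time; the remaining items
    # wait for a free slot instead of all being started up front.
    semaphore = asyncio.Semaphore(max_parallel)

    async def run_one(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(run_one(item) for item in items))
```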

### Crash-Resilient Batches with `on_result`

For large batches, you may want to write results to disk as each request
Expand All @@ -65,7 +70,7 @@ from infermesh import LMClient
prompts = [...] # large list

with open("results.jsonl", "w") as out, \
LMClient(model="openai/gpt-4.1-mini") as client:
LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:

def save(index: int, result, error) -> None:
row = {"index": index}
@@ -102,7 +107,7 @@ if output_path.exists():
pending = [(i, p) for i, p in enumerate(prompts) if i not in done]

with open(output_path, "a") as out, \
        LMClient(model="openai/gpt-4.1-mini") as client:
        LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:

    def save(batch_idx: int, result, error) -> None:
        orig_idx = pending[batch_idx][0]
src/infermesh/_client_runtime.py (10 changes: 9 additions & 1 deletion)
@@ -93,12 +93,17 @@ def _validate_init_args(
    api_key: str | None,
    deployments: dict[str, DeploymentConfig | dict[str, Any]] | None,
    endpoint: EndpointType,
    max_parallel_requests: int | None,
) -> None:
    """Validate top-level constructor arguments."""

    validate_endpoint(endpoint)
    if model is None:
        raise ValueError("``model`` is required.")
    if max_parallel_requests is not None and max_parallel_requests < 1:
        raise ValueError(
            "``max_parallel_requests`` must be ``None`` or a positive integer."
        )
    if deployments is not None and (api_base is not None or api_key is not None):
        raise ValueError(
            "``api_base`` and ``api_key`` cannot be set when ``deployments`` "
@@ -199,11 +204,14 @@ async def _dispatch_with_controls(
        request_callable: Any,
        request_args: tuple[Any, ...],
        request_kwargs: dict[str, Any],
        queue_started_at: float | None = None,
    ) -> tuple[Any, RequestMetrics]:
        """Run a request with concurrency and rate-limiting controls."""

        state = self._get_loop_state()
        queue_started = time.perf_counter()
        queue_started = (
            queue_started_at if queue_started_at is not None else time.perf_counter()
        )
        handle: RateLimiterAcquisitionHandle | None = None
        semaphore_context = (
            state.semaphore if state.semaphore is not None else _null_async_context()
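
The new `queue_started_at` parameter lets a caller hand in the moment a request was first queued, so the measured queue time starts from that earlier point instead of from dispatch. A standalone toy sketch of the same pattern (hypothetical names, not infermesh code):

```python
import asyncio
import time

async def dispatch(work, queue_started_at=None):
    # Prefer the caller-supplied enqueue time; fall back to "now" when absent,
    # mirroring the change above.
    queue_started = (
        queue_started_at if queue_started_at is not None else time.perf_counter()
    )
    queue_seconds = time.perf_counter() - queue_started  # time spent waiting
    return await work(), queue_seconds

async def main():
    enqueued_at = time.perf_counter()      # request joins the queue
    await asyncio.sleep(0.05)              # simulated wait for a free slot
    _, waited = await dispatch(lambda: asyncio.sleep(0), queue_started_at=enqueued_at)
    print(f"queue time including the wait: {waited:.3f}s")

asyncio.run(main())
```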