12 changes: 10 additions & 2 deletions README.md
@@ -174,21 +174,29 @@ infermesh generate \

# Generate — from a JSONL file, results to another JSONL file
# Each input line: {"prompt": "..."} or {"messages": [...]} or {"responses_input": "..."}
# Output includes an _index field so interrupted runs can be resumed.
# Output includes an _index field; a checkpoint file results.checkpoint.sqlite is kept.
infermesh generate \
--model openai/gpt-4.1-mini \
--api-base https://api.openai.com/v1 \
--input-jsonl prompts.jsonl \
--output-jsonl results.jsonl

# Resume an interrupted run — skips already-completed rows and appends new ones
# Resume an interrupted run — reads results.checkpoint.sqlite, skips settled rows, appends the rest
infermesh generate \
--model openai/gpt-4.1-mini \
--api-base https://api.openai.com/v1 \
--input-jsonl prompts.jsonl \
--output-jsonl results.jsonl \
--resume

# Custom mapper — transform raw source records before sending to the model
# The mapper receives each record as a dict and must return {"input": ...}; "metadata" is optional
infermesh generate \
--model openai/gpt-4.1-mini \
--input-jsonl dataset.jsonl \
--output-jsonl results.jsonl \
--mapper mypackage.prompts:build_prompt

# Create embeddings
infermesh embed \
--model text-embedding-3-small \
82 changes: 74 additions & 8 deletions docs/guide.md
@@ -92,7 +92,7 @@ with open("results.jsonl", "w") as out, \
The callback receives:

| Argument | Type | Notes |
|---|---|---|
| --- | --- | --- |
| `index` | `int` | Position in `input_batch` (global item index, not micro-batch index) |
| `result` | `GenerationResult \| EmbeddingResult \| TranscriptionResult \| None` | `None` on failure |
| `error` | `BaseException \| None` | `None` on success |
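
A minimal sketch of a callback with this signature (the name `on_item` and the dict it returns are illustrative, not part of the API):

```python
def on_item(index: int, result, error) -> dict:
    """Per-item callback: exactly one of result/error is None."""
    if error is not None:
        # result is None on failure; keep the error message for later triage
        return {"index": index, "ok": False, "error": str(error)}
    return {"index": index, "ok": True, "result": result}
```
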
@@ -192,9 +192,20 @@ Input rows for `infermesh generate` may contain any of the following fields:

### Resuming an Interrupted Run

If a long batch is interrupted (Ctrl-C, OOM, network loss), the output file
contains all rows completed so far. Re-run with `--resume` to skip them and
append only the remaining rows:
Every file-backed run writes a checkpoint file alongside the output:

```
results.jsonl ← your output (human-readable)
results.checkpoint.sqlite ← checkpoint file (resume state)
```

By default the checkpoint stays beside the output for portability and
discoverability. If you want the checkpoint on local scratch instead, pass
`--checkpoint-dir DIR` or set `INFERMESH_CHECKPOINT_DIR=DIR` before the run.
When you resume later, reuse the same checkpoint-dir setting.

If a long batch is interrupted (Ctrl-C, OOM, network loss), re-run with
`--resume` to skip settled items and append only the remaining rows:

```bash
# First attempt — interrupted partway through
@@ -204,7 +215,7 @@ infermesh generate \
--input-jsonl prompts.jsonl \
--output-jsonl results.jsonl

# Resume — reads results.jsonl, skips completed _index values, appends the rest
# Resume — reads results.checkpoint.sqlite, skips settled items, appends the rest
infermesh generate \
--model openai/gpt-4.1-mini \
--api-base https://api.openai.com/v1 \
@@ -213,9 +224,64 @@ infermesh generate \
--resume
```

Results are written to disk one row at a time as each request completes, so
a crash only loses the requests that were in-flight at that moment.
`--resume` requires `--output-jsonl`.
Each source row is tracked by its content fingerprint plus its occurrence
count, so duplicate rows are resumed independently. Re-ordering the input file
before resuming is safe, and resumed rows keep the original `_index` values
from the first run. Removing rows, adding rows, or deduplicating the input
before resuming is not supported. Results are written to disk one row at a
time as each request completes, so a crash only loses the requests that were
in-flight at that moment.
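
One way to picture "content fingerprint plus occurrence count" (an illustration of the idea, not infermesh's actual checkpoint schema):

```python
import hashlib
import json
from collections import Counter

def row_keys(rows: list[dict]) -> list[tuple[str, int]]:
    """Key each row by (sha256 of its content, nth occurrence of that content)."""
    seen: Counter[str] = Counter()
    keys = []
    for row in rows:
        # sort_keys makes the fingerprint independent of dict key order
        fp = hashlib.sha256(json.dumps(row, sort_keys=True).encode()).hexdigest()
        keys.append((fp, seen[fp]))
        seen[fp] += 1
    return keys
```

Because the keys are content-derived, shuffling the rows leaves the key set unchanged, which is why re-ordering the input before resuming is safe while adding or removing rows is not.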

The workflow keeps a rolling in-flight window, so each settled row immediately
admits the next pending row until the source is exhausted. Output rows are
written in completion order, not input order.
Row-level generation failures become per-item `error` rows and do not abort
their siblings, but setup and workflow failures still stop the command.
Use the `_index` field to re-sort after the run if needed.
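
Since rows land in completion order, restoring input order afterwards is a short script (file names here are assumptions):

```python
import json

def sort_by_index(in_path: str, out_path: str) -> None:
    """Rewrite a completion-ordered JSONL file in original input order."""
    with open(in_path) as f:
        rows = [json.loads(line) for line in f if line.strip()]
    rows.sort(key=lambda r: r["_index"])
    with open(out_path, "w") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")
```
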

`--resume` requires `--output-jsonl` and the matching checkpoint file from a
previous file-backed run. infermesh fails fast instead of guessing if:

- the checkpoint file is missing,
- the input and output paths are the same file,
- the output file is missing settled `_index` rows recorded in the checkpoint, or
- the current input does not match the original row occurrences.
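
Those fail-fast conditions amount to a preflight check; sketched in plain Python (the function and argument names are ours, not infermesh's):

```python
import os

def preflight_resume(
    input_path: str,
    output_path: str,
    checkpoint_path: str,
    settled_indices: set[int],
    output_indices: set[int],
    occurrences_match: bool,
) -> None:
    """Raise instead of guessing when a resume precondition fails."""
    if not os.path.exists(checkpoint_path):
        raise RuntimeError("checkpoint file is missing")
    if os.path.abspath(input_path) == os.path.abspath(output_path):
        raise RuntimeError("input and output are the same file")
    missing = settled_indices - output_indices
    if missing:
        raise RuntimeError(f"output is missing settled rows: {sorted(missing)}")
    if not occurrences_match:
        raise RuntimeError("input rows do not match the original occurrences")
```
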

### Custom Input Mapping with `--mapper`

Use `--mapper` to transform raw source records before they are sent to the
model. This lets you drive generation from any record format without
preprocessing the source file.

```bash
infermesh generate \
--model openai/gpt-4.1-mini \
--input-jsonl dataset.jsonl \
--output-jsonl results.jsonl \
--mapper mypackage.prompts:build_prompt
```

The mapper is imported as `package.module:function`. The function receives
each raw source record as a `dict` and must return a `dict` with at least an
`"input"` key:

```python
# mypackage/prompts.py
def build_prompt(record: dict) -> dict:
return {
"input": f"Classify the following text:\n\n{record['body']}",
"metadata": {"doc_id": record["id"]},
}
```

| Return key | Required | Notes |
|---|---|---|
| `"input"` | Yes | Passed directly to the generation endpoint |
| `"metadata"` | No | Copied into the output row under `"metadata"` when it is a JSON-serializable dict |

Extra keys beyond `"input"` and `"metadata"` are ignored. Mapper failures
become per-item error rows — they do not abort the run. If you later resume a
file-backed run, infermesh requires the same mapper implementation that wrote
the original checkpoint file.
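
The return contract above can be pinned down in a few lines (a sketch of the validation a caller could apply, not infermesh's internal code):

```python
from typing import Any, Callable

def apply_mapper(mapper: Callable[[dict], dict], record: dict) -> dict[str, Any]:
    """Validate one mapped record against the documented contract."""
    mapped = mapper(record)
    if not isinstance(mapped, dict) or "input" not in mapped:
        raise ValueError("mapper must return a dict with an 'input' key")
    out: dict[str, Any] = {"input": mapped["input"]}
    metadata = mapped.get("metadata")
    # kept only when it is a dict; a fuller check would also confirm
    # JSON-serializability before copying it into the output row
    if isinstance(metadata, dict):
        out["metadata"] = metadata
    # extra keys beyond "input"/"metadata" are dropped
    return out
```
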

## Generate Text

34 changes: 34 additions & 0 deletions src/infermesh/_cli_support.py
@@ -228,6 +228,10 @@ def _validate_cli_deployments_toml(
) -> None:
"""Reject plaintext secrets in CLI-loaded router deployment config."""

if "deployments" not in loaded:
raise ValueError(
f"TOML file {deployments_toml_path!r} is missing a [deployments] table."
)
for name, config in loaded["deployments"].items():
forbidden_path = _find_forbidden_secret_path(
config,
@@ -347,3 +351,33 @@ def _maybe_parse_json(value: str) -> Any:
return json.loads(value)
except json.JSONDecodeError:
return None


def _build_generation_record(
orig_idx: int,
result: Any,
error: BaseException | None,
*,
parse_json: bool,
) -> dict[str, Any]:
"""Convert one generation result into its JSONL output shape."""

if result is None:
return {
"_index": orig_idx,
"output_text": None,
"output_parsed": None,
"token_usage": None,
"request_id": None,
"finish_reason": None,
"error": str(error) if error else "unknown error",
}
return {
"_index": orig_idx,
"output_text": result.output_text,
"output_parsed": _maybe_parse_json(result.output_text) if parse_json else None,
"token_usage": _token_usage_to_dict(result),
"request_id": result.request_id,
"finish_reason": result.finish_reason,
"error": None,
}
9 changes: 8 additions & 1 deletion src/infermesh/_client_runtime.py
@@ -8,8 +8,9 @@
import threading
import time
import weakref
from collections.abc import Coroutine
from dataclasses import dataclass
from typing import Any, Literal, cast
from typing import Any, Literal, TypeVar, cast
from urllib.parse import urlparse

from infermesh._utils import (
@@ -36,6 +37,7 @@
"cost-based-routing",
"usage-based-routing-v2",
]
T = TypeVar("T")


@dataclass(slots=True)
@@ -197,6 +199,11 @@ def _initialize_runtime_state(
self._deployments = self._coerce_deployments(deployments)
self._closed = False

def _run_sync(self, coroutine: Coroutine[Any, Any, T]) -> T:
"""Run a coroutine on the client-owned background event loop."""

return self._sync_runner.run(coroutine)

async def _dispatch_with_controls(
self,
*,
5 changes: 5 additions & 0 deletions src/infermesh/_workflow/__init__.py
@@ -0,0 +1,5 @@
"""Internal workflow package."""

from .engine import run_generate_workflow

__all__ = ["run_generate_workflow"]