Guide for extending llm-batch-pipeline with custom plugins, backends, and stages.
```
┌─────────────────────────────────────────────────────────┐
│                      CLI (cli.py)                        │
│        argparse → config builder → stage dispatch        │
└──────────────────────┬──────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────┐
│           Pipeline Orchestrator (pipeline.py)            │
│      Stage runner, context, retry, resume, dry-run       │
└──────────────────────┬──────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────┐
│                 Stage Wiring (stages.py)                 │
│    10 stage functions, each receives PipelineContext     │
└──┬───────┬───────┬───────┬───────┬───────┬──────────────┘
   │       │       │       │       │       │
   ▼       ▼       ▼       ▼       ▼       ▼
Plugins  Filters  Transforms  Render  Backends  Evaluation
```
- Plugin system — Python ABCs (no metaclass magic, no decorators, no framework). Users subclass `FileReader`, `Filter`, `Transformer`, `OutputTransformer`.
- Pipeline orchestrator — Custom lightweight runner (~200 lines). No framework dependencies. Supports retry, resume from stage, and dry-run.
- Stage communication — Stages share state via `PipelineContext.artifacts` (dict) and `PipelineContext.files` / `PipelineContext.filtered_files` (lists of `ParsedFile`).
- Schema convention — Schema files must define `class mySchema(BaseModel)`. Loaded dynamically via `importlib.util.spec_from_file_location`.
- Backend abstraction — `BatchBackend` ABC with a `submit()` method. The OpenAI and Ollama implementations share common validation/IO utilities in `backends/common.py`.
```python
from pathlib import Path

from llm_batch_pipeline.plugins.base import FileReader, ParsedFile


class CsvReader(FileReader):
    def can_read(self, path: Path) -> bool:
        return path.suffix.lower() == ".csv"

    def read(self, path: Path) -> ParsedFile:
        content = path.read_text(encoding="utf-8")
        return ParsedFile(
            filename=path.name,
            raw_path=path,
            content=content,
            metadata={"rows": content.count("\n")},
        )

    def package_for_llm(self, parsed: ParsedFile) -> str:
        # Format content for the LLM prompt
        return f"CSV Data:\n{parsed.content}"
```
```python
from llm_batch_pipeline.plugins.base import Filter, ParsedFile


class MinRowFilter(Filter):
    def __init__(self, min_rows: int = 2):
        self.min_rows = min_rows

    def apply(self, parsed: ParsedFile) -> tuple[bool, str]:
        rows = parsed.metadata.get("rows", 0)
        if rows < self.min_rows:
            return False, f"Too few rows ({rows} < {self.min_rows})"
        return True, "passed"
```
```python
from llm_batch_pipeline.plugins.base import Transformer, ParsedFile


class NormalizeHeadersTransformer(Transformer):
    def apply(self, parsed: ParsedFile) -> ParsedFile:
        lines = parsed.content.split("\n")
        if lines:
            lines[0] = lines[0].lower()
        parsed.content = "\n".join(lines)
        return parsed
```
```python
# my_plugin/schema.py
from typing import Literal

from pydantic import BaseModel, Field


class SentimentResult(BaseModel):
    sentiment: Literal["positive", "negative", "neutral"] = Field(
        description="Overall sentiment of the text."
    )
    confidence: float = Field(
        ge=0.0, le=1.0,
        description="Confidence score."
    )
    reason: str = Field(description="Explanation.")


# REQUIRED: `schema_loader` expects a symbol named `mySchema`
mySchema = SentimentResult
```
```python
# my_plugin/plugin.py
from llm_batch_pipeline.plugins.registry import PluginSpec, register_plugin

# CsvReader, MinRowFilter and NormalizeHeadersTransformer (shown above)
# are defined in this same module.


def register():
    register_plugin(PluginSpec(
        name="csv_sentiment",
        reader=CsvReader(),
        pre_filters=[MinRowFilter(min_rows=2)],
        transformers=[NormalizeHeadersTransformer()],
        post_filters=[],
    ))
```

Add your module path to `_BUILTIN_MODULES` in `src/llm_batch_pipeline/plugins/registry.py`:
```python
_BUILTIN_MODULES = (
    "llm_batch_pipeline.examples.spam_detection.plugin",
    "llm_batch_pipeline.examples.gdpr_detection.plugin",
    "my_plugin.plugin",  # Add your plugin
)
```

Or register programmatically before calling the pipeline:
```python
from my_plugin.plugin import register

register()
```

The plugin system controls how inputs are parsed and preprocessed. Prompt/instructions, schema validation, and evaluation configuration are provided per batch run.
The LLM instructions text is loaded from:

- `--prompt-file` (CLI), or
- `batches/<batch>/prompt.txt` (a file in the batch directory), or
- a built-in fallback prompt when neither exists.

For submit-time overrides, use `--prompt-override` / `--prompt-override-file`. The backends apply this by rewriting `body.instructions` for every request right before submission.

Your plugin's `FileReader.package_for_llm()` controls the per-file content injected as the `input_text` part of the prompt.
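As a rough illustration (not the backends' actual code; the request shape is an assumption), the submit-time rewrite amounts to something like:

```python
# Illustrative only: apply an instructions override to one request dict.
# The exact request layout is assumed; only the body.instructions field
# is documented above.
def apply_instructions_override(request: dict, instructions: str) -> dict:
    updated = dict(request)                 # avoid mutating the caller's request
    body = dict(updated.get("body", {}))
    body["instructions"] = instructions     # the field the backends rewrite
    updated["body"] = body
    return updated
```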
Provide a schema via one of these mechanisms:

- `--schema-file path/to/schema.py`, or
- a `schema.py` file placed in the batch directory (`batches/<batch>/schema.py`).

Your schema file must define `mySchema` (typically `class mySchema(BaseModel): ...` or `mySchema = SomeModel`).
The pipeline converts it to a strict JSON schema for structured outputs and then validates the LLM JSON against it.
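Conceptually, the dynamic load works like the sketch below (illustrative only: the helper name is made up, and the real loader adds error handling):

```python
# Sketch of the importlib-based schema load described above.
import importlib.util
from pathlib import Path


def load_my_schema(schema_path: Path):
    spec = importlib.util.spec_from_file_location("batch_schema", schema_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.mySchema  # schema files must expose exactly this symbol
```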
Evaluation field mapping defaults to `label` and `confidence`. To evaluate other schema field names, use `--label-field` and `--confidence-field` (for the `SentimentResult` schema above, that means `--label-field sentiment`; `confidence` already matches the default).
Note: evaluation auto-detection of label/confidence from the schema runs only when `--schema-file` is explicitly provided.
If you rely solely on `batches/<batch>/schema.py`, set `--label-field` / `--confidence-field` explicitly.
The built-in evaluate stage computes a confusion matrix, precision/recall/F1, and accuracy, and optionally ROC/AUC for binary classification when `--positive-class` is set (and confidence is available).

It uses:

- Ground truth from `--ground-truth-csv` or `batches/<batch>/evaluation/ground-truth.csv`
- Category map from `--category-map` or `batches/<batch>/evaluation/category-map.json`
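Putting the per-batch files together, a batch directory might look like this (illustrative layout assembled from the paths above):

```
batches/<batch>/
├── prompt.txt                  # LLM instructions (optional; built-in fallback otherwise)
├── schema.py                   # must define mySchema
└── evaluation/
    ├── ground-truth.csv        # ground truth for the evaluate stage
    └── category-map.json       # category map for the evaluate stage
```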
Supported "custom evaluation" via plugins:

- Implement `OutputTransformer` in your plugin to reshape `validated_rows` before evaluation/export (e.g. rename/move fields so they match `--label-field` / `--confidence-field`); see the sketch below.
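A minimal sketch, assuming `OutputTransformer` is importable from `plugins.base` alongside the other base classes and that your schema uses `sentiment` as its label field:

```python
# Rename a schema field on validated rows so evaluation's default
# field mapping (label/confidence) finds it. Import path is assumed.
from llm_batch_pipeline.plugins.base import OutputTransformer


class RenameSentimentToLabel(OutputTransformer):
    def apply(self, rows: list[dict]) -> list[dict]:
        out = []
        for row in rows:
            row = dict(row)                       # copy; leave the original row intact
            if "sentiment" in row:
                row["label"] = row.pop("sentiment")
            out.append(row)
        return out
```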
If you need entirely different evaluation metrics/logic:

- modify `src/llm_batch_pipeline/evaluation.py` (`evaluate()` / `EvalReport`)
- and update `stage_evaluate()` in `src/llm_batch_pipeline/stages.py` (plus `src/llm_batch_pipeline/export.py` if the report format changes)
`FileReader`:

| Method | Signature | Description |
|---|---|---|
| `can_read` | `(path: Path) -> bool` | Return `True` if this reader handles the file |
| `read` | `(path: Path) -> ParsedFile` | Parse file into a `ParsedFile` |
| `package_for_llm` | `(parsed: ParsedFile) -> str` | Serialize content for the LLM user message |
`Filter`:

| Method | Signature | Description |
|---|---|---|
| `name` | property `-> str` | Human-readable name (default: class name) |
| `apply` | `(parsed: ParsedFile) -> tuple[bool, str]` | Return `(keep, reason)` |
`Transformer`:

| Method | Signature | Description |
|---|---|---|
| `name` | property `-> str` | Human-readable name (default: class name) |
| `apply` | `(parsed: ParsedFile) -> ParsedFile` | Transform and return a (possibly new) `ParsedFile` |
`OutputTransformer`:

| Method | Signature | Description |
|---|---|---|
| `name` | property `-> str` | Human-readable name |
| `apply` | `(rows: list[dict]) -> list[dict]` | Transform validated LLM output rows |
Each stage function has the signature `(ctx: PipelineContext) -> StageResult`.

| Stage | Reads From | Writes To |
|---|---|---|
| `discover` | `ctx.config.input_dir` | `ctx.files` |
| `filter_1` | `ctx.files` | `ctx.filtered_files` |
| `transform` | `ctx.filtered_files` | `ctx.filtered_files` (mutated) |
| `filter_2` | `ctx.filtered_files` | `ctx.filtered_files` |
| `render` | `ctx.filtered_files` | `ctx.artifacts["shard_paths"]` |
| `human_review` | `ctx.artifacts` | (blocks until approved) |
| `submit` | `ctx.artifacts["shard_paths"]` | `ctx.artifacts["output_files"]` |
| `validate` | `ctx.artifacts["output_files"]` | `ctx.artifacts["validated_rows"]` |
| `evaluate` | `ctx.artifacts["validated_rows"]` | `ctx.artifacts["eval_report"]` |
| `export` | `ctx.artifacts["validated_rows"]`, `ctx.artifacts["eval_report"]` | XLSX files |
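For orientation, a custom stage following this contract might look like the sketch below. The import path and the success status string are assumptions; only `StageResult(status="failed", error=...)` is documented in this guide, and `PipelineContext` is shown right after.

```python
# Hedged sketch of a stage function that shares results via ctx.artifacts.
from llm_batch_pipeline.pipeline import PipelineContext, StageResult  # assumed path


def stage_word_count(ctx: PipelineContext) -> StageResult:
    try:
        ctx.artifacts["word_counts"] = {
            pf.filename: len(pf.content.split()) for pf in ctx.filtered_files
        }
        return StageResult(status="ok")         # success status name is an assumption
    except Exception as e:                       # stages report failures instead of raising
        return StageResult(status="failed", error=str(e))
```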
```python
@dataclass
class PipelineContext:
    batch_dir: Path
    config: BatchConfig
    console: Console
    metrics: MetricsCollector
    files: list[ParsedFile]            # Set by discover
    filtered_files: list[ParsedFile]   # Set by filter stages
    artifacts: dict[str, Any]          # Shared between stages
```

Run the test suite with:

```
uv run pytest                 # All tests
uv run pytest -v              # Verbose
uv run pytest -k "spam"       # Filter by name
uv run pytest --tb=short      # Short tracebacks
```

Tests live in `tests/` with one test file per source module. Use fixtures from `tests/conftest.py` for sample `.eml` data.
```python
# tests/test_my_plugin.py
from pathlib import Path

from my_plugin.plugin import CsvReader, MinRowFilter


class TestCsvReader:
    def test_can_read_csv(self):
        reader = CsvReader()
        assert reader.can_read(Path("test.csv"))
        assert not reader.can_read(Path("test.txt"))

    def test_read_csv(self, tmp_path):
        csv_file = tmp_path / "data.csv"
        csv_file.write_text("a,b\n1,2\n3,4\n", encoding="utf-8")
        reader = CsvReader()
        parsed = reader.read(csv_file)
        assert parsed.filename == "data.csv"
        assert parsed.metadata["rows"] == 3
```

- Prefer fixture-driven tests with real sample data
- Each test file mirrors the source file it tests
- Use `tmp_path` for filesystem tests
- Use `pytest.approx()` for floating-point comparisons
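For example, a tiny self-contained test combining the last two guidelines:

```python
# tmp_path isolates the filesystem; pytest.approx handles float comparison.
import pytest


def test_newline_ratio(tmp_path):
    sample = tmp_path / "sample.csv"
    sample.write_text("a,b\n1,2\n3,4\n", encoding="utf-8")
    newline_count = sample.read_text(encoding="utf-8").count("\n")
    assert newline_count / 10 == pytest.approx(0.3)
```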
```
uv run ruff check src/ tests/        # Check
uv run ruff check --fix src/ tests/  # Auto-fix
```

Configuration in `pyproject.toml`:

```toml
[tool.ruff]
target-version = "py313"
line-length = 120

[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "B", "A", "SIM"]
```

```
uv run pylint src/llm_batch_pipeline/
```

Configuration in `.pylintrc`. Target: 10.00/10.
Import order, enforced by ruff `I001`:

- `from __future__ import annotations`
- Standard library
- Third-party
- Local package

Other conventions:

- Module-level loggers: `logger = logging.getLogger("llm_batch_pipeline.<module>")`
- Schema convention: `mySchema = MyActualModelClass`
- Short variable names permitted: `cm`, `wb`, `ws`, `fh`, `pf`, `h`, `n`, `d`, `t` (see `.pylintrc` `good-names`)
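A module header following these conventions might look like this (illustrative module name; imports shown only to demonstrate grouping):

```python
# Illustrative header: future import, then stdlib / third-party / local groups,
# then the module-level logger named after the package and module.
from __future__ import annotations

import logging                                            # standard library
from pathlib import Path

from pydantic import BaseModel                            # third-party

from llm_batch_pipeline.plugins.base import ParsedFile    # local package

logger = logging.getLogger("llm_batch_pipeline.my_module")
```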
- Stage functions catch exceptions and return `StageResult(status="failed", error=str(e))`
- Broad `except Exception` is acceptable in plugin auto-discovery and per-request processing (for resilience)
- Always log drop reasons as structured metadata

Use `write_json_atomic()` and `write_text_atomic()` from `backends/common.py` for output files to prevent partial writes.
Three GitHub Actions workflows in `.github/workflows/`:

| Workflow | Tool | Purpose |
|---|---|---|
| `ruff.yml` | Ruff | Import sorting, style, modern Python |
| `pylint.yml` | Pylint | Code quality, naming, complexity |
| `semgrep.yml` | Semgrep | Security patterns, SAST |
All three must pass before merging.