feat: streaming validation — per-chunk requirement checking with early exit #891

@planetf1

Description

Problem

All validation in mellea requires a fully-computed ModelOutputThunk. Generation must complete before any constraint is evaluated.

This doesn't match how inference actually works. Backends stream output incrementally — token by token for text, frame by frame for audio, segment by segment for structured data. mellea throws that away and waits for the end.

In practice:

  • A generation that fails on its first sentence runs to completion anyway.
  • Audio that goes wrong in the first few seconds can't be caught until the full clip is done.
  • There's no way to feed partial results to a UI or downstream system as generation proceeds.

Requirements

  1. Incremental validation
  2. Early exit on definitive failure
  3. Deferred judgement when there isn't enough data yet
  4. Backward compatibility — existing requirements work unchanged, streaming is opt-in
  5. Stateful validation — a requirement can accumulate information across chunks within a single attempt
  6. Multiple requirements checkable concurrently per chunk
  7. Configurable chunking boundary (sentence, word, paragraph, audio segment…) set at the call site
  8. A shared event vocabulary for consumers — UIs, m serve, downstream packages

Proposed approach

The work breaks into two layers.

Layer 1 — requirements get streaming support

Requirement gets a new method: stream_validate(backend, ctx, chunk). It receives one chunk at a time and returns a PartialValidationResult — a tri-state value of "pass", "fail", or "unknown". The default returns "unknown" every chunk and falls back to the existing validate() on the complete output at stream end, so existing requirements need no changes.

This default behaviour is correct for requirements that need a full output to do their job — LLM-as-judge and ALoRA requirements return "unknown" throughout streaming and receive the complete output via validate() at stream end. This is the intended path for any requirement that cannot make a meaningful incremental judgement.

The method is named stream_validate rather than avalidate. The a prefix in mellea means "async version of the same operation" — this is a different operation with different semantics (incremental input, tri-state result, stateful between calls) and should be clearly distinct from both validate() and mfuncs.avalidate().
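The shape of the default can be sketched as follows. This is illustrative only — the `PartialValidationResult` spelling and the stub `Requirement` base are assumptions standing in for the real mellea classes; the example `NoProfanity` requirement shows the intended override pattern of returning "fail" only on a definitive violation.

```python
from typing import Literal

# Hypothetical sketch — names follow this proposal, not shipped mellea code.
PartialValidationResult = Literal["pass", "fail", "unknown"]

class Requirement:
    """Minimal stand-in for mellea's Requirement base class."""

    def validate(self, backend, ctx) -> bool:
        """Existing full-output validation (stubbed here)."""
        return True

    def stream_validate(self, backend, ctx, chunk: str) -> PartialValidationResult:
        """Default: defer judgement on every chunk.

        The orchestrator falls back to validate() on the complete output
        at stream end, so existing requirements need no changes.
        """
        return "unknown"

class NoProfanity(Requirement):
    """A requirement that *can* fail early on a single chunk."""

    BANNED = {"darn"}

    def stream_validate(self, backend, ctx, chunk: str) -> PartialValidationResult:
        if any(w in self.BANNED for w in chunk.lower().split()):
            return "fail"    # definitive: no later chunk can undo this
        return "unknown"     # keep streaming; final validate() decides
```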

One design question to settle: during streaming, ctx does not yet contain the model output — the MOT isn't complete, so a requirement cannot read accumulated content through context. A requirement that needs to inspect what has been generated so far must hold that state internally.

State isolation across retries: The orchestrator clones each requirement (copy(req)) before every streaming attempt. Streaming state lives on the clone; the original is never mutated. This is consistent with the existing defensive copy in validate() (requirement.py:178). Requirement authors using mutable container fields (e.g. self._buffer = []) should reassign rather than mutate in place, or override __copy__ to properly isolate those fields — self._buffer.append(chunk) on a clone mutates the original's list via the shared reference.
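The reassign-don't-mutate rule can be demonstrated concretely. The requirement class below is hypothetical; the cloning step mirrors what the proposal says the orchestrator does before each attempt.

```python
import copy

class MaxSentences:
    """Illustrative stateful requirement: fail once more than
    `limit` sentence chunks have arrived in this attempt."""

    def __init__(self, limit: int = 3):
        self.limit = limit
        self._seen: list[str] = []

    def stream_validate(self, backend, ctx, chunk):
        # Reassign rather than mutate: copy.copy() is shallow, so the clone
        # shares the original's list — self._seen.append(chunk) on the clone
        # would leak state back into the original across attempts.
        self._seen = self._seen + [chunk]
        return "fail" if len(self._seen) > self.limit else "unknown"

# The orchestrator clones before each streaming attempt:
original = MaxSentences(limit=2)
attempt = copy.copy(original)
for sentence in ["One.", "Two.", "Three."]:
    result = attempt.stream_validate(None, None, sentence)

assert result == "fail"
assert original._seen == []   # the original was never mutated
```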

Phase 1 does not include a reset() method. State isolation is handled entirely by the orchestrator cloning requirements before each attempt. If Phase 2 reveals a clear use case for explicit lifecycle hooks, one can be added then.

Layer 2 — orchestration

A new stream_with_chunking() function drives generation and coordinates validation. It consumes mot.astream() in a single background task — MOT enforces a single-consumer constraint on its stream, so requirements cannot each pull from it independently. This function buffers the token stream, applies the chunking strategy, and fans out complete chunks to requirements. It runs stream_validate() across all requirements in parallel on each chunk, exits early on "fail", and calls validate() at stream end for any requirement that did not return "fail". Both "unknown" and "pass" results trigger a final validate() — partial "pass" is informational only in Phase 1 and cannot safely short-circuit final validation without monotonicity guarantees.
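The buffering, chunk fan-out, and early-exit cycle can be sketched in miniature. Everything here is a simplification under stated assumptions: no MOT, no events, no retries, a toy '.'-terminated chunker, and a stand-in requirement — only the control flow (single stream consumer, parallel per-chunk checks, "fail" short-circuits, full validate() at stream end) reflects the proposal.

```python
import asyncio

class DotChunker:
    """Toy chunker: '.'-terminated sentences are complete chunks;
    any trailing text is an incomplete fragment kept as the last element."""
    def split(self, text: str) -> list[str]:
        parts = text.split(".")
        done = [p.strip() + "." for p in parts[:-1] if p.strip()]
        tail = parts[-1]
        return done + ([tail] if tail else [])

class NoBanned:
    """Requirement stand-in: fails on any chunk containing `word`."""
    def __init__(self, word: str):
        self.word = word
    def stream_validate(self, backend, ctx, chunk: str) -> str:
        return "fail" if self.word in chunk else "unknown"
    def validate(self, backend, ctx, output: str) -> bool:
        return self.word not in output

async def stream_with_chunking(token_stream, requirements, chunker):
    buffer, emitted = "", 0
    async for token in token_stream:          # single consumer of the stream
        buffer += token
        chunks = chunker.split(buffer)
        # All but the last element are complete; the tail may still grow.
        new_complete = chunks[emitted:len(chunks) - 1]
        emitted = max(emitted, len(chunks) - 1)
        for chunk in new_complete:
            # Fan out to all requirements in parallel for this chunk.
            results = await asyncio.gather(
                *(asyncio.to_thread(r.stream_validate, None, None, chunk)
                  for r in requirements))
            if "fail" in results:
                return "fail", buffer  # early exit; caller cancels generation
    # Stream end: "unknown" and "pass" both fall through to full validate().
    ok = all(r.validate(None, None, buffer) for r in requirements)
    return ("pass" if ok else "fail"), buffer

async def tokens(parts):
    for p in parts:
        yield p
```

A failing stream exits before later tokens are consumed; a clean stream falls through to the existing full-output check.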

This function is a standalone primitive — it does not plug into BaseSamplingStrategy. The existing sampling loop (BaseSamplingStrategy.sample()) always awaits the full output before validation and is not changed in this phase. stream_with_chunking() bypasses it entirely: callers invoke it directly and own the retry loop themselves. For the common case, a session-level convenience function (stream_instruct()) wraps it — providing the same instruct()-style interface users already know, but with streaming validation and retry handled internally.

On early exit due to "fail", the orchestrator calls cancel_generation() on the MOT — a new public method on ModelOutputThunk that cancels the background generate task, drains the queue, and ends any open telemetry span. Without this, the backend's asyncio.Queue(maxsize=20) fills and the producer blocks indefinitely once the consumer stops. This method is introduced as part of #901.
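The failure mode this prevents is easy to reproduce with a plain asyncio queue. The sketch below illustrates why cancellation plus draining is necessary — it does not depict `cancel_generation()` itself, which is specified in #901.

```python
import asyncio

async def producer(queue: asyncio.Queue):
    i = 0
    while True:
        await queue.put(f"tok{i}")   # blocks forever once the queue is full
        i += 1

async def demo() -> int:
    """With a bounded queue and no consumer, the producer wedges on put().
    Early exit therefore has to cancel the producer task and drain."""
    queue = asyncio.Queue(maxsize=20)     # same bound as the backend's queue
    task = asyncio.create_task(producer(queue))
    while queue.qsize() < 20:             # wait until the producer is stuck
        await asyncio.sleep(0.01)
    task.cancel()                         # what cancel_generation() must do
    try:
        await task
    except asyncio.CancelledError:
        pass
    while not queue.empty():              # drain so nothing is left behind
        queue.get_nowait()
    return queue.qsize()
```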

Chunking is a call-site concern rather than a per-requirement one — it is a property of the output type, not the requirement. For now, an external ChunkingStrategy (split(accumulated_text) -> list[str]) handles this, with built-in sentence, word, and paragraph implementations.
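A minimal sketch of that interface and two of the named built-ins, assuming the `split(accumulated_text) -> list[str]` signature from this proposal; the class names and boundary regex are illustrative, not the shipped implementations.

```python
import re
from typing import Protocol

class ChunkingStrategy(Protocol):
    """Interface from the proposal: split accumulated text into chunks.
    The final element may be an incomplete trailing fragment."""
    def split(self, accumulated_text: str) -> list[str]: ...

class SentenceChunking:
    """Complete chunks end in '.', '!' or '?'; the tail is a fragment."""
    _BOUNDARY = re.compile(r"(?<=[.!?])\s+")

    def split(self, accumulated_text: str) -> list[str]:
        pieces = self._BOUNDARY.split(accumulated_text.strip())
        return [p for p in pieces if p]

class WordChunking:
    def split(self, accumulated_text: str) -> list[str]:
        return accumulated_text.split()
```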

A small set of standard event dataclasses — ChunkEvent, QuickCheckEvent, RetryEvent, CompletedEvent, and others — gives consumers a shared vocabulary rather than having every downstream package define its own types. These are specified as part of this work. The sampling strategy plugin hooks (SAMPLING_LOOP_START, SAMPLING_REPAIR, etc.) do not fire during streaming; the event types are the equivalent observability mechanism for the streaming path.

Phase 2 — MOT-owned chunking

The right long-term owner of chunking is the MOT itself, since it already owns parsed_repr and has the semantic knowledge to produce meaningful chunks for its specific type. A follow-on issue will cover adding stream_parsed_repr to MOT.

When that lands, two things change. First, stream_with_chunking() can accept a MOT-native chunker in place of the external ChunkingStrategy — call-site interface stays the same, external chunkers can be deprecated. Second, and more significantly, requirements that currently need internal state to track accumulated output can instead read directly from context, removing the need for per-requirement state management in most cases. Everything else written in this phase — stream_validate, PartialValidationResult, the event types, the orchestration logic — is unaffected.


Testing and documentation

Each child issue is expected to deliver the following alongside its code:

Unit tests — for all new types and methods in isolation: PartialValidationResult construction and tri-state semantics, ChunkingStrategy split logic (including empty input, no complete chunk yet, trailing fragment), stream_validate() default behaviour, and any stateful override behaviour.

Integration tests with a mock streaming backend — stream_with_chunking() orchestration logic should be covered by tests using a StreamingMockBackend (a backend that emits tokens from a fixed string on a configurable schedule). This allows the full chunk-validate-early-exit cycle to be tested without a live Ollama instance. Tests that exercise the real streaming path against Ollama are welcome but should be marked @pytest.mark.integration.
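A mock of that shape is a few lines; the constructor parameters below (fixed string, chunk size, optional delay) are an assumed design for "a fixed string on a configurable schedule".

```python
import asyncio

class StreamingMockBackend:
    """Emits tokens from a fixed string on a configurable schedule (sketch)."""
    def __init__(self, text: str, chunk_size: int = 4, delay: float = 0.0):
        self.text, self.chunk_size, self.delay = text, chunk_size, delay

    async def astream(self):
        for i in range(0, len(self.text), self.chunk_size):
            if self.delay:
                await asyncio.sleep(self.delay)
            yield self.text[i:i + self.chunk_size]

async def consume(backend) -> str:
    # Tests drive the full chunk-validate-early-exit cycle against this
    # stream instead of a live Ollama instance.
    return "".join([tok async for tok in backend.astream()])
```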

No e2e or qualitative tests — streaming validation correctness is verifiable without LLM quality judgements. The mock backend is sufficient.

Examples — at least one example in docs/examples/ showing how to write a stream_validate() override and use it with stream_with_chunking().

Docstrings — all public types and functions should have docstrings following the project's Google-style convention. These feed the auto-generated API docs.


Dependencies

Child issues

Labels: enhancement (New feature or request), epic (High level Epic)