Problem
All validation in mellea requires a fully-computed ModelOutputThunk. Generation must complete before any constraint is evaluated.
This doesn't match how inference actually works. Backends stream output incrementally — token by token for text, frame by frame for audio, segment by segment for structured data. mellea throws that away and waits for the end.
In practice:
- A generation that fails on its first sentence runs to completion anyway.
- Audio that goes wrong in the first few seconds can't be caught until the full clip is done.
- There's no way to feed partial results to a UI or downstream system as generation proceeds.
Requirements
- Incremental validation
- Early exit on definitive failure
- Deferred judgement when there isn't enough data yet
- Backward compatibility — existing requirements work unchanged, streaming is opt-in
- Stateful validation — a requirement can accumulate information across chunks within a single attempt
- Multiple requirements checkable concurrently per chunk
- Configurable chunking boundary (sentence, word, paragraph, audio segment…) set at the call site
- A shared event vocabulary for consumers — UIs, m serve, downstream packages
Proposed approach
The work breaks into two layers.
Layer 1 — requirements get streaming support
Requirement gets a new method: stream_validate(backend, ctx, chunk). It receives one chunk at a time and returns a PartialValidationResult — a tri-state value of "pass", "fail", or "unknown". The default returns "unknown" every chunk and falls back to the existing validate() on the complete output at stream end, so existing requirements need no changes.
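As a sketch of the shape this API could take (the names PartialValidationResult and stream_validate come from this proposal; the surrounding Requirement class is simplified and illustrative, not mellea's actual class):

```python
# Illustrative sketch only: a tri-state result plus the proposed default
# stream_validate(), which defers judgement on every chunk.
from enum import Enum


class PartialValidationResult(Enum):
    """Tri-state outcome of validating a single chunk."""

    PASS = "pass"
    FAIL = "fail"
    UNKNOWN = "unknown"


class Requirement:
    def validate(self, backend, ctx) -> bool:
        """Existing full-output validation (unchanged)."""
        raise NotImplementedError

    def stream_validate(self, backend, ctx, chunk: str) -> PartialValidationResult:
        """Default: defer judgement on every chunk.

        The orchestrator falls back to validate() on the complete output
        at stream end, so existing requirements work unchanged.
        """
        return PartialValidationResult.UNKNOWN
```

A requirement that can judge incrementally overrides stream_validate() and returns PASS or FAIL as soon as it knows.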
This default behaviour is correct for requirements that need a full output to do their job — LLM-as-judge and ALoRA requirements return "unknown" throughout streaming and receive the complete output via validate() at stream end. This is the intended path for any requirement that cannot make a meaningful incremental judgement.
The method is named stream_validate rather than avalidate. The a prefix in mellea means "async version of the same operation" — this is a different operation with different semantics (incremental input, tri-state result, stateful between calls) and should be clearly distinct from both validate() and mfuncs.avalidate().
One design constraint to note: during streaming, ctx does not yet contain the model output — the MOT isn't complete, so a requirement cannot read accumulated content through context. A requirement that needs to inspect what has been generated so far must hold that state internally.
State isolation across retries: The orchestrator clones each requirement (copy(req)) before every streaming attempt. Streaming state lives on the clone; the original is never mutated. This is consistent with the existing defensive copy in validate() (requirement.py:178). Requirement authors using mutable container fields (e.g. self._buffer = []) should reassign rather than mutate in place, or override __copy__ to properly isolate those fields — self._buffer.append(chunk) on a clone mutates the original's list via the shared reference.
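The hazard and the fix can be shown concretely. BannedPhraseRequirement below is a hypothetical stateful requirement, not part of mellea; the point is the __copy__ override that keeps a clone's buffer separate from the original's:

```python
# Why a mutable container field leaks across copy.copy() clones, and how
# overriding __copy__ isolates it. BannedPhraseRequirement is hypothetical.
import copy


class BannedPhraseRequirement:
    def __init__(self, phrase: str):
        self.phrase = phrase
        self._buffer: list[str] = []  # accumulated chunks for one attempt

    def __copy__(self):
        # Give each clone its own buffer so per-attempt state is isolated.
        clone = type(self)(self.phrase)
        clone._buffer = list(self._buffer)
        return clone

    def stream_validate(self, backend, ctx, chunk: str) -> str:
        self._buffer.append(chunk)  # safe: this clone owns its buffer
        if self.phrase in "".join(self._buffer):
            return "fail"
        return "unknown"


original = BannedPhraseRequirement("forbidden")
attempt = copy.copy(original)  # as the orchestrator would, per attempt
attempt.stream_validate(None, None, "some forbidden text")
assert original._buffer == []  # the original was never mutated
```

Without the __copy__ override, copy.copy() would share one list between original and clone, and the append would corrupt the original's state for the next attempt.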
Phase 1 does not include a reset() method. State isolation is handled entirely by the orchestrator cloning requirements before each attempt. If Phase 2 reveals a clear use case for explicit lifecycle hooks, one can be added then.
Layer 2 — orchestration
A new stream_with_chunking() function drives generation and coordinates validation. It consumes mot.astream() in a single background task — MOT enforces a single-consumer constraint on its stream, so requirements cannot each pull from it independently. This function buffers the token stream, applies the chunking strategy, and fans out complete chunks to requirements. It runs stream_validate() across all requirements in parallel on each chunk, exits early on "fail", and calls validate() at stream end for any requirement that did not return "fail". Both "unknown" and "pass" results trigger a final validate() — partial "pass" is informational only in Phase 1 and cannot safely short-circuit final validation without monotonicity guarantees.
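A condensed, illustrative sketch of that loop follows. The token stream, chunker, and requirement interfaces are stand-ins (stream_validate is assumed async here, and validate() is called with simplified arguments); the real function would consume mot.astream() and call cancel_generation() on early exit:

```python
# Sketch of the orchestration loop: buffer tokens, cut complete chunks,
# fan each chunk out to all requirements in parallel, exit early on "fail",
# and run full validate() at stream end. Interfaces are simplified stand-ins.
import asyncio


async def stream_with_chunking(token_stream, chunker, requirements):
    accumulated = ""
    emitted = 0  # number of chunks already validated
    async for token in token_stream:
        accumulated += token
        chunks = chunker.split(accumulated)  # complete chunks so far
        for chunk in chunks[emitted:]:
            # Run stream_validate across all requirements concurrently.
            results = await asyncio.gather(
                *(r.stream_validate(None, None, chunk) for r in requirements)
            )
            if "fail" in results:
                # Early exit; the real orchestrator would also cancel the MOT.
                return accumulated, False
        emitted = len(chunks)
    # Stream ended without a definitive "fail": run full validate() on every
    # requirement ("pass" chunks do not short-circuit this in Phase 1).
    final = all(r.validate(None, accumulated) for r in requirements)
    return accumulated, final
```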
This function is a standalone primitive — it does not plug into BaseSamplingStrategy. The existing sampling loop (BaseSamplingStrategy.sample()) always awaits the full output before validation and is not changed in this phase. stream_with_chunking() bypasses it entirely: callers invoke it directly and own the retry loop themselves. For the common case, a session-level convenience function (stream_instruct()) wraps it — providing the same instruct()-style interface users already know, but with streaming validation and retry handled internally.
On early exit due to "fail", the orchestrator calls cancel_generation() on the MOT — a new public method on ModelOutputThunk that cancels the background generate task, drains the queue, and ends any open telemetry span. Without this, the backend's asyncio.Queue(maxsize=20) fills and the producer blocks indefinitely once the consumer stops. This method is introduced as part of #901.
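The backpressure problem is easy to demonstrate in isolation. This is a minimal standalone demo (a maxsize of 2 stands in for the backend's maxsize=20 queue; nothing here is mellea code):

```python
# With a bounded asyncio.Queue, a producer blocks on put() as soon as the
# consumer stops draining. Cancelling the producer task is the escape hatch,
# which is the role cancel_generation() plays for the MOT's generate task.
import asyncio


async def demo():
    q = asyncio.Queue(maxsize=2)

    async def producer():
        for i in range(10):
            await q.put(i)  # blocks once the queue holds 2 items

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)   # consumer never drains the queue
    blocked = not task.done()   # producer is stuck on put()
    task.cancel()               # unblock it by cancelling the task
    try:
        await task
    except asyncio.CancelledError:
        pass
    return blocked


assert asyncio.run(demo()) is True  # the producer really was stuck
```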
Chunking is a call-site concern rather than a per-requirement one — it is a property of the output type, not the requirement. For now, an external ChunkingStrategy (split(accumulated_text) -> list[str]) handles this, with built-in sentence, word, and paragraph implementations.
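A minimal sketch of that protocol and one built-in, assuming only the split() signature given above (the real implementations may differ in detail):

```python
# ChunkingStrategy protocol sketch plus a sentence chunker. split() returns
# only complete chunks; a trailing unfinished fragment is held back.
import re
from typing import Protocol


class ChunkingStrategy(Protocol):
    def split(self, accumulated_text: str) -> list[str]:
        """Return the complete chunks found so far."""
        ...


class SentenceChunker:
    # A sentence boundary: terminal punctuation followed by whitespace.
    _boundary = re.compile(r"(?<=[.!?])\s+")

    def split(self, accumulated_text: str) -> list[str]:
        parts = self._boundary.split(accumulated_text)
        # The last part may be an unfinished sentence, so hold it back.
        return parts[:-1]


chunker = SentenceChunker()
assert chunker.split("One. Two! Thr") == ["One.", "Two!"]
```

Holding back the trailing fragment is what lets the orchestrator validate only complete chunks; the fragment is still covered by the final validate() at stream end.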
A small set of standard event dataclasses — ChunkEvent, QuickCheckEvent, RetryEvent, CompletedEvent, and others — gives consumers a shared vocabulary rather than having every downstream package define its own types. These are specified as part of this work. The sampling strategy plugin hooks (SAMPLING_LOOP_START, SAMPLING_REPAIR, etc.) do not fire during streaming; the event types are the equivalent observability mechanism for the streaming path.
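As a hedged sketch of what that vocabulary could look like (the event names come from the text above; the fields are illustrative guesses, not the specified schema):

```python
# Illustrative event dataclasses. Names follow the proposal; fields are
# placeholders chosen for the example, not the final specification.
from dataclasses import dataclass


@dataclass
class ChunkEvent:
    index: int
    text: str


@dataclass
class QuickCheckEvent:
    chunk_index: int
    requirement: str
    result: str  # "pass" | "fail" | "unknown"


@dataclass
class RetryEvent:
    attempt: int
    reason: str


@dataclass
class CompletedEvent:
    attempts: int
    success: bool
```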
The second phase — MOT-owned chunking
The right long-term owner of chunking is the MOT itself, since it already owns parsed_repr and has the semantic knowledge to produce meaningful chunks for its specific type. A follow-on issue will cover adding stream_parsed_repr to MOT.
When that lands, two things change. First, stream_with_chunking() can accept a MOT-native chunker in place of the external ChunkingStrategy — call-site interface stays the same, external chunkers can be deprecated. Second, and more significantly, requirements that currently need internal state to track accumulated output can instead read directly from context, removing the need for per-requirement state management in most cases. Everything else written in this phase — stream_validate, PartialValidationResult, the event types, the orchestration logic — is unaffected.
Testing and documentation
Each child issue is expected to deliver the following alongside its code:
Unit tests — for all new types and methods in isolation: PartialValidationResult construction and tri-state semantics, ChunkingStrategy split logic (including empty input, no complete chunk yet, trailing fragment), stream_validate() default behaviour, and any stateful override behaviour.
Integration tests with a mock streaming backend — stream_with_chunking() orchestration logic should be covered by tests using a StreamingMockBackend (a backend that emits tokens from a fixed string on a configurable schedule). This allows the full chunk-validate-early-exit cycle to be tested without a live Ollama instance. Tests that exercise the real streaming path against Ollama are welcome but should be marked @pytest.mark.integration.
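A possible shape for that helper, as a sketch (StreamingMockBackend is the hypothetical test fixture named above; the constructor parameters here are assumptions, not a fixed interface):

```python
# Sketch of a mock streaming backend: emits fixed-size tokens from a given
# string, optionally with a delay between tokens to exercise scheduling.
import asyncio


class StreamingMockBackend:
    def __init__(self, text: str, token_size: int = 4, delay: float = 0.0):
        self.text = text
        self.token_size = token_size
        self.delay = delay

    async def astream(self):
        for i in range(0, len(self.text), self.token_size):
            if self.delay:
                await asyncio.sleep(self.delay)
            yield self.text[i : i + self.token_size]


async def collect():
    backend = StreamingMockBackend("The quick brown fox.", token_size=5)
    return [tok async for tok in backend.astream()]
```

Feeding this stream into the orchestrator makes the full chunk-validate-early-exit cycle deterministic and fast enough for unit CI.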
No e2e or qualitative tests — streaming validation correctness is verifiable without LLM quality judgements. The mock backend is sufficient.
Examples — at least one example in docs/examples/ showing how to write a stream_validate() override and use it with stream_with_chunking().
Docstrings — all public types and functions should have docstrings following the project's Google-style convention. These feed the auto-generated API docs.
Dependencies
- #908 (refactor/mot-field-partitioning, open): consolidates ModelOutputThunk telemetry fields into mot.generation.* (a GenerationMetadata dataclass). Streaming code targets the post-#908 API throughout. The streaming implementation branch (feat/streaming-validation) is based on the #908 branch tip; once #908 squash-merges to main, rebase with git rebase --onto upstream/main upstream/pr/908 feat/streaming-validation.
- ModelOutputThunk cleanup: held until #898–#901 land, to avoid collision on base.py internal queue fields and the Phase 2 stream_parsed_repr design space.
Child issues
- #898: PartialValidationResult with tri-state semantics (merged, PR #924)
- ChunkingStrategy ABC and built-in chunkers (merged, PR #923)
- #900: stream_validate() on Requirement (merged, PR #925)
- #901: stream_with_chunking() with per-chunk validation (PR #942, in review)