
feat(stdlib): implement stream_with_chunking() with per-chunk validation #901

@planetf1

Description


What

Create mellea/stdlib/streaming.py with a stream_with_chunking() function and a StreamChunkingResult return type.

async def stream_with_chunking(
    action,
    backend,
    ctx,
    *,
    quick_check_requirements: list[Requirement] | None = None,
    chunking: str | ChunkingStrategy = "sentence",
    quick_check_backend: Backend | None = None,
) -> StreamChunkingResult:

Why

This is the core orchestration primitive for streaming validation. It bridges the MOT's raw token stream, the ChunkingStrategy, and per-chunk stream_validate() calls on requirements. All higher-level streaming APIs build on this function.

Implementation

Internals:

  1. Clone each requirement (copy(req)) before the attempt starts — streaming state accumulates on the clone; the original is never mutated
  2. Start a single background asyncio.Task consuming mot.astream() — the MOT enforces a single-consumer constraint (documented at mellea/core/base.py:434-436), so the orchestrator is the only caller; fanning out to requirements happens via a result queue, not by sharing the MOT
  3. Accumulate token deltas, apply ChunkingStrategy.split() to produce complete chunks
  4. For each complete chunk: run stream_validate() across all requirements in parallel via asyncio.gather
  5. On any "fail" result: call mot.cancel_generation() to cancel the background task, drain the queue, and end any open telemetry span; set completed = False. Without this, asyncio.Queue(maxsize=20) blocks the producer indefinitely once the consumer stops
  6. After the stream ends (mot._computed is True): call validate() on every requirement that did not return "fail" — both "unknown" and "pass" trigger final validation; "pass" mid-stream is informational only in Phase 1
  7. Route validation calls to quick_check_backend if provided
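
The loop in steps 1–5 could be sketched roughly as follows. This is a self-contained illustration, not the real implementation: `Requirement`, the chunking function, and the token source are simplified stand-ins for mellea's `Requirement`, `ChunkingStrategy.split()`, and `mot.astream()`.

```python
import asyncio
import copy


class Requirement:
    """Stand-in requirement: stream_validate() returns "fail" or "unknown"."""

    def __init__(self, banned: str = "") -> None:
        self.banned = banned
        self.seen = ""  # streaming state accumulates on the clone, never the original

    async def stream_validate(self, chunk: str) -> str:
        self.seen += chunk
        return "fail" if self.banned and self.banned in self.seen else "unknown"


def sentence_split(buffer: str) -> tuple[list[str], str]:
    """Naive "sentence" chunking: a chunk is complete once it ends in ". "."""
    chunks = []
    while ". " in buffer:
        head, buffer = buffer.split(". ", 1)
        chunks.append(head + ". ")
    return chunks, buffer


async def stream_with_chunking(tokens, requirements):
    reqs = [copy.copy(r) for r in requirements]     # step 1: clone requirements
    queue: asyncio.Queue = asyncio.Queue(maxsize=20)

    async def producer():                           # step 2: single stream consumer
        for tok in tokens:                          # stands in for mot.astream()
            await queue.put(tok)
        await queue.put(None)                       # sentinel: stream ended

    task = asyncio.create_task(producer())
    buffer, emitted = "", []
    while (tok := await queue.get()) is not None:
        buffer += tok                               # step 3: accumulate and chunk
        chunks, buffer = sentence_split(buffer)
        for chunk in chunks:                        # step 4: parallel per-chunk checks
            results = await asyncio.gather(*(r.stream_validate(chunk) for r in reqs))
            if "fail" in results:                   # step 5: cancel, drain, exit early
                task.cancel()
                await asyncio.gather(task, return_exceptions=True)
                while not queue.empty():            # unblock a producer waiting on put()
                    queue.get_nowait()
                return emitted, False               # completed = False
            emitted.append(chunk)
    await task
    if buffer:
        emitted.append(buffer)                      # trailing partial chunk
    return emitted, True
```

Note how the early-exit path drains the queue after cancelling: with `maxsize=20`, a producer blocked on `put()` would otherwise never terminate, which is exactly the deadlock scenario the acceptance criteria call out.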

StreamChunkingResult (also defined in this module):

  • astream() -> AsyncIterator[str] — yields validated text chunks as they complete
  • acomplete() — awaits full completion
  • completed: bool — False if the stream exited early due to a "fail" result
  • full_text: str — complete generated text (available after acomplete())
  • as_thunk — wraps the output as a ModelOutputThunk for interop with existing mellea code
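
A minimal sketch of that result surface, assuming a push-based internal queue; `as_thunk` is omitted here because `ModelOutputThunk` interop depends on mellea internals:

```python
import asyncio
from typing import AsyncIterator


class StreamChunkingResult:
    """Hypothetical sketch of the result type described above."""

    def __init__(self) -> None:
        self._chunks: asyncio.Queue = asyncio.Queue()
        self._done = asyncio.Event()
        self.completed: bool = True   # set False on early "fail" exit
        self.full_text: str = ""      # populated as chunks are consumed

    def _push(self, chunk: str) -> None:
        """Called by the orchestrator as each chunk passes validation."""
        self._chunks.put_nowait(chunk)

    def _finish(self, completed: bool) -> None:
        self.completed = completed
        self._chunks.put_nowait(None)  # sentinel terminates astream()
        self._done.set()

    async def astream(self) -> AsyncIterator[str]:
        """Yield validated text chunks as they complete."""
        while (chunk := await self._chunks.get()) is not None:
            self.full_text += chunk
            yield chunk

    async def acomplete(self) -> str:
        """Await full completion and return the accumulated text."""
        async for _ in self.astream():
            pass
        await self._done.wait()
        return self.full_text
```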

Test infrastructure: A StreamingMockBackend test helper needs to be created (does not currently exist in the codebase). It should accept a fixed response string and feed it into a MOT's queue token by token. It can be defined in test/stdlib/test_streaming.py or a shared fixture. This is the primary vehicle for testing the orchestration logic without a live backend.
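
Since the helper does not exist yet, its shape is open; one plausible sketch (names and token-splitting policy are assumptions) replays a fixed response token by token:

```python
import asyncio
from typing import AsyncIterator


class StreamingMockBackend:
    """Hypothetical test helper: replays a fixed response as a token stream."""

    def __init__(self, response: str, token_size: int = 4) -> None:
        # Split the canned response into fixed-size "tokens".
        self._tokens = [response[i:i + token_size]
                        for i in range(0, len(response), token_size)]

    async def astream(self) -> AsyncIterator[str]:
        for tok in self._tokens:
            await asyncio.sleep(0)  # yield control, mimicking network pacing
            yield tok


async def collect(backend: StreamingMockBackend) -> str:
    """Consume the mock stream end to end."""
    return "".join([tok async for tok in backend.astream()])
```

In the real fixture this would feed a MOT's queue rather than yield directly, so the orchestrator's single-consumer path is exercised.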

Example: Add docs/examples/streaming/stream_with_chunking.py demonstrating how to write a stream_validate() override and use it with this function. Marker: # pytest: ollama, integration.
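
The docs example might center on an override like the following (hypothetical requirement class; the "fail"/"unknown" protocol matches the orchestration described above):

```python
import asyncio


class MaxWordsRequirement:
    """Hypothetical stream_validate() override: fail fast past a word budget."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.words = 0  # per-attempt state; reset by cloning before each attempt

    async def stream_validate(self, chunk: str) -> str:
        self.words += len(chunk.split())
        if self.words > self.limit:
            return "fail"    # triggers the orchestrator's early exit
        return "unknown"     # defer the final verdict to validate()

    async def validate(self, full_text: str) -> bool:
        """Final check at stream end, per step 6 above."""
        return len(full_text.split()) <= self.limit
```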

Acceptance criteria

  • mellea/stdlib/streaming.py created with stream_with_chunking() and StreamChunkingResult
  • Requirements cloned (copy(req)) before each attempt; the original requirement instance is never mutated
  • Single asyncio.Task consumes mot.astream() — no other code calls astream() on the same MOT
  • Per-chunk stream_validate() called via asyncio.gather (parallel, not sequential)
  • Early exit on "fail" cancels the remaining stream; StreamChunkingResult.completed is False
  • All non-"fail" requirements (both "pass" and "unknown") get validate() called at stream end
  • quick_check_backend routes validation calls to the alternate backend
  • StreamChunkingResult.as_thunk wraps the output as a ModelOutputThunk
  • StreamingMockBackend test helper created
  • Integration tests in test/stdlib/test_streaming.py using StreamingMockBackend covering:
    • Normal completion (all "unknown", validate() called at stream end)
    • Early exit on "fail"
    • Stateful requirement verified across a retry — clone ensures clean state; no bleed from prior attempt
    • quick_check_backend routing
  • cancel_generation() added to ModelOutputThunk in mellea/core/base.py; cancels _generate task, drains queue, ends open telemetry span; orchestrator calls it on early "fail" exit
  • Test: deadlock scenario — early exit with queue near capacity does not hang
  • Uses mot.generation.* telemetry paths throughout (requires #908, "refactor!: partition ModelOutputThunk execution metadata into Generat…", to be merged; mot.generation.usage will be None on early exit since post_processing() does not run — StreamChunkingResult exposes this)
  • Docstring documents that v1 retry is simple re-invocation (no repair/rewrite); plugin hooks (SAMPLING_LOOP_START, SAMPLING_REPAIR, etc.) do not fire — the #902 event types ("feat(stdlib): add standard streaming event types") are the observability substitute
  • Real-backend tests marked @pytest.mark.integration and @pytest.mark.ollama
  • Example in docs/examples/streaming/ with # pytest: ollama, integration marker
  • Google-style docstrings on stream_with_chunking() and StreamChunkingResult

Blocked by #900, #899
Depends on #908 merging (uses mot.generation.* API throughout)
Part of #891

Labels

enhancement (New feature or request)
