Our HTTP streaming endpoints return 200 once the NDJSON stream is established. That is correct at the transport layer, but it leaves a protocol gap:
- a stream can fail after headers are committed
- message ordering is mostly implicit
- clients cannot reliably distinguish "clean completion" from "protocol bug" from "socket closed after a late exception"
We already have the right architectural direction:
- the system is message-first
- stream termination should be explicit (`eof`)
- mid-stream failures must be represented in-band, not as a late HTTP `500`
What is missing is an explicit message lifecycle with validation.
Today the stream protocol allows callers to infer broad meaning from message types, but it does not define a strict ordering contract. That creates several failure modes:
- A producer can emit `progress` before any start/initialization signal.
- A stream can end on a thrown invariant with only a final `log` line, which is not a machine-readable terminal outcome.
- Different routes expose slightly different "first valid message" assumptions.
- Clients have to guess whether a missing `eof` means crash, disconnect, proxy reset, or protocol violation.
This is a protocol problem, not an HTTP problem.
Goals:
- Define a stream-level state machine for all NDJSON streaming routes.
- Make terminal outcomes explicit and machine-readable.
- Convert late exceptions and invariant violations into terminal protocol messages.
- Validate message order on the server so producers cannot emit nonsense silently.
- Give clients deterministic semantics for stream start, progress, success, and failure.

Non-goals:
- Replace HTTP streaming with WebSockets or gRPC.
- Redesign every message type in one pass.
- Introduce route-specific ad hoc ordering rules without a shared abstraction.
Every streaming route should follow the same high-level lifecycle:
```text
prelude -> streaming -> terminal
```

Definitions:
- `prelude`: initial handshake / metadata before steady-state data flow
- `streaming`: normal in-flight messages
- `terminal`: exactly one terminal message, then end of stream
Different routes legitimately have different first messages. We should not force a single literal `started` envelope everywhere if a route already has a natural prelude.
Instead, define the validator in terms of allowed message classes per route:
- `/pipeline_check`
  - prelude: `log`, `connection_status`
  - terminal: `connection_status` with `failed`, or `eof`
- `/source_discover`
  - prelude: `log`, `catalog`
  - terminal: `eof`
- `/pipeline_read`
  - prelude: `log`, `catalog`, `stream_status` (start)
  - streaming: `record`, `source_state`, `stream_status`, `progress`, `log`
  - terminal: `eof`, terminal `error`
- `/pipeline_write`
  - prelude: `log`
  - streaming: `source_state`, `progress`, `log`
  - terminal: `eof`, terminal `error`
- `/pipeline_sync`
  - prelude: `log`, `catalog`, `stream_status` (start), `progress`
  - streaming: `source_state`, `stream_status`, `progress`, `control`, `log`
  - terminal: `eof`, terminal `error`
This preserves existing message shapes while making ordering explicit.
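The per-route table can also be written as data that one shared validator consumes. That keeps the ordering rules out of individual routes. Below is a minimal sketch under stated assumptions:
- `RouteSpec`, `ROUTE_SPECS`, and the predicate helpers are hypothetical names.
- Payload lookups assume each message stores its payload under a key equal to its `type`, as the `eof` examples in this doc do.
- `stream_status` and `connection_status` are assumed to carry a `status` field with the values `'start'` and `'failed'`.
- The terminal `error` on read/write/sync is modeled as `eof`, following the recommendation to extend `eof`.

```typescript
// Assumed wire shape: a `type` discriminator plus a payload stored under the
// key of the same name, e.g. { type: 'eof', eof: { reason: 'complete' } }.
type Msg = { type: string; [key: string]: unknown }
type Pred = (msg: Msg) => boolean

// The classification half of the StreamProtocolSpec pseudo-interface.
interface RouteSpec {
  allow_in_prelude: Pred
  allow_in_streaming: Pred
  is_terminal: Pred
}

// Reads a payload field, or undefined if the payload is not an object.
function field(msg: Msg, key: string): unknown {
  const payload = msg[msg.type]
  return typeof payload === 'object' && payload !== null
    ? (payload as Record<string, unknown>)[key]
    : undefined
}

const oneOf = (...types: string[]): Pred => (m) => types.includes(m.type)
const anyOf = (...preds: Pred[]): Pred => (m) => preds.some((p) => p(m))
const noMessages: Pred = () => false

// Assumed status values; the real payload fields may differ.
const streamStart: Pred = (m) => m.type === 'stream_status' && field(m, 'status') === 'start'
const checkFailed: Pred = (m) => m.type === 'connection_status' && field(m, 'status') === 'failed'
const eof = oneOf('eof') // also covers eof(reason='error'), the "terminal error" case

const ROUTE_SPECS: Record<string, RouteSpec> = {
  '/pipeline_check': {
    allow_in_prelude: oneOf('log', 'connection_status'),
    allow_in_streaming: noMessages,
    is_terminal: anyOf(checkFailed, eof), // checked before the prelude rule
  },
  '/source_discover': {
    allow_in_prelude: oneOf('log', 'catalog'),
    allow_in_streaming: noMessages,
    is_terminal: eof,
  },
  '/pipeline_read': {
    allow_in_prelude: anyOf(oneOf('log', 'catalog'), streamStart),
    allow_in_streaming: oneOf('record', 'source_state', 'stream_status', 'progress', 'log'),
    is_terminal: eof,
  },
  '/pipeline_write': {
    allow_in_prelude: oneOf('log'),
    allow_in_streaming: oneOf('source_state', 'progress', 'log'),
    is_terminal: eof,
  },
  '/pipeline_sync': {
    allow_in_prelude: anyOf(oneOf('log', 'catalog', 'progress'), streamStart),
    allow_in_streaming: oneOf('source_state', 'stream_status', 'progress', 'control', 'log'),
    is_terminal: eof,
  },
}
```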
There must be an explicit terminal message for every successful or failed stream.
Two valid designs:
- Extend `eof` to carry terminal status.
- Add a dedicated top-level `error` message and keep `eof` success-oriented.

Recommendation: extend `eof`.

Rationale:
- we already use `eof` as the canonical last message
- clients already look for it
- a single terminal envelope avoids the "did I get `error` and then also expect `eof`?" ambiguity
Proposed shape:

```ts
type EofReason = 'complete' | 'state_limit' | 'time_limit' | 'aborted' | 'error'

interface EofPayload {
  reason: EofReason
  has_more: boolean
  ending_state?: SyncState
  run_progress: ProgressPayload
  request_progress: ProgressPayload
  // present only when reason === 'error'
  error?: {
    code: 'protocol_violation' | 'invariant_violation' | 'internal_error'
    message: string
  }
}
```

Rules:
- `reason: 'complete'` => normal exhaustion, `has_more: false`
- `reason: 'state_limit' | 'time_limit'` => bounded pause, `has_more: true`
- `reason: 'aborted'` => client disconnect / cancellation, usually `has_more: true`
- `reason: 'error'` => fatal stream failure, `has_more: false`
- the `error` field is present only when `reason === 'error'`
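The rules above can be checked mechanically before an `eof` is written, for example in a debug assertion or a unit test. This is a minimal sketch. `checkEofRules` is a hypothetical helper. `SyncState` and `ProgressPayload` are stubbed as opaque records because their real definitions are not reproduced here. `aborted` only *usually* has `has_more: true`, so the sketch does not enforce that case.

```typescript
type EofReason = 'complete' | 'state_limit' | 'time_limit' | 'aborted' | 'error'
type ErrorCode = 'protocol_violation' | 'invariant_violation' | 'internal_error'

// Stand-ins for the real protocol types, which are not reproduced here.
type SyncState = Record<string, unknown>
type ProgressPayload = Record<string, unknown>

interface EofPayload {
  reason: EofReason
  has_more: boolean
  ending_state?: SyncState
  run_progress: ProgressPayload
  request_progress: ProgressPayload
  error?: { code: ErrorCode; message: string }
}

// Returns the list of rule violations; an empty list means the payload is consistent.
function checkEofRules(eof: EofPayload): string[] {
  const problems: string[] = []
  const expectHasMore = (want: boolean) => {
    if (eof.has_more !== want) problems.push(`reason '${eof.reason}' requires has_more: ${want}`)
  }
  switch (eof.reason) {
    case 'complete':
    case 'error':
      expectHasMore(false)
      break
    case 'state_limit':
    case 'time_limit':
      expectHasMore(true)
      break
    case 'aborted':
      // "usually has_more: true": deliberately not enforced
      break
  }
  // `error` must be present if and only if reason is 'error'.
  if ((eof.reason === 'error') !== (eof.error !== undefined)) {
    problems.push("error must be present exactly when reason is 'error'")
  }
  return problems
}
```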
Introduce a stream validator wrapper with explicit phase tracking.
Pseudo-interface:

```ts
interface StreamProtocolSpec<T extends { type: string }> {
  allow_in_prelude(msg: T): boolean
  allow_in_streaming(msg: T): boolean
  is_terminal(msg: T): boolean
  // both callbacks return the terminal message that ends the stream
  on_violation(details: ViolationDetails): T
  on_thrown_error(err: unknown): T
}
```

Core rules:
- the first emitted message must be allowed in `prelude`
- once a steady-state message appears, the phase becomes `streaming`
- a terminal message is allowed exactly once
- no messages after terminal
- a violation is converted into a terminal protocol message
- a thrown exception is converted into a terminal protocol message
This wrapper should sit at the API boundary, not inside every connector.
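The core rules can be enforced by wrapping the producer's async iterable in an async generator that tracks the phase. This is a minimal sketch, assuming producers expose an `AsyncIterable` of messages. `validateStream` and the `ViolationDetails` fields are hypothetical, and the spec mirrors the pseudo-interface above. The wrapper stops pulling once the terminal message is out, so anything the producer yields afterwards (including a duplicate terminal) is not forwarded.

```typescript
type Phase = 'prelude' | 'streaming' | 'terminal'

// Hypothetical violation report passed to on_violation.
interface ViolationDetails {
  phase: Phase // phase in which the violation was detected
  detail: string
  message_type?: string
}

// Mirrors the StreamProtocolSpec pseudo-interface.
interface StreamProtocolSpec<T extends { type: string }> {
  allow_in_prelude(msg: T): boolean
  allow_in_streaming(msg: T): boolean
  is_terminal(msg: T): boolean
  on_violation(details: ViolationDetails): T // must return a terminal message
  on_thrown_error(err: unknown): T // must return a terminal message
}

async function* validateStream<T extends { type: string }>(
  source: AsyncIterable<T>,
  spec: StreamProtocolSpec<T>,
): AsyncGenerator<T> {
  let phase = 'prelude' as Phase
  let forwarded = 0
  try {
    for await (const msg of source) {
      if (spec.is_terminal(msg)) {
        phase = 'terminal'
        yield msg
        return // terminal must be last: stop pulling from the producer
      }
      const inPrelude = phase === 'prelude' && spec.allow_in_prelude(msg)
      // The first message must belong to the prelude; after that, a message
      // allowed in streaming moves (or keeps) the stream in the streaming phase.
      const inStreaming = !inPrelude && forwarded > 0 && spec.allow_in_streaming(msg)
      if (inPrelude || inStreaming) {
        if (inStreaming) phase = 'streaming'
        forwarded++
        yield msg
        continue
      }
      const details: ViolationDetails = {
        phase,
        detail: `${msg.type} not allowed in ${phase}`,
        message_type: msg.type,
      }
      phase = 'terminal'
      yield spec.on_violation(details)
      return
    }
  } catch (err) {
    // If the terminal message already went out (e.g. the producer threw while
    // being closed), there is nothing left to report in-band.
    if (phase === 'terminal') return
    phase = 'terminal'
    yield spec.on_thrown_error(err)
    return
  }
  // The producer finished without a terminal message: synthesize one.
  yield spec.on_violation({ phase, detail: 'stream ended without a terminal message' })
}
```

The wrapper only consumes an `AsyncIterable`, so it can sit at the API boundary without any change to connectors.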
Update `packages/protocol/src/protocol.ts`:
- add `reason` to `EofPayload`
- add an optional terminal `error` payload for `reason: 'error'`

This aligns the implementation with the existing EOF design intent already documented in `docs/plans/stream-limits-and-eof.md`.
Add a protocol or engine helper such as `packages/protocol/src/stream-validator.ts` or `apps/engine/src/lib/stream-validator.ts`.
Responsibilities:
- track lifecycle phase
- validate message ordering
- map violations to terminal `eof`
- map thrown errors to terminal `eof`
Update the streaming response wrappers so that:
- pre-stream failures still return `4xx`/`5xx`
- post-stream failures become terminal `eof(reason='error')`
- bare "log-only" terminal failures are no longer the primary machine contract

`log` messages can still accompany the terminal `eof`, but they are supplemental.
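A common way to keep both halves of this split is to pull the first message before committing headers. If that pull throws, the route can still answer with a real HTTP error. Once one message is in hand, the status is `200` and any later failure has to travel in-band. This is a framework-agnostic sketch. `startNdjsonResponse` and the `StreamStart` shape are hypothetical, and the blanket `500` is a placeholder for whatever `4xx`/`5xx` mapping the routes already use.

```typescript
// Either nothing has been sent and a plain HTTP error is still possible, or
// headers are committed (200) and the body is a sequence of NDJSON lines.
type StreamStart =
  | { kind: 'http_error'; status: number; message: string }
  | { kind: 'stream'; status: 200; lines: AsyncGenerator<string> }

async function startNdjsonResponse<T>(
  messages: AsyncIterable<T>,
  toTerminalError: (err: unknown) => T,
): Promise<StreamStart> {
  const it = messages[Symbol.asyncIterator]()
  let first: IteratorResult<T>
  try {
    // Pull one message before committing headers.
    first = await it.next()
  } catch (err) {
    // Pre-stream failure: nothing has been written, so HTTP can still report it.
    // Assumption: a blanket 500; real routes would map known errors to 4xx.
    return { kind: 'http_error', status: 500, message: err instanceof Error ? err.message : String(err) }
  }
  async function* lines(): AsyncGenerator<string> {
    let next = first
    try {
      while (!next.done) {
        yield JSON.stringify(next.value) + '\n'
        next = await it.next()
      }
    } catch (err) {
      // Post-stream failure: the status is already 200, so report it in-band.
      yield JSON.stringify(toTerminalError(err)) + '\n'
    } finally {
      // Release the producer if the client stops reading early.
      await it.return?.()
    }
  }
  return { kind: 'stream', status: 200, lines: lines() }
}
```

Pulling one message early delays headers until the producer has something to say. Routes with slow setup might prefer to commit earlier and rely only on the in-band path. If the stream is already wrapped by the validator, the `catch` in `lines` acts only as a backstop.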

Files:
- `packages/protocol/src/protocol.ts`
- `packages/protocol/src/helpers.ts`
- `packages/protocol/src/index.ts`

Changes:
- extend `EofPayload` with `reason`
- add an optional structured `error`
- add a helper constructor for terminal error EOF if useful
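If the helper constructor proves useful, one shape for it is below. `eofError` is a hypothetical name and the progress/state types are stubbed. Typing `reason` and `has_more` as literals means callers cannot build an error EOF with `has_more: true` or without a structured `error`. Defaulting missing progress to `{}` is an assumption that matches the examples in this doc.

```typescript
type SyncState = Record<string, unknown>
type ProgressPayload = Record<string, unknown>
type TerminalErrorCode = 'protocol_violation' | 'invariant_violation' | 'internal_error'

interface ErrorEofMessage {
  type: 'eof'
  eof: {
    reason: 'error'
    has_more: false // rule: reason 'error' => has_more: false
    ending_state?: SyncState
    run_progress: ProgressPayload
    request_progress: ProgressPayload
    error: { code: TerminalErrorCode; message: string }
  }
}

interface ErrorEofContext {
  ending_state?: SyncState
  run_progress?: ProgressPayload
  request_progress?: ProgressPayload
}

function eofError(code: TerminalErrorCode, message: string, ctx: ErrorEofContext = {}): ErrorEofMessage {
  return {
    type: 'eof',
    eof: {
      reason: 'error',
      has_more: false,
      // Only include ending_state when the caller actually has one.
      ...(ctx.ending_state !== undefined ? { ending_state: ctx.ending_state } : {}),
      // Assumption: empty progress objects when the caller has none.
      run_progress: ctx.run_progress ?? {},
      request_progress: ctx.request_progress ?? {},
      error: { code, message },
    },
  }
}
```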

Files:
- `apps/engine/src/lib/stream-validator.ts` (new)
- `apps/engine/src/api/helpers.ts`
- `packages/ts-cli/src/ndjson.ts`

Changes:
- implement the phase-tracking wrapper
- route thrown exceptions through terminal `eof(reason='error')`
- keep existing `log` emission, but ensure the terminal EOF is always last
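Keeping the existing `log` emission while making EOF last mostly comes down to ordering the closing messages. This is a minimal sketch. `InvariantError` and `terminalMessagesFor` are hypothetical names, and classifying every other thrown value as `internal_error` is an assumption. Progress fields are left empty for brevity.

```typescript
// Hypothetical error class the engine would throw for broken invariants.
class InvariantError extends Error {}

type LogMessage = { type: 'log'; log: { level: 'error'; message: string } }
type ErrorEof = {
  type: 'eof'
  eof: {
    reason: 'error'
    has_more: false
    run_progress: Record<string, unknown>
    request_progress: Record<string, unknown>
    error: { code: 'invariant_violation' | 'internal_error'; message: string }
  }
}

// Maps a thrown value to the closing messages: the supplemental log line
// first, the machine-readable terminal eof last.
function terminalMessagesFor(err: unknown): [LogMessage, ErrorEof] {
  const message = err instanceof Error ? err.message : String(err)
  // Assumption: anything that is not an InvariantError is an internal_error.
  const code = err instanceof InvariantError ? 'invariant_violation' : 'internal_error'
  return [
    { type: 'log', log: { level: 'error', message } },
    {
      type: 'eof',
      eof: { reason: 'error', has_more: false, run_progress: {}, request_progress: {}, error: { code, message } },
    },
  ]
}
```

Inside a wrapper's `catch` block, `yield* terminalMessagesFor(err)` would emit both messages in that order.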

Files:
- `apps/engine/src/api/app.ts`
- `apps/service/src/api/app.ts`
Changes:
- wrap streaming iterables with route-specific protocol specs
- define allowed prelude/streaming/terminal message sets per endpoint

Files:
- `apps/service/src/temporal/activities/_shared.ts`
- `apps/service/src/cli/pipeline-sync.tsx`
- any consumers that currently assume `has_more` is the only EOF signal

Changes:
- teach consumers to inspect `eof.reason`
- treat `reason: 'error'` as failure even though the HTTP status is `200`
- preserve `ending_state` behavior for resumable bounded runs
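On the consumer side, the terminal contract reduces to a decision over the last message received, independent of the HTTP status. This is a minimal sketch, assuming lines have already been parsed into objects. `classifyOutcome` and the outcome names are hypothetical. A missing `eof` is reported as `truncated` rather than guessed at. Treating a missing or unknown `reason` as a protocol violation assumes the second rollout step (`reason` required) is in place.

```typescript
type Outcome =
  | { kind: 'complete' }
  | { kind: 'paused'; reason: string; ending_state?: unknown } // bounded run, resumable
  | { kind: 'aborted'; ending_state?: unknown }
  | { kind: 'failed'; code: string; message: string }
  | { kind: 'truncated' } // stream closed without a terminal eof

// Assumed parsed-message shape; only the fields the classifier reads are typed.
type Msg = {
  type: string
  eof?: {
    reason?: string
    has_more?: boolean
    ending_state?: unknown
    error?: { code: string; message: string }
  }
}

function classifyOutcome(messages: Msg[]): Outcome {
  const last = messages[messages.length - 1]
  // No eof as the last message: crash, proxy reset, or protocol bug.
  if (last === undefined || last.type !== 'eof' || last.eof === undefined) return { kind: 'truncated' }
  const { reason, ending_state, error } = last.eof
  switch (reason) {
    case 'complete':
      return { kind: 'complete' }
    case 'state_limit':
    case 'time_limit':
      return { kind: 'paused', reason, ending_state }
    case 'aborted':
      return { kind: 'aborted', ending_state }
    case 'error':
      // The HTTP status was 200, but the run still failed.
      return { kind: 'failed', code: error?.code ?? 'internal_error', message: error?.message ?? '' }
    default:
      // Assumption: once reason is required, a missing/unknown reason is a protocol violation.
      return { kind: 'failed', code: 'protocol_violation', message: `unexpected eof reason: ${String(reason)}` }
  }
}
```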
Successful bounded sync:

```json
{"type":"log","log":{"level":"info","message":"starting sync"}}
{"type":"progress","progress":{"derived":{"status":"started"}}}
{"type":"source_state","source_state":{"state_type":"stream","stream":"customer","data":{"cursor":"cus_123"}}}
{"type":"eof","eof":{"reason":"time_limit","has_more":true,"ending_state":{},"run_progress":{},"request_progress":{}}}
```

Invariant violation after streaming started:

```json
{"type":"log","log":{"level":"info","message":"starting sync"}}
{"type":"progress","progress":{"derived":{"status":"started"}}}
{"type":"eof","eof":{"reason":"error","has_more":false,"error":{"code":"invariant_violation","message":"progress emitted before stream start"},"ending_state":{},"run_progress":{},"request_progress":{}}}
```

Protocol violation from producer:

```json
{
  "type": "eof",
  "eof": {
    "reason": "error",
    "has_more": false,
    "error": {
      "code": "protocol_violation",
      "message": "record not allowed in prelude"
    },
    "ending_state": {},
    "run_progress": {},
    "request_progress": {}
  }
}
```

Add unit tests for:
- valid prelude -> streaming -> terminal sequences
- `progress` before an allowed prelude
- duplicate terminal messages
- messages after terminal
- a thrown exception after several successful messages
- the client disconnect path emits `aborted` or terminates consistently by route contract
Likely files:
- `packages/ts-cli/src/ndjson.test.ts`
- `apps/engine/src/api/app.test.ts`
- `apps/engine/src/lib/engine.test.ts`
- new validator-specific tests
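Several of these cases share one assertion: exactly one terminal message, and it is the last one. Below is a small helper and fixture that route-level tests could reuse. `assertSingleTerminal` and `failAfter` are hypothetical. The default predicate treats only `eof` as terminal, so `/pipeline_check` would pass its own.

```typescript
type Msg = { type: string }

// Throws unless exactly one terminal message was received and it is the last one.
function assertSingleTerminal(
  messages: Msg[],
  isTerminal: (m: Msg) => boolean = (m) => m.type === 'eof',
): void {
  const idx = messages.map((m, i) => (isTerminal(m) ? i : -1)).filter((i) => i >= 0)
  if (idx.length === 0) throw new Error('stream ended without a terminal message')
  if (idx.length > 1) throw new Error(`expected 1 terminal message, got ${idx.length}`)
  if (idx[0] !== messages.length - 1) throw new Error(`terminal message at index ${idx[0]} is not last`)
}

// Fixture for "thrown exception after several successful messages":
// yields the given messages, then throws.
async function* failAfter<T>(messages: T[], error: Error): AsyncGenerator<T> {
  for (const m of messages) yield m
  throw error
}
```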

- This should be backward-compatible where possible, but adding a required `eof.reason` changes the wire contract.
- If needed, ship in two steps:
  - add `reason` as optional and emit it everywhere
  - make `reason` required after all consumers are updated
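During step 1, a consumer may still receive `eof` messages from components that have not started emitting `reason`. One tolerant reading is sketched below. It assumes that a legacy `eof` (no `reason`) only ever marked a clean end of the run, since thrown errors did not produce an `eof` at all. `effectiveReason` and the `'legacy_pause'` bucket are hypothetical. The bucket exists because a legacy `has_more: true` cannot say which limit was hit.

```typescript
type EofReason = 'complete' | 'state_limit' | 'time_limit' | 'aborted' | 'error'

interface EofWire {
  reason?: EofReason // optional during step 1 of the rollout
  has_more: boolean
}

// 'legacy_pause': has_more was true, but which limit was hit is unknown.
function effectiveReason(eof: EofWire): EofReason | 'legacy_pause' {
  if (eof.reason !== undefined) return eof.reason
  return eof.has_more ? 'legacy_pause' : 'complete'
}
```

Once step 2 lands, this fallback can be deleted and a missing `reason` treated as a protocol violation.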
- EOF is not always emitted. If the engine throws mid-stream (e.g. an unhandled invariant in a connector, or the setup timeout fires), the stream may end without an `eof` message. Clients currently interpret a socket close without `eof` as a crash. The validator (Phase 2) must guarantee that every stream ends with exactly one terminal `eof`, converting thrown exceptions into `eof(reason='error')`.
- `pipeline_setup` and `pipeline_teardown` do not emit `eof`. These routes stream `log` and `control` messages but have no terminal signal. Clients use "stream ended" as the completion marker. This is fragile: a proxy timeout or broken pipe is indistinguishable from success.
- `pipeline_check` terminates with `connection_status`, not `eof`. This works but is inconsistent with the rest of the protocol.
- `takeLimits` emits a bare `eof` (`{ has_more }` only). The engine's `pipeline_sync` intercepts it and enriches it with `run_progress`, `status`, etc. If any code path consumes the raw `takeLimits` output without enrichment (e.g. `pipeline_read`), clients see a partial `eof` missing required fields.
- Do we want an explicit `started` message eventually, or are route-specific preludes sufficient for v1?
- Should `aborted` produce an `eof`, or is disconnect inherently best-effort?
- Should protocol violations be visible to clients only as terminal `eof(reason='error')`, or also mirrored as `log(level='error')` for operator visibility?
- Should non-sync routes (`check`, `discover`, `setup`, `teardown`) all adopt `eof` as well for full consistency?
Implement the validator and explicit `eof.reason` first.
That is the minimum change that solves the real problem:
- `200` remains the correct HTTP status for an established stream
- late failures become explicit protocol outcomes
- stream ordering becomes enforceable instead of implied