Skip to content

Commit fc72ca1

Browse files
tonyxiaoclaude
andcommitted
fix: derive stream status from error traces and seed from source state
- Engine derives stream status from trace/error failure_type (only overrides non-complete streams) — no extra source messages needed - Seed streamStatus from source state on resume so streams the source skips (already complete/errored) show correct status in progress Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
1 parent eed87a4 commit fc72ca1

1 file changed

Lines changed: 11 additions & 1 deletion

File tree

apps/engine/src/lib/progress.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,20 @@ export function trackProgress(opts: {
6161
let stateCheckpointCount = 0
6262
const streamStatus = new Map<string, Status>()
6363

64-
// Restore stream statuses from persisted engine state
64+
// Restore stream statuses: engine state first, then source state overrides
65+
// (source state is authoritative — streams the source skips emit no messages)
6566
if (opts.initial_state?.engine?.streams) {
6667
for (const [stream, data] of Object.entries(opts.initial_state.engine.streams)) {
6768
const status = (data as { status?: Status })?.status
6869
if (status) streamStatus.set(stream, status)
6970
}
7071
}
72+
if (opts.initial_state?.source?.streams) {
73+
for (const [stream, data] of Object.entries(opts.initial_state.source.streams)) {
74+
const status = (data as { status?: string })?.status
75+
if (status) streamStatus.set(stream, status as Status)
76+
}
77+
}
7178
const streamErrors = new Map<string, StreamError[]>()
7279
const hadInitialState = opts.initial_state != null
7380
const finalState: SyncState = structuredClone(opts.initial_state ?? emptySyncState())
@@ -263,6 +270,9 @@ export function trackProgress(opts: {
263270
const errs = streamErrors.get(err.stream) ?? []
264271
errs.push({ message: err.message, failure_type: err.failure_type as FailureType })
265272
streamErrors.set(err.stream, errs)
273+
if (err.failure_type && streamStatus.get(err.stream) !== 'complete') {
274+
streamStatus.set(err.stream, err.failure_type as Status)
275+
}
266276
}
267277
}
268278
}

0 commit comments

Comments
 (0)