Skip to content

Commit eed87a4

Browse files
tonyxiaoclaude
andcommitted
fix: persist stream status in engine state, stop defaulting to running
- Initialize streamStatus from persisted engine state on resume so streams completed in prior runs show as complete, not running - Save stream status alongside cumulative_record_count in engine state - Never default to 'running' — streams without a known status are excluded from stream_progress and stream_status traces - source_state without prior stream_status infers 'started' not 'running' Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
1 parent 7aadc73 commit eed87a4

1 file changed

Lines changed: 24 additions & 8 deletions

File tree

apps/engine/src/lib/progress.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ export function trackProgress(opts: {
6060
const prevSnapshotCounts = new Map<string, number>()
6161
let stateCheckpointCount = 0
6262
const streamStatus = new Map<string, Status>()
63+
64+
// Restore stream statuses from persisted engine state
65+
if (opts.initial_state?.engine?.streams) {
66+
for (const [stream, data] of Object.entries(opts.initial_state.engine.streams)) {
67+
const status = (data as { status?: Status })?.status
68+
if (status) streamStatus.set(stream, status)
69+
}
70+
}
6371
const streamErrors = new Map<string, StreamError[]>()
6472
const hadInitialState = opts.initial_state != null
6573
const finalState: SyncState = structuredClone(opts.initial_state ?? emptySyncState())
@@ -115,7 +123,9 @@ export function trackProgress(opts: {
115123
lastEmitAt = Date.now()
116124
}
117125

118-
function buildStreamStatus(stream: string): SyncOutput {
126+
function buildStreamStatus(stream: string): SyncOutput | undefined {
127+
const status = streamStatus.get(stream)
128+
if (!status) return undefined
119129
const run = runRecordCount(stream)
120130
const cumulative = (cumulativeRecordCount.get(stream) ?? 0) + run
121131
return {
@@ -124,7 +134,7 @@ export function trackProgress(opts: {
124134
trace_type: 'stream_status' as const,
125135
stream_status: {
126136
stream,
127-
status: streamStatus.get(stream) ?? 'running',
137+
status,
128138
cumulative_record_count: cumulative,
129139
run_record_count: run,
130140
window_record_count: windowRecordCount(stream),
@@ -153,11 +163,13 @@ export function trackProgress(opts: {
153163
} as SyncOutput
154164
}
155165

156-
function buildStreamProgress(stream: string): EofStreamProgress {
166+
function buildStreamProgress(stream: string): EofStreamProgress | undefined {
167+
const status = streamStatus.get(stream)
168+
if (!status) return undefined
157169
const run = runRecordCount(stream)
158170
const cumulative = (cumulativeRecordCount.get(stream) ?? 0) + run
159171
return {
160-
status: streamStatus.get(stream) ?? 'running',
172+
status,
161173
cumulative_record_count: cumulative,
162174
run_record_count: run,
163175
records_per_second: run / elapsedSec(),
@@ -176,6 +188,7 @@ export function trackProgress(opts: {
176188
finalState.engine.streams[stream] = {
177189
...existing,
178190
cumulative_record_count: cumulative,
191+
...(streamStatus.has(stream) ? { status: streamStatus.get(stream) } : {}),
179192
}
180193
}
181194

@@ -195,7 +208,8 @@ export function trackProgress(opts: {
195208
const streams = allStreams()
196209
const streamProgressMap: Record<string, EofStreamProgress> = {}
197210
for (const s of streams) {
198-
streamProgressMap[s] = buildStreamProgress(s)
211+
const sp = buildStreamProgress(s)
212+
if (sp) streamProgressMap[s] = sp
199213
}
200214
const eof: EofPayload = {
201215
reason,
@@ -222,7 +236,8 @@ export function trackProgress(opts: {
222236
if (now - lastEmitAt < intervalMs) return
223237

224238
for (const stream of allStreams()) {
225-
yield buildStreamStatus(stream)
239+
const ss = buildStreamStatus(stream)
240+
if (ss) yield ss
226241
}
227242
yield buildGlobalProgress()
228243
snapshotWindow()
@@ -234,7 +249,7 @@ export function trackProgress(opts: {
234249
if (msg.source_state.state_type === 'stream') {
235250
const stream = msg.source_state.stream
236251
finalState.source.streams[stream] = msg.source_state.data
237-
if (!streamStatus.has(stream)) streamStatus.set(stream, 'running')
252+
if (!streamStatus.has(stream)) streamStatus.set(stream, 'started')
238253
} else if (msg.source_state.state_type === 'global') {
239254
finalState.source.global = msg.source_state.data as Record<string, unknown>
240255
}
@@ -254,7 +269,8 @@ export function trackProgress(opts: {
254269

255270
if (msg.type === 'eof') {
256271
for (const stream of allStreams()) {
257-
yield buildStreamStatus(stream)
272+
const ss = buildStreamStatus(stream)
273+
if (ss) yield ss
258274
}
259275
yield buildGlobalProgress()
260276
yield buildEnrichedEof(msg.eof.reason)

0 commit comments

Comments
 (0)