@@ -10,6 +10,7 @@ import (
1010 "net/http"
1111 "os"
1212 "os/signal"
13+ "sync"
1314 "sync/atomic"
1415 "syscall"
1516 "time"
@@ -153,11 +154,76 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err
153154 }
154155 }
155156
157+ // Phase-boundary progress posts run on a dedicated worker so the scan
158+ // never blocks on HTTP at a call site. Buffer=1 + drop-oldest send
159+ // gives us two properties together:
160+ // - Strict ordering: a single consumer means the backend can never
161+ // see an older snapshot land after a newer one (which would cause
162+ // the console UI to briefly regress on degraded networks).
163+ // - Bounded resources: at most one pending snapshot is queued; a
164+ // slow-network backlog can't grow across the 11+ inline call
165+ // sites. The latest snapshot always wins, which matches what an
166+ // operator watching progress actually cares about.
167+ //
168+ // Without this, blocking phase posts could add the per-call retry
169+ // budget (~6s: 2 attempts × 3s HTTP timeout + 500ms backoff) to each
170+ // call site, compounding to over a minute of added scan latency on a
171+ // degraded link for purely best-effort progress data.
172+ phaseCh := make (chan RunStatusInfo , 1 )
173+ phaseDone := make (chan struct {})
174+ var phaseSendMu sync.Mutex // serialises drain+send so concurrent producers (main scan + heartbeat) don't race
175+ go func () {
176+ defer close (phaseDone )
177+ // process posts one snapshot using a Background-derived ctx with
178+ // a bounded per-post timeout. We deliberately do NOT chain off the
179+ // scan ctx here: the final phase-boundary post (which is the only
180+ // snapshot that includes "telemetry_upload" in phases_completed)
181+ // arrives at the worker *after* the function body returns and the
182+ // deferred cancelRun() fires. If we shared the scan ctx, that post
183+ // would always be cancelled mid-flight and the backend would never
184+ // learn the upload completed. The 10s budget covers postProgress's
185+ // own internal retry window (2×3s + 500ms backoff) with slack.
186+ process := func (snap RunStatusInfo ) {
187+ postCtx , postCancel := context .WithTimeout (context .Background (), 10 * time .Second )
188+ defer postCancel ()
189+ postProgress (postCtx , log , executionID , deviceID , invocationMethod , snap )
190+ }
191+ for {
192+ select {
193+ case snap := <- phaseCh :
194+ process (snap )
195+ case <- ctx .Done ():
196+ // Drain any queued snapshot before exiting. Without this,
197+ // a naïve select on the next iteration would pick at random between
198+ // the ready ctx.Done() and the ready phaseCh — dropping
199+ // the final post is exactly what the user reports as
200+ // "telemetry_upload missing from phases_completed".
201+ for {
202+ select {
203+ case snap := <- phaseCh :
204+ process (snap )
205+ default :
206+ return
207+ }
208+ }
209+ }
210+ }
211+ }()
212+
156213 // postPhase is the convergence point for phase-boundary and heartbeat
157214 // progress updates. Captured here so the heartbeat goroutine and the
158215 // inline phase wrappers share a single call site.
159216 postPhase := func () {
160- postProgress (ctx , log , executionID , deviceID , invocationMethod , tracker .Snapshot ())
217+ snap := tracker .Snapshot ()
218+ phaseSendMu .Lock ()
219+ defer phaseSendMu .Unlock ()
220+ // Drop any queued (older) snapshot so the freshest one always lands.
221+ select {
222+ case <- phaseCh :
223+ default :
224+ }
225+ // Always succeeds: buffer=1, just drained, single sender under the mutex.
226+ phaseCh <- snap
161227 }
162228
163229 // Catch SIGINT / SIGTERM so cancellation (Ctrl+C, launchd stop, kill)
@@ -278,13 +344,15 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err
278344 }
279345 }
280346 }()
281- // Shut the heartbeat down cleanly on return. Order matters: cancel
282- // first so the goroutine sees ctx.Done() and exits, THEN wait for it
283- // to close heartbeatDone. Splitting these into two `defer` statements
284- // would deadlock — LIFO would block on the wait before cancel fires.
347+ // Shut down both the heartbeat goroutine and the phase-post worker
348+ // cleanly on return. Order matters: cancel first so both goroutines
349+ // see ctx.Done() and exit, THEN wait for each to close its done
350+ // channel. Splitting these into separate `defer` statements would
351+ // deadlock — LIFO would block on the waits before cancel fires.
285352 defer func () {
286353 cancelRun ()
287354 <- heartbeatDone
355+ <- phaseDone
288356 }()
289357
290358 // Detect logged-in user for running commands as the real user when root.
0 commit comments