Skip to content

Commit b094538

Browse files
authored
Make forecast interruption-aware, ignore skipped runs, and clean up empty CI output (#34740)
1 parent ee6854f commit b094538

5 files changed

Lines changed: 213 additions & 27 deletions

File tree

pkg/cli/forecast.go

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ package cli
99
// workflow is actually expected to fire and how many concurrent runs it supports.
1010

1111
import (
12+
"context"
1213
"encoding/json"
14+
"errors"
1315
"fmt"
1416
"math"
1517
"math/rand"
1618
"os"
19+
"os/signal"
1720
"path/filepath"
1821
"sort"
1922
"strconv"
@@ -44,7 +47,17 @@ const (
4447
var (
4548
forecastFetchGitHubWorkflows = fetchGitHubWorkflows
4649
forecastListWorkflowRunsPaginated = listWorkflowRunsWithPagination
47-
forecastRateLimitSleep = time.Sleep
50+
forecastRateLimitSleep = func(ctx context.Context, delay time.Duration) error {
51+
timer := time.NewTimer(delay)
52+
defer timer.Stop()
53+
54+
select {
55+
case <-timer.C:
56+
return nil
57+
case <-ctx.Done():
58+
return ctx.Err()
59+
}
60+
}
4861
)
4962

5063
// ForecastEpisodeSummary contains episode-level aggregate metrics derived from
@@ -165,6 +178,8 @@ type ForecastResult struct {
165178
// RunForecast is the entry point for the forecast command.
166179
func RunForecast(config ForecastConfig) error {
167180
forecastRunLog.Printf("Running forecast: workflows=%v, days=%d, period=%s, eval=%v", config.WorkflowIDs, config.Days, config.Period, config.EvalMode)
181+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
182+
defer stop()
168183

169184
// Emit experimental warning so users know this command is not yet stable.
170185
fmt.Fprintln(os.Stderr, console.FormatWarningMessage("forecast is an experimental command and may change without notice"))
@@ -182,7 +197,7 @@ func RunForecast(config ForecastConfig) error {
182197
}
183198

184199
// Resolve the list of workflow IDs to forecast.
185-
workflowIDs, err := resolveForecastWorkflows(config)
200+
workflowIDs, err := resolveForecastWorkflows(ctx, config)
186201
if err != nil {
187202
return err
188203
}
@@ -232,14 +247,26 @@ func RunForecast(config ForecastConfig) error {
232247

233248
results := make([]ForecastWorkflowResult, 0, len(workflowIDs))
234249
for _, wfID := range workflowIDs {
250+
if err := ctx.Err(); err != nil {
251+
if !config.Verbose {
252+
spinner.Stop()
253+
}
254+
return err
255+
}
235256
if !config.Verbose {
236257
spinner.UpdateMessage(fmt.Sprintf("Sampling %s…", wfID))
237258
}
238259

239260
// forecastWorkflow uses the shifted startDate; in eval mode we also pass the
240261
// anchor so the function knows where the training window ends.
241-
result, err := forecastWorkflow(wfID, startDate, config, periodDays)
262+
result, err := forecastWorkflow(ctx, wfID, startDate, config, periodDays)
242263
if err != nil {
264+
if errors.Is(err, context.Canceled) {
265+
if !config.Verbose {
266+
spinner.Stop()
267+
}
268+
return err
269+
}
243270
if !config.Verbose {
244271
spinner.Stop()
245272
}
@@ -253,7 +280,7 @@ func RunForecast(config ForecastConfig) error {
253280

254281
// In eval mode, fetch the validation-window runs and attach evaluation metrics.
255282
if config.EvalMode {
256-
result.Evaluation = evaluateForecast(wfID, result, validationStartDate, validationEndDate, config)
283+
result.Evaluation = evaluateForecast(ctx, wfID, result, validationStartDate, validationEndDate, config)
257284
}
258285

259286
results = append(results, result)
@@ -292,9 +319,9 @@ func RunForecast(config ForecastConfig) error {
292319
// resolveForecastWorkflows returns the ordered list of workflow IDs to forecast.
293320
// When WorkflowIDs is empty, all agentic workflow IDs in the repository are returned.
294321
// When RepoOverride is set, workflows are discovered via the GitHub API instead of local files.
295-
func resolveForecastWorkflows(config ForecastConfig) ([]string, error) {
322+
func resolveForecastWorkflows(ctx context.Context, config ForecastConfig) ([]string, error) {
296323
if config.RepoOverride != "" {
297-
return resolveForecastWorkflowsFromRemote(config.WorkflowIDs, config.RepoOverride, config.Verbose)
324+
return resolveForecastWorkflowsFromRemote(ctx, config.WorkflowIDs, config.RepoOverride, config.Verbose)
298325
}
299326

300327
if len(config.WorkflowIDs) > 0 {
@@ -322,8 +349,8 @@ func resolveForecastWorkflows(config ForecastConfig) ([]string, error) {
322349
// the GitHub API. When ids is empty, all workflows in the remote repository are returned.
323350
// When ids are provided, each is matched (case-insensitively) against remote workflow names
324351
// and file-path basenames.
325-
func resolveForecastWorkflowsFromRemote(ids []string, repoOverride string, verbose bool) ([]string, error) {
326-
githubWorkflows, err := fetchWorkflowsWithBackoff(ids, repoOverride, verbose)
352+
func resolveForecastWorkflowsFromRemote(ctx context.Context, ids []string, repoOverride string, verbose bool) ([]string, error) {
353+
githubWorkflows, err := fetchWorkflowsWithBackoff(ctx, ids, repoOverride, verbose)
327354
if err != nil {
328355
return nil, fmt.Errorf("failed to list workflows in %s: %w", repoOverride, err)
329356
}
@@ -357,7 +384,7 @@ func forecastRateLimitBackoffDuration(attempt int) time.Duration {
357384
return time.Duration(attempt) * forecastRateLimitBaseBackoff
358385
}
359386

360-
func fetchWorkflowsWithBackoff(ids []string, repoOverride string, verbose bool) (map[string]*GitHubWorkflow, error) {
387+
func fetchWorkflowsWithBackoff(ctx context.Context, ids []string, repoOverride string, verbose bool) (map[string]*GitHubWorkflow, error) {
361388
var lastErr error
362389

363390
for attempt := 1; attempt <= forecastRateLimitMaxAttempts; attempt++ {
@@ -378,7 +405,9 @@ func fetchWorkflowsWithBackoff(ids []string, repoOverride string, verbose bool)
378405
fmt.Fprintln(os.Stderr, console.FormatWarningMessage(
379406
fmt.Sprintf("GitHub API rate limit hit while discovering workflows in %s; backing off for %s before retry %d/%d",
380407
repoOverride, backoff, attempt+1, forecastRateLimitMaxAttempts)))
381-
forecastRateLimitSleep(backoff)
408+
if err := forecastRateLimitSleep(ctx, backoff); err != nil {
409+
return nil, err
410+
}
382411
}
383412

384413
if len(ids) > 0 {
@@ -396,8 +425,9 @@ func fetchWorkflowsWithBackoff(ids []string, repoOverride string, verbose bool)
396425
return nil, fmt.Errorf("GitHub API rate limit exhausted after %d attempts: %w", forecastRateLimitMaxAttempts, lastErr)
397426
}
398427

399-
func listRunsWithBackoff(opts ListWorkflowRunsOptions, workflowID string) ([]WorkflowRun, int, error) {
428+
func listRunsWithBackoff(ctx context.Context, opts ListWorkflowRunsOptions, workflowID string) ([]WorkflowRun, int, error) {
400429
var lastErr error
430+
opts.Context = ctx
401431

402432
for attempt := 1; attempt <= forecastRateLimitMaxAttempts; attempt++ {
403433
runs, total, err := forecastListWorkflowRunsPaginated(opts)
@@ -417,7 +447,9 @@ func listRunsWithBackoff(opts ListWorkflowRunsOptions, workflowID string) ([]Wor
417447
fmt.Fprintln(os.Stderr, console.FormatWarningMessage(
418448
fmt.Sprintf("GitHub API rate limit hit while sampling %s; backing off for %s before retry %d/%d",
419449
workflowID, backoff, attempt+1, forecastRateLimitMaxAttempts)))
420-
forecastRateLimitSleep(backoff)
450+
if err := forecastRateLimitSleep(ctx, backoff); err != nil {
451+
return nil, 0, err
452+
}
421453
}
422454

423455
return nil, 0, lastErr
@@ -437,7 +469,7 @@ func matchRemoteWorkflowName(id string, workflows map[string]*GitHubWorkflow) st
437469
}
438470

439471
// forecastWorkflow computes a ForecastWorkflowResult for a single workflow.
440-
func forecastWorkflow(workflowName, startDate string, config ForecastConfig, periodDays int) (ForecastWorkflowResult, error) {
472+
func forecastWorkflow(ctx context.Context, workflowName, startDate string, config ForecastConfig, periodDays int) (ForecastWorkflowResult, error) {
441473
result := ForecastWorkflowResult{
442474
WorkflowID: extractWorkflowIDFromName(workflowName),
443475
Period: config.Period,
@@ -461,11 +493,12 @@ func forecastWorkflow(workflowName, startDate string, config ForecastConfig, per
461493
WorkflowName: apiName,
462494
StartDate: startDate,
463495
Limit: config.SampleSize,
496+
TargetCount: config.SampleSize,
464497
RepoOverride: config.RepoOverride,
465498
Verbose: config.Verbose,
466499
}
467500

468-
runs, _, err := listRunsWithBackoff(opts, result.WorkflowID)
501+
runs, _, err := listRunsWithBackoff(ctx, opts, result.WorkflowID)
469502
if err != nil {
470503
if gitutil.IsRateLimitError(err.Error()) {
471504
fmt.Fprintln(os.Stderr, console.FormatWarningMessage(
@@ -478,7 +511,7 @@ func forecastWorkflow(workflowName, startDate string, config ForecastConfig, per
478511
// Only use completed runs for metric computation.
479512
completed := make([]WorkflowRun, 0, len(runs))
480513
for _, r := range runs {
481-
if r.Status == "completed" {
514+
if isCompletedNonSkippedRun(r) {
482515
// Compute Duration from StartedAt/UpdatedAt when not already set (gh run list
483516
// does not populate the Duration field; health_command uses the same approach).
484517
if r.Duration == 0 && !r.StartedAt.IsZero() && !r.UpdatedAt.IsZero() {
@@ -814,14 +847,18 @@ func loadCachedEffectiveTokens(runID int64, verbose bool) int {
814847
return 0
815848
}
816849

850+
func isCompletedNonSkippedRun(r WorkflowRun) bool {
851+
return r.Status == "completed" && r.Conclusion != "skipped"
852+
}
853+
817854
// evaluateForecast fetches actual completed runs in the validation window and
818855
// returns a ForecastEvaluation comparing them against the Monte Carlo forecast.
819856
//
820857
// validationStartDate / validationEndDate are ISO-8601 strings bracketing the
821858
// period that was forecast (= one projection period immediately before now).
822859
// Actual runs are fetched with the same pagination helper used for training,
823860
// but with the validation date range.
824-
func evaluateForecast(workflowName string, forecast ForecastWorkflowResult, validationStartDate, validationEndDate string, config ForecastConfig) *ForecastEvaluation {
861+
func evaluateForecast(ctx context.Context, workflowName string, forecast ForecastWorkflowResult, validationStartDate, validationEndDate string, config ForecastConfig) *ForecastEvaluation {
825862
// Compute the actual ISO-8601 training start date by subtracting HistoryDays
826863
// from the validation start (= anchor).
827864
var trainingStartDate string
@@ -847,9 +884,11 @@ func evaluateForecast(workflowName string, forecast ForecastWorkflowResult, vali
847884
WorkflowName: apiName,
848885
StartDate: validationStartDate,
849886
Limit: config.SampleSize,
887+
TargetCount: config.SampleSize,
850888
RepoOverride: config.RepoOverride,
851889
Verbose: config.Verbose,
852890
}
891+
opts.Context = ctx
853892
runs, _, err := listWorkflowRunsWithPagination(opts)
854893
if err != nil {
855894
forecastRunLog.Printf("Eval: failed to fetch validation runs for %s: %v", workflowName, err)
@@ -860,7 +899,7 @@ func evaluateForecast(workflowName string, forecast ForecastWorkflowResult, vali
860899
validationEnd := time.Now()
861900
validationStart, _ := time.Parse("2006-01-02", validationStartDate)
862901
for _, r := range runs {
863-
if r.Status != "completed" {
902+
if !isCompletedNonSkippedRun(r) {
864903
continue
865904
}
866905
// Skip runs with no timestamp — we cannot verify they belong to the
@@ -937,9 +976,13 @@ func renderForecastTable(output ForecastResult, config ForecastConfig) error {
937976
unreliableMark := ""
938977
if mc := wf.MonteCarlo; mc != nil {
939978
projETStr = formatForecastTokens(mc.P50ProjectedEffectiveTokens)
940-
etRangeStr = fmt.Sprintf("%s–%s",
941-
formatForecastTokens(mc.P10ProjectedEffectiveTokens),
942-
formatForecastTokens(mc.P90ProjectedEffectiveTokens))
979+
if mc.P10ProjectedEffectiveTokens == 0 && mc.P90ProjectedEffectiveTokens == 0 {
980+
etRangeStr = "-"
981+
} else {
982+
etRangeStr = fmt.Sprintf("%s–%s",
983+
formatForecastTokens(mc.P10ProjectedEffectiveTokens),
984+
formatForecastTokens(mc.P90ProjectedEffectiveTokens))
985+
}
943986
if !mc.IsReliable {
944987
anyUnreliable = true
945988
unreliableMark = "*"

pkg/cli/forecast_montecarlo_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package cli
44

55
import (
6+
"context"
67
"errors"
78
"io"
89
"math"
@@ -415,10 +416,6 @@ func TestResolveForecastWorkflowsFromRemote_RateLimitFallsBackToPartialResults(t
415416
attempts++
416417
return nil, errors.New("API rate limit exceeded")
417418
}
418-
forecastRateLimitSleep = func(delay time.Duration) {
419-
backoffs = append(backoffs, delay)
420-
}
421-
422419
stderrReader, stderrWriter, err := os.Pipe()
423420
require.NoError(t, err, "Should create stderr pipe")
424421
originalStderr := os.Stderr
@@ -427,7 +424,12 @@ func TestResolveForecastWorkflowsFromRemote_RateLimitFallsBackToPartialResults(t
427424
os.Stderr = originalStderr
428425
})
429426

430-
names, err := resolveForecastWorkflowsFromRemote([]string{"ci-doctor", "daily-planner"}, "owner/repo", true)
427+
forecastRateLimitSleep = func(_ context.Context, delay time.Duration) error {
428+
backoffs = append(backoffs, delay)
429+
return nil
430+
}
431+
432+
names, err := resolveForecastWorkflowsFromRemote(context.Background(), []string{"ci-doctor", "daily-planner"}, "owner/repo", true)
431433
require.NoError(t, err, "T-FC-030 should return caller-supplied partial results after rate-limit retries")
432434
assert.Equal(t, []string{"ci-doctor", "daily-planner"}, names, "Should preserve caller-supplied workflow order")
433435
assert.Equal(t, forecastRateLimitMaxAttempts, attempts, "Should retry discovery until the retry budget is exhausted")

pkg/cli/forecast_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
package cli
44

55
import (
6+
"context"
7+
"io"
68
"math/rand"
9+
"os"
710
"testing"
811
"time"
912

@@ -169,3 +172,76 @@ func TestObservedRunsPerPeriodConsistency(t *testing.T) {
169172
assert.LessOrEqual(t, mc.P50ProjectedEffectiveTokens, mc.P90ProjectedEffectiveTokens,
170173
"P50 ≤ P90")
171174
}
175+
176+
func TestForecastRateLimitSleep_ContextCancelled(t *testing.T) {
177+
ctx, cancel := context.WithCancel(context.Background())
178+
cancel()
179+
180+
err := forecastRateLimitSleep(ctx, time.Second)
181+
require.ErrorIs(t, err, context.Canceled)
182+
}
183+
184+
func TestForecastRateLimitSleep_CompletesWithoutCancellation(t *testing.T) {
185+
err := forecastRateLimitSleep(context.Background(), time.Millisecond)
186+
require.NoError(t, err)
187+
}
188+
189+
func TestForecastWorkflow_IgnoresSkippedRuns(t *testing.T) {
190+
originalList := forecastListWorkflowRunsPaginated
191+
t.Cleanup(func() {
192+
forecastListWorkflowRunsPaginated = originalList
193+
})
194+
195+
start := time.Date(2026, 1, 1, 10, 0, 0, 0, time.UTC)
196+
forecastListWorkflowRunsPaginated = func(_ ListWorkflowRunsOptions) ([]WorkflowRun, int, error) {
197+
runs := []WorkflowRun{
198+
{Status: "completed", Conclusion: "skipped", EffectiveTokens: 999, Duration: 10 * time.Minute},
199+
{Status: "completed", Conclusion: "success", EffectiveTokens: 100, Duration: 5 * time.Minute, StartedAt: start, UpdatedAt: start.Add(5 * time.Minute)},
200+
{Status: "completed", Conclusion: "failure", EffectiveTokens: 200, Duration: 6 * time.Minute, StartedAt: start.Add(10 * time.Minute), UpdatedAt: start.Add(16 * time.Minute)},
201+
}
202+
return runs, len(runs), nil
203+
}
204+
205+
result, err := forecastWorkflow(context.Background(), "smoke-copilot", "2026-01-01", ForecastConfig{
206+
Days: 30,
207+
Period: "month",
208+
SampleSize: 100,
209+
}, 30)
210+
require.NoError(t, err)
211+
assert.Equal(t, 2, result.SampledRuns, "skipped runs should not be sampled")
212+
assert.Equal(t, 150, result.AvgEffectiveTokens, "metrics should ignore skipped runs")
213+
assert.InEpsilon(t, 0.5, result.SuccessRate, 1e-9)
214+
}
215+
216+
func TestRenderForecastTable_ZeroMonteCarloRangeRendersDash(t *testing.T) {
217+
reader, writer, err := os.Pipe()
218+
require.NoError(t, err)
219+
originalStderr := os.Stderr
220+
os.Stderr = writer
221+
t.Cleanup(func() {
222+
os.Stderr = originalStderr
223+
})
224+
225+
err = renderForecastTable(ForecastResult{
226+
Period: "month",
227+
Workflows: []ForecastWorkflowResult{
228+
{
229+
WorkflowID: "smoke-copilot",
230+
SampledRuns: 1,
231+
SuccessRate: 1,
232+
Yield: 1,
233+
MonteCarlo: &ForecastMonteCarloSummary{
234+
P10ProjectedEffectiveTokens: 0,
235+
P50ProjectedEffectiveTokens: 0,
236+
P90ProjectedEffectiveTokens: 0,
237+
},
238+
},
239+
},
240+
}, ForecastConfig{Days: 30, Period: "month"})
241+
require.NoError(t, err)
242+
243+
require.NoError(t, writer.Close())
244+
out, readErr := io.ReadAll(reader)
245+
require.NoError(t, readErr)
246+
assert.NotContains(t, string(out), "-–-")
247+
}

0 commit comments

Comments
 (0)