Skip to content

Commit 49aff3c

Browse files
committed
provider: use after cursor and backoff when waiting for jobs
1 parent f26f0fd commit 49aff3c

4 files changed

Lines changed: 137 additions & 8 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.25.6
55
require (
66
cdr.dev/slog/v3 v3.0.0-rc1
77
github.com/coder/coder/v2 v2.30.1
8+
github.com/coder/retry v1.5.1
89
github.com/docker/docker v28.5.2+incompatible
910
github.com/docker/go-connections v0.6.0
1011
github.com/google/uuid v1.6.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ github.com/coder/coder/v2 v2.30.1 h1:5dxGKxWx80xb6lNd958y8Y4h3fBbQubDgIooHTTv/RQ
122122
github.com/coder/coder/v2 v2.30.1/go.mod h1:w40ThqnpVr727SVnu3wwUrK2woxNx1MrV1zVxxABimk=
123123
github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0 h1:3A0ES21Ke+FxEM8CXx9n47SZOKOpgSE1bbJzlE4qPVs=
124124
github.com/coder/pretty v0.0.0-20230908205945-e89ba86370e0/go.mod h1:5UuS2Ts+nTToAMeOjNlnHFkPahrtDkmpydBen/3wgZc=
125+
github.com/coder/retry v1.5.1 h1:iWu8YnD8YqHs3XwqrqsjoBTAVqT9ml6z9ViJ2wlMiqc=
126+
github.com/coder/retry v1.5.1/go.mod h1:blHMk9vs6LkoRT9ZHyuZo360cufXEhrxqvEzeMtRGoY=
125127
github.com/coder/serpent v0.13.0 h1:6EoWjpEypkb8cS6i0eCF4qoAv9vrEVaX26RW+3FMMvo=
126128
github.com/coder/serpent v0.13.0/go.mod h1:7OIvFBYMd+OqarMy5einBl8AtRr8LliopVU7pyrwucY=
127129
github.com/coder/terraform-provider-coder/v2 v2.13.1 h1:dtPaJUvueFm+XwBPUMWQCc5Z1QUQBW4B4RNyzX4h4y8=

internal/provider/template_resource.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88
"io"
99
"slices"
1010
"strings"
11+
"time"
1112

1213
"cdr.dev/slog/v3"
1314
"github.com/coder/coder/v2/coderd/util/ptr"
1415
"github.com/coder/coder/v2/codersdk"
1516
"github.com/coder/coder/v2/provisionersdk"
17+
"github.com/coder/retry"
1618
"github.com/coder/terraform-provider-coderd/internal/codersdkvalidator"
1719
"github.com/google/uuid"
1820
"github.com/hashicorp/terraform-plugin-framework-validators/listvalidator"
@@ -1105,21 +1107,31 @@ func uploadDirectory(ctx context.Context, client *codersdk.Client, logger slog.L
11051107
func waitForJob(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, error) {
11061108
const maxRetries = 3
11071109
var allLogs []codersdk.ProvisionerJobLog
1108-
for retries := 0; retries < maxRetries; retries++ {
1109-
logs, done, err := waitForJobOnce(ctx, client, version)
1110+
var lastLogID int64
1111+
1112+
for attempts, retrier := 0, retry.New(500*time.Millisecond, 5*time.Second); attempts < maxRetries && retrier.Wait(ctx); attempts++ {
1113+
logs, done, err := waitForJobOnce(ctx, client, version, lastLogID)
11101114
allLogs = append(allLogs, logs...)
1115+
if len(logs) > 0 {
1116+
lastLogID = logs[len(logs)-1].ID
1117+
}
11111118
if err != nil {
11121119
return allLogs, err
11131120
}
11141121
if done {
11151122
return allLogs, nil
11161123
}
1124+
tflog.Warn(ctx, fmt.Sprintf("provisioner job still active, retrying (attempt %d/%d, delay=%s)", attempts+1, maxRetries, retrier.Delay))
1125+
}
1126+
1127+
if err := ctx.Err(); err != nil {
1128+
return allLogs, err
11171129
}
11181130
return allLogs, fmt.Errorf("provisioner job did not complete after %d retries", maxRetries)
11191131
}
11201132

1121-
func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion) ([]codersdk.ProvisionerJobLog, bool, error) {
1122-
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, 0)
1133+
func waitForJobOnce(ctx context.Context, client *codersdk.Client, version *codersdk.TemplateVersion, after int64) ([]codersdk.ProvisionerJobLog, bool, error) {
1134+
logs, closer, err := client.TemplateVersionLogsAfter(ctx, version.ID, after)
11231135
if err != nil {
11241136
return nil, false, fmt.Errorf("begin streaming logs: %w", err)
11251137
}

internal/provider/wait_for_job_test.go

Lines changed: 118 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestWaitForJobOnce_Success(t *testing.T) {
5353
client := codersdk.New(srvURL)
5454

5555
version := &codersdk.TemplateVersion{ID: versionID}
56-
logs, done, err := waitForJobOnce(context.Background(), client, version)
56+
logs, done, err := waitForJobOnce(context.Background(), client, version, 0)
5757
require.NoError(t, err)
5858
require.True(t, done)
5959
require.Len(t, logs, 1)
@@ -92,7 +92,7 @@ func TestWaitForJobOnce_JobFailed(t *testing.T) {
9292
client := codersdk.New(srvURL)
9393

9494
version := &codersdk.TemplateVersion{ID: versionID}
95-
_, done, err := waitForJobOnce(context.Background(), client, version)
95+
_, done, err := waitForJobOnce(context.Background(), client, version, 0)
9696
require.Error(t, err)
9797
require.False(t, done)
9898
require.Contains(t, err.Error(), "provisioner job did not succeed")
@@ -130,7 +130,7 @@ func TestWaitForJobOnce_StillActive(t *testing.T) {
130130
client := codersdk.New(srvURL)
131131

132132
version := &codersdk.TemplateVersion{ID: versionID}
133-
_, done, err := waitForJobOnce(context.Background(), client, version)
133+
_, done, err := waitForJobOnce(context.Background(), client, version, 0)
134134
require.NoError(t, err)
135135
require.False(t, done)
136136
}
@@ -178,18 +178,26 @@ func TestWaitForJob_SucceedsOnRetry(t *testing.T) {
178178
t.Parallel()
179179
versionID := uuid.New()
180180
var versionCallCount atomic.Int32
181+
var wsCallCount atomic.Int32
182+
var secondAfter atomic.Value
183+
secondAfter.Store("")
181184

182185
handler := http.NewServeMux()
183186
handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
184187
if strings.Contains(r.URL.RawQuery, "follow") {
188+
call := wsCallCount.Add(1)
189+
if call == 2 {
190+
secondAfter.Store(r.URL.Query().Get("after"))
191+
}
192+
185193
conn, err := websocket.Accept(w, r, nil)
186194
if err != nil {
187195
http.Error(w, err.Error(), http.StatusInternalServerError)
188196
return
189197
}
190198
ctx := r.Context()
191199
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{
192-
ID: int64(versionCallCount.Load()),
200+
ID: int64(call),
193201
Output: "log line",
194202
})
195203
_ = conn.Close(websocket.StatusNormalClosure, "done")
@@ -219,4 +227,110 @@ func TestWaitForJob_SucceedsOnRetry(t *testing.T) {
219227
logs, err := waitForJob(context.Background(), client, version)
220228
require.NoError(t, err)
221229
require.Len(t, logs, 2)
230+
require.Equal(t, int64(1), logs[0].ID)
231+
require.Equal(t, int64(2), logs[1].ID)
232+
require.Equal(t, "1", secondAfter.Load())
233+
}
234+
235+
func TestWaitForJob_UsesAfterCursorAcrossRetries(t *testing.T) {
236+
t.Parallel()
237+
versionID := uuid.New()
238+
var versionCallCount atomic.Int32
239+
var wsCallCount atomic.Int32
240+
var secondAfter atomic.Value
241+
secondAfter.Store("")
242+
243+
handler := http.NewServeMux()
244+
handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
245+
if strings.Contains(r.URL.RawQuery, "follow") {
246+
call := wsCallCount.Add(1)
247+
if call == 2 {
248+
secondAfter.Store(r.URL.Query().Get("after"))
249+
}
250+
251+
conn, err := websocket.Accept(w, r, nil)
252+
if err != nil {
253+
http.Error(w, err.Error(), http.StatusInternalServerError)
254+
return
255+
}
256+
ctx := r.Context()
257+
if call == 1 {
258+
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 1, Output: "log 1"})
259+
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 2, Output: "log 2"})
260+
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 3, Output: "log 3"})
261+
} else {
262+
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 4, Output: "log 4"})
263+
_ = wsjson.Write(ctx, conn, codersdk.ProvisionerJobLog{ID: 5, Output: "log 5"})
264+
}
265+
_ = conn.Close(websocket.StatusNormalClosure, "done")
266+
return
267+
}
268+
269+
count := versionCallCount.Add(1)
270+
status := codersdk.ProvisionerJobRunning
271+
if count >= 2 {
272+
status = codersdk.ProvisionerJobSucceeded
273+
}
274+
w.Header().Set("Content-Type", "application/json")
275+
_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
276+
ID: versionID,
277+
Job: codersdk.ProvisionerJob{Status: status},
278+
})
279+
})
280+
281+
srv := httptest.NewServer(handler)
282+
t.Cleanup(srv.Close)
283+
srvURL, err := url.Parse(srv.URL)
284+
require.NoError(t, err)
285+
client := codersdk.New(srvURL)
286+
287+
version := &codersdk.TemplateVersion{ID: versionID}
288+
logs, err := waitForJob(context.Background(), client, version)
289+
require.NoError(t, err)
290+
require.Len(t, logs, 5)
291+
for i, log := range logs {
292+
require.Equal(t, int64(i+1), log.ID)
293+
}
294+
require.Equal(t, int32(2), wsCallCount.Load())
295+
require.Equal(t, "3", secondAfter.Load())
296+
}
297+
298+
func TestWaitForJob_ContextCanceledDuringBackoff(t *testing.T) {
299+
t.Parallel()
300+
versionID := uuid.New()
301+
ctx, cancel := context.WithCancel(context.Background())
302+
t.Cleanup(cancel)
303+
var statusCallCount atomic.Int32
304+
305+
handler := http.NewServeMux()
306+
handler.HandleFunc("/api/v2/templateversions/", func(w http.ResponseWriter, r *http.Request) {
307+
if strings.Contains(r.URL.RawQuery, "follow") {
308+
conn, err := websocket.Accept(w, r, nil)
309+
if err != nil {
310+
http.Error(w, err.Error(), http.StatusInternalServerError)
311+
return
312+
}
313+
_ = conn.Close(websocket.StatusNormalClosure, "done")
314+
return
315+
}
316+
317+
w.Header().Set("Content-Type", "application/json")
318+
_ = json.NewEncoder(w).Encode(codersdk.TemplateVersion{
319+
ID: versionID,
320+
Job: codersdk.ProvisionerJob{Status: codersdk.ProvisionerJobRunning},
321+
})
322+
if statusCallCount.Add(1) == 1 {
323+
cancel()
324+
}
325+
})
326+
327+
srv := httptest.NewServer(handler)
328+
t.Cleanup(srv.Close)
329+
srvURL, err := url.Parse(srv.URL)
330+
require.NoError(t, err)
331+
client := codersdk.New(srvURL)
332+
333+
version := &codersdk.TemplateVersion{ID: versionID}
334+
_, err = waitForJob(ctx, client, version)
335+
require.ErrorIs(t, err, context.Canceled)
222336
}

0 commit comments

Comments
 (0)