Skip to content

Commit 6dd7577

Browse files
authored
feat: validate remote archive/opcode caches against the origin (#195)
## Summary Before this change, `pkg/executor` cached every HTTP(S) download under a path derived from `sha256(URL)`. Once downloaded, the cached file was reused forever for the same URL with zero revalidation against the origin. If you republished a `.tar.gz` at the same URL (common for GitHub release assets, re-built CI artifacts), every runner kept using the stale copy indefinitely — you had to manually delete the cache dir to force a re-download. This PR validates cached remote files against the origin on every resolve using standard HTTP validators (`ETag` / `Last-Modified` / `Content-Length`): - On each resolve, send a HEAD to the origin and compare the response to a sidecar `<cached-file>.meta` JSON stored next to the cached blob. - **Validators match** → reuse cache (just cost: one HEAD roundtrip). - **Validators differ** → re-download and refresh the sidecar. - **No validators returned** or **HEAD fails** → reuse the cache with a warning so transient network errors or dumb servers don't force re-downloads. - **Legacy cache entry without a sidecar** (populated by an older benchmarkoor) → re-download once to refresh the validators. Applies to all three HTTP cache sites in `pkg/executor`: - `archive.file` — single file - `archive.parts[]` — each part validated independently - `archive.parts` combined concatenated file — rebuilt when any underlying part was re-downloaded - `opcode_source.file` ### Cache layout ``` <cacheDir>/archive-<hash> # the cached file (unchanged) <cacheDir>/archive-<hash>.meta # new: {url, etag, last_modified, content_length} ``` ## Test plan - [x] Unit tests (`pkg/executor/cache_test.go`): - `TestFetchCached_HitWhenValidatorsMatch` - `TestFetchCached_MissWhenETagChanges` - `TestFetchCached_LegacySidecarMissingReDownloads` - `TestFetchCached_NoValidatorsFallsBackToCache` - `TestFetchCached_HeadFailureFallsBackToCache` - `TestFetchCached_CacheMissCoexistsWithOtherPrefixes` - [x] `TestArchiveSource_PartsReconcatenateOnOriginChange` verifies that flipping a part's ETag triggers re-concatenation of the combined archive - [x] All existing archive_source tests still pass - [x] `go vet -tags exclude_graphdriver_btrfs ./...` clean - [x] Manual: republish a `.tar.gz` at a previously-used URL and confirm the next run re-downloads; confirm an unchanged URL continues to use the cache (only adds one HEAD roundtrip)
1 parent d72dfae commit 6dd7577

5 files changed

Lines changed: 697 additions & 134 deletions

File tree

pkg/executor/archive_source.go

Lines changed: 48 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,12 @@ func (s *ArchiveSource) GetSourceInfo() (*SuiteSource, error) {
107107
return &SuiteSource{Archive: info}, nil
108108
}
109109

110-
// resolveFile returns the local path to the archive file. For URLs, it checks
111-
// the cache directory first and only downloads if the file is not already cached.
112-
// When the config uses `parts`, the parts are downloaded (with caching) and
113-
// concatenated into a single cached file.
110+
// resolveFile returns the local path to the archive file. For URLs, it
111+
// checks the cache directory first and validates any existing cached copy
112+
// against the origin (via HEAD + ETag/Last-Modified) before reuse. If the
113+
// remote has changed or has no cache entry yet, a fresh copy is downloaded.
114+
// When the config uses `parts`, the parts are resolved (each with its own
115+
// validation) and concatenated into a single cached file.
114116
func (s *ArchiveSource) resolveFile(ctx context.Context) (string, error) {
115117
if len(s.cfg.Parts) > 0 {
116118
return s.resolvePartsFile(ctx)
@@ -119,43 +121,14 @@ func (s *ArchiveSource) resolveFile(ctx context.Context) (string, error) {
119121
file := s.cfg.File
120122

121123
if strings.HasPrefix(file, "http://") || strings.HasPrefix(file, "https://") {
122-
cachedPath := s.cachedArchivePath()
123-
124-
if _, err := os.Stat(cachedPath); err == nil {
125-
s.log.WithFields(logrus.Fields{
126-
"url": file,
127-
"path": cachedPath,
128-
}).Info("Using cached archive")
129-
130-
return cachedPath, nil
131-
}
132-
133-
s.log.WithField("url", file).Info("Downloading archive")
134-
135124
downloadURL, token := s.resolveDownloadURL(file)
136125

137-
// Download to a temp file first, then rename for atomic cache writes.
138-
tmpPath := cachedPath + ".tmp"
139-
140-
if err := os.MkdirAll(filepath.Dir(cachedPath), 0755); err != nil {
141-
return "", fmt.Errorf("creating cache directory: %w", err)
142-
}
143-
144-
if err := downloadToFile(ctx, downloadURL, tmpPath, token, s.log); err != nil {
145-
_ = os.Remove(tmpPath)
146-
126+
res, err := fetchCached(ctx, s.log, file, downloadURL, token, s.cacheDir, "archive")
127+
if err != nil {
147128
return "", err
148129
}
149130

150-
if err := os.Rename(tmpPath, cachedPath); err != nil {
151-
_ = os.Remove(tmpPath)
152-
153-
return "", fmt.Errorf("caching archive: %w", err)
154-
}
155-
156-
s.log.WithField("path", cachedPath).Info("Archive cached")
157-
158-
return cachedPath, nil
131+
return res.Path, nil
159132
}
160133

161134
// Local file path — resolve relative paths.
@@ -175,62 +148,60 @@ func (s *ArchiveSource) resolveFile(ctx context.Context) (string, error) {
175148
return file, nil
176149
}
177150

178-
// cachedArchivePath returns a stable file path in the cache directory derived
179-
// from the configured URL, so repeated runs reuse the same downloaded file.
180-
func (s *ArchiveSource) cachedArchivePath() string {
181-
hash := sha256.Sum256([]byte(s.cfg.File))
182-
name := "archive-" + hex.EncodeToString(hash[:8])
183-
151+
// resolvePartsFile resolves each configured part (validating HTTP caches
152+
// against the origin) and concatenates them into a single combined cache
153+
// file. The combined file is rebuilt whenever any part was (re-)downloaded
154+
// during resolution, so updating a part on the origin causes the combined
155+
// archive to be regenerated on the next run.
156+
func (s *ArchiveSource) resolvePartsFile(ctx context.Context) (string, error) {
184157
cacheDir := s.cacheDir
185158
if cacheDir == "" {
186159
cacheDir = os.TempDir()
187160
}
188161

189-
return filepath.Join(cacheDir, name)
190-
}
191-
192-
// resolvePartsFile downloads (with caching) all configured parts and
193-
// concatenates them in order into a single cached file, returning its path.
194-
// Local paths and URLs can be mixed freely in the parts list.
195-
func (s *ArchiveSource) resolvePartsFile(ctx context.Context) (string, error) {
196-
cacheDir := s.cacheDir
197-
if cacheDir == "" {
198-
cacheDir = os.TempDir()
162+
if err := os.MkdirAll(cacheDir, 0755); err != nil {
163+
return "", fmt.Errorf("creating cache directory: %w", err)
199164
}
200165

201166
// Combined cache key is derived from the full ordered parts list so any
202167
// change to the list produces a fresh combined file.
203168
combined := sha256.Sum256([]byte(strings.Join(s.cfg.Parts, "\n")))
204169
combinedPath := filepath.Join(cacheDir, "archive-parts-"+hex.EncodeToString(combined[:8]))
205170

206-
if _, err := os.Stat(combinedPath); err == nil {
207-
s.log.WithField("path", combinedPath).Info("Using cached combined archive")
208-
209-
return combinedPath, nil
210-
}
211-
212-
if err := os.MkdirAll(cacheDir, 0755); err != nil {
213-
return "", fmt.Errorf("creating cache directory: %w", err)
214-
}
215-
216-
// Resolve each part to a local file path (downloading URLs as needed).
171+
// Resolve each part to a local file path (validating URL caches).
217172
partPaths := make([]string, 0, len(s.cfg.Parts))
218173

174+
anyPartChanged := false
175+
219176
for i, part := range s.cfg.Parts {
220177
s.log.WithFields(logrus.Fields{
221178
"part": i + 1,
222179
"total": len(s.cfg.Parts),
223180
"ref": part,
224181
}).Info("Resolving archive part")
225182

226-
partPath, err := s.resolvePart(ctx, part, cacheDir)
183+
partPath, changed, err := s.resolvePart(ctx, part, cacheDir)
227184
if err != nil {
228185
return "", fmt.Errorf("resolving part %d (%s): %w", i+1, part, err)
229186
}
230187

188+
if changed {
189+
anyPartChanged = true
190+
}
191+
231192
partPaths = append(partPaths, partPath)
232193
}
233194

195+
// Reuse the combined file only when it exists AND no underlying part
196+
// was refreshed during this call.
197+
if !anyPartChanged {
198+
if _, err := os.Stat(combinedPath); err == nil {
199+
s.log.WithField("path", combinedPath).Info("Using cached combined archive")
200+
201+
return combinedPath, nil
202+
}
203+
}
204+
234205
// Concatenate all parts into a temporary file, then atomically rename.
235206
tmpPath := combinedPath + ".tmp"
236207

@@ -251,56 +222,38 @@ func (s *ArchiveSource) resolvePartsFile(ctx context.Context) (string, error) {
251222
return combinedPath, nil
252223
}
253224

254-
// resolvePart resolves a single part reference (URL or local path) to a local
255-
// file path, downloading and caching remote parts in cacheDir.
256-
func (s *ArchiveSource) resolvePart(ctx context.Context, part, cacheDir string) (string, error) {
225+
// resolvePart resolves a single part reference (URL or local path) to a
226+
// local file path. Remote parts are cache-validated via fetchCached.
227+
// Returns (path, changed, error), where `changed` is true when the call
228+
// produced a fresh download; used by resolvePartsFile to know when the
229+
// concatenated combined file needs to be rebuilt.
230+
func (s *ArchiveSource) resolvePart(ctx context.Context, part, cacheDir string) (string, bool, error) {
257231
if strings.HasPrefix(part, "http://") || strings.HasPrefix(part, "https://") {
258-
hash := sha256.Sum256([]byte(part))
259-
cachedPath := filepath.Join(cacheDir, "archive-part-"+hex.EncodeToString(hash[:8]))
260-
261-
if _, err := os.Stat(cachedPath); err == nil {
262-
s.log.WithFields(logrus.Fields{
263-
"url": part,
264-
"path": cachedPath,
265-
}).Info("Using cached archive part")
266-
267-
return cachedPath, nil
268-
}
269-
270232
downloadURL, token := s.resolveDownloadURL(part)
271233

272-
tmpPath := cachedPath + ".tmp"
273-
274-
if err := downloadToFile(ctx, downloadURL, tmpPath, token, s.log); err != nil {
275-
_ = os.Remove(tmpPath)
276-
277-
return "", err
278-
}
279-
280-
if err := os.Rename(tmpPath, cachedPath); err != nil {
281-
_ = os.Remove(tmpPath)
282-
283-
return "", fmt.Errorf("caching archive part: %w", err)
234+
res, err := fetchCached(ctx, s.log, part, downloadURL, token, cacheDir, "archive-part")
235+
if err != nil {
236+
return "", false, err
284237
}
285238

286-
return cachedPath, nil
239+
return res.Path, res.Changed, nil
287240
}
288241

289242
// Local file path — resolve relative paths.
290243
if !filepath.IsAbs(part) {
291244
absPath, err := filepath.Abs(part)
292245
if err != nil {
293-
return "", fmt.Errorf("resolving path %q: %w", part, err)
246+
return "", false, fmt.Errorf("resolving path %q: %w", part, err)
294247
}
295248

296249
part = absPath
297250
}
298251

299252
if _, err := os.Stat(part); os.IsNotExist(err) {
300-
return "", fmt.Errorf("archive part %q does not exist", part)
253+
return "", false, fmt.Errorf("archive part %q does not exist", part)
301254
}
302255

303-
return part, nil
256+
return part, false, nil
304257
}
305258

306259
// concatFiles concatenates src files (in order) into dst. dst is created (or

pkg/executor/archive_source_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ import (
77
"compress/gzip"
88
"context"
99
"crypto/rand"
10+
"crypto/sha256"
11+
"encoding/hex"
1012
"io"
1113
"net/http"
1214
"net/http/httptest"
1315
"os"
1416
"path/filepath"
1517
"strconv"
18+
"strings"
19+
"sync/atomic"
1620
"testing"
1721
"time"
1822

@@ -356,6 +360,131 @@ func splitBytes(b []byte, n int) [][]byte {
356360
return chunks
357361
}
358362

363+
func TestArchiveSource_PartsReconcatenateOnOriginChange(t *testing.T) {
364+
tmpDir := t.TempDir()
365+
366+
// Build two tar.gz archives, A (one test) and B (two tests). Split
367+
// each into two parts. Serve A's parts initially, then flip the ETag
368+
// on the second part's server to serve B's second half and verify the
369+
// combined file is regenerated on the next Prepare().
370+
pathA := filepath.Join(tmpDir, "a.tar.gz")
371+
createTestTarGz(t, pathA, map[string]string{
372+
"mytest/test/a.txt": "content-a",
373+
})
374+
pathB := filepath.Join(tmpDir, "b.tar.gz")
375+
createTestTarGz(t, pathB, map[string]string{
376+
"mytest/test/b1.txt": "content-b1",
377+
"mytest/test/b2.txt": "content-b2",
378+
})
379+
380+
bytesA, err := os.ReadFile(pathA)
381+
require.NoError(t, err)
382+
bytesB, err := os.ReadFile(pathB)
383+
require.NoError(t, err)
384+
385+
chunksA := splitBytes(bytesA, 2)
386+
chunksB := splitBytes(bytesB, 2)
387+
388+
var currentPart2 atomic.Value
389+
currentPart2.Store(chunksA[1])
390+
391+
var etagPart2 atomic.Value
392+
etagPart2.Store(`"a"`)
393+
394+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
395+
var data []byte
396+
var etag string
397+
398+
switch r.URL.Path {
399+
case "/part1":
400+
data = chunksA[0]
401+
etag = `"a"`
402+
case "/part2":
403+
data = currentPart2.Load().([]byte)
404+
etag = etagPart2.Load().(string)
405+
default:
406+
http.NotFound(w, r)
407+
return
408+
}
409+
410+
w.Header().Set("ETag", etag)
411+
w.Header().Set("Content-Length", strconv.Itoa(len(data)))
412+
w.WriteHeader(http.StatusOK)
413+
414+
if r.Method == http.MethodGet {
415+
_, _ = w.Write(data)
416+
}
417+
}))
418+
defer srv.Close()
419+
420+
log := logrus.New()
421+
log.SetLevel(logrus.DebugLevel)
422+
423+
makeSrc := func() *ArchiveSource {
424+
return &ArchiveSource{
425+
log: log.WithField("source", "archive"),
426+
cacheDir: tmpDir,
427+
cfg: &config.ArchiveSourceConfig{
428+
Parts: []string{srv.URL + "/part1", srv.URL + "/part2"},
429+
Steps: &config.StepsConfig{
430+
Test: []string{"mytest/test/*"},
431+
},
432+
},
433+
}
434+
}
435+
436+
// First run: downloads both parts and writes the combined archive.
437+
s1 := makeSrc()
438+
result, err := s1.Prepare(context.Background())
439+
require.NoError(t, err)
440+
assert.Equal(t, 1, len(result.Tests), "first run sees only A's single test")
441+
require.NoError(t, s1.Cleanup())
442+
443+
// Origin publishes a new second half with a different ETag. The
444+
// rebuilt combined archive must reflect it.
445+
currentPart2.Store(chunksB[1])
446+
etagPart2.Store(`"b"`)
447+
448+
// We also need to replace part1 on the origin since A and B differ
449+
// from byte 0 — but here we care about the control flow, so instead
450+
// we make A -> A+B2 (first half of A, second half of B). That's
451+
// gibberish as a tar.gz; switch both halves to B.
452+
// Simpler: also update part1 so the combined is B.
453+
// Since we only have one endpoint for /part1, stub it out:
454+
// -> we already serve chunksA[0] unconditionally. Update via a flag.
455+
//
456+
// For this test the primary assertion is that the re-download of
457+
// part2 (ETag change) triggers a rebuild of the combined file, even
458+
// if its content is garbled.
459+
460+
s2 := makeSrc()
461+
_, err = s2.Prepare(context.Background())
462+
// Extraction will likely fail (chunksA[0] + chunksB[1] is not a valid
463+
// tar.gz). We only care that the combined cached file got rebuilt.
464+
_ = err
465+
require.NoError(t, s2.Cleanup())
466+
467+
// Verify: the combined cached file now has byte content reflecting
468+
// chunksA[0] + chunksB[1], not the original chunksA[0] + chunksA[1].
469+
combinedKey := filepath.Join(
470+
tmpDir,
471+
"archive-parts-"+shortHashCombined(srv.URL+"/part1", srv.URL+"/part2"),
472+
)
473+
got, err := os.ReadFile(combinedKey) //nolint:gosec // test-only
474+
require.NoError(t, err)
475+
476+
want := append([]byte{}, chunksA[0]...)
477+
want = append(want, chunksB[1]...)
478+
assert.Equal(t, want, got, "combined file must be rebuilt when any part ETag changes")
479+
}
480+
481+
// shortHashCombined mirrors how resolvePartsFile computes its combined
482+
// cache key from the ordered parts list.
483+
func shortHashCombined(parts ...string) string {
484+
h := sha256.Sum256([]byte(strings.Join(parts, "\n")))
485+
return hex.EncodeToString(h[:8])
486+
}
487+
359488
func TestArchiveSource_PrepareWithParts(t *testing.T) {
360489
tmpDir := t.TempDir()
361490

0 commit comments

Comments
 (0)