Skip to content

Commit bef262f

Browse files
fix(executor): validate download and cache completeness before reuse (#251)
Cached artifacts were trusted on presence alone, so an incomplete write could be served as valid indefinitely. Range downloads ignored io.Copy's byte count, so a 206 body that ended early left a zero-filled hole in the pre-allocated file with no error. downloadChunk now verifies it received the full requested range, and the single-GET path checks the body against the advertised size. EEST preparation decided the cache was warm from the fixtures directory existing, but fixtures and genesis extract in two steps. A run interrupted between the two steps left a partial cache that was reused forever with genesis missing. Reuse is now gated on a completion marker written only after both steps succeed, and a partial cache is cleared and re-downloaded on the next run.
1 parent 0e004df commit bef262f

3 files changed

Lines changed: 322 additions & 15 deletions

File tree

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
package executor
2+
3+
import (
4+
"archive/tar"
5+
"bytes"
6+
"compress/gzip"
7+
"context"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"net/http/httptest"
12+
"os"
13+
"path/filepath"
14+
"strconv"
15+
"strings"
16+
"sync/atomic"
17+
"testing"
18+
19+
"github.com/ethpandaops/benchmarkoor/pkg/config"
20+
"github.com/sirupsen/logrus"
21+
)
22+
23+
func quietLog() logrus.FieldLogger {
24+
l := logrus.New()
25+
l.SetOutput(io.Discard)
26+
27+
return l
28+
}
29+
30+
// exists reports whether os.Stat succeeds for path. Any Stat error, not only
// "does not exist", is reported as false.
func exists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}

	return true
}
35+
36+
const (
	// rangeTotal is the object size advertised by rangeServer; it is large
	// enough to use the parallel range path.
	rangeTotal = 30 * 1024 * 1024
	// fillByte is the value of every payload byte the test servers send.
	fillByte = 0xAB
	// testETag is the ETag header value the test servers return.
	testETag = `"etag-v1"`
)

// rangeServer starts a test server that advertises range support.
//
// HEAD requests report a Content-Length of rangeTotal. Every other request is
// treated as a ranged GET of the form "bytes=<start>-<end>" and answered with
// 206 and end-start+1 fill bytes, except that the chunk starting at offset 0
// is cut to half its length when shortFirst is set, to emulate a server that
// ends a 206 body early at a clean EOF.
//
// When gets is non-nil it is atomically incremented for every non-HEAD
// request. A request whose Range header is missing or malformed is answered
// with 400 Bad Request rather than panicking inside the handler, so a
// misbehaving client shows up as a readable HTTP error in the test output.
func rangeServer(t *testing.T, shortFirst bool, gets *int64) *httptest.Server {
	t.Helper()

	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Accept-Ranges", "bytes")
		w.Header().Set("ETag", testETag)

		if r.Method == http.MethodHead {
			w.Header().Set("Content-Length", strconv.Itoa(rangeTotal))
			w.WriteHeader(http.StatusOK)

			return
		}

		if gets != nil {
			atomic.AddInt64(gets, 1)
		}

		start, end, ok := parseByteRange(r.Header.Get("Range"))
		if !ok {
			http.Error(w, "malformed or missing Range header", http.StatusBadRequest)

			return
		}

		sendLen := end - start + 1
		if shortFirst && start == 0 {
			// Deliberately deliver only half of the first chunk.
			sendLen /= 2
		}

		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, rangeTotal))
		w.WriteHeader(http.StatusPartialContent)
		_, _ = w.Write(bytes.Repeat([]byte{fillByte}, int(sendLen)))
	}))
}

// parseByteRange extracts the inclusive bounds from a Range header value of
// the form "bytes=<start>-<end>". The "bytes=" prefix is optional. ok is false
// when the separator is missing, either bound is not a base-10 integer, start
// is negative, or end is smaller than start.
func parseByteRange(header string) (int64, int64, bool) {
	first, last, found := strings.Cut(strings.TrimPrefix(header, "bytes="), "-")
	if !found {
		return 0, 0, false
	}

	start, err := strconv.ParseInt(first, 10, 64)
	if err != nil || start < 0 {
		return 0, 0, false
	}

	end, err := strconv.ParseInt(last, 10, 64)
	if err != nil || end < start {
		return 0, 0, false
	}

	return start, end, true
}
78+
79+
// A range download that is missing bytes must fail loudly rather than caching a
80+
// file with a zero hole.
81+
func TestParallelDownloadRejectsShortChunk(t *testing.T) {
82+
srv := rangeServer(t, true, nil)
83+
defer srv.Close()
84+
85+
cacheDir := t.TempDir()
86+
87+
_, err := fetchCached(context.Background(), quietLog(), srv.URL, srv.URL, "", cacheDir, "short")
88+
if err == nil {
89+
t.Fatal("expected an error for a truncated chunk, got nil")
90+
}
91+
92+
if !strings.Contains(err.Error(), "short chunk") {
93+
t.Fatalf("expected a short-chunk error, got: %v", err)
94+
}
95+
96+
// No file should be promoted into the cache on failure.
97+
if path := cachePath(cacheDir, "short", srv.URL); exists(path) {
98+
t.Fatalf("a corrupt file was left in the cache at %s", path)
99+
}
100+
}
101+
102+
// A fully delivered range download still works and yields the exact bytes.
103+
func TestParallelDownloadAcceptsCompleteFile(t *testing.T) {
104+
srv := rangeServer(t, false, nil)
105+
defer srv.Close()
106+
107+
res, err := fetchCached(context.Background(), quietLog(), srv.URL, srv.URL, "", t.TempDir(), "ok")
108+
if err != nil {
109+
t.Fatalf("complete download errored: %v", err)
110+
}
111+
112+
data, err := os.ReadFile(res.Path)
113+
if err != nil {
114+
t.Fatalf("reading cached file: %v", err)
115+
}
116+
117+
if len(data) != rangeTotal {
118+
t.Fatalf("size = %d, want %d", len(data), rangeTotal)
119+
}
120+
121+
for i, b := range data {
122+
if b != fillByte {
123+
t.Fatalf("unexpected byte 0x%02x at offset %d", b, i)
124+
}
125+
}
126+
}
127+
128+
// A non-range download whose body is shorter than the advertised size must be
129+
// rejected, not cached.
130+
func TestSequentialDownloadRejectsShortBody(t *testing.T) {
131+
const total = 1024 * 1024
132+
133+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
134+
// No Accept-Ranges: forces the single-GET path.
135+
w.Header().Set("ETag", testETag)
136+
137+
if r.Method == http.MethodHead {
138+
w.Header().Set("Content-Length", strconv.Itoa(total))
139+
w.WriteHeader(http.StatusOK)
140+
141+
return
142+
}
143+
144+
w.Header().Set("Content-Length", strconv.Itoa(total/2))
145+
w.WriteHeader(http.StatusOK)
146+
_, _ = w.Write(bytes.Repeat([]byte{fillByte}, total/2))
147+
}))
148+
defer srv.Close()
149+
150+
cacheDir := t.TempDir()
151+
152+
_, err := fetchCached(context.Background(), quietLog(), srv.URL, srv.URL, "", cacheDir, "seq")
153+
if err == nil {
154+
t.Fatal("expected an error for a truncated body, got nil")
155+
}
156+
157+
if path := cachePath(cacheDir, "seq", srv.URL); exists(path) {
158+
t.Fatalf("a corrupt file was left in the cache at %s", path)
159+
}
160+
}
161+
162+
// dirTarGz returns the bytes of a gzip-compressed tar archive that contains
// one directory entry (mode 0o755, name suffixed with "/") per element of
// dirs. With no dirs the archive is valid but empty. Any write or close error
// fails the test immediately through t.Fatal.
func dirTarGz(t *testing.T, dirs ...string) []byte {
	t.Helper()

	archive := new(bytes.Buffer)
	gzw := gzip.NewWriter(archive)
	tarw := tar.NewWriter(gzw)

	for i := range dirs {
		entry := tar.Header{
			Typeflag: tar.TypeDir,
			Name:     dirs[i] + "/",
			Mode:     0o755,
		}

		if err := tarw.WriteHeader(&entry); err != nil {
			t.Fatal(err)
		}
	}

	// Close the tar writer first so its trailer passes through gzip before
	// the gzip stream itself is finished.
	if closeErr := tarw.Close(); closeErr != nil {
		t.Fatal(closeErr)
	}

	if closeErr := gzw.Close(); closeErr != nil {
		t.Fatal(closeErr)
	}

	return archive.Bytes()
}
190+
191+
// An interrupted extraction (genesis fails after fixtures succeed) must not be
192+
// treated as a complete cache: the next run re-downloads, and once both halves
193+
// are present the cache is reused without downloading again.
194+
func TestEESTCacheReDownloadsUntilComplete(t *testing.T) {
195+
fixturesTar := dirTarGz(t, config.DefaultEESTFixturesSubdir)
196+
genesisTar := dirTarGz(t)
197+
198+
var genesisServed int64
199+
var failGenesis atomic.Bool
200+
failGenesis.Store(true)
201+
202+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
203+
if strings.Contains(r.URL.Path, "genesis") {
204+
if failGenesis.Load() {
205+
http.Error(w, "interrupted", http.StatusInternalServerError)
206+
207+
return
208+
}
209+
210+
atomic.AddInt64(&genesisServed, 1)
211+
_, _ = w.Write(genesisTar)
212+
213+
return
214+
}
215+
216+
_, _ = w.Write(fixturesTar)
217+
}))
218+
defer srv.Close()
219+
220+
cacheDir := t.TempDir()
221+
cfg := &config.EESTFixturesSource{
222+
GitHubRepo: "owner/repo",
223+
GitHubRelease: "v1",
224+
FixturesURL: srv.URL + "/fixtures.tar.gz",
225+
GenesisURL: srv.URL + "/genesis.tar.gz",
226+
}
227+
228+
marker := filepath.Join(cacheDir, "eest", hashRepoURL(cfg.GitHubRepo), cfg.GitHubRelease, ".complete")
229+
230+
newSource := func() *EESTSource {
231+
return NewEESTSource(quietLog(), cfg, cacheDir, nil, "")
232+
}
233+
234+
// First run: genesis fails, so preparation fails and nothing is marked done.
235+
if _, err := newSource().Prepare(context.Background()); err == nil {
236+
t.Fatal("expected first Prepare to fail on genesis")
237+
}
238+
239+
if exists(marker) {
240+
t.Fatal("completion marker must not exist after an interrupted run")
241+
}
242+
243+
// Second run: genesis works now. The partial cache must be re-downloaded.
244+
failGenesis.Store(false)
245+
246+
if _, err := newSource().Prepare(context.Background()); err != nil {
247+
t.Fatalf("second Prepare failed: %v", err)
248+
}
249+
250+
if !exists(marker) {
251+
t.Fatal("completion marker should exist after a successful run")
252+
}
253+
254+
if got := atomic.LoadInt64(&genesisServed); got != 1 {
255+
t.Fatalf("genesis should have been downloaded once, served %d", got)
256+
}
257+
258+
// Third run: a complete cache is reused without downloading again.
259+
if _, err := newSource().Prepare(context.Background()); err != nil {
260+
t.Fatalf("third Prepare failed: %v", err)
261+
}
262+
263+
if got := atomic.LoadInt64(&genesisServed); got != 1 {
264+
t.Fatalf("complete cache was re-downloaded; genesis served %d times", got)
265+
}
266+
}

pkg/executor/eest_source.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -135,23 +135,45 @@ func (s *EESTSource) Prepare(ctx context.Context) (*PreparedSource, error) {
135135
s.fixturesDir = filepath.Join(cacheBase, "fixtures")
136136
s.genesisDir = filepath.Join(cacheBase, "genesis")
137137

138-
// Check if already extracted.
139-
if _, err := os.Stat(s.fixturesDir); os.IsNotExist(err) {
140-
if s.cfg.UseArtifacts() {
141-
s.log.Info("Downloading EEST fixtures from GitHub artifacts")
138+
// Fixtures and genesis are extracted in two steps. The existence of either
139+
// directory does not mean the cache is complete, so gate reuse on a marker
140+
// written only after both steps succeed. Without it, a run interrupted
141+
// between the two steps would be treated as cached and reused forever with
142+
// genesis missing.
143+
completeMarker := filepath.Join(cacheBase, ".complete")
144+
145+
if _, err := os.Stat(completeMarker); err == nil {
146+
s.log.WithField("path", cacheBase).Info("Using cached EEST fixtures")
142147

143-
if err := s.downloadArtifacts(ctx, cacheBase); err != nil {
144-
return nil, fmt.Errorf("downloading artifacts: %w", err)
145-
}
146-
} else {
147-
s.log.Info("Downloading EEST fixtures from GitHub release")
148+
return s.discoverTests()
149+
}
148150

149-
if err := s.downloadAndExtract(ctx, cacheBase); err != nil {
150-
return nil, fmt.Errorf("downloading fixtures: %w", err)
151-
}
151+
// Clear any partial cache left by an earlier interrupted run before
152+
// re-downloading.
153+
if err := os.RemoveAll(s.fixturesDir); err != nil {
154+
return nil, fmt.Errorf("clearing partial fixtures cache: %w", err)
155+
}
156+
157+
if err := os.RemoveAll(s.genesisDir); err != nil {
158+
return nil, fmt.Errorf("clearing partial genesis cache: %w", err)
159+
}
160+
161+
if s.cfg.UseArtifacts() {
162+
s.log.Info("Downloading EEST fixtures from GitHub artifacts")
163+
164+
if err := s.downloadArtifacts(ctx, cacheBase); err != nil {
165+
return nil, fmt.Errorf("downloading artifacts: %w", err)
152166
}
153167
} else {
154-
s.log.WithField("path", cacheBase).Info("Using cached EEST fixtures")
168+
s.log.Info("Downloading EEST fixtures from GitHub release")
169+
170+
if err := s.downloadAndExtract(ctx, cacheBase); err != nil {
171+
return nil, fmt.Errorf("downloading fixtures: %w", err)
172+
}
173+
}
174+
175+
if err := os.WriteFile(completeMarker, nil, 0o644); err != nil {
176+
return nil, fmt.Errorf("writing cache completion marker: %w", err)
155177
}
156178

157179
// Parse fixtures and build tests.

pkg/executor/extract.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ func downloadSequential(
270270

271271
pw := newProgressLogger(log, totalSize)
272272

273-
if _, err := io.Copy(out, io.TeeReader(resp.Body, pw)); err != nil {
273+
written, err := io.Copy(out, io.TeeReader(resp.Body, pw))
274+
if err != nil {
274275
_ = out.Close()
275276
_ = os.Remove(destPath)
276277

@@ -281,6 +282,15 @@ func downloadSequential(
281282
return fmt.Errorf("closing file %s: %w", destPath, err)
282283
}
283284

285+
// When the server told us the size up front, make sure the body delivered
286+
// it in full. A truncated body otherwise becomes a silently corrupt cache
287+
// entry that is reused forever.
288+
if totalSize > 0 && written != totalSize {
289+
_ = os.Remove(destPath)
290+
291+
return fmt.Errorf("incomplete download %s: got %d bytes, want %d", destPath, written, totalSize)
292+
}
293+
284294
log.WithField("size", formatBytes(pw.Written())).Info("Download complete")
285295

286296
return nil
@@ -417,10 +427,19 @@ func downloadChunk(
417427
return fmt.Errorf("seeking to offset %d: %w", start, err)
418428
}
419429

420-
if _, err := io.Copy(f, io.TeeReader(resp.Body, pw)); err != nil {
430+
// A short read here is not reported as an error by io.Copy: a server can
431+
// end the body early at a clean EOF and we would write fewer bytes than the
432+
// range we asked for. Because the destination was pre-allocated, the missing
433+
// bytes stay as a zero-filled hole. Verify we got every byte of the range.
434+
written, err := io.Copy(f, io.TeeReader(resp.Body, pw))
435+
if err != nil {
421436
return fmt.Errorf("writing chunk: %w", err)
422437
}
423438

439+
if want := end - start + 1; written != want {
440+
return fmt.Errorf("short chunk: got %d bytes, want %d", written, want)
441+
}
442+
424443
return nil
425444
}
426445

0 commit comments

Comments
 (0)