Skip to content

Commit 16c5690

Browse files
authored
feat: resume pre-run replay against existing datadir (#219)
## Summary Adds two pieces that together make pre-run replay resumable after an instance crashes mid-replay (e.g. reth dies on payload N, you don't want to re-execute payloads 1..N from scratch): - **`direct` datadir method** — bypasses copy/snapshot/clone and bind-mounts `source_dir` as-is into the container. Cleanup is a no-op so the directory survives the run. Intended to point at a ZFS clone left behind by `--debug.stop-after-prerun`. - **Skip already-applied blocks during pre-run** — after RPC-ready, the lifecycle reads the client's latest block number and passes it as `ExecuteOptions.SkipUntilBlockNumber`. `runStepLines` drops every leading line until it sees the first `engine_newPayload` whose `blockNumber` exceeds that target, then resumes normal processing. Wired into both the `--debug.stop-after-prerun` path (resume) and the main `ExecuteTests` path (idempotent normal runs). Resume workflow: 1. Find the ZFS clone path the prior `--debug.stop-after-prerun` run logged (`data_mount=...` field on the final exit log). 2. Edit your config: ```yaml datadirs: reth: source_dir: /path/to/benchmarkoor-clone-reth/... method: direct ``` 3. Re-run with `--debug.stop-after-prerun --limit-instance-id=reth`. The container mounts the existing clone, the latest applied block is detected, and pre-run resumes from the next block. ## Test plan - [x] `go build`, `go test ./pkg/executor/... ./pkg/datadir/... ./pkg/config/...`, and `golangci-lint run --new-from-rev="origin/master"` clean - [x] End-to-end: trigger a mid-pre-run failure (reth crash), reconfigure with `method: direct` pointing at the surviving clone, re-run with `--debug.stop-after-prerun`, verify the "Resuming pre-run replay" log fires with the expected `resumed_at_block` and the replay continues from the next payload ## Notes - The `runner-level` rollback strategy paths (`strategy_container.go` ZFS-snapshot path, `strategy_checkpoint.go`) build their own `ExecuteOptions` and call `RunPreRunSteps` directly. They don't yet receive `SkipUntilBlockNumber` — would need `blockNum` threaded through the strategy signatures. Out of scope here since the resume case goes through `--debug.stop-after-prerun`.
1 parent d2aa7ac commit 16c5690

6 files changed

Lines changed: 128 additions & 7 deletions

File tree

pkg/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -701,9 +701,9 @@ func (d *DataDirConfig) Validate(prefix string) error {
701701
return fmt.Errorf("%s: source_dir %q is not a directory", prefix, d.SourceDir)
702702
}
703703

704-
validMethods := map[string]bool{"": true, "copy": true, "overlayfs": true, "fuse-overlayfs": true, "zfs": true}
704+
validMethods := map[string]bool{"": true, "copy": true, "overlayfs": true, "fuse-overlayfs": true, "zfs": true, "direct": true}
705705
if !validMethods[d.Method] {
706-
return fmt.Errorf("%s: invalid method %q, must be: copy, overlayfs, fuse-overlayfs, zfs", prefix, d.Method)
706+
return fmt.Errorf("%s: invalid method %q, must be: copy, overlayfs, fuse-overlayfs, zfs, direct", prefix, d.Method)
707707
}
708708

709709
return nil

pkg/datadir/direct.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadir
2+
3+
import (
4+
"context"
5+
6+
"github.com/sirupsen/logrus"
7+
)
8+
9+
// DirectProvider implements Provider by mounting source_dir directly,
10+
// without copying, snapshotting, or cloning. The container writes
11+
// directly to the user-supplied path, so any changes persist after
12+
// the run.
13+
//
14+
// Intended for inspection / resume workflows (e.g. continuing a
15+
// pre-run replay against an existing ZFS clone left behind by
16+
// --debug.stop-after-prerun). Not suitable for normal benchmarking.
17+
type DirectProvider interface {
18+
Provider
19+
}
20+
21+
// NewDirectProvider creates a new direct provider.
22+
func NewDirectProvider(log logrus.FieldLogger) DirectProvider {
23+
return &directProvider{
24+
log: log.WithField("component", "datadir-direct"),
25+
}
26+
}
27+
28+
type directProvider struct {
29+
log logrus.FieldLogger
30+
}
31+
32+
var _ DirectProvider = (*directProvider)(nil)
33+
34+
// Prepare returns the source directory unchanged. No copy or snapshot.
35+
// Cleanup is a no-op so the directory survives the run.
36+
func (p *directProvider) Prepare(_ context.Context, cfg *ProviderConfig) (*PreparedDir, error) {
37+
p.log.WithFields(logrus.Fields{
38+
"source": cfg.SourceDir,
39+
"instance": cfg.InstanceID,
40+
}).Warn("Using datadir method=direct: container will write directly to source_dir, changes will persist")
41+
42+
return &PreparedDir{
43+
MountPath: cfg.SourceDir,
44+
Cleanup: func() error { return nil },
45+
}, nil
46+
}

pkg/datadir/provider.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type PreparedDir struct {
2727
}
2828

2929
// NewProvider creates a new Provider based on the method.
30-
// Supported methods: "copy" (default), "overlayfs", "fuse-overlayfs", "zfs".
30+
// Supported methods: "copy" (default), "overlayfs", "fuse-overlayfs", "zfs", "direct".
3131
func NewProvider(log logrus.FieldLogger, method string) (Provider, error) {
3232
switch method {
3333
case "", "copy":
@@ -38,6 +38,8 @@ func NewProvider(log logrus.FieldLogger, method string) (Provider, error) {
3838
return NewFuseOverlayFSProvider(log), nil
3939
case "zfs":
4040
return NewZFSProvider(log), nil
41+
case "direct":
42+
return NewDirectProvider(log), nil
4143
default:
4244
return nil, fmt.Errorf("unknown datadir method: %q", method)
4345
}

pkg/executor/executor.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ type ExecuteOptions struct {
113113
PostTestSleepDuration time.Duration // Sleep duration after each test (0 = disabled).
114114
FailFast bool // If true, return an error from runStepLines on the first failed RPC call.
115115
PreRunStepSleep time.Duration // Sleep between each RPC call within pre-run step files (0 = disabled).
116+
SkipUntilBlockNumber uint64 // Skip pre-run RPC lines until the first engine_newPayload with blockNumber > this. 0 = no skipping.
116117
}
117118

118119
// ExecutionResult contains the overall execution summary.
@@ -830,6 +831,12 @@ func (e *executor) runStepLines(
830831
) error {
831832
stepStart := time.Now()
832833

834+
// skipping is true when we're dropping already-applied lines at the
835+
// start of the file (resume scenario). Cleared once we encounter the
836+
// first engine_newPayload whose blockNumber > SkipUntilBlockNumber.
837+
skipping := opts.SkipUntilBlockNumber > 0
838+
skippedCount := 0
839+
833840
for lineNum, line := range lines {
834841
select {
835842
case <-ctx.Done():
@@ -859,6 +866,35 @@ func (e *executor) runStepLines(
859866
continue
860867
}
861868

869+
// Skip already-applied lines if requested. Stay in skipping mode
870+
// until we see a newPayload past the target; everything before
871+
// (other newPayloads, FCUs, etc.) is for a block already on
872+
// disk and would just be redundant work.
873+
if skipping {
874+
drop := true
875+
876+
if strings.HasPrefix(method, "engine_newPayload") {
877+
if bn, ok := extractBlockNumber(line); ok && bn > opts.SkipUntilBlockNumber {
878+
skipping = false
879+
drop = false
880+
881+
e.log.WithFields(logrus.Fields{
882+
"step": stepName,
883+
"resumed_at_line": lineNum + 1,
884+
"resumed_at_block": bn,
885+
"skipped_lines": skippedCount,
886+
"skip_until_block": opts.SkipUntilBlockNumber,
887+
}).Info("Resuming pre-run replay")
888+
}
889+
}
890+
891+
if drop {
892+
skippedCount++
893+
894+
continue
895+
}
896+
}
897+
862898
// Register blockHash BEFORE the RPC call for engine_newPayload methods.
863899
if captureBlockLogs && strings.HasPrefix(method, "engine_newPayload") &&
864900
opts.BlockLogCollector != nil && result != nil {

pkg/executor/results.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,41 @@ func extractGasUsed(request string) (uint64, error) {
307307
return strconv.ParseUint(strings.TrimPrefix(payload.GasUsed, "0x"), 16, 64)
308308
}
309309

310+
// extractBlockNumber extracts blockNumber from an engine_newPayload request.
311+
// Returns the parsed block number and ok=true on success. Returns ok=false
312+
// for any parse failure or missing field — callers should treat this as
313+
// "unknown" rather than an error.
314+
func extractBlockNumber(request string) (uint64, bool) {
315+
var req struct {
316+
Params []json.RawMessage `json:"params"`
317+
}
318+
if err := json.Unmarshal([]byte(request), &req); err != nil {
319+
return 0, false
320+
}
321+
322+
if len(req.Params) == 0 {
323+
return 0, false
324+
}
325+
326+
var payload struct {
327+
BlockNumber string `json:"blockNumber"`
328+
}
329+
if err := json.Unmarshal(req.Params[0], &payload); err != nil {
330+
return 0, false
331+
}
332+
333+
if payload.BlockNumber == "" {
334+
return 0, false
335+
}
336+
337+
n, err := strconv.ParseUint(strings.TrimPrefix(payload.BlockNumber, "0x"), 16, 64)
338+
if err != nil {
339+
return 0, false
340+
}
341+
342+
return n, true
343+
}
344+
310345
// extractBlockHash extracts blockHash from an engine_newPayload request.
311346
func extractBlockHash(request string) (string, error) {
312347
var req struct {

pkg/runner/lifecycle.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,10 +1013,11 @@ func (r *runner) runContainerLifecycle(
10131013
EngineEndpoint: fmt.Sprintf(
10141014
"http://%s:%d", containerIP, spec.EnginePort(),
10151015
),
1016-
JWT: r.cfg.JWT,
1017-
ResultsDir: runResultsDir,
1018-
FailFast: true,
1019-
PreRunStepSleep: r.cfg.PreRunStepSleep,
1016+
JWT: r.cfg.JWT,
1017+
ResultsDir: runResultsDir,
1018+
FailFast: true,
1019+
PreRunStepSleep: r.cfg.PreRunStepSleep,
1020+
SkipUntilBlockNumber: blockNum,
10201021
}
10211022

10221023
if n, err := r.executor.RunPreRunSteps(execCtx, preRunOpts); err != nil {
@@ -1122,6 +1123,7 @@ func (r *runner) runContainerLifecycle(
11221123
PostTestRPCCalls: r.cfg.FullConfig.GetPostTestRPCCalls(instance),
11231124
PostTestSleepDuration: r.cfg.FullConfig.GetPostTestSleepDuration(instance),
11241125
PreRunStepSleep: r.cfg.PreRunStepSleep,
1126+
SkipUntilBlockNumber: blockNum,
11251127
}
11261128

11271129
result, execErr = r.executor.ExecuteTests(execCtx, execOpts)

0 commit comments

Comments
 (0)