Skip to content

Commit e1a0681

Browse files
authored
Infer workflow timeout (#1549)
* infer timeout
1 parent 4e42c90 commit e1a0681

3 files changed

Lines changed: 19 additions & 6 deletions

File tree

pkg/workflows/wasm/host/module.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -543,13 +543,16 @@ func runWasm[I, O proto.Message](
543543
return o, errors.New("could not get start function")
544544
}
545545

546+
startTime := time.Now()
546547
_, err = start.Call(store)
548+
executionDuration := time.Since(startTime)
549+
550+
// The error codes below are only returned by the v1 legacy DAG workflow.
547551
switch {
548552
case containsCode(err, wasm.CodeSuccess):
549553
if any(exec.response) == nil {
550554
return o, errors.New("could not find response for execution")
551555
}
552-
553556
return exec.response, nil
554557
case containsCode(err, wasm.CodeInvalidResponse):
555558
return o, fmt.Errorf("invariant violation: error marshaling response")
@@ -564,9 +567,18 @@ func runWasm[I, O proto.Message](
564567
return o, fmt.Errorf("error executing runner")
565568
case containsCode(err, wasm.CodeHostErr):
566569
return o, fmt.Errorf("invariant violation: host errored during sendResponse")
567-
default:
568-
return o, err
569570
}
571+
572+
// If an error has occurred and the deadline has been reached or exceeded, return a deadline exceeded error.
573+
// Note - there is no other reliable signal on the error that can be used to infer it is due to epoch deadline
574+
// being reached, so if an error is returned after the deadline it is assumed to be caused by the deadline, and we return
575+
// context.DeadlineExceeded.
576+
if err != nil && executionDuration >= *m.cfg.Timeout-m.cfg.TickInterval { // start could be called just before an epoch update, so one tick interval is deducted to account for this
577+
m.cfg.Logger.Errorw("start function returned error after deadline reached, returning deadline exceeded error", "errFromStartFunction", err)
578+
return o, context.DeadlineExceeded
579+
}
580+
581+
return o, err
570582
}
571583

572584
func containsCode(err error, code int) bool {

pkg/workflows/wasm/host/wasm_nodag_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package host
22

33
import (
4+
"context"
45
_ "embed"
56
"errors"
67
"testing"
@@ -47,7 +48,7 @@ func Test_Sleep_Timeout(t *testing.T) {
4748
start := time.Now()
4849
_, err = m.Execute(t.Context(), req, mockExecutionHelper)
4950
duration := time.Since(start)
50-
require.ErrorContains(t, err, "wasm trap: interrupt")
51+
require.ErrorIs(t, err, context.DeadlineExceeded)
5152
require.Less(t, duration.Seconds(), 3.0, "execution should be interrupted quickly")
5253
}
5354

pkg/workflows/wasm/host/wasm_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func Test_GetWorkflowSpec_Timeout(t *testing.T) {
152152
[]byte(""),
153153
)
154154
// panic
155-
assert.ErrorContains(t, err, "wasm trap: interrupt")
155+
assert.ErrorIs(t, err, context.DeadlineExceeded)
156156
}
157157

158158
func Test_GetWorkflowSpec_BuildError(t *testing.T) {
@@ -1029,7 +1029,7 @@ func TestModule_Sandbox_Timeout(t *testing.T) {
10291029

10301030
_, err = m.Run(ctx, req)
10311031

1032-
assert.ErrorContains(t, err, "interrupt")
1032+
assert.ErrorIs(t, err, context.DeadlineExceeded)
10331033
}
10341034

10351035
func TestModule_Sandbox_CantReadFiles(t *testing.T) {

0 commit comments

Comments
 (0)