Skip to content

Commit 7dbfe45

Browse files
aggregatable error message (#21767)
* aggregatable error message * lint
1 parent fb712af commit 7dbfe45

5 files changed

Lines changed: 127 additions & 5 deletions

File tree

core/services/workflows/artifacts/v2/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (h *Store) FetchWorkflowArtifacts(ctx context.Context, workflowID, binaryUR
203203
}
204204
binary, err = h.fetchFn(ctx, messageID(binaryURL, workflowID), req)
205205
if err != nil {
206-
return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", binaryURL, err)
206+
return nil, nil, &types.ArtifactFetchError{ArtifactType: "binary", URL: binaryURL, Err: err}
207207
}
208208

209209
if decodedBinary, err = base64.StdEncoding.DecodeString(string(binary)); err != nil {
@@ -247,7 +247,7 @@ func (h *Store) FetchWorkflowArtifacts(ctx context.Context, workflowID, binaryUR
247247

248248
config, err2 = h.fetchFn(ctx, messageID(configURL, workflowID), req)
249249
if err2 != nil {
250-
return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", configURL, err2)
250+
return nil, nil, &types.ArtifactFetchError{ArtifactType: "config", URL: configURL, Err: err2}
251251
}
252252
}
253253
return decodedBinary, config, nil

core/services/workflows/syncer/v2/handler.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,13 +294,14 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
294294

295295
var err error
296296
defer func() {
297-
if err2 := events.EmitWorkflowStatusChangedEventV2(ctx, cma.Labels(), toCommonHead(event.Head), string(event.Name), payload.BinaryURL, payload.ConfigURL, err); err2 != nil {
297+
if err2 := events.EmitWorkflowStatusChangedEventV2(ctx, cma.Labels(), toCommonHead(event.Head), string(event.Name), payload.BinaryURL, payload.ConfigURL, customerFacingError(err)); err2 != nil {
298298
h.lggr.Errorf("failed to emit status changed event: %+v", err2)
299299
}
300300
}()
301301
err = h.workflowActivatedEvent(ctx, payload)
302302
if err != nil {
303-
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
303+
h.lggr.Errorw("failed to handle workflow activated event", "error", err, "workflowID", wfID)
304+
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", customerFacingError(err)), h.lggr)
304305
return err
305306
}
306307

@@ -888,6 +889,20 @@ func (h *eventHandler) ensureCapRegistryReady(ctx context.Context) error {
888889
})
889890
}
890891

892+
// customerFacingError returns a deterministic, user-actionable error for beholder emission.
893+
// Internal errors (e.g. ArtifactFetchError with per-node signed URLs) are replaced with a
894+
// clean message so that workflow-service can aggregate error_message across nodes.
895+
func customerFacingError(err error) error {
896+
if err == nil {
897+
return nil
898+
}
899+
var fetchErr *types.ArtifactFetchError
900+
if errors.As(err, &fetchErr) {
901+
return errors.New(fetchErr.CustomerError())
902+
}
903+
return err
904+
}
905+
891906
func newHandlerTypeError(data any) error {
892907
return fmt.Errorf("invalid data type %T for event", data)
893908
}

core/services/workflows/syncer/v2/handler_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,40 @@ func testRunningWorkflow(t *testing.T, tc testCase) {
688688
})
689689
}
690690

691+
func Test_customerFacingError(t *testing.T) {
692+
t.Run("nil error returns nil", func(t *testing.T) {
693+
assert.NoError(t, customerFacingError(nil))
694+
})
695+
696+
t.Run("ArtifactFetchError returns deterministic customer message", func(t *testing.T) {
697+
fetchErr := &types.ArtifactFetchError{
698+
ArtifactType: "binary",
699+
URL: "https://storage.example.com/binary.wasm?Expires=123&Signature=nodeSpecificSig",
700+
Err: errors.New("connection refused"),
701+
}
702+
got := customerFacingError(fetchErr)
703+
require.Error(t, got)
704+
assert.Equal(t, "Internal error: failed to fetch workflow binary from storage. Contact support if this persists.", got.Error())
705+
})
706+
707+
t.Run("wrapped ArtifactFetchError is still detected", func(t *testing.T) {
708+
fetchErr := &types.ArtifactFetchError{
709+
ArtifactType: "config",
710+
URL: "https://storage.example.com/config.yaml?Expires=456&Signature=abc",
711+
Err: errors.New("timeout"),
712+
}
713+
wrapped := fmt.Errorf("createWorkflowSpec: %w", fetchErr)
714+
got := customerFacingError(wrapped)
715+
assert.Contains(t, got.Error(), "workflow config")
716+
assert.NotContains(t, got.Error(), "Expires")
717+
})
718+
719+
t.Run("non-ArtifactFetchError passes through unchanged", func(t *testing.T) {
720+
original := errors.New("some other error")
721+
assert.Equal(t, original, customerFacingError(original))
722+
})
723+
}
724+
691725
type mockArtifactStore struct {
692726
artifactStore *artifacts.Store
693727
deleteWorkflowArtifactsErr error
Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,32 @@
11
package types
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"fmt"
6+
)
47

58
var (
69
ErrGlobalWorkflowCountLimitReached = errors.New("global workflow count limit reached")
710
ErrPerOwnerWorkflowCountLimitReached = errors.New("per owner workflow count limit reached")
811
)
12+
13+
// ArtifactFetchError represents an internal failure to fetch a workflow artifact.
14+
// It preserves full details for developer debugging while providing a deterministic
15+
// customer-facing message suitable for aggregation across nodes.
16+
type ArtifactFetchError struct {
17+
ArtifactType string // "binary" or "config"
18+
URL string
19+
Err error
20+
}
21+
22+
func (e *ArtifactFetchError) Error() string {
23+
return fmt.Sprintf("failed to fetch %s from %s : %s", e.ArtifactType, e.URL, e.Err)
24+
}
25+
26+
func (e *ArtifactFetchError) Unwrap() error {
27+
return e.Err
28+
}
29+
30+
func (e *ArtifactFetchError) CustomerError() string {
31+
return fmt.Sprintf("Internal error: failed to fetch workflow %s from storage. Contact support if this persists.", e.ArtifactType)
32+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package types
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestArtifactFetchError(t *testing.T) {
13+
inner := errors.New("connection refused")
14+
fetchErr := &ArtifactFetchError{
15+
ArtifactType: "binary",
16+
URL: "https://storage.example.com/artifacts/abc123/binary.wasm?Expires=123&Signature=xyz",
17+
Err: inner,
18+
}
19+
20+
t.Run("Error preserves full URL for internal debugging", func(t *testing.T) {
21+
assert.Contains(t, fetchErr.Error(), "Expires=123&Signature=xyz")
22+
assert.Contains(t, fetchErr.Error(), "binary.wasm")
23+
assert.Contains(t, fetchErr.Error(), "connection refused")
24+
})
25+
26+
t.Run("CustomerError is deterministic and omits URL details", func(t *testing.T) {
27+
msg := fetchErr.CustomerError()
28+
assert.Equal(t, "Internal error: failed to fetch workflow binary from storage. Contact support if this persists.", msg)
29+
assert.NotContains(t, msg, "Expires")
30+
assert.NotContains(t, msg, "Signature")
31+
assert.NotContains(t, msg, "example.com")
32+
})
33+
34+
t.Run("CustomerError reflects artifact type", func(t *testing.T) {
35+
configErr := &ArtifactFetchError{ArtifactType: "config", URL: "https://x.com/c?s=1", Err: inner}
36+
assert.Contains(t, configErr.CustomerError(), "workflow config")
37+
})
38+
39+
t.Run("Unwrap returns inner error", func(t *testing.T) {
40+
require.ErrorIs(t, fetchErr, inner)
41+
})
42+
43+
t.Run("errors.As matches through wrapping", func(t *testing.T) {
44+
wrapped := fmt.Errorf("outer: %w", fetchErr)
45+
var target *ArtifactFetchError
46+
require.ErrorAs(t, wrapped, &target)
47+
assert.Equal(t, "binary", target.ArtifactType)
48+
})
49+
}

0 commit comments

Comments
 (0)