Skip to content

Commit b11b4a9

Browse files
adding fallback org resolver cache (#1941)
1 parent 0d28482 commit b11b4a9

2 files changed

Lines changed: 420 additions & 0 deletions

File tree

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package orgresolver
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
10+
11+
log "github.com/smartcontractkit/chainlink-common/pkg/logger"
12+
)
13+
14+
const unavailableRetryDelay = 500 * time.Millisecond
15+
16+
// OrgResolverFallback wraps an OrgResolver and maintains an in-memory cache of
17+
// owner->orgID mappings. On successful resolution the cache is updated. When
18+
// the inner resolver returns NotFound or Unavailable, the cache is consulted
19+
// as a fallback (with one retry for Unavailable before falling back).
20+
//
21+
// This addresses a race condition where a workflow owner can be unlinked from
22+
// an org just before a WorkflowDeleted event is processed, causing the
23+
// resolver to return NotFound for an owner whose org was previously known.
24+
type OrgResolverFallback struct {
25+
inner OrgResolver
26+
cache sync.Map // owner (string) -> orgID (string)
27+
logger log.SugaredLogger
28+
}
29+
30+
func NewOrgResolverWithFallback(inner OrgResolver, logger log.Logger) *OrgResolverFallback {
31+
return &OrgResolverFallback{
32+
inner: inner,
33+
logger: log.Sugared(logger).Named("OrgResolverFallback"),
34+
}
35+
}
36+
37+
func (c *OrgResolverFallback) Get(ctx context.Context, owner string) (string, error) {
38+
orgID, err := c.inner.Get(ctx, owner)
39+
if err == nil {
40+
c.cache.Store(owner, orgID)
41+
return orgID, nil
42+
}
43+
44+
code := grpcStatusCode(err)
45+
46+
if code == codes.Unavailable {
47+
c.logger.Warnw("Org resolver unavailable, retrying once", "owner", owner, "err", err)
48+
49+
select {
50+
case <-ctx.Done():
51+
return c.fallbackToCache(owner, err)
52+
case <-time.After(unavailableRetryDelay):
53+
}
54+
55+
orgID, retryErr := c.inner.Get(ctx, owner)
56+
if retryErr == nil {
57+
c.cache.Store(owner, orgID)
58+
return orgID, nil
59+
}
60+
c.logger.Warnw("Org resolver retry failed", "owner", owner, "err", retryErr)
61+
return c.fallbackToCache(owner, err)
62+
}
63+
64+
if code == codes.NotFound {
65+
return c.fallbackToCache(owner, err)
66+
}
67+
68+
return "", err
69+
}
70+
71+
func (c *OrgResolverFallback) fallbackToCache(owner string, originalErr error) (string, error) {
72+
if cached, ok := c.cache.Load(owner); ok {
73+
orgID := cached.(string)
74+
c.logger.Infow("Using cached org ID after resolver failure", "owner", owner, "cachedOrgID", orgID)
75+
return orgID, nil
76+
}
77+
return "", originalErr
78+
}
79+
80+
// grpcStatusCode extracts the gRPC status code from an error, handling
81+
// wrapped errors from fmt.Errorf("%w", ...) chains.
82+
func grpcStatusCode(err error) codes.Code {
83+
type grpcStatus interface {
84+
GRPCStatus() *status.Status
85+
}
86+
var se grpcStatus
87+
if ok := errorAs(err, &se); ok {
88+
return se.GRPCStatus().Code()
89+
}
90+
return codes.OK
91+
}
92+
93+
// errorAs is a typed wrapper for the standard errors.As, allowing interface targets.
94+
// Go's errors.As requires a pointer to a concrete or interface type; this helper
95+
// keeps the call site at grpcStatusCode clean.
96+
func errorAs[T any](err error, target *T) bool {
97+
for err != nil {
98+
if t, ok := err.(T); ok {
99+
*target = t
100+
return true
101+
}
102+
err = unwrapErr(err)
103+
}
104+
return false
105+
}
106+
107+
func unwrapErr(err error) error {
108+
type wrapper interface{ Unwrap() error }
109+
if w, ok := err.(wrapper); ok {
110+
return w.Unwrap()
111+
}
112+
return nil
113+
}
114+
115+
func (c *OrgResolverFallback) Start(ctx context.Context) error { return c.inner.Start(ctx) }
116+
func (c *OrgResolverFallback) Close() error { return c.inner.Close() }
117+
func (c *OrgResolverFallback) Ready() error { return c.inner.Ready() }
118+
func (c *OrgResolverFallback) HealthReport() map[string]error { return c.inner.HealthReport() }
119+
func (c *OrgResolverFallback) Name() string { return c.inner.Name() }

0 commit comments

Comments
 (0)