Skip to content

Commit cd466fd

Browse files
feat: extract DigestReplayGuard from Vault RequestAuthorizer (#21663)
* feat: extract DigestReplayGuard from Vault RequestAuthorizer Extract the request-digest deduplication logic from requestAuthorizer's internal map into a standalone DigestReplayGuard component. The requestAuthorizer now delegates to DigestReplayGuard internally — no behavior change for the existing on-chain allowlist flow. DigestReplayGuard will also be used by the upcoming JWT auth flow (both at the gateway handler and capability gateway handler layers) to reject replayed requests using the JWT's request_digest claim. Having it as a standalone component allows both auth flows to share the same dedup mechanism. Includes 9 unit tests covering: first-call success, duplicate rejection, expiry cleanup, mixed-expiry scenarios, concurrent access safety, and empty-digest edge case. All existing RequestAuthorizer tests pass as-is. Made-with: Cursor * fix: preserve eager expired-entry cleanup in AuthorizeRequest Before the DigestReplayGuard extraction, clearExpiredAuthorizedRequests ran via defer on every AuthorizeRequest call, pruning stale entries even when the request was rejected by the allowlist or expiry check. After the refactor, cleanup only ran inside CheckAndRecord on the success path, allowing unbounded accumulation of expired digests under sustained rejected traffic. Add a public ClearExpired() method to DigestReplayGuard and call it via defer in AuthorizeRequest to restore the original eager-cleanup behavior. Add a dedicated unit test for independent expiry pruning. Made-with: Cursor
1 parent b0dc7f0 commit cd466fd

3 files changed

Lines changed: 261 additions & 27 deletions

File tree

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package vault
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
)
8+
9+
var ErrDigestAlreadySeen = errors.New("request already authorized previously")
10+
11+
// DigestReplayGuard prevents replay of already-processed requests by tracking
12+
// request digests with expiry timestamps. It is safe for concurrent use.
13+
//
14+
// Used by both the on-chain allowlist flow and the JWT auth flow to ensure
15+
// that a given request digest is only accepted once.
16+
type DigestReplayGuard struct {
17+
mu sync.Mutex
18+
seen map[string]int64 // digest → unix expiry timestamp
19+
nowFunc func() time.Time // injectable for testing
20+
}
21+
22+
func NewDigestReplayGuard() *DigestReplayGuard {
23+
return &DigestReplayGuard{
24+
seen: make(map[string]int64),
25+
nowFunc: time.Now,
26+
}
27+
}
28+
29+
// CheckAndRecord returns ErrDigestAlreadySeen if the digest was previously
30+
// recorded and has not yet expired. Otherwise it records the digest with
31+
// the given expiry timestamp (unix seconds, UTC).
32+
//
33+
// Expired entries are cleaned up on every call.
34+
func (g *DigestReplayGuard) CheckAndRecord(digest string, expiresAtUnix int64) error {
35+
g.mu.Lock()
36+
defer g.mu.Unlock()
37+
38+
g.clearExpiredLocked()
39+
40+
if _, exists := g.seen[digest]; exists {
41+
return ErrDigestAlreadySeen
42+
}
43+
44+
g.seen[digest] = expiresAtUnix
45+
return nil
46+
}
47+
48+
// ClearExpired removes all entries whose expiry timestamp is in the past.
49+
// Call this to eagerly reclaim memory even when CheckAndRecord is not invoked.
50+
func (g *DigestReplayGuard) ClearExpired() {
51+
g.mu.Lock()
52+
defer g.mu.Unlock()
53+
g.clearExpiredLocked()
54+
}
55+
56+
func (g *DigestReplayGuard) clearExpiredLocked() {
57+
now := g.nowFunc().UTC().Unix()
58+
for digest, expiry := range g.seen {
59+
if now > expiry {
60+
delete(g.seen, digest)
61+
}
62+
}
63+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package vault
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestDigestReplayGuard_FirstCallSucceeds(t *testing.T) {
13+
guard := NewDigestReplayGuard()
14+
futureExpiry := time.Now().UTC().Unix() + 100
15+
16+
err := guard.CheckAndRecord("digest-1", futureExpiry)
17+
require.NoError(t, err)
18+
}
19+
20+
func TestDigestReplayGuard_DuplicateRejected(t *testing.T) {
21+
guard := NewDigestReplayGuard()
22+
futureExpiry := time.Now().UTC().Unix() + 100
23+
24+
err := guard.CheckAndRecord("digest-1", futureExpiry)
25+
require.NoError(t, err)
26+
27+
err = guard.CheckAndRecord("digest-1", futureExpiry)
28+
require.ErrorIs(t, err, ErrDigestAlreadySeen)
29+
}
30+
31+
func TestDigestReplayGuard_DifferentDigestsIndependent(t *testing.T) {
32+
guard := NewDigestReplayGuard()
33+
futureExpiry := time.Now().UTC().Unix() + 100
34+
35+
require.NoError(t, guard.CheckAndRecord("digest-1", futureExpiry))
36+
require.NoError(t, guard.CheckAndRecord("digest-2", futureExpiry))
37+
require.NoError(t, guard.CheckAndRecord("digest-3", futureExpiry))
38+
39+
require.ErrorIs(t, guard.CheckAndRecord("digest-1", futureExpiry), ErrDigestAlreadySeen)
40+
require.ErrorIs(t, guard.CheckAndRecord("digest-2", futureExpiry), ErrDigestAlreadySeen)
41+
}
42+
43+
func TestDigestReplayGuard_ExpiredEntryCleaned(t *testing.T) {
44+
guard := NewDigestReplayGuard()
45+
now := time.Now()
46+
guard.nowFunc = func() time.Time { return now }
47+
48+
pastExpiry := now.UTC().Unix() - 10
49+
err := guard.CheckAndRecord("digest-1", pastExpiry)
50+
require.NoError(t, err)
51+
52+
// Advance time past the expiry — next call should clean up the entry
53+
guard.nowFunc = func() time.Time { return now.Add(20 * time.Second) }
54+
55+
// Same digest should succeed because the expired entry was cleaned up
56+
err = guard.CheckAndRecord("digest-1", now.Add(20*time.Second).UTC().Unix()+100)
57+
require.NoError(t, err)
58+
}
59+
60+
func TestDigestReplayGuard_NonExpiredEntryRetained(t *testing.T) {
61+
guard := NewDigestReplayGuard()
62+
now := time.Now()
63+
guard.nowFunc = func() time.Time { return now }
64+
65+
futureExpiry := now.UTC().Unix() + 100
66+
require.NoError(t, guard.CheckAndRecord("digest-1", futureExpiry))
67+
68+
// Advance time, but NOT past the expiry
69+
guard.nowFunc = func() time.Time { return now.Add(50 * time.Second) }
70+
71+
err := guard.CheckAndRecord("digest-1", futureExpiry)
72+
require.ErrorIs(t, err, ErrDigestAlreadySeen)
73+
}
74+
75+
func TestDigestReplayGuard_MixedExpiryCleanup(t *testing.T) {
76+
guard := NewDigestReplayGuard()
77+
now := time.Now()
78+
guard.nowFunc = func() time.Time { return now }
79+
80+
shortExpiry := now.UTC().Unix() + 10
81+
longExpiry := now.UTC().Unix() + 200
82+
83+
require.NoError(t, guard.CheckAndRecord("short-lived", shortExpiry))
84+
require.NoError(t, guard.CheckAndRecord("long-lived", longExpiry))
85+
86+
// Advance past short expiry but before long expiry
87+
guard.nowFunc = func() time.Time { return now.Add(50 * time.Second) }
88+
89+
// Short-lived should be re-recordable (cleaned up)
90+
require.NoError(t, guard.CheckAndRecord("short-lived", now.Add(50*time.Second).UTC().Unix()+100))
91+
92+
// Long-lived should still be rejected
93+
require.ErrorIs(t, guard.CheckAndRecord("long-lived", longExpiry), ErrDigestAlreadySeen)
94+
}
95+
96+
func TestDigestReplayGuard_ConcurrentAccess(t *testing.T) {
97+
guard := NewDigestReplayGuard()
98+
futureExpiry := time.Now().UTC().Unix() + 100
99+
100+
const goroutines = 100
101+
results := make([]error, goroutines)
102+
var wg sync.WaitGroup
103+
wg.Add(goroutines)
104+
105+
for i := range goroutines {
106+
go func(idx int) {
107+
defer wg.Done()
108+
results[idx] = guard.CheckAndRecord("same-digest", futureExpiry)
109+
}(i)
110+
}
111+
wg.Wait()
112+
113+
successCount := 0
114+
duplicateCount := 0
115+
for _, err := range results {
116+
if err == nil {
117+
successCount++
118+
} else {
119+
require.ErrorIs(t, err, ErrDigestAlreadySeen)
120+
duplicateCount++
121+
}
122+
}
123+
124+
assert.Equal(t, 1, successCount, "exactly one goroutine should succeed")
125+
assert.Equal(t, goroutines-1, duplicateCount, "all others should be rejected as duplicates")
126+
}
127+
128+
func TestDigestReplayGuard_ConcurrentDifferentDigests(t *testing.T) {
129+
guard := NewDigestReplayGuard()
130+
futureExpiry := time.Now().UTC().Unix() + 100
131+
132+
const goroutines = 50
133+
var wg sync.WaitGroup
134+
wg.Add(goroutines)
135+
136+
errors := make([]error, goroutines)
137+
for i := range goroutines {
138+
go func(idx int) {
139+
defer wg.Done()
140+
digest := "digest-" + string(rune('A'+idx))
141+
errors[idx] = guard.CheckAndRecord(digest, futureExpiry)
142+
}(i)
143+
}
144+
wg.Wait()
145+
146+
for i, err := range errors {
147+
assert.NoError(t, err, "goroutine %d should succeed for unique digest", i)
148+
}
149+
}
150+
151+
func TestDigestReplayGuard_ClearExpiredIndependently(t *testing.T) {
152+
guard := NewDigestReplayGuard()
153+
now := time.Now()
154+
guard.nowFunc = func() time.Time { return now }
155+
156+
shortExpiry := now.UTC().Unix() + 5
157+
longExpiry := now.UTC().Unix() + 200
158+
159+
require.NoError(t, guard.CheckAndRecord("ephemeral", shortExpiry))
160+
require.NoError(t, guard.CheckAndRecord("durable", longExpiry))
161+
162+
// Advance past the short expiry
163+
guard.nowFunc = func() time.Time { return now.Add(30 * time.Second) }
164+
165+
// ClearExpired should prune without needing a CheckAndRecord call
166+
guard.ClearExpired()
167+
168+
guard.mu.Lock()
169+
_, ephemeralPresent := guard.seen["ephemeral"]
170+
_, durablePresent := guard.seen["durable"]
171+
guard.mu.Unlock()
172+
173+
assert.False(t, ephemeralPresent, "expired entry should have been pruned")
174+
assert.True(t, durablePresent, "non-expired entry should remain")
175+
}
176+
177+
func TestDigestReplayGuard_EmptyDigest(t *testing.T) {
178+
guard := NewDigestReplayGuard()
179+
futureExpiry := time.Now().UTC().Unix() + 100
180+
181+
require.NoError(t, guard.CheckAndRecord("", futureExpiry))
182+
require.ErrorIs(t, guard.CheckAndRecord("", futureExpiry), ErrDigestAlreadySeen)
183+
}

core/capabilities/vault/request_authorizer.go

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"sync"
109
"time"
1110

1211
jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2"
@@ -19,16 +18,15 @@ type RequestAuthorizer interface {
1918
AuthorizeRequest(ctx context.Context, req jsonrpc.Request[json.RawMessage]) (isAuthorized bool, owner string, err error)
2019
}
2120
type requestAuthorizer struct {
22-
workflowRegistrySyncer workflowsyncerv2.WorkflowRegistrySyncer
23-
alreadyAuthorizedRequests map[string]int64
24-
alreadyAuthorizedMutex sync.Mutex
25-
lggr logger.Logger
21+
workflowRegistrySyncer workflowsyncerv2.WorkflowRegistrySyncer
22+
replayGuard *DigestReplayGuard
23+
lggr logger.Logger
2624
}
2725

2826
// AuthorizeRequest authorizes a request based on the request digest and the allowlisted requests.
2927
// It does NOT check if the request method is allowed.
3028
func (r *requestAuthorizer) AuthorizeRequest(ctx context.Context, req jsonrpc.Request[json.RawMessage]) (isAuthorized bool, owner string, err error) {
31-
defer r.clearExpiredAuthorizedRequests()
29+
defer r.replayGuard.ClearExpired()
3230
r.lggr.Infow("AuthorizeRequest", "method", req.Method, "requestID", req.ID)
3331
requestDigest, err := req.Digest()
3432
if err != nil {
@@ -61,31 +59,21 @@ func (r *requestAuthorizer) AuthorizeRequest(ctx context.Context, req jsonrpc.Re
6159
"allowedRequestsStrs", allowedRequestsStrs)
6260
return false, "", errors.New("request not allowlisted")
6361
}
64-
authorizedRequestStr := string(allowlistedRequest.RequestDigest[:])
6562

66-
r.alreadyAuthorizedMutex.Lock()
67-
defer r.alreadyAuthorizedMutex.Unlock()
68-
if r.alreadyAuthorizedRequests[authorizedRequestStr] > 0 {
69-
r.lggr.Infow("AuthorizeRequest already authorized previously", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
70-
return false, "", errors.New("request already authorized previously")
71-
}
7263
if time.Now().UTC().Unix() > int64(allowlistedRequest.ExpiryTimestamp) {
64+
authorizedRequestStr := string(allowlistedRequest.RequestDigest[:])
7365
r.lggr.Infow("AuthorizeRequest expired authorization", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
7466
return false, "", errors.New("request authorization expired")
7567
}
76-
r.lggr.Infow("AuthorizeRequest success in auth", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", authorizedRequestStr)
77-
r.alreadyAuthorizedRequests[authorizedRequestStr] = int64(allowlistedRequest.ExpiryTimestamp)
78-
return true, allowlistedRequest.Owner.Hex(), nil
79-
}
8068

81-
func (r *requestAuthorizer) clearExpiredAuthorizedRequests() {
82-
r.alreadyAuthorizedMutex.Lock()
83-
defer r.alreadyAuthorizedMutex.Unlock()
84-
for request, expiry := range r.alreadyAuthorizedRequests {
85-
if time.Now().UTC().Unix() > expiry {
86-
delete(r.alreadyAuthorizedRequests, request)
87-
}
69+
digestKey := string(allowlistedRequest.RequestDigest[:])
70+
if err := r.replayGuard.CheckAndRecord(digestKey, int64(allowlistedRequest.ExpiryTimestamp)); err != nil {
71+
r.lggr.Infow("AuthorizeRequest already authorized previously", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", digestKey)
72+
return false, "", err
8873
}
74+
75+
r.lggr.Infow("AuthorizeRequest success in auth", "method", req.Method, "requestID", req.ID, "authorizedRequestStr", digestKey)
76+
return true, allowlistedRequest.Owner.Hex(), nil
8977
}
9078

9179
func (r *requestAuthorizer) fetchAllowlistedItem(allowListedRequests []workflow_registry_wrapper_v2.WorkflowRegistryOwnerAllowlistedRequest, digest [32]byte) *workflow_registry_wrapper_v2.WorkflowRegistryOwnerAllowlistedRequest {
@@ -99,8 +87,8 @@ func (r *requestAuthorizer) fetchAllowlistedItem(allowListedRequests []workflow_
9987

10088
func NewRequestAuthorizer(lggr logger.Logger, workflowRegistrySyncer workflowsyncerv2.WorkflowRegistrySyncer) *requestAuthorizer {
10189
return &requestAuthorizer{
102-
workflowRegistrySyncer: workflowRegistrySyncer,
103-
lggr: logger.Named(lggr, "VaultRequestAuthorizer"),
104-
alreadyAuthorizedRequests: make(map[string]int64),
90+
workflowRegistrySyncer: workflowRegistrySyncer,
91+
lggr: logger.Named(lggr, "VaultRequestAuthorizer"),
92+
replayGuard: NewDigestReplayGuard(),
10593
}
10694
}

0 commit comments

Comments
 (0)