Skip to content

Commit 6b6d76b

Browse files
authored
Prevent response cache mutex from serializing concurrent HTTP action requests (#21880)
* Add singleflight and short lock for concurrent * Implement feedback * Use atomic integer for fetch * Add test for singleflight leader behavior and refine cache fetch logic * Adjust response cache test to use seconds for delays and refactor assertions
1 parent c0d028c commit 6b6d76b

4 files changed

Lines changed: 249 additions & 61 deletions

File tree

core/services/gateway/handlers/capabilities/v2/http_handler.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ type gatewayHandler struct {
5959

6060
type ResponseCache interface {
6161
// Set caches a response if it is cacheable (2xx or 4xx status codes) and the cache is empty or expired for the given request.
62-
Set(workflowID string, req gateway_common.OutboundHTTPRequest, response gateway_common.OutboundHTTPResponse)
62+
Set(req gateway_common.OutboundHTTPRequest, response gateway_common.OutboundHTTPResponse)
6363

6464
// Fetch retrieves a response from the cache if it exists and the age of cached response is less than the max age of the request.
6565
// If the cached response is expired or not cached, it fetches a new response from the fetchFn.
6666
// The response is cached if it is cacheable and storeOnFetch is true.
67-
Fetch(ctx context.Context, workflowID string, req gateway_common.OutboundHTTPRequest, fetchFn func() gateway_common.OutboundHTTPResponse, storeOnFetch bool) gateway_common.OutboundHTTPResponse
67+
Fetch(ctx context.Context, req gateway_common.OutboundHTTPRequest, fetchFn func() gateway_common.OutboundHTTPResponse, storeOnFetch bool) gateway_common.OutboundHTTPResponse
6868

6969
// DeleteExpired removes all cached responses that have exceeded their TTL (Time To Live).
7070
DeleteExpired(ctx context.Context) int
@@ -306,16 +306,6 @@ func (h *gatewayHandler) createHTTPRequestCallback(ctx context.Context, requestI
306306
}
307307
}
308308

309-
// extractWorkflowIDFromRequestPath extracts the workflowID from an outgoing request path string.
310-
// The workflowID is expected to be the first element after splitting the string by "/".
311-
func extractWorkflowIDFromRequestPath(path string) string {
312-
parts := strings.Split(path, "/")
313-
if len(parts) > 1 {
314-
return parts[1]
315-
}
316-
return ""
317-
}
318-
319309
func (h *gatewayHandler) HandleLegacyUserMessage(context.Context, *api.Message, handlers.Callback) error {
320310
return errors.New("HTTP capability gateway handler does not support legacy messages")
321311
}
@@ -340,7 +330,6 @@ func (h *gatewayHandler) makeOutgoingRequest(ctx context.Context, resp *jsonrpc.
340330
if err != nil {
341331
return fmt.Errorf("failed to unmarshal HTTP request from node %s: %w", nodeAddr, err)
342332
}
343-
workflowID := extractWorkflowIDFromRequestPath(requestID)
344333
timeout := time.Duration(req.TimeoutMs) * time.Millisecond
345334
httpReq := network.HTTPRequest{
346335
Method: req.Method,
@@ -367,11 +356,11 @@ func (h *gatewayHandler) makeOutgoingRequest(ctx context.Context, resp *jsonrpc.
367356
callback := h.createHTTPRequestCallback(httpCtx, requestID, httpReq, req)
368357
if req.CacheSettings.MaxAgeMs > 0 {
369358
h.metrics.IncrementCacheReadCount(ctx, h.lggr)
370-
outboundResp = h.responseCache.Fetch(httpCtx, workflowID, req, callback, req.CacheSettings.Store)
359+
outboundResp = h.responseCache.Fetch(httpCtx, req, callback, req.CacheSettings.Store)
371360
} else {
372361
outboundResp = callback()
373362
if req.CacheSettings.Store {
374-
h.responseCache.Set(workflowID, req, outboundResp)
363+
h.responseCache.Set(req, outboundResp)
375364
}
376365
}
377366
h.metrics.IncrementActionCapabilityRequestCount(ctx, nodeAddr, h.lggr)

core/services/gateway/handlers/capabilities/v2/http_handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,11 @@ func newMockResponseCache() *mockResponseCache {
402402
}
403403
}
404404

405-
func (m *mockResponseCache) Set(workflowID string, req gateway_common.OutboundHTTPRequest, response gateway_common.OutboundHTTPResponse) {
405+
func (m *mockResponseCache) Set(req gateway_common.OutboundHTTPRequest, response gateway_common.OutboundHTTPResponse) {
406406
m.setCallCount++
407407
}
408408

409-
func (m *mockResponseCache) Fetch(ctx context.Context, workflowID string, req gateway_common.OutboundHTTPRequest, fetchFn func() gateway_common.OutboundHTTPResponse, storeOnFetch bool) gateway_common.OutboundHTTPResponse {
409+
func (m *mockResponseCache) Fetch(ctx context.Context, req gateway_common.OutboundHTTPRequest, fetchFn func() gateway_common.OutboundHTTPResponse, storeOnFetch bool) gateway_common.OutboundHTTPResponse {
410410
m.fetchCallCount++
411411
return fetchFn()
412412
}

core/services/gateway/handlers/capabilities/v2/response_cache.go

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import (
55
"sync"
66
"time"
77

8+
"golang.org/x/sync/singleflight"
9+
810
"github.com/smartcontractkit/chainlink-common/pkg/logger"
911
"github.com/smartcontractkit/chainlink-common/pkg/types/gateway"
1012
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities/v2/metrics"
1113
)
1214

1315
// responseCache is a thread-safe cache for storing HTTP responses.
14-
// It uses a map to store responses keyed by a unique identifier generated from the request
15-
// cache key is prefixed by workflowID to avoid collisions between different workflows.
16+
// It uses a map to store responses keyed by a hash of the request (method, URL, headers, body, workflowOwner).
1617
type responseCache struct {
17-
cacheMu sync.Mutex
18+
cacheMu sync.RWMutex
1819
cache map[string]*cachedResponse
20+
flight singleflight.Group
1921
lggr logger.Logger
2022
ttl time.Duration
2123
metrics *metrics.Metrics
@@ -42,8 +44,8 @@ func isCacheableStatusCode(statusCode int) bool {
4244
}
4345

4446
// isExpiredOrNotCached returns true if the cached response is expired or not cached.
45-
// IMPORTANT: this method does not lock the cache map. MUST be called with the cacheMu locked.
46-
func (rc *responseCache) isExpiredOrNotCached(_ string, req gateway.OutboundHTTPRequest) bool {
47+
// IMPORTANT: this method does not lock the cache map. MUST be called with cacheMu write-locked.
48+
func (rc *responseCache) isExpiredOrNotCached(req gateway.OutboundHTTPRequest) bool {
4749
cachedResp, exists := rc.cache[req.Hash()]
4850
if !exists || time.Now().After(cachedResp.storedAt.Add(rc.ttl)) {
4951
return true
@@ -53,32 +55,63 @@ func (rc *responseCache) isExpiredOrNotCached(_ string, req gateway.OutboundHTTP
5355

5456
// Fetch fetches a response from the cache if it exists and
5557
// the age of cached response is less than the max age of the request.
56-
// If the cached response is expired or not cached, it fetches a new response from the fetchFn.
58+
// If the cached response is expired or not cached, it fetches a new response from the fetchFn
5759
// and caches the response if it is cacheable and storeOnFetch is true.
58-
func (rc *responseCache) Fetch(ctx context.Context, workflowID string, req gateway.OutboundHTTPRequest, fetchFn func() gateway.OutboundHTTPResponse, storeOnFetch bool) gateway.OutboundHTTPResponse {
59-
rc.cacheMu.Lock()
60-
defer rc.cacheMu.Unlock()
60+
//
61+
// The mutex is only held during cache map access (microseconds), not during fetchFn execution.
62+
// Singleflight deduplicates concurrent requests to the same cache key so only one fetchFn
63+
// runs per key, while requests to different keys execute in parallel.
64+
// Cache read and write happen inside the singleflight callback to ensure the key remains
65+
// in-flight until the result is stored, preventing duplicate fetches.
66+
func (rc *responseCache) Fetch(ctx context.Context, req gateway.OutboundHTTPRequest, fetchFn func() gateway.OutboundHTTPResponse, storeOnFetch bool) gateway.OutboundHTTPResponse {
67+
cacheKey := req.Hash()
6168
cacheMaxAge := time.Duration(req.CacheSettings.MaxAgeMs) * time.Millisecond
62-
cachedResp, exists := rc.cache[req.Hash()]
69+
70+
// Fast path: check cache without singleflight overhead.
71+
rc.cacheMu.RLock()
72+
cachedResp, exists := rc.cache[cacheKey]
73+
rc.cacheMu.RUnlock()
6374
if exists && cachedResp.storedAt.Add(cacheMaxAge).After(time.Now()) {
6475
rc.metrics.IncrementCacheHitCount(ctx, rc.lggr)
6576
return cachedResp.response
6677
}
67-
response := fetchFn()
68-
if storeOnFetch && isCacheableStatusCode(response.StatusCode) && rc.isExpiredOrNotCached(workflowID, req) {
69-
rc.cache[req.Hash()] = &cachedResponse{
70-
response: response,
71-
storedAt: time.Now(),
78+
79+
// Slow path: singleflight deduplicates concurrent fetches per key.
80+
// Cache check + store happen inside the flight so the key isn't released
81+
// until the result is cached, closing the race window between singleflight
82+
// completion and cache write.
83+
result, _, _ := rc.flight.Do(cacheKey, func() (interface{}, error) {
84+
// Re-check cache: a previous flight may have just stored the result.
85+
rc.cacheMu.RLock()
86+
cachedResp, exists := rc.cache[cacheKey]
87+
rc.cacheMu.RUnlock()
88+
if exists && cachedResp.storedAt.Add(cacheMaxAge).After(time.Now()) {
89+
rc.metrics.IncrementCacheHitCount(ctx, rc.lggr)
90+
return cachedResp.response, nil
7291
}
73-
}
74-
return response
92+
93+
response := fetchFn()
94+
95+
if storeOnFetch && isCacheableStatusCode(response.StatusCode) {
96+
rc.cacheMu.Lock()
97+
rc.cache[cacheKey] = &cachedResponse{
98+
response: response,
99+
storedAt: time.Now(),
100+
}
101+
rc.cacheMu.Unlock()
102+
}
103+
104+
return response, nil
105+
})
106+
107+
return result.(gateway.OutboundHTTPResponse)
75108
}
76109

77110
// Set caches a response if it is cacheable (2xx or 4xx and cache is empty or expired for the given request)
78-
func (rc *responseCache) Set(workflowID string, req gateway.OutboundHTTPRequest, response gateway.OutboundHTTPResponse) {
111+
func (rc *responseCache) Set(req gateway.OutboundHTTPRequest, response gateway.OutboundHTTPResponse) {
79112
rc.cacheMu.Lock()
80113
defer rc.cacheMu.Unlock()
81-
if isCacheableStatusCode(response.StatusCode) && rc.isExpiredOrNotCached(workflowID, req) {
114+
if isCacheableStatusCode(response.StatusCode) && rc.isExpiredOrNotCached(req) {
82115
rc.cache[req.Hash()] = &cachedResponse{
83116
response: response,
84117
storedAt: time.Now(),

0 commit comments

Comments
 (0)