@@ -21,8 +21,14 @@ type requestAuthorizer struct {
2121 workflowRegistrySyncer workflowsyncerv2.WorkflowRegistrySyncer
2222 replayGuard * DigestReplayGuard
2323 lggr logger.Logger
24+ sleep func (time.Duration )
2425}
2526
27+ const (
28+ allowlistReadRetryCount = 3
29+ allowlistReadRetryInterval = 3 * time .Second
30+ )
31+
2632// AuthorizeRequest authorizes a request based on the request digest and the allowlisted requests.
2733// It does NOT check if the request method is allowed.
2834func (r * requestAuthorizer ) AuthorizeRequest (ctx context.Context , req jsonrpc.Request [json.RawMessage ]) (isAuthorized bool , owner string , err error ) {
@@ -43,20 +49,8 @@ func (r *requestAuthorizer) AuthorizeRequest(ctx context.Context, req jsonrpc.Re
4349 r .lggr .Errorw ("AuthorizeRequest workflowRegistrySyncer is nil" , "method" , req .Method , "requestID" , req .ID )
4450 return false , "" , errors .New ("internal error: workflowRegistrySyncer is nil" )
4551 }
46- allowedRequests := r .workflowRegistrySyncer .GetAllowlistedRequests (ctx )
47- allowedRequestsStrs := make ([]string , 0 , len (allowedRequests ))
48- for _ , rr := range allowedRequests {
49- allowedReqStr := fmt .Sprintf ("Owner: %s, RequestDigest: %s, ExpiryTimestamp: %d" , rr .Owner .Hex (), hex .EncodeToString (rr .RequestDigest [:]), rr .ExpiryTimestamp )
50- allowedRequestsStrs = append (allowedRequestsStrs , allowedReqStr )
51- }
52- r .lggr .Infow ("AuthorizeRequest GetAllowlistedRequests" , "method" , req .Method , "requestID" , req .ID , "allowedRequests" , allowedRequestsStrs )
53- allowlistedRequest := r .fetchAllowlistedItem (allowedRequests , requestDigestBytes32 )
52+ allowlistedRequest , _ := r .fetchAllowlistedItemWithRetry (ctx , req .Method , req .ID , requestDigest , requestDigestBytes32 )
5453 if allowlistedRequest == nil {
55- r .lggr .Infow ("AuthorizeRequest fetchAllowlistedItem request not allowlisted" ,
56- "method" , req .Method ,
57- "requestID" , req .ID ,
58- "digestHexStr" , requestDigest ,
59- "allowedRequestsStrs" , allowedRequestsStrs )
6054 return false , "" , errors .New ("request not allowlisted" )
6155 }
6256
@@ -76,6 +70,56 @@ func (r *requestAuthorizer) AuthorizeRequest(ctx context.Context, req jsonrpc.Re
7670 return true , allowlistedRequest .Owner .Hex (), nil
7771}
7872
73+ func (r * requestAuthorizer ) fetchAllowlistedItemWithRetry (ctx context.Context , method string , requestID interface {}, requestDigest string , digest [32 ]byte ) (* workflow_registry_wrapper_v2.WorkflowRegistryOwnerAllowlistedRequest , []string ) {
74+ var allowedRequestsStrs []string
75+ for attempt := 0 ; attempt <= allowlistReadRetryCount ; attempt ++ {
76+ allowedRequests := r .workflowRegistrySyncer .GetAllowlistedRequests (ctx )
77+ allowedRequestsStrs = make ([]string , 0 , len (allowedRequests ))
78+ for _ , rr := range allowedRequests {
79+ allowedReqStr := fmt .Sprintf ("Owner: %s, RequestDigest: %s, ExpiryTimestamp: %d" , rr .Owner .Hex (), hex .EncodeToString (rr .RequestDigest [:]), rr .ExpiryTimestamp )
80+ allowedRequestsStrs = append (allowedRequestsStrs , allowedReqStr )
81+ }
82+ r .lggr .Infow ("AuthorizeRequest GetAllowlistedRequests" , "method" , method , "requestID" , requestID , "attempt" , attempt + 1 , "allowedRequests" , allowedRequestsStrs )
83+
84+ allowlistedRequest := r .fetchAllowlistedItem (allowedRequests , digest )
85+ if allowlistedRequest != nil {
86+ return allowlistedRequest , allowedRequestsStrs
87+ }
88+
89+ if attempt == allowlistReadRetryCount {
90+ break
91+ }
92+
93+ r .lggr .Warnw ("AuthorizeRequest request not found in allowlist, retrying" ,
94+ "method" , method ,
95+ "requestID" , requestID ,
96+ "digestHexStr" , requestDigest ,
97+ "attempt" , attempt + 1 ,
98+ "retryInterval" , allowlistReadRetryInterval ,
99+ "allowedRequestsStrs" , allowedRequestsStrs )
100+
101+ select {
102+ case <- ctx .Done ():
103+ r .lggr .Warnw ("AuthorizeRequest allowlist retry canceled" ,
104+ "method" , method ,
105+ "requestID" , requestID ,
106+ "digestHexStr" , requestDigest ,
107+ "attempt" , attempt + 1 )
108+ return nil , allowedRequestsStrs
109+ default :
110+ }
111+
112+ r .sleep (allowlistReadRetryInterval )
113+ }
114+
115+ r .lggr .Infow ("AuthorizeRequest fetchAllowlistedItem request not allowlisted" ,
116+ "method" , method ,
117+ "requestID" , requestID ,
118+ "digestHexStr" , requestDigest ,
119+ "allowedRequestsStrs" , allowedRequestsStrs )
120+ return nil , allowedRequestsStrs
121+ }
122+
79123func (r * requestAuthorizer ) fetchAllowlistedItem (allowListedRequests []workflow_registry_wrapper_v2.WorkflowRegistryOwnerAllowlistedRequest , digest [32 ]byte ) * workflow_registry_wrapper_v2.WorkflowRegistryOwnerAllowlistedRequest {
80124 for _ , item := range allowListedRequests {
81125 if item .RequestDigest == digest {
@@ -90,5 +134,6 @@ func NewRequestAuthorizer(lggr logger.Logger, workflowRegistrySyncer workflowsyn
90134 workflowRegistrySyncer : workflowRegistrySyncer ,
91135 lggr : logger .Named (lggr , "VaultRequestAuthorizer" ),
92136 replayGuard : NewDigestReplayGuard (),
137+ sleep : time .Sleep ,
93138 }
94139}
0 commit comments