@@ -139,12 +139,12 @@ def connect(self) -> None:
139139 timeout = self .deadline + time .monotonic ()
140140 while not self .connected and time .monotonic () < timeout :
141141 if self ._is_fatal :
142- raise ProviderFatalError ("fatal gRPC status code" )
142+ raise ProviderFatalError ("Fatal gRPC status code received " )
143143 time .sleep (0.05 )
144144 logger .debug ("Finished blocking gRPC state initialization" )
145145
146146 if self ._is_fatal :
147- raise ProviderFatalError ("fatal gRPC status code" )
147+ raise ProviderFatalError ("Fatal gRPC status code received " )
148148
149149 if not self .connected :
150150 raise ProviderNotReadyError (
@@ -243,65 +243,78 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
243243 else :
244244 raise e
245245
246+ def _resolve_context_values (
247+ self ,
248+ flag_rsp : sync_pb2 .SyncFlagsResponse ,
249+ context_values_response : typing .Optional [sync_pb2 .GetMetadataResponse ],
250+ ) -> dict :
251+ if flag_rsp .sync_context :
252+ return MessageToDict (flag_rsp .sync_context )
253+ if context_values_response :
254+ return MessageToDict (context_values_response )["metadata" ]
255+ return {}
256+
257+ def _handle_flag_response (
258+ self ,
259+ flag_rsp : sync_pb2 .SyncFlagsResponse ,
260+ context_values_response : typing .Optional [sync_pb2 .GetMetadataResponse ],
261+ ) -> bool :
262+ """Process a single flag response. Returns True if the loop should terminate."""
263+ flag_str = flag_rsp .flag_configuration
264+ logger .debug (f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} " )
265+ self .flag_store .update (json .loads (flag_str ))
266+
267+ if not self .connected :
268+ context_values = self ._resolve_context_values (
269+ flag_rsp , context_values_response
270+ )
271+ self .emit_provider_ready (
272+ ProviderEventDetails (message = "gRPC sync connection established" ),
273+ context_values ,
274+ )
275+ self .connected = True
276+
277+ if not self .active :
278+ logger .debug ("Terminating gRPC sync thread" )
279+ return True
280+ return False
281+
282+ def _handle_rpc_error (self , e : grpc .RpcError ) -> bool :
283+ """Handle a gRPC RpcError. Returns True if the stream loop should stop."""
284+ logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
285+ if e .code ().name in self .config .fatal_status_codes :
286+ self ._is_fatal = True
287+ self .active = False
288+ self .emit_provider_error (
289+ ProviderEventDetails (
290+ message = f"Fatal gRPC status code: { e .code ()} " ,
291+ error_code = ErrorCode .PROVIDER_FATAL ,
292+ )
293+ )
294+ return True
295+ return False
296+
246297 def listen (self ) -> None :
247298 call_args = self .generate_grpc_call_args ()
248-
249299 request_args = self ._create_request_args ()
250300
251301 while self .active :
252302 try :
253303 context_values_response = self ._fetch_metadata ()
254-
255304 request = sync_pb2 .SyncFlagsRequest (** request_args )
256-
257305 logger .debug ("Setting up gRPC sync flags connection" )
258306 for flag_rsp in self .stub .SyncFlags (request , ** call_args ):
259- flag_str = flag_rsp .flag_configuration
260- logger .debug (
261- f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} "
262- )
263- self .flag_store .update (json .loads (flag_str ))
264-
265- context_values = {}
266- if flag_rsp .sync_context :
267- context_values = MessageToDict (flag_rsp .sync_context )
268- elif context_values_response :
269- context_values = MessageToDict (context_values_response )[
270- "metadata"
271- ]
272-
273- if not self .connected :
274- self .emit_provider_ready (
275- ProviderEventDetails (
276- message = "gRPC sync connection established"
277- ),
278- context_values ,
279- )
280- self .connected = True
281-
282- if not self .active :
283- logger .debug ("Terminating gRPC sync thread" )
307+ if self ._handle_flag_response (flag_rsp , context_values_response ):
284308 return
285309 except grpc .RpcError as e : # noqa: PERF203
286- logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
287- if e .code ().name in self .config .fatal_status_codes :
288- self ._is_fatal = True
289- self .active = False
290- self .emit_provider_error (
291- ProviderEventDetails (
292- message = f"Fatal gRPC status code: { e .code ()} " ,
293- error_code = ErrorCode .PROVIDER_FATAL ,
294- )
295- )
310+ if self ._handle_rpc_error (e ):
296311 return
297312 except json .JSONDecodeError :
298313 logger .exception (
299- f "Could not parse JSON flag data from SyncFlags endpoint: { flag_str = } "
314+ "Could not parse JSON flag data from SyncFlags endpoint"
300315 )
301316 except ParseError :
302- logger .exception (
303- f"Could not parse flag data using flagd syntax: { flag_str = } "
304- )
317+ logger .exception ("Could not parse flag data using flagd syntax" )
305318
306319 def generate_grpc_call_args (self ) -> GrpcMultiCallableArgs :
307320 call_args : GrpcMultiCallableArgs = {"wait_for_ready" : True }
0 commit comments