@@ -965,6 +965,49 @@ def _should_gzip(self, payload: str, enable_gzip: bool = False, gzip_threshold:
965965
966966 return False
967967
968+ @staticmethod
969+ def _on_error (ex ):
970+ logger .error ("unexpected error during batching: %s" , ex )
971+
972+ def _to_response (self , data : _BatchItem , delay : datetime .timedelta ):
973+ return rx .of (data ).pipe (
974+ ops .subscribe_on (self ._write_options .write_scheduler ),
975+ # use delay if its specified
976+ ops .delay (duetime = delay , scheduler = self ._write_options .write_scheduler ),
977+ # invoke http call
978+ ops .map (lambda x : self ._http (x , ** x .key .kwargs )),
979+ # catch exception to fail batch response
980+ ops .catch (handler = lambda exception , source : rx .just (_BatchResponse (exception = exception , data = data ))),
981+ )
982+
983+ def _on_next (self , response : _BatchResponse ):
984+ if response .exception :
985+ logger .error ("The batch item wasn't processed successfully because: %s" , response .exception )
986+ if self ._error_callback :
987+ try :
988+ self ._error_callback (response .data .to_key_tuple (), response .data .data , response .exception )
989+ except Exception as e :
990+ """
991+ Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
992+ arbitrary
993+
994+ We trap it, log that it occurred and then proceed - there's not much more that we can
995+ really do.
996+ """
997+ logger .error ("The configured error callback threw an exception: %s" , e )
998+
999+ else :
1000+ logger .debug ("The batch item: %s was processed successfully." , response )
1001+ if self ._success_callback :
1002+ try :
1003+ self ._success_callback (response .data .to_key_tuple (), response .data .data )
1004+ except Exception as e :
1005+ logger .error ("The configured success callback threw an exception: %s" , e )
1006+
1007+ def _on_complete (self ):
1008+ self ._disposable .dispose ()
1009+ logger .debug ("the batching processor was disposed" )
1010+
9681011 def _append_default_tag (self , key , val , record ):
9691012 from influxdb_client_3 .write_client import Point
9701013 if isinstance (record , bytes ) or isinstance (record , str ):
@@ -1049,49 +1092,6 @@ def __del__(self):
10491092 """Close WriteApi."""
10501093 self .close ()
10511094
1052- @staticmethod
1053- def _on_error (ex ):
1054- logger .error ("unexpected error during batching: %s" , ex )
1055-
1056- def _on_complete (self ):
1057- self ._disposable .dispose ()
1058- logger .debug ("the batching processor was disposed" )
1059-
1060- def _to_response (self , data : _BatchItem , delay : datetime .timedelta ):
1061- return rx .of (data ).pipe (
1062- ops .subscribe_on (self ._write_options .write_scheduler ),
1063- # use delay if its specified
1064- ops .delay (duetime = delay , scheduler = self ._write_options .write_scheduler ),
1065- # invoke http call
1066- ops .map (lambda x : self ._http (x , ** x .key .kwargs )),
1067- # catch exception to fail batch response
1068- ops .catch (handler = lambda exception , source : rx .just (_BatchResponse (exception = exception , data = data ))),
1069- )
1070-
1071- def _on_next (self , response : _BatchResponse ):
1072- if response .exception :
1073- logger .error ("The batch item wasn't processed successfully because: %s" , response .exception )
1074- if self ._error_callback :
1075- try :
1076- self ._error_callback (response .data .to_key_tuple (), response .data .data , response .exception )
1077- except Exception as e :
1078- """
1079- Unfortunately, because callbacks are user-provided generic code, exceptions can be entirely
1080- arbitrary
1081-
1082- We trap it, log that it occurred and then proceed - there's not much more that we can
1083- really do.
1084- """
1085- logger .error ("The configured error callback threw an exception: %s" , e )
1086-
1087- else :
1088- logger .debug ("The batch item: %s was processed successfully." , response )
1089- if self ._success_callback :
1090- try :
1091- self ._success_callback (response .data .to_key_tuple (), response .data .data )
1092- except Exception as e :
1093- logger .error ("The configured success callback threw an exception: %s" , e )
1094-
10951095 def __setstate__ (self , state ):
10961096 """Set your object with the provided dict."""
10971097 self .__dict__ .update (state )
0 commit comments