2525
2626"""
2727This package implements leader election using an annotation in a Kubernetes object.
28- The onstarted_leading function is run in a thread and when it returns, if it does
28+ The onstarted_leading function is run in a thread and when it returns, if it does
2929it might not be safe to run it again in a process.
3030
3131At first all candidates are considered followers. The one to create a lock or update
@@ -52,11 +52,14 @@ def __init__(self, election_config):
5252 def run (self ):
5353 # Try to create/ acquire a lock
5454 if self .acquire ():
55- logger .info ("{} successfully acquired lease" .format (self .election_config .lock .identity ))
55+ logger .info (
56+ "{} successfully acquired lease" .format (
57+ self .election_config .lock .identity ))
5658
5759 # Start leading and call OnStartedLeading()
5860 threading .daemon = True
59- threading .Thread (target = self .election_config .onstarted_leading ).start ()
61+ threading .Thread (
62+ target = self .election_config .onstarted_leading ).start ()
6063
6164 self .renew_loop ()
6265
@@ -65,7 +68,9 @@ def run(self):
6568
6669 def acquire (self ):
6770 # Follower
68- logger .info ("{} is a follower" .format (self .election_config .lock .identity ))
71+ logger .info (
72+ "{} is a follower" .format (
73+ self .election_config .lock .identity ))
6974 retry_period = self .election_config .retry_period
7075
7176 while True :
@@ -78,7 +83,8 @@ def acquire(self):
7883
7984 def renew_loop (self ):
8085 # Leader
81- logger .info ("Leader has entered renew loop and will try to update lease continuously" )
86+ logger .info (
87+ "Leader has entered renew loop and will try to update lease continuously" )
8288
8389 retry_period = self .election_config .retry_period
8490 renew_deadline = self .election_config .renew_deadline * 1000
@@ -106,34 +112,46 @@ def try_acquire_or_renew(self):
106112 now = datetime .datetime .fromtimestamp (now_timestamp )
107113
108114 # Check if lock is created
109- lock_status , old_election_record = self .election_config .lock .get (self . election_config . lock . name ,
110- self .election_config .lock .namespace )
115+ lock_status , old_election_record = self .election_config .lock .get (
116+ self . election_config . lock . name , self .election_config .lock .namespace )
111117
112118 # create a default Election record for this candidate
113- leader_election_record = LeaderElectionRecord (self .election_config .lock .identity ,
114- str (self .election_config .lease_duration ), str (now ), str (now ))
119+ leader_election_record = LeaderElectionRecord (
120+ self .election_config .lock .identity , str (
121+ self .election_config .lease_duration ), str (now ), str (now ))
115122
116123 # A lock is not created with that name, try to create one
117124 if not lock_status :
118125 # To be removed when support for python2 will be removed
119126 if sys .version_info > (3 , 0 ):
120- if json .loads (old_election_record .body )['code' ] != HTTPStatus .NOT_FOUND :
121- logger .info ("Error retrieving resource lock {} as {}" .format (self .election_config .lock .name ,
122- old_election_record .reason ))
127+ if json .loads (old_election_record .body )[
128+ 'code' ] != HTTPStatus .NOT_FOUND :
129+ logger .info (
130+ "Error retrieving resource lock {} as {}" .format (
131+ self .election_config .lock .name ,
132+ old_election_record .reason ))
123133 return False
124134 else :
125- if json .loads (old_election_record .body )['code' ] != httplib .NOT_FOUND :
126- logger .info ("Error retrieving resource lock {} as {}" .format (self .election_config .lock .name ,
127- old_election_record .reason ))
135+ if json .loads (old_election_record .body )[
136+ 'code' ] != httplib .NOT_FOUND :
137+ logger .info (
138+ "Error retrieving resource lock {} as {}" .format (
139+ self .election_config .lock .name ,
140+ old_election_record .reason ))
128141 return False
129142
130- logger .info ("{} is trying to create a lock" .format (leader_election_record .holder_identity ))
131- create_status = self .election_config .lock .create (name = self .election_config .lock .name ,
132- namespace = self .election_config .lock .namespace ,
133- election_record = leader_election_record )
143+ logger .info (
144+ "{} is trying to create a lock" .format (
145+ leader_election_record .holder_identity ))
146+ create_status = self .election_config .lock .create (
147+ name = self .election_config .lock .name ,
148+ namespace = self .election_config .lock .namespace ,
149+ election_record = leader_election_record )
134150
135151 if create_status is False :
136- logger .info ("{} Failed to create lock" .format (leader_election_record .holder_identity ))
152+ logger .info (
153+ "{} Failed to create lock" .format (
154+ leader_election_record .holder_identity ))
137155 return False
138156
139157 self .observed_record = leader_election_record
@@ -153,16 +171,21 @@ def try_acquire_or_renew(self):
153171
154172 # Report transitions
155173 if self .observed_record and self .observed_record .holder_identity != old_election_record .holder_identity :
156- logger .info ("Leader has switched to {}" .format (old_election_record .holder_identity ))
174+ logger .info (
175+ "Leader has switched to {}" .format (
176+ old_election_record .holder_identity ))
157177
158178 if self .observed_record is None or old_election_record .__dict__ != self .observed_record .__dict__ :
159179 self .observed_record = old_election_record
160180 self .observed_time_milliseconds = int (time .time () * 1000 )
161181
162- # If This candidate is not the leader and lease duration is yet to finish
182+ # If This candidate is not the leader and lease duration is yet to
183+ # finish
163184 if (self .election_config .lock .identity != self .observed_record .holder_identity
164185 and self .observed_time_milliseconds + self .election_config .lease_duration * 1000 > int (now_timestamp * 1000 )):
165- logger .info ("yet to finish lease_duration, lease held by {} and has not expired" .format (old_election_record .holder_identity ))
186+ logger .info (
187+ "yet to finish lease_duration, lease held by {} and has not expired" .format (
188+ old_election_record .holder_identity ))
166189 return False
167190
168191 # If this candidate is the Leader
@@ -174,15 +197,20 @@ def try_acquire_or_renew(self):
174197
175198 def update_lock (self , leader_election_record ):
176199 # Update object with latest election record
177- update_status = self .election_config .lock .update (self .election_config .lock .name ,
178- self .election_config .lock .namespace ,
179- leader_election_record )
200+ update_status = self .election_config .lock .update (
201+ self .election_config .lock .name ,
202+ self .election_config .lock .namespace ,
203+ leader_election_record )
180204
181205 if update_status is False :
182- logger .info ("{} failed to acquire lease" .format (leader_election_record .holder_identity ))
206+ logger .info (
207+ "{} failed to acquire lease" .format (
208+ leader_election_record .holder_identity ))
183209 return False
184210
185211 self .observed_record = leader_election_record
186212 self .observed_time_milliseconds = int (time .time () * 1000 )
187- logger .info ("leader {} has successfully acquired lease" .format (leader_election_record .holder_identity ))
213+ logger .info (
214+ "leader {} has successfully acquired lease" .format (
215+ leader_election_record .holder_identity ))
188216 return True
0 commit comments