1818from exception import BridgeError , FarmerError , SuppliedTokenNotAcceptedError
1919from http import Client
2020
21+ import threading
22+ import thread
23+
24+ TIMEOUT = 60 # default = 1 minute
25+
2126
2227def foo (args ):
2328 self , shard , shard_index , frame , file_name , tmp_path = args
24- self .upload_shard (shard , shard_index , frame , file_name , tmp_path )
29+ return self .upload_shard (shard , shard_index , frame , file_name , tmp_path )
30+
31+
32+ def quit_function (fn_name ):
33+ # self.__logger.debug('{0} took too long'.format(fn_name))
34+ thread .interrupt_main () # raises KeyboardInterrupt
35+
36+
37+ def exit_after (s ):
38+ '''
39+ use as decorator to exit process if
40+ function takes longer than s seconds
41+ '''
42+ def outer (fn ):
43+ def inner (* args , ** kwargs ):
44+ timer = threading .Timer (s , quit_function , args = [fn .__name__ ])
45+ timer .start ()
46+ try :
47+ result = fn (* args , ** kwargs )
48+ finally :
49+ timer .cancel ()
50+ return result
51+ return inner
52+
53+ return outer
2554
2655
2756class Uploader :
@@ -101,6 +130,26 @@ def _prepare_bucket_entry_hmac(self, shard_array):
101130
102131 return current_hmac
103132
133+ @exit_after (TIMEOUT )
134+ def require_upload (self , shard_path , url , index ):
135+ with open (shard_path , 'rb' ) as f :
136+ response = requests .post (
137+ url ,
138+ data = self ._read_in_chunks (
139+ f , shard_index = index ),
140+ timeout = 1 )
141+ return response
142+
143+ def _calculate_timeout (self , shard_size , mbps = 0.5 ):
144+ """
145+ Args:
146+ shard_size: shard size in Byte
147+ mbps: upload throughtput. Default 500 kbps
148+ """
149+ global TIMEOUT
150+ TIMEOUT = int (shard_size * 8.0 / (1024 ** 2 * mbps ))
151+ self .__logger .debug ('Set timeout to %s seconds' % TIMEOUT )
152+
104153 def upload_shard (self , shard , chapters , frame ,
105154 file_name_ready_to_shard_upload , tmp_path ):
106155 """
@@ -120,7 +169,7 @@ def upload_shard(self, shard, chapters, frame,
120169 contract_negotiation_tries += 1
121170 self .__logger .debug ('Negotiating contract' )
122171 self .__logger .debug ('Trying to negotiate storage contract for \
123- shard at index %s... ' % chapters )
172+ shard at index %s. Attempt %s ' % ( chapters , contract_negotiation_tries ) )
124173
125174 try :
126175 frame_content = self .client .frame_add_shard (shard , frame .id )
@@ -132,7 +181,9 @@ def upload_shard(self, shard, chapters, frame,
132181 frame_content ['farmer' ]['port' ],
133182 frame_content ['hash' ],
134183 frame_content ['token' ])
135- self .__logger .debug ('upload_shard url=%s' , url )
184+ self .__logger .debug ('Done contract for shard %s with url=%s' ,
185+ chapters ,
186+ url )
136187
137188 # begin recording exchange report
138189 # exchange_report = model.ExchangeReport()
@@ -151,66 +202,82 @@ def upload_shard(self, shard, chapters, frame,
151202
152203 try :
153204 self .__logger .debug (
154- 'Upload shard at index %s to %s:%d attempt #%d' ,
205+ 'Upload shard at index %s to %s attempt #%d' ,
155206 shard .index ,
156207 frame_content ['farmer' ]['address' ],
157- frame_content ['farmer' ]['port' ],
158208 farmer_tries )
159209
160210 mypath = os .path .join (
161211 tmp_path , '%s-%s' % (
162212 file_name_ready_to_shard_upload ,
163213 chapters + 1 ))
164214
215+ """
165216 with open(mypath, 'rb') as f:
166217 response = requests.post(
167218 url,
168219 data=self._read_in_chunks(
169220 f, shard_index=chapters),
170221 timeout=1)
222+ """
223+ response = self .require_upload (mypath , url , chapters )
224+ self .__logger .debug ('>>> Shard %s Uploaded' % chapters )
171225
172226 j = json .loads (str (response .content ))
227+ self .__logger .info ('>>>> %s' % str (j ))
173228
174229 if j .get ('result' ) == \
175230 'The supplied token is not accepted' :
176231 raise SuppliedTokenNotAcceptedError ()
177232
233+ # Exceptions raised when uploading shards
178234 except FarmerError as e :
235+ self .__logger .error ('Farmer error' )
179236 self .__logger .error (e )
180237 continue
181238
239+ except KeyboardInterrupt :
240+ self .__logger .error ()
241+ self .__logger .error ()
242+ self .__logger .error (
243+ 'Upload shard %s to %s too slow.' % (
244+ chapters , url ))
245+ self .__logger .error (
246+ 'Upload timed out. Redo upload of shard %s' %
247+ chapters )
248+ continue
249+
182250 except Exception as e :
251+ self .__logger .error ('Exception' )
183252 self .__logger .error (e )
184253 self .__logger .error (
185- 'Shard upload error for to %s:%d' ,
254+ 'Shard upload error for %s to %s:%d' ,
255+ chapters ,
186256 frame_content ['farmer' ]['address' ],
187257 frame_content ['farmer' ]['port' ])
188258 continue
189259
190- self .shards_already_uploaded += 1
191- self .__logger .info (
192- 'Shard uploaded successfully to %s:%d' ,
193- frame_content ['farmer' ]['address' ],
194- frame_content ['farmer' ]['port' ])
195-
196- self .__logger .debug (
197- '%s shards, %s sent' ,
198- self .all_shards_count ,
199- self .shards_already_uploaded )
200-
201- if int (self .all_shards_count ) <= \
202- int (self .shards_already_uploaded ):
203- self .__logger .debug ('finish upload' )
260+ else :
261+ self .shards_already_uploaded += 1
262+ self .__logger .info (
263+ 'Shard uploaded successfully to %s:%d' ,
264+ frame_content ['farmer' ]['address' ],
265+ frame_content ['farmer' ]['port' ])
204266
205- break
267+ self .__logger .debug (
268+ '%s shards, %s sent' ,
269+ self .all_shards_count ,
270+ self .shards_already_uploaded )
206271
207- self .__logger .debug ('response.content=%s' , response .content )
272+ if int (self .all_shards_count ) <= \
273+ int (self .shards_already_uploaded ):
274+ self .__logger .debug ('finish upload' )
208275
209- j = json .loads (str (response .content ))
210- if j .get ('result' ) == 'The supplied token is not accepted' :
211- raise SuppliedTokenNotAcceptedError ()
276+ break
212277
278+ # Exceptions raised negotiating contracts
213279 except BridgeError as e :
280+ self .__logger .error ('Bridge error' )
214281 self .__logger .error (e )
215282
216283 # upload failed due to Storj Bridge failure
@@ -236,20 +303,22 @@ def upload_shard(self, shard, chapters, frame,
236303 # Send exchange report
237304 # self.client.send_exchange_report(exchange_report)
238305 continue
306+ else :
307+ # uploaded with success
308+ current_timestamp = int (time .time ())
309+ # prepare second half of exchange heport
310+ exchange_report .exchangeEnd = str (current_timestamp )
311+ exchange_report .exchangeResultCode = exchange_report .SUCCESS
312+ exchange_report .exchangeResultMessage = \
313+ exchange_report .STORJ_REPORT_SHARD_UPLOADED
239314
240- # uploaded with success
241- current_timestamp = int (time .time ())
242- # prepare second half of exchange heport
243- exchange_report .exchangeEnd = str (current_timestamp )
244- exchange_report .exchangeResultCode = exchange_report .SUCCESS
245- exchange_report .exchangeResultMessage = \
246- exchange_report .STORJ_REPORT_SHARD_UPLOADED
247-
248- self .__logger .info ('Shard %s successfully added and exchange \
249- report sent.' , chapters + 1 )
250- # Send exchange report
251- # self.client.send_exchange_report(exchange_report)
252- break
315+ self .__logger .info ('Shard %s successfully added and exchange \
316+ report sent. ' , chapters + 1 )
317+ # Send exchange report
318+ # self.client.send_exchange_report(exchange_report)
319+ # break
320+ return True
321+ return False
253322
254323 def _read_in_chunks (self , file_object , blocksize = 4096 , chunks = - 1 ,
255324 shard_index = None ):
@@ -277,11 +346,9 @@ def _read_in_chunks(self, file_object, blocksize=4096, chunks=-1,
277346 def file_upload (self , bucket_id , file_path , tmp_file_path ):
278347 """"""
279348
280- self .__logger .debug ('Upload %s in bucket %d ' , file_path , bucket_id )
349+ self .__logger .debug ('Upload %s in bucket %s ' , file_path , bucket_id )
281350 self .__logger .debug ('Temp folder %s' , tmp_file_path )
282351
283- encryption_enabled = True
284-
285352 bname = os .path .split (file_path )[1 ] # File name
286353
287354 file_mime_type = 'text/plain'
@@ -343,18 +410,29 @@ def file_upload(self, bucket_id, file_path, tmp_file_path):
343410
344411 self .__logger .debug ('Sharding ended...' )
345412
346- self .__logger .debug ('There are %d shards' , self .all_shards_count )
413+ self .__logger .debug ('There are %s shards' , self .all_shards_count )
347414
348- # create file hash
415+ # Calculate timeout
416+ self ._calculate_timeout (shard_size = shards_manager .shards [0 ].size ,
417+ mbps = 1 )
349418
419+ # Upload shards
350420 mp = Pool ()
351- mp .map (foo , [(self , shards_manager .shards [x ], x , frame ,
352- file_name_ready_to_shard_upload , tmp_file_path )
353- for x in range (len (shards_manager .shards ))])
354-
421+ res = mp .map (foo , [(self , shards_manager .shards [x ], x , frame ,
422+ file_name_ready_to_shard_upload , tmp_file_path )
423+ for x in range (len (shards_manager .shards ))])
424+
425+ self .__logger .debug ('===== RESULTS =====' )
426+ self .__logger .debug (res )
427+ if False in res :
428+ self .__logger .error ('File not uploaded: shard %s not uploaded' %
429+ res .index (False ))
430+ self .__logger .error ('Exiting with errors' )
431+ exit (1 )
355432 # finish_upload
356433 self .__logger .debug ('Generating HMAC...' )
357434
435+ # create file hash
358436 hash_sha512_hmac_b64 = self ._prepare_bucket_entry_hmac (
359437 shards_manager .shards )
360438 hash_sha512_hmac = hashlib .sha224 (str (
0 commit comments