Skip to content

Commit ee9498d

Browse files
authored
Merge pull request #321 from tilezen/zerebubuth/backoff-and-retry-s3-puts
Add a backoff and retry to S3 puts to help with Slow Down messages.
2 parents c1d820e + 08e5184 commit ee9498d

2 files changed

Lines changed: 53 additions & 12 deletions

File tree

tilequeue/command.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,8 @@ def _make_store(cfg):
403403
store_cfg = cfg.yml.get('store')
404404
assert store_cfg, "Store was not configured, but is necessary."
405405
credentials = cfg.subtree('aws credentials')
406-
store = make_store(store_cfg, credentials=credentials)
406+
logger = make_logger(cfg, 'process')
407+
store = make_store(store_cfg, credentials=credentials, logger=logger)
407408
return store
408409

409410

tilequeue/store.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,67 @@ def parse_coordinate_from_path(path, extension, layer):
5959
pass
6060

6161

62+
# decorates a function to back off and retry
63+
def _backoff_and_retry(ExceptionType, num_tries=5, retry_factor=2,
64+
retry_interval=1, logger=None):
65+
from time import sleep
66+
from functools import wraps
67+
68+
def decorator(f):
69+
@wraps(f)
70+
def func(*args, **kwargs):
71+
# do the first num_tries-1 attempts wrapped in something to catch
72+
# any exceptions, optionally log them, and try again.
73+
interval = retry_interval
74+
factor = retry_factor
75+
76+
for _ in xrange(1, num_tries):
77+
try:
78+
return f(*args, **kwargs)
79+
80+
except ExceptionType as e:
81+
if logger:
82+
logger.warning("Failed. Backing off and retrying. "
83+
"Error: %s" % str(e))
84+
85+
sleep(interval)
86+
interval *= factor
87+
88+
# do final attempt without try-except, so we get the exception
89+
# in normal code.
90+
return f(*args, **kwargs)
91+
92+
return func
93+
return decorator
94+
95+
6296
class S3(object):
6397

6498
def __init__(
6599
self, bucket, date_prefix, path, reduced_redundancy,
66-
delete_retry_interval):
100+
delete_retry_interval, logger):
67101
self.bucket = bucket
68102
self.date_prefix = date_prefix
69103
self.path = path
70104
self.reduced_redundancy = reduced_redundancy
71105
self.delete_retry_interval = delete_retry_interval
106+
self.logger = logger
72107

73108
def write_tile(self, tile_data, coord, format, layer):
74109
key_name = s3_tile_key(
75110
self.date_prefix, self.path, layer, coord, format.extension)
76111
key = self.bucket.new_key(key_name)
77-
key.set_contents_from_string(
78-
tile_data,
79-
headers={'Content-Type': format.mimetype},
80-
policy='public-read',
81-
reduced_redundancy=self.reduced_redundancy,
82-
)
112+
113+
@_backoff_and_retry(Exception, logger=self.logger)
114+
def write_to_s3():
115+
key.set_contents_from_string(
116+
tile_data,
117+
headers={'Content-Type': format.mimetype},
118+
policy='public-read',
119+
reduced_redundancy=self.reduced_redundancy,
120+
)
121+
122+
write_to_s3()
83123

84124
def read_tile(self, coord, format, layer):
85125
key_name = s3_tile_key(
@@ -314,11 +354,11 @@ def list_tiles(self, format, layer):
314354
def make_s3_store(bucket_name,
315355
aws_access_key_id=None, aws_secret_access_key=None,
316356
path='osm', reduced_redundancy=False, date_prefix='',
317-
delete_retry_interval=60):
357+
delete_retry_interval=60, logger=None):
318358
conn = connect_s3(aws_access_key_id, aws_secret_access_key)
319359
bucket = Bucket(conn, bucket_name)
320360
s3_store = S3(bucket, date_prefix, path, reduced_redundancy,
321-
delete_retry_interval)
361+
delete_retry_interval, logger)
322362
return s3_store
323363

324364

@@ -354,7 +394,7 @@ def write_tile_if_changed(store, tile_data, coord, format, layer):
354394
return False
355395

356396

357-
def make_store(yml, credentials={}):
397+
def make_store(yml, credentials={}, logger=None):
358398
store_type = yml.get('type')
359399

360400
if store_type == 'directory':
@@ -377,7 +417,7 @@ def make_store(yml, credentials={}):
377417
return make_s3_store(
378418
bucket, aws_access_key_id, aws_secret_access_key, path=path,
379419
reduced_redundancy=reduced_redundancy, date_prefix=date_prefix,
380-
delete_retry_interval=delete_retry_interval)
420+
delete_retry_interval=delete_retry_interval, logger=logger)
381421

382422
else:
383423
raise ValueError('Unrecognized store type: `{}`'.format(store_type))

0 commit comments

Comments
 (0)