Skip to content

Commit b1d59a1

Browse files
authored
Merge pull request #322 from tilezen/log-batch-process
Add logging to batch processing
2 parents 8a62ed7 + 6b20b3d commit b1d59a1

2 files changed

Lines changed: 104 additions & 24 deletions

File tree

tilequeue/command.py

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -399,11 +399,12 @@ def make_seed_tile_generator(cfg):
399399
return tile_generator
400400

401401

402-
def _make_store(cfg):
402+
def _make_store(cfg, logger=None):
    """Build the tile store from the ``store`` section of the configuration.

    :param cfg: configuration object; its ``yml`` mapping supplies the
        ``store`` section and ``cfg.subtree('aws credentials')`` supplies
        the credentials passed to the store.
    :param logger: optional logger handed to the store. When ``None``, one
        is created with ``make_logger(cfg, 'process')``.
    :return: the object returned by ``make_store``.
    :raises AssertionError: if the ``store`` section is missing or empty.
    """
    store_section = cfg.yml.get('store')
    assert store_section, "Store was not configured, but is necessary."
    aws_credentials = cfg.subtree('aws credentials')
    # Only build a default logger when the caller did not supply one, so
    # callers can route store logging through their own logger.
    store_logger = logger
    if store_logger is None:
        store_logger = make_logger(cfg, 'process')
    return make_store(
        store_section, credentials=aws_credentials, logger=store_logger)
409410

@@ -1931,15 +1932,13 @@ def tilequeue_batch_enqueue(cfg, peripherals):
19311932

19321933

19331934
def tilequeue_batch_process(cfg, args):
1935+
from tilequeue.log import BatchProcessLogger
19341936
from tilequeue.metatile import make_metatiles
19351937

19361938
logger = make_logger(cfg, 'batch_process')
1939+
batch_logger = BatchProcessLogger(logger)
19371940

1938-
# TODO log json
1939-
1940-
store = _make_store(cfg)
1941-
1942-
logger.info('batch process ... start')
1941+
store = _make_store(cfg, logger)
19431942

19441943
coord_str = args.tile
19451944

@@ -1956,8 +1955,6 @@ def tilequeue_batch_process(cfg, args):
19561955

19571956
assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str
19581957

1959-
logger.info('batch process: %s' % coord_str)
1960-
19611958
# TODO generalize and move to tile.py?
19621959
def find_job_coords_for(coord, target_zoom):
19631960
xmin = coord.column
@@ -1997,38 +1994,66 @@ def find_job_coords_for(coord, target_zoom):
19971994
assert zoom_stop > group_by_zoom
19981995
formats = lookup_formats(cfg.output_formats)
19991996

1997+
batch_logger.begin_run(queue_coord)
1998+
20001999
job_coords = find_job_coords_for(queue_coord, group_by_zoom)
20012000
for job_coord in job_coords:
2001+
2002+
batch_logger.begin_pyramid(job_coord)
2003+
20022004
# each coord here is the unit of work now
20032005
pyramid_coords = [job_coord]
20042006
pyramid_coords.extend(coord_children_range(job_coord, zoom_stop))
20052007
coord_data = [dict(coord=x) for x in pyramid_coords]
2006-
for fetch, coord_datum in data_fetcher.fetch_tiles(coord_data):
2008+
2009+
try:
2010+
fetched_coord_data = data_fetcher.fetch_tiles(coord_data)
2011+
except Exception as e:
2012+
batch_logger.pyramid_fetch_failed(e, job_coord)
2013+
continue
2014+
2015+
for fetch, coord_datum in fetched_coord_data:
20072016
coord = coord_datum['coord']
20082017
nominal_zoom = coord.zoom + cfg.metatile_zoom
20092018
unpadded_bounds = coord_to_mercator_bounds(coord)
2010-
source_rows = fetch(nominal_zoom, unpadded_bounds)
2011-
feature_layers = convert_source_data_to_feature_layers(
2012-
source_rows, layer_data, unpadded_bounds, coord.zoom)
2019+
2020+
try:
2021+
source_rows = fetch(nominal_zoom, unpadded_bounds)
2022+
feature_layers = convert_source_data_to_feature_layers(
2023+
source_rows, layer_data, unpadded_bounds, coord.zoom)
2024+
except Exception as e:
2025+
batch_logger.tile_fetch_failed(e, coord)
2026+
continue
20132027

20142028
cut_coords = [coord]
20152029
if nominal_zoom > coord.zoom:
20162030
cut_coords.extend(coord_children_range(coord, nominal_zoom))
20172031

2018-
formatted_tiles, extra_data = process_coord(
2019-
coord, nominal_zoom, feature_layers, post_process_data,
2020-
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
2021-
output_calc_mapping
2022-
)
2032+
try:
2033+
formatted_tiles, extra_data = process_coord(
2034+
coord, nominal_zoom, feature_layers, post_process_data,
2035+
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
2036+
output_calc_mapping
2037+
)
2038+
except Exception as e:
2039+
batch_logger.tile_process_failed(e, coord)
2040+
continue
2041+
2042+
try:
2043+
tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
2044+
for tile in tiles:
2045+
store.write_tile(
2046+
tile['tile'], tile['coord'], tile['format'],
2047+
tile['layer'])
2048+
except Exception as e:
2049+
batch_logger.metatile_storage_failed(e, coord)
2050+
continue
20232051

2024-
tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
2025-
for tile in tiles:
2026-
store.write_tile(tile['tile'], tile['coord'], tile['format'],
2027-
tile['layer'])
2052+
batch_logger.tile_processed(coord)
20282053

2029-
# TODO log?
2054+
batch_logger.end_pyramid(job_coord)
20302055

2031-
logger.info('batch process ... done')
2056+
batch_logger.end_run(queue_coord)
20322057

20332058

20342059
def tilequeue_main(argv_args=None):

tilequeue/log.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from enum import Enum
2+
from tilequeue.utils import format_stacktrace_one_line
23
import json
34
import logging
45
import sys
@@ -263,3 +264,57 @@ def unknown_queue_handle_id(self, coord_id, queue_handle_id):
263264

264265
def unknown_coord_id(self, coord_id, queue_handle_id):
265266
self._log('Unknown coord_id', coord_id, queue_handle_id)
267+
268+
269+
class BatchProcessLogger(object):
    """Write batch-process events to a logger as JSON strings.

    Each event is serialised with ``json.dumps`` and carries the coordinate
    (via ``make_coord_dict``), a level name (via ``log_level_name``) and a
    message. Failure events additionally carry the stringified exception
    and a stack trace.
    """

    def __init__(self, logger):
        # The wrapped logger must provide ``info`` and ``error`` methods;
        # those are the only two this class calls.
        self.logger = logger

    def _log(self, msg, coord):
        """Emit ``msg`` for ``coord`` through ``logger.info``."""
        record = dict(
            coord=make_coord_dict(coord),
            type=log_level_name(LogLevel.INFO),
            msg=msg,
        )
        self.logger.info(json.dumps(record))

    def begin_run(self, coord):
        """Record the start of a batch process run for ``coord``."""
        self._log('batch process run begin', coord)

    def end_run(self, coord):
        """Record the end of a batch process run for ``coord``."""
        self._log('batch process run end', coord)

    def begin_pyramid(self, coord):
        """Record the start of a pyramid for ``coord``."""
        self._log('pyramid begin', coord)

    def end_pyramid(self, coord):
        """Record the end of a pyramid for ``coord``."""
        self._log('pyramid end', coord)

    def tile_processed(self, coord):
        """Record that the tile at ``coord`` was processed."""
        self._log('tile processed', coord)

    def _log_exception(self, msg, exception, coord):
        """Emit ``msg`` with exception details through ``logger.error``."""
        # NOTE(review): called with no arguments, so this presumably reads
        # the exception currently being handled -- confirm in
        # tilequeue.utils. It is captured before anything else is built.
        trace = format_stacktrace_one_line()
        record = dict(
            coord=make_coord_dict(coord),
            type=log_level_name(LogLevel.ERROR),
            msg=msg,
            exception=str(exception),
            stacktrace=trace,
        )
        self.logger.error(json.dumps(record))

    def pyramid_fetch_failed(self, exception, coord):
        """Record a failure to fetch data for the pyramid at ``coord``."""
        self._log_exception('pyramid fetch failed', exception, coord)

    def tile_fetch_failed(self, exception, coord):
        """Record a failure to fetch data for the tile at ``coord``."""
        self._log_exception('tile fetch failed', exception, coord)

    def tile_process_failed(self, exception, coord):
        """Record a failure to process the tile at ``coord``."""
        self._log_exception('tile process failed', exception, coord)

    def metatile_storage_failed(self, exception, coord):
        """Record a failure to store the metatile for ``coord``."""
        self._log_exception('metatile storage failed', exception, coord)

0 commit comments

Comments
 (0)