Skip to content

Commit 6b20b3d

Browse files
committed
Add logging to batch processing
Additionally, tiles that error out will be skipped and the run will proceed rather than terminating on the first error.
1 parent 8a62ed7 commit 6b20b3d

2 files changed

Lines changed: 104 additions & 24 deletions

File tree

tilequeue/command.py

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -399,11 +399,12 @@ def make_seed_tile_generator(cfg):
399399
return tile_generator
400400

401401

402-
def _make_store(cfg):
402+
def _make_store(cfg, logger=None):
403403
store_cfg = cfg.yml.get('store')
404404
assert store_cfg, "Store was not configured, but is necessary."
405405
credentials = cfg.subtree('aws credentials')
406-
logger = make_logger(cfg, 'process')
406+
if logger is None:
407+
logger = make_logger(cfg, 'process')
407408
store = make_store(store_cfg, credentials=credentials, logger=logger)
408409
return store
409410

@@ -1931,15 +1932,13 @@ def tilequeue_batch_enqueue(cfg, peripherals):
19311932

19321933

19331934
def tilequeue_batch_process(cfg, args):
1935+
from tilequeue.log import BatchProcessLogger
19341936
from tilequeue.metatile import make_metatiles
19351937

19361938
logger = make_logger(cfg, 'batch_process')
1939+
batch_logger = BatchProcessLogger(logger)
19371940

1938-
# TODO log json
1939-
1940-
store = _make_store(cfg)
1941-
1942-
logger.info('batch process ... start')
1941+
store = _make_store(cfg, logger)
19431942

19441943
coord_str = args.tile
19451944

@@ -1956,8 +1955,6 @@ def tilequeue_batch_process(cfg, args):
19561955

19571956
assert queue_coord.zoom == queue_zoom, 'Unexpected zoom: %s' % coord_str
19581957

1959-
logger.info('batch process: %s' % coord_str)
1960-
19611958
# TODO generalize and move to tile.py?
19621959
def find_job_coords_for(coord, target_zoom):
19631960
xmin = coord.column
@@ -1997,38 +1994,66 @@ def find_job_coords_for(coord, target_zoom):
19971994
assert zoom_stop > group_by_zoom
19981995
formats = lookup_formats(cfg.output_formats)
19991996

1997+
batch_logger.begin_run(queue_coord)
1998+
20001999
job_coords = find_job_coords_for(queue_coord, group_by_zoom)
20012000
for job_coord in job_coords:
2001+
2002+
batch_logger.begin_pyramid(job_coord)
2003+
20022004
# each coord here is the unit of work now
20032005
pyramid_coords = [job_coord]
20042006
pyramid_coords.extend(coord_children_range(job_coord, zoom_stop))
20052007
coord_data = [dict(coord=x) for x in pyramid_coords]
2006-
for fetch, coord_datum in data_fetcher.fetch_tiles(coord_data):
2008+
2009+
try:
2010+
fetched_coord_data = data_fetcher.fetch_tiles(coord_data)
2011+
except Exception as e:
2012+
batch_logger.pyramid_fetch_failed(e, job_coord)
2013+
continue
2014+
2015+
for fetch, coord_datum in fetched_coord_data:
20072016
coord = coord_datum['coord']
20082017
nominal_zoom = coord.zoom + cfg.metatile_zoom
20092018
unpadded_bounds = coord_to_mercator_bounds(coord)
2010-
source_rows = fetch(nominal_zoom, unpadded_bounds)
2011-
feature_layers = convert_source_data_to_feature_layers(
2012-
source_rows, layer_data, unpadded_bounds, coord.zoom)
2019+
2020+
try:
2021+
source_rows = fetch(nominal_zoom, unpadded_bounds)
2022+
feature_layers = convert_source_data_to_feature_layers(
2023+
source_rows, layer_data, unpadded_bounds, coord.zoom)
2024+
except Exception as e:
2025+
batch_logger.tile_fetch_failed(e, coord)
2026+
continue
20132027

20142028
cut_coords = [coord]
20152029
if nominal_zoom > coord.zoom:
20162030
cut_coords.extend(coord_children_range(coord, nominal_zoom))
20172031

2018-
formatted_tiles, extra_data = process_coord(
2019-
coord, nominal_zoom, feature_layers, post_process_data,
2020-
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
2021-
output_calc_mapping
2022-
)
2032+
try:
2033+
formatted_tiles, extra_data = process_coord(
2034+
coord, nominal_zoom, feature_layers, post_process_data,
2035+
formats, unpadded_bounds, cut_coords, cfg.buffer_cfg,
2036+
output_calc_mapping
2037+
)
2038+
except Exception as e:
2039+
batch_logger.tile_process_failed(e, coord)
2040+
continue
2041+
2042+
try:
2043+
tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
2044+
for tile in tiles:
2045+
store.write_tile(
2046+
tile['tile'], tile['coord'], tile['format'],
2047+
tile['layer'])
2048+
except Exception as e:
2049+
batch_logger.metatile_storage_failed(e, coord)
2050+
continue
20232051

2024-
tiles = make_metatiles(cfg.metatile_size, formatted_tiles)
2025-
for tile in tiles:
2026-
store.write_tile(tile['tile'], tile['coord'], tile['format'],
2027-
tile['layer'])
2052+
batch_logger.tile_processed(coord)
20282053

2029-
# TODO log?
2054+
batch_logger.end_pyramid(job_coord)
20302055

2031-
logger.info('batch process ... done')
2056+
batch_logger.end_run(queue_coord)
20322057

20332058

20342059
def tilequeue_main(argv_args=None):

tilequeue/log.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from enum import Enum
2+
from tilequeue.utils import format_stacktrace_one_line
23
import json
34
import logging
45
import sys
@@ -263,3 +264,57 @@ def unknown_queue_handle_id(self, coord_id, queue_handle_id):
263264

264265
def unknown_coord_id(self, coord_id, queue_handle_id):
265266
self._log('Unknown coord_id', coord_id, queue_handle_id)
267+
268+
269+
class BatchProcessLogger(object):
    """Write JSON-encoded progress and failure records for a batch run.

    Each record is a JSON object holding the coordinate it refers to
    (``coord``), a level name (``type``) and a short message (``msg``).
    Failure records also carry the stringified exception and a one-line
    stack trace.
    """

    def __init__(self, logger):
        # Logger that receives the serialized JSON strings.
        self.logger = logger

    def _log(self, msg, coord):
        """Send ``msg`` for ``coord`` to the logger as an INFO record."""
        payload = {
            'coord': make_coord_dict(coord),
            'type': log_level_name(LogLevel.INFO),
            'msg': msg,
        }
        self.logger.info(json.dumps(payload))

    def _log_exception(self, msg, exception, coord):
        """Send ``msg`` for ``coord`` to the logger as an ERROR record.

        The record includes ``str(exception)`` and the output of
        ``format_stacktrace_one_line()``.
        """
        # Grab the trace before anything else runs.
        # NOTE(review): presumably this reads the exception currently being
        # handled, so callers should invoke this from an ``except`` block --
        # confirm against tilequeue.utils.
        one_line_trace = format_stacktrace_one_line()
        payload = {
            'coord': make_coord_dict(coord),
            'type': log_level_name(LogLevel.ERROR),
            'msg': msg,
            'exception': str(exception),
            'stacktrace': one_line_trace,
        }
        self.logger.error(json.dumps(payload))

    # -- progress records ---------------------------------------------------

    def begin_run(self, coord):
        """Record that the batch run for ``coord`` has begun."""
        self._log('batch process run begin', coord)

    def end_run(self, coord):
        """Record that the batch run for ``coord`` has ended."""
        self._log('batch process run end', coord)

    def begin_pyramid(self, coord):
        """Record that work on the pyramid at ``coord`` has begun."""
        self._log('pyramid begin', coord)

    def end_pyramid(self, coord):
        """Record that work on the pyramid at ``coord`` has ended."""
        self._log('pyramid end', coord)

    def tile_processed(self, coord):
        """Record that the tile at ``coord`` was processed."""
        self._log('tile processed', coord)

    # -- failure records ----------------------------------------------------

    def pyramid_fetch_failed(self, exception, coord):
        """Record that fetching the pyramid at ``coord`` failed."""
        self._log_exception('pyramid fetch failed', exception, coord)

    def tile_fetch_failed(self, exception, coord):
        """Record that fetching the tile at ``coord`` failed."""
        self._log_exception('tile fetch failed', exception, coord)

    def tile_process_failed(self, exception, coord):
        """Record that processing the tile at ``coord`` failed."""
        self._log_exception('tile process failed', exception, coord)

    def metatile_storage_failed(self, exception, coord):
        """Record that storing the metatile for ``coord`` failed."""
        self._log_exception('metatile storage failed', exception, coord)

0 commit comments

Comments
 (0)