-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathDataSource.py
More file actions
83 lines (67 loc) · 2.78 KB
/
DataSource.py
File metadata and controls
83 lines (67 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright (C) 2020-2021 OpenBikeSensor Contributors
# Contact: https://openbikesensor.org
#
# This file is part of the OpenBikeSensor Scripts Collection.
#
# The OpenBikeSensor Scripts Collection is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# The OpenBikeSensor Scripts Collection is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with the OpenBikeSensor Scripts Collection. If not, see
# <http://www.gnu.org/licenses/>.
import numpy as np
import logging
from .TileSource import TileSource
from .WayContainer import WayContainerAABBTree as WayContainer
from .Way import Way
log = logging.getLogger(__name__)
class DataSource:
def __init__(self, cache_dir="cache", tile_zoom=14):
self.nodes = {}
self.ways = {}
self.way_container = WayContainer()
self.loaded_tiles = []
self.tile_source = TileSource()
self.tile_zoom = tile_zoom
self.chunk_size = 100
def ensure_coverage(self, lat, lon, extend=0.0):
tiles = self.tile_source.get_required_tiles(lat, lon, self.tile_zoom, extend=extend)
for tile in tiles:
self.add_tile(tile)
def get_way_by_id(self, way_id):
if way_id and way_id in self.ways:
return self.ways[way_id]
else:
return None
def get_local_map(self):
return self.local_map
def add_tile(self, tile):
# skip if already in tile list
if tile in self.loaded_tiles:
return
# request tile, will be returned as a node-way-relation-split
nodes, ways, relations = self.tile_source.get_tile(tile[0], tile[1], tile[2])
# add nodes
self.nodes.update(nodes)
# add way objects, and store
for way_id, way in ways.items():
if way_id not in self.ways:
w = Way.create(way_id, way, nodes, self.chunk_size)
self.ways.update(w)
for id in w:
self.way_container.insert(w[id])
# update tile list
self.loaded_tiles.append(tile)
def get_map_center(self):
lat = np.mean([node["lat"] for node in self.nodes.values()])
lon = np.mean([node["lon"] for node in self.nodes.values()])
return lat, lon
def find_approximate_near_ways(self, lat_lon, d_max):
return self.way_container.find_near_candidates(lat_lon, d_max=d_max)