-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrosetta.py
More file actions
278 lines (249 loc) · 14 KB
/
rosetta.py
File metadata and controls
278 lines (249 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
Rosetta is PyAerial's module for filtering and saving to the database.
"""
import ast
import logging
import math
import pymongo
from geopy.distance import geodesic
from pymongo.errors import NetworkTimeout, ConnectionFailure, ServerSelectionTimeoutError, AutoReconnect, PyMongoError
from shapely import Polygon, Point
from shapely.ops import nearest_points
import calculations
from constants import *
from helpers import Datum
def _strip_method_arguments(method, prefix):
    """Return what remains of a save-method string once its name, spaces and parentheses are removed."""
    remainder = method.replace(prefix, "")
    for token in (" ", "(", ")"):
        remainder = remainder.replace(token, "")
    return remainder


def filter_packets(packets, method=CONFIG_CAT_SAVE_METHOD_ALL):
    """
    Filter packets using one of four methods:
    all: return all packets
    none: return no packets
    decimate: keep every nth packet (the packets at index 0, n, 2n, ...)
    sdecimate: allow a maximum of x packets every y seconds

    For sdecimate the element at index 1 of each packet is used as its timestamp, and the
    packets are walked in the order given (assumed to be chronological - TODO confirm with callers).
    A window of y seconds opens at the first packet; the first packet at or after the end of the
    current window opens the next one and is itself kept.

    :param packets: The packets to filter
    :param method: method for filtering, e.g. the decimate name followed by "(n)" or the
                   smart-decimate name followed by "(x, y)"
    :return: the filtered packets, or None when the method is not recognised
    """
    if method == CONFIG_CAT_SAVE_METHOD_ALL:
        return packets

    if method.startswith(CONFIG_CAT_SAVE_METHOD_DECIMATE):
        if not packets:
            return []
        # Parse the step once instead of once per packet.
        step = int(_strip_method_arguments(method, CONFIG_CAT_SAVE_METHOD_DECIMATE))
        return [packet for index, packet in enumerate(packets) if index % step == 0]

    if method.startswith(CONFIG_CAT_SAVE_METHOD_SMART_DECIMATE):
        if not packets:
            # Previously raised IndexError on packets[0]; nothing to filter means nothing to keep.
            return []
        arguments = [float(part) for part in
                     _strip_method_arguments(method, CONFIG_CAT_SAVE_METHOD_SMART_DECIMATE).split(",")]
        max_per_window, window_seconds = arguments[0], arguments[1]

        kept_packets = []
        kept_in_window = 0
        window_end = packets[0][1] + window_seconds
        for packet in packets:
            if packet[1] >= window_end:
                # The window has expired: open a new one at this packet, whether or not the
                # old window was full. (Resetting only when the old window was full made every
                # later packet get dropped as soon as one window ended with spare capacity.)
                window_end = packet[1] + window_seconds
                kept_in_window = 0
            if kept_in_window < max_per_window:
                kept_in_window += 1
                kept_packets.append(packet)
        return kept_packets

    if method == CONFIG_CAT_SAVE_METHOD_NONE:
        return []

    # Unrecognised methods yield None, exactly as before.
    return None
class Saver:
    """
    Base class that decides which flights are worth keeping and buffers them until they are saved.

    Subclasses implement save() to persist the buffered flights and clear the buffer.
    """

    def __init__(self, log_name: str = "Saver") -> None:
        """
        Initialize a Saver.
        :param log_name: name of the logger
        """
        self.logger = logging.getLogger(name=log_name)
        # Maps (plane_id, zone, level) -> the filtered data kept for that flight-level.
        self._cache = {}

    def add_plane_to_cache(self, plane_id: str, zone: str, level: str, cache: dict[str, list[Datum]]) -> None:
        """
        Adds a plane's data to the cache. Assumes data is valid.
        An existing entry for the same (plane_id, zone, level) is overwritten.
        :param plane_id: The ID of the plane
        :param zone: The name of the zone
        :param level: The level within the zone the plane has satisfied
        :param cache: the plane's data to save
        """
        self._cache[(plane_id, zone, level)] = cache

    def save(self):
        """
        Save the data in self._cache using whatever method was implemented by the child class.
        This method is also expected to clear the cache variable
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @staticmethod
    def _component_holds(component, plane, tick, eta, zone_coordinates, latitude, longitude):
        """Return True when every comparison of every data type in the component holds at this tick."""
        for data_type, comparisons in component.items():
            relevant_data = None
            if data_type in (STORE_LAT, STORE_LONG, STORE_ALT, STORE_VERT_SPEED):  # Received data
                relevant_data = calculations.get_latest(STORE_RECV_DATA, data_type, plane, tick).value
            elif data_type in (STORE_HORIZ_SPEED, STORE_HEADING):  # Calculated data
                relevant_data = calculations.get_latest(STORE_CALC_DATA, data_type, plane, tick).value
            elif data_type == ALERT_CAT_ETA:  # ETA
                relevant_data = eta
            elif data_type == STORE_DISTANCE:  # Distance
                # nearest_points returns one point per geometry; the first lies on the zone polygon.
                # The point is built as (latitude, longitude), so shapely's x is the latitude and
                # y the longitude (assumes the zone coordinates use the same order - TODO confirm).
                nearest = nearest_points(Polygon(zone_coordinates), Point([latitude, longitude]))[0]
                # NOTE(review): this is a geopy Distance object, not a plain number; confirm the
                # configured comparison functions and thresholds expect that.
                relevant_data = geodesic((nearest.x, nearest.y), [latitude, longitude])
            if relevant_data is None:  # No valid data (or unknown data type): the component fails
                return False
            for comparison, threshold in comparisons.items():
                if not CONFIG_COMP_FUNCTIONS[comparison](relevant_data, threshold):
                    return False
        return True

    @staticmethod
    def _filter_plane_data(plane, category):
        """Return the plane's data with every subcategory filtered by the category's save method."""
        filtered_information = {STORE_INTERNAL: plane[STORE_INTERNAL], STORE_INFO: plane[STORE_INFO]}
        for type_of_information in STORE_DATA_TYPES:
            save_methods = category[CONFIG_CAT_SAVE][STORE_DATA_CONFIG_NAMING[type_of_information]]
            filtered_information[type_of_information] = {}
            for subcategory in STORE_DATA_TYPES[type_of_information]:
                # Use the subcategory's own save method when configured, else the default one.
                if subcategory in save_methods:
                    save_method = save_methods[subcategory]
                else:
                    save_method = save_methods[CONFIG_CAT_DEFAULT_SAVE_METHOD]
                filtered_information[type_of_information][subcategory] = filter_packets(
                    plane[type_of_information][subcategory], save_method)
        return filtered_information

    def cache_flight(self, plane):
        """
        Checks the plane against every level of every configured zone and, for each level whose
        requirements held for enough ticks, filters the plane's packets and adds them to the
        Saver cache. Nothing is persisted here; that happens in save().
        :param plane: The plane data to parse and cache
        :return: True if at least one zone level was cached for this plane, False otherwise
        """
        calculated_information = plane[STORE_CALC_DATA]
        received_information = plane[STORE_RECV_DATA]
        first_time = plane[STORE_INTERNAL][STORE_FIRST_PACKET]
        last_time = plane[STORE_INTERNAL][STORE_MOST_RECENT_PACKET]

        # Ensure importance
        if STORE_LAT not in received_information or STORE_HEADING not in calculated_information:
            # Without position and heading there is nothing to evaluate the zones against
            self.logger.getChild("cache_flight").warning(f"Plane {plane[STORE_INFO][STORE_ICAO]}"
                                                         f" did not have heading and/or position information,"
                                                         f" can't infer importance. Ignoring")
            return False

        saved = False
        for zone in CONFIGURATION[CONFIG_ZONES]:
            # `zone` is only the zone's name; its settings live in the configuration.
            zone_configuration = CONFIGURATION[CONFIG_ZONES][zone]
            levels = zone_configuration[CONFIG_ZONES_LEVELS]
            zone_coordinates = zone_configuration[CONFIG_ZONES_COORDINATES]
            for level in levels:
                category = levels[level][CONFIG_ZONES_LEVELS_CATEGORY]
                if isinstance(category, str):
                    # A string is the name of a shared category; look up its definition
                    category = CONFIGURATION[CONFIG_CATEGORIES][category]

                # The requirements expression does not change between ticks, so parse it once.
                requirements = levels[level][CONFIG_ZONES_LEVELS_REQUIREMENTS]
                component_names = [node.id for node in ast.walk(ast.parse(requirements))
                                   if isinstance(node, ast.Name)]

                total_valid_ticks = 0
                # One tick per integer timestamp after the first packet, up to the most recent one
                for tick in range(int(first_time + 1), int(last_time) + 1):
                    latitude = calculations.get_latest(STORE_RECV_DATA, STORE_LAT, plane, tick).value
                    longitude = calculations.get_latest(STORE_RECV_DATA, STORE_LONG, plane, tick).value
                    heading = calculations.get_latest(STORE_CALC_DATA, STORE_HEADING, plane, tick).value
                    speed = calculations.get_latest(STORE_CALC_DATA, STORE_HORIZ_SPEED, plane, tick).value
                    # NOTE(review): the meaning of the trailing 100000 is defined by
                    # calculations.time_to_enter_geofence; it is not visible from here.
                    eta = calculations.time_to_enter_geofence([latitude, longitude], heading, speed,
                                                              zone_coordinates, 100000)

                    # Every name in the requirements expression is a component; record whether each holds.
                    components = {
                        component_name: self._component_holds(CONFIGURATION[CONFIG_COMPONENTS][component_name],
                                                              plane, tick, eta, zone_coordinates,
                                                              latitude, longitude)
                        for component_name in component_names
                    }
                    # SECURITY: eval() executes the configured expression. Builtins are withheld
                    # (every name was resolved as a component above), but the configuration must
                    # still come from a trusted source.
                    if eval(requirements, {"__builtins__": {}, **components}):  # Evaluate
                        total_valid_ticks += 1

                if total_valid_ticks >= levels[level][CONFIG_ZONES_LEVELS_SECONDS]:
                    # The requirements held for enough ticks: cache this level of this plane
                    self.add_plane_to_cache(plane[STORE_INFO][STORE_ICAO], zone, level,
                                            self._filter_plane_data(plane, category))
                    saved = True
        return saved
class PrintSaver(Saver):
    """Saver that writes the cached flights to the log instead of a database."""

    def __init__(self):
        """Create the saver with its logger named "print"."""
        super().__init__(log_name="print")

    def save(self):
        """Log the whole cache at INFO level, then replace it with an empty cache."""
        cached_flights = self._cache
        self.logger.info(f"SAVING: {cached_flights}")
        self._cache = dict()
class MongoSaver(Saver):
    """Saver that writes every cached flight-level into MongoDB."""

    def __init__(self, uri):
        """
        Initialize a MongoSaver and connect to the database straight away.
        :param uri: the MongoDB connection URI
        """
        super().__init__(log_name="mongodb")
        # Despite the name this holds the MongoClient, not a single database.
        self.database: pymongo.MongoClient | None = None
        self.uri = uri
        self.connect_to_database()

    def connect_to_database(self):
        """
        Connects to the MongoDB database.
        Blocks, retrying indefinitely, until the server answers. Clients that failed to connect
        and the client being replaced are closed so they do not pile up.
        """
        stale_client = self.database
        # Created outside the try block on purpose: an error raised while building the very
        # first client (e.g. a malformed URI) propagates to the caller instead of being retried.
        client = pymongo.MongoClient(self.uri, serverSelectionTimeoutMS=2000,
                                     connectTimeoutMS=1000, socketTimeoutMS=1000)
        try:
            # The ismaster command is cheap and does not require auth.
            client.admin.command('ismaster')
        except PyMongoError:
            self.logger.error(f"Disconnected from MongoDB! Waiting to reconnect now... (uri={self.uri})")
            while True:
                # Release the client that just failed before creating the next one.
                client.close()
                try:
                    self.logger.debug("Attempting to reconnect to MongoDB...")
                    client = pymongo.MongoClient(self.uri, serverSelectionTimeoutMS=2000,
                                                 connectTimeoutMS=1000, socketTimeoutMS=1000)
                    client.admin.command('ismaster')
                except PyMongoError:
                    self.logger.warning(f"Failed to reconnect to MongoDB! (uri={self.uri})")
                    continue
                self.logger.info("Successfully reconnected to MongoDB!")
                break
        self.database = client
        if stale_client is not None:
            stale_client.close()

    def _insert_document(self, database_name, collection_name, document):
        """Insert one document, reconnecting and retrying once on failure; return True if it was stored."""
        for attempt in range(2):
            try:
                # Resolve the collection from the current client on every attempt, so a retry
                # never goes through a handle that belongs to a replaced client.
                self.database.get_database(database_name).get_collection(collection_name).insert_one(document)
                return True
            except PyMongoError:
                if attempt == 0:
                    self.connect_to_database()
                else:
                    # Retry only once: errors unrelated to the connection would otherwise loop forever.
                    self.logger.error(f"Could not save a document to {database_name}.{collection_name}"
                                      f" even after reconnecting; skipping it.", exc_info=True)
        return False

    def save(self):
        """
        Save all the data to MongoDB.
        Each plane gets its own database (the lower-cased plane ID) and each flight-level its own
        collection, named "<truncated first packet time>-<zone>-<level>". The cache is emptied afterwards.
        """
        self.logger.info(f"Beginning to save cache of length {len(str(self._cache).encode('utf-8'))} bytes"
                         f" ({len(self._cache.keys())} flights).")
        for flight in self._cache:
            icao, zone, level = flight[0], flight[1], flight[2]
            data = self._cache[flight]
            # Document keys are stored as strings, so stringify the packet-type keys.
            packet_types = data[STORE_INTERNAL][STORE_PACKET_TYPE]
            data[STORE_INTERNAL][STORE_PACKET_TYPE] = {str(i): packet_types[i] for i in packet_types}

            database_name = icao.lower()  # Database is plane ID
            # Truncate the flight start time for use, so it's cleaner
            collection_name = str(int(data[STORE_INTERNAL][STORE_FIRST_PACKET])) + "-" + zone + "-" + level

            for data_type in [STORE_RECV_DATA, STORE_CALC_DATA]:  # Add data to database.
                # This is received and calculated data: one document per data item
                for item in data[data_type]:
                    document = {STORAGE_CATEGORY: data_type,
                                STORAGE_DATA_TYPE: item,
                                STORAGE_DATA: [[datum.time, datum.value] for datum in data[data_type][item]]}
                    self._insert_document(database_name, collection_name, document)

            # Add plane information to database. This is done under the STORE_INFO variable
            document = {STORAGE_CATEGORY: STORE_INFO, STORAGE_ZONE: zone, STORAGE_LEVEL: level}
            for info_type in [STORE_INFO, STORE_INTERNAL]:
                document.update({str(i): data[info_type][i] for i in data[info_type]})
            self._insert_document(database_name, collection_name, document)

        # Reset cache
        self.logger.info(f"Now done saving {len(self._cache)} eligible flight-levels.")
        self._cache = {}