-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathtest_simulator.py
More file actions
320 lines (253 loc) · 12.3 KB
/
test_simulator.py
File metadata and controls
320 lines (253 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import json
import random
from datetime import datetime, timedelta
from virtualship.instruments.types import InstrumentType
from virtualship.make_realistic.problems.scenarios import (
GENERAL_PROBLEMS,
GeneralProblem,
InstrumentProblem,
)
from virtualship.make_realistic.problems.simulator import ProblemSimulator
from virtualship.models.expedition import (
Expedition,
InstrumentsConfig,
Schedule,
ShipConfig,
Waypoint,
)
from virtualship.models.location import Location
from virtualship.utils import REPORT
def _make_simple_expedition(
num_waypoints: int = 2, distance_scale: float = 1.0, no_instruments: bool = False
) -> Expedition:
"""Func. rather than fixture to allow for configurability in different tests."""
sample_datetime = datetime(2024, 1, 1, 0, 0, 0)
instruments_non_underway = [inst for inst in InstrumentType if not inst.is_underway]
waypoints = []
for i in range(num_waypoints):
wp = Waypoint(
location=Location(
latitude=0.0 + i * distance_scale, longitude=0.0 + i * distance_scale
),
time=sample_datetime + timedelta(days=i),
instrument=[]
if no_instruments
else random.sample(instruments_non_underway, 3),
)
waypoints.append(wp)
schedule = Schedule(waypoints=waypoints)
instruments = InstrumentsConfig()
ship = ShipConfig(ship_speed_knots=10.0)
return Expedition(
schedule=schedule, instruments_config=instruments, ship_config=ship
)
def test_select_problems_single_waypoint_returns_pre_departure(tmp_path):
expedition = _make_simple_expedition(num_waypoints=1)
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, str(tmp_path))
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
assert isinstance(problems, dict)
assert len(problems["problem_class"]) == 1
assert problems["waypoint_i"] == [None]
problem_cls = problems["problem_class"][0]
assert isinstance(problem_cls, GeneralProblem)
assert getattr(problem_cls, "pre_departure", False) is True
def test_no_instruments_no_instruments_problems(tmp_path):
expedition = _make_simple_expedition(num_waypoints=2, no_instruments=True)
instruments_in_expedition = expedition.get_instruments()
assert len(instruments_in_expedition) == 0, "Expedition should have no instruments"
simulator = ProblemSimulator(expedition, str(tmp_path))
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
has_instrument_problems = any(
isinstance(cls, InstrumentProblem) for cls in problems["problem_class"]
)
assert not has_instrument_problems, (
"Should not select instrument problems when no instruments are present"
)
def test_select_problems_difficulty_level_zero():
expedition = _make_simple_expedition(num_waypoints=2)
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, ".")
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="easy"
)
assert problems is None
def test_cache_and_load_selected_problems_roundtrip(tmp_path):
expedition = _make_simple_expedition(num_waypoints=2)
simulator = ProblemSimulator(expedition, str(tmp_path))
# pick two general problems (registry should contain entries)
problem1 = GENERAL_PROBLEMS[0]
problem2 = GENERAL_PROBLEMS[1] if len(GENERAL_PROBLEMS) > 1 else problem1
problems = {"problem_class": [problem1, problem2], "waypoint_i": [None, 0]}
sel_fpath = tmp_path / "subdir" / "selected_problems.json"
simulator.cache_selected_problems(problems, str(sel_fpath))
assert sel_fpath.exists()
with open(sel_fpath, encoding="utf-8") as f:
data = json.load(f)
assert "problem_class" in data and "waypoint_i" in data
# now load via simulator, verify class names map back to original selected problem classes
loaded = simulator.load_selected_problems(str(sel_fpath))
assert loaded["waypoint_i"] == problems["waypoint_i"]
assert [c.short_name for c in problems["problem_class"]] == [
c.short_name for c in loaded["problem_class"]
]
def test_hash_to_json(tmp_path):
expedition = _make_simple_expedition(num_waypoints=2)
simulator = ProblemSimulator(expedition, str(tmp_path))
any_problem = GENERAL_PROBLEMS[0]
hash_path = tmp_path / "problem_hash.json"
simulator._hash_to_json(
any_problem, "deadbeef", None, hash_path
) # "deadbeef" as sub for hex in test
assert hash_path.exists()
with open(hash_path, encoding="utf-8") as f:
obj = json.load(f)
assert obj["problem_hash"] == "deadbeef"
assert "message" in obj and "delay_duration_hours" in obj
assert obj["resolved"] is False
def test_has_contingency_pre_departure(tmp_path):
expedition = _make_simple_expedition(num_waypoints=2)
simulator = ProblemSimulator(expedition, str(tmp_path))
pre_departure_problem = next(
gp for gp in GENERAL_PROBLEMS if getattr(gp, "pre_departure", False)
)
assert pre_departure_problem is not None, (
"Need at least one pre-departure problem class in the general problem registry"
)
# _has_contingency should return False for pre-departure (waypoint = None)
assert simulator._has_contingency(pre_departure_problem, None) is False
def test_select_problems_difficulty_levels(tmp_path):
expedition = _make_simple_expedition(num_waypoints=3)
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, str(tmp_path))
for level in ["easy", "medium", "hard"]:
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level=level
)
if level == "easy":
assert problems is None
else:
assert isinstance(problems, dict)
assert len(problems["problem_class"]) > 0
assert len(problems["waypoint_i"]) == len(problems["problem_class"])
if level == "medium":
assert len(problems["problem_class"]) <= 2
def test_difficulty_level_hard_more_problems(tmp_path):
difficulty_level = "hard"
short_expedition = _make_simple_expedition(
num_waypoints=2
) # short in terms of number of waypoints
instruments_in_short_expedition = short_expedition.get_instruments()
simulator_short = ProblemSimulator(short_expedition, str(tmp_path))
long_expedition = _make_simple_expedition(num_waypoints=12)
instruments_in_long_expedition = long_expedition.get_instruments()
simulator_long = ProblemSimulator(long_expedition, str(tmp_path))
problems_short = simulator_short.select_problems(
instruments_in_short_expedition, difficulty_level=difficulty_level
)
problems_long = simulator_long.select_problems(
instruments_in_long_expedition, difficulty_level=difficulty_level
)
assert len(problems_long["problem_class"]) >= len(
problems_short["problem_class"]
), (
'Longer expedition should have more problems than shorter one at difficulty_level="hard"'
)
def test_unique_waypoint_assignment(tmp_path):
expedition = _make_simple_expedition(num_waypoints=12)
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, str(tmp_path))
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
waypoint_indices = problems["waypoint_i"]
# filter None (pre-departure) and check uniqueness of waypoint indices
non_none_indices = [i for i in waypoint_indices if i is not None]
assert len(non_none_indices) == len(set(non_none_indices)), (
"Each problem should be assigned a unique waypoint index (excluding pre-departure problems)"
)
def test_has_contingency_during_expedition(tmp_path):
# expedition with long distance between waypoints
long_wp_expedition = _make_simple_expedition(num_waypoints=2, distance_scale=3.0)
long_simulator = ProblemSimulator(long_wp_expedition, str(tmp_path))
# short distance
short_wp_expedition = _make_simple_expedition(num_waypoints=2, distance_scale=0.01)
short_simulator = ProblemSimulator(short_wp_expedition, str(tmp_path))
# a during-expedition general problem
problem_cls = next(
c for c in GENERAL_PROBLEMS if not getattr(c, "pre_departure", False)
)
assert problem_cls is not None, (
"Need at least one non-pre-departure problem class in the general problem registry"
)
# short distance expedition should have contingency, long distance should not (given time between waypoints and ship speed is constant)
assert short_simulator._has_contingency(problem_cls, problem_waypoint_i=0) is True
assert long_simulator._has_contingency(problem_cls, problem_waypoint_i=0) is False
def test_post_expedition_report(tmp_path):
expedition = _make_simple_expedition(
num_waypoints=12
) # longer expedition to increase likelihood of multiple problems at difficulty_level="hard"
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, str(tmp_path))
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
report_path = tmp_path / REPORT
simulator.post_expedition_report(problems, report_path)
assert report_path.exists()
with open(report_path, encoding="utf-8") as f:
content = f.read()
assert content.count("Problem:") == len(problems["problem_class"]), (
"Number of reported problems should match number of selected problems."
)
assert content.count("Delay caused:") == len(problems["problem_class"]), (
"Number of reported delay durations should match number of selected problems."
)
for problem in problems["problem_class"]:
assert problem.message in content, (
"Problem messages in report should match those of selected problems."
)
def test_instrument_problems_only_selected_when_instruments_present(tmp_path):
expedition = _make_simple_expedition(num_waypoints=3, no_instruments=True)
instruments_in_expedition = expedition.get_instruments()
assert len(instruments_in_expedition) == 0, "Expedition should have no instruments"
simulator = ProblemSimulator(expedition, str(tmp_path))
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
has_instrument_problems = any(
isinstance(cls, InstrumentProblem) for cls in problems["problem_class"]
)
assert not has_instrument_problems, (
"Should not select instrument problems when no instruments are present"
)
def test_instrument_not_present_doesnt_select_instrument_problem(tmp_path):
expedition = _make_simple_expedition(num_waypoints=3, no_instruments=True)
# prescribe instruments at waypoints, for this test case each should only be present at one waypoint
expedition.schedule.waypoints[0].instrument = [InstrumentType.CTD]
expedition.schedule.waypoints[1].instrument = [
InstrumentType.ARGO_FLOAT,
InstrumentType.DRIFTER,
]
instruments_in_expedition = expedition.get_instruments()
simulator = ProblemSimulator(expedition, str(tmp_path))
# run many iterations of randomly selecting problems and check that if an instrument problem is selected, the associated instrument is actually present at the selected waypoint
for _ in range(int(1e4)):
problems = simulator.select_problems(
instruments_in_expedition, difficulty_level="hard"
)
for problem, wp_i in zip(
problems["problem_class"], problems["waypoint_i"], strict=False
):
if isinstance(problem, InstrumentProblem):
wp_instruments = expedition.schedule.waypoints[wp_i].instrument
assert problem.instrument_type in wp_instruments, (
"Instrument problem should only be selected if the instrument is present at the selected waypoint"
)
# any incompatible waypoint x instrument problem combinations should have been replaced by a general problem
else:
assert isinstance(problem, GeneralProblem)