-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrocket.py
More file actions
122 lines (102 loc) · 3.59 KB
/
Copy pathrocket.py
File metadata and controls
122 lines (102 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from fastapi import HTTPException, status
from src.controllers.interface import (
ControllerBase,
controller_exception_handler,
)
from src.views.rocket import (
RocketSimulation,
RocketCreated,
RocketDrawingGeometry,
)
from src.models.motor import MotorModel
from src.models.rocket import (
RocketModel,
RocketWithMotorReferenceRequest,
)
from src.repositories.interface import RepositoryInterface
from src.services.rocket import RocketService
class RocketController(ControllerBase):
    """
    Controller for the Rocket model.

    Enables:
        - Simulation of a RocketPy Rocket.
        - CRUD for Rocket BaseApiModel.
        - Creating/updating a rocket from a referenced motor id.
        - Exporting a rocket as dill binary or as drawing geometry.
    """

    def __init__(self):
        # RocketModel is the only model handed to the base controller.
        super().__init__(models=[RocketModel])

    async def _load_motor(self, motor_id: str) -> MotorModel:
        """
        Fetch a motor through the motor repository.

        Args:
            motor_id: str

        Returns:
            models.MotorModel

        Raises:
            HTTP 404 Not Found: If the repository returns None for the id.
        """
        motor_repo_factory = RepositoryInterface.get_model_repo(MotorModel)
        async with motor_repo_factory() as motor_repo:
            found = await motor_repo.read_motor_by_id(motor_id)
        if found is not None:
            return found
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Motor not found",
        )

    async def _assemble_from_reference(
        self, payload: RocketWithMotorReferenceRequest
    ) -> RocketModel:
        """
        Resolve the motor referenced by the payload and assemble the rocket.

        Args:
            payload: RocketWithMotorReferenceRequest

        Returns:
            The result of payload.rocket.assemble(motor).

        Raises:
            HTTP 404 Not Found: If the referenced motor is not found.
        """
        motor = await self._load_motor(payload.motor_id)
        return payload.rocket.assemble(motor)

    async def _rocket_service_for(self, rocket_id: str) -> RocketService:
        """
        Read a persisted rocket and wrap it in a RocketService.

        Args:
            rocket_id: str

        Returns:
            services.RocketService
        """
        # NOTE(review): get_rocket_by_id is not defined in this class;
        # presumably provided by ControllerBase for RocketModel - confirm.
        persisted = await self.get_rocket_by_id(rocket_id)
        return RocketService.from_rocket_model(persisted.rocket)

    @controller_exception_handler
    async def create_rocket_from_motor_reference(
        self, payload: RocketWithMotorReferenceRequest
    ) -> RocketCreated:
        """
        Create a rocket whose motor is given by reference (motor id).

        Args:
            payload: RocketWithMotorReferenceRequest

        Returns:
            views.RocketCreated

        Raises:
            HTTP 404 Not Found: If the referenced motor is not found.
        """
        assembled = await self._assemble_from_reference(payload)
        return await self.post_rocket(assembled)

    @controller_exception_handler
    async def update_rocket_from_motor_reference(
        self,
        rocket_id: str,
        payload: RocketWithMotorReferenceRequest,
    ) -> None:
        """
        Replace a rocket using a payload that references its motor by id.

        Args:
            rocket_id: str
            payload: RocketWithMotorReferenceRequest

        Returns:
            None

        Raises:
            HTTP 404 Not Found: If the referenced motor is not found.
        """
        assembled = await self._assemble_from_reference(payload)
        # The assembled model is given the target id before being stored.
        assembled.set_id(rocket_id)
        await self.put_rocket_by_id(rocket_id, assembled)

    @controller_exception_handler
    async def get_rocketpy_rocket_binary(self, rocket_id: str) -> bytes:
        """
        Get a rocketpy.Rocket object as dill binary.

        Args:
            rocket_id: str

        Returns:
            bytes

        Raises:
            HTTP 404 Not Found: If the rocket is not found in the database.
        """
        service = await self._rocket_service_for(rocket_id)
        return service.get_rocket_binary()

    @controller_exception_handler
    async def get_rocket_drawing_geometry(
        self, rocket_id: str
    ) -> RocketDrawingGeometry:
        """
        Build the drawing geometry payload for a persisted rocket.

        Args:
            rocket_id: str

        Returns:
            views.RocketDrawingGeometry

        Raises:
            HTTP 404 Not Found: If the rocket does not exist in the database.
            HTTP 422: If the rocket has no aerodynamic surfaces to draw.
        """
        service = await self._rocket_service_for(rocket_id)
        return service.get_drawing_geometry()

    @controller_exception_handler
    async def get_rocket_simulation(
        self,
        rocket_id: str,
    ) -> RocketSimulation:
        """
        Simulate a rocketpy rocket.

        Args:
            rocket_id: str

        Returns:
            views.RocketSimulation

        Raises:
            HTTP 404 Not Found: If the rocket does not exist in the database.
        """
        service = await self._rocket_service_for(rocket_id)
        return service.get_rocket_simulation()