Skip to content

Commit e9d2760

Browse files
feature node update-done format
1 parent 248ed1f commit e9d2760

6 files changed

Lines changed: 53 additions & 39 deletions

File tree

nebula/addons/reporter.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import sys
7+
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
78
from typing import TYPE_CHECKING
89

910
import aiohttp
@@ -171,10 +172,17 @@ async def report_scenario_finished(self):
171172
- Logs exceptions if the connection attempt to the controller fails.
172173
"""
173174
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['federation_id']}/done"
174-
data = json.dumps({"idx": self.config.participant["device_args"]["idx"],
175-
"deployment": self.config.participant["scenario_args"]["deployment"],
176-
"name": self.config.participant["scenario_args"]["name"],
177-
"federation_id": self.config.participant["scenario_args"]["federation_id"]})
175+
node_done_req = NodeDoneRequest(idx=self.config.participant["device_args"]["idx"],
176+
deployment=self.config.participant["scenario_args"]["deployment"],
177+
name=self.config.participant["scenario_args"]["name"],
178+
federation_id=self.config.participant["scenario_args"]["federation_id"]
179+
)
180+
payload = node_done_req.model_dump()
181+
data = json.dumps(payload)
182+
# data = json.dumps({"idx": self.config.participant["device_args"]["idx"],
183+
# "deployment": self.config.participant["scenario_args"]["deployment"],
184+
# "name": self.config.participant["scenario_args"]["name"],
185+
# "federation_id": self.config.participant["scenario_args"]["federation_id"]})
178186
headers = {
179187
"Content-Type": "application/json",
180188
"User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}",
@@ -266,11 +274,13 @@ async def __report_status_to_controller(self):
266274
- Delays for 5 seconds upon general exceptions to avoid rapid retry loops.
267275
"""
268276
try:
277+
node_updt_req = NodeUpdateRequest(config=self.config.participant)
278+
payload = node_updt_req.model_dump()
269279
async with (
270280
aiohttp.ClientSession() as session,
271281
session.post(
272282
self.url,
273-
data=json.dumps(self.config.participant),
283+
data=json.dumps(payload),
274284
headers={
275285
"Content-Type": "application/json",
276286
"User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}",

nebula/controller/federation/controllers/docker_federation_controller.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from nebula.controller.federation.federation_controller import FederationController
99
from nebula.controller.federation.scenario_builder import ScenarioBuilder
1010
from nebula.controller.federation.utils_requests import factory_requests_path
11+
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
1112
from typing import Dict
1213
from fastapi import Request
1314
from nebula.config.config import Config
@@ -200,8 +201,8 @@ async def stop_scenario(self, federation_id: str):
200201

201202
return True #TODO care about cases
202203

203-
async def update_nodes(self, federation_id: str, request: Request):
204-
config = await request.json()
204+
async def update_nodes(self, federation_id: str, node_update_request: NodeUpdateRequest):
205+
config = node_update_request.config
205206
scenario_name = config["scenario_args"]["name"]
206207
fed_id = config["scenario_args"]["federation_id"]
207208

@@ -229,27 +230,24 @@ async def update_nodes(self, federation_id: str, request: Request):
229230
nebula_federation.last_index_deployed += 1
230231
#additionals.remove(index)
231232
adds_deployed.add(index)
232-
request_body = await request.json()
233-
payload = {"scenario_name": scenario_name, "data": request_body}
233+
payload = node_update_request.model_dump()
234234
asyncio.create_task(self._send_to_hub("update", payload, federation_id=fed_id))
235235
return {"message": "Node updated successfully in Federation Controller"}
236236
except Exception as e:
237237
self.logger.info(f"ERROR: federation ID: ({fed_id}), {e}")
238238
return {"message": "Node updated failed in Federation Controller"}
239239

240-
async def node_done(self, federation_id: str, request: Request):
241-
request_body = await request.json()
242-
scenario_name = request_body["scenario_args"]["name"]
240+
async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest):
243241
nebula_federation = self.nfp[federation_id]
244242
self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})")
245243

246244
if await nebula_federation.is_experiment_finish():
247-
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
245+
payload = node_done_request.model_dump()
248246
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
249247
await self._remove_nebula_federation_from_pool(federation_id)
250248
asyncio.create_task(self._send_to_hub("finish", payload, federation_id=federation_id))
251249

252-
payload = {"scenario_name": scenario_name, "data": request_body}
250+
payload = node_done_request.model_dump()
253251
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
254252
return {"message": "Nodes done received successfully"}
255253

nebula/controller/federation/controllers/processes_federation_controller.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from nebula.controller.federation.federation_controller import FederationController
99
from nebula.controller.federation.scenario_builder import ScenarioBuilder
1010
from nebula.controller.federation.utils_requests import factory_requests_path
11+
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
1112
from typing import Dict
1213
from fastapi import Request
1314
from nebula.config.config import Config
@@ -161,8 +162,8 @@ async def stop_scenario(self, federation_id: str = ""):
161162
self.logger.exception(f"Error while removing current_scenario_commands.sh file: {e}")
162163
return False
163164

164-
async def update_nodes(self, federation_id: str, request: Request):
165-
config = await request.json()
165+
async def update_nodes(self, federation_id: str, node_update_request: NodeUpdateRequest):
166+
config = node_update_request.config
166167
fed_id = config["scenario_args"]["federation_id"]
167168
scenario_name = config["scenario_args"]["name"]
168169

@@ -189,27 +190,24 @@ async def update_nodes(self, federation_id: str, request: Request):
189190
nebula_federation.last_index_deployed += 1
190191
additionals.remove(index)
191192
adds_deployed.add(index)
192-
request_body = await request.json()
193-
payload = {"scenario_name": scenario_name, "data": request_body}
193+
payload = node_update_request.model_dump()
194194
asyncio.create_task(self._send_to_hub("update", payload, federation_id=fed_id))
195195
return {"message": "Node updated successfully in Federation Controller"}
196196
except Exception as e:
197197
self.logger.info(f"ERROR: federation ID: ({fed_id}) not found on pool..")
198198
return {"message": "Node updated failed in Federation Controller, ID not found.."}
199199

200-
async def node_done(self, federation_id: str, request: Request):
201-
request_body = await request.json()
202-
scenario_name = request_body["scenario_args"]["name"]
200+
async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest):
203201
nebula_federation = self.nfp[federation_id]
204202
self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})")
205203

206204
if await nebula_federation.is_experiment_finish():
207-
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
205+
payload = node_done_request.model_dump()
208206
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
209207
await self._remove_nebula_federation_from_pool(federation_id)
210208
asyncio.create_task(self._send_to_hub("finish", payload, federation_id=federation_id))
211209

212-
payload = {"scenario_name": scenario_name, "data": request_body}
210+
payload = node_done_request.model_dump()
213211
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
214212
return {"message": "Nodes done received successfully"}
215213

nebula/controller/federation/federation_api.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from nebula.utils import LoggerUtils
1111
from nebula.controller.federation.federation_controller import FederationController
1212
from nebula.controller.federation.factory_federation_controller import federation_controller_factory
13-
from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest, Routes
13+
from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest, NodeUpdateRequest, NodeDoneRequest, Routes
1414

1515
fed_controllers: Dict[str, FederationController] = {}
1616

@@ -61,7 +61,7 @@ async def run_scenario(run_scenario_request: RunScenarioRequest):
6161
if controller:
6262
return await controller.run_scenario(run_scenario_request.federation_id, run_scenario_request.scenario_data, run_scenario_request.user)
6363
else:
64-
return {"message": "Experyment type not allowed"}
64+
return {"message": "Experiment type not allowed"}
6565

6666
@app.post(Routes.STOP)
6767
async def stop_scenario(stop_scenario_request: StopScenarioRequest):
@@ -73,35 +73,33 @@ async def stop_scenario(stop_scenario_request: StopScenarioRequest):
7373
if controller:
7474
return await controller.stop_scenario(stop_scenario_request.federation_id)
7575
else:
76-
return {"message": "Experyment type not allowed"}
76+
return {"message": "Experiment type not allowed"}
7777

7878
@app.post(Routes.UPDATE)
7979
async def update_nodes(
8080
federation_id: str,
81-
request: Request,
81+
node_update_request: NodeUpdateRequest,
8282
):
8383
global fed_controllers
84-
config = await request.json()
85-
experiment_type = config["scenario_args"]["deployment"]
84+
experiment_type = node_update_request.config["scenario_args"]["deployment"]
8685
controller = fed_controllers.get(experiment_type, None)
8786
if controller:
88-
return await controller.update_nodes(federation_id, request)
87+
return await controller.update_nodes(federation_id, node_update_request)
8988
else:
90-
return {"message": "Experyment type not allowed on response for update message.."}
89+
return {"message": "Experiment type not allowed on response for update message.."}
9190

9291
@app.post(Routes.DONE)
93-
async def update_nodes(
92+
async def node_done(
9493
federation_id: str,
95-
request: Request,
94+
node_done_request: NodeDoneRequest,
9695
):
9796
global fed_controllers
98-
config = await request.json()
99-
experiment_type = config["deployment"]
97+
experiment_type = node_done_request.deployment
10098
controller = fed_controllers.get(experiment_type, None)
10199
if controller:
102-
return await controller.node_done(federation_id, request)
100+
return await controller.node_done(federation_id, node_done_request)
103101
else:
104-
return {"message": "Experyment type not allowed on responde for Node done message.."}
102+
return {"message": "Experiment type not allowed on responde for Node done message.."}
105103

106104
if __name__ == "__main__":
107105
# Parse args from command line

nebula/controller/federation/federation_controller.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from fastapi import Request
33
from typing import Dict
44
from nebula.controller.federation.scenario_builder import ScenarioBuilder
5+
from nebula.controller.federation.utils_requests import NodeUpdateRequest, NodeDoneRequest
56
import logging
67

78
class NebulaFederation(ABC):
@@ -26,9 +27,9 @@ async def stop_scenario(self, federation_id: str):
2627
pass
2728

2829
@abstractmethod
29-
async def update_nodes(self, federation_id: str, request: Request):
30+
async def update_nodes(self, federation_id: str, node_update_request: NodeUpdateRequest):
3031
pass
3132

3233
abstractmethod
33-
async def node_done(self, federation_id: str, request: Request):
34+
async def node_done(self, federation_id: str, node_done_request: NodeDoneRequest):
3435
pass

nebula/controller/federation/utils_requests.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ class StopScenarioRequest(BaseModel):
1010
experiment_type: str
1111
federation_id: str
1212

13+
class NodeUpdateRequest(BaseModel):
14+
config: Dict[str, Any] = {}
15+
16+
class NodeDoneRequest(BaseModel):
17+
idx: int
18+
deployment: str
19+
name: str
20+
federation_id: str
21+
1322
class Routes:
1423
INIT = "/init"
1524
RUN = "/scenarios/run"

0 commit comments

Comments
 (0)