Skip to content

Commit 248ed1f

Browse files
update API requests
1 parent 3809fb9 commit 248ed1f

5 files changed

Lines changed: 24 additions & 27 deletions

File tree

nebula/addons/reporter.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(self, config, trainer):
5454
self.frequency = self.config.participant["reporter_args"]["report_frequency"]
5555
self.grace_time = self.config.participant["reporter_args"]["grace_time_reporter"]
5656
self.data_queue = asyncio.Queue()
57-
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant["scenario_args"]["federation_id"]}/update"
57+
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['federation_id']}/update"
5858
self.counter = 0
5959

6060
self.first_net_metrics = True
@@ -170,9 +170,10 @@ async def report_scenario_finished(self):
170170
might be temporarily overloaded.
171171
- Logs exceptions if the connection attempt to the controller fails.
172172
"""
173-
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant["scenario_args"]["federation_id"]}/done"
173+
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['federation_id']}/done"
174174
data = json.dumps({"idx": self.config.participant["device_args"]["idx"],
175175
"deployment": self.config.participant["scenario_args"]["deployment"],
176+
"name": self.config.participant["scenario_args"]["name"],
176177
"federation_id": self.config.participant["scenario_args"]["federation_id"]})
177178
headers = {
178179
"Content-Type": "application/json",

nebula/controller/federation/controllers/docker_federation_controller.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,9 @@ async def stop_scenario(self, federation_id: str):
200200

201201
return True #TODO care about cases
202202

203-
async def update_nodes(self, scenario_name: str, request: Request):
203+
async def update_nodes(self, federation_id: str, request: Request):
204204
config = await request.json()
205+
scenario_name = config["scenario_args"]["name"]
205206
fed_id = config["scenario_args"]["federation_id"]
206207

207208
try:
@@ -230,26 +231,26 @@ async def update_nodes(self, scenario_name: str, request: Request):
230231
adds_deployed.add(index)
231232
request_body = await request.json()
232233
payload = {"scenario_name": scenario_name, "data": request_body}
233-
asyncio.create_task(self._send_to_hub("update", payload, fed_id))
234+
asyncio.create_task(self._send_to_hub("update", payload, federation_id=fed_id))
234235
return {"message": "Node updated successfully in Federation Controller"}
235236
except Exception as e:
236237
self.logger.info(f"ERROR: federation ID: ({fed_id}), {e}")
237238
return {"message": "Node updated failed in Federation Controller"}
238239

239-
async def node_done(self, scenario_name: str, request: Request):
240+
async def node_done(self, federation_id: str, request: Request):
240241
request_body = await request.json()
241-
federation_id = request_body["federation_id"]
242+
scenario_name = request_body["scenario_args"]["name"]
242243
nebula_federation = self.nfp[federation_id]
243244
self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})")
244245

245246
if await nebula_federation.is_experiment_finish():
246247
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
247248
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
248249
await self._remove_nebula_federation_from_pool(federation_id)
249-
asyncio.create_task(self._send_to_hub("finish", payload, federation_id))
250+
asyncio.create_task(self._send_to_hub("finish", payload, federation_id=federation_id))
250251

251252
payload = {"scenario_name": scenario_name, "data": request_body}
252-
asyncio.create_task(self._send_to_hub("done", payload, federation_id))
253+
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
253254
return {"message": "Nodes done received successfully"}
254255

255256
""" ###############################
@@ -292,7 +293,7 @@ async def _update_federation_on_pool(self, federation_id: str, user: str, nf: Ne
292293
async def _send_to_hub(self, path, payload, scenario_name="", federation_id="" ):
293294
try:
294295
url_request = self._hub_url + factory_requests_path(path, scenario_name, federation_id)
295-
# self.logger.info(f"Seding to hub, url: {url_request}")
296+
# self.logger.info(f"Sending to hub, url: {url_request}")
296297
# self.logger.info(f"payload sent to hub, data: {payload}")
297298
await APIUtils.post(url_request, payload)
298299
except Exception as e:

nebula/controller/federation/controllers/processes_federation_controller.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,10 @@ async def stop_scenario(self, federation_id: str = ""):
161161
self.logger.exception(f"Error while removing current_scenario_commands.sh file: {e}")
162162
return False
163163

164-
async def update_nodes(self, scenario_name: str, request: Request):
164+
async def update_nodes(self, federation_id: str, request: Request):
165165
config = await request.json()
166166
fed_id = config["scenario_args"]["federation_id"]
167+
scenario_name = config["scenario_args"]["name"]
167168

168169
try:
169170
nebula_federation = self.nfp[fed_id]
@@ -190,26 +191,26 @@ async def update_nodes(self, scenario_name: str, request: Request):
190191
adds_deployed.add(index)
191192
request_body = await request.json()
192193
payload = {"scenario_name": scenario_name, "data": request_body}
193-
asyncio.create_task(self._send_to_hub("update", payload, fed_id))
194+
asyncio.create_task(self._send_to_hub("update", payload, federation_id=fed_id))
194195
return {"message": "Node updated successfully in Federation Controller"}
195196
except Exception as e:
196197
self.logger.info(f"ERROR: federation ID: ({fed_id}) not found on pool..")
197198
return {"message": "Node updated failed in Federation Controller, ID not found.."}
198199

199-
async def node_done(self, scenario_name: str, request: Request):
200+
async def node_done(self, federation_id: str, request: Request):
200201
request_body = await request.json()
201-
federation_id = request_body["federation_id"]
202+
scenario_name = request_body["scenario_args"]["name"]
202203
nebula_federation = self.nfp[federation_id]
203204
self.logger.info(f"Node-Done received from node on federation ID: ({federation_id})")
204205

205206
if await nebula_federation.is_experiment_finish():
206207
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
207208
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
208209
await self._remove_nebula_federation_from_pool(federation_id)
209-
asyncio.create_task(self._send_to_hub("finish", payload, federation_id))
210+
asyncio.create_task(self._send_to_hub("finish", payload, federation_id=federation_id))
210211

211212
payload = {"scenario_name": scenario_name, "data": request_body}
212-
asyncio.create_task(self._send_to_hub("done", payload, federation_id))
213+
asyncio.create_task(self._send_to_hub("done", payload, federation_id=federation_id))
213214
return {"message": "Nodes done received successfully"}
214215

215216
""" ###############################

nebula/controller/federation/federation_api.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,29 @@ async def stop_scenario(stop_scenario_request: StopScenarioRequest):
7777

7878
@app.post(Routes.UPDATE)
7979
async def update_nodes(
80-
scenario_name: Annotated[
81-
str,
82-
Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name"),
83-
],
80+
federation_id: str,
8481
request: Request,
8582
):
8683
global fed_controllers
8784
config = await request.json()
8885
experiment_type = config["scenario_args"]["deployment"]
8986
controller = fed_controllers.get(experiment_type, None)
9087
if controller:
91-
return await controller.update_nodes(scenario_name, request)
88+
return await controller.update_nodes(federation_id, request)
9289
else:
9390
return {"message": "Experyment type not allowed on response for update message.."}
9491

9592
@app.post(Routes.DONE)
9693
async def update_nodes(
97-
scenario_name: Annotated[
98-
str,
99-
Path(regex="^[a-zA-Z0-9_-]+$", min_length=1, max_length=50, description="Valid scenario name"),
100-
],
94+
federation_id: str,
10195
request: Request,
10296
):
10397
global fed_controllers
10498
config = await request.json()
10599
experiment_type = config["deployment"]
106100
controller = fed_controllers.get(experiment_type, None)
107101
if controller:
108-
return await controller.node_done(scenario_name, request)
102+
return await controller.node_done(federation_id, request)
109103
else:
110104
return {"message": "Experyment type not allowed on responde for Node done message.."}
111105

nebula/controller/federation/federation_controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ async def stop_scenario(self, federation_id: str):
2626
pass
2727

2828
@abstractmethod
29-
async def update_nodes(self, scenario_name: str, request: Request):
29+
async def update_nodes(self, federation_id: str, request: Request):
3030
pass
3131

3232
abstractmethod
33-
async def node_done(self, scenario_name: str, request: Request):
33+
async def node_done(self, federation_id: str, request: Request):
3434
pass

0 commit comments

Comments
 (0)