Skip to content

Commit 3809fb9

Browse files
feature federation ID on all API requests
1 parent 5a440ef commit 3809fb9

5 files changed

Lines changed: 26 additions & 18 deletions

File tree

nebula/addons/reporter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(self, config, trainer):
5454
self.frequency = self.config.participant["reporter_args"]["report_frequency"]
5555
self.grace_time = self.config.participant["reporter_args"]["grace_time_reporter"]
5656
self.data_queue = asyncio.Queue()
57-
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['name']}/update"
57+
self.url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant["scenario_args"]["federation_id"]}/update"
5858
self.counter = 0
5959

6060
self.first_net_metrics = True
@@ -170,7 +170,7 @@ async def report_scenario_finished(self):
170170
might be temporarily overloaded.
171171
- Logs exceptions if the connection attempt to the controller fails.
172172
"""
173-
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant['scenario_args']['name']}/done"
173+
url = f"http://{self.config.participant['scenario_args']['controller']}/nodes/{self.config.participant["scenario_args"]["federation_id"]}/done"
174174
data = json.dumps({"idx": self.config.participant["device_args"]["idx"],
175175
"deployment": self.config.participant["scenario_args"]["deployment"],
176176
"federation_id": self.config.participant["scenario_args"]["federation_id"]})

nebula/controller/federation/controllers/docker_federation_controller.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ async def update_nodes(self, scenario_name: str, request: Request):
230230
adds_deployed.add(index)
231231
request_body = await request.json()
232232
payload = {"scenario_name": scenario_name, "data": request_body}
233-
asyncio.create_task(self._send_to_hub("update", payload, scenario_name))
233+
asyncio.create_task(self._send_to_hub("update", payload, fed_id))
234234
return {"message": "Node updated successfully in Federation Controller"}
235235
except Exception as e:
236236
self.logger.info(f"ERROR: federation ID: ({fed_id}), {e}")
@@ -246,10 +246,10 @@ async def node_done(self, scenario_name: str, request: Request):
246246
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
247247
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
248248
await self._remove_nebula_federation_from_pool(federation_id)
249-
asyncio.create_task(self._send_to_hub("finish", payload, scenario_name))
249+
asyncio.create_task(self._send_to_hub("finish", payload, federation_id))
250250

251251
payload = {"scenario_name": scenario_name, "data": request_body}
252-
asyncio.create_task(self._send_to_hub("done", payload, scenario_name))
252+
asyncio.create_task(self._send_to_hub("done", payload, federation_id))
253253
return {"message": "Nodes done received successfully"}
254254

255255
""" ###############################

nebula/controller/federation/controllers/processes_federation_controller.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ async def update_nodes(self, scenario_name: str, request: Request):
190190
adds_deployed.add(index)
191191
request_body = await request.json()
192192
payload = {"scenario_name": scenario_name, "data": request_body}
193-
asyncio.create_task(self._send_to_hub("update", payload, scenario_name))
193+
asyncio.create_task(self._send_to_hub("update", payload, fed_id))
194194
return {"message": "Node updated successfully in Federation Controller"}
195195
except Exception as e:
196196
self.logger.info(f"ERROR: federation ID: ({fed_id}) not found on pool..")
@@ -206,10 +206,10 @@ async def node_done(self, scenario_name: str, request: Request):
206206
payload = {"federation_id": federation_id, "scenario_name": scenario_name, "data": request_body}
207207
self.logger.info(f"All nodes have finished on federation ID: ({federation_id}), reporting to hub..")
208208
await self._remove_nebula_federation_from_pool(federation_id)
209-
asyncio.create_task(self._send_to_hub("finish", payload, scenario_name))
209+
asyncio.create_task(self._send_to_hub("finish", payload, federation_id))
210210

211211
payload = {"scenario_name": scenario_name, "data": request_body}
212-
asyncio.create_task(self._send_to_hub("done", payload, scenario_name))
212+
asyncio.create_task(self._send_to_hub("done", payload, federation_id))
213213
return {"message": "Nodes done received successfully"}
214214

215215
""" ###############################

nebula/controller/federation/federation_api.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from nebula.utils import LoggerUtils
1111
from nebula.controller.federation.federation_controller import FederationController
1212
from nebula.controller.federation.factory_federation_controller import federation_controller_factory
13-
from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest
13+
from nebula.controller.federation.utils_requests import RunScenarioRequest, StopScenarioRequest, Routes
1414

1515
fed_controllers: Dict[str, FederationController] = {}
1616

@@ -51,7 +51,7 @@ async def read_root():
5151
logger.info("Test curl succesfull")
5252
return {"message": "Welcome to the NEBULA Federation Controller API"}
5353

54-
@app.post("/scenarios/run")
54+
@app.post(Routes.RUN)
5555
async def run_scenario(run_scenario_request: RunScenarioRequest):
5656
global fed_controllers
5757
experiment_type = run_scenario_request.scenario_data["deployment"]
@@ -63,7 +63,7 @@ async def run_scenario(run_scenario_request: RunScenarioRequest):
6363
else:
6464
return {"message": "Experyment type not allowed"}
6565

66-
@app.post("/scenarios/stop")
66+
@app.post(Routes.STOP)
6767
async def stop_scenario(stop_scenario_request: StopScenarioRequest):
6868
global fed_controllers
6969
experiment_type = stop_scenario_request.experiment_type
@@ -75,7 +75,7 @@ async def stop_scenario(stop_scenario_request: StopScenarioRequest):
7575
else:
7676
return {"message": "Experyment type not allowed"}
7777

78-
@app.post("/nodes/{scenario_name}/update")
78+
@app.post(Routes.UPDATE)
7979
async def update_nodes(
8080
scenario_name: Annotated[
8181
str,
@@ -92,7 +92,7 @@ async def update_nodes(
9292
else:
9393
return {"message": "Experyment type not allowed on response for update message.."}
9494

95-
@app.post("/nodes/{scenario_name}/done")
95+
@app.post(Routes.DONE)
9696
async def update_nodes(
9797
scenario_name: Annotated[
9898
str,

nebula/controller/federation/utils_requests.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,27 @@ class StopScenarioRequest(BaseModel):
1010
experiment_type: str
1111
federation_id: str
1212

13+
class Routes:
14+
INIT = "/init"
15+
RUN = "/scenarios/run"
16+
STOP = "/scenarios/stop"
17+
UPDATE = "/nodes/{federation_id}/update"
18+
DONE = "/nodes/{federation_id}/done"
19+
FINISH = "/scenarios/{federation_id}/finish"
20+
1321
def factory_requests_path(resource: str, scenario_name: str = "", federation_id: str = "") -> str:
1422
if resource == "init":
1523
return "/init"
1624
elif resource == "run":
17-
return "/scenarios/run"
25+
return Routes.RUN
1826
elif resource == "stop":
19-
return "/scenarios/stop"
27+
return Routes.STOP
2028
elif resource == "update":
21-
return f"/nodes/{scenario_name}/update"
29+
return Routes.UPDATE.format(federation_id=federation_id)
2230
elif resource == "done":
23-
return f"/nodes/{scenario_name}/done"
31+
return Routes.DONE.format(federation_id=federation_id)
2432
elif resource == "finish":
25-
return f"/scenarios/{federation_id}/finish"
33+
return Routes.FINISH.format(federation_id=federation_id)
2634
else:
2735
raise Exception(f"resource not found: {resource}")
2836

0 commit comments

Comments
 (0)