Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,197 changes: 623 additions & 574 deletions nebula/addons/reputation/reputation.py

Large diffs are not rendered by default.

89 changes: 59 additions & 30 deletions nebula/controller/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
import math
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime
from urllib.parse import quote

from aiohttp import FormData
import docker
import tensorboard_reducer as tbr
from aiohttp import FormData

from nebula.addons.topologymanager import TopologyManager
from nebula.config.config import Config
Expand Down Expand Up @@ -199,30 +196,30 @@ def __init__(
self.mobile_participants_percent = mobile_participants_percent
self.additional_participants = additional_participants
self.with_trustworthiness = with_trustworthiness
self.robustness_pillar = robustness_pillar,
self.resilience_to_attacks = resilience_to_attacks,
self.algorithm_robustness = algorithm_robustness,
self.client_reliability = client_reliability,
self.privacy_pillar = privacy_pillar,
self.technique = technique,
self.uncertainty = uncertainty,
self.indistinguishability = indistinguishability,
self.fairness_pillar = fairness_pillar,
self.selection_fairness = selection_fairness,
self.performance_fairness = performance_fairness,
self.class_distribution = class_distribution,
self.explainability_pillar = explainability_pillar,
self.interpretability = interpretability,
self.post_hoc_methods = post_hoc_methods,
self.accountability_pillar = accountability_pillar,
self.factsheet_completeness = factsheet_completeness,
self.architectural_soundness_pillar = architectural_soundness_pillar,
self.client_management = client_management,
self.optimization = optimization,
self.sustainability_pillar = sustainability_pillar,
self.energy_source = energy_source,
self.hardware_efficiency = hardware_efficiency,
self.federation_complexity = federation_complexity,
self.robustness_pillar = (robustness_pillar,)
self.resilience_to_attacks = (resilience_to_attacks,)
self.algorithm_robustness = (algorithm_robustness,)
self.client_reliability = (client_reliability,)
self.privacy_pillar = (privacy_pillar,)
self.technique = (technique,)
self.uncertainty = (uncertainty,)
self.indistinguishability = (indistinguishability,)
self.fairness_pillar = (fairness_pillar,)
self.selection_fairness = (selection_fairness,)
self.performance_fairness = (performance_fairness,)
self.class_distribution = (class_distribution,)
self.explainability_pillar = (explainability_pillar,)
self.interpretability = (interpretability,)
self.post_hoc_methods = (post_hoc_methods,)
self.accountability_pillar = (accountability_pillar,)
self.factsheet_completeness = (factsheet_completeness,)
self.architectural_soundness_pillar = (architectural_soundness_pillar,)
self.client_management = (client_management,)
self.optimization = (optimization,)
self.sustainability_pillar = (sustainability_pillar,)
self.energy_source = (energy_source,)
self.hardware_efficiency = (hardware_efficiency,)
self.federation_complexity = (federation_complexity,)
self.schema_additional_participants = schema_additional_participants
self.random_topology_probability = random_topology_probability
self.with_sa = with_sa
Expand Down Expand Up @@ -697,8 +694,40 @@ def __init__(self, scenario, user=None):
participant_config["adversarial_args"]["attack_params"] = node_config["attack_params"]
else:
participant_config["adversarial_args"]["attack_params"] = {"attacks": "No Attack"}
participant_config["defense_args"]["reputation"] = self.scenario.reputation

# Defense parameters
participant_config["defense_args"]["reputation"]["with_reputation"] = self.scenario.reputation.get(
"with_reputation", False
)
participant_config["defense_args"]["reputation"]["initial_reputation"] = self.scenario.reputation.get(
"initial_reputation", 0.2
)
metrics_list = self.scenario.reputation.get("reputation_metrics", [])
if isinstance(metrics_list, list):
metrics_dict = {
"model_similarity": "model_similarity" in metrics_list,
"num_messages": "num_messages" in metrics_list,
"model_arrival_latency": "model_arrival_latency" in metrics_list,
"fraction_parameters_changed": "fraction_parameters_changed" in metrics_list,
}
participant_config["defense_args"]["reputation"]["reputation_metrics"] = metrics_dict
else:
participant_config["defense_args"]["reputation"]["reputation_metrics"] = metrics_list
participant_config["defense_args"]["reputation"]["weighting_factor"] = self.scenario.reputation.get(
"weighting_factor", "dynamic"
)
participant_config["defense_args"]["reputation"]["weight_model_arrival_latency"] = (
self.scenario.reputation.get("weight_model_arrival_latency", 1.0)
)
participant_config["defense_args"]["reputation"]["weight_model_similarity"] = (
self.scenario.reputation.get("weight_model_similarity", 1.0)
)
participant_config["defense_args"]["reputation"]["weight_num_messages"] = self.scenario.reputation.get(
"weight_num_messages", 1.0
)
participant_config["defense_args"]["reputation"]["weight_fraction_params_changed"] = (
self.scenario.reputation.get("weight_fraction_params_changed", 1.0)
)
# Mobility and network simulation parameters
participant_config["mobility_args"]["random_geo"] = self.scenario.random_geo
participant_config["mobility_args"]["latitude"] = self.scenario.latitude
participant_config["mobility_args"]["longitude"] = self.scenario.longitude
Expand Down
27 changes: 25 additions & 2 deletions nebula/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def __init__(
else:
self._situational_awareness = None

if self.config.participant["defense_args"]["reputation"]["enabled"]:
if self.config.participant["defense_args"]["reputation"]["with_reputation"]:
self._reputation = Reputation(engine=self, config=self.config)

@property
Expand Down Expand Up @@ -380,6 +380,29 @@ async def _federation_federation_models_included_callback(self, source, message)
finally:
await self.cm.get_connections_lock().release_async()

async def _reputation_share_callback(self, source, message):
try:
logging.info(
f"handle_reputation_message | Trigger | Received reputation message from {source} | Node: {message.node_id} | Score: {message.score} | Round: {message.round}"
)

current_node = self.addr
nei = message.node_id

if hasattr(self, "_reputation") and self._reputation is not None:
if current_node != nei:
key = (current_node, nei, message.round)

if key not in self._reputation.reputation_with_all_feedback:
self._reputation.reputation_with_all_feedback[key] = []

self._reputation.reputation_with_all_feedback[key].append(message.score)
else:
logging.error("Reputation object (_reputation) is not available.")

except Exception as e:
logging.exception(f"Error handling reputation message: {e}")

""" ##############################
# REGISTERING CALLBACKS #
##############################
Expand Down Expand Up @@ -591,7 +614,7 @@ async def deploy_components(self):
await self.aggregator.init()
if "situational_awareness" in self.config.participant:
await self.sa.init()
if self.config.participant["defense_args"]["reputation"]["enabled"]:
if self.config.participant["defense_args"]["reputation"]["with_reputation"]:
await self._reputation.setup()
await self._reporter.start()
await self._addon_manager.deploy_additional_services()
Expand Down
34 changes: 28 additions & 6 deletions nebula/core/nebulaevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class AddonEvent(ABC):
"""
Abstract base class for all addon-related events in the system.
"""

@abstractmethod
async def get_event_data(self):
"""
Expand All @@ -21,7 +21,7 @@ class NodeEvent(ABC):
"""
Abstract base class for all node-related events in the system.
"""

@abstractmethod
async def get_event_data(self):
"""
Expand Down Expand Up @@ -52,7 +52,7 @@ class MessageEvent:
source (str): Address or identifier of the message sender.
message (Any): The actual message payload.
"""

def __init__(self, message_type, source, message):
"""
Initializes a MessageEvent instance.
Expand Down Expand Up @@ -335,6 +335,27 @@ async def is_concurrent(self) -> bool:
return True


class DuplicatedMessageEvent(NodeEvent):
"""
Event triggered when a message is received that has already been processed.

Attributes:
source (str): The address of the node that sent the duplicated message.
"""

def __init__(self, source: str, message_type: str):
self.source = source

def __str__(self):
return f"DuplicatedMessageEvent from {self.source}"

async def get_event_data(self) -> tuple[str]:
return self.source

async def is_concurrent(self) -> bool:
return True


""" ##############################
# ADDON EVENTS #
##############################
Expand All @@ -348,7 +369,7 @@ class GPSEvent(AddonEvent):
Attributes:
distances (dict): A dictionary mapping node addresses to their respective distances.
"""

def __init__(self, distances: dict):
"""
Initializes a GPSEvent.
Expand Down Expand Up @@ -379,7 +400,7 @@ class ChangeLocationEvent(AddonEvent):
latitude (float): New latitude of the node.
longitude (float): New longitude of the node.
"""

def __init__(self, latitude, longitude):
"""
Initializes a ChangeLocationEvent.
Expand All @@ -402,7 +423,8 @@ async def get_event_data(self):
tuple: A tuple containing latitude and longitude.
"""
return (self.latitude, self.longitude)



class TestMetricsEvent(AddonEvent):
def __init__(self, loss, accuracy):
self._loss = loss
Expand Down
6 changes: 4 additions & 2 deletions nebula/core/network/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import requests

from nebula.core.eventmanager import EventManager
from nebula.core.nebulaevents import MessageEvent
from nebula.core.nebulaevents import DuplicatedMessageEvent, MessageEvent
from nebula.core.network.blacklist import BlackList
from nebula.core.network.connection import Connection
from nebula.core.network.discoverer import Discoverer
Expand Down Expand Up @@ -803,7 +803,7 @@ async def deploy_additional_services(self):
await self._forwarder.start()
self._propagator.start()

async def include_received_message_hash(self, hash_message):
async def include_received_message_hash(self, hash_message, source=None):
"""
Adds a received message hash to the tracking list if it hasn't been seen before.

Expand All @@ -819,6 +819,8 @@ async def include_received_message_hash(self, hash_message):
await self.receive_messages_lock.acquire_async()
if hash_message in self.received_messages_hashes:
logging.info("❗️ handle_incoming_message | Ignoring message already received.")
duplicated_event = DuplicatedMessageEvent(source, "Duplicated message received")
asyncio.create_task(EventManager.get_instance().publish_node_event(duplicated_event))
return False
self.received_messages_hashes.append(hash_message)
if len(self.received_messages_hashes) % 10000 == 0:
Expand Down
2 changes: 1 addition & 1 deletion nebula/core/network/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async def process_message(self, data, addr_from):

# Message-specific forwarding and processing
elif message_type in special_processing_messages:
if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest()):
if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest(), addr_from):
# Forward the message if required
if self._should_forward_message(message_type, message_wrapper):
await self.cm.forward_message(data, addr_from)
Expand Down
8 changes: 4 additions & 4 deletions nebula/frontend/config/participant.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@
"aggregation_timeout": 60,
"aggregation_push": "slow"
},
"defense_args": {
"defense_args": {
"reputation": {
"enabled": false,
"metrics": {},
"initial_reputation": 0.6,
"with_reputation": false,
"reputation_metrics": [],
"initial_reputation": 0.2,
"weighting_factor": "dynamic"
}
},
Expand Down
Loading
Loading