diff --git a/nebula/addons/reputation/reputation.py b/nebula/addons/reputation/reputation.py index 6a9ccc0b4..509f48cb6 100644 --- a/nebula/addons/reputation/reputation.py +++ b/nebula/addons/reputation/reputation.py @@ -9,7 +9,7 @@ from nebula.addons.functions import print_msg_box from nebula.core.eventmanager import EventManager -from nebula.core.nebulaevents import AggregationEvent, RoundStartEvent, UpdateReceivedEvent +from nebula.core.nebulaevents import AggregationEvent, DuplicatedMessageEvent, RoundStartEvent, UpdateReceivedEvent from nebula.core.utils.helper import ( cosine_metric, euclidean_metric, @@ -33,26 +33,13 @@ def __init__( threshold=None, latency=None, ): - """ - Initialize a Metrics instance to store various evaluation metrics for a participant. - - Args: - num_round (optional): The current round number. - current_round (optional): The round when the metric is measured. - fraction_changed (optional): Fraction of parameters changed. - threshold (optional): Threshold used for evaluating changes. - latency (optional): Latency value for model arrival. - """ self.fraction_of_params_changed = { "fraction_changed": fraction_changed, "threshold": threshold, - "round": num_round, + "current_round": num_round, } - self.model_arrival_latency = {"latency": latency, "round": num_round, "round_received": current_round} - self.messages = [] - self.similarity = [] @@ -65,13 +52,6 @@ class Reputation: """ def __init__(self, engine: "Engine", config: "Config"): - """ - Initialize the Reputation system. - - Args: - engine (Engine): The engine instance providing the runtime context. - config (Config): The configuration object with participant settings. - """ self._engine = engine self._config = config self.fraction_of_params_changed = {} @@ -92,56 +72,54 @@ def __init__(self, engine: "Engine", config: "Config"): self.previous_std_dev_number_message = {} self.messages_model_arrival_latency = {} self.model_arrival_latency_history = {} - self.previous_percentile_25_number_message = {} - self.previous_percentile_85_number_message = {} self._addr = engine.addr self._log_dir = engine.log_dir - self._idx = engine.idx self.connection_metrics = {} neighbors: str = self._config.participant["network_args"]["neighbors"] for nei in neighbors.split(): self.connection_metrics[f"{nei}"] = Metrics() - reputation_config = self._config.participant["defense_args"]["reputation"] - self._enabled = reputation_config["enabled"] - self._metrics = reputation_config["metrics"] - self._initial_reputation = float(reputation_config["initial_reputation"]) - self._weighting_factor = reputation_config["weighting_factor"] - - # Extract weights from metrics if using static weighting - if self._weighting_factor == "static": + self._with_reputation = self._config.participant["defense_args"]["reputation"]["with_reputation"] + self._metrics = self._config.participant["defense_args"]["reputation"]["reputation_metrics"] + self._enabled = self._with_reputation and self._metrics + if isinstance(self._metrics, list): + expected_metrics = [ + "model_similarity", + "num_messages", + "model_arrival_latency", + "fraction_parameters_changed", + ] + self._metrics = {key: key in self._metrics for key in expected_metrics} + self._initial_reputation = float(self._config.participant["defense_args"]["reputation"]["initial_reputation"]) + self._weighting_factor = self._config.participant["defense_args"]["reputation"]["weighting_factor"] + if str(self._weighting_factor).lower() != "dynamic": self._weight_model_arrival_latency = float( - self._metrics.get("model_arrival_latency", {}).get("weight", 0.25) + self._config.participant["defense_args"]["reputation"]["weight_model_arrival_latency"] + ) + self._weight_model_similarity = float( + self._config.participant["defense_args"]["reputation"]["weight_model_similarity"] + ) + self._weight_num_messages = float( + self._config.participant["defense_args"]["reputation"]["weight_num_messages"] ) - self._weight_model_similarity = float(self._metrics.get("model_similarity", {}).get("weight", 0.25)) - self._weight_num_messages = float(self._metrics.get("num_messages", {}).get("weight", 0.25)) self._weight_fraction_params_changed = float( - self._metrics.get("fraction_parameters_changed", {}).get("weight", 0.25) + self._config.participant["defense_args"]["reputation"]["weight_fraction_params_changed"] ) - else: - self._metrics["model_arrival_latency"]["weight"] = 0.25 - self._metrics["model_similarity"]["weight"] = 0.25 - self._metrics["num_messages"]["weight"] = 0.25 - self._metrics["fraction_parameters_changed"]["weight"] = 0.25 - self._weight_model_arrival_latency = 0.25 - self._weight_model_similarity = 0.25 - self._weight_num_messages = 0.25 - self._weight_fraction_params_changed = 0.25 - - msg = f"Reputation system: {self._enabled}" + + msg = f"Reputation system: {self._with_reputation}" msg += f"\nReputation metrics: {self._metrics}" msg += f"\nInitial reputation: {self._initial_reputation}" msg += f"\nWeighting factor: {self._weighting_factor}" - msg += f"\nWeight model arrival latency: {self._weight_model_arrival_latency}" - msg += f"\nWeight model similarity: {self._weight_model_similarity}" - msg += f"\nWeight number of messages: {self._weight_num_messages}" - msg += f"\nWeight fraction of parameters changed: {self._weight_fraction_params_changed}" + if str(self._weighting_factor).lower() != "dynamic": + msg += f"\nWeight model arrival latency: {self._weight_model_arrival_latency}" + msg += f"\nWeight model similarity: {self._weight_model_similarity}" + msg += f"\nWeight number of messages: {self._weight_num_messages}" + msg += f"\nWeight fraction of parameters changed: {self._weight_fraction_params_changed}" print_msg_box(msg=msg, indent=2, title="Defense information") @property def engine(self): - """Return the engine instance.""" return self._engine def save_data( @@ -153,35 +131,19 @@ def save_data( time=None, current_round=None, fraction_changed=None, - total_params=None, - changed_params=None, threshold=None, - changes_record=None, latency=None, ): """ - Save data received from nodes for further reputation calculations. - - Args: - type_data (str): The type of data being saved ("number_message", "fraction_of_params_changed", or "model_arrival_latency"). - nei (str): The neighbor node address. - addr (str): The current node address. - num_round (optional): The round number associated with the data. - time (optional): Timestamp or time value. - current_round (optional): The current round number. - fraction_changed (optional): Fraction of parameters changed. - total_params (optional): Total number of parameters. - changed_params (optional): Number of changed parameters. - threshold (optional): Threshold used for metrics. - changes_record (optional): Record of parameter changes. - latency (optional): Latency value. + Save data between nodes and aggregated models. """ + try: + combined_data = {} + if addr == nei: return - combined_data = {} - if type_data == "number_message": combined_data["number_message"] = { "time": time, @@ -191,7 +153,7 @@ def save_data( combined_data["fraction_of_params_changed"] = { "fraction_changed": fraction_changed, "threshold": threshold, - "round": num_round, + "current_round": current_round, } elif type_data == "model_arrival_latency": combined_data["model_arrival_latency"] = { @@ -220,37 +182,32 @@ async def setup(self): if self._enabled: await EventManager.get_instance().subscribe_node_event(RoundStartEvent, self.on_round_start) await EventManager.get_instance().subscribe_node_event(AggregationEvent, self.calculate_reputation) - if self._metrics.get("model_similarity", {}).get("enabled", False): + if self._metrics.get("model_similarity", False): await EventManager.get_instance().subscribe_node_event(UpdateReceivedEvent, self.recollect_similarity) - if self._metrics.get("fraction_parameters_changed", {}).get("enabled", False): + if self._metrics.get("fraction_parameters_changed", False): await EventManager.get_instance().subscribe_node_event( UpdateReceivedEvent, self.recollect_fraction_of_parameters_changed ) - if self._metrics.get("num_messages", {}).get("enabled", False): + if self._metrics.get("model_arrival_latency", False): + await EventManager.get_instance().subscribe_node_event( + UpdateReceivedEvent, self.recollect_model_arrival_latency + ) + if self._metrics.get("num_messages", False): await EventManager.get_instance().subscribe(("model", "update"), self.recollect_number_message) await EventManager.get_instance().subscribe(("model", "initialization"), self.recollect_number_message) await EventManager.get_instance().subscribe(("control", "alive"), self.recollect_number_message) await EventManager.get_instance().subscribe( ("federation", "federation_models_included"), self.recollect_number_message ) - await EventManager.get_instance().subscribe(("reputation", "share"), self.recollect_number_message) - if self._metrics.get("model_arrival_latency", {}).get("enabled", False): await EventManager.get_instance().subscribe_node_event( - UpdateReceivedEvent, self.recollect_model_arrival_latency + DuplicatedMessageEvent, self.recollect_duplicated_number_message ) def init_reputation( self, addr, federation_nodes=None, round_num=None, last_feedback_round=None, init_reputation=None ): """ - Initialize the reputation for each federation node. - - Args: - addr (str): The address of the current node. - federation_nodes (list): List of federation nodes' addresses. - round_num (int): The current round number. - last_feedback_round (int): The last round in which feedback was provided. - init_reputation (float): The initial reputation value. + Initialize the reputation system. """ if not federation_nodes: logging.error("init_reputation | No federation nodes provided") @@ -277,15 +234,16 @@ def init_reputation( avg_reputation = self.save_reputation_history_in_memory(self._addr, nei, init_reputation) + metrics_data = { + "addr": addr, + "nei": nei, + "round": round_num, + "reputation_without_feedback": avg_reputation, + } + def is_valid_ip(self, federation_nodes): """ - Check if the IP addresses provided are valid. - - Args: - federation_nodes (list): List of federation node addresses. - - Returns: - list: A list of valid IP addresses. + Check if the IP addresses are valid. """ valid_ip = [] for i in federation_nodes: @@ -307,20 +265,24 @@ def _calculate_static_reputation( weight_model_arrival_latency, ): """ - Calculate the static reputation of a participant using fixed weights. + Calculate the static reputation of a participant. Args: - addr (str): The IP address of the current node. - nei (str): The neighbor node's IP address. - metric_messages_number (float): Metric value for number of messages. - metric_similarity (float): Metric value for model similarity. - metric_fraction (float): Metric value for fraction of parameters changed. - metric_model_arrival_latency (float): Metric value for model arrival latency. - weight_messages_number (float): Weight for number of messages. - weight_similarity (float): Weight for model similarity. - weight_fraction (float): Weight for fraction of parameters changed. - weight_model_arrival_latency (float): Weight for model arrival latency. + addr (str): The IP address of the participant. + nei (str): The IP address of the participant. + metric_messages_number (float): The number of messages. + metric_similarity (float): The similarity between models. + metric_fraction (float): The fraction of parameters changed. + metric_model_arrival_latency (float): The model arrival latency. + weight_messages_number (float): The weight of the number of messages. + weight_similarity (float): The weight of the similarity. + weight_fraction (float): The weight of the fraction. + weight_model_arrival_latency (float): The weight of the model arrival latency. + + Returns: + float: The static reputation of the participant. """ + static_weights = { "num_messages": weight_messages_number, "model_similarity": weight_similarity, @@ -356,14 +318,14 @@ def _calculate_static_reputation( async def _calculate_dynamic_reputation(self, addr, neighbors): """ - Calculate the dynamic reputation of a participant based on historical metric data. + Calculate the dynamic reputation of a participant. Args: - addr (str): The address of the current node. - neighbors (list): List of neighbor node addresses. + addr (str): The IP address of the participant. + neighbors (list): The list of neighbors. Returns: - dict: Updated dynamic reputation values. + dict: The dynamic reputation of the participant. """ average_weights = {} @@ -418,12 +380,12 @@ async def _calculate_dynamic_reputation(self, addr, neighbors): def _update_reputation_record(self, nei, reputation, data): """ - Update the reputation record of a neighbor. + Update the reputation record of a participant. Args: - nei (str): The neighbor node's address. - reputation (float): The computed reputation value. - data (dict): Additional metrics data associated with the reputation. + nei (str): The IP address of the participant. + reputation (float): The reputation of the participant. + data (dict): The data to update. """ if nei not in self.reputation: self.reputation[nei] = { @@ -436,7 +398,8 @@ def _update_reputation_record(self, nei, reputation, data): self.reputation[nei]["round"] = self._engine.get_round() logging.info(f"Reputation of node {nei}: {self.reputation[nei]['reputation']}") - if self.reputation[nei]["reputation"] < 0.6: + # if self.reputation[nei]["reputation"] < 0.75: + if self.reputation[nei]["reputation"] < 0.6 and self._engine.get_round() > 0: self.rejected_nodes.add(nei) logging.info(f"Rejected node {nei} at round {self._engine.get_round()}") @@ -453,18 +416,7 @@ def calculate_weighted_values( reputation_metrics, ): """ - Calculate the weighted values for each metric based on current measurements and historical data. - - Args: - avg_messages_number_message_normalized (float): Normalized average message count. - similarity_reputation (float): Reputation score based on model similarity. - fraction_score_asign (float): Score assigned from fraction of parameters changed. - avg_model_arrival_latency (float): Average model arrival latency. - history_data (dict): Historical metrics data. - current_round (int): The current round number. - addr (str): The address of the current node. - nei (str): The neighbor node's address. - reputation_metrics (dict): Dictionary indicating which metrics are active. + Calculate the weighted values for each metric. """ if current_round is not None: normalized_weights = {} @@ -501,7 +453,7 @@ def calculate_weighted_values( adjusted_weights = {} - if current_round >= 5 and num_active_metrics > 0: + if current_round >= 1 and num_active_metrics > 0: desviations = {} for metric_name, current_value in active_metrics.items(): historical_values = history_data[metric_name] @@ -563,29 +515,20 @@ def calculate_weighted_values( if entry["metric_name"] == metric_name and entry["round"] == current_round and entry["nei"] == nei: entry["weight"] = weight - async def calculate_value_metrics(self, log_dir, id_node, addr, nei, metrics_active=None): + async def calculate_value_metrics(self, addr, nei, metrics_active=None): """ - Calculate various metrics (message count, model similarity, fraction of parameters changed, and model arrival latency) - for a given neighbor node based on stored connection data. + Calculate the reputation of each participant based on the data stored in self.connection_metrics. Args: - log_dir (str): Directory for log files. - id_node (str): Identifier for the node. - addr (str): The address of the current node. - nei (str): The neighbor node's address. - metrics_active (dict): Dictionary indicating which metrics are active. - - Returns: - tuple: A tuple containing: - - avg_messages_number_message_normalized (float) - - similarity_reputation (float) - - fraction_score_asign (float) - - avg_model_arrival_latency (float) + addr (str): Source IP address. + nei (str): Destination IP address. + metrics_active (dict): The active metrics. """ + messages_number_message_normalized = 0 messages_number_message_count = 0 avg_messages_number_message_normalized = 0 - fraction_score_normalized = 0 + score_fraction = 0 fraction_score_asign = 0 messages_model_arrival_latency_normalized = 0 avg_model_arrival_latency = 0 @@ -594,7 +537,6 @@ async def calculate_value_metrics(self, log_dir, id_node, addr, nei, metrics_act try: current_round = self._engine.get_round() - metrics_instance = self.connection_metrics.get(nei) if not metrics_instance: logging.warning(f"No metrics found for neighbor {nei}") @@ -628,95 +570,84 @@ async def calculate_value_metrics(self, log_dir, id_node, addr, nei, metrics_act ]["avg_number_message"] if metrics_active.get("fraction_parameters_changed", False): - if metrics_instance.fraction_of_params_changed.get("round") == current_round: + if metrics_instance.fraction_of_params_changed.get("current_round") == current_round: fraction_changed = metrics_instance.fraction_of_params_changed.get("fraction_changed") threshold = metrics_instance.fraction_of_params_changed.get("threshold") - fraction_score_normalized = self.analyze_anomalies( + current_round = metrics_instance.fraction_of_params_changed.get("current_round") + score_fraction = self.analyze_anomalies( addr, nei, current_round, - current_round, # Assumes round_received is the current round. fraction_changed, threshold, ) - if metrics_active.get("model_arrival_latency", False): - if metrics_instance.model_arrival_latency.get("round_received") == current_round: - round_latency = metrics_instance.model_arrival_latency.get("round") - latency = metrics_instance.model_arrival_latency.get("latency") - messages_model_arrival_latency_normalized = self.manage_model_arrival_latency( - round_latency, addr, nei, latency, current_round - ) + if current_round >= 1: + key_current = (addr, nei, current_round) - if current_round >= 5 and metrics_active.get("model_similarity", False): - similarity_reputation = self.calculate_similarity_from_metrics(nei, current_round) - else: - similarity_reputation = 0 + if score_fraction > 0: + past_scores = [] + for i in range(1, 5): + key_prev = (addr, nei, current_round - i) + score_prev = self.fraction_changed_history.get(key_prev, {}).get("finally_fraction_score") + if score_prev is not None and score_prev > 0: + past_scores.append(score_prev) - if messages_model_arrival_latency_normalized >= 0: - avg_model_arrival_latency = self.save_model_arrival_latency_history( - addr, nei, messages_model_arrival_latency_normalized, current_round - ) - if avg_model_arrival_latency is None and current_round > 4: - avg_model_arrival_latency = self.model_arrival_latency_history[(addr, nei)][current_round - 1][ - "score" - ] + if past_scores: + avg_past = sum(past_scores) / len(past_scores) + fraction_score_asign = score_fraction * 0.2 + avg_past * 0.8 + else: + fraction_score_asign = score_fraction - if self.messages_number_message is not None: - messages_number_message_normalized, messages_number_message_count = self.manage_metric_number_message( - self.messages_number_message, addr, nei, current_round, metrics_active.get("num_messages", False) - ) - avg_messages_number_message_normalized = self.save_number_message_history( - addr, nei, messages_number_message_normalized, current_round - ) - if avg_messages_number_message_normalized is None and current_round > 4: - avg_messages_number_message_normalized = self.number_message_history[(addr, nei)][ - current_round - 1 - ]["avg_number_message"] + self.fraction_changed_history[key_current]["finally_fraction_score"] = fraction_score_asign + + else: + key_prev = (addr, nei, current_round - 1) + prev_score = self.fraction_changed_history.get(key_prev, {}).get("finally_fraction_score") - if current_round >= 5: - if fraction_score_normalized > 0: - key_previous_round = (addr, nei, current_round - 1) if current_round - 1 > 0 else None - fraction_previous_round = None + if prev_score is not None: + fraction_score_asign = prev_score * 0.1 + else: + if fraction_neighbors_scores is None: + fraction_neighbors_scores = {} - if key_previous_round is not None and key_previous_round in self.fraction_changed_history: - fraction_score_prev = self.fraction_changed_history[key_previous_round].get("fraction_score") - fraction_previous_round = fraction_score_prev if fraction_score_prev is not None else None + for key, value in self.fraction_changed_history.items(): + score = value.get("finally_fraction_score") + if score is not None: + fraction_neighbors_scores[key] = score - if fraction_previous_round is not None: - fraction_score_asign = fraction_score_normalized * 0.8 + fraction_previous_round * 0.2 - self.fraction_changed_history[(addr, nei, current_round)]["fraction_score"] = ( - fraction_score_asign - ) - else: - fraction_score_asign = fraction_score_normalized - self.fraction_changed_history[(addr, nei, current_round)]["fraction_score"] = ( - fraction_score_asign - ) + fraction_score_asign = ( + np.mean(list(fraction_neighbors_scores.values())) if fraction_neighbors_scores else 0 + ) + + if key_current not in self.fraction_changed_history: + self.fraction_changed_history[key_current] = {} + + self.fraction_changed_history[key_current]["finally_fraction_score"] = fraction_score_asign else: - fraction_previous_round = None - key_previous_round = (addr, nei, current_round - 1) if current_round - 1 > 0 else None - if key_previous_round is not None and key_previous_round in self.fraction_changed_history: - fraction_score_prev = self.fraction_changed_history[key_previous_round].get("fraction_score") - fraction_previous_round = fraction_score_prev if fraction_score_prev is not None else None - - if fraction_previous_round is not None: - fraction_score_asign = fraction_previous_round - (fraction_previous_round * 0.5) - else: - if fraction_neighbors_scores is None: - fraction_neighbors_scores = {} + fraction_score_asign = 0 + + if metrics_active.get("model_arrival_latency", False): + if metrics_instance.model_arrival_latency.get("round_received") == current_round: + round_num = metrics_instance.model_arrival_latency.get("round") + latency = metrics_instance.model_arrival_latency.get("latency") + messages_model_arrival_latency_normalized = self.manage_model_arrival_latency( + addr, nei, latency, current_round, round_num + ) - for key, value in self.fraction_changed_history.items(): - score = value.get("fraction_score") - if score is not None: - fraction_neighbors_scores[key] = score + if messages_model_arrival_latency_normalized >= 0: + avg_model_arrival_latency = self.save_model_arrival_latency_history( + nei, messages_model_arrival_latency_normalized, current_round + ) + if avg_model_arrival_latency is None and current_round > 4: + avg_model_arrival_latency = self.model_arrival_latency_history[(addr, nei)][current_round - 1][ + "score" + ] - if fraction_neighbors_scores: - fraction_score_asign = np.mean(list(fraction_neighbors_scores.values())) - else: - fraction_score_asign = 0 + if current_round >= 1 and metrics_active.get("model_similarity", False): + similarity_reputation = self.calculate_similarity_from_metrics(nei, current_round) else: - fraction_score_asign = 0 + similarity_reputation = 0 self.create_graphics_to_metrics( messages_number_message_count, @@ -736,9 +667,9 @@ async def calculate_value_metrics(self, log_dir, id_node, addr, nei, metrics_act fraction_score_asign, avg_model_arrival_latency, ) - except Exception as e: logging.exception(f"Error calculating reputation. Type: {type(e).__name__}") + return 0, 0, 0, 0 def create_graphics_to_metrics( self, @@ -753,19 +684,9 @@ def create_graphics_to_metrics( total_rounds, ): """ - Create and log graphics representing different metric values over the rounds. - - Args: - number_message_count (int): Count of messages. - number_message_norm (float): Normalized number of messages. - similarity (float): Similarity metric value. - fraction (float): Fraction score metric. - model_arrival_latency (float): Latency metric score. - addr (str): The current node's address. - nei (str): The neighbor node's address. - current_round (int): The current round number. - total_rounds (int): Total rounds in the session. + Create graphics to metrics. """ + if current_round is not None and current_round < total_rounds: model_arrival_latency_dict = {f"R-Model_arrival_latency_reputation/{addr}": {nei: model_arrival_latency}} messages_number_message_count_dict = { @@ -790,47 +711,40 @@ def create_graphics_to_metrics( if model_arrival_latency_dict is not None: self.engine.trainer._logger.log_data(model_arrival_latency_dict, step=current_round) + data = { + "addr": addr, + "nei": nei, + "round": current_round, + "number_message_count": number_message_count, + "number_message_norm": number_message_norm, + "similarity": similarity, + "fraction": fraction, + "model_arrival_latency": model_arrival_latency, + } + def analyze_anomalies( self, addr, nei, - round_num, current_round, fraction_changed, threshold, ): """ - Analyze anomalies in the fraction of parameters changed and calculate a corresponding score. - - Args: - addr (str): The source node's address. - nei (str): The neighbor node's address. - round_num (int): The round number for the metric. - current_round (int): The current round number. - fraction_changed (float): Fraction of parameters changed. - threshold (float): Threshold value for changes. + Analyze anomalies in the fraction of parameters changed. Returns: - float: A normalized fraction score between 0 and 1. + float: The fraction score between 0 and 1. """ try: - key = (addr, nei, round_num) + key = (addr, nei, current_round) + penalization_factor_fraction = 0.0 + penalization_factor_threshold = 0.0 if key not in self.fraction_changed_history: - prev_key = (addr, nei, round_num - 1) - if round_num > 0 and prev_key in self.fraction_changed_history: - previous_data = self.fraction_changed_history[prev_key] - fraction_changed = ( - fraction_changed if fraction_changed is not None else previous_data["fraction_changed"] - ) - threshold = threshold if threshold is not None else previous_data["threshold"] - else: - fraction_changed = fraction_changed if fraction_changed is not None else 0 - threshold = threshold if threshold is not None else 0 - self.fraction_changed_history[key] = { - "fraction_changed": fraction_changed, - "threshold": threshold, + "fraction_changed": fraction_changed or 0, + "threshold": threshold or 0, "fraction_score": None, "fraction_anomaly": False, "threshold_anomaly": False, @@ -840,53 +754,52 @@ def analyze_anomalies( "std_dev_threshold": None, } - if round_num < 5: - past_fractions = [] - past_thresholds = [] - - for r in range(round_num): - past_key = (addr, nei, r) - if past_key in self.fraction_changed_history: - past_fractions.append(self.fraction_changed_history[past_key]["fraction_changed"]) - past_thresholds.append(self.fraction_changed_history[past_key]["threshold"]) - - if past_fractions: - mean_fraction = np.mean(past_fractions) - std_dev_fraction = np.std(past_fractions) - self.fraction_changed_history[key]["mean_fraction"] = mean_fraction - self.fraction_changed_history[key]["std_dev_fraction"] = std_dev_fraction - - if past_thresholds: - mean_threshold = np.mean(past_thresholds) - std_dev_threshold = np.std(past_thresholds) - self.fraction_changed_history[key]["mean_threshold"] = mean_threshold - self.fraction_changed_history[key]["std_dev_threshold"] = std_dev_threshold - - return 0 + current_fraction = self.fraction_changed_history[key]["fraction_changed"] + current_threshold = self.fraction_changed_history[key]["threshold"] + if current_round == 0: + self.fraction_changed_history[key].update({ + "mean_fraction": current_fraction, + "std_dev_fraction": 0.0, + "mean_threshold": current_threshold, + "std_dev_threshold": 0.0, + "fraction_score": 1.0, + }) + + mean_fraction_prev = current_fraction + std_dev_fraction_prev = 0.0 + mean_threshold_prev = current_threshold + std_dev_threshold_prev = 0.0 + upper_mean_fraction_prev = None + upper_mean_threshold_prev = None + fraction_anomaly = False + threshold_anomaly = False + fraction_value = 1.0 + threshold_value = 1.0 + fraction_score = 1.0 + else: - fraction_value = 0 - threshold_value = 0 - prev_key = (addr, nei, round_num - 1) - if prev_key not in self.fraction_changed_history: - for i in range(0, round_num + 1): - potential_prev_key = (addr, nei, round_num - i) - if potential_prev_key in self.fraction_changed_history: - mean_fraction_prev = self.fraction_changed_history[potential_prev_key]["mean_fraction"] - if mean_fraction_prev is not None: - prev_key = potential_prev_key - break - - if prev_key: - mean_fraction_prev = self.fraction_changed_history[prev_key]["mean_fraction"] - std_dev_fraction_prev = self.fraction_changed_history[prev_key]["std_dev_fraction"] - mean_threshold_prev = self.fraction_changed_history[prev_key]["mean_threshold"] - std_dev_threshold_prev = self.fraction_changed_history[prev_key]["std_dev_threshold"] - - current_fraction = self.fraction_changed_history[key]["fraction_changed"] - current_threshold = self.fraction_changed_history[key]["threshold"] - - upper_mean_fraction_prev = (mean_fraction_prev + std_dev_fraction_prev) * 1.05 - upper_mean_threshold_prev = (mean_threshold_prev + std_dev_threshold_prev) * 1.10 + prev_key = None + for i in range(1, current_round + 1): + candidate_key = (addr, nei, current_round - i) + candidate_data = self.fraction_changed_history.get(candidate_key, {}) + if all( + candidate_data.get(k) is not None + for k in ["mean_fraction", "std_dev_fraction", "mean_threshold", "std_dev_threshold"] + ): + prev_key = candidate_key + break + + if prev_key is None: + logging.warning(f"No valid previous stats found for {addr}, {nei}, round {current_round}") + else: + prev_data = self.fraction_changed_history[prev_key] + mean_fraction_prev = prev_data.get("mean_fraction") + std_dev_fraction_prev = prev_data.get("std_dev_fraction") + mean_threshold_prev = prev_data.get("mean_threshold") + std_dev_threshold_prev = prev_data.get("std_dev_threshold") + + upper_mean_fraction_prev = (mean_fraction_prev + std_dev_fraction_prev) * 1.20 + upper_mean_threshold_prev = (mean_threshold_prev + std_dev_threshold_prev) * 1.15 fraction_anomaly = current_fraction > upper_mean_fraction_prev threshold_anomaly = current_threshold > upper_mean_threshold_prev @@ -894,80 +807,81 @@ def analyze_anomalies( self.fraction_changed_history[key]["fraction_anomaly"] = fraction_anomaly self.fraction_changed_history[key]["threshold_anomaly"] = threshold_anomaly - penalization_factor_fraction = ( - abs(current_fraction - mean_fraction_prev) / mean_fraction_prev - if mean_fraction_prev != 0 - else 1 - ) - penalization_factor_threshold = ( - abs(current_threshold - mean_threshold_prev) / mean_threshold_prev - if mean_threshold_prev != 0 - else 1 - ) - - k_fraction = penalization_factor_fraction if penalization_factor_fraction != 0 else 1 - k_threshold = penalization_factor_threshold if penalization_factor_threshold != 0 else 1 - if fraction_anomaly: - fraction_value = ( - 1 - (1 / (1 + np.exp(-k_fraction))) - if current_fraction is not None and mean_fraction_prev is not None - else 0 + penalization_factor_fraction = ( + abs(current_fraction - mean_fraction_prev) / mean_fraction_prev if mean_fraction_prev else 1 ) + fraction_value = 1 - (1 / (1 + np.exp(-penalization_factor_fraction))) else: - fraction_value = ( - 1 - (1 / (1 + np.exp(k_fraction))) - if current_fraction is not None and mean_fraction_prev is not None - else 0 - ) + fraction_value = 1.0 if threshold_anomaly: - threshold_value = ( - 1 - (1 / (1 + np.exp(-k_threshold))) - if current_threshold is not None and mean_threshold_prev is not None - else 0 + penalization_factor_threshold = ( + abs(current_threshold - mean_threshold_prev) / mean_threshold_prev + if mean_threshold_prev + else 1 ) + threshold_value = 1 - (1 / (1 + np.exp(-penalization_factor_threshold))) else: - threshold_value = ( - 1 - (1 / (1 + np.exp(k_threshold))) - if current_threshold is not None and mean_threshold_prev is not None - else 0 - ) + threshold_value = 1.0 fraction_weight = 0.5 threshold_weight = 0.5 - fraction_score = fraction_weight * fraction_value + threshold_weight * threshold_value self.fraction_changed_history[key]["mean_fraction"] = (current_fraction + mean_fraction_prev) / 2 self.fraction_changed_history[key]["std_dev_fraction"] = np.sqrt( ((current_fraction - mean_fraction_prev) ** 2 + std_dev_fraction_prev**2) / 2 ) + self.fraction_changed_history[key]["std_dev_fraction"] = np.sqrt( + ((current_fraction - mean_fraction_prev) ** 2 + std_dev_fraction_prev**2) / 2 + ) self.fraction_changed_history[key]["mean_threshold"] = (current_threshold + mean_threshold_prev) / 2 self.fraction_changed_history[key]["std_dev_threshold"] = np.sqrt( ((0.1 * (current_threshold - mean_threshold_prev) ** 2) + std_dev_threshold_prev**2) / 2 ) + self.fraction_changed_history[key]["fraction_score"] = fraction_score + + data = { + "addr": addr, + "nei": nei, + "current_round": current_round, + "fraction_changed": current_fraction, + "threshold": current_threshold, + "mean_fraction": mean_fraction_prev, + "std_dev_fraction": std_dev_fraction_prev, + "mean_threshold": mean_threshold_prev, + "std_dev_threshold": std_dev_threshold_prev, + "upper_mean_fraction": upper_mean_fraction_prev, + "upper_mean_threshold": upper_mean_threshold_prev, + "fraction_anomaly": fraction_anomaly, + "threshold_anomaly": threshold_anomaly, + "penalization_factor_fraction": penalization_factor_fraction or 0, + "penalization_factor_threshold": penalization_factor_threshold or 0, + "fraction_value": fraction_value, + "threshold_value": threshold_value, + "fraction_score": fraction_score, + } + + return max(fraction_score, 0) - return max(fraction_score, 0) - else: - return -1 except Exception: logging.exception("Error analyzing anomalies") return -1 - def manage_model_arrival_latency(self, round_num, addr, nei, latency, current_round): + def manage_model_arrival_latency(self, addr, nei, latency, current_round, round_num): """ - Manage the model arrival latency metric and normalize it based on historical latencies. + Manage the model_arrival_latency metric using latency. Args: - round_num (int): The round number when the model was sent. - addr (str): The address of the current node. - nei (str): The neighbor node's address. - latency (float): The measured latency. - current_round (int): The current round number. + addr (str): Source IP address. + nei (str): Destination IP address. + latency (float): Latency value for the current model_arrival_latency. + current_round (int): The current round of the program. + round_num (int): The round number of the model_arrival_latency. Returns: - float: Normalized latency score between 0 and 1. + float: Normalized score between 0 and 1 for model_arrival_latency. """ try: current_key = nei @@ -980,65 +894,68 @@ def manage_model_arrival_latency(self, round_num, addr, nei, latency, current_ro "score": 0.0, } - prev_mean_latency = 0 - prev_percentil_0 = 0 - prev_percentil_25 = 0 + mean_latency = 0 difference = 0 - if current_round >= 5: + if current_round >= 1: + target_round = ( + current_round - 1 if (current_round - 1) in self.model_arrival_latency_history else current_round + ) + all_latencies = [ data["latency"] - for r in self.model_arrival_latency_history - for key, data in self.model_arrival_latency_history[r].items() - if "latency" in data and data["latency"] != 0 + for data in self.model_arrival_latency_history.get(target_round, {}).values() + if data.get("latency") not in (None, 0.0) ] - prev_mean_latency = np.mean(all_latencies) if all_latencies else 0 - prev_percentil_0 = np.percentile(all_latencies, 0) if all_latencies else 0 - prev_percentil_25 = np.percentile(all_latencies, 25) if all_latencies else 0 - - k = 0.1 - prev_mean_latency += k * (prev_percentil_25 - prev_percentil_0) - - difference = latency - prev_mean_latency - if latency <= prev_mean_latency: - score = 1.0 + mean_latency = np.mean(all_latencies) if all_latencies else 0 + aument_mean = mean_latency * 1.4 + if latency is not None: + difference = latency - mean_latency + if latency <= aument_mean: + score = 1.0 + else: + score = 1 / (1 + np.exp(abs(difference) / mean_latency)) if mean_latency != 0 else 0.0 else: - score = 1 / (1 + np.exp(abs(difference) / prev_mean_latency)) - - if round_num < current_round: - round_diff = current_round - round_num - penalty_factor = round_diff * 0.1 - penalty = penalty_factor * (1 - score) - score -= penalty * score + logging.info(f"latency is None in round {current_round} for nei {nei}") + score = -0.5 self.model_arrival_latency_history[current_round][current_key].update({ - "mean_latency": prev_mean_latency, - "percentil_0": prev_percentil_0, - "percentil_25": prev_percentil_25, + "mean_latency": mean_latency, "score": score, }) else: score = 0 + data = { + "addr": addr, + "nei": nei, + "round": round_num, + "current_round": current_round, + "latency": latency, + "mean_latency": mean_latency if current_round >= 1 else None, + "aument_latency": aument_mean if current_round >= 1 else None, + "difference": difference if current_round >= 1 else None, + "score": score, + } + return score except Exception as e: logging.exception(f"Error managing model_arrival_latency: {e}") return 0 - def save_model_arrival_latency_history(self, addr, nei, model_arrival_latency, round_num): + def save_model_arrival_latency_history(self, nei, model_arrival_latency, round_num): """ - Save and update the model arrival latency history in memory. - + Save the model_arrival_latency history of a participant (addr) regarding its neighbor (nei) in memory. + Use 3 rounds for the average. Args: - addr (str): The current node's address. - nei (str): The neighbor node's address. - model_arrival_latency (float): The normalized latency score. + nei (str): The neighboring node involved. + model_arrival_latency (float): The model_arrival_latency value to be saved. round_num (int): The current round number. Returns: - float: The updated average model arrival latency. + float: The smoothed average model_arrival_latency including the current round. """ try: current_key = nei @@ -1053,30 +970,33 @@ def save_model_arrival_latency_history(self, addr, nei, model_arrival_latency, r "score": model_arrival_latency, }) - if model_arrival_latency > 0 and round_num > 5: - previous_avg = ( - self.model_arrival_latency_history.get(round_num - 1, {}) - .get(current_key, {}) - .get("avg_model_arrival_latency", None) - ) - - if previous_avg is not None: - avg_model_arrival_latency = ( - model_arrival_latency * 0.8 + previous_avg * 0.2 - if previous_avg is not None - else model_arrival_latency + if model_arrival_latency > 0 and round_num >= 1: + past_values = [] + for r in range(round_num - 3, round_num): + val = ( + self.model_arrival_latency_history.get(r, {}) + .get(current_key, {}) + .get("avg_model_arrival_latency", None) ) + if val is not None and val != 0: + past_values.append(val) + + if past_values: + avg_past = sum(past_values) / len(past_values) + avg_model_arrival_latency = model_arrival_latency * 0.2 + avg_past * 0.8 else: - avg_model_arrival_latency = model_arrival_latency - (model_arrival_latency * 0.05) - elif model_arrival_latency == 0 and round_num > 5: + avg_model_arrival_latency = model_arrival_latency + elif model_arrival_latency == 0 and round_num >= 1: previous_avg = ( self.model_arrival_latency_history.get(round_num - 1, {}) .get(current_key, {}) .get("avg_model_arrival_latency", None) ) - avg_model_arrival_latency = previous_avg - (previous_avg * 0.05) + avg_model_arrival_latency = previous_avg * 0.1 if previous_avg is not None else 0 + elif model_arrival_latency < 0 and round_num >= 1: + avg_model_arrival_latency = abs(model_arrival_latency) * 0.3 else: - avg_model_arrival_latency = model_arrival_latency + avg_model_arrival_latency = 0 self.model_arrival_latency_history[round_num][current_key]["avg_model_arrival_latency"] = ( avg_model_arrival_latency @@ -1086,90 +1006,104 @@ def save_model_arrival_latency_history(self, addr, nei, model_arrival_latency, r except Exception: logging.exception("Error saving model_arrival_latency history") - def manage_metric_number_message(self, messages_number_message, addr, nei, current_round, metric_active=True): - """ - Manage and normalize the number of messages metric using percentiles. - - Args: - messages_number_message (list): List containing message data. - addr (str): The current node's address. - nei (str): The neighbor node's address. - current_round (int): The current round number. - metric_active (bool): Flag indicating whether the metric is active. - - Returns: - tuple: A tuple with the normalized number_message value (float) and the count of messages (int). - """ + def manage_metric_number_message( + self, messages_number_message: list, addr: str, nei: str, current_round: int, metric_active: bool = True + ) -> tuple[float, int]: try: - if current_round == 0: - return 0.0, 0 - - if not metric_active: + if current_round == 0 or not metric_active: return 0.0, 0 - previous_round = current_round - current_addr_nei = (addr, nei) + relevant_messages = [ msg for msg in messages_number_message - if msg["key"] == current_addr_nei and msg["current_round"] == previous_round + if msg["key"] == current_addr_nei and msg["current_round"] == current_round ] - messages_count = len(relevant_messages) if relevant_messages else 0 - - rounds_to_consider = [] - if previous_round >= 4: - rounds_to_consider = [previous_round - 4, previous_round - 3, previous_round - 2, previous_round - 1] - elif previous_round == 3: - rounds_to_consider = [0, 1, 2, 3] - elif previous_round == 2: - rounds_to_consider = [0, 1, 2] - elif previous_round == 1: - rounds_to_consider = [0, 1] - elif previous_round == 0: - rounds_to_consider = [0] - - previous_counts = [ - len([m for m in messages_number_message if m["key"] == current_addr_nei and m["current_round"] == r]) - for r in rounds_to_consider + messages_count = len(relevant_messages) + + previous_round = current_round - 1 + all_messages_previous_round = [ + m for m in messages_number_message if m.get("current_round") == previous_round ] - self.previous_percentile_25_number_message[current_addr_nei] = ( - np.percentile(previous_counts, 25) if previous_counts else 0 - ) - self.previous_percentile_85_number_message[current_addr_nei] = ( - np.percentile(previous_counts, 85) if previous_counts else 0 - ) + neighbor_counts = {} + for m in all_messages_previous_round: + key = m.get("key") + neighbor_counts[key] = neighbor_counts.get(key, 0) + 1 + + counts_all_neighbors = list(neighbor_counts.values()) + percentile_reference = np.percentile(counts_all_neighbors, 25) if counts_all_neighbors else 0 + logging.info(f"count_all_neighbors: {counts_all_neighbors}, percentile_reference: {percentile_reference}") + std_dev = np.std(counts_all_neighbors) if counts_all_neighbors else 0 + mean_messages_all_neighbors = np.mean(counts_all_neighbors) if counts_all_neighbors else 0 + aument_mean = mean_messages_all_neighbors * 2 if current_round <= 1 else mean_messages_all_neighbors * 1.1 + + if percentile_reference > 0: + raw_relative_increase = (messages_count - percentile_reference) / percentile_reference + relative_increase = np.log1p(raw_relative_increase) + else: + relative_increase = 0.0 + dynamic_margin = (std_dev + 1) / (np.log1p(percentile_reference) + 1) normalized_messages = 1.0 - relative_position = 0 - - if previous_round > 4: - percentile_25 = self.previous_percentile_25_number_message.get(current_addr_nei, 0) - percentile_85 = self.previous_percentile_85_number_message.get(current_addr_nei, 0) - if messages_count > percentile_85: - relative_position = (messages_count - percentile_85) / (percentile_85 - percentile_25) - normalized_messages = np.exp(-relative_position) - - normalized_messages = max(0.01, normalized_messages) + was_penalized = False + + if relative_increase > dynamic_margin: + penalty_ratio = np.log1p(relative_increase - dynamic_margin) / (np.log1p(dynamic_margin + 1e-6) + 1e-6) + normalized_messages *= np.exp(-(penalty_ratio**2)) + was_penalized = True + + extra_penalty = 0.0 + if mean_messages_all_neighbors > 0 and messages_count > aument_mean: + extra_penalty = (messages_count - mean_messages_all_neighbors) / (mean_messages_all_neighbors + 1e-6) + amplification = 1 + (aument_mean / (mean_messages_all_neighbors + 1e-6)) + normalized_messages *= np.exp(-((extra_penalty * amplification) ** 2)) + was_penalized = True + + if was_penalized and current_round > 1: + prev_score = ( + self.number_message_history.get((addr, nei), {}) + .get(current_round - 1, {}) + .get("normalized_messages") + ) + if prev_score is not None and prev_score < 0.9: + normalized_messages *= 0.9 + + if (addr, nei) not in self.number_message_history: + self.number_message_history[(addr, nei)] = {} + self.number_message_history[(addr, nei)][current_round] = {"normalized_messages": normalized_messages} + + normalized_messages = max(0.001, normalized_messages) + + data = { + "addr": addr, + "nei": nei, + "round": current_round, + "messages_count": messages_count, + "percentile_reference": percentile_reference, + "dynamic_margin": dynamic_margin, + "relative_increase": relative_increase, + "std_dev": std_dev, + "mean_all_neighbors": mean_messages_all_neighbors, + "aument_mean": aument_mean, + "extra_penalty": extra_penalty, + "normalized_messages": normalized_messages, + } return normalized_messages, messages_count + except Exception: logging.exception("Error managing metric number_message") return 0.0, 0 def save_number_message_history(self, addr, nei, messages_number_message_normalized, current_round): """ - Save the normalized number_message history in memory and calculate a weighted average. - - Args: - addr (str): The current node's address. - nei (str): The neighbor node's address. - messages_number_message_normalized (float): The normalized number_message value. - current_round (int): The current round number. + Save the number_message history of a participant (addr) regarding its neighbor (nei) in memory. + Uses a weighted average of the past 3 rounds to smooth the result. Returns: - float: The weighted average of the number_message metric. + float: The weighted average including the current round. """ try: key = (addr, nei) @@ -1178,41 +1112,55 @@ def save_number_message_history(self, addr, nei, messages_number_message_normali if key not in self.number_message_history: self.number_message_history[key] = {} - self.number_message_history[key][current_round] = {"number_message": messages_number_message_normalized} + if current_round not in self.number_message_history[key]: + self.number_message_history[key][current_round] = {} - if messages_number_message_normalized != 0 and current_round > 4: - previous_avg = ( - self.number_message_history[key].get(current_round - 1, {}).get("avg_number_message", None) - ) - if previous_avg is not None: - avg_number_message = messages_number_message_normalized * 0.8 + previous_avg * 0.2 + self.number_message_history[key][current_round].update({ + "number_message": messages_number_message_normalized, + }) + + if messages_number_message_normalized > 0 and current_round >= 1: + past_values = [] + for r in range(current_round - 3, current_round): + val = self.number_message_history.get(key, {}).get(r, {}).get("avg_number_message", None) + if val is not None and val != 0: + past_values.append(val) + + if past_values: + avg_past = sum(past_values) / len(past_values) + avg_number_message = messages_number_message_normalized * 0.9 + avg_past * 0.1 + # avg_number_message = messages_number_message_normalized * 0.1 + avg_past * 0.9 else: avg_number_message = messages_number_message_normalized - - self.number_message_history[key][current_round]["avg_number_message"] = avg_number_message + elif messages_number_message_normalized == 0 and current_round >= 1: + previous_avg = ( + self.number_message_history.get(key, {}).get(current_round - 1, {}).get("avg_number_message", None) + ) + avg_number_message = previous_avg * 0.1 if previous_avg is not None else 0 + elif messages_number_message_normalized < 0 and current_round >= 1: + avg_number_message = abs(messages_number_message_normalized) * 0.3 else: avg_number_message = 0 + self.number_message_history[key][current_round]["avg_number_message"] = avg_number_message + return avg_number_message except Exception: logging.exception("Error saving number_message history") return -1 - except Exception as e: - logging.exception(f"Error managing model_arrival_latency latency: {e}") - return 0.0 - def save_reputation_history_in_memory(self, addr, nei, reputation): """ - Save the reputation history for a neighbor and compute an average reputation. + Save the reputation history of a participant (addr) regarding its neighbor (nei) in memory + and calculate the average reputation. Args: - addr (str): The current node's address. - nei (str): The neighbor node's address. - reputation (float): The computed reputation for the current round. + addr (str): The identifier of the node whose reputation is being saved. + nei (str): The neighboring node involved. + reputation (float): The reputation value to be saved. Returns: - float: The updated (weighted) reputation. + float: The cumulative reputation including the current round. """ try: key = (addr, nei) @@ -1221,11 +1169,30 @@ def save_reputation_history_in_memory(self, addr, nei, reputation): self.reputation_history[key] = {} self.reputation_history[key][self._engine.get_round()] = reputation - avg_reputation = 0 - current_round = self._engine.get_round() - rounds = sorted(self.reputation_history[key].keys(), reverse=True)[:2] + # With the last 3 rounds + # rounds = sorted(self.reputation_history[key].keys(), reverse=True)[:3] + # if len(rounds) >= 3: + # selected_rounds = rounds[:3] + # weights = [0.5, 0.3, 0.2] + # elif len(rounds) == 2: + # selected_rounds = rounds[:2] + # weights = [0.6, 0.4] if selected_rounds[0] > selected_rounds[1] else [0, 0] + # elif len(rounds) == 1: + # selected_rounds = rounds + # weights = [1.0] + # else: + # return 0 # No reputation to average + + # values = [self.reputation_history[key][r] for r in selected_rounds] + # avg_reputation = sum(v * w for v, w in zip(values, weights)) / sum(weights) + + # return avg_reputation + + # With the last 2 rounds + rounds = sorted(self.reputation_history[key].keys(), reverse=True)[:2] + current_round = self._engine.get_round() if len(rounds) >= 2: current_round = rounds[0] previous_round = rounds[1] @@ -1234,27 +1201,42 @@ def save_reputation_history_in_memory(self, addr, nei, reputation): previous_rep = self.reputation_history[key][previous_round] logging.info(f"Current reputation: {current_rep}, Previous reputation: {previous_rep}") - avg_reputation = (current_rep * 0.8) + (previous_rep * 0.2) + avg_reputation = (current_rep * 0.9) + (previous_rep * 0.1) logging.info(f"Reputation ponderated: {avg_reputation}") else: avg_reputation = self.reputation_history[key][current_round] - return avg_reputation + # for i, n_round in enumerate(rounds, start=1): + # rep = self.reputation_history[key][n_round] + # decay_factor = self.calculate_decay_rate(rep) ** i + # total_reputation += rep * decay_factor + # total_weights += decay_factor + # logging.info( + # f"Round: {n_round}, Reputation: {rep}, Decay: {decay_factor}, Total reputation: {total_reputation}" + # ) + + # avg_reputation = total_reputation / total_weights + # if total_weights > 0: + # return avg_reputation + # else: + # return -1 + except Exception: logging.exception("Error saving reputation history") return -1 def calculate_decay_rate(self, reputation): """ - Calculate the decay rate for a given reputation value. + Calculate the decay rate for a reputation value. Args: - reputation (float): The current reputation value. + reputation (float): Reputation value. Returns: - float: The decay rate. + float: Decay rate. """ + if reputation > 0.8: return 0.9 # Very low decay elif reputation > 0.7: @@ -1268,14 +1250,15 @@ def calculate_decay_rate(self, reputation): def calculate_similarity_from_metrics(self, nei, current_round): """ - Calculate the similarity score based on stored similarity metrics. + Calculate the similarity value from the stored metrics in the 'similarity' + attribute of the Metrics instance for the given neighbor (nei) and current round. Args: - nei (str): The neighbor node's address. + nei (str): The IP address of the neighbor. current_round (int): The current round number. Returns: - float: The aggregated similarity score. + float: The computed similarity value. """ similarity_value = 0.0 @@ -1288,7 +1271,8 @@ def calculate_similarity_from_metrics(self, nei, current_round): source_ip = metric.get("nei") round_in_metric = metric.get("round") - if source_ip == nei and round_in_metric == current_round: + # if source_ip == nei and round_in_metric == current_round: + if source_ip == nei: weight_cosine = 0.25 weight_euclidean = 0.25 weight_manhattan = 0.25 @@ -1310,19 +1294,18 @@ def calculate_similarity_from_metrics(self, nei, current_round): async def calculate_reputation(self, ae: AggregationEvent): """ - Calculate and update the reputation for all neighbor nodes based on active metrics. - - This method processes the aggregated updates and then, based on the selected weighting factor (static or dynamic), - calculates the reputation. It also includes feedback and sends the reputation scores to neighbors. + Calculate the reputation of the node based on the active metrics. Args: - ae (AggregationEvent): The event containing aggregated updates. + ae (AggregationEvent): The aggregation event. """ (updates, _, _) = await ae.get_event_data() if self._enabled: logging.info(f"Calculating reputation at round {self._engine.get_round()}") logging.info(f"Active metrics: {self._metrics}") logging.info(f"rejected nodes at round {self._engine.get_round()}: {self.rejected_nodes}") + self.rejected_nodes.clear() + logging.info(f"Rejected nodes clear: {self.rejected_nodes}") neighbors = set(await self._engine._cm.get_addrs_current_connections(only_direct=True)) history_data = self.history_data @@ -1334,8 +1317,6 @@ async def calculate_reputation(self, ae: AggregationEvent): metric_fraction, metric_model_arrival_latency, ) = await self.calculate_value_metrics( - self._log_dir, - self._idx, self._addr, nei, metrics_active=self._metrics, @@ -1354,7 +1335,7 @@ async def calculate_reputation(self, ae: AggregationEvent): self._metrics, ) - if self._weighting_factor == "static" and self._engine.get_round() >= 5: + if self._weighting_factor == "static" and self._engine.get_round() >= 1: self._calculate_static_reputation( self._addr, nei, @@ -1368,10 +1349,10 @@ async def calculate_reputation(self, ae: AggregationEvent): self._weight_model_arrival_latency, ) - if self._weighting_factor == "dynamic" and self._engine.get_round() >= 5: + if self._weighting_factor == "dynamic" and self._engine.get_round() >= 1: await self._calculate_dynamic_reputation(self._addr, neighbors) - if self._engine.get_round() < 5 and self._enabled: + if self._engine.get_round() < 1 and self._with_reputation: federation = self._engine.config.participant["network_args"]["neighbors"].split() self.init_reputation( self._addr, @@ -1398,10 +1379,7 @@ async def calculate_reputation(self, ae: AggregationEvent): async def send_reputation_to_neighbors(self, neighbors): """ - Send the calculated reputation scores to all neighbors. - - Args: - neighbors (iterable): An iterable of neighbor node addresses. + Send the calculated reputation to the neighbors. """ for nei, data in self.reputation.items(): if data["reputation"] is not None: @@ -1420,13 +1398,16 @@ async def send_reputation_to_neighbors(self, neighbors): f"Sending reputation to node {nei} from node {neighbor} with reputation {data['reputation']}" ) + metrics_data = { + "addr": self._addr, + "nei": nei, + "round": self._engine.get_round(), + "reputation_with_feedback": data["reputation"], + } + def create_graphic_reputation(self, addr, round_num): """ - Create a graphical representation of the reputation scores and log the data. - - Args: - addr (str): The current node's address. - round_num (int): The current round number. + Create a graphic with the reputation of a node in a specific round. """ try: reputation_dict_with_values = { @@ -1445,27 +1426,37 @@ def create_graphic_reputation(self, addr, round_num): async def update_process_aggregation(self, updates): """ - Update the aggregation process by removing nodes that have been rejected. - - Args: - updates (dict): The dictionary of updates. + Update the process of aggregation by removing rejected nodes from the updates and + scaling the weights of the models based on their reputation. """ + # Reject node if the reputation is below 0.6 from the updates for rn in self.rejected_nodes: if rn in updates: updates.pop(rn) + # Scale the model weights based on the reputation of the nodes + if self.engine.get_round() >= 1: + for nei in list(updates.keys()): + if nei in self.reputation: + rep = self.reputation[nei].get("reputation", 0) + if rep >= 0.6: + weight = (rep - 0.6) / (1.0 - 0.6) + model_dict = updates[nei][0] + extra_data = updates[nei][1] + + scaled_model = {k: v * weight for k, v in model_dict.items()} + updates[nei] = (scaled_model, extra_data) + + logging.info(f"✅ Nei {nei} with reputation {rep:.4f}, scaled model with weight {weight:.4f}") + else: + logging.info(f"⛔ Nei {nei} with reputation {rep:.4f}, model rejected") + logging.info(f"Updates after rejected nodes: {list(updates.keys())}") - self.rejected_nodes.clear() - logging.info(f"rejected nodes after clear at round {self._engine.get_round()}: {self.rejected_nodes}") + logging.info(f"Nodes rejected: {self.rejected_nodes}") async def include_feedback_in_reputation(self): """ - Integrate feedback scores into the current reputation values. - - The final reputation is computed as a weighted sum of the current reputation and the average feedback. - - Returns: - bool: True if feedback was successfully included, False otherwise. + Include feedback of neighbors in the reputation. """ weight_current_reputation = 0.9 weight_feedback = 0.1 @@ -1517,67 +1508,127 @@ async def include_feedback_in_reputation(self): async def on_round_start(self, rse: RoundStartEvent): """ - Event handler for the start of a round. It stores the start time and updates the expected nodes. - - Args: - rse (RoundStartEvent): The event data containing round start information. + Handle the start of a new round and initialize the round timing information. """ (round_id, start_time, expected_nodes) = await rse.get_event_data() if round_id not in self.round_timing_info: self.round_timing_info[round_id] = {} self.round_timing_info[round_id]["start_time"] = start_time expected_nodes.difference_update(self.rejected_nodes) + expected_nodes = list(expected_nodes) + self._recalculate_pending_latencies(round_id) async def recollect_model_arrival_latency(self, ure: UpdateReceivedEvent): - """ - Event handler to record the model arrival latency when an update is received. - - Args: - ure (UpdateReceivedEvent): The event data for a model update. - """ (decoded_model, weight, source, round_num, local) = await ure.get_event_data() + current_round = self._engine.get_round() - # Exclude the node itself from the calculation - if source == self._addr: - return + # logging.info(f"Model from source {source}, round {round_num}, current_round {current_round}") - current_time = time.time() - current_round = round_num + self.round_timing_info.setdefault(round_num, {}) - if current_round not in self.round_timing_info: - self.round_timing_info[current_round] = {} + if round_num == current_round: + self._process_current_round(round_num, source) + elif round_num > current_round: + self.round_timing_info[round_num]["pending_recalculation"] = True + self.round_timing_info[round_num].setdefault("pending_sources", set()).add(source) + logging.info(f"Model from future round {round_num} stored, pending recalculation.") + else: + self._process_past_round(round_num, source) - if "model_received_time" not in self.round_timing_info[current_round]: - self.round_timing_info[current_round]["model_received_time"] = {} + self._recalculate_pending_latencies(current_round) - if source not in self.round_timing_info[current_round]["model_received_time"]: - self.round_timing_info[current_round]["model_received_time"][source] = current_time + def _process_current_round(self, round_num, source): + """ + Process models that arrive in the current round. + """ + if "start_time" in self.round_timing_info[round_num]: + current_time = time.time() + self.round_timing_info[round_num].setdefault("model_received_time", {}) + existing_time = self.round_timing_info[round_num]["model_received_time"].get(source) + if existing_time is None or current_time < existing_time: + self.round_timing_info[round_num]["model_received_time"][source] = current_time - if "start_time" in self.round_timing_info[current_round]: - start = self.round_timing_info[current_round]["start_time"] - received_time = self.round_timing_info[current_round]["model_received_time"][source] - duration = received_time - start - self.round_timing_info[current_round]["duration"] = duration - logging.info(f"Source {source} , round {current_round}, duration: {duration:.4f} seconds") + start_time = self.round_timing_info[round_num]["start_time"] + duration = current_time - start_time + self.round_timing_info[round_num]["duration"] = duration - self.save_data( - "model_arrival_latency", - source, - self._addr, - num_round=current_round, - current_round=self._engine.get_round(), - latency=duration, - ) + logging.info(f"Source {source}, round {round_num}, duration: {duration:.4f} seconds") + + self.save_data( + "model_arrival_latency", + source, + self._addr, + num_round=round_num, + current_round=self._engine.get_round(), + latency=duration, + ) else: - logging.info(f"Model arrival latency already calculated for node {source} in round {current_round}") + logging.info(f"Start time not yet available for round {round_num}.") - async def recollect_similarity(self, ure: UpdateReceivedEvent): + def _process_past_round(self, round_num, source): """ - Event handler to recollect and store model similarity metrics when an update is received. + Process models that arrive in past rounds. + """ + logging.info(f"Model from past round {round_num} received, storing for recalculation.") + current_time = time.time() + self.round_timing_info.setdefault(round_num, {}) + self.round_timing_info[round_num].setdefault("model_received_time", {}) + existing_time = self.round_timing_info[round_num]["model_received_time"].get(source) + if existing_time is None or current_time < existing_time: + self.round_timing_info[round_num]["model_received_time"][source] = current_time - Args: - ure (UpdateReceivedEvent): The event data containing model update information. + prev_start_time = self.round_timing_info.get(round_num, {}).get("start_time") + if prev_start_time: + duration = current_time - prev_start_time + self.round_timing_info[round_num]["duration"] = duration + + # logging.info(f"Source {source}, calculated latency using start_time at round {round_num}: {duration:.4f} seconds") + + self.save_data( + "model_arrival_latency", + source, + self._addr, + num_round=round_num, + current_round=self._engine.get_round(), + latency=duration, + ) + else: + logging.info(f"Start time for previous round {round_num - 1} not available yet.") + + def _recalculate_pending_latencies(self, current_round): """ + Recalculate latencies for rounds that have pending recalculation. + """ + logging.info("Recalculating latencies for rounds with pending recalculation.") + for r_num, r_data in self.round_timing_info.items(): + new_time = time.time() + if r_data.get("pending_recalculation"): + if "start_time" in r_data and "model_received_time" in r_data: + r_data.setdefault("model_received_time", {}) + + for src in list(r_data["pending_sources"]): + existing_time = r_data["model_received_time"].get(src) + if existing_time is None or new_time < existing_time: + r_data["model_received_time"][src] = new_time + duration = new_time - r_data["start_time"] + r_data["duration"] = duration + + logging.info(f"[Recalc] Source {src}, round {r_num}, duration: {duration:.4f} s") + # logging.info(f"Source {src}, round {r_num}, recalculated duration: {duration:.4f} seconds") + + self.save_data( + "model_arrival_latency", + src, + self._addr, + num_round=r_num, + current_round=current_round, + latency=duration, + ) + + r_data["pending_sources"].clear() + r_data["pending_recalculation"] = False + + async def recollect_similarity(self, ure: UpdateReceivedEvent): (decoded_model, weight, nei, round_num, local) = await ure.get_event_data() if self._enabled and self._metrics.get("model_similarity"): if self._engine.config.participant["adaptive_args"]["model_similarity"]: @@ -1614,7 +1665,6 @@ async def recollect_similarity(self, ure: UpdateReceivedEvent): decoded_model, similarity=True, ) - similarity_metrics = { "timestamp": datetime.now(), "nei": nei, @@ -1630,7 +1680,6 @@ async def recollect_similarity(self, ure: UpdateReceivedEvent): if nei in self.connection_metrics: self.connection_metrics[nei].similarity.append(similarity_metrics) - logging.info(f"Stored similarity metrics for {nei}: {similarity_metrics}") else: logging.warning(f"No metrics instance found for neighbor {nei}") @@ -1639,13 +1688,19 @@ async def recollect_similarity(self, ure: UpdateReceivedEvent): self.rejected_nodes.add(nei) async def recollect_number_message(self, source, message): - """ - Event handler to collect message count metrics when a message is received. + if source != self._addr: + current_time = time.time() + if current_time: + self.save_data( + "number_message", + source, + self._addr, + time=current_time, + current_round=self._engine.get_round(), + ) - Args: - source (str): The source node's address. - message: The received message (content not used directly). - """ + async def recollect_duplicated_number_message(self, dme: DuplicatedMessageEvent): + (source) = await dme.get_event_data() if source != self._addr: current_time = time.time() if current_time: @@ -1658,13 +1713,8 @@ async def recollect_number_message(self, source, message): ) async def recollect_fraction_of_parameters_changed(self, ure: UpdateReceivedEvent): - """ - Event handler to recollect the fraction of parameters changed when an update is received. - - Args: - ure (UpdateReceivedEvent): The event data containing model update information. - """ (decoded_model, weight, source, round_num, local) = await ure.get_event_data() + current_round = self._engine.get_round() parameters_local = self._engine.trainer.get_model_parameters() parameters_received = decoded_model @@ -1678,12 +1728,14 @@ async def recollect_fraction_of_parameters_changed(self, ure: UpdateReceivedEven prev_threshold = self.fraction_of_params_changed[source][current_round - 1][-1]["threshold"] for key in parameters_local.keys(): + # logging.info(f"🤖 fraction_of_parameters_changed | Key: {key}") if key in parameters_received: local_tensor = parameters_local[key].cpu() received_tensor = parameters_received[key].cpu() diff = torch.abs(local_tensor - received_tensor) differences.extend(diff.flatten().tolist()) total_params += diff.numel() + # logging.info(f"🤖 fraction_of_parameters_changed | Total params: {total_params}") if differences: mean_threshold = torch.mean(torch.tensor(differences)).item() @@ -1720,10 +1772,7 @@ async def recollect_fraction_of_parameters_changed(self, ure: UpdateReceivedEven "fraction_of_params_changed", source, self._addr, - current_round, + current_round=current_round, fraction_changed=fraction_changed, - total_params=total_params, - changed_params=changed_params, threshold=current_threshold, - changes_record=changes_record, ) diff --git a/nebula/controller/scenarios.py b/nebula/controller/scenarios.py index 421ab7936..155f2348c 100644 --- a/nebula/controller/scenarios.py +++ b/nebula/controller/scenarios.py @@ -6,15 +6,12 @@ import math import os import shutil -import subprocess -import sys import time from datetime import datetime from urllib.parse import quote -from aiohttp import FormData import docker -import tensorboard_reducer as tbr +from aiohttp import FormData from nebula.addons.topologymanager import TopologyManager from nebula.config.config import Config @@ -199,30 +196,30 @@ def __init__( self.mobile_participants_percent = mobile_participants_percent self.additional_participants = additional_participants self.with_trustworthiness = with_trustworthiness - self.robustness_pillar = robustness_pillar, - self.resilience_to_attacks = resilience_to_attacks, - self.algorithm_robustness = algorithm_robustness, - self.client_reliability = client_reliability, - self.privacy_pillar = privacy_pillar, - self.technique = technique, - self.uncertainty = uncertainty, - self.indistinguishability = indistinguishability, - self.fairness_pillar = fairness_pillar, - self.selection_fairness = selection_fairness, - self.performance_fairness = performance_fairness, - self.class_distribution = class_distribution, - self.explainability_pillar = explainability_pillar, - self.interpretability = interpretability, - self.post_hoc_methods = post_hoc_methods, - self.accountability_pillar = accountability_pillar, - self.factsheet_completeness = factsheet_completeness, - self.architectural_soundness_pillar = architectural_soundness_pillar, - self.client_management = client_management, - self.optimization = optimization, - self.sustainability_pillar = sustainability_pillar, - self.energy_source = energy_source, - self.hardware_efficiency = hardware_efficiency, - self.federation_complexity = federation_complexity, + self.robustness_pillar = (robustness_pillar,) + self.resilience_to_attacks = (resilience_to_attacks,) + self.algorithm_robustness = (algorithm_robustness,) + self.client_reliability = (client_reliability,) + self.privacy_pillar = (privacy_pillar,) + self.technique = (technique,) + self.uncertainty = (uncertainty,) + self.indistinguishability = (indistinguishability,) + self.fairness_pillar = (fairness_pillar,) + self.selection_fairness = (selection_fairness,) + self.performance_fairness = (performance_fairness,) + self.class_distribution = (class_distribution,) + self.explainability_pillar = (explainability_pillar,) + self.interpretability = (interpretability,) + self.post_hoc_methods = (post_hoc_methods,) + self.accountability_pillar = (accountability_pillar,) + self.factsheet_completeness = (factsheet_completeness,) + self.architectural_soundness_pillar = (architectural_soundness_pillar,) + self.client_management = (client_management,) + self.optimization = (optimization,) + self.sustainability_pillar = (sustainability_pillar,) + self.energy_source = (energy_source,) + self.hardware_efficiency = (hardware_efficiency,) + self.federation_complexity = (federation_complexity,) self.schema_additional_participants = schema_additional_participants self.random_topology_probability = random_topology_probability self.with_sa = with_sa @@ -697,8 +694,40 @@ def __init__(self, scenario, user=None): participant_config["adversarial_args"]["attack_params"] = node_config["attack_params"] else: participant_config["adversarial_args"]["attack_params"] = {"attacks": "No Attack"} - participant_config["defense_args"]["reputation"] = self.scenario.reputation - + # Defense parameters + participant_config["defense_args"]["reputation"]["with_reputation"] = self.scenario.reputation.get( + "with_reputation", False + ) + participant_config["defense_args"]["reputation"]["initial_reputation"] = self.scenario.reputation.get( + "initial_reputation", 0.2 + ) + metrics_list = self.scenario.reputation.get("reputation_metrics", []) + if isinstance(metrics_list, list): + metrics_dict = { + "model_similarity": "model_similarity" in metrics_list, + "num_messages": "num_messages" in metrics_list, + "model_arrival_latency": "model_arrival_latency" in metrics_list, + "fraction_parameters_changed": "fraction_parameters_changed" in metrics_list, + } + participant_config["defense_args"]["reputation"]["reputation_metrics"] = metrics_dict + else: + participant_config["defense_args"]["reputation"]["reputation_metrics"] = metrics_list + participant_config["defense_args"]["reputation"]["weighting_factor"] = self.scenario.reputation.get( + "weighting_factor", "dynamic" + ) + participant_config["defense_args"]["reputation"]["weight_model_arrival_latency"] = ( + self.scenario.reputation.get("weight_model_arrival_latency", 1.0) + ) + participant_config["defense_args"]["reputation"]["weight_model_similarity"] = ( + self.scenario.reputation.get("weight_model_similarity", 1.0) + ) + participant_config["defense_args"]["reputation"]["weight_num_messages"] = self.scenario.reputation.get( + "weight_num_messages", 1.0 + ) + participant_config["defense_args"]["reputation"]["weight_fraction_params_changed"] = ( + self.scenario.reputation.get("weight_fraction_params_changed", 1.0) + ) + # Mobility and network simulation parameters participant_config["mobility_args"]["random_geo"] = self.scenario.random_geo participant_config["mobility_args"]["latitude"] = self.scenario.latitude participant_config["mobility_args"]["longitude"] = self.scenario.longitude diff --git a/nebula/core/engine.py b/nebula/core/engine.py index a88ca1ddb..61c0484e2 100644 --- a/nebula/core/engine.py +++ b/nebula/core/engine.py @@ -158,7 +158,7 @@ def __init__( else: self._situational_awareness = None - if self.config.participant["defense_args"]["reputation"]["enabled"]: + if self.config.participant["defense_args"]["reputation"]["with_reputation"]: self._reputation = Reputation(engine=self, config=self.config) @property @@ -380,6 +380,29 @@ async def _federation_federation_models_included_callback(self, source, message) finally: await self.cm.get_connections_lock().release_async() + async def _reputation_share_callback(self, source, message): + try: + logging.info( + f"handle_reputation_message | Trigger | Received reputation message from {source} | Node: {message.node_id} | Score: {message.score} | Round: {message.round}" + ) + + current_node = self.addr + nei = message.node_id + + if hasattr(self, "_reputation") and self._reputation is not None: + if current_node != nei: + key = (current_node, nei, message.round) + + if key not in self._reputation.reputation_with_all_feedback: + self._reputation.reputation_with_all_feedback[key] = [] + + self._reputation.reputation_with_all_feedback[key].append(message.score) + else: + logging.error("Reputation object (_reputation) is not available.") + + except Exception as e: + logging.exception(f"Error handling reputation message: {e}") + """ ############################## # REGISTERING CALLBACKS # ############################## @@ -591,7 +614,7 @@ async def deploy_components(self): await self.aggregator.init() if "situational_awareness" in self.config.participant: await self.sa.init() - if self.config.participant["defense_args"]["reputation"]["enabled"]: + if self.config.participant["defense_args"]["reputation"]["with_reputation"]: await self._reputation.setup() await self._reporter.start() await self._addon_manager.deploy_additional_services() diff --git a/nebula/core/nebulaevents.py b/nebula/core/nebulaevents.py index e35c17fca..bcd1d4749 100644 --- a/nebula/core/nebulaevents.py +++ b/nebula/core/nebulaevents.py @@ -5,7 +5,7 @@ class AddonEvent(ABC): """ Abstract base class for all addon-related events in the system. """ - + @abstractmethod async def get_event_data(self): """ @@ -21,7 +21,7 @@ class NodeEvent(ABC): """ Abstract base class for all node-related events in the system. """ - + @abstractmethod async def get_event_data(self): """ @@ -52,7 +52,7 @@ class MessageEvent: source (str): Address or identifier of the message sender. message (Any): The actual message payload. """ - + def __init__(self, message_type, source, message): """ Initializes a MessageEvent instance. @@ -335,6 +335,27 @@ async def is_concurrent(self) -> bool: return True +class DuplicatedMessageEvent(NodeEvent): + """ + Event triggered when a message is received that has already been processed. + + Attributes: + source (str): The address of the node that sent the duplicated message. + """ + + def __init__(self, source: str, message_type: str): + self.source = source + + def __str__(self): + return f"DuplicatedMessageEvent from {self.source}" + + async def get_event_data(self) -> tuple[str]: + return self.source + + async def is_concurrent(self) -> bool: + return True + + """ ############################## # ADDON EVENTS # ############################## @@ -348,7 +369,7 @@ class GPSEvent(AddonEvent): Attributes: distances (dict): A dictionary mapping node addresses to their respective distances. """ - + def __init__(self, distances: dict): """ Initializes a GPSEvent. @@ -379,7 +400,7 @@ class ChangeLocationEvent(AddonEvent): latitude (float): New latitude of the node. longitude (float): New longitude of the node. """ - + def __init__(self, latitude, longitude): """ Initializes a ChangeLocationEvent. @@ -402,7 +423,8 @@ async def get_event_data(self): tuple: A tuple containing latitude and longitude. """ return (self.latitude, self.longitude) - + + class TestMetricsEvent(AddonEvent): def __init__(self, loss, accuracy): self._loss = loss diff --git a/nebula/core/network/communications.py b/nebula/core/network/communications.py index 5270c5d13..97b2a9233 100755 --- a/nebula/core/network/communications.py +++ b/nebula/core/network/communications.py @@ -6,7 +6,7 @@ import requests from nebula.core.eventmanager import EventManager -from nebula.core.nebulaevents import MessageEvent +from nebula.core.nebulaevents import DuplicatedMessageEvent, MessageEvent from nebula.core.network.blacklist import BlackList from nebula.core.network.connection import Connection from nebula.core.network.discoverer import Discoverer @@ -803,7 +803,7 @@ async def deploy_additional_services(self): await self._forwarder.start() self._propagator.start() - async def include_received_message_hash(self, hash_message): + async def include_received_message_hash(self, hash_message, source=None): """ Adds a received message hash to the tracking list if it hasn't been seen before. @@ -819,6 +819,8 @@ async def include_received_message_hash(self, hash_message): await self.receive_messages_lock.acquire_async() if hash_message in self.received_messages_hashes: logging.info("❗️ handle_incoming_message | Ignoring message already received.") + duplicated_event = DuplicatedMessageEvent(source, "Duplicated message received") + asyncio.create_task(EventManager.get_instance().publish_node_event(duplicated_event)) return False self.received_messages_hashes.append(hash_message) if len(self.received_messages_hashes) % 10000 == 0: diff --git a/nebula/core/network/messages.py b/nebula/core/network/messages.py index 8f1e16cd3..7870acddf 100644 --- a/nebula/core/network/messages.py +++ b/nebula/core/network/messages.py @@ -151,7 +151,7 @@ async def process_message(self, data, addr_from): # Message-specific forwarding and processing elif message_type in special_processing_messages: - if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest()): + if await self.cm.include_received_message_hash(hashlib.md5(data).hexdigest(), addr_from): # Forward the message if required if self._should_forward_message(message_type, message_wrapper): await self.cm.forward_message(data, addr_from) diff --git a/nebula/frontend/config/participant.json.example b/nebula/frontend/config/participant.json.example index 4f4d9b260..5edca2033 100755 --- a/nebula/frontend/config/participant.json.example +++ b/nebula/frontend/config/participant.json.example @@ -91,11 +91,11 @@ "aggregation_timeout": 60, "aggregation_push": "slow" }, - "defense_args": { + "defense_args": { "reputation": { - "enabled": false, - "metrics": {}, - "initial_reputation": 0.6, + "with_reputation": false, + "reputation_metrics": [], + "initial_reputation": 0.2, "weighting_factor": "dynamic" } }, diff --git a/nebula/frontend/static/js/deployment/reputation.js b/nebula/frontend/static/js/deployment/reputation.js index f74fb6fea..75bb7d622 100644 --- a/nebula/frontend/static/js/deployment/reputation.js +++ b/nebula/frontend/static/js/deployment/reputation.js @@ -60,70 +60,62 @@ const ReputationManager = (function() { } function getReputationConfig() { + const rep_metrics = []; + + if (document.getElementById("model-similarity").checked) + rep_metrics.push("model_similarity"); + if (document.getElementById("num-messages").checked) + rep_metrics.push("num_messages"); + if (document.getElementById("model-arrival-latency").checked) + rep_metrics.push("model_arrival_latency"); + if (document.getElementById("fraction-parameters-changed").checked) + rep_metrics.push("fraction_parameters_changed"); + return { - enabled: document.getElementById("reputationSwitch").checked, - initialReputation: parseFloat(document.getElementById("initial-reputation").value), - weightingFactor: document.getElementById("weighting-factor").value, - metrics: { - model_similarity: { - enabled: document.getElementById("model-similarity").checked, - weight: parseFloat(document.getElementById("weight-model-similarity").value) - }, - num_messages: { - enabled: document.getElementById("num-messages").checked, - weight: parseFloat(document.getElementById("weight-num-messages").value) - }, - model_arrival_latency: { - enabled: document.getElementById("model-arrival-latency").checked, - weight: parseFloat(document.getElementById("weight-model-arrival-latency").value) - }, - fraction_parameters_changed: { - enabled: document.getElementById("fraction-parameters-changed").checked, - weight: parseFloat(document.getElementById("weight-fraction-parameters-changed").value) - } - } + with_reputation: document.getElementById("reputationSwitch").checked, + reputation_metrics: rep_metrics, + initial_reputation: parseFloat(document.getElementById("initial-reputation").value), + weighting_factor: document.getElementById("weighting-factor").value, + weight_model_arrival_latency: parseFloat(document.getElementById("weight-model-arrival-latency").value), + weight_model_similarity: parseFloat(document.getElementById("weight-model-similarity").value), + weight_num_messages: parseFloat(document.getElementById("weight-num-messages").value), + weight_fraction_params_changed: parseFloat(document.getElementById("weight-fraction-parameters-changed").value), }; } function setReputationConfig(config) { if (!config) return; - // Set reputation enabled/disabled - document.getElementById("reputationSwitch").checked = config.enabled; - document.getElementById("reputation-metrics").style.display = config.enabled ? "block" : "none"; - document.getElementById("reputation-settings").style.display = config.enabled ? "block" : "none"; - document.getElementById("weighting-settings").style.display = config.enabled ? "block" : "none"; + const enabled = config.with_reputation ?? config.enabled ?? false; + + // Set reputation switch and visibility + document.getElementById("reputationSwitch").checked = enabled; + document.getElementById("reputation-metrics").style.display = enabled ? "block" : "none"; + document.getElementById("reputation-settings").style.display = enabled ? "block" : "none"; + document.getElementById("weighting-settings").style.display = enabled ? "block" : "none"; - // Set initial reputation - document.getElementById("initial-reputation").value = config.initialReputation || 0.6; + // Initial reputation and weighting factor + document.getElementById("initial-reputation").value = config.initial_reputation ?? config.initialReputation ?? 0.2; + document.getElementById("weighting-factor").value = config.weighting_factor ?? config.weightingFactor ?? "dynamic"; - // Set weighting factor - document.getElementById("weighting-factor").value = config.weightingFactor || "dynamic"; - const showWeights = config.weightingFactor === "static"; + const showWeights = (config.weighting_factor ?? config.weightingFactor) === "static"; document.querySelectorAll(".weight-input").forEach(input => { input.style.display = showWeights ? "inline-block" : "none"; }); - // Set metrics - if (config.metrics) { - // Model Similarity - document.getElementById("model-similarity").checked = config.metrics.modelSimilarity?.enabled || false; - document.getElementById("weight-model-similarity").value = config.metrics.modelSimilarity?.weight || 0; + // Metrics (both legacy flat and nested) + document.getElementById("model-similarity").checked = config.reputation_metrics?.includes("modelSimilarity") ?? config.metrics?.modelSimilarity?.enabled ?? false; + document.getElementById("weight-model-similarity").value = config.weight_model_similarity ?? config.metrics?.modelSimilarity?.weight ?? 0; - // Number of Messages - document.getElementById("num-messages").checked = config.metrics.numMessages?.enabled || false; - document.getElementById("weight-num-messages").value = config.metrics.numMessages?.weight || 0; + document.getElementById("num-messages").checked = config.reputation_metrics?.includes("numMessages") ?? config.metrics?.numMessages?.enabled ?? false; + document.getElementById("weight-num-messages").value = config.weight_num_messages ?? config.metrics?.numMessages?.weight ?? 0; - // Model Arrival Latency - document.getElementById("model-arrival-latency").checked = config.metrics.modelArrivalLatency?.enabled || false; - document.getElementById("weight-model-arrival-latency").value = config.metrics.modelArrivalLatency?.weight || 0; + document.getElementById("model-arrival-latency").checked = config.reputation_metrics?.includes("modelArrivalLatency") ?? config.metrics?.modelArrivalLatency?.enabled ?? false; + document.getElementById("weight-model-arrival-latency").value = config.weight_model_arrival_latency ?? config.metrics?.modelArrivalLatency?.weight ?? 0; - // Fraction Parameters Changed - document.getElementById("fraction-parameters-changed").checked = config.metrics.fractionParametersChanged?.enabled || false; - document.getElementById("weight-fraction-parameters-changed").value = config.metrics.fractionParametersChanged?.weight || 0; - } + document.getElementById("fraction-parameters-changed").checked = config.reputation_metrics?.includes("fractionParametersChanged") ?? config.metrics?.fractionParametersChanged?.enabled ?? false; + document.getElementById("weight-fraction-parameters-changed").value = config.weight_fraction_params_changed ?? config.metrics?.fractionParametersChanged?.weight ?? 0; - // Validate weights validateWeights(); } @@ -133,7 +125,7 @@ const ReputationManager = (function() { document.getElementById("reputation-metrics").style.display = "none"; document.getElementById("reputation-settings").style.display = "none"; document.getElementById("weighting-settings").style.display = "none"; - document.getElementById("initial-reputation").value = "0.6"; + document.getElementById("initial-reputation").value = "0.2"; document.getElementById("weighting-factor").value = "dynamic"; document.getElementById("weight-warning").style.display = "none"; diff --git a/nebula/frontend/static/js/deployment/scenario.js b/nebula/frontend/static/js/deployment/scenario.js index af2d97002..28f10926f 100644 --- a/nebula/frontend/static/js/deployment/scenario.js +++ b/nebula/frontend/static/js/deployment/scenario.js @@ -75,12 +75,7 @@ const ScenarioManager = (function() { report_status_data_queue: document.getElementById("reportingSwitch").checked, epochs: parseInt(document.getElementById("epochs").value), attack_params: attackConfig, - reputation: { - enabled: window.ReputationManager.getReputationConfig().enabled || false, - metrics: window.ReputationManager.getReputationConfig().metrics || {}, - initial_reputation: window.ReputationManager.getReputationConfig().initialReputation || 0.6, - weighting_factor: window.ReputationManager.getReputationConfig().weightingFactor || "dynamic" - }, + reputation: window.ReputationManager.getReputationConfig(), mobility: window.MobilityManager.getMobilityConfig().enabled || false, network_simulation: window.MobilityManager.getMobilityConfig().network_simulation || false, mobility_type: window.MobilityManager.getMobilityConfig().mobilityType || "random", @@ -353,16 +348,16 @@ const ScenarioManager = (function() { function setPhysicalIPs(ipList = []) { physical_ips = [...ipList]; } - + function setActualScenario(index) { actual_scenario = index; if (scenariosList[index]) { // Clear the current graph window.TopologyManager.clearGraph(); - + // Load new scenario data loadScenarioData(scenariosList[index]); - + // If physical deployment, set physical IPs if (scenariosList[index].deployment === 'physical' && scenariosList[index].physical_ips) { window.TopologyManager.setPhysicalIPs(scenariosList[index].physical_ips);