From 05d74c40032d8fabeb10c5cf054a160bac1f5b01 Mon Sep 17 00:00:00 2001 From: Alexander Frey Date: Wed, 20 Aug 2025 18:50:27 +0200 Subject: [PATCH 1/4] fix crash after one synchronisation --- apps/ci.py | 53 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/apps/ci.py b/apps/ci.py index 8d3f253..12aa95b 100644 --- a/apps/ci.py +++ b/apps/ci.py @@ -22,24 +22,43 @@ async def run(self, topo: dict, broker: EventBroker, task: dict): # if the gNMI data diff contains a value_change and the second item in the diff is fuzz_me if ( task.get("diff") - and task["diff"].get("values_changed") - and task["diff"]["values_changed"].items[1].t2 == "fuzz_me" + and isinstance(task["diff"].get("values_changed"), dict) ): - # self.logger.debug("gNMI data changed: " + str(task['diff']['values_changed'])) - self.logger.info( - f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec" - "app to run fuzzer..." - ) - # add task to queue for sec app - broker.publish( - "security", - { - "type": "run fuzzer", - "source": "ci", - "timestamp": time.time(), - "data": "", - }, - ) + for key, change in task["diff"]["values_changed"].items(): + if change.get("new_value") == "fuzz_me": + self.logger.info( + f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec app to run fuzzer..." + ) + broker.publish( + "security", + { + "type": "run fuzzer", + "source": "ci", + "timestamp": time.time(), + "data": "", + }, + ) + break + # if ( + # task.get("diff") + # and task["diff"].get("values_changed") + # and task["diff"]["values_changed"].items[1].t2 == "fuzz_me" + # ): + # # self.logger.debug("gNMI data changed: " + str(task['diff']['values_changed'])) + # self.logger.info( + # f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec" + # "app to run fuzzer..." + # ) + # # add task to queue for sec app + # broker.publish( + # "security", + # { + # "type": "run fuzzer", + # "source": "ci", + # "timestamp": time.time(), + # "data": "", + # }, + # ) if task["type"] == "fuzzer result": duration = time.time() - task["request_timestamp"] From 4b8bddb123977639e7fd138202c96b64096b3d09 Mon Sep 17 00:00:00 2001 From: Alexander Frey Date: Wed, 20 Aug 2025 18:51:12 +0200 Subject: [PATCH 2/4] add delta based dedup file --- interfaces/delta_based_dedup.py | 110 ++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 interfaces/delta_based_dedup.py diff --git a/interfaces/delta_based_dedup.py b/interfaces/delta_based_dedup.py new file mode 100644 index 0000000..1e59db6 --- /dev/null +++ b/interfaces/delta_based_dedup.py @@ -0,0 +1,110 @@ +import json +from copy import deepcopy + +def calculate_delta_diff( oldData, newData): + """Check if anything changed from the last state""" + delta = {} + print("OLDDATA: ", oldData) + print() + print("NEWDATA: ", newData) + try: + for idx_notif, notif in enumerate(newData.get("notification", [])): + for idx, update in enumerate(notif.get("update", [])): + # Create unique key for each update + path = update.get("path") + if path is None: + path_key = f"NULL_PATH_{idx}" + else: + path_key = path + + # Get values for this specific update + new_values = update.get("val", update.get("values", {})) + current_values = {} + # Get the last stored state for this path + if "notification" in oldData: + if len(oldData["notification"]) > idx_notif: + if len(oldData["notification"][idx_notif].get("update", [])) > idx: + current_update = oldData["notification"][idx_notif].get("update", [])[idx] + current_values = current_update.get("val", current_update.get("values", {})) + print("CURRENT VALUES: ", current_values) + print() + # Find what changed + changes = _find_changes(new_values, current_values, path_key) + + + if changes: + delta[path_key] = changes + print(f"=== CHANGES DETECTED ===") + print(f"Path: {path_key}") + for change in changes: + print(f" {change}") + print("========================") + else: + print(f"=== NO CHANGES ===") + print(f"Path: {path_key}") + print("==================") + + except Exception as e: + print(f"Error processing notification: {e}") + return delta + + return delta + +def _get_path_key( notification): + """Extract the path from notification""" + try: + for notif in notification.get("notification", []): + for update in notif.get("update", []): + # Return the path even if it's None/null + return update.get("path") + return None + except: + return None + +def _get_values_from_notification( notification): + """Extract all the actual values from the notification""" + try: + for notif in notification.get("notification", []): + for update in notif.get("update", []): + return update.get("val", update.get("values", {})) + return {} + except: + return {} + +def _find_changes(new_values, old_values, path): + """Find what changed between new and old values""" + changes = [] + + # If this is the first time seeing this path, everything is "new" + if not old_values: + #changes.append(f"NEW PATH: {path} (all values are new)") + changes.append({"json_key": path, "new_value": new_values, "type": "ALLNEW"}) + return changes + + # Compare all fields recursively + _compare_dict(new_values, old_values, "", changes) + + return changes + +def _compare_dict( new_dict, old_dict, prefix, changes): + """Recursively compare two dictionaries and find changes""" + # Check for new or changed fields + for key, new_value in new_dict.items(): + full_key = f"{prefix}.{key}" if prefix else key + + if key not in old_dict: + # changes.append(f"NEW: {full_key} = {new_value}") + changes.append({"json_key": full_key, "new_value": new_value, "type": "NEW" }) + elif isinstance(new_value, dict) and isinstance(old_dict[key], dict): + # Recursively compare nested dictionaries + _compare_dict(new_value, old_dict[key], full_key, changes) + elif new_value != old_dict[key]: + # changes.append(f"CHANGED: {full_key} = {old_dict[key]} → {new_value}") + changes.append({"json_key": full_key, "new_value": new_value,"old_value": old_dict[key], "type": "CHANGED" }) + + # Check for removed fields + for key in old_dict: + if key not in new_dict: + full_key = f"{prefix}.{key}" if prefix else key + # changes.append(f"REMOVED: {full_key}") + changes.append({"json_key": full_key, "type": "REMOVED" }) \ No newline at end of file From 4e89165418092466f2178dbb974048f5986f2ce8 Mon Sep 17 00:00:00 2001 From: Alexander Frey Date: Wed, 20 Aug 2025 18:52:39 +0200 Subject: [PATCH 3/4] replace deepdiff dedup with delta dedup --- controllers/controller.py | 119 ++++++++++++++++++++++++++++++++------ interfaces/gnmi.py | 64 +++++++++++++------- 2 files changed, 143 insertions(+), 40 deletions(-) diff --git a/controllers/controller.py b/controllers/controller.py index d71b973..998b9c6 100644 --- a/controllers/controller.py +++ b/controllers/controller.py @@ -405,25 +405,108 @@ def __set_gnmi_data_on_nodes(self, task, sibling): and self.sibling_topo.get(sibling) is not None and self.sibling_topo[sibling]["running"] ): - if task["diff"] != {}: - notification_data = task["data"] - node = task["node"] - node_name = node - path = task["path"] - gnmi_instance = gnmi( - self.config, - sibling, - self.logger, - self.topology_prefix, - self.topology_name, - ) - gnmi_instance.setNodeUpdate( - self.sibling_topo[sibling]["nodes"], - node_name, - path, - notification_data, - ) + if "data" in task: + self.current_state = task["data"] + elif task["diff"] != {}: + # notification_data = task["data"] + self.current_state = self.apply_diff(task["diff"]) + node = task["node"] + node_name = node + path = task["path"] + gnmi_instance = gnmi( + self.config, + sibling, + self.logger, + self.topology_prefix, + self.topology_name, + ) + gnmi_instance.setNodeUpdate( + self.sibling_topo[sibling]["nodes"], + node_name, + path, + self.current_state, + ) + def apply_diff(self, changes): + """ + Apply differential changes to the current state. + + Args: + changes (dict): Dictionary containing the differential changes from delta_based_dedup + + Returns: + dict: Updated state after applying the changes + """ + # Make a deep copy to avoid modifying the original + updated_state = copy.deepcopy(self.current_state) + + for change_path_key in changes: + # changes[change_path_key] is a LIST of changes, not a single change + change_list = changes[change_path_key] + + for idx_notif, notif in enumerate(updated_state.get("notification", [])): + for idx, update in enumerate(notif.get("update", [])): + path = update.get("path") + if path is None: + path_key = f"NULL_PATH_{idx}" + else: + path_key = path + + if path_key == change_path_key: + # Process each change in the list + for change in change_list: + if not isinstance(change, dict): + continue + + change_type = change.get("type") + json_key = change.get("json_key", "") + + if change_type == "ALLNEW": + # Replace the entire val with new_value + update["val"] = change.get("new_value", {}) + + elif change_type in ["NEW", "CHANGED"]: + # Navigate to the correct location in val and update + if "val" not in update: + update["val"] = {} + + # Split the JSON path into components + json_key_parts = json_key.split(".") if json_key else [] + + # Navigate through the nested structure in val + current = update["val"] + for part in json_key_parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + + # Set the new value at the last path component + if json_key_parts: + last_key = json_key_parts[-1] + current[last_key] = change.get("new_value") + + elif change_type == "REMOVED": + # Remove the key from val + if "val" in update: + json_key_parts = json_key.split(".") if json_key else [] + current = update["val"] + + # Navigate to parent + for part in json_key_parts[:-1]: + if part in current and isinstance(current, dict): + current = current[part] + else: + break + + # Remove the key if it exists + if json_key_parts and isinstance(current, dict): + last_key = json_key_parts[-1] + current.pop(last_key, None) + + return updated_state + + + def __build_sibling_topology(self, task, sibling): if task["type"] == "topology build request" and task["sibling"] == sibling: self.sibling_topo[sibling] = self.__build_topology( diff --git a/interfaces/gnmi.py b/interfaces/gnmi.py index bebe19e..6d72848 100644 --- a/interfaces/gnmi.py +++ b/interfaces/gnmi.py @@ -10,12 +10,13 @@ from multiprocessing import Queue, Semaphore from pygnmi.client import gNMIclient from deepdiff import DeepDiff, grep - +from .delta_based_dedup import calculate_delta_diff class gnmi(Interface): """ gNMI interface """ + updateCounter = 0 port = None username = None @@ -169,36 +170,55 @@ def _process_no_diff(self, node, path, node_paths, gc, broker: EventBroker): return node_paths def _calculate_diff(self, old_data, new_data): - # TODO evaluate gNMIclient show_diff? - if new_data | grep("Hello World! update for node"): - # if the new data contains the "Hello World! update for node" string, return an empty diff - # this excludes hello_world app updates from the diff - return {} - # exclude_timestamp = re.compile(r"\['timestamp'\]") - # node_data_diff = DeepDiff(old_data, new_data, ignore_order=True, exclude_regex_paths=[exclude_timestamp]) - node_data_diff = DeepDiff( - old_data, - new_data, - ignore_order=True, - exclude_regex_paths="\\['timestamp'\\]", - ) - return node_data_diff.tree + # # TODO evaluate gNMIclient show_diff? + # if new_data | grep("Hello World! update for node"): + # # if the new data contains the "Hello World! update for node" string, return an empty diff + # # this excludes hello_world app updates from the diff + # return {} + # # exclude_timestamp = re.compile(r"\['timestamp'\]") + # # node_data_diff = DeepDiff(old_data, new_data, ignore_order=True, exclude_regex_paths=[exclude_timestamp]) + # node_data_diff = DeepDiff( + # old_data, + # new_data, + # ignore_order=True, + # exclude_regex_paths="\\['timestamp'\\]", + # ) + # print(node_data_diff.tree) + # return node_data_diff.tree + + + diff = calculate_delta_diff(old_data, new_data) + + if diff != {}: + print() + print("*******************",diff) + print() + return diff + def _send_update_to_queues(self, node, path, node_data, diff, broker: EventBroker): + sendFullData = self.updateCounter == 0 # if differential data exists and is empty, don't send updates the queues - if diff is not None and len(diff) > 0: + # even if diff is null send data in regular intervals + if (diff is not None and len(diff) > 0) or sendFullData: for channel in broker.get_sibling_channels(): - broker.publish( - channel, - { + dataToSend = { "type": "gNMI notification", "source": self.target_topo, "node": node, "path": path, - "data": node_data, - "diff": diff, - }, + "diff": diff + } + # send full data regulary + if sendFullData: + dataToSend["data"] = node_data + + broker.publish( + channel, + dataToSend ) + self.updateCounter = self.updateCounter + 1 + # print("****node_data: ****", node_data) def setNodeUpdate( self, nodes: dict, node_name: str, path: str, notification_data: dict From 742ac4f1e8283979c698a613c50eba19b5fbbc0e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 31 May 2026 10:37:31 +0000 Subject: [PATCH 4/4] fix: remove unused global broker declaration in gracefull_shutdown_handler --- digsinet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/digsinet.py b/digsinet.py index eb5273a..37f8665 100755 --- a/digsinet.py +++ b/digsinet.py @@ -19,7 +19,6 @@ def gracefull_shutdown_handler(sig, frame): - global broker print("Shutting down gracefully...") if broker: broker.close()