Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions apps/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,43 @@ async def run(self, topo: dict, broker: EventBroker, task: dict):
# if the gNMI data diff contains a value_change and the second item in the diff is fuzz_me
if (
task.get("diff")
and task["diff"].get("values_changed")
and task["diff"]["values_changed"].items[1].t2 == "fuzz_me"
and isinstance(task["diff"].get("values_changed"), dict)
):
# self.logger.debug("gNMI data changed: " + str(task['diff']['values_changed']))
self.logger.info(
f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec"
"app to run fuzzer..."
)
# add task to queue for sec app
broker.publish(
"security",
{
"type": "run fuzzer",
"source": "ci",
"timestamp": time.time(),
"data": "",
},
)
for key, change in task["diff"]["values_changed"].items():
if change.get("new_value") == "fuzz_me":
Comment on lines 24 to +28
self.logger.info(
f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec app to run fuzzer..."
)
broker.publish(
"security",
{
"type": "run fuzzer",
"source": "ci",
"timestamp": time.time(),
"data": "",
},
)
break
# if (
# task.get("diff")
# and task["diff"].get("values_changed")
# and task["diff"]["values_changed"].items[1].t2 == "fuzz_me"
# ):
Comment on lines +42 to +46
# # self.logger.debug("gNMI data changed: " + str(task['diff']['values_changed']))
# self.logger.info(
# f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec"
# "app to run fuzzer..."
# )
# # add task to queue for sec app
# broker.publish(
# "security",
# {
# "type": "run fuzzer",
# "source": "ci",
# "timestamp": time.time(),
# "data": "",
# },
# )

if task["type"] == "fuzzer result":
duration = time.time() - task["request_timestamp"]
Expand Down
119 changes: 101 additions & 18 deletions controllers/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,25 +405,108 @@ def __set_gnmi_data_on_nodes(self, task, sibling):
and self.sibling_topo.get(sibling) is not None
and self.sibling_topo[sibling]["running"]
):
if task["diff"] != {}:
notification_data = task["data"]
node = task["node"]
node_name = node
path = task["path"]
gnmi_instance = gnmi(
self.config,
sibling,
self.logger,
self.topology_prefix,
self.topology_name,
)
gnmi_instance.setNodeUpdate(
self.sibling_topo[sibling]["nodes"],
node_name,
path,
notification_data,
)
if "data" in task:
self.current_state = task["data"]
elif task["diff"] != {}:
# notification_data = task["data"]
self.current_state = self.apply_diff(task["diff"])
Comment on lines +408 to +412
node = task["node"]
node_name = node
path = task["path"]
gnmi_instance = gnmi(
self.config,
sibling,
self.logger,
self.topology_prefix,
self.topology_name,
)
gnmi_instance.setNodeUpdate(
self.sibling_topo[sibling]["nodes"],
node_name,
path,
self.current_state,
)

def apply_diff(self, changes):
"""
Apply differential changes to the current state.

Args:
changes (dict): Dictionary containing the differential changes from delta_based_dedup

Returns:
dict: Updated state after applying the changes
"""
# Make a deep copy to avoid modifying the original
updated_state = copy.deepcopy(self.current_state)

for change_path_key in changes:
# changes[change_path_key] is a LIST of changes, not a single change
change_list = changes[change_path_key]

for idx_notif, notif in enumerate(updated_state.get("notification", [])):
for idx, update in enumerate(notif.get("update", [])):
path = update.get("path")
if path is None:
path_key = f"NULL_PATH_{idx}"
else:
path_key = path
Comment on lines +449 to +453

if path_key == change_path_key:
# Process each change in the list
for change in change_list:
if not isinstance(change, dict):
continue

change_type = change.get("type")
json_key = change.get("json_key", "")

if change_type == "ALLNEW":
# Replace the entire val with new_value
update["val"] = change.get("new_value", {})
Comment on lines +455 to +466

elif change_type in ["NEW", "CHANGED"]:
# Navigate to the correct location in val and update
if "val" not in update:
update["val"] = {}

# Split the JSON path into components
json_key_parts = json_key.split(".") if json_key else []

# Navigate through the nested structure in val
current = update["val"]
for part in json_key_parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]

# Set the new value at the last path component
if json_key_parts:
last_key = json_key_parts[-1]
current[last_key] = change.get("new_value")

elif change_type == "REMOVED":
# Remove the key from val
if "val" in update:
json_key_parts = json_key.split(".") if json_key else []
current = update["val"]

# Navigate to parent
for part in json_key_parts[:-1]:
if part in current and isinstance(current, dict):
current = current[part]
else:
break

# Remove the key if it exists
if json_key_parts and isinstance(current, dict):
last_key = json_key_parts[-1]
current.pop(last_key, None)

return updated_state



def __build_sibling_topology(self, task, sibling):
if task["type"] == "topology build request" and task["sibling"] == sibling:
self.sibling_topo[sibling] = self.__build_topology(
Expand Down
1 change: 0 additions & 1 deletion digsinet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@


def gracefull_shutdown_handler(sig, frame):
global broker
print("Shutting down gracefully...")
if broker:
broker.close()
Expand Down
110 changes: 110 additions & 0 deletions interfaces/delta_based_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import json
from copy import deepcopy
Comment on lines +1 to +2

def calculate_delta_diff( oldData, newData):
"""Check if anything changed from the last state"""
delta = {}
print("OLDDATA: ", oldData)
print()
print("NEWDATA: ", newData)
Comment on lines +7 to +9
try:
for idx_notif, notif in enumerate(newData.get("notification", [])):
for idx, update in enumerate(notif.get("update", [])):
Comment on lines +11 to +12
# Create unique key for each update
path = update.get("path")
if path is None:
path_key = f"NULL_PATH_{idx}"
else:
path_key = path
Comment on lines +15 to +18

# Get values for this specific update
new_values = update.get("val", update.get("values", {}))
current_values = {}
# Get the last stored state for this path
if "notification" in oldData:
if len(oldData["notification"]) > idx_notif:
if len(oldData["notification"][idx_notif].get("update", [])) > idx:
current_update = oldData["notification"][idx_notif].get("update", [])[idx]
current_values = current_update.get("val", current_update.get("values", {}))
Comment on lines +25 to +28
print("CURRENT VALUES: ", current_values)
print()
Comment on lines +29 to +30
# Find what changed
changes = _find_changes(new_values, current_values, path_key)


if changes:
delta[path_key] = changes
Comment on lines +35 to +36
print(f"=== CHANGES DETECTED ===")
print(f"Path: {path_key}")
for change in changes:
print(f" {change}")
print("========================")
Comment on lines +37 to +41
else:
print(f"=== NO CHANGES ===")
print(f"Path: {path_key}")
print("==================")
Comment on lines +43 to +45

except Exception as e:
print(f"Error processing notification: {e}")
return delta

return delta

def _get_path_key( notification):
"""Extract the path from notification"""
Comment on lines +53 to +54
try:
for notif in notification.get("notification", []):
for update in notif.get("update", []):
# Return the path even if it's None/null
return update.get("path")
return None
except:
return None

def _get_values_from_notification( notification):
"""Extract all the actual values from the notification"""
Comment on lines +64 to +65
try:
for notif in notification.get("notification", []):
for update in notif.get("update", []):
return update.get("val", update.get("values", {}))
return {}
except:
return {}

def _find_changes(new_values, old_values, path):
"""Find what changed between new and old values"""
changes = []

# If this is the first time seeing this path, everything is "new"
if not old_values:
#changes.append(f"NEW PATH: {path} (all values are new)")
changes.append({"json_key": path, "new_value": new_values, "type": "ALLNEW"})
Comment on lines +79 to +81
return changes

# Compare all fields recursively
_compare_dict(new_values, old_values, "", changes)

return changes

def _compare_dict( new_dict, old_dict, prefix, changes):
"""Recursively compare two dictionaries and find changes"""
# Check for new or changed fields
for key, new_value in new_dict.items():
full_key = f"{prefix}.{key}" if prefix else key

if key not in old_dict:
# changes.append(f"NEW: {full_key} = {new_value}")
changes.append({"json_key": full_key, "new_value": new_value, "type": "NEW" })
elif isinstance(new_value, dict) and isinstance(old_dict[key], dict):
# Recursively compare nested dictionaries
_compare_dict(new_value, old_dict[key], full_key, changes)
elif new_value != old_dict[key]:
# changes.append(f"CHANGED: {full_key} = {old_dict[key]} → {new_value}")
changes.append({"json_key": full_key, "new_value": new_value,"old_value": old_dict[key], "type": "CHANGED" })

# Check for removed fields
for key in old_dict:
if key not in new_dict:
full_key = f"{prefix}.{key}" if prefix else key
# changes.append(f"REMOVED: {full_key}")
changes.append({"json_key": full_key, "type": "REMOVED" })
64 changes: 42 additions & 22 deletions interfaces/gnmi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from multiprocessing import Queue, Semaphore
from pygnmi.client import gNMIclient
from deepdiff import DeepDiff, grep

from .delta_based_dedup import calculate_delta_diff

class gnmi(Interface):
"""
gNMI interface
"""
updateCounter = 0

port = None
username = None
Expand Down Expand Up @@ -169,36 +170,55 @@ def _process_no_diff(self, node, path, node_paths, gc, broker: EventBroker):
return node_paths

def _calculate_diff(self, old_data, new_data):
# TODO evaluate gNMIclient show_diff?
if new_data | grep("Hello World! update for node"):
# if the new data contains the "Hello World! update for node" string, return an empty diff
# this excludes hello_world app updates from the diff
return {}
# exclude_timestamp = re.compile(r"\['timestamp'\]")
# node_data_diff = DeepDiff(old_data, new_data, ignore_order=True, exclude_regex_paths=[exclude_timestamp])
node_data_diff = DeepDiff(
old_data,
new_data,
ignore_order=True,
exclude_regex_paths="\\['timestamp'\\]",
)
return node_data_diff.tree
# # TODO evaluate gNMIclient show_diff?
# if new_data | grep("Hello World! update for node"):
# # if the new data contains the "Hello World! update for node" string, return an empty diff
# # this excludes hello_world app updates from the diff
# return {}
Comment on lines +174 to +177
# # exclude_timestamp = re.compile(r"\['timestamp'\]")
# # node_data_diff = DeepDiff(old_data, new_data, ignore_order=True, exclude_regex_paths=[exclude_timestamp])
# node_data_diff = DeepDiff(
# old_data,
# new_data,
# ignore_order=True,
# exclude_regex_paths="\\['timestamp'\\]",
# )
# print(node_data_diff.tree)
# return node_data_diff.tree


diff = calculate_delta_diff(old_data, new_data)

if diff != {}:
print()
print("*******************",diff)
print()
Comment on lines +192 to +195
return diff


def _send_update_to_queues(self, node, path, node_data, diff, broker: EventBroker):
sendFullData = self.updateCounter == 0
# if differential data exists and is empty, don't send updates the queues
if diff is not None and len(diff) > 0:
# even if diff is null send data in regular intervals
if (diff is not None and len(diff) > 0) or sendFullData:
Comment on lines +200 to +203
for channel in broker.get_sibling_channels():
broker.publish(
channel,
{
dataToSend = {
"type": "gNMI notification",
"source": self.target_topo,
"node": node,
"path": path,
"data": node_data,
"diff": diff,
},
"diff": diff
}
# send full data regulary
if sendFullData:
dataToSend["data"] = node_data

broker.publish(
channel,
dataToSend
)
self.updateCounter = self.updateCounter + 1
# print("****node_data: ****", node_data)

def setNodeUpdate(
self, nodes: dict, node_name: str, path: str, notification_data: dict
Expand Down
Loading