Delta deduplication#55
Conversation
There was a problem hiding this comment.
Pull request overview
This PR replaces DeepDiff-based gNMI diffing with a custom delta format and updates consumers to apply or inspect those deltas. It fits into the synchronization path between realnet gNMI notifications, sibling topology updates, and CI-triggered fuzzer behavior.
Changes:
- Adds a new delta calculation module for gNMI notification updates.
- Changes gNMI publishing to send full data conditionally and otherwise send deltas.
- Updates the controller and CI app to consume the changed notification structure.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 22 comments.
| File | Description |
|---|---|
interfaces/gnmi.py |
Switches diff calculation to the new delta module and changes notification payload publishing. |
interfaces/delta_based_dedup.py |
Adds custom delta generation helpers for gNMI notification data. |
controllers/controller.py |
Adds delta application logic and uses either full data or reconstructed state for sibling updates. |
apps/ci.py |
Updates fuzzer trigger logic for gNMI diff handling. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| new_values = update.get("val", update.get("values", {})) | ||
| current_values = {} | ||
| # Get the last stored state for this path | ||
| if "notification" in oldData: |
| sendFullData = self.updateCounter == 0 | ||
| # if differential data exists and is empty, don't send updates the queues | ||
| if diff is not None and len(diff) > 0: | ||
| # even if diff is null send data in regular intervals | ||
| if (diff is not None and len(diff) > 0) or sendFullData: |
| if "data" in task: | ||
| self.current_state = task["data"] | ||
| elif task["diff"] != {}: | ||
| # notification_data = task["data"] | ||
| self.current_state = self.apply_diff(task["diff"]) |
| if path_key == change_path_key: | ||
| # Process each change in the list | ||
| for change in change_list: | ||
| if not isinstance(change, dict): | ||
| continue | ||
|
|
||
| change_type = change.get("type") | ||
| json_key = change.get("json_key", "") | ||
|
|
||
| if change_type == "ALLNEW": | ||
| # Replace the entire val with new_value | ||
| update["val"] = change.get("new_value", {}) |
| task.get("diff") | ||
| and task["diff"].get("values_changed") | ||
| and task["diff"]["values_changed"].items[1].t2 == "fuzz_me" | ||
| and isinstance(task["diff"].get("values_changed"), dict) | ||
| ): | ||
| # self.logger.debug("gNMI data changed: " + str(task['diff']['values_changed'])) | ||
| self.logger.info( | ||
| f"Sibling {sibling} detected gNMI notification 'fuzz_me', asking sec" | ||
| "app to run fuzzer..." | ||
| ) | ||
| # add task to queue for sec app | ||
| broker.publish( | ||
| "security", | ||
| { | ||
| "type": "run fuzzer", | ||
| "source": "ci", | ||
| "timestamp": time.time(), | ||
| "data": "", | ||
| }, | ||
| ) | ||
| for key, change in task["diff"]["values_changed"].items(): | ||
| if change.get("new_value") == "fuzz_me": |
| import json | ||
| from copy import deepcopy |
| for idx_notif, notif in enumerate(newData.get("notification", [])): | ||
| for idx, update in enumerate(notif.get("update", [])): |
| # if ( | ||
| # task.get("diff") | ||
| # and task["diff"].get("values_changed") | ||
| # and task["diff"]["values_changed"].items[1].t2 == "fuzz_me" | ||
| # ): |
| }, | ||
| "diff": diff | ||
| } | ||
| # send full data regulary |
| if len(oldData["notification"]) > idx_notif: | ||
| if len(oldData["notification"][idx_notif].get("update", [])) > idx: | ||
| current_update = oldData["notification"][idx_notif].get("update", [])[idx] | ||
| current_values = current_update.get("val", current_update.get("values", {})) |
|
@copilot yeah! I know! But I thought you could take a look at the test that failed. Otherwise I'd be happy to merge the new functionality |
The CI failure was a flake8 lint error in |
No description provided.