Skip to content

Commit e3acdad

Browse files
committed
Add snapshotting for deltas incase of IPC failure
1 parent 806bd5a commit e3acdad

2 files changed

Lines changed: 72 additions & 0 deletions

File tree

aikido_zen/thread/thread_cache.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@ def _clear_synced_deltas(self):
5555
self.ai_stats.clear()
5656
PackagesStore.clear()
5757

58+
def _restore_synced_deltas(self, payload):
59+
"""Merges a previously-cleared payload back, used when an IPC sync fails."""
60+
self.middleware_installed = (
61+
self.middleware_installed or payload["middleware_installed"]
62+
)
63+
for entry in payload["hostnames"]:
64+
self.hostnames.add(entry["hostname"], entry["port"], entry["hits"])
65+
for entry in payload["users"]:
66+
self.users.add_user_from_entry(entry)
67+
self.stats.import_from_record(payload["stats"])
68+
self.ai_stats.import_list(payload["ai_stats"])
69+
for pkg in payload["packages"]:
70+
existing = PackagesStore.get_package(pkg["name"])
71+
if existing:
72+
existing["cleared"] = False
73+
5874
def renew(self):
5975
if not comms.get_comms():
6076
return
@@ -79,6 +95,7 @@ def renew(self):
7995
receive=True,
8096
)
8197
if not res["success"] or not res["data"]:
98+
self._restore_synced_deltas(payload)
8299
return
83100

84101
# update config

aikido_zen/thread/thread_cache_test.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,61 @@ def test_renew_called_with_empty_routes(mock_get_comms, thread_cache: ThreadCach
429429
)
430430

431431

432+
@patch("aikido_zen.background_process.comms.get_comms")
433+
def test_renew_preserves_increments_during_ipc(
434+
mock_get_comms, thread_cache: ThreadCache
435+
):
436+
"""Increments arriving during the IPC call survive on the response path -
437+
the snapshot is sent, but the live counter keeps the concurrent increment."""
438+
mock_comms = MagicMock()
439+
mock_get_comms.return_value = mock_comms
440+
441+
thread_cache.stats.increment_total_hits()
442+
thread_cache.stats.increment_total_hits()
443+
444+
def simulate_concurrent_increment(*args, **kwargs):
445+
thread_cache.stats.increment_total_hits()
446+
return {"success": True, "data": {"routes": {}}}
447+
448+
mock_comms.send_data_to_bg_process.side_effect = simulate_concurrent_increment
449+
450+
thread_cache.renew()
451+
452+
sent_total = mock_comms.send_data_to_bg_process.call_args.kwargs["obj"]["stats"][
453+
"requests"
454+
]["total"]
455+
assert sent_total == 2
456+
assert thread_cache.stats.get_record()["requests"]["total"] == 1
457+
458+
459+
@patch("aikido_zen.background_process.comms.get_comms")
460+
def test_renew_restores_deltas_on_ipc_failure(
461+
mock_get_comms, thread_cache: ThreadCache
462+
):
463+
"""If the IPC fails, the cleared deltas must be merged back on top of any
464+
concurrent increments - nothing lost, nothing double-counted."""
465+
mock_comms = MagicMock()
466+
mock_get_comms.return_value = mock_comms
467+
468+
thread_cache.stats.increment_total_hits()
469+
thread_cache.stats.increment_total_hits()
470+
thread_cache.ai_stats.on_ai_call("openai", "gpt-4o", 100, 50)
471+
thread_cache.middleware_installed = True
472+
473+
def fail_after_concurrent_increment(*args, **kwargs):
474+
thread_cache.stats.increment_total_hits()
475+
return {"success": False}
476+
477+
mock_comms.send_data_to_bg_process.side_effect = fail_after_concurrent_increment
478+
479+
thread_cache.renew()
480+
481+
# 2 from the snapshot + 1 from the concurrent increment
482+
assert thread_cache.stats.get_record()["requests"]["total"] == 3
483+
assert thread_cache.middleware_installed is True
484+
assert thread_cache.ai_stats.get_stats()[0]["calls"] == 1
485+
486+
432487
@patch("aikido_zen.background_process.comms.get_comms")
433488
def test_renew_called_with_no_requests(mock_get_comms, thread_cache: ThreadCache):
434489
"""Test that renew calls send_data_to_bg_process with zero requests."""

0 commit comments

Comments
 (0)