@@ -58,6 +58,10 @@ pub const AgentStatus = struct {
5858var agent_statuses : [MAX_AGENTS ]AgentStatus = undefined ;
5959var status_count : usize = 0 ;
6060
61+ // Track last event time per issue for deduplication (5s window)
62+ var last_event_times : [MAX_AGENTS ][2 ]u32 = [_ ][2 ]u32 {.{ 0 , 0 }} ** MAX_AGENTS ; // [issue_index][0]=issue, [1]=timestamp
63+ var last_event_count : usize = 0 ;
64+
6165// ═══════════════════════════════════════════════════════════════════════════════
6266// PUBLIC API
6367// ═══════════════════════════════════════════════════════════════════════════════
@@ -69,6 +73,11 @@ pub fn runMonitor(port: u16) !void {
6973
7074 std .log .info ("Cloud Monitor starting on port {d}" , .{actual_port });
7175
76+ // Restore state from JSONL on startup
77+ restoreStateFromEvents () catch | err | {
78+ std .log .warn ("Failed to restore state from events: {}" , .{err });
79+ };
80+
7281 const address = net .Address .parseIp4 ("0.0.0.0" , actual_port ) catch return ;
7382 var server = address .listen (.{ .reuse_address = true }) catch | err | {
7483 std .log .err ("Cloud Monitor failed to bind: {}" , .{err });
@@ -90,7 +99,13 @@ pub fn runMonitor(port: u16) !void {
9099
91100/// Update agent status (called from HTTP POST handler).
92101/// Persists to JSONL, alerts on error states via tri notify.
102+ /// Skips duplicate events within 5s window.
93103pub fn updateStatus (issue : u32 , status_str : []const u8 , detail : []const u8 ) void {
104+ // Deduplication: skip if same status within 5s
105+ if (shouldSkipEvent (issue , status_str )) {
106+ return ;
107+ }
108+
94109 // 1. Append to JSONL event log
95110 appendEvent (issue , status_str , detail );
96111
@@ -197,6 +212,130 @@ pub fn getStatusJson(buf: []u8) []const u8 {
197212 return fbs .getWritten ();
198213}
199214
215+ /// Restore agent states from JSONL event log on startup.
216+ /// Reads the last event for each issue and populates agent_statuses.
217+ fn restoreStateFromEvents () ! void {
218+ const file = std .fs .cwd ().openFile (EVENTS_FILE , .{}) catch return ;
219+ defer file .close ();
220+
221+ // Read entire file
222+ var file_buf : [65536 ]u8 = undefined ;
223+ const file_len = file .readAll (& file_buf ) catch 0 ;
224+ if (file_len == 0 ) return ;
225+ const content = file_buf [0.. file_len ];
226+
227+ // Track latest status per issue
228+ var latest_issue : [MAX_AGENTS ]u32 = undefined ;
229+ var latest_ts : [MAX_AGENTS ]i64 = undefined ;
230+ var latest_status : [MAX_AGENTS ][32 ]u8 = undefined ;
231+ var latest_status_len : [MAX_AGENTS ]usize = undefined ;
232+ var latest_detail : [MAX_AGENTS ][256 ]u8 = undefined ;
233+ var latest_detail_len : [MAX_AGENTS ]usize = undefined ;
234+ var latest_count : usize = 0 ;
235+
236+ var offset : usize = 0 ;
237+ while (offset < content .len ) {
238+ const line_end = std .mem .indexOfPos (u8 , content , offset , "\n " ) orelse content .len ;
239+ const line = content [offset .. line_end ];
240+ offset = line_end + 1 ;
241+ if (line .len == 0 ) continue ;
242+
243+ // Parse issue
244+ const issue_idx = std .mem .indexOf (u8 , line , "\" issue\" :" ) orelse continue ;
245+ const istart = issue_idx + 8 ;
246+ var iend = istart ;
247+ while (iend < line .len and line [iend ] >= '0' and line [iend ] <= '9' ) : (iend += 1 ) {}
248+ const issue = std .fmt .parseInt (u32 , line [istart .. iend ], 10 ) catch continue ;
249+
250+ // Parse timestamp
251+ const ts_idx = std .mem .indexOf (u8 , line , "\" ts\" :" ) orelse continue ;
252+ const tstart = ts_idx + 5 ;
253+ var tend = tstart ;
254+ while (tend < line .len and line [tend ] >= '0' and line [tend ] <= '9' ) : (tend += 1 ) {}
255+ const ts = std .fmt .parseInt (i64 , line [tstart .. tend ], 10 ) catch continue ;
256+
257+ // Parse status and detail
258+ const status_str = extractJsonString (line , "status" ) orelse continue ;
259+ const detail_str = extractJsonString (line , "detail" ) orelse "" ;
260+
261+ // Find or create entry for this issue (keep latest timestamp)
262+ var entry_idx : ? usize = null ;
263+ for (0.. latest_count ) | i | {
264+ if (latest_issue [i ] == issue ) {
265+ if (ts > latest_ts [i ]) {
266+ entry_idx = i ;
267+ }
268+ break ;
269+ }
270+ }
271+
272+ if (entry_idx == null and latest_count < MAX_AGENTS ) {
273+ entry_idx = latest_count ;
274+ latest_issue [latest_count ] = issue ;
275+ latest_count += 1 ;
276+ }
277+
278+ if (entry_idx ) | idx | {
279+ latest_ts [idx ] = ts ;
280+ latest_status_len [idx ] = @min (status_str .len , 32 );
281+ @memcpy (latest_status [idx ][0.. latest_status_len [idx ]], status_str [0.. latest_status_len [idx ]]);
282+ latest_detail_len [idx ] = @min (detail_str .len , 256 );
283+ @memcpy (latest_detail [idx ][0.. latest_detail_len [idx ]], detail_str [0.. latest_detail_len [idx ]]);
284+ }
285+ }
286+
287+ // Populate agent_statuses from latest events
288+ status_count = 0 ;
289+ for (0.. latest_count ) | i | {
290+ if (status_count >= MAX_AGENTS ) break ;
291+ const entry = & agent_statuses [status_count ];
292+ entry .issue = latest_issue [i ];
293+ entry .status_len = latest_status_len [i ];
294+ @memcpy (entry .status [0.. entry .status_len ], latest_status [i ][0.. entry .status_len ]);
295+ entry .detail_len = latest_detail_len [i ];
296+ @memcpy (entry .detail [0.. entry .detail_len ], latest_detail [i ][0.. entry .detail_len ]);
297+ entry .last_heartbeat = latest_ts [i ];
298+ status_count += 1 ;
299+ }
300+
301+ std .log .info ("Restored {d} agent states from events log" , .{status_count });
302+ }
303+
304+ /// Check if we should skip this event (deduplication: same status within 5 seconds)
305+ fn shouldSkipEvent (issue : u32 , status_str : []const u8 ) bool {
306+ const now = std .time .timestamp ();
307+ const dedup_window : i64 = 5 ; // 5 seconds
308+
309+ for (0.. last_event_count ) | i | {
310+ if (last_event_times [i ][0 ] == issue ) {
311+ const last_ts = @as (i64 , @intCast (last_event_times [i ][1 ]));
312+ if (now - last_ts < dedup_window ) {
313+ // Check if status is the same
314+ for (0.. status_count ) | j | {
315+ if (agent_statuses [j ].issue == issue ) {
316+ const last_status = agent_statuses [j ].getStatus ();
317+ if (std .mem .eql (u8 , last_status , status_str )) {
318+ return true ; // Skip: same status within 5s window
319+ }
320+ break ;
321+ }
322+ }
323+ }
324+ // Update timestamp
325+ last_event_times [i ][1 ] = @as (u32 , @intCast (now ));
326+ return false ;
327+ }
328+ }
329+
330+ // New issue, add to tracking
331+ if (last_event_count < MAX_AGENTS ) {
332+ last_event_times [last_event_count ][0 ] = issue ;
333+ last_event_times [last_event_count ][1 ] = @as (u32 , @intCast (now ));
334+ last_event_count += 1 ;
335+ }
336+ return false ;
337+ }
338+
200339// ═══════════════════════════════════════════════════════════════════════════════
201340// INTERNAL
202341// ═══════════════════════════════════════════════════════════════════════════════
@@ -209,15 +348,17 @@ fn appendEvent(issue: u32, status_str: []const u8, detail: []const u8) void {
209348
210349 // Seek to end for append
211350 file .seekFromEnd (0 ) catch return ;
212- const w = file .writer ();
213351
352+ // Format JSON line to buffer, then write
353+ var buf : [512 ]u8 = undefined ;
214354 const ts = std .time .timestamp ();
215- std .fmt .format ( w , "{{\" ts\" :{d},\" issue\" :{d},\" status\" :\" {s}\" ,\" detail\" :\" {s}\" }}\n " , .{
355+ const line = std .fmt .bufPrint ( & buf , "{{\" ts\" :{d},\" issue\" :{d},\" status\" :\" {s}\" ,\" detail\" :\" {s}\" }}\n " , .{
216356 ts ,
217357 issue ,
218358 status_str ,
219359 detail ,
220360 }) catch return ;
361+ _ = file .writeAll (line ) catch return ;
221362}
222363
223364fn sendTriNotify (issue : u32 , status_str : []const u8 , detail : []const u8 ) void {
0 commit comments