Skip to content

Commit c639376

Browse files
authored
Merge pull request #129 from gHashTag/feat/issue-124
feat(cloud): JSONL event persistence + deduplication + JSON history format (#124)
2 parents be6527a + 7778d63 commit c639376

2 files changed

Lines changed: 200 additions & 11 deletions

File tree

src/tri/tri_cloud.zig

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ fn cloudCleanup(allocator: Allocator) !void {
564564
print("{s}✓ Cleaned {d} inactive agent(s){s}\n", .{ GREEN, cleaned, RESET });
565565
}
566566

567-
/// tri cloud history [issue] — Show event history from JSONL
567+
/// tri cloud history [issue] [--format=json] — Show event history from JSONL
568568
fn cloudHistory(_: Allocator, args: []const []const u8) !void {
569569
const events_path = ".trinity/cloud_events.jsonl";
570570
const file = std.fs.cwd().openFile(events_path, .{}) catch {
@@ -573,22 +573,70 @@ fn cloudHistory(_: Allocator, args: []const []const u8) !void {
573573
};
574574
defer file.close();
575575

576-
// Optional issue filter
576+
// Parse args: [issue] [--format=json]
577577
var filter_issue: ?u32 = null;
578-
if (args.len >= 1) {
579-
filter_issue = std.fmt.parseInt(u32, args[0], 10) catch null;
578+
var json_output: bool = false;
579+
for (args) |arg| {
580+
if (eql(u8, arg, "--format=json")) {
581+
json_output = true;
582+
} else if (eql(u8, arg, "--format=human")) {
583+
json_output = false;
584+
} else if (filter_issue == null) {
585+
// Try to parse as issue number
586+
filter_issue = std.fmt.parseInt(u32, arg, 10) catch null;
587+
}
580588
}
581589

582-
print("\n{s}{s}", .{ GOLDEN, BOLD });
583-
print("═══════════════════════════════════════════════════\n", .{});
584-
print(" CLOUD EVENTS — History\n", .{});
585-
print("═══════════════════════════════════════════════════{s}\n", .{RESET});
586-
587590
// Read entire file (cloud events are small)
588591
var buf: [32768]u8 = undefined;
589592
const len = file.readAll(&buf) catch 0;
590593
const content = buf[0..len];
591594

595+
if (json_output) {
596+
// JSON output for machine consumption
597+
var json_buf: [65536]u8 = undefined;
598+
var fbs = std.io.fixedBufferStream(&json_buf);
599+
const w = fbs.writer();
600+
601+
w.writeAll("{\"events\":[") catch return;
602+
603+
var first = true;
604+
var count: u32 = 0;
605+
var offset: usize = 0;
606+
607+
while (offset < content.len) {
608+
const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len;
609+
const line = content[offset..line_end];
610+
offset = line_end + 1;
611+
612+
if (line.len == 0) continue;
613+
614+
// Filter by issue if specified
615+
if (filter_issue) |fi| {
616+
var needle_buf: [32]u8 = undefined;
617+
const needle = std.fmt.bufPrint(&needle_buf, "\"issue\":{d}", .{fi}) catch continue;
618+
if (std.mem.indexOf(u8, line, needle) == null) continue;
619+
}
620+
621+
if (!first) w.writeAll(",") catch {};
622+
first = false;
623+
w.writeAll(line) catch break;
624+
count += 1;
625+
}
626+
627+
w.writeAll("],\"count\":") catch {};
628+
std.fmt.format(w, "{d}}}", .{count}) catch {};
629+
630+
print("{s}\n", .{fbs.getWritten()});
631+
return;
632+
}
633+
634+
// Human-readable output
635+
print("\n{s}{s}", .{ GOLDEN, BOLD });
636+
print("═══════════════════════════════════════════════════\n", .{});
637+
print(" CLOUD EVENTS — History\n", .{});
638+
print("═══════════════════════════════════════════════════{s}\n", .{RESET});
639+
592640
var count: u32 = 0;
593641
var offset: usize = 0;
594642

tools/mcp/trinity_mcp/cloud_monitor.zig

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ pub const AgentStatus = struct {
5858
var agent_statuses: [MAX_AGENTS]AgentStatus = undefined;
5959
var status_count: usize = 0;
6060

61+
// Track last event time per issue for deduplication (5s window)
62+
var last_event_times: [MAX_AGENTS][2]u32 = [_][2]u32{.{ 0, 0 }} ** MAX_AGENTS; // [issue_index][0]=issue, [1]=timestamp
63+
var last_event_count: usize = 0;
64+
6165
// ═══════════════════════════════════════════════════════════════════════════════
6266
// PUBLIC API
6367
// ═══════════════════════════════════════════════════════════════════════════════
@@ -69,6 +73,11 @@ pub fn runMonitor(port: u16) !void {
6973

7074
std.log.info("Cloud Monitor starting on port {d}", .{actual_port});
7175

76+
// Restore state from JSONL on startup
77+
restoreStateFromEvents() catch |err| {
78+
std.log.warn("Failed to restore state from events: {}", .{err});
79+
};
80+
7281
const address = net.Address.parseIp4("0.0.0.0", actual_port) catch return;
7382
var server = address.listen(.{ .reuse_address = true }) catch |err| {
7483
std.log.err("Cloud Monitor failed to bind: {}", .{err});
@@ -90,7 +99,13 @@ pub fn runMonitor(port: u16) !void {
9099

91100
/// Update agent status (called from HTTP POST handler).
92101
/// Persists to JSONL, alerts on error states via tri notify.
102+
/// Skips duplicate events within 5s window.
93103
pub fn updateStatus(issue: u32, status_str: []const u8, detail: []const u8) void {
104+
// Deduplication: skip if same status within 5s
105+
if (shouldSkipEvent(issue, status_str)) {
106+
return;
107+
}
108+
94109
// 1. Append to JSONL event log
95110
appendEvent(issue, status_str, detail);
96111

@@ -197,6 +212,130 @@ pub fn getStatusJson(buf: []u8) []const u8 {
197212
return fbs.getWritten();
198213
}
199214

215+
/// Restore agent states from JSONL event log on startup.
216+
/// Reads the last event for each issue and populates agent_statuses.
217+
fn restoreStateFromEvents() !void {
218+
const file = std.fs.cwd().openFile(EVENTS_FILE, .{}) catch return;
219+
defer file.close();
220+
221+
// Read entire file
222+
var file_buf: [65536]u8 = undefined;
223+
const file_len = file.readAll(&file_buf) catch 0;
224+
if (file_len == 0) return;
225+
const content = file_buf[0..file_len];
226+
227+
// Track latest status per issue
228+
var latest_issue: [MAX_AGENTS]u32 = undefined;
229+
var latest_ts: [MAX_AGENTS]i64 = undefined;
230+
var latest_status: [MAX_AGENTS][32]u8 = undefined;
231+
var latest_status_len: [MAX_AGENTS]usize = undefined;
232+
var latest_detail: [MAX_AGENTS][256]u8 = undefined;
233+
var latest_detail_len: [MAX_AGENTS]usize = undefined;
234+
var latest_count: usize = 0;
235+
236+
var offset: usize = 0;
237+
while (offset < content.len) {
238+
const line_end = std.mem.indexOfPos(u8, content, offset, "\n") orelse content.len;
239+
const line = content[offset..line_end];
240+
offset = line_end + 1;
241+
if (line.len == 0) continue;
242+
243+
// Parse issue
244+
const issue_idx = std.mem.indexOf(u8, line, "\"issue\":") orelse continue;
245+
const istart = issue_idx + 8;
246+
var iend = istart;
247+
while (iend < line.len and line[iend] >= '0' and line[iend] <= '9') : (iend += 1) {}
248+
const issue = std.fmt.parseInt(u32, line[istart..iend], 10) catch continue;
249+
250+
// Parse timestamp
251+
const ts_idx = std.mem.indexOf(u8, line, "\"ts\":") orelse continue;
252+
const tstart = ts_idx + 5;
253+
var tend = tstart;
254+
while (tend < line.len and line[tend] >= '0' and line[tend] <= '9') : (tend += 1) {}
255+
const ts = std.fmt.parseInt(i64, line[tstart..tend], 10) catch continue;
256+
257+
// Parse status and detail
258+
const status_str = extractJsonString(line, "status") orelse continue;
259+
const detail_str = extractJsonString(line, "detail") orelse "";
260+
261+
// Find or create entry for this issue (keep latest timestamp)
262+
var entry_idx: ?usize = null;
263+
for (0..latest_count) |i| {
264+
if (latest_issue[i] == issue) {
265+
if (ts > latest_ts[i]) {
266+
entry_idx = i;
267+
}
268+
break;
269+
}
270+
}
271+
272+
if (entry_idx == null and latest_count < MAX_AGENTS) {
273+
entry_idx = latest_count;
274+
latest_issue[latest_count] = issue;
275+
latest_count += 1;
276+
}
277+
278+
if (entry_idx) |idx| {
279+
latest_ts[idx] = ts;
280+
latest_status_len[idx] = @min(status_str.len, 32);
281+
@memcpy(latest_status[idx][0..latest_status_len[idx]], status_str[0..latest_status_len[idx]]);
282+
latest_detail_len[idx] = @min(detail_str.len, 256);
283+
@memcpy(latest_detail[idx][0..latest_detail_len[idx]], detail_str[0..latest_detail_len[idx]]);
284+
}
285+
}
286+
287+
// Populate agent_statuses from latest events
288+
status_count = 0;
289+
for (0..latest_count) |i| {
290+
if (status_count >= MAX_AGENTS) break;
291+
const entry = &agent_statuses[status_count];
292+
entry.issue = latest_issue[i];
293+
entry.status_len = latest_status_len[i];
294+
@memcpy(entry.status[0..entry.status_len], latest_status[i][0..entry.status_len]);
295+
entry.detail_len = latest_detail_len[i];
296+
@memcpy(entry.detail[0..entry.detail_len], latest_detail[i][0..entry.detail_len]);
297+
entry.last_heartbeat = latest_ts[i];
298+
status_count += 1;
299+
}
300+
301+
std.log.info("Restored {d} agent states from events log", .{status_count});
302+
}
303+
304+
/// Check if we should skip this event (deduplication: same status within 5 seconds)
305+
fn shouldSkipEvent(issue: u32, status_str: []const u8) bool {
306+
const now = std.time.timestamp();
307+
const dedup_window: i64 = 5; // 5 seconds
308+
309+
for (0..last_event_count) |i| {
310+
if (last_event_times[i][0] == issue) {
311+
const last_ts = @as(i64, @intCast(last_event_times[i][1]));
312+
if (now - last_ts < dedup_window) {
313+
// Check if status is the same
314+
for (0..status_count) |j| {
315+
if (agent_statuses[j].issue == issue) {
316+
const last_status = agent_statuses[j].getStatus();
317+
if (std.mem.eql(u8, last_status, status_str)) {
318+
return true; // Skip: same status within 5s window
319+
}
320+
break;
321+
}
322+
}
323+
}
324+
// Update timestamp
325+
last_event_times[i][1] = @as(u32, @intCast(now));
326+
return false;
327+
}
328+
}
329+
330+
// New issue, add to tracking
331+
if (last_event_count < MAX_AGENTS) {
332+
last_event_times[last_event_count][0] = issue;
333+
last_event_times[last_event_count][1] = @as(u32, @intCast(now));
334+
last_event_count += 1;
335+
}
336+
return false;
337+
}
338+
200339
// ═══════════════════════════════════════════════════════════════════════════════
201340
// INTERNAL
202341
// ═══════════════════════════════════════════════════════════════════════════════
@@ -209,15 +348,17 @@ fn appendEvent(issue: u32, status_str: []const u8, detail: []const u8) void {
209348

210349
// Seek to end for append
211350
file.seekFromEnd(0) catch return;
212-
const w = file.writer();
213351

352+
// Format JSON line to buffer, then write
353+
var buf: [512]u8 = undefined;
214354
const ts = std.time.timestamp();
215-
std.fmt.format(w, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{
355+
const line = std.fmt.bufPrint(&buf, "{{\"ts\":{d},\"issue\":{d},\"status\":\"{s}\",\"detail\":\"{s}\"}}\n", .{
216356
ts,
217357
issue,
218358
status_str,
219359
detail,
220360
}) catch return;
361+
_ = file.writeAll(line) catch return;
221362
}
222363

223364
fn sendTriNotify(issue: u32, status_str: []const u8, detail: []const u8) void {

0 commit comments

Comments
 (0)