Skip to content

Commit 8a77d32

Browse files
gHashTagAntigravity Agentclaude
authored
feat(bot): Phase 2 — streaming via sendMessageDraft + /stop (Issue #55) (#58)
Two-thread architecture: main thread polls Telegram, worker thread streams Claude output. NDJSON parsing from --output-format stream-json, sendDraft every 500ms, /stop sends SIGTERM to active process. New: claude_stream.zig (StreamState + runStreaming + stopProcess). Modified: bot_loop.zig (thread dispatch), handlers.zig (help text), telegram_api.zig (shared sendLongMessage). Closes #55 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Antigravity Agent <antigravity@vibee.org> Co-authored-by: Claude <noreply@anthropic.com>
1 parent b87f6aa commit 8a77d32

4 files changed

Lines changed: 232 additions & 64 deletions

File tree

tools/mcp/trinity_mcp/bot/bot_loop.zig

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
// bot_loop.zig — Main poll → parse → dispatch → repeat loop
2+
// Phase 2: Two-thread arch — main thread polls, worker thread streams Claude output
23
const std = @import("std");
34
const telegram_api = @import("telegram_api.zig");
45
const json_utils = @import("json_utils.zig");
56
const command_parser = @import("command_parser.zig");
67
const handlers = @import("handlers.zig");
8+
const claude_stream = @import("claude_stream.zig");
79

810
const BotConfig = telegram_api.BotConfig;
911

12+
/// Shared state for streaming — module-level, accessible from main + worker threads
13+
var stream_state = claude_stream.StreamState{};
14+
1015
/// Run the bot loop: poll Telegram, parse commands, dispatch handlers.
11-
/// Never returns (infinite loop).
16+
/// Never returns (infinite loop). Main thread stays responsive while
17+
/// worker thread handles Claude streaming.
1218
pub fn run(allocator: std.mem.Allocator, config: BotConfig) void {
1319
var last_update_id: i64 = 0;
1420

1521
// Announce startup
16-
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\xa4\x96 TRI BOT online! Send /help for commands.");
22+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\xa4\x96 TRI BOT v2.0 online! Send /help for commands.");
1723

18-
std.debug.print("[tri-bot] Started. Polling Telegram...\n", .{});
24+
std.debug.print("[tri-bot] Started (Phase 2: streaming). Polling Telegram...\n", .{});
1925

2026
while (true) {
2127
const body = telegram_api.getUpdates(allocator, config.bot_token, last_update_id + 1) orelse {
@@ -25,48 +31,31 @@ pub fn run(allocator: std.mem.Allocator, config: BotConfig) void {
2531
};
2632
defer allocator.free(body);
2733

28-
// Process each update
29-
const Context = struct {
30-
allocator: std.mem.Allocator,
31-
config: BotConfig,
32-
max_id: i64,
33-
};
34-
var ctx = Context{
35-
.allocator = allocator,
36-
.config = config,
37-
.max_id = last_update_id,
38-
};
39-
40-
// We can't use closures in Zig, so use a global-style dispatch.
41-
// Instead, manually iterate updates with a simple loop.
42-
processUpdates(allocator, config, body, &ctx.max_id);
43-
last_update_id = ctx.max_id;
34+
var max_id = last_update_id;
35+
processUpdates(allocator, config, body, &max_id);
36+
last_update_id = max_id;
4437
}
4538
}
4639

4740
fn processUpdates(allocator: std.mem.Allocator, config: BotConfig, body: []const u8, max_id: *i64) void {
48-
// Find each "update_id": block manually
4941
var pos: usize = 0;
5042
while (pos < body.len) {
5143
const needle = "\"update_id\":";
5244
const idx = std.mem.indexOfPos(u8, body, pos, needle) orelse break;
5345

54-
// Determine block boundary (next update_id or end)
5546
const next_idx = std.mem.indexOfPos(u8, body, idx + needle.len + 1, needle) orelse body.len;
5647
const block = body[idx..next_idx];
5748

58-
// Extract update_id
5949
const uid = json_utils.extractInt(block, "update_id") orelse {
6050
pos = idx + needle.len;
6151
continue;
6252
};
6353

64-
// Update max
6554
if (uid > max_id.*) {
6655
max_id.* = uid;
6756
}
6857

69-
// Extract chat_id
58+
// Extract chat_id from nested message.chat.id
7059
const chat_id_val = blk: {
7160
const chat_needle = "\"chat\":{\"id\":";
7261
const ci = std.mem.indexOf(u8, block, chat_needle) orelse break :blk @as(i64, 0);
@@ -76,26 +65,22 @@ fn processUpdates(allocator: std.mem.Allocator, config: BotConfig, body: []const
7665
break :blk std.fmt.parseInt(i64, block[cs..ce], 10) catch 0;
7766
};
7867

79-
// Auth check: only respond to configured chat_id
68+
// Auth check
8069
const expected_chat_id = std.fmt.parseInt(i64, config.chat_id, 10) catch 0;
8170
if (chat_id_val != expected_chat_id) {
82-
std.debug.print("[tri-bot] Ignoring update from chat {d} (expected {d})\n", .{ chat_id_val, expected_chat_id });
71+
std.debug.print("[tri-bot] Ignoring update from chat {d}\n", .{chat_id_val});
8372
pos = next_idx;
8473
continue;
8574
}
8675

87-
// Extract text
8876
const text = json_utils.extractString(block, "text") orelse {
8977
pos = next_idx;
9078
continue;
9179
};
9280

9381
std.debug.print("[tri-bot] Update {d}: \"{s}\"\n", .{ uid, text });
9482

95-
// Parse command
9683
const cmd = command_parser.parse(text);
97-
98-
// Dispatch
9984
dispatch(allocator, config, cmd);
10085

10186
pos = next_idx;
@@ -106,15 +91,45 @@ fn dispatch(allocator: std.mem.Allocator, config: BotConfig, cmd: command_parser
10691
if (std.mem.eql(u8, cmd.name, "help")) {
10792
handlers.handleHelp(allocator, config);
10893
} else if (std.mem.eql(u8, cmd.name, "ask")) {
109-
handlers.handleAsk(allocator, config, cmd.args);
94+
// Phase 2: streaming /ask via worker thread
95+
if (cmd.args.len == 0) {
96+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 Usage: /ask <question>");
97+
return;
98+
}
99+
if (stream_state.is_busy.load(.acquire)) {
100+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x8f\xb3 Already processing. Send /stop first.");
101+
return;
102+
}
103+
// Dupe args — they point into getUpdates body which will be freed
104+
const args_owned = allocator.dupe(u8, cmd.args) catch return;
105+
stream_state.is_busy.store(true, .release);
106+
_ = std.Thread.spawn(.{}, claude_stream.runStreaming, .{ allocator, config, args_owned, false, &stream_state }) catch {
107+
stream_state.is_busy.store(false, .release);
108+
allocator.free(args_owned);
109+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Failed to spawn worker thread");
110+
return;
111+
};
110112
} else if (std.mem.eql(u8, cmd.name, "continue")) {
111-
handlers.handleContinue(allocator, config, cmd.args);
113+
// Phase 2: streaming /continue via worker thread
114+
if (stream_state.is_busy.load(.acquire)) {
115+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x8f\xb3 Already processing. Send /stop first.");
116+
return;
117+
}
118+
const args_owned = allocator.dupe(u8, cmd.args) catch return;
119+
stream_state.is_busy.store(true, .release);
120+
_ = std.Thread.spawn(.{}, claude_stream.runStreaming, .{ allocator, config, args_owned, true, &stream_state }) catch {
121+
stream_state.is_busy.store(false, .release);
122+
allocator.free(args_owned);
123+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Failed to spawn worker thread");
124+
return;
125+
};
112126
} else if (std.mem.eql(u8, cmd.name, "status")) {
127+
// Status stays blocking (short query, no streaming needed)
113128
handlers.handleStatus(allocator, config);
114129
} else if (std.mem.eql(u8, cmd.name, "stop")) {
115-
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9b\x94 /stop not yet implemented (Phase 2)");
130+
// Phase 2: /stop kills active Claude process
131+
claude_stream.stopProcess(allocator, config, &stream_state);
116132
} else if (cmd.name.len > 0) {
117-
// Unknown command
118133
var buf: [256]u8 = undefined;
119134
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &buf, "\xe2\x9d\x93 Unknown command: /{s}. Try /help", .{cmd.name});
120135
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// claude_stream.zig — Streaming Claude CLI execution with sendMessageDraft
2+
// Spawns claude with --output-format stream-json, pipes stdout,
3+
// reads NDJSON line-by-line, sends drafts every 500ms, final message when done.
4+
const std = @import("std");
5+
const telegram_api = @import("telegram_api.zig");
6+
const json_utils = @import("json_utils.zig");
7+
8+
const BotConfig = telegram_api.BotConfig;
9+
10+
/// Shared state between main thread (polling) and worker thread (streaming).
11+
/// Atomics for lock-free is_busy check, PID for /stop.
12+
pub const StreamState = struct {
13+
is_busy: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
14+
active_pid: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),
15+
};
16+
17+
/// Worker thread entry point: spawn claude, stream output, send drafts.
18+
/// Called via std.Thread.spawn() from bot_loop dispatch.
19+
pub fn runStreaming(
20+
allocator: std.mem.Allocator,
21+
config: BotConfig,
22+
args_owned: []const u8,
23+
use_continue: bool,
24+
state: *StreamState,
25+
) void {
26+
defer {
27+
state.active_pid.store(0, .release);
28+
state.is_busy.store(false, .release);
29+
allocator.free(args_owned);
30+
}
31+
32+
// Notify user
33+
if (use_continue) {
34+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\x94\x84 TRI streaming...");
35+
} else {
36+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\xa7\xa0 TRI streaming...");
37+
}
38+
39+
// Build turns string
40+
var turns_buf: [16]u8 = undefined;
41+
const turns_str = std.fmt.bufPrint(&turns_buf, "{d}", .{config.max_turns}) catch "10";
42+
43+
// Build argv dynamically based on args and --continue flag
44+
var argv_buf: [12][]const u8 = undefined;
45+
var argc: usize = 0;
46+
47+
argv_buf[argc] = "claude";
48+
argc += 1;
49+
if (args_owned.len > 0) {
50+
argv_buf[argc] = "-p";
51+
argc += 1;
52+
argv_buf[argc] = args_owned;
53+
argc += 1;
54+
}
55+
if (use_continue) {
56+
argv_buf[argc] = "--continue";
57+
argc += 1;
58+
}
59+
argv_buf[argc] = "--output-format";
60+
argc += 1;
61+
argv_buf[argc] = "stream-json";
62+
argc += 1;
63+
argv_buf[argc] = "--max-turns";
64+
argc += 1;
65+
argv_buf[argc] = turns_str;
66+
argc += 1;
67+
68+
// Init child with piped stdout
69+
var child = std.process.Child.init(argv_buf[0..argc], allocator);
70+
child.stdout_behavior = .Pipe;
71+
child.stderr_behavior = .Pipe;
72+
child.cwd = config.project_root;
73+
74+
child.spawn() catch |err| {
75+
var err_buf: [256]u8 = undefined;
76+
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &err_buf, "\xe2\x9d\x8c Spawn error: {s}", .{@errorName(err)});
77+
return;
78+
};
79+
80+
// Store PID for /stop
81+
state.active_pid.store(@intCast(child.id), .release);
82+
83+
std.debug.print("[tri-bot] Streaming started (PID {d})\n", .{child.id});
84+
85+
// Read stdout line-by-line, accumulate text deltas, send drafts
86+
var text_buf: std.ArrayList(u8) = .empty;
87+
defer text_buf.deinit(allocator);
88+
89+
var last_draft_ns: i128 = std.time.nanoTimestamp();
90+
const draft_interval: i128 = 500_000_000; // 500ms
91+
92+
const stdout_file = child.stdout.?;
93+
var line_buf: [65536]u8 = undefined;
94+
var line_len: usize = 0;
95+
var read_chunk: [4096]u8 = undefined;
96+
97+
while (true) {
98+
const n = stdout_file.read(&read_chunk) catch break;
99+
if (n == 0) break; // EOF
100+
101+
for (read_chunk[0..n]) |byte| {
102+
if (byte == '\n') {
103+
// Process complete NDJSON line
104+
const line = line_buf[0..line_len];
105+
if (std.mem.indexOf(u8, line, "\"text_delta\"") != null) {
106+
if (json_utils.extractString(line, "text")) |text_val| {
107+
text_buf.appendSlice(allocator, text_val) catch {};
108+
}
109+
}
110+
line_len = 0;
111+
112+
// Throttle: sendDraft every 500ms
113+
const now = std.time.nanoTimestamp();
114+
if (now - last_draft_ns >= draft_interval and text_buf.items.len > 0) {
115+
const draft_len = @min(text_buf.items.len, 4000);
116+
telegram_api.sendDraft(allocator, config.bot_token, config.chat_id, text_buf.items[0..draft_len]);
117+
last_draft_ns = now;
118+
}
119+
} else if (line_len < line_buf.len - 1) {
120+
line_buf[line_len] = byte;
121+
line_len += 1;
122+
}
123+
}
124+
}
125+
126+
// Wait for child to finish
127+
_ = child.wait() catch {};
128+
129+
std.debug.print("[tri-bot] Streaming done ({d} bytes)\n", .{text_buf.items.len});
130+
131+
// Send final message
132+
if (text_buf.items.len > 0) {
133+
telegram_api.sendLongMessage(allocator, config, text_buf.items);
134+
} else {
135+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 Claude returned empty response");
136+
}
137+
}
138+
139+
/// Stop the active Claude process via SIGTERM.
140+
pub fn stopProcess(allocator: std.mem.Allocator, config: BotConfig, state: *StreamState) void {
141+
const pid = state.active_pid.load(.acquire);
142+
if (pid > 0) {
143+
const posix_pid: std.posix.pid_t = @intCast(pid);
144+
std.posix.kill(posix_pid, 15) catch |err| { // 15 = SIGTERM
145+
var err_buf: [256]u8 = undefined;
146+
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &err_buf, "\xe2\x9d\x8c Kill error: {s}", .{@errorName(err)});
147+
return;
148+
};
149+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9b\x94 Claude process stopped");
150+
} else {
151+
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 No active Claude process");
152+
}
153+
}

tools/mcp/trinity_mcp/bot/handlers.zig

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ pub fn handleHelp(allocator: std.mem.Allocator, config: BotConfig) void {
1010
const help_text =
1111
"\xf0\x9f\xa4\x96 TRI BOT \xe2\x80\x94 Claude Code Remote Control\n" ++
1212
"\n" ++
13-
"/ask <question> \xe2\x80\x94 Ask Claude anything\n" ++
14-
"/continue <question> \xe2\x80\x94 Continue last session\n" ++
13+
"/ask <question> \xe2\x80\x94 Ask Claude (streaming)\n" ++
14+
"/continue [question] \xe2\x80\x94 Continue session (streaming)\n" ++
1515
"/status \xe2\x80\x94 Project status\n" ++
1616
"/stop \xe2\x80\x94 Stop running Claude process\n" ++
1717
"/help \xe2\x80\x94 This message\n" ++
1818
"\n" ++
19-
"Phase 2: /resume, /model, /board, /pr, /worktree";
19+
"Phase 3: /resume, /model, /board, /pr, /worktree";
2020
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, help_text);
2121
}
2222

@@ -54,7 +54,7 @@ pub fn handleAsk(allocator: std.mem.Allocator, config: BotConfig, args: []const
5454
}
5555

5656
// Telegram message limit: 4096 chars. Split if needed.
57-
sendLongMessage(allocator, config, result.stdout);
57+
telegram_api.sendLongMessage(allocator, config, result.stdout);
5858
}
5959

6060
/// /continue <question> — Run claude -p "<question>" --continue
@@ -87,7 +87,7 @@ pub fn handleContinue(allocator: std.mem.Allocator, config: BotConfig, args: []c
8787
return;
8888
}
8989

90-
sendLongMessage(allocator, config, result.stdout);
90+
telegram_api.sendLongMessage(allocator, config, result.stdout);
9191
}
9292

9393
/// /status — Run claude -p "Summarize project status in 5 lines"
@@ -117,29 +117,5 @@ pub fn handleStatus(allocator: std.mem.Allocator, config: BotConfig) void {
117117
return;
118118
}
119119

120-
sendLongMessage(allocator, config, result.stdout);
121-
}
122-
123-
/// Send a potentially long message, splitting at 4096 char boundary
124-
fn sendLongMessage(allocator: std.mem.Allocator, config: BotConfig, text: []const u8) void {
125-
const max_len: usize = 4000; // Leave some margin below 4096
126-
var pos: usize = 0;
127-
128-
while (pos < text.len) {
129-
const end = @min(pos + max_len, text.len);
130-
// Try to split at a newline
131-
var split = end;
132-
if (end < text.len) {
133-
var j = end;
134-
while (j > pos + max_len / 2) : (j -= 1) {
135-
if (text[j] == '\n') {
136-
split = j + 1;
137-
break;
138-
}
139-
}
140-
}
141-
const chunk = text[pos..split];
142-
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, chunk);
143-
pos = split;
144-
}
120+
telegram_api.sendLongMessage(allocator, config, result.stdout);
145121
}

0 commit comments

Comments
 (0)