Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 5 additions & 39 deletions tools/mcp/trinity_mcp/bot/bot_loop.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// bot_loop.zig — Main poll → parse → dispatch → repeat loop
// Phase 2.5: Two-thread arch + session management (/model, /resume, /sessions)
// v2.0: Direct Anthropic API (no claude CLI)
const std = @import("std");
const telegram_api = @import("telegram_api.zig");
const json_utils = @import("json_utils.zig");
Expand Down Expand Up @@ -96,7 +96,6 @@ fn spawnStreaming(allocator: std.mem.Allocator, config: BotConfig, opts: claude_
_ = std.Thread.spawn(.{}, claude_stream.runStreaming, .{ allocator, config, opts, &stream_state }) catch {
stream_state.is_busy.store(false, .release);
allocator.free(opts.args);
if (opts.resume_id) |rid| allocator.free(rid);
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Failed to spawn worker thread");
return;
};
Expand All @@ -120,42 +119,9 @@ fn dispatch(allocator: std.mem.Allocator, config: BotConfig, cmd: command_parser
.args = args_owned,
.model = bot_state.getModel(),
});
} else if (std.mem.eql(u8, cmd.name, "continue")) {
// Streaming /continue via worker thread
if (stream_state.is_busy.load(.acquire)) {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x8f\xb3 Already processing. Send /stop first.");
return;
}
const args_owned = allocator.dupe(u8, cmd.args) catch return;
spawnStreaming(allocator, config, .{
.args = args_owned,
.use_continue = true,
.model = bot_state.getModel(),
});
} else if (std.mem.eql(u8, cmd.name, "resume")) {
// Streaming /resume <id> via worker thread
if (cmd.args.len == 0) {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 Usage: /resume <session-id>");
return;
}
if (stream_state.is_busy.load(.acquire)) {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x8f\xb3 Already processing. Send /stop first.");
return;
}
// Extract session ID (first word of args)
const id_end = std.mem.indexOf(u8, cmd.args, " ") orelse cmd.args.len;
const resume_id = allocator.dupe(u8, cmd.args[0..id_end]) catch return;
// Remaining text after ID is the prompt (if any)
const prompt_start = if (id_end < cmd.args.len) id_end + 1 else id_end;
const prompt = allocator.dupe(u8, cmd.args[prompt_start..]) catch {
allocator.free(resume_id);
return;
};
spawnStreaming(allocator, config, .{
.args = prompt,
.resume_id = resume_id,
.model = bot_state.getModel(),
});
} else if (std.mem.eql(u8, cmd.name, "continue") or std.mem.eql(u8, cmd.name, "resume")) {
// Deferred to Phase 5 (sessions)
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\x93\x8b Coming in Phase 5 (sessions). Use /ask for new queries.");
} else if (std.mem.eql(u8, cmd.name, "model")) {
// Blocking: set/show model
handlers.handleModel(allocator, config, cmd.args, &bot_state);
Expand All @@ -166,7 +132,7 @@ fn dispatch(allocator: std.mem.Allocator, config: BotConfig, cmd: command_parser
// Blocking: project status
handlers.handleStatus(allocator, config);
} else if (std.mem.eql(u8, cmd.name, "stop")) {
// Kill active Claude process
// Cancel active streaming request
claude_stream.stopProcess(allocator, config, &stream_state);
} else if (cmd.name.len > 0) {
var buf: [256]u8 = undefined;
Expand Down
255 changes: 158 additions & 97 deletions tools/mcp/trinity_mcp/bot/claude_stream.zig
Original file line number Diff line number Diff line change
@@ -1,130 +1,130 @@
// claude_stream.zig — Streaming Claude CLI execution with sendMessageDraft
// Spawns claude with --output-format stream-json, pipes stdout,
// reads NDJSON line-by-line, sends drafts every 500ms, final message when done.
// claude_stream.zig — Streaming via direct Anthropic API (SSE)
// POST to api.anthropic.com/v1/messages with stream:true, parse SSE events,
// send Telegram drafts every 500ms, final message when done.
// No claude CLI dependency. Pure Zig std.http.Client.
const std = @import("std");
const telegram_api = @import("telegram_api.zig");
const json_utils = @import("json_utils.zig");

const BotConfig = telegram_api.BotConfig;

const api_url = "https://api.anthropic.com/v1/messages";
const api_version = "2023-06-01";
const default_model = "claude-sonnet-4-20250514";

/// Shared state between main thread (polling) and worker thread (streaming).
/// Atomics for lock-free is_busy check, PID for /stop.
pub const StreamState = struct {
is_busy: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
active_pid: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),
cancel_requested: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
};

/// Options for streaming — covers /ask, /continue, and /resume.
/// Options for streaming /ask.
pub const StreamOpts = struct {
args: []const u8, // prompt text (caller-owned, freed by worker)
use_continue: bool = false,
resume_id: ?[]const u8 = null, // session ID for --resume
model: ?[]const u8 = null, // model override (pointer to BotState buffer)
model: ?[]const u8 = null, // model override
};

/// Worker thread entry point: spawn claude, stream output, send drafts.
/// Called via std.Thread.spawn() from bot_loop dispatch.
/// Worker thread entry point: POST to Anthropic API, stream SSE, send drafts.
pub fn runStreaming(
allocator: std.mem.Allocator,
config: BotConfig,
opts: StreamOpts,
state: *StreamState,
) void {
defer {
state.active_pid.store(0, .release);
state.cancel_requested.store(false, .release);
state.is_busy.store(false, .release);
allocator.free(opts.args);
if (opts.resume_id) |rid| allocator.free(rid);
}

// Notify user
if (opts.resume_id != null) {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\x94\x84 TRI resuming session...");
} else if (opts.use_continue) {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\x94\x84 TRI streaming...");
} else {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\xa7\xa0 TRI streaming...");
}
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xf0\x9f\xa7\xa0 TRI streaming...");

// Build turns string
var turns_buf: [16]u8 = undefined;
const turns_str = std.fmt.bufPrint(&turns_buf, "{d}", .{config.max_turns}) catch "10";

// Build argv dynamically based on opts
var argv_buf: [16][]const u8 = undefined;
var argc: usize = 0;

argv_buf[argc] = "claude";
argc += 1;
if (opts.args.len > 0) {
argv_buf[argc] = "-p";
argc += 1;
argv_buf[argc] = opts.args;
argc += 1;
}
if (opts.resume_id) |rid| {
argv_buf[argc] = "--resume";
argc += 1;
argv_buf[argc] = rid;
argc += 1;
} else if (opts.use_continue) {
argv_buf[argc] = "--continue";
argc += 1;
}
if (opts.model) |model| {
argv_buf[argc] = "--model";
argc += 1;
argv_buf[argc] = model;
argc += 1;
}
argv_buf[argc] = "--output-format";
argc += 1;
argv_buf[argc] = "stream-json";
argc += 1;
argv_buf[argc] = "--max-turns";
argc += 1;
argv_buf[argc] = turns_str;
argc += 1;

// Init child with piped stdout
var child = std.process.Child.init(argv_buf[0..argc], allocator);
child.stdout_behavior = .Pipe;
child.stderr_behavior = .Pipe;
child.cwd = config.project_root;

child.spawn() catch |err| {
var err_buf: [256]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &err_buf, "\xe2\x9d\x8c Spawn error: {s}", .{@errorName(err)});
const model = opts.model orelse default_model;

// Build request body
var body_buf: std.ArrayList(u8) = .empty;
defer body_buf.deinit(allocator);

buildRequestBody(allocator, &body_buf, model, opts.args) catch {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Failed to build request");
return;
};

std.debug.print("[tri-bot] SSE request: {d} bytes, model={s}\n", .{ body_buf.items.len, model });

// HTTP POST to Anthropic API
var client = std.http.Client{ .allocator = allocator };
defer client.deinit();

const uri = std.Uri.parse(api_url) catch unreachable;

var req = client.request(.POST, uri, .{
.extra_headers = &.{
.{ .name = "Content-Type", .value = "application/json" },
.{ .name = "x-api-key", .value = config.api_key },
.{ .name = "anthropic-version", .value = api_version },
},
}) catch {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Connection failed");
return;
};
defer req.deinit();

// Send body
req.transfer_encoding = .{ .content_length = body_buf.items.len };
var bw = req.sendBodyUnflushed(&.{}) catch {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Send failed");
return;
};
bw.writer.writeAll(body_buf.items) catch {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c Write failed");
return;
};
bw.end() catch {};
if (req.connection) |conn| conn.flush() catch {};

// Receive response head
var redirect_buf: [0]u8 = .{};
var response = req.receiveHead(&redirect_buf) catch {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9d\x8c No response from API");
return;
};

// Store PID for /stop
state.active_pid.store(@intCast(child.id), .release);
// Check for HTTP errors
const status = @intFromEnum(response.head.status);
if (status >= 400) {
handleApiError(allocator, config, &response, status);
return;
}

std.debug.print("[tri-bot] Streaming started (PID {d})\n", .{child.id});
std.debug.print("[tri-bot] SSE stream started (status {d})\n", .{status});

// Read SSE stream line-by-line
var transfer_buf: [8192]u8 = undefined;
var reader = response.reader(&transfer_buf);

// Read stdout line-by-line, accumulate text deltas, send drafts
var text_buf: std.ArrayList(u8) = .empty;
defer text_buf.deinit(allocator);

var last_draft_ns: i128 = std.time.nanoTimestamp();
const draft_interval: i128 = 500_000_000; // 500ms

const stdout_file = child.stdout.?;
var line_buf: [65536]u8 = undefined;
var line_len: usize = 0;
var read_chunk: [4096]u8 = undefined;

while (true) {
const n = stdout_file.read(&read_chunk) catch break;
if (n == 0) break; // EOF
if (state.cancel_requested.load(.acquire)) break;

const n = reader.readSliceShort(&read_chunk) catch break;

for (read_chunk[0..n]) |byte| {
if (byte == '\n') {
// Process complete NDJSON line
const line = line_buf[0..line_len];
if (std.mem.indexOf(u8, line, "\"text_delta\"") != null) {
if (json_utils.extractString(line, "text")) |text_val| {

// Parse SSE: "data: {json}"
if (line.len > 6 and std.mem.eql(u8, line[0..6], "data: ")) {
const json = line[6..];
if (extractTextDelta(json)) |text_val| {
text_buf.appendSlice(allocator, text_val) catch {};
}
}
Expand All @@ -144,31 +144,92 @@ pub fn runStreaming(
}
}

// Wait for child to finish
_ = child.wait() catch {};

std.debug.print("[tri-bot] Streaming done ({d} bytes)\n", .{text_buf.items.len});
std.debug.print("[tri-bot] SSE done ({d} bytes text)\n", .{text_buf.items.len});

// Send final message
if (text_buf.items.len > 0) {
telegram_api.sendLongMessage(allocator, config, text_buf.items);
} else {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 Claude returned empty response");
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 Empty response from API");
}
}

/// Stop the active Claude process via SIGTERM.
pub fn stopProcess(allocator: std.mem.Allocator, config: BotConfig, state: *StreamState) void {
const pid = state.active_pid.load(.acquire);
if (pid > 0) {
const posix_pid: std.posix.pid_t = @intCast(pid);
std.posix.kill(posix_pid, 15) catch |err| { // 15 = SIGTERM
var err_buf: [256]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &err_buf, "\xe2\x9d\x8c Kill error: {s}", .{@errorName(err)});
/// Build JSON request body for Anthropic Messages API (streaming).
fn buildRequestBody(allocator: std.mem.Allocator, body: *std.ArrayList(u8), model: []const u8, prompt: []const u8) !void {
try body.appendSlice(allocator, "{\"model\":\"");
try body.appendSlice(allocator, model);
try body.appendSlice(allocator, "\",\"max_tokens\":8192,\"stream\":true,\"messages\":[{\"role\":\"user\",\"content\":\"");
// JSON-escape prompt
for (prompt) |c| {
switch (c) {
'"' => try body.appendSlice(allocator, "\\\""),
'\\' => try body.appendSlice(allocator, "\\\\"),
'\n' => try body.appendSlice(allocator, "\\n"),
'\r' => try body.appendSlice(allocator, "\\r"),
'\t' => try body.appendSlice(allocator, "\\t"),
else => try body.append(allocator, c),
}
}
try body.appendSlice(allocator, "\"}]}");
}

/// Extract text from SSE content_block_delta event.
/// Input: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
/// Returns: "Hello"
fn extractTextDelta(json: []const u8) ?[]const u8 {
// Find "text_delta","text":" — the text field after the delta type
const needle = "\"text_delta\",\"text\":\"";
const idx = std.mem.indexOf(u8, json, needle) orelse return null;
const start = idx + needle.len;
if (start >= json.len) return null;
var end = start;
while (end < json.len) : (end += 1) {
if (json[end] == '"' and (end == start or json[end - 1] != '\\')) break;
}
if (end == start) return null;
return json[start..end];
}

/// Handle API error response — read body and send error to Telegram.
fn handleApiError(allocator: std.mem.Allocator, config: BotConfig, response: *std.http.Client.Response, status: u16) void {
var transfer_buf: [8192]u8 = undefined;
var reader = response.reader(&transfer_buf);
const err_body = reader.allocRemaining(allocator, std.Io.Limit.limited(4096)) catch {
var buf: [256]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &buf, "\xe2\x9d\x8c API error: HTTP {d}", .{status});
return;
};
defer allocator.free(err_body);

// Try to extract error message from JSON
var needle_buf: [128]u8 = undefined;
const needle = std.fmt.bufPrint(&needle_buf, "\"message\":\"", .{}) catch {
var buf: [256]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &buf, "\xe2\x9d\x8c API error: HTTP {d}", .{status});
return;
};
if (std.mem.indexOf(u8, err_body, needle)) |idx| {
const msg_start = idx + needle.len;
if (msg_start < err_body.len) {
var msg_end = msg_start;
while (msg_end < err_body.len and err_body[msg_end] != '"') : (msg_end += 1) {}
const msg = err_body[msg_start..msg_end];
var buf: [512]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &buf, "\xe2\x9d\x8c API {d}: {s}", .{ status, msg });
return;
};
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9b\x94 Claude process stopped");
}
}

var buf: [256]u8 = undefined;
telegram_api.sendFmt(allocator, config.bot_token, config.chat_id, &buf, "\xe2\x9d\x8c API error: HTTP {d}", .{status});
}

/// Cancel the active streaming request.
pub fn stopProcess(allocator: std.mem.Allocator, config: BotConfig, state: *StreamState) void {
if (state.is_busy.load(.acquire)) {
state.cancel_requested.store(true, .release);
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9b\x94 Cancelling request...");
} else {
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 No active Claude process");
telegram_api.sendMessage(allocator, config.bot_token, config.chat_id, "\xe2\x9a\xa0 No active request");
}
}
Loading
Loading