1- // claude_stream.zig — Streaming Claude CLI execution with sendMessageDraft
2- // Spawns claude with --output-format stream-json, pipes stdout,
3- // reads NDJSON line-by-line, sends drafts every 500ms, final message when done.
1+ // claude_stream.zig — Streaming via direct Anthropic API (SSE)
2+ // POST to api.anthropic.com/v1/messages with stream:true, parse SSE events,
3+ // send Telegram drafts every 500ms, final message when done.
4+ // No claude CLI dependency. Pure Zig std.http.Client.
45const std = @import ("std" );
56const telegram_api = @import ("telegram_api.zig" );
6- const json_utils = @import ("json_utils.zig" );
77
88const BotConfig = telegram_api .BotConfig ;
99
10+ const api_url = "https://api.anthropic.com/v1/messages" ;
11+ const api_version = "2023-06-01" ;
12+ const default_model = "claude-sonnet-4-20250514" ;
13+
1014/// Shared state between main thread (polling) and worker thread (streaming).
11- /// Atomics for lock-free is_busy check, PID for /stop.
1215pub const StreamState = struct {
1316 is_busy : std .atomic .Value (bool ) = std .atomic .Value (bool ).init (false ),
14- active_pid : std .atomic .Value (i32 ) = std .atomic .Value (i32 ).init (0 ),
17+ cancel_requested : std .atomic .Value (bool ) = std .atomic .Value (bool ).init (false ),
1518};
1619
17- /// Options for streaming — covers /ask, /continue, and /resume .
20+ /// Options for streaming /ask.
1821pub const StreamOpts = struct {
1922 args : []const u8 , // prompt text (caller-owned, freed by worker)
20- use_continue : bool = false ,
21- resume_id : ? []const u8 = null , // session ID for --resume
22- model : ? []const u8 = null , // model override (pointer to BotState buffer)
23+ model : ? []const u8 = null , // model override
2324};
2425
25- /// Worker thread entry point: spawn claude, stream output, send drafts.
26- /// Called via std.Thread.spawn() from bot_loop dispatch.
26+ /// Worker thread entry point: POST to Anthropic API, stream SSE, send drafts.
2727pub fn runStreaming (
2828 allocator : std.mem.Allocator ,
2929 config : BotConfig ,
3030 opts : StreamOpts ,
3131 state : * StreamState ,
3232) void {
3333 defer {
34- state .active_pid .store (0 , .release );
34+ state .cancel_requested .store (false , .release );
3535 state .is_busy .store (false , .release );
3636 allocator .free (opts .args );
37- if (opts .resume_id ) | rid | allocator .free (rid );
3837 }
3938
40- // Notify user
41- if (opts .resume_id != null ) {
42- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xf0\x9f\x94\x84 TRI resuming session..." );
43- } else if (opts .use_continue ) {
44- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xf0\x9f\x94\x84 TRI streaming..." );
45- } else {
46- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xf0\x9f\xa7\xa0 TRI streaming..." );
47- }
39+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xf0\x9f\xa7\xa0 TRI streaming..." );
4840
49- // Build turns string
50- var turns_buf : [16 ]u8 = undefined ;
51- const turns_str = std .fmt .bufPrint (& turns_buf , "{d}" , .{config .max_turns }) catch "10" ;
52-
53- // Build argv dynamically based on opts
54- var argv_buf : [16 ][]const u8 = undefined ;
55- var argc : usize = 0 ;
56-
57- argv_buf [argc ] = "claude" ;
58- argc += 1 ;
59- if (opts .args .len > 0 ) {
60- argv_buf [argc ] = "-p" ;
61- argc += 1 ;
62- argv_buf [argc ] = opts .args ;
63- argc += 1 ;
64- }
65- if (opts .resume_id ) | rid | {
66- argv_buf [argc ] = "--resume" ;
67- argc += 1 ;
68- argv_buf [argc ] = rid ;
69- argc += 1 ;
70- } else if (opts .use_continue ) {
71- argv_buf [argc ] = "--continue" ;
72- argc += 1 ;
73- }
74- if (opts .model ) | model | {
75- argv_buf [argc ] = "--model" ;
76- argc += 1 ;
77- argv_buf [argc ] = model ;
78- argc += 1 ;
79- }
80- argv_buf [argc ] = "--output-format" ;
81- argc += 1 ;
82- argv_buf [argc ] = "stream-json" ;
83- argc += 1 ;
84- argv_buf [argc ] = "--max-turns" ;
85- argc += 1 ;
86- argv_buf [argc ] = turns_str ;
87- argc += 1 ;
88-
89- // Init child with piped stdout
90- var child = std .process .Child .init (argv_buf [0.. argc ], allocator );
91- child .stdout_behavior = .Pipe ;
92- child .stderr_behavior = .Pipe ;
93- child .cwd = config .project_root ;
94-
95- child .spawn () catch | err | {
96- var err_buf : [256 ]u8 = undefined ;
97- telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & err_buf , "\xe2\x9d\x8c Spawn error: {s}" , .{@errorName (err )});
41+ const model = opts .model orelse default_model ;
42+
43+ // Build request body
44+ var body_buf : std .ArrayList (u8 ) = .empty ;
45+ defer body_buf .deinit (allocator );
46+
47+ buildRequestBody (allocator , & body_buf , model , opts .args ) catch {
48+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9d\x8c Failed to build request" );
49+ return ;
50+ };
51+
52+ std .debug .print ("[tri-bot] SSE request: {d} bytes, model={s}\n " , .{ body_buf .items .len , model });
53+
54+ // HTTP POST to Anthropic API
55+ var client = std.http.Client { .allocator = allocator };
56+ defer client .deinit ();
57+
58+ const uri = std .Uri .parse (api_url ) catch unreachable ;
59+
60+ var req = client .request (.POST , uri , .{
61+ .extra_headers = &.{
62+ .{ .name = "Content-Type" , .value = "application/json" },
63+ .{ .name = "x-api-key" , .value = config .api_key },
64+ .{ .name = "anthropic-version" , .value = api_version },
65+ },
66+ }) catch {
67+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9d\x8c Connection failed" );
68+ return ;
69+ };
70+ defer req .deinit ();
71+
72+ // Send body
73+ req .transfer_encoding = .{ .content_length = body_buf .items .len };
74+ var bw = req .sendBodyUnflushed (&.{}) catch {
75+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9d\x8c Send failed" );
76+ return ;
77+ };
78+ bw .writer .writeAll (body_buf .items ) catch {
79+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9d\x8c Write failed" );
80+ return ;
81+ };
82+ bw .end () catch {};
83+ if (req .connection ) | conn | conn .flush () catch {};
84+
85+ // Receive response head
86+ var redirect_buf : [0 ]u8 = .{};
87+ var response = req .receiveHead (& redirect_buf ) catch {
88+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9d\x8c No response from API" );
9889 return ;
9990 };
10091
101- // Store PID for /stop
102- state .active_pid .store (@intCast (child .id ), .release );
92+ // Check for HTTP errors
93+ const status = @intFromEnum (response .head .status );
94+ if (status >= 400 ) {
95+ handleApiError (allocator , config , & response , status );
96+ return ;
97+ }
10398
104- std .debug .print ("[tri-bot] Streaming started (PID {d})\n " , .{child .id });
99+ std .debug .print ("[tri-bot] SSE stream started (status {d})\n " , .{status });
100+
101+ // Read SSE stream line-by-line
102+ var transfer_buf : [8192 ]u8 = undefined ;
103+ var reader = response .reader (& transfer_buf );
105104
106- // Read stdout line-by-line, accumulate text deltas, send drafts
107105 var text_buf : std .ArrayList (u8 ) = .empty ;
108106 defer text_buf .deinit (allocator );
109107
110108 var last_draft_ns : i128 = std .time .nanoTimestamp ();
111109 const draft_interval : i128 = 500_000_000 ; // 500ms
112110
113- const stdout_file = child .stdout .? ;
114111 var line_buf : [65536 ]u8 = undefined ;
115112 var line_len : usize = 0 ;
116113 var read_chunk : [4096 ]u8 = undefined ;
117114
118115 while (true ) {
119- const n = stdout_file .read (& read_chunk ) catch break ;
120- if (n == 0 ) break ; // EOF
116+ if (state .cancel_requested .load (.acquire )) break ;
117+
118+ const n = reader .readSliceShort (& read_chunk ) catch break ;
121119
122120 for (read_chunk [0.. n ]) | byte | {
123121 if (byte == '\n ' ) {
124- // Process complete NDJSON line
125122 const line = line_buf [0.. line_len ];
126- if (std .mem .indexOf (u8 , line , "\" text_delta\" " ) != null ) {
127- if (json_utils .extractString (line , "text" )) | text_val | {
123+
124+ // Parse SSE: "data: {json}"
125+ if (line .len > 6 and std .mem .eql (u8 , line [0.. 6], "data: " )) {
126+ const json = line [6.. ];
127+ if (extractTextDelta (json )) | text_val | {
128128 text_buf .appendSlice (allocator , text_val ) catch {};
129129 }
130130 }
@@ -144,31 +144,92 @@ pub fn runStreaming(
144144 }
145145 }
146146
147- // Wait for child to finish
148- _ = child .wait () catch {};
149-
150- std .debug .print ("[tri-bot] Streaming done ({d} bytes)\n " , .{text_buf .items .len });
147+ std .debug .print ("[tri-bot] SSE done ({d} bytes text)\n " , .{text_buf .items .len });
151148
152149 // Send final message
153150 if (text_buf .items .len > 0 ) {
154151 telegram_api .sendLongMessage (allocator , config , text_buf .items );
155152 } else {
156- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9a\xa0 Claude returned empty response " );
153+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9a\xa0 Empty response from API " );
157154 }
158155}
159156
160- /// Stop the active Claude process via SIGTERM.
161- pub fn stopProcess (allocator : std.mem.Allocator , config : BotConfig , state : * StreamState ) void {
162- const pid = state .active_pid .load (.acquire );
163- if (pid > 0 ) {
164- const posix_pid : std.posix.pid_t = @intCast (pid );
165- std .posix .kill (posix_pid , 15 ) catch | err | { // 15 = SIGTERM
166- var err_buf : [256 ]u8 = undefined ;
167- telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & err_buf , "\xe2\x9d\x8c Kill error: {s}" , .{@errorName (err )});
157+ /// Build JSON request body for Anthropic Messages API (streaming).
158+ fn buildRequestBody (allocator : std.mem.Allocator , body : * std .ArrayList (u8 ), model : []const u8 , prompt : []const u8 ) ! void {
159+ try body .appendSlice (allocator , "{\" model\" :\" " );
160+ try body .appendSlice (allocator , model );
161+ try body .appendSlice (allocator , "\" ,\" max_tokens\" :8192,\" stream\" :true,\" messages\" :[{\" role\" :\" user\" ,\" content\" :\" " );
162+ // JSON-escape prompt
163+ for (prompt ) | c | {
164+ switch (c ) {
165+ '"' = > try body .appendSlice (allocator , "\\ \" " ),
166+ '\\ ' = > try body .appendSlice (allocator , "\\\\ " ),
167+ '\n ' = > try body .appendSlice (allocator , "\\ n" ),
168+ '\r ' = > try body .appendSlice (allocator , "\\ r" ),
169+ '\t ' = > try body .appendSlice (allocator , "\\ t" ),
170+ else = > try body .append (allocator , c ),
171+ }
172+ }
173+ try body .appendSlice (allocator , "\" }]}" );
174+ }
175+
176+ /// Extract text from SSE content_block_delta event.
177+ /// Input: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
178+ /// Returns: "Hello"
179+ fn extractTextDelta (json : []const u8 ) ? []const u8 {
180+ // Find "text_delta","text":" — the text field after the delta type
181+ const needle = "\" text_delta\" ,\" text\" :\" " ;
182+ const idx = std .mem .indexOf (u8 , json , needle ) orelse return null ;
183+ const start = idx + needle .len ;
184+ if (start >= json .len ) return null ;
185+ var end = start ;
186+ while (end < json .len ) : (end += 1 ) {
187+ if (json [end ] == '"' and (end == start or json [end - 1 ] != '\\ ' )) break ;
188+ }
189+ if (end == start ) return null ;
190+ return json [start .. end ];
191+ }
192+
193+ /// Handle API error response — read body and send error to Telegram.
194+ fn handleApiError (allocator : std.mem.Allocator , config : BotConfig , response : * std.http.Client.Response , status : u16 ) void {
195+ var transfer_buf : [8192 ]u8 = undefined ;
196+ var reader = response .reader (& transfer_buf );
197+ const err_body = reader .allocRemaining (allocator , std .Io .Limit .limited (4096 )) catch {
198+ var buf : [256 ]u8 = undefined ;
199+ telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & buf , "\xe2\x9d\x8c API error: HTTP {d}" , .{status });
200+ return ;
201+ };
202+ defer allocator .free (err_body );
203+
204+ // Try to extract error message from JSON
205+ var needle_buf : [128 ]u8 = undefined ;
206+ const needle = std .fmt .bufPrint (& needle_buf , "\" message\" :\" " , .{}) catch {
207+ var buf : [256 ]u8 = undefined ;
208+ telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & buf , "\xe2\x9d\x8c API error: HTTP {d}" , .{status });
209+ return ;
210+ };
211+ if (std .mem .indexOf (u8 , err_body , needle )) | idx | {
212+ const msg_start = idx + needle .len ;
213+ if (msg_start < err_body .len ) {
214+ var msg_end = msg_start ;
215+ while (msg_end < err_body .len and err_body [msg_end ] != '"' ) : (msg_end += 1 ) {}
216+ const msg = err_body [msg_start .. msg_end ];
217+ var buf : [512 ]u8 = undefined ;
218+ telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & buf , "\xe2\x9d\x8c API {d}: {s}" , .{ status , msg });
168219 return ;
169- };
170- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9b\x94 Claude process stopped" );
220+ }
221+ }
222+
223+ var buf : [256 ]u8 = undefined ;
224+ telegram_api .sendFmt (allocator , config .bot_token , config .chat_id , & buf , "\xe2\x9d\x8c API error: HTTP {d}" , .{status });
225+ }
226+
227+ /// Cancel the active streaming request.
228+ pub fn stopProcess (allocator : std.mem.Allocator , config : BotConfig , state : * StreamState ) void {
229+ if (state .is_busy .load (.acquire )) {
230+ state .cancel_requested .store (true , .release );
231+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9b\x94 Cancelling request..." );
171232 } else {
172- telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9a\xa0 No active Claude process " );
233+ telegram_api .sendMessage (allocator , config .bot_token , config .chat_id , "\xe2\x9a\xa0 No active request " );
173234 }
174235}
0 commit comments