Skip to content

Commit ee7e9eb

Browse files
author
root
committed
Fix Claude SSE block framing for Claude Code
1 parent 317d721 commit ee7e9eb

1 file changed

Lines changed: 99 additions & 15 deletions

File tree

src/streaming.rs

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,14 @@ struct ClaudeSink {
492492
id: String,
493493
model: String,
494494
started: bool,
495+
next_index: usize,
496+
active_block: Option<ClaudeActiveBlock>,
497+
}
498+
499+
#[derive(Clone)]
500+
enum ClaudeActiveBlock {
501+
Text { index: usize },
502+
Tool { index: usize, call_id: String },
495503
}
496504

497505
impl InternalSink for ClaudeSink {
@@ -515,6 +523,7 @@ impl InternalSink for ClaudeSink {
515523
"type": "message_start",
516524
"message": {
517525
"id": self.id,
526+
"type": "message",
518527
"model": self.model,
519528
"role": "assistant",
520529
"content": [],
@@ -529,22 +538,71 @@ impl InternalSink for ClaudeSink {
529538
if text.is_empty() {
530539
return Vec::new();
531540
}
532-
vec![encode_json_sse_with_event(
541+
let mut out = Vec::new();
542+
let index = match self.active_block.as_ref() {
543+
Some(ClaudeActiveBlock::Text { index }) => *index,
544+
Some(ClaudeActiveBlock::Tool { .. }) => {
545+
out.extend(self.close_active_block());
546+
let index = self.next_index;
547+
self.next_index += 1;
548+
self.active_block = Some(ClaudeActiveBlock::Text { index });
549+
out.push(encode_json_sse_with_event(
550+
&json!({
551+
"type": "content_block_start",
552+
"index": index,
553+
"content_block": {
554+
"type": "text",
555+
"text": ""
556+
}
557+
}),
558+
"content_block_start",
559+
));
560+
index
561+
}
562+
None => {
563+
let index = self.next_index;
564+
self.next_index += 1;
565+
self.active_block = Some(ClaudeActiveBlock::Text { index });
566+
out.push(encode_json_sse_with_event(
567+
&json!({
568+
"type": "content_block_start",
569+
"index": index,
570+
"content_block": {
571+
"type": "text",
572+
"text": ""
573+
}
574+
}),
575+
"content_block_start",
576+
));
577+
index
578+
}
579+
};
580+
out.push(encode_json_sse_with_event(
533581
&json!({
534582
"type": "content_block_delta",
583+
"index": index,
535584
"delta": {
536585
"type": "text_delta",
537586
"text": text
538587
}
539588
}),
540589
"content_block_delta",
541-
)]
590+
));
591+
out
542592
}
543593

544594
fn on_tool_call_start(&mut self, call_id: &str, name: &str) -> Vec<Vec<u8>> {
545-
vec![encode_json_sse_with_event(
595+
let mut out = self.close_active_block();
596+
let index = self.next_index;
597+
self.next_index += 1;
598+
self.active_block = Some(ClaudeActiveBlock::Tool {
599+
index,
600+
call_id: call_id.to_string(),
601+
});
602+
out.push(encode_json_sse_with_event(
546603
&json!({
547604
"type": "content_block_start",
605+
"index": index,
548606
"content_block": {
549607
"type": "tool_use",
550608
"id": call_id,
@@ -553,16 +611,24 @@ impl InternalSink for ClaudeSink {
553611
}
554612
}),
555613
"content_block_start",
556-
)]
614+
));
615+
out
557616
}
558617

559618
fn on_tool_call_args_delta(&mut self, _call_id: &str, _name: &str, delta: &str) -> Vec<Vec<u8>> {
560619
if delta.is_empty() {
561620
return Vec::new();
562621
}
622+
let Some(ClaudeActiveBlock::Tool { index, call_id }) = self.active_block.as_ref() else {
623+
return Vec::new();
624+
};
625+
if call_id != _call_id {
626+
return Vec::new();
627+
}
563628
vec![encode_json_sse_with_event(
564629
&json!({
565630
"type": "content_block_delta",
631+
"index": index,
566632
"delta": {
567633
"type": "input_json_delta",
568634
"partial_json": delta
@@ -581,24 +647,42 @@ impl InternalSink for ClaudeSink {
581647
let usage_obj = usage
582648
.and_then(chat_usage_to_claude_stream_usage)
583649
.unwrap_or_else(|| json!({"input_tokens": 0, "output_tokens": 0}));
584-
vec![
585-
encode_json_sse_with_event(
586-
&json!({
587-
"type": "message_delta",
588-
"delta": {"stop_reason": stop_reason, "stop_sequence": Value::Null},
589-
"usage": usage_obj
590-
}),
591-
"message_delta",
592-
),
593-
encode_json_sse_with_event(&json!({"type": "message_stop"}), "message_stop"),
594-
]
650+
let mut out = self.close_active_block();
651+
out.push(encode_json_sse_with_event(
652+
&json!({
653+
"type": "message_delta",
654+
"delta": {"stop_reason": stop_reason, "stop_sequence": Value::Null},
655+
"usage": usage_obj
656+
}),
657+
"message_delta",
658+
));
659+
out.push(encode_json_sse_with_event(&json!({"type": "message_stop"}), "message_stop"));
660+
out
595661
}
596662

597663
fn on_done(&mut self) -> Vec<Vec<u8>> {
598664
vec![encode_sse("[DONE]", None)]
599665
}
600666
}
601667

668+
impl ClaudeSink {
669+
fn close_active_block(&mut self) -> Vec<Vec<u8>> {
670+
let Some(active) = self.active_block.take() else {
671+
return Vec::new();
672+
};
673+
let index = match active {
674+
ClaudeActiveBlock::Text { index } | ClaudeActiveBlock::Tool { index, .. } => index,
675+
};
676+
vec![encode_json_sse_with_event(
677+
&json!({
678+
"type": "content_block_stop",
679+
"index": index
680+
}),
681+
"content_block_stop",
682+
)]
683+
}
684+
}
685+
602686
#[derive(Default)]
603687
struct GeminiSink {
604688
id: String,

0 commit comments

Comments
 (0)