Skip to content

Commit cc9dcd0

Browse files
CSResselclaude
andauthored
feat: Implement ACP inline streaming with word wrapping (#48)
## Summary This PR introduces a new inline streaming system for ACP agents that displays assistant messages incrementally with proper word wrapping before committing them to the scrollback buffer. ### Key Changes - **New BackendEvent enum**: Wraps `ConversationEvent` and adds inline events (`InlineBegin`, `InlineUpdate`, `InlineCommit`, `InlineAbort`) - **InlineEntryState tracking**: Manages streaming text with dynamic word wrapping in `src/history.rs` - **Text wrapping utility**: New `text_utils` module with `wrap_text_to_width` function for proper text reflow - **Backend updates**: All backends now emit `BackendEvent` instead of `ConversationEvent` - **UI integration**: Model tracks inline entries and renders them separately from committed scrollback - **Test updates**: All tests updated to handle new `BackendEvent` structure with inline event tracking ### Benefits ✨ Assistant messages appear immediately as they stream 📐 Proper word wrapping adapts to terminal width changes 🎯 Cleaner separation between streaming and committed content 🧪 All tests passing with comprehensive inline event handling ## Test Plan - [x] All existing tests pass - [x] New inline event tracking tested in `acp_runner_test.rs` - [x] Word wrapping behavior verified - [x] Terminal resize handling tested 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7a3ad7e commit cc9dcd0

16 files changed

Lines changed: 750 additions & 263 deletions

src/acp_runner.rs

Lines changed: 169 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#![allow(dead_code)]
22

3+
use crate::backends::BackendEvent;
34
use crate::conversation::{ConversationEvent, PlanEntry};
5+
use crate::history::{InlineEntryId, InlineEntryKind, InlineEntryUpdate};
46
use agent_client_protocol::{
57
self as acp, Agent, Client, ClientCapabilities, ContentBlock, FileSystemCapability,
68
Implementation, InitializeRequest, NewSessionRequest, PermissionOptionKind, PlanEntryPriority,
@@ -10,6 +12,7 @@ use agent_client_protocol::{
1012
ToolCallStatus, ToolKind, WriteTextFileRequest, WriteTextFileResponse,
1113
};
1214
use futures::stream::Stream;
15+
use std::cell::RefCell;
1316
use std::path::PathBuf;
1417
use std::pin::Pin;
1518
use std::process::Stdio;
@@ -263,7 +266,7 @@ impl AcpAgentRunner {
263266
&mut self,
264267
prompt: String,
265268
cancel_token: CancellationToken,
266-
) -> Result<Pin<Box<dyn Stream<Item = ConversationEvent> + Send>>, String> {
269+
) -> Result<Pin<Box<dyn Stream<Item = BackendEvent> + Send>>, String> {
267270
if let Some(mut existing) = self._agent_process.take() {
268271
let _ = existing.kill().await;
269272
}
@@ -356,7 +359,7 @@ async fn run_acp_connection(
356359
stdout: ChildStdout,
357360
client_handler: AcpClientHandler,
358361
session_update_rx: mpsc::UnboundedReceiver<SessionUpdate>,
359-
event_tx: mpsc::UnboundedSender<ConversationEvent>,
362+
event_tx: mpsc::UnboundedSender<BackendEvent>,
360363
cancel_token: CancellationToken,
361364
prompt: String,
362365
cwd: PathBuf,
@@ -382,10 +385,10 @@ async fn run_acp_connection(
382385
.await;
383386

384387
if let Err(err) = result {
385-
let _ = event_tx.send(ConversationEvent::SystemEvent {
388+
let _ = event_tx.send(BackendEvent::Conversation(ConversationEvent::SystemEvent {
386389
subtype: "acp_error".to_string(),
387390
details: Some(err),
388-
});
391+
}));
389392
}
390393
}
391394

@@ -395,7 +398,7 @@ async fn run_connection_inner(
395398
stdout: ChildStdout,
396399
client_handler: AcpClientHandler,
397400
session_update_rx: mpsc::UnboundedReceiver<SessionUpdate>,
398-
event_tx: mpsc::UnboundedSender<ConversationEvent>,
401+
event_tx: mpsc::UnboundedSender<BackendEvent>,
399402
cancel_token: CancellationToken,
400403
prompt: String,
401404
cwd: PathBuf,
@@ -414,20 +417,41 @@ async fn run_connection_inner(
414417
let mut handshake_tx = Some(handshake_tx);
415418
let io_task = tokio::task::spawn_local(io_future);
416419

420+
let inline_tracker = Rc::new(RefCell::new(InlineEntryTracker::default()));
417421
{
418422
let event_tx = event_tx.clone();
423+
let tracker = Rc::clone(&inline_tracker);
419424
tokio::task::spawn_local(async move {
420425
let mut updates = session_update_rx;
421426
while let Some(update) = updates.recv().await {
422427
// Send debug event for all session updates
423428
let debug_event = ConversationEvent::UnknownEvent {
424429
raw: format!("{update:?}"),
425430
};
426-
let _ = event_tx.send(debug_event);
427-
428-
// Send translated event if available
429-
if let Some(event) = translate_session_update(update) {
430-
let _ = event_tx.send(event);
431+
let _ = event_tx.send(BackendEvent::Conversation(debug_event));
432+
433+
match update {
434+
SessionUpdate::AgentMessageChunk(chunk) => {
435+
if let ContentBlock::Text(text_content) = chunk.content {
436+
let mut tracker = tracker.borrow_mut();
437+
tracker.append_agent_chunk(text_content.text, &event_tx);
438+
}
439+
}
440+
SessionUpdate::AgentThoughtChunk(chunk) => {
441+
if let ContentBlock::Text(text_content) = chunk.content {
442+
let mut tracker = tracker.borrow_mut();
443+
tracker.append_thinking_chunk(text_content.text, &event_tx);
444+
}
445+
}
446+
other => {
447+
{
448+
let mut tracker = tracker.borrow_mut();
449+
tracker.commit_kind(InlineEntryKind::AgentThinking, &event_tx);
450+
}
451+
if let Some(event) = translate_session_update(other) {
452+
let _ = event_tx.send(BackendEvent::Conversation(event));
453+
}
454+
}
431455
}
432456
}
433457
});
@@ -483,13 +507,13 @@ async fn run_connection_inner(
483507
}
484508

485509
// Send debug event for successful initialization
486-
let _ = event_tx.send(ConversationEvent::SystemEvent {
510+
let _ = event_tx.send(BackendEvent::Conversation(ConversationEvent::SystemEvent {
487511
subtype: "acp_initialized".to_string(),
488512
details: Some(format!(
489513
"Protocol version: {:?}",
490514
init_response.protocol_version
491515
)),
492-
});
516+
}));
493517

494518
let session_response = match connection
495519
.new_session(NewSessionRequest {
@@ -511,10 +535,10 @@ async fn run_connection_inner(
511535
let session_id = session_response.session_id.clone();
512536

513537
// Send debug event for session creation
514-
let _ = event_tx.send(ConversationEvent::SystemEvent {
538+
let _ = event_tx.send(BackendEvent::Conversation(ConversationEvent::SystemEvent {
515539
subtype: "acp_session_created".to_string(),
516540
details: Some(format!("Session ID: {session_id}")),
517-
});
541+
}));
518542

519543
if let Some(tx) = handshake_tx.take() {
520544
let _ = tx.send(Ok(()));
@@ -536,10 +560,10 @@ async fn run_connection_inner(
536560
}
537561

538562
// Send debug event for prompt
539-
let _ = event_tx.send(ConversationEvent::SystemEvent {
563+
let _ = event_tx.send(BackendEvent::Conversation(ConversationEvent::SystemEvent {
540564
subtype: "acp_prompt_sent".to_string(),
541565
details: Some(format!("Prompt length: {} chars", prompt.len())),
542-
});
566+
}));
543567

544568
let prompt_request = PromptRequest {
545569
session_id,
@@ -551,11 +575,28 @@ async fn run_connection_inner(
551575
meta: None,
552576
};
553577

554-
if let Err(err) = connection.prompt(prompt_request).await {
555-
let _ = event_tx.send(ConversationEvent::SystemEvent {
556-
subtype: "prompt_failed".to_string(),
557-
details: Some(err.to_string()),
558-
});
578+
match connection.prompt(prompt_request).await {
579+
Ok(response) => {
580+
{
581+
let mut tracker = inline_tracker.borrow_mut();
582+
tracker.commit_all(&event_tx);
583+
}
584+
let success = matches!(response.stop_reason, acp::StopReason::EndTurn);
585+
let details = format!("Stop reason: {:?}", response.stop_reason);
586+
let _ = event_tx.send(BackendEvent::Conversation(
587+
ConversationEvent::ResultSummary { success, details },
588+
));
589+
}
590+
Err(err) => {
591+
{
592+
let mut tracker = inline_tracker.borrow_mut();
593+
tracker.abort_all(&event_tx);
594+
}
595+
let _ = event_tx.send(BackendEvent::Conversation(ConversationEvent::SystemEvent {
596+
subtype: "prompt_failed".to_string(),
597+
details: Some(err.to_string()),
598+
}));
599+
}
559600
}
560601

561602
io_task.abort();
@@ -567,6 +608,113 @@ async fn run_connection_inner(
567608
}
568609
}
569610

611+
#[derive(Default)]
612+
struct InlineEntryTracker {
613+
next_id: usize,
614+
assistant_entry: Option<InlineEntryId>,
615+
thinking_entry: Option<InlineEntryId>,
616+
}
617+
618+
impl InlineEntryTracker {
619+
fn append_agent_chunk(&mut self, text: String, event_tx: &mpsc::UnboundedSender<BackendEvent>) {
620+
if text.is_empty() {
621+
return;
622+
}
623+
self.commit_kind(InlineEntryKind::AgentThinking, event_tx);
624+
self.append_chunk(InlineEntryKind::AssistantMessage, text, event_tx);
625+
}
626+
627+
fn append_thinking_chunk(
628+
&mut self,
629+
text: String,
630+
event_tx: &mpsc::UnboundedSender<BackendEvent>,
631+
) {
632+
if text.is_empty() {
633+
return;
634+
}
635+
self.append_chunk(InlineEntryKind::AgentThinking, text, event_tx);
636+
}
637+
638+
fn append_chunk(
639+
&mut self,
640+
kind: InlineEntryKind,
641+
text: String,
642+
event_tx: &mpsc::UnboundedSender<BackendEvent>,
643+
) {
644+
let id = self.ensure_entry(kind.clone(), event_tx);
645+
let _ = event_tx.send(BackendEvent::InlineUpdate {
646+
id,
647+
update: InlineEntryUpdate::AppendText(text),
648+
});
649+
}
650+
651+
fn ensure_entry(
652+
&mut self,
653+
kind: InlineEntryKind,
654+
event_tx: &mpsc::UnboundedSender<BackendEvent>,
655+
) -> InlineEntryId {
656+
let slot = match kind {
657+
InlineEntryKind::AssistantMessage => &mut self.assistant_entry,
658+
InlineEntryKind::AgentThinking => &mut self.thinking_entry,
659+
};
660+
661+
if let Some(id) = slot.clone() {
662+
return id;
663+
}
664+
665+
self.next_id += 1;
666+
let prefix = match kind {
667+
InlineEntryKind::AssistantMessage => "assistant",
668+
InlineEntryKind::AgentThinking => "thinking",
669+
};
670+
let id = format!("{prefix}-{}", self.next_id);
671+
let _ = event_tx.send(BackendEvent::InlineBegin {
672+
id: id.clone(),
673+
kind,
674+
});
675+
*slot = Some(id.clone());
676+
id
677+
}
678+
679+
fn commit_kind(
680+
&mut self,
681+
kind: InlineEntryKind,
682+
event_tx: &mpsc::UnboundedSender<BackendEvent>,
683+
) {
684+
let slot = match kind {
685+
InlineEntryKind::AssistantMessage => &mut self.assistant_entry,
686+
InlineEntryKind::AgentThinking => &mut self.thinking_entry,
687+
};
688+
if let Some(id) = slot.take() {
689+
let _ = event_tx.send(BackendEvent::InlineCommit { id });
690+
}
691+
}
692+
693+
fn commit_all(&mut self, event_tx: &mpsc::UnboundedSender<BackendEvent>) {
694+
self.commit_kind(InlineEntryKind::AgentThinking, event_tx);
695+
self.commit_kind(InlineEntryKind::AssistantMessage, event_tx);
696+
}
697+
698+
fn abort_kind(
699+
&mut self,
700+
kind: InlineEntryKind,
701+
event_tx: &mpsc::UnboundedSender<BackendEvent>,
702+
) {
703+
let slot = match kind {
704+
InlineEntryKind::AssistantMessage => &mut self.assistant_entry,
705+
InlineEntryKind::AgentThinking => &mut self.thinking_entry,
706+
};
707+
if let Some(id) = slot.take() {
708+
let _ = event_tx.send(BackendEvent::InlineAbort { id });
709+
}
710+
}
711+
712+
fn abort_all(&mut self, event_tx: &mpsc::UnboundedSender<BackendEvent>) {
713+
self.abort_kind(InlineEntryKind::AgentThinking, event_tx);
714+
self.abort_kind(InlineEntryKind::AssistantMessage, event_tx);
715+
}
716+
}
717+
570718
#[cfg(test)]
571719
mod tests {
572720
use super::*;

0 commit comments

Comments
 (0)