Skip to content

Commit fc709cf

Browse files
authored
Merge pull request #923 from AgentWorkforce/claude/fix-issue-922-4REVi
fix(broker): preserve split multi-byte UTF-8 in worker_stream (#922)
2 parents cf82bf5 + 444edd8 commit fc709cf

4 files changed

Lines changed: 290 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
7979
- SDK `sendInput` routes through the PTY worker protocol so input reaches the agent PTY.
8080
- DM delivery retries now end in a surfaced `message_delivery_failed` event instead of silently retrying forever.
8181
- The PTY watchdog marks agents with pending delivery work as blocked-on-send instead of idle.
82+
- PTY `worker_stream` events preserve multi-byte UTF-8 characters split across read chunks instead of emitting `U+FFFD` replacement glyphs.
8283

8384
## [6.2.5] - 2026-05-19
8485

crates/broker/src/pty_worker.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::readiness::{cli_prompt_ready, detect_cli_ready, GridReadinessSnapshot
3232
use crate::runtime::{get_terminal_size, send_frame};
3333
use crate::snapshot::Snapshot;
3434
use crate::util::ansi::{floor_char_boundary, strip_ansi};
35+
use crate::util::utf8_stream::Utf8StreamDecoder;
3536
use crate::worker::detection::ActivityDetector;
3637
use crate::wrap::{PtyAutoState, AUTO_SUGGESTION_BLOCK_TIMEOUT};
3738
use base64::Engine;
@@ -336,6 +337,10 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> {
336337
// Bounded to avoid unbounded memory growth; continuity blocks are small.
337338
let mut continuity_buffer = String::new();
338339
const CONTINUITY_BUFFER_MAX: usize = 4096;
340+
// Streaming UTF-8 decoder. PTY reads can split multi-byte codepoints
341+
// across chunks; the decoder holds incomplete trailing bytes for the
342+
// next chunk instead of corrupting them into U+FFFD.
343+
let mut utf8_decoder = Utf8StreamDecoder::new();
339344
// Rate-limited buffering for worker_stream emissions.
340345
// Chunks are accumulated and flushed at most every 100ms or when buffer exceeds threshold.
341346
let mut stream_buffer = String::new();
@@ -594,9 +599,14 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> {
594599
reported_idle = false;
595600
// Child is provably alive — reset the no-PID exit counter.
596601
pty.reset_no_pid_checks();
597-
let text = String::from_utf8_lossy(&chunk).to_string();
598-
let clean_text = strip_ansi(&text);
599602
startup_total_bytes = startup_total_bytes.saturating_add(chunk.len());
603+
let text = utf8_decoder.decode(&chunk);
604+
if text.is_empty() {
605+
// Whole chunk was an incomplete UTF-8 prefix held
606+
// back for the next read; nothing to emit yet.
607+
continue;
608+
}
609+
let clean_text = strip_ansi(&text);
600610
append_bounded(
601611
&mut startup_output,
602612
&clean_text,
@@ -860,7 +870,16 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> {
860870
}
861871
None => {
862872
// PTY reader closed — child likely exited. Flush
863-
// any buffered stream output before sending
873+
// any incomplete trailing UTF-8 bytes (no further
874+
// chunks will arrive to complete them) before the
875+
// stream_buffer flush so they reach worker_stream
876+
// and echo_buffer like normal output.
877+
let tail = utf8_decoder.flush();
878+
if !tail.is_empty() {
879+
stream_buffer.push_str(&tail);
880+
echo_buffer.push_str(&tail);
881+
}
882+
// Flush any buffered stream output before sending
864883
// agent_exit to preserve output ordering.
865884
flush_stream_buffer!();
866885
// Emit agent_exit with any echo_buffer tail so the
@@ -1111,13 +1130,26 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> {
11111130
flush_stream_buffer!();
11121131
let mut late_output = String::new();
11131132
while let Ok(chunk) = pty_rx.try_recv() {
1114-
let text = String::from_utf8_lossy(&chunk).to_string();
1133+
let text = utf8_decoder.decode(&chunk);
1134+
if text.is_empty() {
1135+
continue;
1136+
}
11151137
late_output.push_str(&text);
11161138
let _ = send_frame(&out_tx, "worker_stream", None, json!({
11171139
"stream": "stdout",
11181140
"chunk": text,
11191141
})).await;
11201142
}
1143+
// Stream is closing; flush any incomplete trailing bytes
1144+
// as U+FFFD rather than silently dropping them.
1145+
let tail = utf8_decoder.flush();
1146+
if !tail.is_empty() {
1147+
late_output.push_str(&tail);
1148+
let _ = send_frame(&out_tx, "worker_stream", None, json!({
1149+
"stream": "stdout",
1150+
"chunk": tail,
1151+
})).await;
1152+
}
11211153
if !late_output.is_empty() {
11221154
let clean = strip_ansi(&late_output);
11231155
tracing::warn!(

crates/broker/src/util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub(crate) mod ansi;
22
pub(crate) mod terminal;
3+
pub(crate) mod utf8_stream;
34
pub(crate) mod version;
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
//! Stateful streaming UTF-8 decoder.
2+
//!
3+
//! PTY reads can land in the middle of a multi-byte UTF-8 codepoint, so a
4+
//! naïve `String::from_utf8_lossy` on each chunk replaces partial sequences
5+
//! with `U+FFFD` even though the next chunk would complete the codepoint.
6+
//! `Utf8StreamDecoder` keeps any trailing incomplete byte sequence buffered
7+
//! across `decode` calls and only substitutes `U+FFFD` for byte sequences
8+
//! that are definitively invalid.
9+
10+
/// Streaming UTF-8 decoder that preserves codepoints split across byte
/// chunks.
///
/// Feed each incoming chunk to [`Utf8StreamDecoder::decode`]; once the
/// stream has ended, call [`Utf8StreamDecoder::flush`] to drain whatever
/// is still buffered.
#[derive(Debug, Default)]
pub(crate) struct Utf8StreamDecoder {
    /// Bytes of a not-yet-complete codepoint carried over from the previous
    /// `decode` call. Empty whenever the last chunk ended on a codepoint
    /// boundary.
    pending: Vec<u8>,
}

impl Utf8StreamDecoder {
    /// Create a decoder with nothing buffered.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Decode an incoming byte chunk, returning all complete UTF-8 text
    /// available.
    ///
    /// Definitively invalid byte sequences are replaced with `U+FFFD`.
    /// Trailing bytes that form an incomplete codepoint are retained for the
    /// next call, so the returned string may be empty even when `bytes` is
    /// not.
    pub(crate) fn decode(&mut self, bytes: &[u8]) -> String {
        if self.pending.is_empty() {
            // Common case: nothing carried over, so decode straight from the
            // caller's slice and stash only the incomplete tail (if any)
            // instead of staging the whole chunk in `pending` first.
            let (text, consumed) = Self::decode_prefix(bytes);
            self.pending.extend_from_slice(&bytes[consumed..]);
            return text;
        }

        // A partial codepoint is waiting: it must be validated together with
        // the new bytes, so join them before decoding.
        self.pending.extend_from_slice(bytes);
        let (text, consumed) = Self::decode_prefix(&self.pending);
        self.pending.drain(..consumed);
        text
    }

    /// Drain any remaining buffered bytes, emitting `U+FFFD` for each
    /// incomplete sequence. Call once no more bytes will arrive.
    ///
    /// Returns an empty string when nothing is buffered, so calling it
    /// repeatedly is harmless.
    pub(crate) fn flush(&mut self) -> String {
        let leftover = std::mem::take(&mut self.pending);
        // No further bytes can complete the tail, so the standard lossy
        // conversion gives exactly the wanted result.
        String::from_utf8_lossy(&leftover).into_owned()
    }

    /// Lossily decode the longest prefix of `bytes` that can be decided
    /// without seeing more input.
    ///
    /// Returns the decoded text and the number of bytes consumed. Bytes at
    /// `consumed..` (possibly none) are an incomplete trailing codepoint
    /// that the caller should keep for the next chunk.
    fn decode_prefix(bytes: &[u8]) -> (String, usize) {
        let mut output = String::with_capacity(bytes.len());
        let mut rest = bytes;

        loop {
            match std::str::from_utf8(rest) {
                Ok(valid) => {
                    output.push_str(valid);
                    return (output, bytes.len());
                }
                Err(err) => {
                    let (valid, after_valid) = rest.split_at(err.valid_up_to());
                    // `valid` was just reported as well-formed, so the lossy
                    // conversion only borrows it and never inserts a
                    // replacement; this avoids a panicking `expect`.
                    output.push_str(&String::from_utf8_lossy(valid));

                    match err.error_len() {
                        // Definitively invalid sequence: replace it and keep
                        // scanning after it.
                        Some(invalid_len) => {
                            output.push('\u{FFFD}');
                            rest = &after_valid[invalid_len..];
                        }
                        // Input ended mid-codepoint: leave the tail
                        // unconsumed so the next chunk can complete it.
                        None => return (output, bytes.len() - after_valid.len()),
                    }
                }
            }
        }
    }
}
107+
108+
#[cfg(test)]
mod tests {
    //! Unit tests for `Utf8StreamDecoder`: split codepoints must survive
    //! chunk boundaries, and malformed input must be replaced exactly the
    //! way `String::from_utf8_lossy` would replace it.

    use super::*;

    /// Feed `original` in two pieces at every interior byte boundary and
    /// check the concatenated output is the original text.
    fn assert_survives_every_split(original: &str) {
        let bytes = original.as_bytes();
        for split in 1..bytes.len() {
            let mut dec = Utf8StreamDecoder::new();
            let mut combined = dec.decode(&bytes[..split]);
            combined.push_str(&dec.decode(&bytes[split..]));
            assert_eq!(
                combined, original,
                "split at {split} should preserve the original codepoint"
            );
            assert!(
                !combined.contains('\u{FFFD}'),
                "split at {split} produced replacement char: {combined:?}"
            );
        }
    }

    #[test]
    fn decodes_ascii_passthrough() {
        let mut dec = Utf8StreamDecoder::new();
        assert_eq!(dec.decode(b"hello"), "hello");
        assert_eq!(dec.decode(b" world"), " world");
    }

    #[test]
    fn box_drawing_split_across_two_chunks() {
        // U+2500 BOX DRAWINGS LIGHT HORIZONTAL = E2 94 80
        let mut dec = Utf8StreamDecoder::new();
        let first = dec.decode(&[0xE2]);
        let second = dec.decode(&[0x94, 0x80]);
        assert_eq!(first, "");
        assert_eq!(second, "\u{2500}");
        assert!(!format!("{first}{second}").contains('\u{FFFD}'));
    }

    #[test]
    fn box_drawing_split_at_every_byte_boundary() {
        // U+2588 FULL BLOCK = E2 96 88
        assert_survives_every_split("\u{2588}");
    }

    #[test]
    fn cjk_glyph_split_across_chunks() {
        // U+4E2D 中 = E4 B8 AD
        assert_survives_every_split("\u{4E2D}");
    }

    #[test]
    fn four_byte_emoji_split_at_every_boundary() {
        // U+1F600 😀 = F0 9F 98 80
        assert_survives_every_split("\u{1F600}");
    }

    #[test]
    fn byte_by_byte_streaming() {
        let original = "héllo 世界 😀 ─";
        let mut dec = Utf8StreamDecoder::new();
        let mut out = String::new();
        for b in original.as_bytes() {
            out.push_str(&dec.decode(&[*b]));
        }
        out.push_str(&dec.flush());
        assert_eq!(out, original);
        assert!(!out.contains('\u{FFFD}'));
    }

    #[test]
    fn invalid_byte_in_middle_is_replaced() {
        let mut dec = Utf8StreamDecoder::new();
        // 'A' (0x41) + invalid lone 0xFF + 'B' (0x42)
        let out = dec.decode(&[0x41, 0xFF, 0x42]);
        assert_eq!(out, "A\u{FFFD}B");
    }

    #[test]
    fn invalid_continuation_after_valid_lead_is_replaced() {
        let mut dec = Utf8StreamDecoder::new();
        // E2 opens a 3-byte sequence, but 0x41 ('A') is not a continuation
        // byte, so the sequence can never complete: E2 alone is replaced and
        // 'A' is decoded normally. Nothing is left buffered.
        let out = dec.decode(&[0xE2, 0x41]);
        assert_eq!(out, "\u{FFFD}A");
        assert_eq!(dec.flush(), "");
    }

    #[test]
    fn flush_emits_replacement_for_truncated_tail() {
        let mut dec = Utf8StreamDecoder::new();
        // E2 alone is incomplete — held in buffer, no output yet.
        assert_eq!(dec.decode(&[0xE2]), "");
        // Flush should emit one replacement character since stream ended mid-codepoint.
        assert_eq!(dec.flush(), "\u{FFFD}");
        // Subsequent flush is empty.
        assert_eq!(dec.flush(), "");
    }

    #[test]
    fn empty_input_is_handled() {
        let mut dec = Utf8StreamDecoder::new();
        assert_eq!(dec.decode(&[]), "");
        assert_eq!(dec.flush(), "");
    }

    #[test]
    fn multiple_incomplete_chunks_combine() {
        // Send E2 94 80 (U+2500) byte-by-byte.
        let mut dec = Utf8StreamDecoder::new();
        assert_eq!(dec.decode(&[0xE2]), "");
        assert_eq!(dec.decode(&[0x94]), "");
        assert_eq!(dec.decode(&[0x80]), "\u{2500}");
    }

    #[test]
    fn matches_from_utf8_lossy_for_complete_input() {
        let inputs: &[&[u8]] = &[
            b"plain ascii",
            "héllo".as_bytes(),
            "中文 box ─ end".as_bytes(),
            "emoji 😀 done".as_bytes(),
        ];
        for input in inputs {
            let mut dec = Utf8StreamDecoder::new();
            let mut out = dec.decode(input);
            out.push_str(&dec.flush());
            assert_eq!(out, String::from_utf8_lossy(input));
        }
    }

    #[test]
    fn matches_from_utf8_lossy_for_malformed_input() {
        // Lone invalid bytes, a lead byte followed by a non-continuation, an
        // overlong encoding, an encoded surrogate, and truncated tails.
        let inputs: &[&[u8]] = &[
            &[0x41, 0xFF, 0x42],
            &[0xE2, 0x41],
            &[0xF0, 0x9F, 0x41],
            &[0xC0, 0x80],
            &[0xED, 0xA0, 0x80],
            &[0x61, 0xE2, 0x94],
            &[0x61, 0xF0, 0x9F, 0x98],
        ];
        for input in inputs {
            // Every chunk size, so each malformed sequence is also exercised
            // when it straddles a chunk boundary.
            for size in 1..=input.len() {
                let mut dec = Utf8StreamDecoder::new();
                let mut out = String::new();
                for chunk in input.chunks(size) {
                    out.push_str(&dec.decode(chunk));
                }
                out.push_str(&dec.flush());
                assert_eq!(
                    out,
                    String::from_utf8_lossy(input),
                    "input {input:?} in chunks of {size}"
                );
            }
        }
    }
}

0 commit comments

Comments
 (0)