Skip to content

Commit 62ab341

Browse files
committed
Centralize stdin control semantics
1 parent 23da2b7 commit 62ab341

13 files changed

Lines changed: 185 additions & 100 deletions

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod r_session;
2323
mod sandbox;
2424
mod sandbox_cli;
2525
mod server;
26+
mod stdin_payload;
2627
#[cfg(target_os = "windows")]
2728
mod windows_sandbox;
2829
mod worker;

src/server/response.rs

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use rmcp::model::{
1111
use serde_json::Value;
1212
use tempfile::Builder;
1313

14+
pub(crate) use crate::stdin_payload::{TimeoutBundleReuse, timeout_bundle_reuse_for_input};
1415
use crate::worker_process::WorkerError;
1516
use crate::worker_protocol::{
1617
ContentOrigin, TextStream, WorkerContent, WorkerErrorCode, WorkerReply,
@@ -170,40 +171,6 @@ struct TimeoutReplyView<'a> {
170171
protected_bundle_id: Option<u64>,
171172
}
172173

173-
#[derive(Clone, Copy)]
174-
pub(crate) enum TimeoutBundleReuse {
175-
None,
176-
FullReply,
177-
FollowUpInput,
178-
}
179-
180-
pub(crate) fn timeout_bundle_reuse_for_input(input: &str) -> TimeoutBundleReuse {
181-
if input.is_empty() {
182-
return TimeoutBundleReuse::FullReply;
183-
}
184-
185-
let Some(first) = input.chars().next() else {
186-
return TimeoutBundleReuse::FullReply;
187-
};
188-
let tail = &input[first.len_utf8()..];
189-
let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
190-
rest
191-
} else if let Some(rest) = tail.strip_prefix('\n') {
192-
rest
193-
} else if let Some(rest) = tail.strip_prefix('\r') {
194-
rest
195-
} else {
196-
tail
197-
};
198-
199-
match first {
200-
'\u{3}' if tail.is_empty() => TimeoutBundleReuse::FullReply,
201-
'\u{3}' => TimeoutBundleReuse::FollowUpInput,
202-
'\u{4}' => TimeoutBundleReuse::None,
203-
_ => TimeoutBundleReuse::FollowUpInput,
204-
}
205-
}
206-
207174
impl ResponseState {
208175
pub(crate) fn new() -> Result<Self, WorkerError> {
209176
Ok(Self {

src/server/tests.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,22 @@ fn timeout_bundle_reuse_treats_blank_lines_as_fresh_input() {
195195
super::response::timeout_bundle_reuse_for_input("\r\n"),
196196
super::response::TimeoutBundleReuse::FollowUpInput
197197
));
198+
assert!(matches!(
199+
super::response::timeout_bundle_reuse_for_input("\u{3}"),
200+
super::response::TimeoutBundleReuse::FullReply
201+
));
202+
assert!(matches!(
203+
super::response::timeout_bundle_reuse_for_input("\u{3}\n"),
204+
super::response::TimeoutBundleReuse::FollowUpInput
205+
));
206+
assert!(matches!(
207+
super::response::timeout_bundle_reuse_for_input("\u{3}\r\n"),
208+
super::response::TimeoutBundleReuse::FollowUpInput
209+
));
210+
assert!(matches!(
211+
super::response::timeout_bundle_reuse_for_input("\u{4}"),
212+
super::response::TimeoutBundleReuse::None
213+
));
198214
}
199215

200216
fn split_lines(text: &str) -> Vec<String> {

src/stdin_payload.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2+
pub(crate) enum WriteStdinControlAction {
3+
Interrupt,
4+
Restart,
5+
}
6+
7+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8+
pub(crate) enum TimeoutBundleReuse {
9+
None,
10+
FullReply,
11+
FollowUpInput,
12+
}
13+
14+
pub(crate) fn prepare_worker_stdin_payload(input: &str) -> Vec<u8> {
15+
let mut payload = input.as_bytes().to_vec();
16+
if !payload.is_empty() && !payload.ends_with(b"\n") {
17+
payload.push(b'\n');
18+
}
19+
payload
20+
}
21+
22+
pub(crate) fn split_write_stdin_control_prefix(
23+
input: &str,
24+
) -> Option<(WriteStdinControlAction, &str)> {
25+
let first = input.chars().next()?;
26+
let action = match first {
27+
'\u{3}' => WriteStdinControlAction::Interrupt,
28+
'\u{4}' => WriteStdinControlAction::Restart,
29+
_ => return None,
30+
};
31+
Some((action, &input[first.len_utf8()..]))
32+
}
33+
34+
pub(crate) fn timeout_bundle_reuse_for_input(input: &str) -> TimeoutBundleReuse {
35+
if input.is_empty() {
36+
return TimeoutBundleReuse::FullReply;
37+
}
38+
39+
match split_write_stdin_control_prefix(input) {
40+
Some((WriteStdinControlAction::Interrupt, "")) => TimeoutBundleReuse::FullReply,
41+
Some((WriteStdinControlAction::Interrupt, _)) => TimeoutBundleReuse::FollowUpInput,
42+
Some((WriteStdinControlAction::Restart, _)) => TimeoutBundleReuse::None,
43+
None => TimeoutBundleReuse::FollowUpInput,
44+
}
45+
}

src/worker_process.rs

Lines changed: 15 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ use crate::sandbox_cli::{
5151
resolve_effective_sandbox_state_with_defaults, sandbox_plan_requests_inherited_state,
5252
validate_sandbox_plan_with_defaults,
5353
};
54+
use crate::stdin_payload::prepare_worker_stdin_payload;
55+
pub(crate) use crate::stdin_payload::{WriteStdinControlAction, split_write_stdin_control_prefix};
5456
use crate::worker_protocol::{
5557
ContentOrigin, TextStream, WORKER_MODE_ARG, WorkerContent, WorkerErrorCode, WorkerReply,
5658
};
@@ -275,7 +277,14 @@ fn prechecked_follow_up_requires_meta_error() -> WorkerError {
275277
}
276278

277279
trait BackendDriver: Send {
278-
fn prepare_input_payload(&self, text: &str) -> Vec<u8>;
280+
fn prepare_input_text(&self, text: String) -> String {
281+
text
282+
}
283+
284+
fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
285+
prepare_worker_stdin_payload(text)
286+
}
287+
279288
fn on_input_start(
280289
&mut self,
281290
text: &str,
@@ -473,12 +482,8 @@ fn driver_refresh_worker_ready(
473482
}
474483

475484
impl BackendDriver for RBackendDriver {
476-
fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
477-
let mut payload = text.as_bytes().to_vec();
478-
if !payload.is_empty() && !payload.ends_with(b"\n") {
479-
payload.push(b'\n');
480-
}
481-
payload
485+
fn prepare_input_text(&self, text: String) -> String {
486+
normalize_input_newlines(&text)
482487
}
483488

484489
fn on_input_start(
@@ -850,14 +855,6 @@ fn strip_one_line_ending(text: &str) -> Option<&str> {
850855

851856
#[cfg(not(target_family = "unix"))]
852857
impl BackendDriver for PythonBackendDriver {
853-
fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
854-
let mut payload = text.as_bytes().to_vec();
855-
if !payload.is_empty() && !payload.ends_with(b"\n") {
856-
payload.push(b'\n');
857-
}
858-
payload
859-
}
860-
861858
fn on_input_start(
862859
&mut self,
863860
text: &str,
@@ -934,14 +931,6 @@ impl ProtocolBackendDriver {
934931
}
935932

936933
impl BackendDriver for ProtocolBackendDriver {
937-
fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
938-
let mut payload = text.as_bytes().to_vec();
939-
if !payload.is_empty() && !payload.ends_with(b"\n") {
940-
payload.push(b'\n');
941-
}
942-
payload
943-
}
944-
945934
fn on_input_start(
946935
&mut self,
947936
_text: &str,
@@ -1196,12 +1185,6 @@ fn completion_info_from_ipc(
11961185

11971186
const DEFERRED_SANDBOX_UPDATE_TIMEOUT: Duration = Duration::from_secs(5);
11981187

1199-
#[derive(Clone, Copy)]
1200-
pub(crate) enum WriteStdinControlAction {
1201-
Interrupt,
1202-
Restart,
1203-
}
1204-
12051188
#[derive(Debug, Clone, Default)]
12061189
pub(crate) struct WriteStdinOptions {
12071190
pub page_bytes_override: Option<u64>,
@@ -1223,29 +1206,6 @@ impl WriteStdinOptions {
12231206
}
12241207
}
12251208

1226-
pub(crate) fn split_write_stdin_control_prefix(
1227-
input: &str,
1228-
) -> Option<(WriteStdinControlAction, &str)> {
1229-
let first = input.chars().next()?;
1230-
let action = match first {
1231-
'\u{3}' => WriteStdinControlAction::Interrupt,
1232-
'\u{4}' => WriteStdinControlAction::Restart,
1233-
_ => return None,
1234-
};
1235-
1236-
let tail = &input[first.len_utf8()..];
1237-
let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
1238-
rest
1239-
} else if let Some(rest) = tail.strip_prefix('\n') {
1240-
rest
1241-
} else if let Some(rest) = tail.strip_prefix('\r') {
1242-
rest
1243-
} else {
1244-
tail
1245-
};
1246-
Some((action, tail))
1247-
}
1248-
12491209
fn worker_context_event_payload(
12501210
worker_launch: &WorkerLaunch,
12511211
backend: Backend,
@@ -2574,7 +2534,7 @@ impl WorkerManager {
25742534
worker_timeout: Duration,
25752535
server_timeout: Duration,
25762536
) -> Result<RequestState, WorkerError> {
2577-
let text = normalize_input_newlines(&text);
2537+
let text = self.driver.prepare_input_text(text);
25782538
let started_at = std::time::Instant::now();
25792539
let prompt = self.current_prompt_hint();
25802540
self.remember_prompt(prompt);
@@ -7605,11 +7565,11 @@ mod tests {
76057565
}
76067566

76077567
#[test]
7608-
fn control_prefix_strips_single_separator_newline() {
7568+
fn control_prefix_preserves_immediate_newline_tail() {
76097569
let (action, remaining) =
76107570
split_write_stdin_control_prefix("\u{4}\nprint(1)").expect("expected control prefix");
76117571
assert!(matches!(action, WriteStdinControlAction::Restart));
7612-
assert_eq!(remaining, "print(1)");
7572+
assert_eq!(remaining, "\nprint(1)");
76137573
}
76147574

76157575
#[test]

tests/common/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ impl McpTestSession {
669669
#[allow(dead_code)]
670670
pub async fn write_stdin_with(&mut self, input: impl Into<String>, timeout: Option<f64>) {
671671
let mut input = input.into();
672-
if !input.ends_with('\n') {
672+
if test_input_needs_trailing_newline(&input) {
673673
input.push('\n');
674674
}
675675
let timeout = normalized_test_timeout(timeout);
@@ -797,7 +797,7 @@ impl McpTestSession {
797797
meta: Option<Value>,
798798
) -> Result<rmcp::model::CallToolResult, ServiceError> {
799799
let mut input = input.into();
800-
if !input.is_empty() && !input.ends_with('\n') {
800+
if !input.is_empty() && test_input_needs_trailing_newline(&input) {
801801
input.push('\n');
802802
}
803803
let timeout = normalized_test_timeout(timeout);
@@ -872,6 +872,10 @@ impl McpTestSession {
872872
}
873873
}
874874

875+
fn test_input_needs_trailing_newline(input: &str) -> bool {
876+
!input.ends_with('\n') && !matches!(input, "\u{3}" | "\u{4}")
877+
}
878+
875879
pub struct McpSnapshot {
876880
sessions: Vec<(String, Vec<SnapshotStep>)>,
877881
}

tests/fixtures/zod-worker.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
6363
let mut line = String::new();
6464
let mut next_prompt = "zod> ".to_string();
6565
let mut timeline = Timeline::default();
66+
let mut line_number = 0_u64;
6667
loop {
6768
line.clear();
6869
let bytes = reader.read_line(&mut line)?;
6970
if bytes == 0 {
7071
send_session_end(&writer, &mut timeline, "shutdown")?;
7172
return Ok(());
7273
}
74+
line_number += 1;
7375

7476
let command = line.trim_end_matches(['\r', '\n']);
7577
let reported_input = if let Some(text) = command.strip_prefix("misreport-input ") {
@@ -96,6 +98,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
9698
&interrupted,
9799
command,
98100
&line,
101+
line_number,
99102
&mut next_prompt,
100103
&mut timeline,
101104
)?;
@@ -113,6 +116,7 @@ fn run_command(
113116
interrupted: &AtomicBool,
114117
command: &str,
115118
raw_line: &str,
119+
line_number: u64,
116120
next_prompt: &mut String,
117121
timeline: &mut Timeline,
118122
) -> io::Result<()> {
@@ -141,6 +145,15 @@ fn run_command(
141145
return Ok(());
142146
}
143147

148+
if command.starts_with("raw-line-escape") {
149+
let escaped = escape_bytes(raw_line.as_bytes());
150+
writer.output_text(
151+
"stdout",
152+
format!("raw-line[{line_number}]={escaped}\n").as_bytes(),
153+
)?;
154+
return Ok(());
155+
}
156+
144157
if let Some(millis) = command.strip_prefix("prompt-then-sleep ") {
145158
writer.send(&WorkerToServer::ReadlineStart {
146159
prompt: "buffered> ".to_string(),
@@ -199,6 +212,21 @@ fn run_command(
199212
writer.output_text("stdout", raw_line.as_bytes())
200213
}
201214

215+
fn escape_bytes(bytes: &[u8]) -> String {
216+
let mut escaped = String::new();
217+
for byte in bytes {
218+
match byte {
219+
b'\n' => escaped.push_str("\\n"),
220+
b'\r' => escaped.push_str("\\r"),
221+
b'\t' => escaped.push_str("\\t"),
222+
b'\\' => escaped.push_str("\\\\"),
223+
b' '..=b'~' => escaped.push(char::from(*byte)),
224+
_ => escaped.push_str(&format!("\\x{byte:02x}")),
225+
}
226+
}
227+
escaped
228+
}
229+
202230
fn send_readline_start(
203231
writer: &IpcWriter,
204232
timeline: &mut Timeline,

tests/snapshots/mcp_transcripts__snapshots_interrupt_handler_output.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ call:
2828
{
2929
"tool": "r_repl",
3030
"arguments": {
31-
"input": "\u0003\n",
31+
"input": "\u0003",
3232
"timeout_ms": 5000
3333
}
3434
}

tests/snapshots/mcp_transcripts__snapshots_support_multiple_calls_and_sessions.snap

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ call:
5252
{
5353
"tool": "r_repl",
5454
"arguments": {
55-
"input": "\u0004\n"
55+
"input": "\u0004"
5656
}
5757
}
5858
response:

tests/snapshots/mcp_transcripts__snapshots_tempdir_session_restart.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ call:
3232
{
3333
"tool": "r_repl",
3434
"arguments": {
35-
"input": "\u0004\n"
35+
"input": "\u0004"
3636
}
3737
}
3838
response:
@@ -75,7 +75,7 @@ call:
7575
{
7676
"tool": "r_repl",
7777
"arguments": {
78-
"input": "\u0004\n"
78+
"input": "\u0004"
7979
}
8080
}
8181
response:

0 commit comments

Comments
 (0)