Commit 1c960f3

Preserve raw worker stdin at the server boundary (#72)

* Add Zod readline discard recovery test

Teach the standalone Zod worker to emit readline_discard for buffered stdin after an interrupt so stale active-turn input is accounted for before a follow-up tail runs.

Validation: cargo check; cargo build; python3 tests/run_integration_tests.py --binary target/debug/mcp-repl; cargo clippy --all-targets --all-features -- -D warnings; cargo test --quiet; cargo +nightly fmt.

* Validate protocol session_end reasons

Reject worker sideband session_end frames that carry unrecognized reasons, and validate optional session_end message_b64 payloads before accepting the frame as final. Add a Zod test hook and MCP-level regression coverage so malformed protocol-worker session_end data fails fast instead of being treated as a clean session shutdown.

Validation: cargo test --quiet --test zod_protocol zod_worker_invalid_session_end_reason_is_protocol_error; cargo test --quiet --test zod_protocol; cargo check; cargo build; python3 tests/run_integration_tests.py --binary target/debug/mcp-repl; cargo clippy --all-targets --all-features -- -D warnings; cargo test --quiet; cargo +nightly fmt

* Add Zod shutdown conformance hooks

Add slow-shutdown and hang-shutdown commands to the standalone Zod worker so public protocol tests can exercise delayed graceful shutdown and pending shutdown states. Cover both through MCP-level tests and update the active protocol plan command list.

Validation:
- cargo test --quiet --test zod_protocol shutdown (red before fixture change)
- cargo test --quiet --test zod_protocol
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Split Zod interrupt observations

Add an interrupt-report fixture command that records sideband interrupt notifications separately from OS interrupt delivery. Cover it through the public Zod protocol test and document the command in the active protocol plan.

Validation:
- cargo +nightly fmt
- cargo check
- cargo build
- cargo test --test zod_protocol zod_worker_reports_sideband_and_os_interrupt_facts -- --nocapture
- cargo test --test zod_protocol --quiet
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet

* Preserve control-tail newline bytes

Consume only the leading Ctrl-C/Ctrl-D byte when splitting control-prefixed input so an immediate newline remains part of the tail payload. Add a public Zod MCP regression that proves the worker observes that newline before the tail command. Update existing tests that intend bare controls to send unterminated control bytes, and update transcript snapshots to show those bare inputs explicitly.

Validation:
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Preserve protocol worker stdin bytes

Move CR/CRLF normalization out of the shared request path and keep it in the built-in R/Python drivers, so custom protocol workers receive the exact client stdin bytes plus the existing single appended LF rule. Add public Zod MCP coverage for supplied CRLF bytes and a trailing bare carriage return. Update the protocol plan to state the byte-preservation contract.

Validation:
- cargo test --test zod_protocol zod_worker_preserves_crlf_stdin_and_appended_newline -- --exact
- cargo test --test write_stdin_edge_cases write_stdin_accepts_crlf_input -- --exact
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Fix reset teardown output and Ctrl-C bundle reuse

Snapshot pending output before reset shutdown so teardown-only worker output does not leak into the reset reply. Also classify Ctrl-C plus newline/CRLF as follow-up input for timeout bundle reuse; only bare Ctrl-C reuses the full timeout reply.

Validation:
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Cover reset EOF for active Zod stdin

Add a Zod command that blocks on a second stdin read and verify repl_reset closes the active stdin stream without sending interpreter shutdown text.

Validation:
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Fix Unix Python CRLF stdin normalization

Normalize CRLF and bare CR input before the built-in Unix Python backend writes request bytes to its PTY. Keep custom protocol worker payloads byte-preserving by applying the normalization only when the protocol driver is serving the built-in Python bridge. Add a Unix public regression test that sends a CRLF-formatted multiline Python block through the MCP repl tool and asserts it executes without an injected blank line or IndentationError.

Review finding:
- [P2] Normalize CRLF before Unix Python PTY writes
  /Users/tomasz/github/posit-dev/mcp-repl/src/worker_process.rs:2624-2624
  On Unix, the built-in Python backend uses `ProtocolBackendDriver`, whose `prepare_input_payload` now preserves stdin bytes. Since the previous normalization was removed before this call, CRLF input is written to the PTY as `\r\n`; the terminal maps `\r` to another newline, so a block like `if True:\r\n print("A")` reaches Python with a blank line and raises `IndentationError`. Custom protocol workers can preserve bytes, but built-in Python still needs newline normalization before writing to the PTY.

Response:
- Added `python_crlf_multiline_block_executes` to cover CRLF multiline Python input through the public MCP tool path.
- Updated `ProtocolBackendDriver::prepare_input_payload` so only the built-in Unix Python bridge normalizes CRLF/CR before PTY writes; custom protocol workers still preserve stdin bytes.

Validation:
- cargo test --test python_backend python_crlf_multiline_block_executes -- --nocapture (failed before fix, passed after fix)
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Fix Zod command fixture argument count

Move the shutdown-log path into CommandState so run_command stays within the clippy argument limit after the stdin-close reset coverage was adapted.

Validation:
- cargo test --test zod_protocol --quiet
- cargo test --test python_backend --quiet crlf
- cargo test --quiet timeout_bundle_reuse_treats_newline_ctrl_c_as_follow_up_input
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Preserve worker stdin bytes at server boundary

Keep server-side input payload preparation limited to the MCP UTF-8 string boundary and the explicit final-newline append rule. Backend/runtime-specific CRLF interpretation belongs to the worker instead of the server. This also removes the Unix Python CRLF multiline expectation, keeps CRLF acceptance scoped to Windows, and rewords the Zod protocol regression around raw client byte preservation.

Validation:
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

* Fix CI stdin edge cases

Normalize CRLF input in the built-in R backend before writing bytes to R so Windows requests do not leave carriage returns in R source. Keep protocol worker byte preservation intact by leaving the protocol driver unchanged. Make the files-mode timeout poll regression deterministic by replacing a short sleep with an explicit file gate before releasing the request.

Validation:
- cargo check
- cargo build
- python3 tests/run_integration_tests.py --binary target/debug/mcp-repl
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test --quiet
- cargo +nightly fmt

1 parent 55c7209 commit 1c960f3

22 files changed

Lines changed: 797 additions & 104 deletions

docs/plans/active/worker-server-protocol-zod.md

Lines changed: 11 additions & 3 deletions
@@ -664,7 +664,9 @@ emits an unsatisfied `readline_start`, the server uses that explicit
 event to finalize the poll reply.

 A later non-empty `repl()` call after an unsatisfied `readline_start` is
-written to worker stdin after the same trailing-newline normalization.
+written to worker stdin after the same trailing-newline rule: if the input is
+non-empty and does not end in `\n`, append one `\n`. The server does not
+otherwise canonicalize supplied stdin bytes such as `\r\n` or bare `\r`.
 The worker/runtime decides whether those bytes answer an
 interpreter-level prompt, continue an incomplete expression, or start a
 new top-level evaluation.
@@ -751,8 +753,14 @@ a small deterministic command language through stdin:
   bytes.
 - `sleep <millis>` delays the next unsatisfied prompt so timeout and
   poll behavior can be tested.
-- `interruptible <millis>` delays until either the timer finishes or an
-  OS interrupt arrives.
+- `interruptible <millis>` delays until either the timer finishes or a
+  sideband or OS interrupt arrives.
+- `interrupt-report <millis>` delays while recording sideband and OS
+  interrupt delivery as separate output facts.
+- `slow-shutdown <millis>` delays a later `exit` or EOF `session_end`
+  so reset and shutdown graceful-exit timing can be tested.
+- `hang-shutdown` accepts a later `exit` or EOF but never emits
+  `session_end`, so reset and shutdown OS escalation can be tested.
 - `image` emits a tiny deterministic PNG if Zod advertises image
   support.
 - `exit` emits `session_end`.
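
The trailing-newline rule stated in the first hunk of this file can be sketched as a small function. This is a minimal illustration under stated assumptions, not the project's code: `prepare_worker_stdin` is a hypothetical name, and the only behavior taken from the plan text is "append one `\n` when the input is non-empty and does not end in `\n`, and otherwise leave bytes such as `\r\n` or bare `\r` untouched".

```rust
/// Hypothetical helper (not from the repository): builds the bytes written to
/// worker stdin under the plan's trailing-newline rule.
fn prepare_worker_stdin(input: &str) -> Vec<u8> {
    // Supplied bytes pass through unchanged, including `\r\n` and bare `\r`.
    let mut payload = input.as_bytes().to_vec();
    // Append exactly one LF when the input is non-empty and lacks a final LF.
    if !payload.is_empty() && !payload.ends_with(b"\n") {
        payload.push(b'\n');
    }
    payload
}
```

Under this sketch, `"a\r\nb"` becomes the bytes `a\r\nb\n`, and empty input stays empty, so the worker alone decides how to interpret carriage returns.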

src/ipc.rs

Lines changed: 25 additions & 0 deletions
@@ -461,6 +461,14 @@ impl ServerIpcConnection {
                         reason,
                         message_b64,
                     } => {
+                        if let Err(err) =
+                            validate_session_end(reason.as_deref(), message_b64.as_deref())
+                        {
+                            let mut guard = reader_inbox.lock().unwrap();
+                            latch_protocol_error(&mut guard, err);
+                            reader_cvar.notify_all();
+                            break;
+                        }
                         let mut guard = reader_inbox.lock().unwrap();
                         guard.session_end = true;
                         guard.session_end_final = true;
@@ -1907,6 +1915,23 @@ fn latch_protocol_error(guard: &mut ServerIpcInbox, message: impl Into<String>)
     });
 }

+fn validate_session_end(reason: Option<&str>, message_b64: Option<&str>) -> Result<(), String> {
+    if let Some(reason) = reason {
+        match reason {
+            "shutdown" | "reset" | "runtime_exit" | "crash" | "protocol_error" => {}
+            other => return Err(format!("invalid session_end reason: {other}")),
+        }
+    }
+    if let Some(message_b64) = message_b64
+        && base64::engine::general_purpose::STANDARD
+            .decode(message_b64)
+            .is_err()
+    {
+        return Err("invalid session_end message_b64 base64".to_string());
+    }
+    Ok(())
+}
+
 fn take_latched_protocol_error(guard: &mut ServerIpcInbox) -> Option<String> {
     guard.protocol_error.take().map(|error| error.message)
 }
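
To see which `session_end` frames the new validation accepts, the following standalone sketch mirrors its two checks. It is an approximation under stated assumptions: the real code calls `base64::engine::general_purpose::STANDARD.decode`, while this version swaps in a std-only shape check (`looks_like_standard_base64`, a hypothetical helper) so the example has no dependencies. The function name `validate_session_end_sketch` is also illustrative.

```rust
/// Reasons accepted by `validate_session_end` in the hunk above.
const SESSION_END_REASONS: [&str; 5] =
    ["shutdown", "reset", "runtime_exit", "crash", "protocol_error"];

/// Simplified stand-in for a real base64 decoder (assumption): checks only
/// length, alphabet, and padding position. It does not verify that unused
/// trailing bits are zero, which a strict decoder may also reject.
fn looks_like_standard_base64(s: &str) -> bool {
    let bytes = s.as_bytes();
    if bytes.len() % 4 != 0 {
        return false;
    }
    let pad = bytes.iter().rev().take_while(|&&b| b == b'=').count();
    if pad > 2 {
        return false;
    }
    bytes[..bytes.len() - pad]
        .iter()
        .all(|b| b.is_ascii_alphanumeric() || *b == b'+' || *b == b'/')
}

/// Both fields are optional; each is validated only when present.
fn validate_session_end_sketch(
    reason: Option<&str>,
    message_b64: Option<&str>,
) -> Result<(), String> {
    if let Some(reason) = reason {
        if !SESSION_END_REASONS.iter().any(|known| *known == reason) {
            return Err(format!("invalid session_end reason: {reason}"));
        }
    }
    if let Some(message_b64) = message_b64 {
        if !looks_like_standard_base64(message_b64) {
            return Err("invalid session_end message_b64 base64".to_string());
        }
    }
    Ok(())
}
```

A frame with no reason and no message passes, `Some("bogus")` is rejected with the same error text as the real function, and a recognized reason paired with a malformed payload is still rejected.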

src/server/response.rs

Lines changed: 0 additions & 9 deletions
@@ -186,15 +186,6 @@ pub(crate) fn timeout_bundle_reuse_for_input(input: &str) -> TimeoutBundleReuse
         return TimeoutBundleReuse::FullReply;
     };
     let tail = &input[first.len_utf8()..];
-    let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
-        rest
-    } else if let Some(rest) = tail.strip_prefix('\n') {
-        rest
-    } else if let Some(rest) = tail.strip_prefix('\r') {
-        rest
-    } else {
-        tail
-    };

     match first {
         '\u{3}' if tail.is_empty() => TimeoutBundleReuse::FullReply,

src/server/tests.rs

Lines changed: 16 additions & 0 deletions
@@ -197,6 +197,22 @@ fn timeout_bundle_reuse_treats_blank_lines_as_fresh_input() {
     ));
 }

+#[test]
+fn timeout_bundle_reuse_treats_newline_ctrl_c_as_follow_up_input() {
+    assert!(matches!(
+        super::response::timeout_bundle_reuse_for_input("\u{3}"),
+        super::response::TimeoutBundleReuse::FullReply
+    ));
+    assert!(matches!(
+        super::response::timeout_bundle_reuse_for_input("\u{3}\n"),
+        super::response::TimeoutBundleReuse::FollowUpInput
+    ));
+    assert!(matches!(
+        super::response::timeout_bundle_reuse_for_input("\u{3}\r\n"),
+        super::response::TimeoutBundleReuse::FollowUpInput
+    ));
+}
+
 fn split_lines(text: &str) -> Vec<String> {
     if text.is_empty() {
         return Vec::new();

src/worker_process.rs

Lines changed: 56 additions & 20 deletions
@@ -475,7 +475,8 @@ fn driver_refresh_worker_ready(

 impl BackendDriver for RBackendDriver {
     fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
-        let mut payload = text.as_bytes().to_vec();
+        let normalized = normalize_input_newlines(text);
+        let mut payload = normalized.into_bytes();
         if !payload.is_empty() && !payload.ends_with(b"\n") {
             payload.push(b'\n');
         }
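
The R driver now normalizes newlines itself before applying the trailing-LF rule. The body of `normalize_input_newlines` is not part of this diff, so the sketch below assumes it rewrites CRLF and bare CR to LF, which is how the commit message describes the normalization. Both function names here are hypothetical.

```rust
/// Assumed behavior of `normalize_input_newlines` (its body is not shown in
/// this diff): rewrite CRLF and bare CR to LF.
fn normalize_newlines_sketch(text: &str) -> String {
    // Replace CRLF first so each pair yields one LF, then any remaining bare CR.
    text.replace("\r\n", "\n").replace('\r', "\n")
}

/// Mirrors the shape of `RBackendDriver::prepare_input_payload` after the change.
fn prepare_r_payload_sketch(text: &str) -> Vec<u8> {
    let mut payload = normalize_newlines_sketch(text).into_bytes();
    // Same trailing-LF rule as before: one LF for non-empty, unterminated input.
    if !payload.is_empty() && !payload.ends_with(b"\n") {
        payload.push(b'\n');
    }
    payload
}
```

With these assumptions, a Windows-style request such as `"x <- 1\r\nx"` reaches R as `x <- 1\nx\n`, with no carriage returns left in the source.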
@@ -1234,17 +1235,7 @@ pub(crate) fn split_write_stdin_control_prefix(
         _ => return None,
     };

-    let tail = &input[first.len_utf8()..];
-    let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
-        rest
-    } else if let Some(rest) = tail.strip_prefix('\n') {
-        rest
-    } else if let Some(rest) = tail.strip_prefix('\r') {
-        rest
-    } else {
-        tail
-    };
-    Some((action, tail))
+    Some((action, &input[first.len_utf8()..]))
 }

 fn worker_context_event_payload(
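
The simplified split can be illustrated in isolation. This is a sketch, not the repository's function: `ControlAction` stands in for `WriteStdinControlAction`, the `Restart` mapping for Ctrl-D is confirmed by the unit test in this commit, and the `Interrupt` mapping for Ctrl-C is an assumption because that part of the function is outside the hunk.

```rust
/// Illustrative stand-in for `WriteStdinControlAction`.
/// `Interrupt` for Ctrl-C is assumed; `Restart` for Ctrl-D matches the unit test.
#[derive(Debug, PartialEq)]
enum ControlAction {
    Interrupt,
    Restart,
}

fn split_control_prefix_sketch(input: &str) -> Option<(ControlAction, &str)> {
    let first = input.chars().next()?;
    let action = match first {
        '\u{3}' => ControlAction::Interrupt,
        '\u{4}' => ControlAction::Restart,
        _ => return None,
    };
    // Consume only the control character; a newline right after it stays in the tail.
    Some((action, &input[first.len_utf8()..]))
}
```

For `"\u{4}\nprint(1)"` the tail is `"\nprint(1)"`, so the worker sees the newline before the command, which is the behavior the renamed unit test asserts.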
@@ -2575,7 +2566,6 @@ impl WorkerManager {
         worker_timeout: Duration,
         server_timeout: Duration,
     ) -> Result<RequestState, WorkerError> {
-        let text = normalize_input_newlines(&text);
         let started_at = std::time::Instant::now();
         let prompt = self.current_prompt_hint();
         self.remember_prompt(prompt);
@@ -3480,13 +3470,23 @@ impl WorkerManager {
                 MISSING_INHERITED_SANDBOX_STATE_MESSAGE.to_string(),
             ));
         }
+        self.maybe_emit_pending_server_notice();
+        let pre_shutdown_output = self
+            .process
+            .is_some()
+            .then(|| self.drain_sealed_formatted_output());
         if let Some(process) = self.process.take() {
             let _ = process.shutdown_graceful(timeout);
+            self.pending_output_tape.clear();
         }
         self.guardrail.busy.store(false, Ordering::Relaxed);
-        self.maybe_emit_pending_server_notice();

-        let reply = self.build_session_reset_reply_files("new session started");
+        let reply = match pre_shutdown_output {
+            Some(output) => {
+                self.build_session_reset_reply_files_from_formatted("new session started", output)
+            }
+            None => self.build_session_reset_reply_files("new session started"),
+        };
         self.clear_preserved_prefixes();
         self.reset_output_state_files(true);
         self.note_respawn_during_write();
@@ -3653,14 +3653,32 @@ impl WorkerManager {
                 MISSING_INHERITED_SANDBOX_STATE_MESSAGE.to_string(),
             ));
         }
+        self.maybe_emit_pending_server_notice();
+        let pre_shutdown_end_offset = self
+            .process
+            .is_some()
+            .then(|| self.output.end_offset().unwrap_or(0));
         if let Some(process) = self.process.take() {
             let _ = process.shutdown_graceful(timeout);
         }
+        let post_shutdown_end_offset = self.output.end_offset();
         self.guardrail.busy.store(false, Ordering::Relaxed);
-        self.maybe_emit_pending_server_notice();

         let page_bytes = pager::resolve_page_bytes(None);
-        let reply = self.build_session_reset_reply_pager(page_bytes, "new session started");
+        let reply = match pre_shutdown_end_offset {
+            Some(end_offset) => {
+                let reply = self.build_session_reset_reply_pager_to_offset(
+                    page_bytes,
+                    "new session started",
+                    end_offset,
+                );
+                if let Some(end_offset) = post_shutdown_end_offset {
+                    self.output.advance_offset_to(end_offset);
+                }
+                reply
+            }
+            None => self.build_session_reset_reply_pager(page_bytes, "new session started"),
+        };
         self.clear_preserved_prefixes();
         self.reset_output_state_pager(true, false);
         self.note_respawn_during_write();
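
The pager-mode reset above follows a snapshot-then-skip pattern: record the output end offset before shutdown, build the reply only up to that offset, then advance past anything the worker printed while tearing down. The sketch below reproduces that pattern with a hypothetical `OutputLog` type. The server's real output state is not shown in this diff, so every name here is illustrative.

```rust
/// Hypothetical append-only output log with a read cursor (an assumption; a
/// stand-in for the server's pager output state).
struct OutputLog {
    bytes: Vec<u8>,
    read_offset: usize,
}

impl OutputLog {
    fn end_offset(&self) -> usize {
        self.bytes.len()
    }

    /// Return unread bytes up to `end` and move the cursor there.
    fn take_until(&mut self, end: usize) -> Vec<u8> {
        let end = end.min(self.bytes.len()).max(self.read_offset);
        let chunk = self.bytes[self.read_offset..end].to_vec();
        self.read_offset = end;
        chunk
    }
}

/// Reset flow: snapshot the end offset, let shutdown append teardown output,
/// reply only with pre-shutdown bytes, then skip past the teardown bytes.
fn reset_reply_sketch(log: &mut OutputLog, shutdown: impl FnOnce(&mut OutputLog)) -> Vec<u8> {
    let pre_shutdown_end = log.end_offset();
    shutdown(log);
    let post_shutdown_end = log.end_offset();
    let reply = log.take_until(pre_shutdown_end);
    // Teardown-only output is dropped from view instead of leaking into a later reply.
    log.read_offset = post_shutdown_end;
    reply
}
```

If the log holds `result\n` and shutdown appends `bye\n`, the reset reply contains only `result\n` and the cursor ends at the end of the log, so the teardown text never surfaces.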
@@ -4586,10 +4604,19 @@ impl WorkerManager {
     }

     fn build_session_reset_reply_files(&mut self, meta: &str) -> ReplyWithOffset {
+        let formatted = self.drain_sealed_formatted_output();
+        self.build_session_reset_reply_files_from_formatted(meta, formatted)
+    }
+
+    fn build_session_reset_reply_files_from_formatted(
+        &mut self,
+        meta: &str,
+        formatted: FormattedPendingOutput,
+    ) -> ReplyWithOffset {
         let FormattedPendingOutput {
             mut contents,
             saw_stderr,
-        } = self.drain_sealed_formatted_output();
+        } = formatted;
         contents.retain(|content| match content {
             WorkerContent::ContentText { text, .. } => !text.trim().is_empty(),
             _ => true,
@@ -4613,6 +4640,15 @@ impl WorkerManager {

     fn build_session_reset_reply_pager(&mut self, page_bytes: u64, meta: &str) -> ReplyWithOffset {
         let end_offset = self.output.end_offset().unwrap_or(0);
+        self.build_session_reset_reply_pager_to_offset(page_bytes, meta, end_offset)
+    }
+
+    fn build_session_reset_reply_pager_to_offset(
+        &mut self,
+        page_bytes: u64,
+        meta: &str,
+        end_offset: u64,
+    ) -> ReplyWithOffset {
         let mut is_error = false;

         let SnapshotWithImages {
@@ -7603,11 +7639,11 @@ mod tests {
     }

     #[test]
-    fn control_prefix_strips_single_separator_newline() {
+    fn control_prefix_preserves_immediate_newline_tail() {
         let (action, remaining) =
             split_write_stdin_control_prefix("\u{4}\nprint(1)").expect("expected control prefix");
         assert!(matches!(action, WriteStdinControlAction::Restart));
-        assert_eq!(remaining, "print(1)");
+        assert_eq!(remaining, "\nprint(1)");
     }

     #[test]

tests/README.md

Lines changed: 1 addition & 1 deletion
@@ -25,6 +25,6 @@ snapshot.session("default", mcp_script! {
     write_stdin("x <- 1");
     write_stdin("x <- x + 2");
     write_stdin("x", timeout = 0.2);
-    write_stdin("\u{4}");
+    write_stdin_raw_unterminated("\u{4}");
 }).await?;
 ```

tests/common/mod.rs

Lines changed: 16 additions & 1 deletion
@@ -279,6 +279,21 @@ macro_rules! mcp_calls_inner {
         $crate::mcp_calls_inner!($session, $($rest)*);
     }};

+    ($session:ident, write_stdin_raw_unterminated($input:expr $(, timeout = $timeout:expr)? ); $($rest:tt)*) => {{
+        let mut args = serde_json::Map::new();
+        args.insert("input".to_string(), serde_json::Value::String($input.to_string()));
+        if let Some(timeout) = $crate::common::normalized_test_timeout($crate::mcp_timeout_opt!($($timeout)?)) {
+            args.insert(
+                "timeout_ms".to_string(),
+                serde_json::json!((timeout * 1000.0).round() as i64),
+            );
+        }
+        $session
+            .call_tool($session.repl_tool_name(), serde_json::Value::Object(args))
+            .await;
+        $crate::mcp_calls_inner!($session, $($rest)*);
+    }};
+
 }

 #[macro_export]
@@ -440,7 +455,7 @@ fn compact_json(value: &Value) -> String {
     serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
 }

-fn normalized_test_timeout(timeout: Option<f64>) -> Option<f64> {
+pub(crate) fn normalized_test_timeout(timeout: Option<f64>) -> Option<f64> {
     #[cfg(windows)]
     {
         timeout.map(|value| value.min(WINDOWS_TEST_TIMEOUT_CAP_SECS))
