Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions docs/plans/active/worker-server-protocol-zod.md
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,9 @@ emits an unsatisfied `readline_start`, the server uses that explicit
event to finalize the poll reply.

A later non-empty `repl()` call after an unsatisfied `readline_start` is
written to worker stdin after the same trailing-newline normalization.
written to worker stdin after the same trailing-newline rule: if the input is
non-empty and does not end in `\n`, append one `\n`. The server does not
otherwise canonicalize supplied stdin bytes such as `\r\n` or bare `\r`.
The worker/runtime decides whether those bytes answer an
interpreter-level prompt, continue an incomplete expression, or start a
new top-level evaluation.
Expand Down Expand Up @@ -751,8 +753,14 @@ a small deterministic command language through stdin:
bytes.
- `sleep <millis>` delays the next unsatisfied prompt so timeout and
poll behavior can be tested.
- `interruptible <millis>` delays until either the timer finishes or an
OS interrupt arrives.
- `interruptible <millis>` delays until either the timer finishes or a
sideband or OS interrupt arrives.
- `interrupt-report <millis>` delays while recording sideband and OS
interrupt delivery as separate output facts.
- `slow-shutdown <millis>` delays a later `exit` or EOF `session_end`
so reset and shutdown graceful-exit timing can be tested.
- `hang-shutdown` accepts a later `exit` or EOF but never emits
`session_end`, so reset and shutdown OS escalation can be tested.
- `image` emits a tiny deterministic PNG if Zod advertises image
support.
- `exit` emits `session_end`.
Expand Down
25 changes: 25 additions & 0 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ impl ServerIpcConnection {
reason,
message_b64,
} => {
if let Err(err) =
validate_session_end(reason.as_deref(), message_b64.as_deref())
{
let mut guard = reader_inbox.lock().unwrap();
latch_protocol_error(&mut guard, err);
reader_cvar.notify_all();
break;
}
let mut guard = reader_inbox.lock().unwrap();
guard.session_end = true;
guard.session_end_final = true;
Expand Down Expand Up @@ -1907,6 +1915,23 @@ fn latch_protocol_error(guard: &mut ServerIpcInbox, message: impl Into<String>)
});
}

fn validate_session_end(reason: Option<&str>, message_b64: Option<&str>) -> Result<(), String> {
if let Some(reason) = reason {
match reason {
"shutdown" | "reset" | "runtime_exit" | "crash" | "protocol_error" => {}
other => return Err(format!("invalid session_end reason: {other}")),
}
}
if let Some(message_b64) = message_b64
&& base64::engine::general_purpose::STANDARD
.decode(message_b64)
.is_err()
{
return Err("invalid session_end message_b64 base64".to_string());
}
Ok(())
}

fn take_latched_protocol_error(guard: &mut ServerIpcInbox) -> Option<String> {
guard.protocol_error.take().map(|error| error.message)
}
Expand Down
9 changes: 0 additions & 9 deletions src/server/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,6 @@ pub(crate) fn timeout_bundle_reuse_for_input(input: &str) -> TimeoutBundleReuse
return TimeoutBundleReuse::FullReply;
};
let tail = &input[first.len_utf8()..];
let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
rest
} else if let Some(rest) = tail.strip_prefix('\n') {
rest
} else if let Some(rest) = tail.strip_prefix('\r') {
rest
} else {
tail
};

match first {
'\u{3}' if tail.is_empty() => TimeoutBundleReuse::FullReply,
Expand Down
16 changes: 16 additions & 0 deletions src/server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ fn timeout_bundle_reuse_treats_blank_lines_as_fresh_input() {
));
}

#[test]
fn timeout_bundle_reuse_treats_newline_ctrl_c_as_follow_up_input() {
assert!(matches!(
super::response::timeout_bundle_reuse_for_input("\u{3}"),
super::response::TimeoutBundleReuse::FullReply
));
assert!(matches!(
super::response::timeout_bundle_reuse_for_input("\u{3}\n"),
super::response::TimeoutBundleReuse::FollowUpInput
));
assert!(matches!(
super::response::timeout_bundle_reuse_for_input("\u{3}\r\n"),
super::response::TimeoutBundleReuse::FollowUpInput
));
}

fn split_lines(text: &str) -> Vec<String> {
if text.is_empty() {
return Vec::new();
Expand Down
76 changes: 56 additions & 20 deletions src/worker_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,8 @@ fn driver_refresh_worker_ready(

impl BackendDriver for RBackendDriver {
fn prepare_input_payload(&self, text: &str) -> Vec<u8> {
let mut payload = text.as_bytes().to_vec();
let normalized = normalize_input_newlines(text);
let mut payload = normalized.into_bytes();
if !payload.is_empty() && !payload.ends_with(b"\n") {
payload.push(b'\n');
}
Expand Down Expand Up @@ -1234,17 +1235,7 @@ pub(crate) fn split_write_stdin_control_prefix(
_ => return None,
};

let tail = &input[first.len_utf8()..];
let tail = if let Some(rest) = tail.strip_prefix("\r\n") {
rest
} else if let Some(rest) = tail.strip_prefix('\n') {
rest
} else if let Some(rest) = tail.strip_prefix('\r') {
rest
} else {
tail
};
Some((action, tail))
Some((action, &input[first.len_utf8()..]))
}

fn worker_context_event_payload(
Expand Down Expand Up @@ -2575,7 +2566,6 @@ impl WorkerManager {
worker_timeout: Duration,
server_timeout: Duration,
) -> Result<RequestState, WorkerError> {
let text = normalize_input_newlines(&text);
let started_at = std::time::Instant::now();
let prompt = self.current_prompt_hint();
self.remember_prompt(prompt);
Expand Down Expand Up @@ -3480,13 +3470,23 @@ impl WorkerManager {
MISSING_INHERITED_SANDBOX_STATE_MESSAGE.to_string(),
));
}
self.maybe_emit_pending_server_notice();
let pre_shutdown_output = self
.process
.is_some()
.then(|| self.drain_sealed_formatted_output());
if let Some(process) = self.process.take() {
let _ = process.shutdown_graceful(timeout);
self.pending_output_tape.clear();
}
self.guardrail.busy.store(false, Ordering::Relaxed);
self.maybe_emit_pending_server_notice();

let reply = self.build_session_reset_reply_files("new session started");
let reply = match pre_shutdown_output {
Some(output) => {
self.build_session_reset_reply_files_from_formatted("new session started", output)
}
None => self.build_session_reset_reply_files("new session started"),
};
self.clear_preserved_prefixes();
self.reset_output_state_files(true);
self.note_respawn_during_write();
Expand Down Expand Up @@ -3653,14 +3653,32 @@ impl WorkerManager {
MISSING_INHERITED_SANDBOX_STATE_MESSAGE.to_string(),
));
}
self.maybe_emit_pending_server_notice();
let pre_shutdown_end_offset = self
.process
.is_some()
.then(|| self.output.end_offset().unwrap_or(0));
if let Some(process) = self.process.take() {
let _ = process.shutdown_graceful(timeout);
}
let post_shutdown_end_offset = self.output.end_offset();
self.guardrail.busy.store(false, Ordering::Relaxed);
self.maybe_emit_pending_server_notice();

let page_bytes = pager::resolve_page_bytes(None);
let reply = self.build_session_reset_reply_pager(page_bytes, "new session started");
let reply = match pre_shutdown_end_offset {
Some(end_offset) => {
let reply = self.build_session_reset_reply_pager_to_offset(
page_bytes,
"new session started",
end_offset,
);
if let Some(end_offset) = post_shutdown_end_offset {
self.output.advance_offset_to(end_offset);
}
reply
}
None => self.build_session_reset_reply_pager(page_bytes, "new session started"),
};
self.clear_preserved_prefixes();
self.reset_output_state_pager(true, false);
self.note_respawn_during_write();
Expand Down Expand Up @@ -4586,10 +4604,19 @@ impl WorkerManager {
}

fn build_session_reset_reply_files(&mut self, meta: &str) -> ReplyWithOffset {
let formatted = self.drain_sealed_formatted_output();
self.build_session_reset_reply_files_from_formatted(meta, formatted)
}

fn build_session_reset_reply_files_from_formatted(
&mut self,
meta: &str,
formatted: FormattedPendingOutput,
) -> ReplyWithOffset {
let FormattedPendingOutput {
mut contents,
saw_stderr,
} = self.drain_sealed_formatted_output();
} = formatted;
contents.retain(|content| match content {
WorkerContent::ContentText { text, .. } => !text.trim().is_empty(),
_ => true,
Expand All @@ -4613,6 +4640,15 @@ impl WorkerManager {

fn build_session_reset_reply_pager(&mut self, page_bytes: u64, meta: &str) -> ReplyWithOffset {
let end_offset = self.output.end_offset().unwrap_or(0);
self.build_session_reset_reply_pager_to_offset(page_bytes, meta, end_offset)
}

fn build_session_reset_reply_pager_to_offset(
&mut self,
page_bytes: u64,
meta: &str,
end_offset: u64,
) -> ReplyWithOffset {
let mut is_error = false;

let SnapshotWithImages {
Expand Down Expand Up @@ -7603,11 +7639,11 @@ mod tests {
}

#[test]
fn control_prefix_strips_single_separator_newline() {
fn control_prefix_preserves_immediate_newline_tail() {
let (action, remaining) =
split_write_stdin_control_prefix("\u{4}\nprint(1)").expect("expected control prefix");
assert!(matches!(action, WriteStdinControlAction::Restart));
assert_eq!(remaining, "print(1)");
assert_eq!(remaining, "\nprint(1)");
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ snapshot.session("default", mcp_script! {
write_stdin("x <- 1");
write_stdin("x <- x + 2");
write_stdin("x", timeout = 0.2);
write_stdin("\u{4}");
write_stdin_raw_unterminated("\u{4}");
}).await?;
```
17 changes: 16 additions & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,21 @@ macro_rules! mcp_calls_inner {
$crate::mcp_calls_inner!($session, $($rest)*);
}};

($session:ident, write_stdin_raw_unterminated($input:expr $(, timeout = $timeout:expr)? ); $($rest:tt)*) => {{
let mut args = serde_json::Map::new();
args.insert("input".to_string(), serde_json::Value::String($input.to_string()));
if let Some(timeout) = $crate::common::normalized_test_timeout($crate::mcp_timeout_opt!($($timeout)?)) {
args.insert(
"timeout_ms".to_string(),
serde_json::json!((timeout * 1000.0).round() as i64),
);
}
$session
.call_tool($session.repl_tool_name(), serde_json::Value::Object(args))
.await;
$crate::mcp_calls_inner!($session, $($rest)*);
}};

}

#[macro_export]
Expand Down Expand Up @@ -440,7 +455,7 @@ fn compact_json(value: &Value) -> String {
serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
}

fn normalized_test_timeout(timeout: Option<f64>) -> Option<f64> {
pub(crate) fn normalized_test_timeout(timeout: Option<f64>) -> Option<f64> {
#[cfg(windows)]
{
timeout.map(|value| value.min(WINDOWS_TEST_TIMEOUT_CAP_SECS))
Expand Down
Loading
Loading