diff --git a/crates/amalthea/src/fixtures/dummy_frontend.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs index 9b979de89..193bc0152 100644 --- a/crates/amalthea/src/fixtures/dummy_frontend.rs +++ b/crates/amalthea/src/fixtures/dummy_frontend.rs @@ -14,11 +14,13 @@ use crate::registration_file::RegistrationFile; use crate::session::Session; use crate::socket::Socket; use crate::wire::comm_msg::CommWireMsg; +use crate::wire::debug_request::DebugRequest; use crate::wire::execute_input::ExecuteInput; use crate::wire::execute_request::ExecuteRequest; use crate::wire::execute_request::ExecuteRequestPositron; use crate::wire::handshake_reply::HandshakeReply; use crate::wire::input_reply::InputReply; +use crate::wire::interrupt_request::InterruptRequest; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; use crate::wire::jupyter_message::ProtocolMessage; @@ -235,6 +237,10 @@ impl DummyFrontend { self.send_control(ShutdownRequest { restart }) } + pub fn send_interrupt_request(&self) -> String { + self.send_control(InterruptRequest {}) + } + pub fn send_execute_request(&self, code: &str, options: ExecuteRequestOptions) -> String { self.send_shell(ExecuteRequest { code: String::from(code), @@ -247,6 +253,56 @@ impl DummyFrontend { }) } + /// Send an execute request with custom metadata (e.g. `cellId` for notebook debugging). + pub fn send_execute_request_with_metadata( + &self, + code: &str, + options: ExecuteRequestOptions, + metadata: serde_json::Value, + ) -> String { + Self::send_with_metadata( + &self.shell_socket, + &self.session, + ExecuteRequest { + code: String::from(code), + silent: false, + store_history: true, + user_expressions: serde_json::Value::Null, + allow_stdin: options.allow_stdin, + stop_on_error: false, + positron: options.positron, + }, + metadata, + ) + } + + /// Send a DAP request wrapped in a Jupyter `debug_request` on the control channel. + pub fn send_debug_request(&self, dap_request: serde_json::Value) -> String { + self.send_control(DebugRequest { + content: dap_request, + }) + } + + /// Receive a `debug_reply` from the control channel. + #[track_caller] + pub fn recv_debug_reply(&self) -> serde_json::Value { + let msg = Self::recv(&self.control_socket); + match msg { + Message::DebugReply(msg) => msg.content.content, + other => panic!("Expected DebugReply, got {other:?}"), + } + } + + /// Receive a `debug_event` from the IOPub channel. + #[track_caller] + pub fn recv_iopub_debug_event(&self) -> serde_json::Value { + let msg = Self::recv(&self.iopub_socket); + match msg { + Message::DebugEvent(msg) => msg.content.content, + other => panic!("Expected DebugEvent, got {other:?}"), + } + } + /// Sends a Jupyter message on the Stdin socket pub fn send_stdin(&self, msg: T) { Self::send(&self.stdin_socket, &self.session, msg); @@ -259,6 +315,19 @@ impl DummyFrontend { id } + fn send_with_metadata( + socket: &Socket, + session: &Session, + msg: T, + metadata: serde_json::Value, + ) -> String { + let mut message = JupyterMessage::create(msg, None, session); + message.metadata = metadata; + let id = message.header.msg_id.clone(); + message.send(socket).unwrap(); + id + } + #[track_caller] pub fn recv(socket: &Socket) -> Message { // It's important to wait with a timeout because the kernel thread might have @@ -312,6 +381,15 @@ impl DummyFrontend { }) } + /// Receive from Control and assert `InterruptReply` message. + #[track_caller] + pub fn recv_control_interrupt_reply(&self) { + let message = self.recv_control(); + assert_matches!(message, Message::InterruptReply(message) => { + assert_eq!(message.content.status, Status::Ok); + }); + } + /// Receive from Shell and assert `ExecuteReply` message. /// Returns `execution_count`. #[track_caller] diff --git a/crates/amalthea/src/kernel.rs b/crates/amalthea/src/kernel.rs index 4e5862be3..e816723c7 100644 --- a/crates/amalthea/src/kernel.rs +++ b/crates/amalthea/src/kernel.rs @@ -54,7 +54,7 @@ pub enum StreamBehavior { /// Handler implementations provided by the language runtime. pub struct Handlers { pub shell_handler: Box, - pub control_handler: Arc>, + pub control_handler: Box, pub server_handlers: HashMap>>, } @@ -379,7 +379,7 @@ pub fn read_connection(connection_file: &str) -> (ConnectionFile, Option, - handler: Arc>, + handler: Box, stdin_interrupt_tx: Sender, ) { let control = Control::new(socket, iopub_tx, handler, stdin_interrupt_tx); @@ -531,37 +531,69 @@ fn socket_bridge_thread( }; loop { - let n = unwrap!( - zmq::poll(&mut poll_items, -1), - Err(err) => { - debug_panic!("While polling 0MQ items: {err:?}"); - 0 - } - ); - - for _ in 0..n { - if consume_outbound_notification() { - forward_outbound(); - continue; - } - + // On Windows ARM, zmq::poll with a non-zero timeout blocks forever + // and inproc notification sockets may not wake the poll at all. + // Use a fully separate polling path that doesn't rely on ZMQ + // readability reporting: non-blocking poll + unconditional drain + // of all sources + short sleep when idle. + #[cfg(all(target_os = "windows", target_arch = "aarch64"))] + { + // Drain outbound messages (IOPub, StdIn) unconditionally + consume_outbound_notification(); + forward_outbound(); + + // Check inbound sockets with non-blocking poll if has_inbound(&stdin_socket) { unwrap!( forward_inbound(&stdin_socket, &stdin_inbound_tx), Err(err) => debug_panic!("While forwarding inbound message: {err:?}") ); - continue; } - if has_inbound(&iopub_socket) { unwrap!( forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx), Err(err) => debug_panic!("While forwarding inbound message: {err:?}") ); - continue; } - debug_panic!("Could not find readable message"); + std::thread::sleep(std::time::Duration::from_millis(1)); + continue; + } + + #[cfg(not(all(target_os = "windows", target_arch = "aarch64")))] + { + let n = unwrap!( + zmq::poll(&mut poll_items, -1), + Err(err) => { + debug_panic!("While polling 0MQ items: {err:?}"); + 0 + } + ); + + for _ in 0..n { + if consume_outbound_notification() { + forward_outbound(); + continue; + } + + if has_inbound(&stdin_socket) { + unwrap!( + forward_inbound(&stdin_socket, &stdin_inbound_tx), + Err(err) => debug_panic!("While forwarding inbound message: {err:?}") + ); + continue; + } + + if has_inbound(&iopub_socket) { + unwrap!( + forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx), + Err(err) => debug_panic!("While forwarding inbound message: {err:?}") + ); + continue; + } + + debug_panic!("Could not find readable message"); + } } } } diff --git a/crates/amalthea/src/language/control_handler.rs b/crates/amalthea/src/language/control_handler.rs index 1c45d109d..c0cecad01 100644 --- a/crates/amalthea/src/language/control_handler.rs +++ b/crates/amalthea/src/language/control_handler.rs @@ -5,27 +5,29 @@ * */ -use async_trait::async_trait; - +use crate::wire::debug_reply::DebugReply; +use crate::wire::debug_request::DebugRequest; use crate::wire::exception::Exception; use crate::wire::interrupt_reply::InterruptReply; use crate::wire::shutdown_reply::ShutdownReply; use crate::wire::shutdown_request::ShutdownRequest; -#[async_trait] pub trait ControlHandler: Send { /// Handles a request to shut down the kernel. This message is forwarded /// from the Control socket. /// /// https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-shutdown - async fn handle_shutdown_request( - &self, - msg: &ShutdownRequest, - ) -> Result; + fn handle_shutdown_request(&self, msg: &ShutdownRequest) -> Result; /// Handles a request to interrupt the kernel. This message is forwarded /// from the Control socket. /// /// https://jupyter-client.readthedocs.io/en/stable/messaging.html#kernel-interrupt - async fn handle_interrupt_request(&self) -> Result; + fn handle_interrupt_request(&self) -> Result; + + /// Handles a debug request forwarded from the Control socket. + /// The request and reply contents are opaque DAP messages. + /// + /// https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request + fn handle_debug_request(&self, msg: &DebugRequest) -> Result; } diff --git a/crates/amalthea/src/socket.rs b/crates/amalthea/src/socket.rs index 5c92a24ff..eaf954777 100644 --- a/crates/amalthea/src/socket.rs +++ b/crates/amalthea/src/socket.rs @@ -214,10 +214,38 @@ impl Socket { } } + #[cfg(not(all(target_os = "windows", target_arch = "aarch64")))] pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result { Ok(self.socket.poll(zmq::PollEvents::POLLIN, timeout_ms)? != 0) } + /// On Windows ARM, ZMQ poll with a non-zero timeout blocks forever + /// instead of respecting the timeout. Use non-blocking poll with + /// manual timing. + #[cfg(all(target_os = "windows", target_arch = "aarch64"))] + pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result { + if timeout_ms == 0 { + return Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0); + } + + let start = std::time::Instant::now(); + let timeout = if timeout_ms < 0 { + std::time::Duration::from_secs(u64::MAX / 2) + } else { + std::time::Duration::from_millis(timeout_ms as u64) + }; + + loop { + if self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0 { + return Ok(true); + } + if start.elapsed() >= timeout { + return Ok(false); + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + pub fn has_incoming_data(&self) -> zmq::Result { self.poll_incoming(0) } diff --git a/crates/amalthea/src/socket/control.rs b/crates/amalthea/src/socket/control.rs index 067b3d638..b0e570d0c 100644 --- a/crates/amalthea/src/socket/control.rs +++ b/crates/amalthea/src/socket/control.rs @@ -5,16 +5,13 @@ * */ -use std::sync::Arc; -use std::sync::Mutex; - use crossbeam::channel::SendError; use crossbeam::channel::Sender; -use futures::executor::block_on; use log::error; use log::info; use log::trace; use log::warn; +use stdext::result::ResultExt; use stdext::unwrap; use crate::error::Error; @@ -22,6 +19,7 @@ use crate::language::control_handler::ControlHandler; use crate::socket::iopub::IOPubContextChannel; use crate::socket::iopub::IOPubMessage; use crate::socket::Socket; +use crate::wire::debug_request::DebugRequest; use crate::wire::interrupt_request::InterruptRequest; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; @@ -33,7 +31,7 @@ use crate::wire::status::KernelStatus; pub struct Control { socket: Socket, iopub_tx: Sender, - handler: Arc>, + handler: Box, stdin_interrupt_tx: Sender, } @@ -41,7 +39,7 @@ impl Control { pub fn new( socket: Socket, iopub_tx: Sender, - handler: Arc>, + handler: Box, stdin_interrupt_tx: Sender, ) -> Self { Self { @@ -73,6 +71,9 @@ impl Control { fn process_message(&self, message: Message) -> Result<(), Error> { match message { + Message::DebugRequest(req) => { + self.handle_request(req, |r| self.handle_debug_request(r)) + }, Message::ShutdownRequest(req) => { self.handle_request(req, |r| self.handle_shutdown_request(r)) }, @@ -130,11 +131,8 @@ impl Control { fn handle_shutdown_request(&self, req: JupyterMessage) -> Result<(), Error> { info!("Received shutdown request, shutting down kernel: {:?}", req); - // Lock the control handler object on this thread - let control_handler = self.handler.lock().unwrap(); - let reply = unwrap!( - block_on(control_handler.handle_shutdown_request(&req.content)), + self.handler.handle_shutdown_request(&req.content), Err(err) => { log::error!("Failed to handle shutdown request: {err:?}"); return Ok(()) @@ -156,6 +154,18 @@ impl Control { Ok(()) } + fn handle_debug_request(&self, req: JupyterMessage) -> Result<(), Error> { + log::trace!("Received debug request: {:?}", req); + + let Some(reply) = self.handler.handle_debug_request(&req.content).log_err() else { + return Ok(()); + }; + + req.send_reply(reply, &self.socket).log_err(); + + Ok(()) + } + fn handle_interrupt_request(&self, req: JupyterMessage) -> Result<(), Error> { info!( "Received interrupt request, asking kernel to stop: {:?}", @@ -169,11 +179,8 @@ impl Control { error!("Failed to send interrupt request: {:?}", err); } - // Lock the control handler object on this thread - let control_handler = self.handler.lock().unwrap(); - let reply = unwrap!( - block_on(control_handler.handle_interrupt_request()), + self.handler.handle_interrupt_request(), Err(err) => { log::error!("Failed to handle interrupt request: {err:?}"); return Ok(()) diff --git a/crates/amalthea/src/socket/iopub.rs b/crates/amalthea/src/socket/iopub.rs index 02cb6fac6..9f0565bf2 100644 --- a/crates/amalthea/src/socket/iopub.rs +++ b/crates/amalthea/src/socket/iopub.rs @@ -18,6 +18,7 @@ use crate::session::Session; use crate::wire::comm_close::CommClose; use crate::wire::comm_msg::CommWireMsg; use crate::wire::comm_open::CommOpen; +use crate::wire::debug_event::DebugEvent; use crate::wire::display_data::DisplayData; use crate::wire::execute_error::ExecuteError; use crate::wire::execute_input::ExecuteInput; @@ -85,6 +86,7 @@ pub enum IOPubContextChannel { #[derive(Debug)] pub enum IOPubMessage { Status(JupyterHeader, IOPubContextChannel, KernelStatus), + DebugEvent(DebugEvent), ExecuteResult(ExecuteResult), ExecuteError(ExecuteError), ExecuteInput(ExecuteInput), @@ -241,6 +243,12 @@ impl IOPub { self.message_with_context(content, IOPubContextChannel::Shell), )) }, + IOPubMessage::DebugEvent(content) => { + self.flush_stream(); + self.forward(Message::DebugEvent( + self.message_with_context(content, IOPubContextChannel::Control), + )) + }, IOPubMessage::Wait(content) => self.process_wait_request(content), IOPubMessage::CommOutgoing(comm_id, comm_msg) => { self.flush_stream(); diff --git a/crates/amalthea/src/socket/shell.rs b/crates/amalthea/src/socket/shell.rs index b4d367a4d..7357bd6db 100644 --- a/crates/amalthea/src/socket/shell.rs +++ b/crates/amalthea/src/socket/shell.rs @@ -111,6 +111,29 @@ impl Shell { /// Main loop for the Shell thread; to be invoked by the kernel. pub fn listen(&mut self) { + #[cfg(all(target_os = "windows", target_arch = "aarch64"))] + self.listen_polling(); + + #[cfg(not(all(target_os = "windows", target_arch = "aarch64")))] + self.listen_blocking(); + } + + /// On Windows ARM, zmq::poll with a non-zero timeout blocks forever + /// and inproc notification sockets may not wake the poll. Use + /// non-blocking poll with unconditional comm checks and a short sleep. + #[cfg(all(target_os = "windows", target_arch = "aarch64"))] + fn listen_polling(&mut self) { + loop { + self.process_comm_notification(); + if self.socket.has_incoming_data().unwrap_or(false) { + self.process_shell_socket(); + } + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + + #[cfg(not(all(target_os = "windows", target_arch = "aarch64")))] + fn listen_blocking(&mut self) { loop { log::trace!("Waiting for shell messages or comm events"); @@ -139,24 +162,28 @@ impl Shell { } if shell_readable { - let message = match Message::read_from_socket(&self.socket) { - Ok(m) => m, - Err(err) => { - log::warn!("Could not read message from shell socket: {err:?}"); - continue; - }, - }; - - // Handle the message; any failures while handling the messages are - // delivered to the client instead of reported up the stack, so the - // only errors likely here are "can't deliver to client" - if let Err(err) = self.process_message(message) { - log::error!("Could not handle shell message: {err:?}"); - } + self.process_shell_socket(); } } } + fn process_shell_socket(&mut self) { + let message = match Message::read_from_socket(&self.socket) { + Ok(m) => m, + Err(err) => { + log::warn!("Could not read message from shell socket: {err:?}"); + return; + }, + }; + + // Handle the message; any failures while handling the messages are + // delivered to the client instead of reported up the stack, so the + // only errors likely here are "can't deliver to client" + if let Err(err) = self.process_message(message) { + log::error!("Could not handle shell message: {err:?}"); + } + } + /// Process comm event notifications from the notifier thread. /// Drains all pending notifications and all pending events. fn process_comm_notification(&mut self) { diff --git a/crates/amalthea/src/wire.rs b/crates/amalthea/src/wire.rs index e916dcdad..bcb69a3d4 100644 --- a/crates/amalthea/src/wire.rs +++ b/crates/amalthea/src/wire.rs @@ -12,6 +12,9 @@ pub mod comm_msg; pub mod comm_open; pub mod complete_reply; pub mod complete_request; +pub mod debug_event; +pub mod debug_reply; +pub mod debug_request; pub mod display_data; pub mod error_reply; pub mod exception; diff --git a/crates/amalthea/src/wire/debug_event.rs b/crates/amalthea/src/wire/debug_event.rs new file mode 100644 index 000000000..c1150aa83 --- /dev/null +++ b/crates/amalthea/src/wire/debug_event.rs @@ -0,0 +1,29 @@ +/* + * debug_event.rs + * + * Copyright (C) 2026 Posit Software, PBC. All rights reserved. + * + */ + +use serde::Deserialize; +use serde::Serialize; + +use crate::wire::jupyter_message::MessageType; + +/// Represents a debug event published on the IOPub channel. +/// +/// The content is an opaque DAP (Debug Adapter Protocol) event, forwarded +/// as-is between the kernel's debugger and the frontend. +/// +/// https://jupyter-client.readthedocs.io/en/latest/messaging.html#additions-to-the-dap +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DebugEvent { + #[serde(flatten)] + pub content: serde_json::Value, +} + +impl MessageType for DebugEvent { + fn message_type() -> String { + String::from("debug_event") + } +} diff --git a/crates/amalthea/src/wire/debug_reply.rs b/crates/amalthea/src/wire/debug_reply.rs new file mode 100644 index 000000000..52f337d13 --- /dev/null +++ b/crates/amalthea/src/wire/debug_reply.rs @@ -0,0 +1,29 @@ +/* + * debug_reply.rs + * + * Copyright (C) 2026 Posit Software, PBC. All rights reserved. + * + */ + +use serde::Deserialize; +use serde::Serialize; + +use crate::wire::jupyter_message::MessageType; + +/// Represents a reply to a `debug_request` message on the control channel. +/// +/// The content is an opaque DAP (Debug Adapter Protocol) response, passed +/// through as-is between the frontend and the kernel's debugger. +/// +/// https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DebugReply { + #[serde(flatten)] + pub content: serde_json::Value, +} + +impl MessageType for DebugReply { + fn message_type() -> String { + String::from("debug_reply") + } +} diff --git a/crates/amalthea/src/wire/debug_request.rs b/crates/amalthea/src/wire/debug_request.rs new file mode 100644 index 000000000..b81f96c01 --- /dev/null +++ b/crates/amalthea/src/wire/debug_request.rs @@ -0,0 +1,29 @@ +/* + * debug_request.rs + * + * Copyright (C) 2026 Posit Software, PBC. All rights reserved. + * + */ + +use serde::Deserialize; +use serde::Serialize; + +use crate::wire::jupyter_message::MessageType; + +/// Represents a Jupyter Debug Protocol request. +/// +/// The content is an opaque DAP (Debug Adapter Protocol) request message, +/// forwarded as-is between the frontend and the kernel's debugger. +/// +/// https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DebugRequest { + #[serde(flatten)] + pub content: serde_json::Value, +} + +impl MessageType for DebugRequest { + fn message_type() -> String { + String::from("debug_request") + } +} diff --git a/crates/amalthea/src/wire/jupyter_message.rs b/crates/amalthea/src/wire/jupyter_message.rs index 6f99c7d28..2f6b39d0a 100644 --- a/crates/amalthea/src/wire/jupyter_message.rs +++ b/crates/amalthea/src/wire/jupyter_message.rs @@ -7,6 +7,8 @@ use serde::Deserialize; use serde::Serialize; +use serde_json::json; +use serde_json::Value; use super::display_data::DisplayData; use super::handshake_reply::HandshakeReply; @@ -27,6 +29,9 @@ use crate::wire::comm_msg::CommWireMsg; use crate::wire::comm_open::CommOpen; use crate::wire::complete_reply::CompleteReply; use crate::wire::complete_request::CompleteRequest; +use crate::wire::debug_event::DebugEvent; +use crate::wire::debug_reply::DebugReply; +use crate::wire::debug_request::DebugRequest; use crate::wire::error_reply::ErrorReply; use crate::wire::exception::Exception; use crate::wire::execute_error::ExecuteError; @@ -66,6 +71,9 @@ pub struct JupyterMessage { /// not all messages have a parent. pub parent_header: Option, + /// Additional metadata + pub metadata: Value, + /// The body (payload) of the message pub content: T, } @@ -104,6 +112,8 @@ pub enum Message { InputReply(JupyterMessage), InputRequest(JupyterMessage), // Control + DebugReply(JupyterMessage), + DebugRequest(JupyterMessage), InterruptReply(JupyterMessage), InterruptRequest(JupyterMessage), ShutdownReply(JupyterMessage), @@ -112,6 +122,7 @@ pub enum Message { HandshakeRequest(JupyterMessage), HandshakeReply(JupyterMessage), // IOPub + DebugEvent(JupyterMessage), Status(JupyterMessage), ExecuteResult(JupyterMessage), ExecuteError(JupyterMessage), @@ -153,6 +164,9 @@ impl TryFrom<&Message> for WireMessage { match msg { Message::CompleteReply(msg) => WireMessage::try_from(msg), Message::CompleteRequest(msg) => WireMessage::try_from(msg), + Message::DebugEvent(msg) => WireMessage::try_from(msg), + Message::DebugReply(msg) => WireMessage::try_from(msg), + Message::DebugRequest(msg) => WireMessage::try_from(msg), Message::ExecuteReply(msg) => WireMessage::try_from(msg), Message::ExecuteReplyException(msg) => WireMessage::try_from(msg), Message::ExecuteRequest(msg) => WireMessage::try_from(msg), @@ -254,6 +268,15 @@ impl TryFrom<&WireMessage> for Message { if kind == CompleteReply::message_type() { return Ok(Message::CompleteReply(JupyterMessage::try_from(msg)?)); } + if kind == DebugEvent::message_type() { + return Ok(Message::DebugEvent(JupyterMessage::try_from(msg)?)); + } + if kind == DebugReply::message_type() { + return Ok(Message::DebugReply(JupyterMessage::try_from(msg)?)); + } + if kind == DebugRequest::message_type() { + return Ok(Message::DebugRequest(JupyterMessage::try_from(msg)?)); + } if kind == DisplayData::message_type() { return Ok(Message::DisplayData(JupyterMessage::try_from(msg)?)); } @@ -329,6 +352,50 @@ impl Message { msg.send(socket)?; Ok(()) } + + pub fn parent_header(&self) -> Option<&JupyterHeader> { + match self { + Self::KernelInfoReply(msg) => msg.parent_header.as_ref(), + Self::KernelInfoRequest(msg) => msg.parent_header.as_ref(), + Self::CompleteReply(msg) => msg.parent_header.as_ref(), + Self::CompleteRequest(msg) => msg.parent_header.as_ref(), + Self::ExecuteReply(msg) => msg.parent_header.as_ref(), + Self::ExecuteReplyException(msg) => msg.parent_header.as_ref(), + Self::ExecuteRequest(msg) => msg.parent_header.as_ref(), + Self::InspectReply(msg) => msg.parent_header.as_ref(), + Self::InspectRequest(msg) => msg.parent_header.as_ref(), + Self::IsCompleteReply(msg) => msg.parent_header.as_ref(), + Self::IsCompleteRequest(msg) => msg.parent_header.as_ref(), + Self::HistoryReply(msg) => msg.parent_header.as_ref(), + Self::HistoryRequest(msg) => msg.parent_header.as_ref(), + Self::CommInfoReply(msg) => msg.parent_header.as_ref(), + Self::CommInfoRequest(msg) => msg.parent_header.as_ref(), + Self::CommRequest(msg) => msg.parent_header.as_ref(), + Self::CommReply(msg) => msg.parent_header.as_ref(), + Self::InputReply(msg) => msg.parent_header.as_ref(), + Self::InputRequest(msg) => msg.parent_header.as_ref(), + Self::DebugReply(msg) => msg.parent_header.as_ref(), + Self::DebugRequest(msg) => msg.parent_header.as_ref(), + Self::InterruptReply(msg) => msg.parent_header.as_ref(), + Self::InterruptRequest(msg) => msg.parent_header.as_ref(), + Self::ShutdownReply(msg) => msg.parent_header.as_ref(), + Self::ShutdownRequest(msg) => msg.parent_header.as_ref(), + Self::HandshakeRequest(msg) => msg.parent_header.as_ref(), + Self::HandshakeReply(msg) => msg.parent_header.as_ref(), + Self::DebugEvent(msg) => msg.parent_header.as_ref(), + Self::Status(msg) => msg.parent_header.as_ref(), + Self::ExecuteResult(msg) => msg.parent_header.as_ref(), + Self::ExecuteError(msg) => msg.parent_header.as_ref(), + Self::ExecuteInput(msg) => msg.parent_header.as_ref(), + Self::Stream(msg) => msg.parent_header.as_ref(), + Self::DisplayData(msg) => msg.parent_header.as_ref(), + Self::UpdateDisplayData(msg) => msg.parent_header.as_ref(), + Self::Welcome(msg) => msg.parent_header.as_ref(), + Self::CommMsg(msg) => msg.parent_header.as_ref(), + Self::CommOpen(msg) => msg.parent_header.as_ref(), + Self::CommClose(msg) => msg.parent_header.as_ref(), + } + } } impl JupyterMessage @@ -357,6 +424,7 @@ where session.username.clone(), ), parent_header: parent, + metadata: json!({}), content, } } @@ -375,6 +443,7 @@ where session.username.clone(), ), parent_header: Some(originator.header), + metadata: json!({}), content, } } @@ -437,6 +506,7 @@ where session.username.clone(), ), parent_header: Some(self.header.clone()), + metadata: json!({}), content, } } @@ -459,6 +529,7 @@ where session.username.clone(), ), parent_header: Some(self.header.clone()), + metadata: json!({}), content: ErrorReply { status: Status::Error, exception, diff --git a/crates/amalthea/src/wire/kernel_info_full_reply.rs b/crates/amalthea/src/wire/kernel_info_full_reply.rs index 42e022e50..aeab651b1 100644 --- a/crates/amalthea/src/wire/kernel_info_full_reply.rs +++ b/crates/amalthea/src/wire/kernel_info_full_reply.rs @@ -59,10 +59,13 @@ impl MessageType for KernelInfoReply { /// Adds Amalthea fields to partial [kernel_info_reply::KernelInfoReply]. impl From for KernelInfoReply { fn from(value: kernel_info_reply::KernelInfoReply) -> Self { + // Merge client's supported features with Amalthea's + let mut supported_features = vec![String::from("iopub_welcome")]; + supported_features.extend(value.supported_features); + Self { - // These fields are set by Amalthea protocol_version: String::from("5.4"), - supported_features: vec![String::from("iopub_welcome")], + supported_features, // These fields are set by the Amalthea user status: value.status, diff --git a/crates/amalthea/src/wire/kernel_info_reply.rs b/crates/amalthea/src/wire/kernel_info_reply.rs index 57cd2f12a..ee1b055ed 100644 --- a/crates/amalthea/src/wire/kernel_info_reply.rs +++ b/crates/amalthea/src/wire/kernel_info_reply.rs @@ -38,4 +38,8 @@ pub struct KernelInfoReply { /// A list of help links pub help_links: Vec, + + /// Optional list of supported features (e.g. "debugger", "kernel subshells") + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub supported_features: Vec, } diff --git a/crates/amalthea/src/wire/originator.rs b/crates/amalthea/src/wire/originator.rs index 2fdd6dfc7..601fd13d7 100644 --- a/crates/amalthea/src/wire/originator.rs +++ b/crates/amalthea/src/wire/originator.rs @@ -5,6 +5,8 @@ * */ +use serde_json::Value; + use crate::wire::header::JupyterHeader; use crate::wire::jupyter_message::JupyterMessage; @@ -12,6 +14,7 @@ use crate::wire::jupyter_message::JupyterMessage; pub struct Originator { pub zmq_identities: Vec>, pub header: JupyterHeader, + pub metadata: Value, } impl From<&JupyterMessage> for Originator { @@ -19,6 +22,7 @@ impl From<&JupyterMessage> for Originator { Originator { zmq_identities: msg.zmq_identities.clone(), header: msg.header.clone(), + metadata: msg.metadata.clone(), } } } diff --git a/crates/amalthea/src/wire/wire_message.rs b/crates/amalthea/src/wire/wire_message.rs index 414fa7a77..28d92f3c8 100644 --- a/crates/amalthea/src/wire/wire_message.rs +++ b/crates/amalthea/src/wire/wire_message.rs @@ -11,7 +11,6 @@ use log::trace; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; -use serde_json::json; use serde_json::value::Value; use sha2::Sha256; @@ -347,6 +346,7 @@ impl TryFrom<&WireMessage> for JupyterMes zmq_identities: msg.zmq_identities.clone(), header: msg.header.clone(), parent_header: msg.parent_header.clone(), + metadata: msg.metadata.clone(), content, }) } @@ -371,7 +371,7 @@ impl TryFrom<&JupyterMessage> for WireMessage { zmq_identities: msg.zmq_identities.clone(), header: msg.header.clone(), parent_header: msg.parent_header.clone(), - metadata: json!({}), + metadata: msg.metadata.clone(), content, }) } diff --git a/crates/amalthea/tests/client/control.rs b/crates/amalthea/tests/client/control.rs index 94a588d38..bf833bebd 100644 --- a/crates/amalthea/tests/client/control.rs +++ b/crates/amalthea/tests/client/control.rs @@ -6,21 +6,18 @@ */ use amalthea::language::control_handler::ControlHandler; +use amalthea::wire::debug_reply::DebugReply; +use amalthea::wire::debug_request::DebugRequest; use amalthea::wire::exception::Exception; use amalthea::wire::interrupt_reply::InterruptReply; use amalthea::wire::jupyter_message::Status; use amalthea::wire::shutdown_reply::ShutdownReply; use amalthea::wire::shutdown_request::ShutdownRequest; -use async_trait::async_trait; pub struct Control {} -#[async_trait] impl ControlHandler for Control { - async fn handle_shutdown_request( - &self, - msg: &ShutdownRequest, - ) -> Result { + fn handle_shutdown_request(&self, msg: &ShutdownRequest) -> Result { // NYI Ok(ShutdownReply { status: Status::Ok, @@ -28,8 +25,15 @@ impl ControlHandler for Control { }) } - async fn handle_interrupt_request(&self) -> Result { + fn handle_interrupt_request(&self) -> Result { // NYI Ok(InterruptReply { status: Status::Ok }) } + + fn handle_debug_request(&self, _msg: &DebugRequest) -> Result { + // NYI + Ok(DebugReply { + content: serde_json::json!({}), + }) + } } diff --git a/crates/amalthea/tests/client/dummy_frontend.rs b/crates/amalthea/tests/client/dummy_frontend.rs index 2d4574a9e..9d9c78640 100644 --- a/crates/amalthea/tests/client/dummy_frontend.rs +++ b/crates/amalthea/tests/client/dummy_frontend.rs @@ -70,7 +70,7 @@ impl DummyAmaltheaFrontend { stdin_request_tx, stdin_reply_rx, )); - let control = Arc::new(Mutex::new(control::Control {})); + let control = Box::new(control::Control {}); // Initialize logging env_logger::init(); diff --git a/crates/amalthea/tests/client/shell.rs b/crates/amalthea/tests/client/shell.rs index 0db730c3e..c5154d3c8 100644 --- a/crates/amalthea/tests/client/shell.rs +++ b/crates/amalthea/tests/client/shell.rs @@ -108,6 +108,7 @@ impl ShellHandler for Shell { debugger: false, help_links: Vec::new(), language_info: info, + supported_features: vec![], }) } diff --git a/crates/ark/src/console/console_debug.rs b/crates/ark/src/console/console_debug.rs index 31a9d15f6..1edcd0483 100644 --- a/crates/ark/src/console/console_debug.rs +++ b/crates/ark/src/console/console_debug.rs @@ -24,6 +24,7 @@ use regex::Regex; use stdext::result::ResultExt; use crate::console::Console; +use crate::console::SessionMode; use crate::modules::ARK_ENVS; use crate::srcref::ark_uri; use crate::thread::RThreadSafe; @@ -111,7 +112,7 @@ impl Console { // Transient eval with unchanged stack: just refresh variables if transient_eval && !stack_changed { - let dap = self.debug_dap.lock().unwrap(); + let mut dap = self.debug_dap.lock().unwrap(); dap.send_invalidated(); return; } @@ -553,6 +554,11 @@ pub unsafe extern "C-unwind" fn ps_handle_breakpoint( let console = Console::get_mut(); let dap = console.debug_dap.lock().unwrap(); + // In notebook mode, breakpoints are inert when no debug session is active + if console.session_mode() == SessionMode::Notebook && !dap.is_connected { + return Ok(RObject::from(false).sexp); + } + let enabled = dap.is_breakpoint_enabled(&uri, id); let bp = dap.get_breakpoint(&uri, id); let bp_line = bp.map_or(0, |bp| bp.line); diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index 9f8f74372..2e8c93e3f 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -11,9 +11,11 @@ //! ReadConsole, WriteConsole, and R frontend callbacks. use super::*; +use crate::dap::dap_notebook; use crate::data_explorer::r_data_explorer::POSITRON_DATA_EXPLORER_MIME; use crate::r_task::QueuedRTask; use crate::r_task::RTask; +use crate::request::DebugRequest; static RE_DEBUG_PROMPT: Lazy = Lazy::new(|| Regex::new(r"Browse\[\d+\]").unwrap()); @@ -33,7 +35,7 @@ const DEBUG_COMMANDS_CONTINUE: &[&str] = &["n", "f", "c", "cont", "Q"]; static R_INIT: once_cell::sync::OnceCell<()> = once_cell::sync::OnceCell::new(); /// An enum representing the different modes in which the R session can run. -#[derive(PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone, Copy)] pub enum SessionMode { /// A session with an interactive console (REPL), such as in Positron. Console, @@ -190,6 +192,7 @@ pub(crate) struct KernelInfo { pub(crate) banner: String, pub(crate) input_prompt: Option, pub(crate) continuation_prompt: Option, + pub(crate) session_mode: SessionMode, } /// The kind of prompt we're handling in the REPL. @@ -483,7 +486,7 @@ impl Console { log::info!( "R has started and ark handlers have been registered, completing initialization." ); - Self::complete_initialization(console.banner.take(), kernel_init_tx); + Self::complete_initialization(console.banner.take(), console.session_mode, kernel_init_tx); // Spawn handler loop for async messages from other components (e.g., LSP). // Note that we do it after init is complete to avoid deadlocking @@ -574,7 +577,11 @@ impl Console { /// # Safety /// /// Can only be called from the R thread, and only once. - fn complete_initialization(banner: Option, mut kernel_init_tx: Bus) { + fn complete_initialization( + banner: Option, + session_mode: SessionMode, + mut kernel_init_tx: Bus, + ) { let version = unsafe { let version = Rf_findVarInFrame(R_BaseNamespace, r_symbol!("R.version.string")); RObject::new(version).to::().unwrap() @@ -589,6 +596,7 @@ impl Console { banner: banner.unwrap_or_default(), input_prompt: Some(input_prompt), continuation_prompt: Some(continuation_prompt), + session_mode, }; // Set `R_INIT` before broadcasting so that threads unblocked by the @@ -868,9 +876,21 @@ impl Console { return result; } - // Similarly, if we have pending inputs, we're about to immediately - // continue with the next expression. Don't emit debug notifications - // for these intermediate browser prompts. + // In notebook mode, `browser()` or `debug()` calls causing a + // browser prompt outside of an active debugging session are routed + // to stdin so the user can type debug commands in an input box. + if self.session_mode == SessionMode::Notebook && + !self.debug_dap.lock().unwrap().is_connected + { + self.debug_dap.lock().unwrap().is_debugging_stdin = true; + let result = self.handle_input_request(&info, buf, buflen); + self.debug_dap.lock().unwrap().is_debugging_stdin = false; + return result; + } + + // As with auto-stepping above, if we have pending inputs we're + // about to immediately continue with the next expression. Don't + // emit debug notifications for these intermediate browser prompts. let has_pending = self.pending_inputs.as_ref().is_some_and(|p| !p.is_empty()); // Only now that we know we're stopping for real, set state and @@ -887,6 +907,11 @@ impl Console { } } + // In notebook mode, don't complete the request while debugging. + // The kernel must stay busy until the debug session ends. + let can_complete_active_request = + !(self.session_mode == SessionMode::Notebook && self.debug_is_debugging); + if let Some(exception) = self.take_exception() { // We might get an input request if `readline()` or `menu()` is // called in `options(error = )`. We respond to this with an error @@ -906,18 +931,20 @@ impl Console { self.pending_inputs = None; // Reply to active request with error, then fall through to event loop - self.handle_active_request( - &info.input_prompt, - &continuation_prompt, - ConsoleValue::Error(exception), - ); + if can_complete_active_request { + self.handle_active_request( + &info.input_prompt, + &continuation_prompt, + ConsoleValue::Error(exception), + ); + } } else if matches!(info.kind, PromptKind::InputRequest) { // Request input from the frontend and return it to R return self.handle_input_request(&info, buf, buflen); } else if let Some(input) = self.pop_pending() { // Evaluate pending expression if there is any remaining return self.handle_pending_input(input, buf, buflen); - } else { + } else if can_complete_active_request { // Otherwise reply to active request with accumulated result, then // fall through to event loop let result = self.take_result(); @@ -974,12 +1001,17 @@ impl Console { // package. 50ms seems to be more in line with RStudio (posit-dev/positron#7235). let polled_events_rx = crossbeam::channel::tick(Duration::from_millis(50)); - // This is the main kind of message from the frontend that we are expecting. - // We either wait for `input_reply` messages on StdIn, or for - // `execute_request` on Shell. - let (r_request_index, stdin_reply_index) = match wait_for { - WaitFor::ExecuteRequest => (Some(select.recv(&r_request_rx)), None), - WaitFor::InputReply => (None, Some(select.recv(&stdin_reply_rx))), + // This is the main kind of message from the frontend that we are + // expecting. We either wait for `input_reply` messages on StdIn, or for + // `execute_request` on Shell. We also listen for R requests, including + // while waiting on StdIn. Such requests are only expected in Notebook + // mode when debugging via StdIn. The interrupt handler may send us a + // Quit command to terminate the debugging session. For simplicity we + // listen for these requests unconditionally, including in Console sessions. + let r_request_index = select.recv(&r_request_rx); + let stdin_reply_index = match wait_for { + WaitFor::ExecuteRequest => None, + WaitFor::InputReply => Some(select.recv(&stdin_reply_rx)), }; let kernel_request_index = select.recv(&kernel_request_rx); @@ -1003,15 +1035,15 @@ impl Console { }; loop { - // If an interrupt was signaled and we are in a user - // request prompt, e.g. `readline()`, we need to propagate - // the interrupt to the R stack. This needs to happen before - // `process_idle_events()`, particularly on Windows, because it - // calls `R_ProcessEvents()`, which checks and resets - // `UserBreak`, but won't actually fire the interrupt b/c - // we have them disabled, so it would end up swallowing the - // user interrupt request. - if matches!(info.kind, PromptKind::InputRequest) && interrupts_pending() { + // If an interrupt was signaled and we are waiting for user + // input (readline, or browser-as-stdin in notebook mode), we + // need to propagate the interrupt to the R stack. This needs + // to happen before `process_idle_events()`, particularly on + // Windows, because it calls `R_ProcessEvents()`, which checks + // and resets `UserBreak`, but won't actually fire the + // interrupt b/c we have them disabled, so it would end up + // swallowing the user interrupt request. + if matches!(wait_for, WaitFor::InputReply) && interrupts_pending() { return ConsoleResult::Interrupt; } @@ -1043,13 +1075,26 @@ impl Console { match oper.index() { // We've got an execute request from the frontend - i if Some(i) == r_request_index => { + i if i == r_request_index => { let req = oper.recv(&r_request_rx); let Ok(req) = req else { // The channel is disconnected and empty return ConsoleResult::Disconnected; }; + // When debugging via StdIn in notebook mode, waiting for + // input at a browser prompt, the interrupt handler may send + // a Quit command to exit the browser cleanly. + if matches!(wait_for, WaitFor::InputReply) { + if let RRequest::DebugCommand(DebugRequest::Quit) = req { + let input = String::from("Q"); + Self::on_console_input(buf, buflen, input).unwrap(); + return ConsoleResult::NewInput; + } + log::warn!("Unexpected R request while waiting for stdin input: {req:?}"); + continue; + } + if let Some(input) = self.handle_execute_request(req, info, buf, buflen) { return input; } @@ -1313,8 +1358,16 @@ impl Console { panic!("Unexpected `execute_request` while waiting for `input_reply`."); } + let mut cell_id = None; + let input = match req { RRequest::ExecuteCode(exec_req, originator, reply_tx) => { + cell_id = originator + .metadata + .get("cellId") + .and_then(|v| v.as_str()) + .map(String::from); + // Extract input from request let (input, exec_count) = { self.init_execute_request(&exec_req) }; @@ -1367,6 +1420,23 @@ impl Console { let mut dap_guard = self.debug_dap.lock().unwrap(); let uri_id = loc.as_ref().map(UrlId::from_code_location); + + // For notebook cells (`cellId` present in metadata), synthesize + // a `CodeLocation` pointing to the temp file that `dumpCell` + // wrote. This gives `annotate_input()` the file URI it needs + // for the `#line` directive and breakpoint injection. + let (uri_id, loc) = if cell_id.is_some() { + match dap_notebook::notebook_code_location(&code) { + Some(notebook_loc) => { + let id = UrlId::from_url(notebook_loc.uri.clone()); + (Some(id), Some(notebook_loc)) + }, + None => (uri_id, loc), + } + } else { + (uri_id, loc) + }; + let breakpoints = uri_id .as_ref() .and_then(|uri_id| dap_guard.breakpoints.get_mut(uri_id)) diff --git a/crates/ark/src/control.rs b/crates/ark/src/control.rs index b005e8485..aee5f607e 100644 --- a/crates/ark/src/control.rs +++ b/crates/ark/src/control.rs @@ -1,39 +1,64 @@ /* * control.rs * - * Copyright (C) 2022 Posit Software, PBC. All rights reserved. + * Copyright (C) 2022-2026 Posit Software, PBC. All rights reserved. * */ +use std::sync::Arc; +use std::sync::Mutex; + use amalthea::language::control_handler::ControlHandler; +use amalthea::socket::iopub::IOPubMessage; +use amalthea::wire::debug_reply::DebugReply; +use amalthea::wire::debug_request::DebugRequest; use amalthea::wire::exception::Exception; use amalthea::wire::interrupt_reply::InterruptReply; use amalthea::wire::jupyter_message::Status; use amalthea::wire::shutdown_reply::ShutdownReply; use amalthea::wire::shutdown_request::ShutdownRequest; -use async_trait::async_trait; use crossbeam::channel::Sender; +use stdext::result::ResultExt; +use crate::console::SessionMode; +use crate::dap::dap_jupyter_handler::DapJupyterHandler; +use crate::dap::Dap; use crate::request::RRequest; pub struct Control { r_request_tx: Sender, + dap: Arc>, + dap_handler: Option, } impl Control { - pub fn new(sender: Sender) -> Self { + pub fn new( + r_request_tx: Sender, + dap: Arc>, + iopub_tx: Sender, + session_mode: SessionMode, + ) -> Self { + let dap_handler = if matches!(session_mode, SessionMode::Notebook) { + dap.lock().unwrap().set_iopub_tx(iopub_tx.clone()); + Some(DapJupyterHandler::new( + dap.clone(), + r_request_tx.clone(), + iopub_tx, + )) + } else { + None + }; + Self { - r_request_tx: sender, + r_request_tx, + dap, + dap_handler, } } } -#[async_trait] impl ControlHandler for Control { - async fn handle_shutdown_request( - &self, - msg: &ShutdownRequest, - ) -> Result { + fn handle_shutdown_request(&self, msg: &ShutdownRequest) -> Result { log::info!("Received shutdown request: {msg:?}"); // Interrupt any ongoing computation. We shut down from ReadConsole when @@ -60,9 +85,38 @@ impl ControlHandler for Control { }) } - async fn handle_interrupt_request(&self) -> Result { + fn handle_interrupt_request(&self) -> Result { log::info!("Received interrupt request"); + + // When an interrupt is sent while debugging in notebook mode, we quit + // the debugger. The difference is justified by how the Console stays + // busy while debugging, showing a spinning wheel to the user. Quitting + // debugging on interrupt is natural UX in that context. + if self.dap_handler.is_some() { + let dap = self.dap.lock().unwrap(); + if dap.is_debugging || dap.is_debugging_stdin { + drop(dap); + self.r_request_tx + .send(RRequest::DebugCommand(crate::request::DebugRequest::Quit)) + .log_err(); + } + } crate::sys::control::handle_interrupt_request(); + Ok(InterruptReply { status: Status::Ok }) } + + fn handle_debug_request(&self, msg: &DebugRequest) -> Result { + let Some(handler) = &self.dap_handler else { + let response = serde_json::json!({ + "seq": 0, + "type": "response", + "success": false, + "message": "Debug requests are not supported in console mode", + }); + return Ok(DebugReply { content: response }); + }; + let response = handler.handle(&msg.content); + Ok(DebugReply { content: response }) + } } diff --git a/crates/ark/src/dap.rs b/crates/ark/src/dap.rs index 5cb0af56d..295880abc 100644 --- a/crates/ark/src/dap.rs +++ b/crates/ark/src/dap.rs @@ -5,6 +5,8 @@ // // +pub mod dap_jupyter_handler; +pub mod dap_notebook; pub mod dap_server; pub mod dap_state; pub mod dap_variables; diff --git a/crates/ark/src/dap/dap_jupyter_handler.rs b/crates/ark/src/dap/dap_jupyter_handler.rs new file mode 100644 index 000000000..1f8cec222 --- /dev/null +++ b/crates/ark/src/dap/dap_jupyter_handler.rs @@ -0,0 +1,306 @@ +// +// dap_jupyter_handler.rs +// +// Copyright (C) 2026 Posit Software, PBC. All rights reserved. +// +// + +// Handles DAP requests arriving via Jupyter `debug_request` messages on the +// control channel. Delegates standard DAP commands to the shared `DapHandler` +// and handles Jupyter Debug Protocol extensions (`dumpCell`, `debugInfo`, +// `configurationDone`) directly. +// +// Events are forwarded to the frontend as `debug_event` IOPub messages rather +// than over the TCP stream. +// +// https://jupyter-client.readthedocs.io/en/latest/messaging.html#additions-to-the-dap + +use std::cell::Cell; +use std::sync::Arc; +use std::sync::Mutex; + +use amalthea::socket::iopub::IOPubMessage; +use amalthea::wire::debug_event::DebugEvent; +use crossbeam::channel::Sender; +use dap::base_message::BaseMessage; +use dap::base_message::Sendable; +use dap::requests::Request; +use stdext::result::ResultExt; + +use crate::dap::dap_notebook; +use crate::dap::dap_server::DapConsoleEvent; +use crate::dap::dap_server::DapHandler; +use crate::dap::dap_state::Breakpoint; +use crate::dap::dap_state::BreakpointState; +use crate::dap::dap_state::Dap; +use crate::dap::dap_state::THREAD_ID; +use crate::request::RRequest; + +pub struct DapJupyterHandler { + handler: DapHandler, + iopub_tx: Sender, + seq: Cell, + tmp_file_prefix: &'static str, +} + +impl DapJupyterHandler { + pub fn new( + state: Arc>, + r_request_tx: Sender, + iopub_tx: Sender, + ) -> Self { + let handler = DapHandler::new(state, r_request_tx); + let tmp_file_prefix = dap_notebook::tmp_file_prefix(); + + Self { + handler, + iopub_tx, + seq: Cell::new(1), + tmp_file_prefix, + } + } + + fn next_seq(&self) -> i64 { + let seq = self.seq.get(); + self.seq.set(seq + 1); + seq + } + + /// Process a DAP request from a Jupyter `debug_request` message. + /// Returns the DAP response to be sent back as a `debug_reply`. + pub fn handle(&self, request: &serde_json::Value) -> serde_json::Value { + let seq = request.get("seq").and_then(|v| v.as_i64()).unwrap_or(0); + let command = request + .get("command") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + log::trace!("Jupyter DAP: Handling `{command}` (seq={seq})"); + + // Handle Jupyter Debug Protocol extensions and commands not in the + // `dap` crate's `Command` enum. + let result = match command { + "dumpCell" => Some(self.handle_dump_cell(seq, request)), + "debugInfo" => Some(self.handle_debug_info(seq)), + "configurationDone" => Some(Ok(self.success_response( + seq, + "configurationDone", + serde_json::json!({}), + ))), + _ => None, + }; + + match result { + Some(Ok(response)) => return response, + Some(Err(err)) => return self.error_response(seq, command, &format!("{err}")), + None => {}, + } + + // Parse as a standard DAP request and delegate to the shared handler + match serde_json::from_value::(request.clone()) { + Ok(dap_request) => { + let output = self.handler.dispatch(dap_request); + + for event in output.dap_events { + self.send_dap_event(event); + } + + // Deliver console events directly to R (no detour through the + // frontend in notebook mode since there is no console prompt to + // sync) + for effect in output.console_events { + self.handle_console_event(effect); + } + + self.response_to_json(output.response) + }, + Err(err) => { + log::warn!("Jupyter DAP: Failed to parse `{command}`: {err:?}"); + self.error_response(seq, command, &format!("Failed to parse request: {err}")) + }, + } + } + + fn handle_console_event(&self, event: DapConsoleEvent) { + match event { + DapConsoleEvent::DebugCommand(cmd) => { + self.handler + .r_request_tx + .send(RRequest::DebugCommand(cmd)) + .log_err(); + }, + DapConsoleEvent::Interrupt => { + crate::sys::control::handle_interrupt_request(); + }, + DapConsoleEvent::Restart => { + log::warn!("Jupyter DAP: Restart requested but not supported"); + }, + } + } +} + +// Jupyter Debug Protocol extensions +impl DapJupyterHandler { + /// Receive cell source code and write it to a temporary file so the + /// debugger can set breakpoints in it. + /// + /// https://jupyter-client.readthedocs.io/en/latest/messaging.html#dumpcell + fn handle_dump_cell( + &self, + seq: i64, + request: &serde_json::Value, + ) -> anyhow::Result { + let code = request + .get("arguments") + .and_then(|a| a.get("code")) + .and_then(|c| c.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing `code` in dumpCell arguments"))?; + + let source_path = dap_notebook::notebook_source_path(code); + + std::fs::create_dir_all(self.tmp_file_prefix)?; + std::fs::write(&source_path, code)?; + + log::trace!("Jupyter DAP: Dumped cell to {source_path}"); + + Ok(self.success_response( + seq, + "dumpCell", + serde_json::json!({ "sourcePath": source_path }), + )) + } + + /// Return debug state so the frontend can restore breakpoints and configure + /// source mapping after (re)connecting to the kernel. + /// + /// https://jupyter-client.readthedocs.io/en/latest/messaging.html#debuginfo + fn handle_debug_info(&self, seq: i64) -> anyhow::Result { + let state = self.handler.state.lock().unwrap(); + + let stopped_threads: Vec = if state.is_debugging { + vec![THREAD_ID] + } else { + vec![] + }; + + let breakpoints: Vec = state + .breakpoints + .iter() + .map(|(uri, (_, bps))| { + let source = uri + .as_url() + .to_file_path() + .map_or_else(|_| uri.to_string(), |p| p.to_string_lossy().into_owned()); + let source_breakpoints: Vec = bps + .iter() + .filter(|bp| !matches!(bp.state, BreakpointState::Disabled)) + .map(|bp| { + let mut obj = serde_json::json!({ + "line": Breakpoint::to_dap_line(bp.original_line), + }); + if let Some(cond) = &bp.condition { + obj["condition"] = serde_json::json!(cond); + } + if let Some(msg) = &bp.log_message { + obj["logMessage"] = serde_json::json!(msg); + } + if let Some(hit) = &bp.hit_condition { + obj["hitCondition"] = serde_json::json!(hit); + } + obj + }) + .collect(); + serde_json::json!({ + "source": source, + "breakpoints": source_breakpoints, + }) + }) + .collect(); + + Ok(self.success_response( + seq, + "debugInfo", + serde_json::json!({ + "isStarted": true, + "hashMethod": "Murmur2", + "hashSeed": dap_notebook::hash_seed(), + "tmpFilePrefix": self.tmp_file_prefix, + "tmpFileSuffix": dap_notebook::tmp_file_suffix(), + "breakpoints": breakpoints, + "stoppedThreads": stopped_threads, + "richRendering": false, + "exceptionPaths": [], + }), + )) + } +} + +// Serialization helpers +impl DapJupyterHandler { + fn send_dap_event(&self, event: dap::events::Event) { + let msg = BaseMessage { + seq: self.next_seq(), + message: Sendable::Event(event), + }; + + let json = match serde_json::to_value(&msg) { + Ok(json) => json, + Err(err) => { + log::error!("Jupyter DAP: Failed to serialize event: {err:?}"); + return; + }, + }; + + self.iopub_tx + .send(IOPubMessage::DebugEvent(DebugEvent { content: json })) + .log_err(); + } + + fn response_to_json(&self, response: dap::responses::Response) -> serde_json::Value { + let msg = BaseMessage { + seq: self.next_seq(), + message: Sendable::Response(response), + }; + + match serde_json::to_value(&msg) { + Ok(json) => json, + Err(err) => { + log::error!("Jupyter DAP: Failed to serialize response: {err:?}"); + serde_json::json!({ + "seq": self.next_seq(), + "type": "response", + "success": false, + "message": format!("Internal serialization error: {err}"), + }) + }, + } + } + + fn success_response( + &self, + request_seq: i64, + command: &str, + body: serde_json::Value, + ) -> serde_json::Value { + serde_json::json!({ + "seq": self.next_seq(), + "type": "response", + "request_seq": request_seq, + "success": true, + "command": command, + "body": body, + }) + } + + fn error_response(&self, request_seq: i64, command: &str, message: &str) -> serde_json::Value { + log::warn!("Jupyter DAP: Error for `{command}`: {message}"); + serde_json::json!({ + "seq": self.next_seq(), + "type": "response", + "request_seq": request_seq, + "success": false, + "command": command, + "message": message, + }) + } +} diff --git a/crates/ark/src/dap/dap_notebook.rs b/crates/ark/src/dap/dap_notebook.rs new file mode 100644 index 000000000..9e5cd8833 --- /dev/null +++ b/crates/ark/src/dap/dap_notebook.rs @@ -0,0 +1,187 @@ +// +// dap_notebook.rs +// +// Copyright (C) 2026 Posit Software, PBC. All rights reserved. +// +// + +// Shared helpers for mapping notebook cell code to temporary source files. +// +// Both `DapJupyterHandler` (which handles `dumpCell` / `debugInfo`) and the +// console REPL (which needs to look up breakpoints for executed cell code) +// must agree on how cell source code maps to file paths. This module +// centralises that logic. + +use std::sync::LazyLock; + +use amalthea::wire::execute_request::CodeLocation; +use amalthea::wire::execute_request::Position; +use url::Url; + +const HASH_SEED: u32 = 0; +const TMP_FILE_SUFFIX: &str = ".r"; + +/// The temporary file prefix, cached at first access to avoid `TMPDIR` +/// instability. On macOS, R may unset `TMPDIR` during startup, causing +/// `std::env::temp_dir()` to return `/tmp` instead of the per-session +/// `/var/folders/.../T/` directory. +static TMP_FILE_PREFIX: LazyLock = LazyLock::new(|| { + let mut tmp_dir = std::env::temp_dir(); + let pid = std::process::id(); + tmp_dir.push(format!("ark-debug-{pid}")); + // Trailing separator so the prefix can be concatenated directly with + // the hash and suffix (e.g. `{prefix}{hash}.r`). + format!("{}/", tmp_dir.display()) +}); + +/// The temporary file prefix used for notebook debug source files. +/// +/// Deterministic for a given process: `{tmp_dir}/ark-debug-{pid}/`. +/// Reported to the frontend via `debugInfo` so that the `PathEncoder` +/// on the client side can independently produce the same paths. +pub fn tmp_file_prefix() -> &'static str { + &TMP_FILE_PREFIX +} + +pub fn hash_seed() -> u32 { + HASH_SEED +} + +pub fn tmp_file_suffix() -> &'static str { + TMP_FILE_SUFFIX +} + +/// Compute the temporary source file path for a piece of cell code. +/// +/// This produces the same path that `dumpCell` writes to, allowing the +/// console REPL to look up breakpoints for notebook cells without a +/// `code_location` in the `execute_request`. +pub fn notebook_source_path(code: &str) -> String { + let prefix = tmp_file_prefix(); + let hash = murmur2(code.as_bytes(), HASH_SEED); + format!("{}{hash}{TMP_FILE_SUFFIX}", prefix) +} + +/// Synthesize a [`CodeLocation`] pointing to the notebook temp file for a +/// cell chunk. +/// +/// The range spans the entire code starting at (0, 0). This gives +/// `annotate_input()` the file URI it needs for the `#line` directive and +/// breakpoint injection. +pub fn notebook_code_location(code: &str) -> Option { + let path = notebook_source_path(code); + let uri = Url::from_file_path(&path).ok()?; + + let lines: Vec<&str> = code.split('\n').collect(); + let last_line = lines.last().unwrap_or(&""); + let end_line = if lines.is_empty() { 0 } else { lines.len() - 1 }; + + Some(CodeLocation { + uri, + start: Position { + line: 0, + character: 0, + }, + end: Position { + line: end_line as u32, + character: last_line.len() as u32, + }, + }) +} + +/// MurmurHash2 implementation for computing deterministic temp file paths +/// from cell source code. +/// +/// This must match the client-side `PathEncoder` in +/// `positron-runtime-debugger` so that the notebook location mapper can +/// correlate cell URIs with runtime source paths. +pub fn murmur2(data: &[u8], seed: u32) -> u32 { + const M: u32 = 0x5bd1e995; + const R: u32 = 24; + + let len = data.len(); + let mut h: u32 = seed ^ (len as u32); + + let mut i = 0; + while i + 4 <= len { + let mut k = u32::from_le_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]); + k = k.wrapping_mul(M); + k ^= k >> R; + k = k.wrapping_mul(M); + + h = h.wrapping_mul(M); + h ^= k; + + i += 4; + } + + let remaining = len - i; + if remaining >= 3 { + h ^= (data[i + 2] as u32) << 16; + } + if remaining >= 2 { + h ^= (data[i + 1] as u32) << 8; + } + if remaining >= 1 { + h ^= data[i] as u32; + h = h.wrapping_mul(M); + } + + h ^= h >> 13; + h = h.wrapping_mul(M); + h ^= h >> 15; + + h +} + +#[cfg(test)] +mod tests { + use super::murmur2; + use super::notebook_source_path; + use super::tmp_file_prefix; + use super::HASH_SEED; + use super::TMP_FILE_SUFFIX; + + #[test] + fn test_murmur2_empty() { + assert_eq!(murmur2(b"", 0), 0); + } + + #[test] + fn test_murmur2_deterministic() { + let hash1 = murmur2(b"x <- 1 + 1\nprint(x)", 42); + let hash2 = murmur2(b"x <- 1 + 1\nprint(x)", 42); + assert_eq!(hash1, hash2); + } + + #[test] + fn test_murmur2_seed_varies() { + let hash1 = murmur2(b"test", 0); + let hash2 = murmur2(b"test", 1); + assert_ne!(hash1, hash2); + } + + #[test] + fn test_murmur2_content_varies() { + let hash1 = murmur2(b"cell_a", 0); + let hash2 = murmur2(b"cell_b", 0); + assert_ne!(hash1, hash2); + } + + #[test] + fn test_notebook_source_path_deterministic() { + let code = "x <- 1 + 1\nprint(x)"; + let path1 = notebook_source_path(code); + let path2 = notebook_source_path(code); + assert_eq!(path1, path2); + } + + #[test] + fn test_notebook_source_path_format() { + let code = "print(1)"; + let path = notebook_source_path(code); + let prefix = tmp_file_prefix(); + let hash = murmur2(code.as_bytes(), HASH_SEED); + assert_eq!(path, format!("{prefix}{hash}{TMP_FILE_SUFFIX}")); + } +} diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index d22143934..52224d80e 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -38,10 +38,10 @@ use super::dap_state::Breakpoint; use super::dap_state::BreakpointState; use super::dap_state::Dap; use super::dap_state::DapBackendEvent; +use super::dap_state::THREAD_ID; use crate::console::Console; use crate::console::FrameInfo; use crate::console::FrameSource; -use crate::dap::dap_state::DapExceptionEvent; use crate::dap::dap_variables::object_variables; use crate::dap::dap_variables::RVariable; use crate::r_task; @@ -51,8 +51,6 @@ use crate::request::DebugRequest; use crate::request::RRequest; use crate::url::UrlId; -const THREAD_ID: i64 = -1; - /// Sentinel expression sent by the frontend to notify the kernel that the user /// selected a different stack frame in the debugger UI. Subsequent console /// evaluations will run in that frame's environment. @@ -155,6 +153,11 @@ impl DapHandler { self.handle_step(args, DebugRequest::StepOut, ResponseBody::StepOut) }, Command::Pause(args) => self.handle_pause(args), + Command::ConfigurationDone => Ok(DapHandlerOutput { + body: ResponseBody::ConfigurationDone, + dap_events: vec![], + console_events: vec![], + }), _ => { log::warn!("DAP: Unknown request: {cmd:?}"); return DapOutput::error(req, "Ark DAP: Unknown request"); @@ -201,6 +204,7 @@ impl DapHandler { supports_conditional_breakpoints: Some(true), supports_hit_conditional_breakpoints: Some(true), supports_log_points: Some(true), + supports_configuration_done_request: Some(true), ..Default::default() }); Ok(DapHandlerOutput { @@ -436,6 +440,8 @@ impl DapHandler { } fn handle_attach(&self, _args: AttachRequestArguments) -> anyhow::Result { + self.state.lock().unwrap().is_connected = true; + Ok(DapHandlerOutput { body: ResponseBody::Attach, dap_events: vec![Event::Thread(ThreadEventBody { @@ -447,8 +453,11 @@ impl DapHandler { } fn handle_disconnect(&self, _args: DisconnectArguments) -> anyhow::Result { - // Only send `Q` if currently in a debugging session. - let is_debugging = { self.state.lock().unwrap().is_debugging }; + let mut state = self.state.lock().unwrap(); + let is_debugging = state.is_debugging; + state.is_connected = false; + drop(state); + let console_events = if is_debugging { vec![DapConsoleEvent::DebugCommand(DebugRequest::Quit)] } else { @@ -781,64 +790,7 @@ fn listen_dap_events( log::trace!("DAP: Got event from backend: {:?}", event); - let event = match event { - DapBackendEvent::Continued => { - Event::Continued(ContinuedEventBody { - thread_id: THREAD_ID, - all_threads_continued: Some(true) - }) - }, - - DapBackendEvent::Stopped => { - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Step, - description: None, - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: None, - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { - let text = format!("<{class}>\n{message}"); - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Exception, - description: Some(message), - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: Some(text), - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Invalidated => { - Event::Invalidated(InvalidatedEventBody { - areas: Some(vec![types::InvalidatedAreas::Variables]), - thread_id: Some(THREAD_ID), - stack_frame_id: None, - }) - }, - - DapBackendEvent::Terminated => { - Event::Terminated(None) - }, - - DapBackendEvent::BreakpointState { id, line, verified, message } => { - Event::Breakpoint(BreakpointEventBody { - reason: BreakpointEventReason::Changed, - breakpoint: dap::types::Breakpoint { - id: Some(id), - line: Some(Breakpoint::to_dap_line(line)), - verified, - message, - ..Default::default() - }, - }) - }, - }; + let event = event.into_dap_event(); let mut output = output.lock().unwrap(); if let Err(err) = output.send_event(event) { diff --git a/crates/ark/src/dap/dap_state.rs b/crates/ark/src/dap/dap_state.rs index d2433bd82..a74f7ca65 100644 --- a/crates/ark/src/dap/dap_state.rs +++ b/crates/ark/src/dap/dap_state.rs @@ -14,9 +14,16 @@ use amalthea::comm::server_comm::ServerStartMessage; use amalthea::comm::server_comm::ServerStartedMessage; use amalthea::language::server_handler::ServerHandler; use amalthea::socket::comm::CommOutgoingTx; +use amalthea::socket::iopub::IOPubMessage; +use amalthea::wire::debug_event::DebugEvent; use anyhow::anyhow; use crossbeam::channel::Sender; +use dap::base_message::BaseMessage; +use dap::base_message::Sendable; +use dap::events::Event; +use dap::events::*; use dap::responses::EvaluateResponse; +use dap::types; use dap::types::Variable; use harp::environment::R_ENVS; use harp::object::RObject; @@ -33,6 +40,9 @@ use crate::request::RRequest; use crate::thread::RThreadSafe; use crate::url::UrlId; +/// Thread ID used in DAP events. R is single-threaded so there's only one. +pub(crate) const THREAD_ID: i64 = -1; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BreakpointState { Unverified, @@ -152,10 +162,75 @@ pub struct DapExceptionEvent { pub message: String, } +impl DapBackendEvent { + pub(crate) fn into_dap_event(self) -> Event { + match self { + DapBackendEvent::Continued => Event::Continued(ContinuedEventBody { + thread_id: THREAD_ID, + all_threads_continued: Some(true), + }), + + DapBackendEvent::Stopped => Event::Stopped(StoppedEventBody { + reason: types::StoppedEventReason::Step, + description: None, + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: None, + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }), + + DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { + let text = format!("<{class}>\n{message}"); + Event::Stopped(StoppedEventBody { + reason: types::StoppedEventReason::Exception, + description: Some(message), + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: Some(text), + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }) + }, + + DapBackendEvent::Invalidated => Event::Invalidated(InvalidatedEventBody { + areas: Some(vec![types::InvalidatedAreas::Variables]), + thread_id: Some(THREAD_ID), + stack_frame_id: None, + }), + + DapBackendEvent::Terminated => Event::Terminated(None), + + DapBackendEvent::BreakpointState { + id, + line, + verified, + message, + } => Event::Breakpoint(BreakpointEventBody { + reason: types::BreakpointEventReason::Changed, + breakpoint: dap::types::Breakpoint { + id: Some(id), + line: Some(Breakpoint::to_dap_line(line)), + verified, + message, + ..Default::default() + }, + }), + } + } +} + pub struct Dap { /// Whether the REPL is stopped with a browser prompt. pub is_debugging: bool, + /// Whether R is stopped at an unexpected `browser()` prompt in notebook + /// mode (no active debug session). Mutually exclusive with `is_debugging`. + /// Used by the interrupt handler to decide whether to send a "Q" command. + /// `is_debugging` and `is_debugging_stdin` could be folded into a single + /// enum in the future. + pub is_debugging_stdin: bool, + /// Whether the DAP server is connected to a client. pub is_connected: bool, @@ -208,6 +283,13 @@ pub struct Dap { /// Channel for sending events to the comm frontend. comm_tx: Option, + /// IOPub sender for emitting DAP events as `debug_event` messages + /// in notebook debugging mode (Jupyter Debug Protocol path). + notebook_iopub_tx: Option>, + + /// Sequence counter for IOPub DAP event messages. + iopub_seq: i64, + /// Channel for sending debug commands to `read_console()` r_request_tx: Sender, @@ -220,6 +302,7 @@ impl Dap { pub fn new_shared(r_request_tx: Sender) -> Arc> { let state = Self { is_debugging: false, + is_debugging_stdin: false, is_connected: false, backend_events_tx: None, stack: None, @@ -231,6 +314,8 @@ impl Dap { current_variables_reference: 1, current_breakpoint_id: 1, comm_tx: None, + notebook_iopub_tx: None, + iopub_seq: 0, r_request_tx, shared_self: None, is_interrupting_for_debugger: false, @@ -266,24 +351,20 @@ impl Dap { log::trace!("DAP: Sending `start_debug` events"); + // Console debugging: ask frontend to show debug toolbar if let Some(comm_tx) = &self.comm_tx { - // Ask frontend to connect to the DAP comm_tx .send(amalthea::comm_rpc_message!("start_debug")) .log_err(); - - if let Some(dap_tx) = &self.backend_events_tx { - let event = match stopped_reason { - DebugStoppedReason::Step | DebugStoppedReason::Pause => { - DapBackendEvent::Stopped - }, - DebugStoppedReason::Condition { class, message } => { - DapBackendEvent::Exception(DapExceptionEvent { class, message }) - }, - }; - dap_tx.send(event).log_err(); - } } + + let event = match stopped_reason { + DebugStoppedReason::Step | DebugStoppedReason::Pause => DapBackendEvent::Stopped, + DebugStoppedReason::Condition { class, message } => { + DapBackendEvent::Exception(DapExceptionEvent { class, message }) + }, + }; + self.send_backend_event(event); } /// Notify the frontend that we've exited the debugger. @@ -307,27 +388,50 @@ impl Dap { let was_debugging = self.is_debugging; self.is_debugging = false; - if was_debugging && self.is_connected { + if was_debugging && (self.is_connected || self.notebook_iopub_tx.is_some()) { log::trace!("DAP: Sending `stop_debug` events"); if let Some(comm_tx) = &self.comm_tx { comm_tx .send(amalthea::comm_rpc_message!("stop_debug")) .log_err(); - - if let Some(datp_tx) = &self.backend_events_tx { - datp_tx.send(DapBackendEvent::Continued).log_err(); - } } - // else: If not connected to a frontend, the DAP client should - // have received a `Continued` event already, after a `n` - // command or similar. + + self.send_backend_event(DapBackendEvent::Continued); } } - pub fn send_invalidated(&self) { + pub fn send_invalidated(&mut self) { + self.send_backend_event(DapBackendEvent::Invalidated); + } + + pub fn set_iopub_tx(&mut self, tx: Sender) { + self.notebook_iopub_tx = Some(tx); + } + + fn send_backend_event(&mut self, event: DapBackendEvent) { if let Some(tx) = &self.backend_events_tx { - tx.send(DapBackendEvent::Invalidated).log_err(); + tx.send(event.clone()).log_err(); + } + + if let Some(tx) = &self.notebook_iopub_tx { + let dap_event = event.into_dap_event(); + self.iopub_seq += 1; + + let msg = BaseMessage { + seq: self.iopub_seq, + message: Sendable::Event(dap_event), + }; + + match serde_json::to_value(&msg) { + Ok(json) => { + tx.send(IOPubMessage::DebugEvent(DebugEvent { content: json })) + .log_err(); + }, + Err(err) => { + log::error!("DAP: Failed to serialize IOPub event: {err:?}"); + }, + } } } @@ -507,6 +611,10 @@ impl Dap { return; }; + // Collect events first: `bp_list` borrows from `self.breakpoints`, + // which prevents calling `&mut self` methods like `send_backend_event()`. + let mut events = Vec::new(); + for bp in bp_list.iter_mut() { // Verified and Disabled breakpoints are both already verified. // Invalid breakpoints never get verified so we skip them too. @@ -525,17 +633,18 @@ impl Dap { if line >= start_line && line < end_line { bp.state = BreakpointState::Verified; - if let Some(tx) = &self.backend_events_tx { - tx.send(DapBackendEvent::BreakpointState { - id: bp.id, - line: bp.line, - verified: true, - message: None, - }) - .log_err(); - } + events.push(DapBackendEvent::BreakpointState { + id: bp.id, + line: bp.line, + verified: true, + message: None, + }); } } + + for event in events { + self.send_backend_event(event); + } } /// Verify a single breakpoint by ID @@ -543,29 +652,29 @@ impl Dap { /// Finds the breakpoint with the given ID for the URI and marks it as verified /// if it was previously unverified. Sends a `BreakpointVerified` event. pub fn verify_breakpoint(&mut self, uri: &UrlId, id: &str) { - let Some((_, bp_list)) = self.breakpoints.get_mut(uri) else { - return; - }; - let Some(bp) = bp_list.iter_mut().find(|bp| bp.id.to_string() == id) else { - return; - }; + let event = { + let Some((_, bp_list)) = self.breakpoints.get_mut(uri) else { + return; + }; + let Some(bp) = bp_list.iter_mut().find(|bp| bp.id.to_string() == id) else { + return; + }; - // Only verify unverified breakpoints - if !matches!(bp.state, BreakpointState::Unverified) { - return; - } + if !matches!(bp.state, BreakpointState::Unverified) { + return; + } - bp.state = BreakpointState::Verified; + bp.state = BreakpointState::Verified; - if let Some(tx) = &self.backend_events_tx { - tx.send(DapBackendEvent::BreakpointState { + DapBackendEvent::BreakpointState { id: bp.id, line: bp.line, verified: true, message: None, - }) - .log_err(); - } + } + }; + + self.send_backend_event(event); } /// Called when a document changes. Removes all breakpoints for the URI @@ -578,42 +687,40 @@ impl Dap { }; log::trace!("DAP: Removing {} breakpoints for {uri}", breakpoints.len()); - let Some(tx) = &self.backend_events_tx else { - return; - }; for bp in breakpoints { - tx.send(DapBackendEvent::BreakpointState { + self.send_backend_event(DapBackendEvent::BreakpointState { id: bp.id, line: bp.line, verified: false, message: None, - }) - .log_err(); + }); } } /// Notify the frontend about breakpoints that were marked invalid during annotation. /// Sends a `BreakpointState` event with verified=false and a message for each. - pub fn notify_invalid_breakpoints(&self, uri: &UrlId) { - let Some(tx) = &self.backend_events_tx else { - return; - }; - let Some((_, breakpoints)) = self.breakpoints.get(uri) else { - return; - }; - - for bp in breakpoints { - let BreakpointState::Invalid(reason) = &bp.state else { - continue; - }; - tx.send(DapBackendEvent::BreakpointState { - id: bp.id, - line: bp.line, - verified: false, - message: Some(reason.message().to_string()), + pub fn notify_invalid_breakpoints(&mut self, uri: &UrlId) { + let events: Vec<_> = self + .breakpoints + .get(uri) + .into_iter() + .flat_map(|(_, breakpoints)| breakpoints) + .filter_map(|bp| { + let BreakpointState::Invalid(reason) = &bp.state else { + return None; + }; + Some(DapBackendEvent::BreakpointState { + id: bp.id, + line: bp.line, + verified: false, + message: Some(reason.message().to_string()), + }) }) - .log_err(); + .collect(); + + for event in events { + self.send_backend_event(event); } } @@ -766,6 +873,9 @@ mod tests { current_breakpoint_id: 1, is_interrupting_for_debugger: false, comm_tx: None, + notebook_iopub_tx: None, + iopub_seq: 0, + is_debugging_stdin: false, r_request_tx, shared_self: None, }; @@ -877,6 +987,9 @@ mod tests { current_breakpoint_id: 1, is_interrupting_for_debugger: false, comm_tx: None, + notebook_iopub_tx: None, + iopub_seq: 0, + is_debugging_stdin: false, r_request_tx, shared_self: None, }; diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 157f56f96..fe8dca871 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -45,6 +45,7 @@ use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::ark_comm::ArkComm; use crate::console::Console; use crate::console::KernelInfo; +use crate::console::SessionMode; use crate::data_explorer::r_data_explorer::DATA_EXPLORER_COMM_NAME; use crate::help::r_help::RHelp; use crate::help_proxy; @@ -144,14 +145,20 @@ impl ShellHandler for Shell { continuation_prompt: kernel_info.continuation_prompt.clone(), }), }; + let mut supported_features = vec![String::from("debugger")]; + if matches!(kernel_info.session_mode, SessionMode::Notebook) { + supported_features.push(String::from("proactive breakpoints")); + } + Ok(KernelInfoReply { status: Status::Ok, banner: kernel_info.banner.clone(), - debugger: false, + debugger: true, help_links: Vec::new(), language_info: info, implementation: String::from("ark"), implementation_version: String::from(env!("CARGO_PKG_VERSION")), + supported_features, }) } diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index f0e3dfab8..17dd90e17 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -95,7 +95,12 @@ pub fn start_kernel( // Create the control handler; this is used to handle shutdown/interrupt and // related requests - let control = Arc::new(Mutex::new(Control::new(r_request_tx.clone()))); + let control = Box::new(Control::new( + r_request_tx.clone(), + dap.clone(), + iopub_tx.clone(), + session_mode, + )); // Create the stream behavior; this determines whether the kernel should // capture stdout/stderr and send them to the frontend as IOPub messages diff --git a/crates/ark/tests/dap_notebook.rs b/crates/ark/tests/dap_notebook.rs new file mode 100644 index 000000000..dd665afc3 --- /dev/null +++ b/crates/ark/tests/dap_notebook.rs @@ -0,0 +1,853 @@ +// +// dap_notebook.rs +// +// Copyright (C) 2026 Posit Software, PBC. All rights reserved. +// +// + +use amalthea::fixtures::dummy_frontend::ExecuteRequestOptions; +use amalthea::wire::jupyter_message::Message; +use ark_test::DummyArkFrontendNotebook; +use ark_test::IopubExpectation; + +fn find_debug_event<'a>(msgs: &'a [Message], event: &str) -> &'a serde_json::Value { + msgs.iter() + .find_map(|m| match m { + Message::DebugEvent(data) if data.content.content["event"] == event => { + Some(&data.content.content) + }, + _ => None, + }) + .unwrap_or_else(|| panic!("No DebugEvent with event={event:?} found")) +} + +#[test] +fn test_notebook_debug_info() { + let frontend = DummyArkFrontendNotebook::lock(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "debugInfo", + "arguments": {} + })); + frontend.recv_iopub_busy(); + let reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(reply["success"], true); + assert_eq!(reply["body"]["isStarted"], true); + assert_eq!(reply["body"]["hashMethod"], "Murmur2"); + assert_eq!(reply["body"]["hashSeed"], 0); + let prefix = reply["body"]["tmpFilePrefix"].as_str().unwrap(); + assert!(prefix.contains("ark-debug-")); + assert_eq!(reply["body"]["tmpFileSuffix"], ".r"); +} + +#[test] +fn test_notebook_dump_cell() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "x <- 1\nprint(x)"; + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(reply["success"], true); + let source_path = reply["body"]["sourcePath"].as_str().unwrap(); + assert!(source_path.contains("ark-debug-")); + assert!(source_path.ends_with(".r")); + + // File should actually exist on disk with the cell contents + assert!(std::path::Path::new(source_path).exists()); + assert_eq!(std::fs::read_to_string(source_path).unwrap(), code); +} + +#[test] +fn test_notebook_dump_cell_deterministic() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "x <- 42\ny <- x + 1"; + + // Dump the same cell code twice + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let reply1 = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let reply2 = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Same code should produce the same source path (Murmur2 hash) + assert_eq!( + reply1["body"]["sourcePath"].as_str().unwrap(), + reply2["body"]["sourcePath"].as_str().unwrap() + ); +} + +#[test] +fn test_notebook_dump_cell_different_code() { + let frontend = DummyArkFrontendNotebook::lock(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": "cell_a" } + })); + frontend.recv_iopub_busy(); + let reply1 = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "dumpCell", + "arguments": { "code": "cell_b" } + })); + frontend.recv_iopub_busy(); + let reply2 = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Different code should produce different paths + assert_ne!( + reply1["body"]["sourcePath"].as_str().unwrap(), + reply2["body"]["sourcePath"].as_str().unwrap() + ); +} + +#[test] +fn test_notebook_configuration_done() { + let frontend = DummyArkFrontendNotebook::lock(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "configurationDone", + "arguments": {} + })); + frontend.recv_iopub_busy(); + let reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(reply["success"], true); + assert_eq!(reply["command"], "configurationDone"); +} + +#[test] +fn test_notebook_dump_cell_then_set_breakpoints() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "x <- 1\ny <- 2\nz <- x + y"; + + // Dump the cell to a temp file + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + let source_path = dump_reply["body"]["sourcePath"].as_str().unwrap(); + + // Set breakpoints on the dumped file + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [{ "line": 2 }] + } + })); + frontend.recv_iopub_busy(); + let bp_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(bp_reply["success"], true); + let breakpoints = bp_reply["body"]["breakpoints"].as_array().unwrap(); + assert_eq!(breakpoints.len(), 1); + assert_eq!(breakpoints[0]["line"], 2); +} + +#[test] +fn test_notebook_set_multiple_breakpoints() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "a <- 1\nb <- 2\nc <- 3\nd <- 4"; + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + let source_path = dump_reply["body"]["sourcePath"].as_str().unwrap(); + + // Set breakpoints on lines 2 and 4 + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [{ "line": 2 }, { "line": 4 }] + } + })); + frontend.recv_iopub_busy(); + let bp_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(bp_reply["success"], true); + let breakpoints = bp_reply["body"]["breakpoints"].as_array().unwrap(); + assert_eq!(breakpoints.len(), 2); + assert_eq!(breakpoints[0]["line"], 2); + assert_eq!(breakpoints[1]["line"], 4); +} + +#[test] +fn test_notebook_clear_breakpoints() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "x <- 1\ny <- 2"; + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + let source_path = dump_reply["body"]["sourcePath"].as_str().unwrap(); + + // Set a breakpoint + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [{ "line": 2 }] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Clear breakpoints by sending an empty list + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 3, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [] + } + })); + frontend.recv_iopub_busy(); + let bp_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(bp_reply["success"], true); + let breakpoints = bp_reply["body"]["breakpoints"].as_array().unwrap(); + assert!(breakpoints.is_empty()); +} + +#[test] +fn test_notebook_execute_with_cell_id() { + let frontend = DummyArkFrontendNotebook::lock(); + + // Execute a cell with `cellId` in metadata (regression: shouldn't crash) + frontend.send_execute_request_with_metadata( + "42", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "test-cell-1" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 42"); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +#[test] +fn test_notebook_execute_multiline_with_cell_id() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "x <- 10\ny <- 20\nx + y"; + frontend.send_execute_request_with_metadata( + code, + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "test-cell-2" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 30"); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +#[test] +fn test_notebook_initialize_via_jupyter_debug() { + let frontend = DummyArkFrontendNotebook::lock(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "initialize", + "arguments": { + "clientID": "test", + "adapterID": "test", + "pathFormat": "path", + "linesStartAt1": true, + "columnsStartAt1": true, + "supportsRunInTerminalRequest": false + } + })); + frontend.recv_iopub_busy(); + + // `initialize` produces an `Initialized` event on IOPub + let event = frontend.recv_iopub_debug_event(); + assert_eq!(event["type"], "event"); + assert_eq!(event["event"], "initialized"); + + let reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(reply["success"], true); + assert_eq!(reply["command"], "initialize"); + + // Capabilities should be present + assert!(reply["body"]["supportsRestartRequest"].as_bool().unwrap()); +} + +#[test] +fn test_notebook_unknown_dap_command() { + let frontend = DummyArkFrontendNotebook::lock(); + + // Sending a command with invalid structure should get an error response + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "nonexistentCommand", + "arguments": {} + })); + frontend.recv_iopub_busy(); + let reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(reply["success"], false); +} + +#[test] +fn test_notebook_breakpoint_stops_execution() { + let frontend = DummyArkFrontendNotebook::lock(); + + let fn_code = "fn <- function() {\n x <- 1\n x <- 2\n x <- 3\n x\n}"; + + // Dump cell and set a breakpoint at line 3 (x <- 2) + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": fn_code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + let source_path = dump_reply["body"]["sourcePath"] + .as_str() + .unwrap() + .to_string(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": &source_path }, + "breakpoints": [{ "line": 3 }] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Attach sets is_connected = true so breakpoints fire + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 3, + "command": "attach", + "arguments": { "request": "attach", "type": "notebook" } + })); + frontend.recv_iopub_busy(); + // attach produces a Thread started event + let event = frontend.recv_iopub_debug_event(); + assert_eq!(event["event"], "thread"); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Define the function (breakpoints get injected into the body) + frontend.send_execute_request_with_metadata( + fn_code, + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-def" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + // Breakpoint gets verified when the function body is parsed + let bp_event = frontend.recv_iopub_debug_event(); + assert_eq!(bp_event["event"], "breakpoint"); + assert_eq!(bp_event["body"]["breakpoint"]["verified"], true); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Call the function — should hit breakpoint and kernel stays busy + frontend.send_execute_request_with_metadata( + "fn()", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-call" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + // Stopped event arrives on IOPub (kernel paused at breakpoint) + let stopped = frontend.recv_iopub_debug_event(); + assert_eq!(stopped["event"], "stopped"); + + // Shell reply hasn't arrived — kernel is still busy + assert!(!frontend.shell_socket.poll_incoming(200).unwrap()); + + // Send "continue" via the debug channel + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 4, + "command": "continue", + "arguments": { "threadId": -1 } + })); + frontend.recv_debug_reply(); + + // Shell reply arrives now — kernel unblocked after continue + frontend.recv_shell_execute_reply(); + + // IOPub messages from the control thread (busy/idle for the debug_request) + // and the R thread (debug_event Continued, execute_request idle) arrive in + // unpredictable order since they originate from different threads. + let msgs = frontend.recv_iopub_interleaved(&[ + // Control thread: debug_request busy/idle + &[IopubExpectation::BusyControl, IopubExpectation::IdleControl], + // R thread: continued event, execute result, then execution idle + &[ + IopubExpectation::DebugEvent, + IopubExpectation::ExecuteResult, + IopubExpectation::IdleShell, + ], + ]); + find_debug_event(&msgs, "continued"); + + // Disconnect to reset is_connected for other tests + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 5, + "command": "disconnect", + "arguments": { "restart": false } + })); + frontend.recv_debug_reply(); + // Only the control thread sends IOPub messages here (no R-thread side effects) + frontend.recv_iopub_busy(); + frontend.recv_iopub_idle(); +} + +#[test] +#[cfg_attr(target_os = "windows", ignore)] +fn test_notebook_interrupt_at_breakpoint_exits_debugger() { + let frontend = DummyArkFrontendNotebook::lock(); + + let fn_code = "fn3 <- function() {\n x <- 1\n x <- 2\n x <- 3\n x\n}"; + + // Dump cell and set a breakpoint at line 3 (x <- 2) + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": fn_code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + let source_path = dump_reply["body"]["sourcePath"] + .as_str() + .unwrap() + .to_string(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": &source_path }, + "breakpoints": [{ "line": 3 }] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Attach so breakpoints fire + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 3, + "command": "attach", + "arguments": { "request": "attach", "type": "notebook" } + })); + frontend.recv_iopub_busy(); + frontend.recv_iopub_debug_event(); // thread started + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Define the function + frontend.send_execute_request_with_metadata( + fn_code, + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-def-int" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let bp_event = frontend.recv_iopub_debug_event(); + assert_eq!(bp_event["event"], "breakpoint"); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Call the function — hits breakpoint, kernel stays busy + frontend.send_execute_request_with_metadata( + "fn3()", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-call-int" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + let stopped = frontend.recv_iopub_debug_event(); + assert_eq!(stopped["event"], "stopped"); + + // Shell reply hasn't arrived — kernel is paused + assert!(!frontend.shell_socket.poll_incoming(200).unwrap()); + + // Send interrupt — in notebook mode this should exit the debugger + frontend.send_interrupt_request(); + frontend.recv_control_interrupt_reply(); + + // Shell reply arrives — kernel unblocked by the Q command + frontend.recv_shell_execute_reply(); + + // IOPub messages from the control thread (interrupt busy/idle) and + // R thread (debug_event Continued, execute_request idle) race. + let msgs = frontend.recv_iopub_interleaved(&[ + // Control thread: interrupt_request busy/idle + &[IopubExpectation::BusyControl, IopubExpectation::IdleControl], + // R thread: continued event, then execution idle + &[IopubExpectation::DebugEvent, IopubExpectation::IdleShell], + ]); + find_debug_event(&msgs, "continued"); + + // Disconnect + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 4, + "command": "disconnect", + "arguments": { "restart": false } + })); + frontend.recv_debug_reply(); + // Only the control thread sends IOPub messages here (no R-thread side effects) + frontend.recv_iopub_busy(); + frontend.recv_iopub_idle(); +} + +#[test] +fn test_notebook_unexpected_browser_routes_to_stdin() { + let frontend = DummyArkFrontendNotebook::lock(); + + // Execute code that calls browser() directly — no debug session active. + // `browser(); 42` is split into two pending expressions. After quitting + // the browser, the second expression `42` is evaluated and produces a result. + frontend.send_execute_request_with_metadata( + "browser(); 42", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-browser-stdin" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + // The browser prompt is routed to stdin since no debug session is connected + let prompt = frontend.recv_stdin_input_request(); + assert!( + prompt.contains("Browse"), + "Expected Browse prompt, got: {prompt}" + ); + + // User types "Q" to quit the browser + frontend.send_stdin_input_reply(String::from("Q")); + + // The remaining expression `42` produces a result + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 42"); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +#[test] +fn test_notebook_unexpected_browser_continue_via_stdin() { + let frontend = DummyArkFrontendNotebook::lock(); + + // Define a function with browser() inside — no debug session active + frontend.send_execute_request_with_metadata( + "fn_stdin <- function() {\n x <- 1\n browser()\n x <- 42\n x\n}", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-browser-def" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Call the function — browser() fires, routed to stdin + frontend.send_execute_request_with_metadata( + "fn_stdin()", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-browser-call" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + let prompt = frontend.recv_stdin_input_request(); + assert!( + prompt.contains("Browse"), + "Expected Browse prompt, got: {prompt}" + ); + + // User types "c" to continue — function runs to completion + frontend.send_stdin_input_reply(String::from("c")); + + // Function returns 42 + assert_eq!(frontend.recv_iopub_execute_result(), "[1] 42"); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +#[test] +#[cfg_attr(target_os = "windows", ignore)] +fn test_notebook_unexpected_browser_interrupt_via_stdin() { + let frontend = DummyArkFrontendNotebook::lock(); + + // Define a function that enters browser() — no debug session active + frontend.send_execute_request_with_metadata( + "fn_stdin_int <- function() {\n browser()\n 42\n}", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-browser-int-def" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Call the function — browser() fires, routed to stdin + frontend.send_execute_request_with_metadata( + "fn_stdin_int()", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-browser-int-call" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + let prompt = frontend.recv_stdin_input_request(); + assert!( + prompt.contains("Browse"), + "Expected Browse prompt, got: {prompt}" + ); + + // Shell reply hasn't arrived — kernel is waiting for stdin input + assert!(!frontend.shell_socket.poll_incoming(200).unwrap()); + + // Send interrupt — should exit the browser via Q + frontend.send_interrupt_request(); + frontend.recv_control_interrupt_reply(); + + // IOPub messages from the control thread (interrupt busy/idle) and + // R thread (execute_request idle) race. + frontend.recv_iopub_interleaved(&[ + // Control thread: interrupt_request busy/idle + &[IopubExpectation::BusyControl, IopubExpectation::IdleControl], + // R thread: execution idle + &[IopubExpectation::IdleShell], + ]); + + // Execution completes — the interrupt exited the browser + frontend.recv_shell_execute_reply(); +} + +#[test] +fn test_notebook_breakpoints_inert_without_attach() { + let frontend = DummyArkFrontendNotebook::lock(); + + let fn_code = "fn2 <- function() {\n x <- 1\n x <- 2\n x <- 3\n invisible(x)\n}"; + + // Dump cell and set a breakpoint — but do NOT attach + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": fn_code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + let source_path = dump_reply["body"]["sourcePath"] + .as_str() + .unwrap() + .to_string(); + + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": &source_path }, + "breakpoints": [{ "line": 3 }] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Define the function (breakpoints are injected but won't fire) + frontend.send_execute_request_with_metadata( + fn_code, + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-def-inert" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + // Breakpoint gets verified when the function body is parsed + let bp_event = frontend.recv_iopub_debug_event(); + assert_eq!(bp_event["event"], "breakpoint"); + assert_eq!(bp_event["body"]["breakpoint"]["verified"], true); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Call the function — should complete normally (breakpoint is inert) + frontend.send_execute_request_with_metadata( + "fn2()", + ExecuteRequestOptions::default(), + serde_json::json!({ "cellId": "cell-call-inert" }), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + // No Stopped event — execution completes without stopping + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +#[test] +fn test_notebook_debug_info_reports_breakpoints() { + let frontend = DummyArkFrontendNotebook::lock(); + + let code = "a <- 1\nb <- 2\nc <- 3"; + + // Dump a cell + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 1, + "command": "dumpCell", + "arguments": { "code": code } + })); + frontend.recv_iopub_busy(); + let dump_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + let source_path = dump_reply["body"]["sourcePath"].as_str().unwrap(); + + // Set two breakpoints + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 2, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [ + { "line": 1 }, + { "line": 3, "condition": "c > 0" }, + ] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + // Query debugInfo and verify breakpoints are reported + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 3, + "command": "debugInfo", + "arguments": {} + })); + frontend.recv_iopub_busy(); + let info_reply = frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); + + assert_eq!(info_reply["success"], true); + let bp_groups = info_reply["body"]["breakpoints"].as_array().unwrap(); + + // Find the group for our source file + let group = bp_groups + .iter() + .find(|g| g["source"].as_str().unwrap().contains("ark-debug-")) + .expect("No breakpoint group found for dumped cell"); + + let bps = group["breakpoints"].as_array().unwrap(); + assert_eq!(bps.len(), 2); + assert_eq!(bps[0]["line"], 1); + assert_eq!(bps[1]["line"], 3); + assert_eq!(bps[1]["condition"], "c > 0"); + + // Clean up: clear breakpoints + frontend.send_debug_request(serde_json::json!({ + "type": "request", + "seq": 4, + "command": "setBreakpoints", + "arguments": { + "source": { "path": source_path }, + "breakpoints": [] + } + })); + frontend.recv_iopub_busy(); + frontend.recv_debug_reply(); + frontend.recv_iopub_idle(); +} diff --git a/crates/ark/tests/dap_step.rs b/crates/ark/tests/dap_step.rs index c471d25fe..831cf6fc4 100644 --- a/crates/ark/tests/dap_step.rs +++ b/crates/ark/tests/dap_step.rs @@ -138,6 +138,51 @@ fn test_dap_continue() { dap.recv_continued(); } +/// In console mode, interrupt at a breakpoint sends SIGINT (not Q). +/// The debugger should remain active after the interrupt. +#[test] +#[cfg_attr(target_os = "windows", ignore)] +fn test_dap_interrupt_at_breakpoint_stays_in_debugger() { + let frontend = DummyArkFrontend::lock(); + let mut dap = frontend.start_dap(); + + let file = frontend.send_source( + " +{ + browser() + Sys.sleep(10) +} +", + ); + dap.recv_stopped(); + + // Verify we're at the browser() call + let stack = dap.stack_trace(); + assert_file_frame(&stack[0], &file.filename, 3, 12); + + // Send interrupt via the Jupyter control channel. + // In console mode this sends SIGINT, not Q. + frontend.send_interrupt_request(); + frontend.recv_control_interrupt_reply(); + + // Consume the interrupt request's IOPub busy/idle + frontend.recv_iopub_busy(); + frontend.recv_iopub_idle(); + + // The debugger should still be active — we can step + frontend.debug_send_step_command("n"); + dap.recv_continued(); + dap.recv_stopped(); + + // Verify we moved to Sys.sleep(10) + let stack = dap.stack_trace(); + assert_file_frame(&stack[0], &file.filename, 4, 16); + + // Quit the debugger + frontend.debug_send_quit(); + dap.recv_continued(); +} + #[test] fn test_dap_step_out() { let frontend = DummyArkFrontend::lock(); diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 0f7d201dc..cf341ad29 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -145,6 +145,80 @@ pub struct DummyArkFrontendDefaultRepos { inner: DummyArkFrontend, } +/// Expected IOPub message type for use with `recv_iopub_interleaved`. +/// +/// Variants without a suffix match any message of that type. The `_control` +/// and `_shell` suffixed variants additionally check that the message's parent +/// header `msg_type` originated from the corresponding channel. +#[derive(Debug, Clone)] +pub enum IopubExpectation { + /// Any `Status(Busy)` message. + Busy, + /// `Status(Busy)` whose parent is a control-channel message + /// (`debug_request` or `interrupt_request`). + BusyControl, + /// Any `Status(Idle)` message. + Idle, + /// `Status(Idle)` whose parent is a control-channel message. + IdleControl, + /// `Status(Idle)` whose parent is a shell-channel message + /// (`execute_request`). + IdleShell, + /// A `DebugEvent` message. + DebugEvent, + /// An `ExecuteInput` message. + ExecuteInput, + /// An `ExecuteResult` message. + ExecuteResult, +} + +fn matches_expectation(msg: &Message, expected: &IopubExpectation) -> bool { + use amalthea::wire::status::ExecutionState; + + let parent_msg_type = msg.parent_header().map(|h| h.msg_type.as_str()); + + let is_control_parent = matches!( + parent_msg_type, + Some("debug_request") | Some("interrupt_request") | Some("shutdown_request") + ); + let is_shell_parent = matches!( + parent_msg_type, + Some("execute_request") | Some("comm_open") | Some("comm_msg") + ); + + match expected { + IopubExpectation::Busy => matches!( + msg, + Message::Status(data) if data.content.execution_state == ExecutionState::Busy + ), + IopubExpectation::BusyControl => { + matches!( + msg, + Message::Status(data) if data.content.execution_state == ExecutionState::Busy + ) && is_control_parent + }, + IopubExpectation::Idle => matches!( + msg, + Message::Status(data) if data.content.execution_state == ExecutionState::Idle + ), + IopubExpectation::IdleControl => { + matches!( + msg, + Message::Status(data) if data.content.execution_state == ExecutionState::Idle + ) && is_control_parent + }, + IopubExpectation::IdleShell => { + matches!( + msg, + Message::Status(data) if data.content.execution_state == ExecutionState::Idle + ) && is_shell_parent + }, + IopubExpectation::DebugEvent => matches!(msg, Message::DebugEvent(_)), + IopubExpectation::ExecuteInput => matches!(msg, Message::ExecuteInput(_)), + IopubExpectation::ExecuteResult => matches!(msg, Message::ExecuteResult(_)), + } +} + impl DummyArkFrontend { pub fn lock() -> Self { Self { @@ -174,7 +248,6 @@ impl DummyArkFrontend { /// Receive from IOPub with a timeout. /// Returns `None` if the timeout expires before a message arrives. - #[cfg(not(all(target_os = "windows", target_arch = "aarch64")))] pub fn recv_iopub_with_timeout(&self, timeout: Duration) -> Option { let timeout_ms = timeout.as_millis() as i64; if self.guard.iopub_socket.poll_incoming(timeout_ms).unwrap() { @@ -184,34 +257,6 @@ impl DummyArkFrontend { } } - /// Receive from IOPub with a timeout. - /// Returns `None` if the timeout expires before a message arrives. - /// - /// On Windows ARM, ZMQ poll with timeout blocks forever instead of - /// respecting the timeout. Use non-blocking poll with manual timing. - #[cfg(all(target_os = "windows", target_arch = "aarch64"))] - pub fn recv_iopub_with_timeout(&self, timeout: Duration) -> Option { - let start = std::time::Instant::now(); - - loop { - if start.elapsed() >= timeout { - return None; - } - - // Use non-blocking poll (timeout=0) to avoid ZMQ blocking forever - match self.guard.iopub_socket.poll_incoming(0) { - Ok(true) => { - return Some(Message::read_from_socket(&self.guard.iopub_socket).unwrap()); - }, - Ok(false) => { - // No message available, sleep briefly and try again - std::thread::sleep(Duration::from_millis(10)); - }, - Err(_) => return None, - } - } - } - /// Core primitive: receive the next non-stream, non-variables-comm message /// from IOPub. /// @@ -673,6 +718,17 @@ impl DummyArkFrontend { } } + /// Receive a `debug_event` from IOPub. + /// Automatically skips any Stream messages. + #[track_caller] + pub fn recv_iopub_debug_event(&self) -> serde_json::Value { + let msg = self.recv_iopub_next(); + match msg { + Message::DebugEvent(data) => data.content.content, + other => panic!("Expected DebugEvent, got {:?}", other), + } + } + /// Receive from IOPub and assert CommClose message for the given comm ID. /// Automatically skips any Stream messages. #[track_caller] @@ -1542,6 +1598,99 @@ impl DummyArkFrontend { out } + + /// Receive IOPub messages produced by multiple threads racing on the + /// IOPub channel. Each inner slice is an ordered sequence of expected + /// messages from one thread. Messages from different sequences may + /// interleave freely, but within each sequence the order is strict. + /// + /// Returns the collected messages in the order they were received. + /// + /// Streams and variables comm messages are auto-buffered (same as + /// `recv_iopub_next`). Stream buffers are cleared at the end without + /// requiring explicit assertions, since the interleaved window typically + /// spans multiple busy/idle cycles. + #[track_caller] + pub fn recv_iopub_interleaved(&self, sequences: &[&[IopubExpectation]]) -> Vec { + let mut cursors: Vec = vec![0; sequences.len()]; + let mut collected: Vec = Vec::new(); + + let total_expected: usize = sequences.iter().map(|s| s.len()).sum(); + + while collected.len() < total_expected { + let msg = match self.recv_iopub_with_timeout(RECV_TIMEOUT) { + Some(msg) => msg, + None => { + let mut status = String::new(); + for (i, seq) in sequences.iter().enumerate() { + let cursor = cursors[i]; + if cursor < seq.len() { + status.push_str(&format!( + "\n sequence {i}: waiting for {:?} ({cursor}/{})", + seq[cursor], + seq.len() + )); + } else { + status.push_str(&format!( + "\n sequence {i}: complete ({}/{})", + seq.len(), + seq.len() + )); + } + } + panic!( + "recv_iopub_interleaved: timed out after receiving {}/{total_expected} messages{status}", + collected.len() + ); + }, + }; + + if self.try_buffer_msg(&msg) { + continue; + } + + trace_iopub_msg(&msg); + + let mut matched = false; + for (i, seq) in sequences.iter().enumerate() { + let cursor = cursors[i]; + if cursor < seq.len() && matches_expectation(&msg, &seq[cursor]) { + cursors[i] += 1; + matched = true; + break; + } + } + + if !matched { + let mut status = String::new(); + for (i, seq) in sequences.iter().enumerate() { + let cursor = cursors[i]; + if cursor < seq.len() { + status.push_str(&format!( + "\n sequence {i}: expecting {:?} ({cursor}/{})", + seq[cursor], + seq.len() + )); + } else { + status.push_str(&format!( + "\n sequence {i}: complete ({}/{})", + seq.len(), + seq.len() + )); + } + } + panic!( + "recv_iopub_interleaved: unexpected message: {msg:?}\nSequence state:{status}" + ); + } + + collected.push(msg); + } + + self.flush_streams_at_boundary(); + + collected + } } /// Result of sourcing a file via `send_source()`. diff --git a/crates/echo/src/control.rs b/crates/echo/src/control.rs index db51b30ef..cbbfe066f 100644 --- a/crates/echo/src/control.rs +++ b/crates/echo/src/control.rs @@ -1,19 +1,15 @@ use amalthea::language::control_handler::ControlHandler; +use amalthea::wire::debug_reply::DebugReply; +use amalthea::wire::debug_request::DebugRequest; use amalthea::wire::exception::Exception; use amalthea::wire::interrupt_reply::InterruptReply; use amalthea::wire::jupyter_message::Status; use amalthea::wire::shutdown_reply::ShutdownReply; use amalthea::wire::shutdown_request::ShutdownRequest; -use async_trait::async_trait; - pub struct Control {} -#[async_trait] impl ControlHandler for Control { - async fn handle_shutdown_request( - &self, - msg: &ShutdownRequest, - ) -> Result { + fn handle_shutdown_request(&self, msg: &ShutdownRequest) -> Result { // NYI Ok(ShutdownReply { status: Status::Ok, @@ -21,8 +17,15 @@ impl ControlHandler for Control { }) } - async fn handle_interrupt_request(&self) -> Result { + fn handle_interrupt_request(&self) -> Result { // NYI Ok(InterruptReply { status: Status::Ok }) } + + fn handle_debug_request(&self, _msg: &DebugRequest) -> Result { + // NYI + Ok(DebugReply { + content: serde_json::json!({}), + }) + } } diff --git a/crates/echo/src/main.rs b/crates/echo/src/main.rs index 978ff81aa..2e4bb212b 100644 --- a/crates/echo/src/main.rs +++ b/crates/echo/src/main.rs @@ -11,8 +11,6 @@ mod shell; use std::collections::HashMap; use std::env; use std::io::stdin; -use std::sync::Arc; -use std::sync::Mutex; use amalthea::comm::event::CommEvent; use amalthea::connection_file::ConnectionFile; @@ -42,7 +40,7 @@ fn start_kernel(connection_file: ConnectionFile, registration_file: Option