Skip to content

Commit 720d592

Browse files
committed
Fix synchronisation between comm opening and plot updates
1 parent e65e886 commit 720d592

11 files changed

Lines changed: 384 additions & 172 deletions

File tree

crates/amalthea/src/comm/event.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*
66
*/
77

8+
use crossbeam::channel::Sender;
89
use serde_json::Value;
910

1011
use crate::comm::comm_channel::CommMsg;
@@ -21,4 +22,10 @@ pub enum CommEvent {
2122

2223
/// A Comm was closed
2324
Closed(String),
25+
26+
/// Synchronisation barrier. Shell signals the sender after processing all
27+
/// preceding events. The caller blocks on the paired receiver to guarantee
28+
/// that earlier events (e.g. `Opened`) have been fully handled before
29+
/// continuing.
30+
Barrier(Sender<()>),
2431
}

crates/amalthea/src/language/shell_handler.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77

88
use async_trait::async_trait;
9+
use crossbeam::channel::Receiver;
910

1011
use crate::comm::comm_channel::Comm;
1112
use crate::comm::comm_channel::CommMsg;
@@ -53,17 +54,17 @@ pub trait ShellHandler: Send {
5354
req: &IsCompleteRequest,
5455
) -> crate::Result<IsCompleteReply>;
5556

56-
/// Handles a request to execute code.
57-
///
58-
/// The `originator` is an opaque byte array identifying the peer that sent
59-
/// the request; it is needed to perform an input request during execution.
57+
/// Kicks off execution of the given request and returns a channel that
58+
/// will receive the reply once execution completes. Shell select-loops
59+
/// on this receiver together with `comm_event_rx` so it can process
60+
/// comm events (e.g. barrier handshakes) while execution is in progress.
6061
///
6162
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute
62-
async fn handle_execute_request(
63+
fn start_execute_request(
6364
&mut self,
6465
originator: Originator,
6566
req: &ExecuteRequest,
66-
) -> crate::Result<ExecuteReply>;
67+
) -> Receiver<crate::Result<ExecuteReply>>;
6768

6869
/// Handles a request to provide completions for the given code fragment.
6970
///

crates/amalthea/src/socket/shell.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::sync::Arc;
1111
use std::sync::Mutex;
1212

1313
use crossbeam::channel::Receiver;
14+
use crossbeam::channel::Select;
1415
use crossbeam::channel::Sender;
1516
use futures::executor::block_on;
1617
use stdext::result::ResultExt;
@@ -37,6 +38,8 @@ use crate::wire::comm_info_request::CommInfoRequest;
3738
use crate::wire::comm_msg::CommWireMsg;
3839
use crate::wire::comm_open::CommOpen;
3940
use crate::wire::exception::Exception;
41+
use crate::wire::execute_reply::ExecuteReply;
42+
use crate::wire::execute_request::ExecuteRequest;
4043
use crate::wire::jupyter_message::JupyterMessage;
4144
use crate::wire::jupyter_message::Message;
4245
use crate::wire::jupyter_message::ProtocolMessage;
@@ -205,6 +208,10 @@ impl Shell {
205208
);
206209
},
207210

211+
CommEvent::Barrier(done_tx) => {
212+
done_tx.send(()).log_err();
213+
},
214+
208215
CommEvent::Message(comm_id, msg) => {
209216
let Some(comm) = self.open_comms.iter().find(|c| c.comm_id == comm_id) else {
210217
log::warn!("Received message for unknown comm channel {comm_id}: {msg:?}");
@@ -240,6 +247,15 @@ impl Shell {
240247
/// Process a message received from the front-end, optionally dispatching
241248
/// messages to the IOPub or execution threads
242249
fn process_message(&mut self, msg: Message) -> crate::Result<()> {
250+
// Execute requests get special handling: Shell select-loops on both
251+
// the execute response and comm events so it can process
252+
// backend-initiated comm opens (with barrier handshakes) while R is
253+
// still executing, preventing a deadlock where R waits for Shell to
254+
// drain the barrier while Shell waits for the execute response.
255+
if let Message::ExecuteRequest(req) = msg {
256+
return self.handle_execute_request(req);
257+
}
258+
243259
// Extract references to the components we need to pass to handlers.
244260
// This allows us to borrow different fields of self independently.
245261
let iopub_tx = &self.iopub_tx;
@@ -256,13 +272,6 @@ impl Shell {
256272
Message::IsCompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| {
257273
block_on(shell_handler.handle_is_complete_request(msg))
258274
}),
259-
Message::ExecuteRequest(req) => {
260-
// FIXME: We should ideally not pass the originator to the language kernel
261-
let originator = Originator::from(&req);
262-
Self::handle_request(iopub_tx, socket, req, |msg| {
263-
block_on(shell_handler.handle_execute_request(originator, msg))
264-
})
265-
},
266275
Message::CompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| {
267276
block_on(shell_handler.handle_complete_request(msg))
268277
}),
@@ -363,6 +372,63 @@ impl Shell {
363372
result.and(Ok(()))
364373
}
365374

375+
/// Handle an execute request. Unlike other requests that use the generic
376+
/// `handle_request`, this method select-loops on both the execute response
377+
/// and `comm_event_rx`. This allows Shell to process comm events (e.g.
378+
/// `CommEvent::Barrier` from `comm_open_backend`) while the R thread is
379+
/// still executing, preventing a deadlock where the R thread waits for
380+
/// Shell to drain comm events while Shell waits for the execute response.
381+
fn handle_execute_request(&mut self, req: JupyterMessage<ExecuteRequest>) -> crate::Result<()> {
382+
self.iopub_tx
383+
.send(status(req.clone(), ExecutionState::Busy))
384+
.unwrap();
385+
386+
log::info!("Received shell request: {req:?}");
387+
388+
// FIXME: We should ideally not pass the originator to the language kernel
389+
let originator = Originator::from(&req);
390+
let response_rx = self
391+
.shell_handler
392+
.start_execute_request(originator, &req.content);
393+
394+
// Select-loop: drain comm events while waiting for the execute reply.
395+
let result = loop {
396+
let mut sel = Select::new();
397+
let resp_idx = sel.recv(&response_rx);
398+
sel.recv(&self.comm_event_rx);
399+
400+
let ready = sel.ready();
401+
402+
while let Ok(event) = self.comm_event_rx.try_recv() {
403+
self.process_comm_event(event);
404+
}
405+
406+
if ready == resp_idx {
407+
break response_rx.recv().unwrap();
408+
}
409+
};
410+
411+
let result = match result {
412+
Ok(reply) => req.send_reply(reply, &self.socket),
413+
Err(crate::Error::ShellErrorReply(error)) => {
414+
req.send_error::<ExecuteReply>(error, &self.socket)
415+
},
416+
Err(crate::Error::ShellErrorExecuteReply(error, exec_count)) => {
417+
req.send_execute_error(error, exec_count, &self.socket)
418+
},
419+
Err(err) => {
420+
let error = Exception::internal_error(format!("{err:?}"));
421+
req.send_error::<ExecuteReply>(error, &self.socket)
422+
},
423+
};
424+
425+
self.iopub_tx
426+
.send(status(req.clone(), ExecutionState::Idle))
427+
.unwrap();
428+
429+
result.and(Ok(()))
430+
}
431+
366432
fn handle_notification<Not, Handler>(
367433
iopub_tx: &Sender<IOPubMessage>,
368434
not: JupyterMessage<Not>,

crates/amalthea/tests/client/shell.rs

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -82,63 +82,8 @@ impl Shell {
8282
warn!("Could not prompt for input: {}", err);
8383
}
8484
}
85-
}
86-
87-
#[async_trait]
88-
impl ShellHandler for Shell {
89-
async fn handle_info_request(
90-
&mut self,
91-
_req: &KernelInfoRequest,
92-
) -> amalthea::Result<KernelInfoReply> {
93-
let info = LanguageInfo {
94-
name: String::from("Test"),
95-
version: String::from("1.0"),
96-
file_extension: String::from(".ech"),
97-
mimetype: String::from("text/echo"),
98-
pygments_lexer: None,
99-
codemirror_mode: None,
100-
nbconvert_exporter: None,
101-
positron: None,
102-
};
103-
Ok(KernelInfoReply {
104-
status: Status::Ok,
105-
banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")),
106-
implementation: String::from("echo"),
107-
implementation_version: String::from(env!("CARGO_PKG_VERSION")),
108-
debugger: false,
109-
help_links: Vec::new(),
110-
language_info: info,
111-
})
112-
}
113-
114-
async fn handle_complete_request(
115-
&self,
116-
_req: &CompleteRequest,
117-
) -> amalthea::Result<CompleteReply> {
118-
// No matches in this toy implementation.
119-
Ok(CompleteReply {
120-
matches: Vec::new(),
121-
status: Status::Ok,
122-
cursor_start: 0,
123-
cursor_end: 0,
124-
metadata: json!({}),
125-
})
126-
}
12785

128-
/// Handle a request to test code for completion.
129-
async fn handle_is_complete_request(
130-
&self,
131-
_req: &IsCompleteRequest,
132-
) -> amalthea::Result<IsCompleteReply> {
133-
// In this echo example, the code is always complete!
134-
Ok(IsCompleteReply {
135-
status: IsComplete::Complete,
136-
indent: String::from(""),
137-
})
138-
}
139-
140-
/// Handles an ExecuteRequest; "executes" the code by echoing it.
141-
async fn handle_execute_request(
86+
fn execute(
14287
&mut self,
14388
originator: Originator,
14489
req: &ExecuteRequest,
@@ -227,6 +172,72 @@ impl ShellHandler for Shell {
227172
user_expressions: serde_json::Value::Null,
228173
})
229174
}
175+
}
176+
177+
#[async_trait]
178+
impl ShellHandler for Shell {
179+
async fn handle_info_request(
180+
&mut self,
181+
_req: &KernelInfoRequest,
182+
) -> amalthea::Result<KernelInfoReply> {
183+
let info = LanguageInfo {
184+
name: String::from("Test"),
185+
version: String::from("1.0"),
186+
file_extension: String::from(".ech"),
187+
mimetype: String::from("text/echo"),
188+
pygments_lexer: None,
189+
codemirror_mode: None,
190+
nbconvert_exporter: None,
191+
positron: None,
192+
};
193+
Ok(KernelInfoReply {
194+
status: Status::Ok,
195+
banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")),
196+
implementation: String::from("echo"),
197+
implementation_version: String::from(env!("CARGO_PKG_VERSION")),
198+
debugger: false,
199+
help_links: Vec::new(),
200+
language_info: info,
201+
})
202+
}
203+
204+
async fn handle_complete_request(
205+
&self,
206+
_req: &CompleteRequest,
207+
) -> amalthea::Result<CompleteReply> {
208+
// No matches in this toy implementation.
209+
Ok(CompleteReply {
210+
matches: Vec::new(),
211+
status: Status::Ok,
212+
cursor_start: 0,
213+
cursor_end: 0,
214+
metadata: json!({}),
215+
})
216+
}
217+
218+
/// Handle a request to test code for completion.
219+
async fn handle_is_complete_request(
220+
&self,
221+
_req: &IsCompleteRequest,
222+
) -> amalthea::Result<IsCompleteReply> {
223+
// In this echo example, the code is always complete!
224+
Ok(IsCompleteReply {
225+
status: IsComplete::Complete,
226+
indent: String::from(""),
227+
})
228+
}
229+
230+
/// Handles an ExecuteRequest; "executes" the code by echoing it.
231+
fn start_execute_request(
232+
&mut self,
233+
originator: Originator,
234+
req: &ExecuteRequest,
235+
) -> crossbeam::channel::Receiver<amalthea::Result<ExecuteReply>> {
236+
let (tx, rx) = crossbeam::channel::bounded(1);
237+
let result = self.execute(originator, req);
238+
tx.send(result).unwrap();
239+
rx
240+
}
230241

231242
/// Handles an introspection request
232243
async fn handle_inspect_request(&self, req: &InspectRequest) -> amalthea::Result<InspectReply> {

crates/ark/src/console/console_comm.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,13 @@ impl Console {
4040
/// Register a backend-initiated comm on the R thread.
4141
///
4242
/// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`,
43-
/// sends `CommEvent::Opened` to amalthea, and returns the comm ID.
43+
/// sends `CommEvent::Opened` to Amalthea's Shell thread, and returns the
44+
/// comm ID.
45+
///
46+
/// Blocks until Shell has fully processed the open (sent `comm_open` on
47+
/// IOPub and registered the comm for routing). This guarantees that any
48+
/// `comm_msg` sent by the caller afterwards are ordered after the
49+
/// `comm_open` on IOPub.
4450
pub(crate) fn comm_open_backend(
4551
&mut self,
4652
comm_name: &str,
@@ -65,6 +71,13 @@ impl Console {
6571
self.comm_event_tx
6672
.send(CommEvent::Opened(comm, open_metadata))?;
6773

74+
// Block until Shell has processed the Opened event, ensuring the
75+
// `comm_open` message is on IOPub before we return. Any updates
76+
// the caller sends after this point are guaranteed to follow it.
77+
let (done_tx, done_rx) = crossbeam::channel::bounded(0);
78+
self.comm_event_tx.send(CommEvent::Barrier(done_tx))?;
79+
done_rx.recv()?;
80+
6881
Ok(comm_id)
6982
}
7083

crates/ark/src/modules/positron/graphics.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ setHook("before.grid.newpage", action = "replace", function(...) {
8080
grDevices::deviceIsInteractive(ARK_GRAPHICS_DEVICE_NAME)
8181
}
8282

83+
current_plot_id <- function() {
84+
.ps.Call("ps_graphics_current_plot_id")
85+
}
86+
8387
# Create a recording of the current plot.
8488
#
8589
# This saves the plot's display list, so it can be used to re-render plots as

crates/ark/src/plots/graphics_device.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,6 +1358,14 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result
13581358
}
13591359
}
13601360

1361+
/// Return the current plot ID. Used by tests to verify that layout panels
1362+
/// share the same page (same ID) and that overflow creates a new page.
1363+
#[harp::register]
1364+
unsafe extern "C-unwind" fn ps_graphics_current_plot_id() -> anyhow::Result<SEXP> {
1365+
let id = Console::get().device_context().id();
1366+
Ok(RObject::from(&id).sexp)
1367+
}
1368+
13611369
/// Push a source file URI onto the source context stack.
13621370
/// Called from the `source()` hook when entering a sourced file.
13631371
#[harp::register]

crates/ark/src/shell.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ impl ShellHandler for Shell {
175175

176176
/// Handles an ExecuteRequest by sending the code to the R execution thread
177177
/// for processing.
178-
async fn handle_execute_request(
178+
fn start_execute_request(
179179
&mut self,
180180
originator: Originator,
181181
req: &ExecuteRequest,
182-
) -> amalthea::Result<ExecuteReply> {
182+
) -> crossbeam::channel::Receiver<amalthea::Result<ExecuteReply>> {
183183
let (response_tx, response_rx) = unbounded::<amalthea::Result<ExecuteReply>>();
184184
let mut req_clone = req.clone();
185185
req_clone.code = convert_line_endings(&req_clone.code, LineEnding::Posix);
@@ -196,7 +196,7 @@ impl ShellHandler for Shell {
196196

197197
trace!("Code sent to R: {}", req_clone.code);
198198

199-
response_rx.recv().unwrap()
199+
response_rx
200200
}
201201

202202
/// Handles an introspection request

0 commit comments

Comments
 (0)