Skip to content

Commit 7386726

Browse files
committed
Fix synchronisation between comm opening and plot updates
1 parent 8f67c4b commit 7386726

11 files changed

Lines changed: 381 additions & 167 deletions

File tree

crates/amalthea/src/comm/event.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*
66
*/
77

8+
use crossbeam::channel::Sender;
89
use serde_json::Value;
910

1011
use crate::comm::comm_channel::CommMsg;
@@ -21,4 +22,10 @@ pub enum CommEvent {
2122

2223
/// A Comm was closed
2324
Closed(String),
25+
26+
/// Synchronisation barrier. Shell signals the sender after processing all
27+
/// preceding events. The caller blocks on the paired receiver to guarantee
28+
/// that earlier events (e.g. `Opened`) have been fully handled before
29+
/// continuing.
30+
Barrier(Sender<()>),
2431
}

crates/amalthea/src/language/shell_handler.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77

88
use async_trait::async_trait;
9+
use crossbeam::channel::Receiver;
910

1011
use crate::comm::comm_channel::Comm;
1112
use crate::comm::comm_channel::CommMsg;
@@ -53,17 +54,17 @@ pub trait ShellHandler: Send {
5354
req: &IsCompleteRequest,
5455
) -> crate::Result<IsCompleteReply>;
5556

56-
/// Handles a request to execute code.
57-
///
58-
/// The `originator` is an opaque byte array identifying the peer that sent
59-
/// the request; it is needed to perform an input request during execution.
57+
/// Kicks off execution of the given request and returns a channel that
58+
/// will receive the reply once execution completes. Shell select-loops
59+
/// on this receiver together with `comm_event_rx` so it can process
60+
/// comm events (e.g. barrier handshakes) while execution is in progress.
6061
///
6162
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute
62-
async fn handle_execute_request(
63+
fn start_execute_request(
6364
&mut self,
6465
originator: Originator,
6566
req: &ExecuteRequest,
66-
) -> crate::Result<ExecuteReply>;
67+
) -> Receiver<crate::Result<ExecuteReply>>;
6768

6869
/// Handles a request to provide completions for the given code fragment.
6970
///

crates/amalthea/src/socket/shell.rs

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use crate::wire::comm_info_request::CommInfoRequest;
3737
use crate::wire::comm_msg::CommWireMsg;
3838
use crate::wire::comm_open::CommOpen;
3939
use crate::wire::exception::Exception;
40+
use crate::wire::execute_reply::ExecuteReply;
41+
use crate::wire::execute_request::ExecuteRequest;
4042
use crate::wire::header::JupyterHeader;
4143
use crate::wire::jupyter_message::JupyterMessage;
4244
use crate::wire::jupyter_message::Message;
@@ -206,6 +208,10 @@ impl Shell {
206208
);
207209
},
208210

211+
CommEvent::Barrier(done_tx) => {
212+
done_tx.send(()).log_err();
213+
},
214+
209215
CommEvent::Message(comm_id, msg) => {
210216
let Some(comm) = self.open_comms.iter().find(|c| c.comm_id == comm_id) else {
211217
log::warn!("Received message for unknown comm channel {comm_id}: {msg:?}");
@@ -241,6 +247,14 @@ impl Shell {
241247
/// Process a message received from the front-end, optionally dispatching
242248
/// messages to the IOPub or execution threads
243249
fn process_message(&mut self, msg: Message) -> crate::Result<()> {
250+
// Execute requests get special handling: Shell select-loops on both
251+
// the execute response and comm events, allowing it to process
252+
// backend-initiated comm opens (with barrier handshakes) without
253+
// deadlocking.
254+
if let Message::ExecuteRequest(req) = msg {
255+
return self.handle_execute_request(req);
256+
}
257+
244258
// Extract references to the components we need to pass to handlers.
245259
// This allows us to borrow different fields of self independently.
246260
let iopub_tx = &self.iopub_tx;
@@ -257,13 +271,6 @@ impl Shell {
257271
Message::IsCompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| {
258272
block_on(shell_handler.handle_is_complete_request(msg))
259273
}),
260-
Message::ExecuteRequest(req) => {
261-
// FIXME: We should ideally not pass the originator to the language kernel
262-
let originator = Originator::from(&req);
263-
Self::handle_request(iopub_tx, socket, req, |msg| {
264-
block_on(shell_handler.handle_execute_request(originator, msg))
265-
})
266-
},
267274
Message::CompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| {
268275
block_on(shell_handler.handle_complete_request(msg))
269276
}),
@@ -364,6 +371,68 @@ impl Shell {
364371
result.and(Ok(()))
365372
}
366373

374+
/// Handle an execute request. Unlike other requests that use the generic
375+
/// `handle_request`, this method select-loops on both the execute response
376+
/// and `comm_event_rx`. This allows Shell to process comm events (e.g.
377+
/// `CommEvent::Barrier` from `comm_open_backend`) while the R thread is
378+
/// still executing, preventing a deadlock where the R thread waits for
379+
/// Shell to drain comm events while Shell waits for the execute response.
380+
fn handle_execute_request(&mut self, req: JupyterMessage<ExecuteRequest>) -> crate::Result<()> {
381+
use crossbeam::channel::Select;
382+
383+
self.iopub_tx
384+
.send(status(req.clone(), ExecutionState::Busy))
385+
.unwrap();
386+
387+
log::info!("Received shell request: {req:?}");
388+
389+
// FIXME: We should ideally not pass the originator to the language kernel
390+
let originator = Originator::from(&req);
391+
let response_rx = self
392+
.shell_handler
393+
.start_execute_request(originator, &req.content);
394+
395+
// Select-loop: drain comm events while waiting for the execute reply.
396+
let result = loop {
397+
let mut sel = Select::new();
398+
let resp_idx = sel.recv(&response_rx);
399+
let comm_idx = sel.recv(&self.comm_event_rx);
400+
401+
let ready = sel.ready();
402+
if ready == resp_idx {
403+
// Drain any comm events that arrived before or alongside the response
404+
while let Ok(event) = self.comm_event_rx.try_recv() {
405+
self.process_comm_event(event);
406+
}
407+
break response_rx.recv().unwrap();
408+
} else if ready == comm_idx {
409+
if let Ok(event) = self.comm_event_rx.try_recv() {
410+
self.process_comm_event(event);
411+
}
412+
}
413+
};
414+
415+
let result = match result {
416+
Ok(reply) => req.send_reply(reply, &self.socket),
417+
Err(crate::Error::ShellErrorReply(error)) => {
418+
req.send_error::<ExecuteReply>(error, &self.socket)
419+
},
420+
Err(crate::Error::ShellErrorExecuteReply(error, exec_count)) => {
421+
req.send_execute_error(error, exec_count, &self.socket)
422+
},
423+
Err(err) => {
424+
let error = Exception::internal_error(format!("{err:?}"));
425+
req.send_error::<ExecuteReply>(error, &self.socket)
426+
},
427+
};
428+
429+
self.iopub_tx
430+
.send(status(req.clone(), ExecutionState::Idle))
431+
.unwrap();
432+
433+
result.and(Ok(()))
434+
}
435+
367436
fn handle_notification<Not, Handler>(
368437
iopub_tx: &Sender<IOPubMessage>,
369438
not: JupyterMessage<Not>,

crates/amalthea/tests/client/shell.rs

Lines changed: 67 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -82,63 +82,8 @@ impl Shell {
8282
warn!("Could not prompt for input: {}", err);
8383
}
8484
}
85-
}
86-
87-
#[async_trait]
88-
impl ShellHandler for Shell {
89-
async fn handle_info_request(
90-
&mut self,
91-
_req: &KernelInfoRequest,
92-
) -> amalthea::Result<KernelInfoReply> {
93-
let info = LanguageInfo {
94-
name: String::from("Test"),
95-
version: String::from("1.0"),
96-
file_extension: String::from(".ech"),
97-
mimetype: String::from("text/echo"),
98-
pygments_lexer: None,
99-
codemirror_mode: None,
100-
nbconvert_exporter: None,
101-
positron: None,
102-
};
103-
Ok(KernelInfoReply {
104-
status: Status::Ok,
105-
banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")),
106-
implementation: String::from("echo"),
107-
implementation_version: String::from(env!("CARGO_PKG_VERSION")),
108-
debugger: false,
109-
help_links: Vec::new(),
110-
language_info: info,
111-
})
112-
}
113-
114-
async fn handle_complete_request(
115-
&self,
116-
_req: &CompleteRequest,
117-
) -> amalthea::Result<CompleteReply> {
118-
// No matches in this toy implementation.
119-
Ok(CompleteReply {
120-
matches: Vec::new(),
121-
status: Status::Ok,
122-
cursor_start: 0,
123-
cursor_end: 0,
124-
metadata: json!({}),
125-
})
126-
}
12785

128-
/// Handle a request to test code for completion.
129-
async fn handle_is_complete_request(
130-
&self,
131-
_req: &IsCompleteRequest,
132-
) -> amalthea::Result<IsCompleteReply> {
133-
// In this echo example, the code is always complete!
134-
Ok(IsCompleteReply {
135-
status: IsComplete::Complete,
136-
indent: String::from(""),
137-
})
138-
}
139-
140-
/// Handles an ExecuteRequest; "executes" the code by echoing it.
141-
async fn handle_execute_request(
86+
fn execute(
14287
&mut self,
14388
originator: Originator,
14489
req: &ExecuteRequest,
@@ -227,6 +172,72 @@ impl ShellHandler for Shell {
227172
user_expressions: serde_json::Value::Null,
228173
})
229174
}
175+
}
176+
177+
#[async_trait]
178+
impl ShellHandler for Shell {
179+
async fn handle_info_request(
180+
&mut self,
181+
_req: &KernelInfoRequest,
182+
) -> amalthea::Result<KernelInfoReply> {
183+
let info = LanguageInfo {
184+
name: String::from("Test"),
185+
version: String::from("1.0"),
186+
file_extension: String::from(".ech"),
187+
mimetype: String::from("text/echo"),
188+
pygments_lexer: None,
189+
codemirror_mode: None,
190+
nbconvert_exporter: None,
191+
positron: None,
192+
};
193+
Ok(KernelInfoReply {
194+
status: Status::Ok,
195+
banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")),
196+
implementation: String::from("echo"),
197+
implementation_version: String::from(env!("CARGO_PKG_VERSION")),
198+
debugger: false,
199+
help_links: Vec::new(),
200+
language_info: info,
201+
})
202+
}
203+
204+
async fn handle_complete_request(
205+
&self,
206+
_req: &CompleteRequest,
207+
) -> amalthea::Result<CompleteReply> {
208+
// No matches in this toy implementation.
209+
Ok(CompleteReply {
210+
matches: Vec::new(),
211+
status: Status::Ok,
212+
cursor_start: 0,
213+
cursor_end: 0,
214+
metadata: json!({}),
215+
})
216+
}
217+
218+
/// Handle a request to test code for completion.
219+
async fn handle_is_complete_request(
220+
&self,
221+
_req: &IsCompleteRequest,
222+
) -> amalthea::Result<IsCompleteReply> {
223+
// In this echo example, the code is always complete!
224+
Ok(IsCompleteReply {
225+
status: IsComplete::Complete,
226+
indent: String::from(""),
227+
})
228+
}
229+
230+
/// Handles an ExecuteRequest; "executes" the code by echoing it.
231+
fn start_execute_request(
232+
&mut self,
233+
originator: Originator,
234+
req: &ExecuteRequest,
235+
) -> crossbeam::channel::Receiver<amalthea::Result<ExecuteReply>> {
236+
let (tx, rx) = crossbeam::channel::bounded(1);
237+
let result = self.execute(originator, req);
238+
tx.send(result).unwrap();
239+
rx
240+
}
230241

231242
/// Handles an introspection request
232243
async fn handle_inspect_request(&self, req: &InspectRequest) -> amalthea::Result<InspectReply> {

crates/ark/src/console/console_comm.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ impl Console {
4040
/// Register a backend-initiated comm on the R thread.
4141
///
4242
/// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`,
43-
/// sends `CommEvent::Opened` to amalthea, and returns the comm ID.
43+
/// sends `CommEvent::Opened` to Shell, and returns the comm ID.
44+
///
45+
/// Blocks until Shell has fully processed the open (sent `comm_open` on
46+
/// IOPub and registered the comm for routing). This guarantees that any
47+
/// `comm_msg` updates the caller sends afterwards are ordered after the
48+
/// `comm_open` on IOPub.
4449
pub(crate) fn comm_open_backend(
4550
&mut self,
4651
comm_name: &str,
@@ -65,6 +70,13 @@ impl Console {
6570
self.comm_event_tx
6671
.send(CommEvent::Opened(comm, open_metadata))?;
6772

73+
// Block until Shell has processed the Opened event, ensuring the
74+
// `comm_open` message is on IOPub before we return. Any updates
75+
// the caller sends after this point are guaranteed to follow it.
76+
let (done_tx, done_rx) = crossbeam::channel::bounded(0);
77+
self.comm_event_tx.send(CommEvent::Barrier(done_tx))?;
78+
done_rx.recv()?;
79+
6880
Ok(comm_id)
6981
}
7082

crates/ark/src/modules/positron/graphics.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ setHook("before.grid.newpage", action = "replace", function(...) {
8080
grDevices::deviceIsInteractive(ARK_GRAPHICS_DEVICE_NAME)
8181
}
8282

83+
current_plot_id <- function() {
84+
.ps.Call("ps_graphics_current_plot_id")
85+
}
86+
8387
# Create a recording of the current plot.
8488
#
8589
# This saves the plot's display list, so it can be used to re-render plots as

crates/ark/src/plots/graphics_device.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,6 +1358,14 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result
13581358
}
13591359
}
13601360

1361+
/// Return the current plot ID. Used by tests to verify that layout panels
1362+
/// share the same page (same ID) and that overflow creates a new page.
1363+
#[harp::register]
1364+
unsafe extern "C-unwind" fn ps_graphics_current_plot_id() -> anyhow::Result<SEXP> {
1365+
let id = Console::get().device_context().id();
1366+
Ok(RObject::from(&id).sexp)
1367+
}
1368+
13611369
/// Push a source file URI onto the source context stack.
13621370
/// Called from the `source()` hook when entering a sourced file.
13631371
#[harp::register]

crates/ark/src/shell.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,11 @@ impl ShellHandler for Shell {
175175

176176
/// Handles an ExecuteRequest by sending the code to the R execution thread
177177
/// for processing.
178-
async fn handle_execute_request(
178+
fn start_execute_request(
179179
&mut self,
180180
originator: Originator,
181181
req: &ExecuteRequest,
182-
) -> amalthea::Result<ExecuteReply> {
182+
) -> crossbeam::channel::Receiver<amalthea::Result<ExecuteReply>> {
183183
let (response_tx, response_rx) = unbounded::<amalthea::Result<ExecuteReply>>();
184184
let mut req_clone = req.clone();
185185
req_clone.code = convert_line_endings(&req_clone.code, LineEnding::Posix);
@@ -196,7 +196,7 @@ impl ShellHandler for Shell {
196196

197197
trace!("Code sent to R: {}", req_clone.code);
198198

199-
response_rx.recv().unwrap()
199+
response_rx
200200
}
201201

202202
/// Handles an introspection request

0 commit comments

Comments
 (0)