Skip to content

Commit f3d3154

Browse files
committed
Fix polling on Windows ARM
1 parent 11444ad commit f3d3154

4 files changed

Lines changed: 121 additions & 61 deletions

File tree

crates/amalthea/src/kernel.rs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -531,37 +531,69 @@ fn socket_bridge_thread(
531531
};
532532

533533
loop {
534-
let n = unwrap!(
535-
zmq::poll(&mut poll_items, -1),
536-
Err(err) => {
537-
debug_panic!("While polling 0MQ items: {err:?}");
538-
0
539-
}
540-
);
541-
542-
for _ in 0..n {
543-
if consume_outbound_notification() {
544-
forward_outbound();
545-
continue;
546-
}
547-
534+
// On Windows ARM, zmq::poll with a non-zero timeout blocks forever
535+
// and inproc notification sockets may not wake the poll at all.
536+
// Use a fully separate polling path that doesn't rely on ZMQ
537+
// readability reporting: non-blocking poll + unconditional drain
538+
// of all sources + short sleep when idle.
539+
#[cfg(all(target_os = "windows", target_arch = "aarch64"))]
540+
{
541+
// Drain outbound messages (IOPub, StdIn) unconditionally
542+
consume_outbound_notification();
543+
forward_outbound();
544+
545+
// Check inbound sockets with non-blocking poll
548546
if has_inbound(&stdin_socket) {
549547
unwrap!(
550548
forward_inbound(&stdin_socket, &stdin_inbound_tx),
551549
Err(err) => debug_panic!("While forwarding inbound message: {err:?}")
552550
);
553-
continue;
554551
}
555-
556552
if has_inbound(&iopub_socket) {
557553
unwrap!(
558554
forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx),
559555
Err(err) => debug_panic!("While forwarding inbound message: {err:?}")
560556
);
561-
continue;
562557
}
563558

564-
debug_panic!("Could not find readable message");
559+
std::thread::sleep(std::time::Duration::from_millis(1));
560+
continue;
561+
}
562+
563+
#[cfg(not(all(target_os = "windows", target_arch = "aarch64")))]
564+
{
565+
let n = unwrap!(
566+
zmq::poll(&mut poll_items, -1),
567+
Err(err) => {
568+
debug_panic!("While polling 0MQ items: {err:?}");
569+
0
570+
}
571+
);
572+
573+
for _ in 0..n {
574+
if consume_outbound_notification() {
575+
forward_outbound();
576+
continue;
577+
}
578+
579+
if has_inbound(&stdin_socket) {
580+
unwrap!(
581+
forward_inbound(&stdin_socket, &stdin_inbound_tx),
582+
Err(err) => debug_panic!("While forwarding inbound message: {err:?}")
583+
);
584+
continue;
585+
}
586+
587+
if has_inbound(&iopub_socket) {
588+
unwrap!(
589+
forward_inbound_subscription(&iopub_socket, &iopub_inbound_tx),
590+
Err(err) => debug_panic!("While forwarding inbound message: {err:?}")
591+
);
592+
continue;
593+
}
594+
595+
debug_panic!("Could not find readable message");
596+
}
565597
}
566598
}
567599
}

crates/amalthea/src/socket.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,38 @@ impl Socket {
214214
}
215215
}
216216

217+
#[cfg(not(all(target_os = "windows", target_arch = "aarch64")))]
217218
pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result<bool> {
218219
Ok(self.socket.poll(zmq::PollEvents::POLLIN, timeout_ms)? != 0)
219220
}
220221

222+
/// On Windows ARM, ZMQ poll with a non-zero timeout blocks forever
223+
/// instead of respecting the timeout. Use non-blocking poll with
224+
/// manual timing.
225+
#[cfg(all(target_os = "windows", target_arch = "aarch64"))]
226+
pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result<bool> {
227+
if timeout_ms == 0 {
228+
return Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0);
229+
}
230+
231+
let start = std::time::Instant::now();
232+
let timeout = if timeout_ms < 0 {
233+
std::time::Duration::from_secs(u64::MAX / 2)
234+
} else {
235+
std::time::Duration::from_millis(timeout_ms as u64)
236+
};
237+
238+
loop {
239+
if self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0 {
240+
return Ok(true);
241+
}
242+
if start.elapsed() >= timeout {
243+
return Ok(false);
244+
}
245+
std::thread::sleep(std::time::Duration::from_millis(1));
246+
}
247+
}
248+
221249
pub fn has_incoming_data(&self) -> zmq::Result<bool> {
222250
self.poll_incoming(0)
223251
}

crates/amalthea/src/socket/shell.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,27 @@ impl Shell {
111111

112112
/// Main loop for the Shell thread; to be invoked by the kernel.
113113
pub fn listen(&mut self) {
114+
#[cfg(all(target_os = "windows", target_arch = "aarch64"))]
115+
self.listen_polling();
116+
117+
#[cfg(not(all(target_os = "windows", target_arch = "aarch64")))]
118+
self.listen_blocking();
119+
}
120+
121+
/// On Windows ARM, zmq::poll with a non-zero timeout blocks forever
122+
/// and inproc notification sockets may not wake the poll. Use
123+
/// non-blocking poll with unconditional comm checks and a short sleep.
124+
#[cfg(all(target_os = "windows", target_arch = "aarch64"))]
125+
fn listen_polling(&mut self) {
126+
loop {
127+
self.process_comm_notification();
128+
self.process_shell_socket();
129+
std::thread::sleep(std::time::Duration::from_millis(1));
130+
}
131+
}
132+
133+
#[cfg(not(all(target_os = "windows", target_arch = "aarch64")))]
134+
fn listen_blocking(&mut self) {
114135
loop {
115136
log::trace!("Waiting for shell messages or comm events");
116137

@@ -139,24 +160,32 @@ impl Shell {
139160
}
140161

141162
if shell_readable {
142-
let message = match Message::read_from_socket(&self.socket) {
143-
Ok(m) => m,
144-
Err(err) => {
145-
log::warn!("Could not read message from shell socket: {err:?}");
146-
continue;
147-
},
148-
};
149-
150-
// Handle the message; any failures while handling the messages are
151-
// delivered to the client instead of reported up the stack, so the
152-
// only errors likely here are "can't deliver to client"
153-
if let Err(err) = self.process_message(message) {
154-
log::error!("Could not handle shell message: {err:?}");
155-
}
163+
self.process_shell_socket();
156164
}
157165
}
158166
}
159167

168+
fn process_shell_socket(&mut self) {
169+
if !self.socket.has_incoming_data().unwrap_or(false) {
170+
return;
171+
}
172+
173+
let message = match Message::read_from_socket(&self.socket) {
174+
Ok(m) => m,
175+
Err(err) => {
176+
log::warn!("Could not read message from shell socket: {err:?}");
177+
return;
178+
},
179+
};
180+
181+
// Handle the message; any failures while handling the messages are
182+
// delivered to the client instead of reported up the stack, so the
183+
// only errors likely here are "can't deliver to client"
184+
if let Err(err) = self.process_message(message) {
185+
log::error!("Could not handle shell message: {err:?}");
186+
}
187+
}
188+
160189
/// Process comm event notifications from the notifier thread.
161190
/// Drains all pending notifications and all pending events.
162191
fn process_comm_notification(&mut self) {

crates/ark_test/src/dummy_frontend.rs

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ impl DummyArkFrontend {
248248

249249
/// Receive from IOPub with a timeout.
250250
/// Returns `None` if the timeout expires before a message arrives.
251-
#[cfg(not(all(target_os = "windows", target_arch = "aarch64")))]
252251
pub fn recv_iopub_with_timeout(&self, timeout: Duration) -> Option<Message> {
253252
let timeout_ms = timeout.as_millis() as i64;
254253
if self.guard.iopub_socket.poll_incoming(timeout_ms).unwrap() {
@@ -258,34 +257,6 @@ impl DummyArkFrontend {
258257
}
259258
}
260259

261-
/// Receive from IOPub with a timeout.
262-
/// Returns `None` if the timeout expires before a message arrives.
263-
///
264-
/// On Windows ARM, ZMQ poll with timeout blocks forever instead of
265-
/// respecting the timeout. Use non-blocking poll with manual timing.
266-
#[cfg(all(target_os = "windows", target_arch = "aarch64"))]
267-
pub fn recv_iopub_with_timeout(&self, timeout: Duration) -> Option<Message> {
268-
let start = std::time::Instant::now();
269-
270-
loop {
271-
if start.elapsed() >= timeout {
272-
return None;
273-
}
274-
275-
// Use non-blocking poll (timeout=0) to avoid ZMQ blocking forever
276-
match self.guard.iopub_socket.poll_incoming(0) {
277-
Ok(true) => {
278-
return Some(Message::read_from_socket(&self.guard.iopub_socket).unwrap());
279-
},
280-
Ok(false) => {
281-
// No message available, sleep briefly and try again
282-
std::thread::sleep(Duration::from_millis(10));
283-
},
284-
Err(_) => return None,
285-
}
286-
}
287-
}
288-
289260
/// Core primitive: receive the next non-stream, non-variables-comm message
290261
/// from IOPub.
291262
///

0 commit comments

Comments
 (0)