Skip to content

Commit 19df35b

Browse files
authored
Shut down stdin thread on disconnect (#1238)
Should prevent the barrage of StdIn channel errors people are getting on restarts in some cases (e.g. linux). posit-dev/positron#7593 (comment)
1 parent d5432d9 commit 19df35b

2 files changed

Lines changed: 92 additions & 8 deletions

File tree

crates/amalthea/src/socket/stdin.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl Stdin {
7979
// don't need to listen to interrupts at this stage so we'd
8080
// only subscribe after receiving an input request, and the
8181
// loop/select below could be removed.
82+
//
83+
// If any of the channels disconnects, all senders have been
84+
// dropped, which signals shutdown. Exit cleanly rather than
85+
// spinning on the disconnected channel.
8286
let req: StdInRequest;
8387
loop {
8488
select! {
@@ -89,12 +93,16 @@ impl Stdin {
8993
break;
9094
},
9195
Err(err) => {
92-
log::error!("Could not read input request: {}", err);
93-
continue;
96+
log::trace!("StdIn shutting down: request channel disconnected: {err:?}");
97+
return;
9498
}
9599
}
96100
},
97-
recv(interrupt_rx) -> _ => {
101+
recv(interrupt_rx) -> msg => {
102+
if let Err(err) = msg {
103+
log::trace!("StdIn shutting down: interrupt channel disconnected: {err:?}");
104+
return;
105+
}
98106
continue;
99107
}
100108
};
@@ -131,19 +139,19 @@ impl Stdin {
131139
recv(self.inbound_rx) -> msg => match msg {
132140
Ok(m) => m,
133141
Err(err) => {
134-
log::error!("Could not read message from stdin socket: {err:?}");
135-
continue;
142+
log::trace!("StdIn shutting down: inbound socket disconnected: {err:?}");
143+
return;
136144
}
137145
},
138146
// Cancel current iteration if an interrupt is
139147
// signaled. We're no longer waiting for an `input_reply`
140148
// but for an `input_request`.
141149
recv(interrupt_rx) -> msg => {
142-
log::trace!("Received interrupt signal in StdIn");
143-
144150
if let Err(err) = msg {
145-
log::error!("Could not read interrupt message: {err:?}");
151+
log::trace!("StdIn shutting down: interrupt channel disconnected: {err:?}");
152+
return;
146153
}
154+
log::trace!("Received interrupt signal in StdIn");
147155

148156
match reply_tx {
149157
StdInReplySender::Input(_tx) => {

crates/ark/tests/stdin-shutdown.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* stdin-shutdown.rs
3+
*
4+
* Copyright (C) 2026 Posit Software, PBC. All rights reserved.
5+
*
6+
*/
7+
8+
//! Direct tests for `Stdin::listen` shutdown behaviour. Regression coverage
9+
//! for the issue where dropping the request or interrupt senders left the
10+
//! listen loop spinning on the disconnected channel, spamming error logs
11+
//! during kernel shutdown.
12+
13+
use std::thread;
14+
use std::time::Duration;
15+
16+
use amalthea::session::Session;
17+
use amalthea::socket::stdin::StdInRequest;
18+
use amalthea::socket::stdin::Stdin;
19+
use crossbeam::channel::bounded;
20+
use crossbeam::channel::unbounded;
21+
22+
/// Spawn `Stdin::listen` on its own thread and return a one-shot receiver
23+
/// that fires after `listen` returns.
24+
fn spawn_listen() -> (
25+
crossbeam::channel::Sender<StdInRequest>,
26+
crossbeam::channel::Sender<bool>,
27+
crossbeam::channel::Receiver<()>,
28+
) {
29+
let session = Session::create("").unwrap();
30+
31+
let (_inbound_tx, inbound_rx) = unbounded();
32+
let (outbound_tx, _outbound_rx) = unbounded();
33+
let stdin = Stdin::new(inbound_rx, outbound_tx, session);
34+
35+
let (stdin_request_tx, stdin_request_rx) = unbounded::<StdInRequest>();
36+
let (stdin_reply_tx, _stdin_reply_rx) = unbounded();
37+
let (interrupt_tx, interrupt_rx) = bounded::<bool>(1);
38+
39+
let (done_tx, done_rx) = bounded::<()>(1);
40+
thread::spawn(move || {
41+
stdin.listen(stdin_request_rx, stdin_reply_tx, interrupt_rx);
42+
let _ = done_tx.send(());
43+
});
44+
45+
// Keep the inbound and outbound channel endpoints alive for the duration
46+
// of the test by leaking the receivers/sender we don't return. They're
47+
// captured by the closure above when the thread reads them; once `listen`
48+
// exits the thread drops them along with `stdin`.
49+
(stdin_request_tx, interrupt_tx, done_rx)
50+
}
51+
52+
#[test]
53+
fn test_stdin_exits_when_request_channel_disconnects() {
54+
let (stdin_request_tx, _interrupt_tx, done_rx) = spawn_listen();
55+
56+
// Closing all senders for the request channel signals shutdown.
57+
drop(stdin_request_tx);
58+
59+
done_rx
60+
.recv_timeout(Duration::from_secs(2))
61+
.expect("Stdin::listen did not exit when request channel disconnected");
62+
}
63+
64+
#[test]
65+
fn test_stdin_exits_when_interrupt_channel_disconnects() {
66+
let (_stdin_request_tx, interrupt_tx, done_rx) = spawn_listen();
67+
68+
// Closing all senders for the interrupt channel also signals shutdown.
69+
// Without the fix, the inner select would keep firing on the
70+
// disconnected interrupt channel and never make progress.
71+
drop(interrupt_tx);
72+
73+
done_rx
74+
.recv_timeout(Duration::from_secs(2))
75+
.expect("Stdin::listen did not exit when interrupt channel disconnected");
76+
}

0 commit comments

Comments
 (0)