Skip to content

Commit 9872bb5

Browse files
Make channel cleanup idempotent
Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4
1 parent d48cb72 commit 9872bb5

5 files changed

Lines changed: 37 additions & 4 deletions

File tree

lib/async/container/channel.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ def initialize(timeout: 1.0)
2525

2626
# Close the input end of the pipe.
2727
def close_read
28-
@in.close
28+
@in.close unless @in.closed?
29+
rescue IOError, Errno::EBADF
2930
end
3031

3132
# Close the output end of the pipe.
3233
def close_write
33-
@out.close
34+
@out.close unless @out.closed?
35+
rescue IOError, Errno::EBADF
3436
end
3537

3638
# Close both ends of the pipe.

lib/async/container/forked.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,15 @@ def exec(*arguments, ready: true, **options)
101101
def self.fork(**options)
102102
# $stderr.puts fork: caller
103103
self.new(**options) do |process|
104+
# Fork from `Thread.new` so the child does not inherit the parent fiber scheduler or the current caller's fiber stack. Only this short-lived thread is copied into the child process:
104105
::Thread.new do
105106
::Process.fork do
106107
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
107108
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
108109
Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT.
109110
Signal.trap(:HUP){::Thread.current.raise(Restart)}
110111

111-
# This could be a configuration option:
112+
# Reset `SignalException` delivery because CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`. Async deliberately masks signal exceptions, and the signal traps above should be delivered promptly:
112113
::Thread.handle_interrupt(SignalException => :immediate) do
113114
yield Instance.for(process)
114115
rescue Interrupt

lib/async/container/threaded.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def exec(*arguments, ready: true, **options)
114114
def self.fork(**options)
115115
self.new(**options) do |thread|
116116
::Thread.new do
117-
# This could be a configuration option (see forked implementation too):
117+
# Reset `SignalException` delivery because CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`. Async deliberately masks signal exceptions, and signal-driven thread control should be delivered promptly:
118118
::Thread.handle_interrupt(SignalException => :immediate) do
119119
yield Instance.for(thread)
120120
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Make channel close operations idempotent, so cleanup paths can safely close already-closed pipe ends.
6+
37
## v0.37.0
48

59
- Rename `ASYNC_CONTAINER_GRACEFUL_TIMEOUT` to `ASYNC_CONTAINER_GRACEFUL_STOP` and apply it at the controller level as `GRACEFUL_STOP`. `Group#stop` now only applies the shutdown policy it is given.

test/async/container/channel.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,32 @@
3030
expect(channel.receive).to be_nil
3131
end
3232

33+
with "#close" do
34+
it "can close more than once" do
35+
channel.close
36+
37+
expect do
38+
channel.close
39+
end.not.to raise_exception
40+
end
41+
42+
it "can close the input end after it was already closed" do
43+
channel.in.close
44+
45+
expect do
46+
channel.close_read
47+
end.not.to raise_exception
48+
end
49+
50+
it "can close the output end after it was already closed" do
51+
channel.out.close
52+
53+
expect do
54+
channel.close_write
55+
end.not.to raise_exception
56+
end
57+
end
58+
3359
with "timeout" do
3460
let(:channel) {subject.new(timeout: 0.001)}
3561

0 commit comments

Comments
 (0)