Skip to content

Commit 76df7d5

Browse files
committed
Better handling of graceful timeout by Hybrid container. Fixes #56.
1 parent 03d539f commit 76df7d5

3 files changed

Lines changed: 61 additions & 7 deletions

File tree

lib/async/container/generic.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ def wait
115115
@group.wait
116116
end
117117

118+
# Gracefully interrupt all child instances.
119+
def interrupt
120+
@group.interrupt
121+
end
122+
118123
# Returns true if all children instances have the specified status flag set.
119124
# e.g. `:ready`.
120125
# This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.

lib/async/container/hybrid.rb

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,15 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio
3131
container.wait_until_ready
3232
instance.ready!
3333

34-
container.wait
35-
rescue Async::Container::Terminate
36-
# Stop it immediately:
37-
container.stop(false)
38-
raise
34+
begin
35+
container.wait
36+
rescue Interrupt
37+
# Gracefully interrupt child threads; parent process handles escalation.
38+
container.interrupt
39+
retry
40+
end
3941
ensure
40-
# Stop it gracefully (also code path for Interrupt):
41-
container.stop
42+
container.stop(false)
4243
end
4344
end
4445

test/async/container/hybrid.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,52 @@
1313
it "should be multiprocess" do
1414
expect(subject).to be(:multiprocess?)
1515
end
16+
17+
it "forcefully stops the inner threaded container on exit" do
18+
stop_arguments = []
19+
interrupt_count = 0
20+
21+
threaded_class = Class.new
22+
threaded_class.define_method(:run) do |**options, &block|
23+
self
24+
end
25+
threaded_class.define_method(:wait_until_ready) do
26+
end
27+
threaded_class.define_method(:wait) do
28+
@wait_count ||= 0
29+
@wait_count += 1
30+
31+
raise Interrupt if @wait_count == 1
32+
end
33+
threaded_class.define_method(:interrupt) do
34+
interrupt_count += 1
35+
end
36+
threaded_class.define_method(:stop) do |graceful = true|
37+
stop_arguments << graceful
38+
end
39+
40+
container_class = Class.new(subject) do
41+
def spawn(**options, &block)
42+
instance = Object.new
43+
def instance.ready!
44+
end
45+
46+
block.call(instance)
47+
end
48+
end
49+
50+
original_threaded = Async::Container.send(:remove_const, :Threaded)
51+
Async::Container.const_set(:Threaded, threaded_class)
52+
53+
container = container_class.new
54+
container.run(count: 1, forks: 1, threads: 1) do |instance|
55+
# No-op.
56+
end
57+
58+
expect(interrupt_count).to be == 1
59+
expect(stop_arguments).to be == [false]
60+
ensure
61+
Async::Container.send(:remove_const, :Threaded)
62+
Async::Container.const_set(:Threaded, original_threaded)
63+
end
1664
end if Async::Container.fork?

0 commit comments

Comments
 (0)