From 3007892df8946817162b50bdf5c427636ce098dd Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 15:59:42 +1200 Subject: [PATCH 01/20] Simplify container supervision using Async Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- fixtures/async/container/a_container.rb | 32 +++ lib/async/container/channel.rb | 6 +- lib/async/container/forked.rb | 73 ++++-- lib/async/container/generic.rb | 140 +++++++---- lib/async/container/group.rb | 257 ++++++++------------ test/async/container/group.rb | 300 ++++++------------------ 6 files changed, 351 insertions(+), 457 deletions(-) diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index b3ceb16..7d5da05 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -73,6 +73,38 @@ module Container end end + with "Async compatibility" do + it "can wait inside an Async task after spawning outside Async" do + input, output = IO.pipe + + container.spawn do + output.write(".") + end + + Async do + container.wait + end + + output.close + expect(input.read).to be == "." + end + + it "can spawn and wait inside the same Async task" do + input, output = IO.pipe + + Async do + container.spawn do + output.write(".") + end + + container.wait + end + + output.close + expect(input.read).to be == "." + end + end + it "should be blocking" do skip "Fiber.blocking? is not supported!" unless Fiber.respond_to?(:blocking?) diff --git a/lib/async/container/channel.rb b/lib/async/container/channel.rb index d207442..30283fd 100644 --- a/lib/async/container/channel.rb +++ b/lib/async/container/channel.rb @@ -25,12 +25,14 @@ def initialize(timeout: 1.0) # Close the input end of the pipe. def close_read - @in.close + @in.close unless @in.closed? + rescue IOError, Errno::EBADF end # Close the output end of the pipe. def close_write - @out.close + @out.close unless @out.closed? + rescue IOError, Errno::EBADF end # Close both ends of the pipe. diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15cc05f..61fe6ea 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -101,23 +101,25 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) # $stderr.puts fork: caller self.new(**options) do |process| - ::Process.fork do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - Signal.trap(:INT){::Thread.current.raise(Interrupt)} - Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. - Signal.trap(:HUP){::Thread.current.raise(Restart)} - - # This could be a configuration option: - ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process) - rescue Interrupt - # Graceful exit. - rescue Exception => error - Console.error(self, error) + ::Thread.new do + ::Process.fork do + # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. + Signal.trap(:INT){::Thread.current.raise(Interrupt)} + Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. + Signal.trap(:HUP){::Thread.current.raise(Restart)} - exit!(1) + # This could be a configuration option: + ::Thread.handle_interrupt(SignalException => :immediate) do + yield Instance.for(process) + rescue Interrupt + # Graceful exit. + rescue Exception => error + Console.error(self, error) + + exit!(1) + end end - end + end.value end end @@ -151,6 +153,21 @@ def initialize(name: nil, **options) self.close_write end + # A minimal status for children reaped outside this object. + class Status + def success? + true + end + + def to_i + 0 + end + + def to_s + "\#<#{self.class} success>" + end + end + # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -236,24 +253,28 @@ def restart! # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [::Process::Status] The process exit status. def wait(timeout = 0.1) - if @pid && @status.nil? - Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout) - - _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - - if @status.nil? - sleep(timeout) if timeout + begin + if @pid && @status.nil? + Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout) _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) if @status.nil? - Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout) - self.kill! + sleep(timeout) if timeout + + _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - # Wait for the process to exit: - _, @status = ::Process.wait2(@pid) + if @status.nil? + Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout) + self.kill! + + # Wait for the process to exit: + _, @status = ::Process.wait2(@pid) + end end end + rescue Errno::ECHILD + @status = Status.new end Console.debug(self, "Process exited.", child: {process_id: @pid, status: @status}) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d7e1551..fc5a95f 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -5,6 +5,7 @@ # Copyright, 2025, by Marc-André Cournoyer. require "etc" +require "io/wait" require "async/clock" require_relative "group" @@ -144,8 +145,6 @@ def wait_until_ready end end - self.sleep - if self.status?(:ready) Console.debug(self) do |buffer| buffer.puts "All ready:" @@ -156,6 +155,8 @@ def wait_until_ready return true end + + self.sleep end end @@ -222,13 +223,22 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu end @statistics.spawn! + started = ::Thread::Queue.new - fiber do + @group.supervise do + first = true + until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) child = self.start(name, &block) state = insert(key, child) + @group.insert(child) + + if first + started << true + first = false + end # Notify policy of spawn begin @@ -247,36 +257,11 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu status = nil begin - status = @group.wait_for(child) do |message| - case message - when :health_check! - if state[:ready] - # If a health check timeout is specified, we will monitor the child process and terminate it if it does not update its state within the specified time. - if health_check_timeout - if health_check_timeout < age_clock.total - health_check_failed(child, age_clock, health_check_timeout) - end - end - else - # If a startup timeout is specified, we will monitor the child process and terminate it if it does not become ready within the specified time. - if startup_timeout - if startup_timeout < age_clock.total - startup_failed(child, age_clock, startup_timeout) - end - end - end - else - state.update(message) - - # Reset the age clock if the child has become ready: - if state[:ready] - age_clock&.reset! - end - end - end + status = wait_for(child, state, age_clock, health_check_timeout, startup_timeout) rescue => error Console.error(self, "Error during child process management!", exception: error, stopping: @stopping) ensure + @group.delete(child) delete(key, child) end @@ -300,7 +285,17 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu break end end - end.resume + rescue => error + started << error if first + raise + ensure + started << false if first + end + + case result = started.pop + when Exception + raise result + end return true end @@ -363,6 +358,78 @@ def key?(key) protected + def wait_for(child, state, age_clock, health_check_timeout, startup_timeout) + while true + timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout) + + if wait_readable(child, timeout) + if message = child.receive + state.update(message) + + # Reset the age clock if the child has become ready: + if state[:ready] + age_clock&.reset! + end + + @group.health_check! + else + break + end + else + break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + end + end + + return child.wait + end + + def wait_readable(child, timeout) + child.in.wait_readable(timeout) + rescue IO::TimeoutError + false + end + + def next_timeout(state, age_clock, health_check_timeout, startup_timeout) + return nil unless age_clock + + timeout = if state[:ready] + health_check_timeout + else + startup_timeout + end + + if timeout + remaining = timeout - age_clock.total + return 0 if remaining.negative? + + if interval = @group.health_check_interval + return [remaining, interval].min + else + return remaining + end + elsif interval = @group.health_check_interval + return interval + end + end + + def check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + return false unless age_clock + + if state[:ready] + if health_check_timeout && health_check_timeout < age_clock.total + health_check_failed(child, age_clock, health_check_timeout) + return true + end + else + if startup_timeout && startup_timeout < age_clock.total + startup_failed(child, age_clock, startup_timeout) + return true + end + end + + return false + end + # Register the child (value) as running. def insert(key, child) if key @@ -385,17 +452,6 @@ def delete(key, child) @state.delete(child) end - private - - if Fiber.respond_to?(:blocking?) - def fiber(&block) - Fiber.new(blocking: true, &block) - end - else - def fiber(&block) - Fiber.new(&block) - end - end end end end diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index fb665f8..1db7925 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -3,7 +3,7 @@ # Released under the MIT License. # Copyright, 2018-2026, by Samuel Williams. -require "fiber" +require "async" require "async/clock" require_relative "error" @@ -14,9 +14,9 @@ module Container GRACEFUL_TIMEOUT = ENV.fetch("ASYNC_CONTAINER_GRACEFUL_TIMEOUT", "true").then do |value| case value when "true" - true # Default timeout for graceful termination. + true when "false" - false # Immediately kill the processes. + false else value.to_f end @@ -25,229 +25,170 @@ module Container # The default timeout for graceful termination. DEFAULT_GRACEFUL_TIMEOUT = 10.0 - # Manages a group of running processes. + # Internal child supervision registry. + # + # This object intentionally does not model a public collection. It owns the + # Async task context used for child supervisors and provides just enough + # coordination for container-level wait, sleep and shutdown operations. class Group - # Initialize an empty group. - # - # @parameter health_check_interval [Numeric | Nil] The (biggest) interval at which health checks are performed. def initialize(health_check_interval: 1.0) @health_check_interval = health_check_interval - # The running fibers, indexed by IO: - @running = {} + @mutex = Mutex.new + @children = {} + @supervisors = 0 + @events = ::Thread::Queue.new + + @jobs = ::Thread::Queue.new + @thread = nil end - # @returns [String] A human-readable representation of the group. + attr :health_check_interval + def inspect - "#<#{self.class} running=#{@running.size}>" + "#<#{self.class} running=#{size}>" end - # @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO. - attr :running - - # @returns [Integer] The number of running processes. def size - @running.size + @mutex.synchronize{@children.size} end - # Whether the group contains any running processes. - # @returns [Boolean] def running? - @running.any? + @mutex.synchronize{@supervisors.positive?} end - # Whether the group contains any running processes. - # @returns [Boolean] def any? - @running.any? + running? end - # Whether the group is empty. - # @returns [Boolean] def empty? - @running.empty? + !running? end - # Sleep for at most the specified duration until some state change occurs. - def sleep(duration) - self.wait_for_children(duration) + # Compatibility for older tests/code that inspected the implementation. + def running + @mutex.synchronize{@children.dup} end - # Begin any outstanding queued processes and wait for them indefinitely. - def wait - with_health_checks do |duration| - self.wait_for_children(duration) + def supervise(&block) + @mutex.synchronize{@supervisors += 1} + + schedule do + begin + block.call + ensure + @mutex.synchronize{@supervisors -= 1} + signal! + end end end - private def with_health_checks - if @health_check_interval - health_check_clock = Clock.start - - while self.running? - duration = [@health_check_interval - health_check_clock.total, 0].max - - yield duration - - if health_check_clock.total > @health_check_interval - self.health_check! - health_check_clock.reset! - end - end - else - while self.running? - yield nil - end + def insert(child) + @mutex.synchronize{@children[child] = true} + end + + def delete(child) + @mutex.synchronize{@children.delete(child)} + signal! + end + + def sleep(duration = nil) + ::Thread.handle_interrupt(SignalException => :immediate) do + @events.pop(timeout: duration) end end - private def each_running(&block) - # We create a copy of the values here, in case the block modifies the running set: - @running.values.each(&block) + def wait + sleep while running? end - # Perform a health check on all running processes. def health_check! - each_running do |fiber| - fiber.resume(:health_check!) - end + signal! end - # Interrupt all running processes. - # This resumes the controlling fiber with an instance of {Interrupt}. def interrupt - Console.info(self, "Sending interrupt to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Interrupt) - end + Console.info(self, "Sending interrupt to #{size} running children...") + each_child(&:interrupt!) end - # Terminate all running processes. - # This resumes the controlling fiber with an instance of {Terminate}. def terminate - Console.info(self, "Sending terminate to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Terminate) - end + Console.info(self, "Sending terminate to #{size} running children...") + each_child(&:terminate!) end - # Kill all running processes. - # This resumes the controlling fiber with an instance of {Kill}. def kill - Console.info(self, "Sending kill to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Kill) - end + Console.info(self, "Sending kill to #{size} running children...") + each_child(&:kill!) end - private def wait_for_exit(clock, timeout) - while self.any? - duration = timeout - clock.total - - if duration >= 0 - self.wait_for_children(duration) - else - self.wait_for_children(0) - break - end - end - end - - # Stop all child processes with a multi-phase shutdown sequence. - # - # A graceful shutdown performs the following sequence: - # 1. Send SIGINT and wait up to `graceful` seconds if specified. - # 2. Send SIGKILL and wait indefinitely for process cleanup. - # - # If `graceful` is true, default to `DEFAULT_GRACEFUL_TIMEOUT` (10 seconds). - # If `graceful` is false, skip the SIGINT phase and go directly to SIGKILL. - # - # @parameter graceful [Boolean | Numeric] Whether to send SIGINT first or skip directly to SIGKILL. def stop(graceful = GRACEFUL_TIMEOUT) - Console.debug(self, "Stopping all processes...", graceful: graceful) + Console.debug(self, "Stopping all children...", graceful: graceful) - # If a timeout is specified, interrupt the children first: if graceful - # Send SIGINT to the children: - self.interrupt - - if graceful == true - graceful = DEFAULT_GRACEFUL_TIMEOUT - end - - clock = Clock.start + interrupt - # Wait for the children to exit: - self.wait_for_exit(clock, graceful) + graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true + wait_for_exit(Clock.start, graceful) end ensure - # Do our best to clean up the children: - if any? + if running? if graceful - Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock) + Console.warn(self, "Killing children after graceful shutdown failed...", size: size) end - self.kill - self.wait + kill + wait end end - # Wait for a message in the specified {Channel}. - def wait_for(channel) - io = channel.in - - @running[io] = Fiber.current + private + + def schedule(&block) + start_reactor - while @running.key?(io) - # Wait for some event on the channel: - result = Fiber.yield - - if result == Interrupt - channel.interrupt! - elsif result == Terminate - channel.terminate! - elsif result == Kill - channel.kill! - elsif result - yield result - elsif message = channel.receive - yield message - else - # Wait for the channel to exit: - return channel.wait - end + @jobs << proc do |parent| + parent.async(&block) end - ensure - @running.delete(io) end - protected - - def wait_for_children(duration = nil) - # This log is a bit noisy and doesn't really provide a lot of useful information: - Console.debug(self, "Waiting for children...", duration: duration, running: @running) + def start_reactor + return if @thread&.alive? - unless @running.empty? - # Maybe consider using a proper event loop here: - if ready = self.select(duration) - ready.each do |io| - if fiber = @running[io] - # This method can be re-entered. While resuming a fiber, a policy hook may be invoked, which may invoke operations on the container. In that case, select may be called again on the same set of waiting fibers. On returning, those fibers may have already completed and removed themselves from @running, so we need to check for that. - fiber.resume - end + @thread = ::Thread.new do + Sync do |parent| + while job = @jobs.pop + job.call(parent) end end end + + @thread.report_on_exception = false + @thread.name = "async-container supervisor" end - # Wait for a child process to exit OR a signal to be received. - def select(duration) - ::Thread.handle_interrupt(SignalException => :immediate) do - readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) + def each_child + children = @mutex.synchronize{@children.keys} + + children.each do |child| + yield child + rescue Errno::ESRCH + # The child has already exited. + end + end + + def wait_for_exit(clock, timeout) + while running? + duration = timeout - clock.total + + break if duration.negative? - return readable + sleep(duration) end end + + def signal! + @events << true + end end end end diff --git a/test/async/container/group.rb b/test/async/container/group.rb index 11cbfe5..d38e1e7 100644 --- a/test/async/container/group.rb +++ b/test/async/container/group.rb @@ -4,28 +4,41 @@ # Copyright, 2025-2026, by Samuel Williams. require "async/container/group" -require "async/container/channel" describe Async::Container::Group do let(:group) {Async::Container::Group.new} + class FakeChild + def initialize + @events = [] + end + + attr :events + + def interrupt! + @events << :interrupt + end + + def terminate! + @events << :terminate + end + + def kill! + @events << :kill + end + end + with "#size" do it "returns zero for empty group" do expect(group.size).to be == 0 end - it "returns the number of running processes" do - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - fiber1 = Fiber.new{Fiber.yield} - fiber2 = Fiber.new{Fiber.yield} + it "returns the number of registered children" do + child1 = FakeChild.new + child2 = FakeChild.new - fiber1.resume - fiber2.resume - - group.running[channel1.in] = fiber1 - group.running[channel2.in] = fiber2 + group.insert(child1) + group.insert(child2) expect(group.size).to be == 2 end @@ -36,30 +49,20 @@ expect(group).not.to be(:running?) end - it "returns true when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume + it "returns true while a supervisor is running" do + queue = Thread::Queue.new + release = Thread::Queue.new - group.running[channel.in] = fiber + group.supervise do + queue << :running + release.pop + end + expect(queue.pop).to be == :running expect(group).to be(:running?) - end - end - - with "#any?" do - it "returns false for empty group" do - expect(group).not.to be(:any?) - end - - it "returns true when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - group.running[channel.in] = fiber - - expect(group).to be(:any?) + release << true + group.wait end end @@ -67,16 +70,6 @@ it "returns true for empty group" do expect(group).to be(:empty?) end - - it "returns false when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - - group.running[channel.in] = fiber - - expect(group).not.to be(:empty?) - end end with "#inspect" do @@ -85,219 +78,68 @@ expect(group.inspect).to be =~ /running=0/ end - it "shows the number of running processes" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - - group.running[channel.in] = fiber + it "shows the number of registered children" do + child = FakeChild.new + group.insert(child) expect(group.inspect).to be =~ /running=1/ end end - with "#health_check!" do - it "resumes all fibers with :health_check! message" do - messages = [] + with "bulk child control" do + it "interrupts all registered children" do + child1 = FakeChild.new + child2 = FakeChild.new - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end - - group.health_check! + group.insert(child1) + group.insert(child2) + group.interrupt - expect(messages).to be == [:health_check!, :health_check!] + expect(child1.events).to be == [:interrupt] + expect(child2.events).to be == [:interrupt] end - it "does nothing for empty group" do - expect do - group.health_check! - end.not.to raise_exception - end - end - - with "#interrupt" do - it "resumes all fibers with Interrupt" do - messages = [] - - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end + it "terminates all registered children" do + child = FakeChild.new - group.interrupt + group.insert(child) + group.terminate - expect(messages).to be == [Async::Container::Interrupt, Async::Container::Interrupt] + expect(child.events).to be == [:terminate] end - end - - with "#terminate" do - it "resumes all fibers with Terminate" do - messages = [] - - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end + + it "kills all registered children" do + child = FakeChild.new - group.terminate + group.insert(child) + group.kill - expect(messages).to be == [Async::Container::Terminate, Async::Container::Terminate] + expect(child.events).to be == [:kill] end end - with "#kill" do - it "resumes all fibers with Kill" do - messages = [] - - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber + with "#sleep" do + it "wakes when child state changes" do + thread = Thread.new do + group.sleep + :woken end - group.kill + group.health_check! - expect(messages).to be == [Async::Container::Kill, Async::Container::Kill] + expect(thread.value).to be == :woken end end - # Regression test for a bug where restarting a child during health check caused - # "RuntimeError: can't add a new key into hash during iteration" - # - # The scenario: - # - A container spawns children with `restart: true` and `health_check_timeout: N` - # - health_check! calls @running.each_value { |fiber| fiber.resume(:health_check!) } - # - A resumed fiber detects health check failure and kills the child - # - The spawn fiber's while loop continues (restart: true) and calls wait_for with a new child - # - wait_for tries to add to @running while health_check! is still iterating - # - This used to cause: RuntimeError: can't add a new key into hash during iteration - it "can restart child during health_check! iteration without error" do - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - # Simulate the spawn fiber that restarts on health check failure - restart = true - fiber = Fiber.new do - while restart - result = Fiber.yield # Wait to be resumed - - if result == :health_check! - # Health check failed! Simulate the restart logic: - # The wait_for will return (simulated by breaking from this iteration) - # and the while loop continues, creating a new child - - # Simulate: child.kill! happens, wait_for returns - # Now the while loop continues and calls wait_for with new child - Fiber.new do - group.wait_for(channel2) do |msg| - # New child waiting - end - end.resume - - restart = false # Only do this once for the test - end + with "#wait" do + it "waits until all supervisors finish" do + group.supervise do + sleep(0.01) end - end - - # Start the fiber and add it to @running (simulating first wait_for call) - fiber.resume - group.running[channel1.in] = fiber - - # The fix ensures this doesn't raise RuntimeError during iteration - expect do - group.health_check! - end.not.to raise_exception - end - - # Regression test with multiple children where one restarts during health check - it "can handle one of multiple children restarting during health check" do - # Create two children, both with restart capability - 2.times do |i| - channel = Async::Container::Channel.new - fiber = Fiber.new do - iteration = 0 - loop do - iteration += 1 - result = Fiber.yield - - # First child fails health check on first iteration - if i == 0 && iteration == 1 && result == :health_check! - # Simulate health check failure and restart: - # Kill the old child, create new one - new_channel = Async::Container::Channel.new - - # This mimics what happens in spawn's while @running loop - # after wait_for returns due to child being killed - group.wait_for(new_channel) do |msg| - # New child process - end - - break # Exit this child's loop - end - end - end + group.wait - fiber.resume - group.running[channel.in] = fiber + expect(group).not.to be(:running?) end - - # The fix ensures this doesn't raise RuntimeError when the first fiber restarts - expect do - group.health_check! - end.not.to raise_exception - end - - it "handles nil fiber in @running during iteration (re-entrance scenario)" do - # This test simulates a scenario where: - # 1. IO.select returns [io1, io2] - # 2. While resuming fiber for io1, a re-entrant call completes fiber for io2 - # 3. When iteration continues to io2, @running[io2] is nil - # Without defensive check (&.), this would crash with NoMethodError - - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - fiber1 = Fiber.new{group.running.delete(channel2.in)} - fiber2 = Fiber.new{Fiber.yield} - - fiber2.resume - - group.running[channel1.in] = fiber1 - group.running[channel2.in] = fiber2 - - # Mock select to return both channels: - expect(group).to receive(:select).and_return([channel1.in, channel2.in]) - - # This should not crash due to &. operator: - group.sleep(0) - - # Verify fiber2 was removed - expect(group.running.key?(channel2.in)).to be == false end end From 57e9e4bfc20d1b3b1d876d15eef06d9c140dbbc1 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 17:05:42 +1200 Subject: [PATCH 02/20] Use child wait as supervision source of truth Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 23 ++----- lib/async/container/generic.rb | 104 +++++++++++++++++++++----------- lib/async/container/threaded.rb | 16 +++-- 3 files changed, 84 insertions(+), 59 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 61fe6ea..d648e0d 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -250,28 +250,13 @@ def restart! # Wait for the child process to exit. # @asynchronous This method may block. # - # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. + # @parameter timeout [Numeric | Nil] Ignored; retained for compatibility. # @returns [::Process::Status] The process exit status. - def wait(timeout = 0.1) + def wait(timeout = nil) begin if @pid && @status.nil? - Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout) - - _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - - if @status.nil? - sleep(timeout) if timeout - - _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - - if @status.nil? - Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout) - self.kill! - - # Wait for the process to exit: - _, @status = ::Process.wait2(@pid) - end - end + Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}) + _, @status = ::Process.wait2(@pid) end rescue Errno::ECHILD @status = Status.new diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index fc5a95f..a41f286 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -7,6 +7,7 @@ require "etc" require "io/wait" require "async/clock" +require "async/queue" require_relative "group" require_relative "keyed" @@ -359,62 +360,95 @@ def key?(key) protected def wait_for(child, state, age_clock, health_check_timeout, startup_timeout) + parent = Async::Task.current + result = Async::Queue.new + + waiter = parent.async do + begin + result << child.wait + rescue Exception => error + result << error + end + end + + reader = parent.async do + read_notifications(child, state, age_clock) + end + + monitor = if age_clock + parent.async do + monitor_health(child, state, age_clock, health_check_timeout, startup_timeout) + end + end + + status = result.pop + raise status if status.is_a?(Exception) + + return status + ensure + reader&.stop + monitor&.stop + waiter&.stop + end + + def read_notifications(child, state, age_clock) while true - timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout) + child.in.wait_readable - if wait_readable(child, timeout) - if message = child.receive - state.update(message) - - # Reset the age clock if the child has become ready: - if state[:ready] - age_clock&.reset! - end - - @group.health_check! - else - break + if message = child.receive + state.update(message) + + # Reset the age clock if the child has become ready: + if state[:ready] + age_clock&.reset! end + + @group.health_check! else - break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + break end end - - return child.wait + rescue IO::TimeoutError + retry + rescue IOError, Errno::EBADF + # The notification pipe was closed while the child waiter was exiting. end - def wait_readable(child, timeout) - child.in.wait_readable(timeout) - rescue IO::TimeoutError - false + def monitor_health(child, state, age_clock, health_check_timeout, startup_timeout) + while true + if timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout) + sleep(timeout) + + break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + else + sleep(@group.health_check_interval || 1.0) + end + + if @group.health_check_interval + @group.health_check! + end + end end def next_timeout(state, age_clock, health_check_timeout, startup_timeout) - return nil unless age_clock - timeout = if state[:ready] health_check_timeout else startup_timeout end - if timeout - remaining = timeout - age_clock.total - return 0 if remaining.negative? - - if interval = @group.health_check_interval - return [remaining, interval].min - else - return remaining - end - elsif interval = @group.health_check_interval - return interval + return unless timeout + + remaining = timeout - age_clock.total + + if remaining.positive? + remaining + else + 0 end end def check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) - return false unless age_clock - if state[:ready] if health_check_timeout && health_check_timeout < age_clock.total health_check_failed(child, age_clock, health_check_timeout) diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index b1be095..ef31b28 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -221,14 +221,20 @@ def restart! # # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [Status] - def wait(timeout = 0.1) + def wait(timeout = nil) if @waiter Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}, timeout: timeout) - unless @waiter.join(timeout) - Console.warn(self, "Thread is blocking, sending kill signal...", child: {thread_id: @thread.object_id}, timeout: timeout) - self.kill! - @waiter.join + if timeout + unless @waiter.join(timeout) + Console.warn(self, "Thread is blocking, sending kill signal...", child: {thread_id: @thread.object_id}, timeout: timeout) + self.kill! + @waiter.join + end + else + until @waiter.join(0) + sleep(0.1) + end end @waiter = nil From d1155de7fe0f24e3901df1cac6d75b27e4c1fd5c Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 19:17:33 +1200 Subject: [PATCH 03/20] Simplify threaded child interrupt handling Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 4 ++-- lib/async/container/threaded.rb | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index d648e0d..b861d83 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -103,12 +103,12 @@ def self.fork(**options) self.new(**options) do |process| ::Thread.new do ::Process.fork do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. + # Ensure signal exceptions are delivered promptly in the forked child, + # even if the parent controller is masking SignalException. Signal.trap(:INT){::Thread.current.raise(Interrupt)} Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} - # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index ef31b28..9caa6de 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,10 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # This could be a configuration option (see forked implementation too): - ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread) - end + yield Instance.for(thread) end end end From 6f3ed7714def584b98b7043e828b497aeb4c7459 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 19:24:27 +1200 Subject: [PATCH 04/20] Restore child interrupt mask reset Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/threaded.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 9caa6de..dfd1193 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,7 +114,9 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - yield Instance.for(thread) + ::Thread.handle_interrupt(SignalException => :immediate) do + yield Instance.for(thread) + end end end end From 114fe352c561f95253d2cdb53f4251a4625be203 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 19:25:11 +1200 Subject: [PATCH 05/20] Revert interrupt handling detour --- lib/async/container/forked.rb | 4 ++-- lib/async/container/threaded.rb | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index b861d83..d648e0d 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -103,12 +103,12 @@ def self.fork(**options) self.new(**options) do |process| ::Thread.new do ::Process.fork do - # Ensure signal exceptions are delivered promptly in the forked child, - # even if the parent controller is masking SignalException. + # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} + # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index dfd1193..ef31b28 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,6 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do + # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end From 750e024b9ddc2e765fb4eb660eced39478fe728d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 19:46:20 +1200 Subject: [PATCH 06/20] Rename Async compatibility test context Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- fixtures/async/container/a_container.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 7d5da05..23bc7d4 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -73,7 +73,7 @@ module Container end end - with "Async compatibility" do + with "Async{}" do it "can wait inside an Async task after spawning outside Async" do input, output = IO.pipe From 36d6fcd1870680797d0a8ca56806f826544bd7b1 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:25:51 +1200 Subject: [PATCH 07/20] Document child fork interrupt handling Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 9 +++++++-- lib/async/container/threaded.rb | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index d648e0d..15e43a8 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -101,14 +101,19 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) # $stderr.puts fork: caller self.new(**options) do |process| + # Fork from a dedicated thread so the child does not inherit the + # parent fiber scheduler or the current caller's fiber stack. Only + # this short-lived thread is copied into the child process. ::Thread.new do ::Process.fork do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} - # This could be a configuration option: + # CRuby inherits the `Thread.handle_interrupt` mask stack across + # Thread.new, so reset signal delivery in the child before running + # user code. This ensures the signal traps above are delivered + # promptly even if the parent was masking SignalException. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index ef31b28..8ae5162 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,7 +114,8 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # This could be a configuration option (see forked implementation too): + # CRuby inherits the `Thread.handle_interrupt` mask stack across + # Thread.new, so reset signal delivery before running user code. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end From 5f89e73f269abe28ea032ae86dbfdccdf315cb29 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:26:33 +1200 Subject: [PATCH 08/20] Avoid hard-wrapped child fork comments Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 9 ++------- lib/async/container/threaded.rb | 3 +-- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15e43a8..1eae0b3 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -101,19 +101,14 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) # $stderr.puts fork: caller self.new(**options) do |process| - # Fork from a dedicated thread so the child does not inherit the - # parent fiber scheduler or the current caller's fiber stack. Only - # this short-lived thread is copied into the child process. + # Fork from a dedicated thread so the child does not inherit the parent fiber scheduler or the current caller's fiber stack. ::Thread.new do ::Process.fork do Signal.trap(:INT){::Thread.current.raise(Interrupt)} Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} - # CRuby inherits the `Thread.handle_interrupt` mask stack across - # Thread.new, so reset signal delivery in the child before running - # user code. This ensures the signal traps above are delivered - # promptly even if the parent was masking SignalException. + # CRuby inherits the `Thread.handle_interrupt` mask stack across Thread.new, so reset signal delivery before running user code. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 8ae5162..10e5fb0 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,8 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # CRuby inherits the `Thread.handle_interrupt` mask stack across - # Thread.new, so reset signal delivery before running user code. + # CRuby inherits the `Thread.handle_interrupt` mask stack across Thread.new, so reset signal delivery before running user code. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end From 64e37bacbfe75afe42f13c69ed5665056eac1341 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:27:19 +1200 Subject: [PATCH 09/20] Format Thread.new in child comments Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 2 +- lib/async/container/threaded.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 1eae0b3..19b5d15 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -108,7 +108,7 @@ def self.fork(**options) Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} - # CRuby inherits the `Thread.handle_interrupt` mask stack across Thread.new, so reset signal delivery before running user code. + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 10e5fb0..aace666 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,7 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # CRuby inherits the `Thread.handle_interrupt` mask stack across Thread.new, so reset signal delivery before running user code. + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end From 93c2eea237a39a05d628f2e2dfdd63d6a20dafb2 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:29:20 +1200 Subject: [PATCH 10/20] Clarify child signal handling comments Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 4 ++-- lib/async/container/threaded.rb | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 19b5d15..4ddada2 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -101,14 +101,14 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) # $stderr.puts fork: caller self.new(**options) do |process| - # Fork from a dedicated thread so the child does not inherit the parent fiber scheduler or the current caller's fiber stack. + # Fork from a dedicated thread so the child does not inherit the parent fiber scheduler or the current caller's fiber stack; only this short-lived thread is copied into the child process. ::Thread.new do ::Process.fork do Signal.trap(:INT){::Thread.current.raise(Interrupt)} Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. Signal.trap(:HUP){::Thread.current.raise(Restart)} - # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code. + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code; Async deliberately masks SignalException. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) rescue Interrupt diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index aace666..0b3f0fd 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,7 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code. + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code; Async deliberately masks SignalException. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end From 59805b94f9cc496c52c3e7aa955dd9fb661bddf7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:34:40 +1200 Subject: [PATCH 11/20] Let ECHILD propagate from forked wait Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 4ddada2..2e8b1ad 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -153,21 +153,6 @@ def initialize(name: nil, **options) self.close_write end - # A minimal status for children reaped outside this object. - class Status - def success? - true - end - - def to_i - 0 - end - - def to_s - "\#<#{self.class} success>" - end - end - # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -253,13 +238,9 @@ def restart! # @parameter timeout [Numeric | Nil] Ignored; retained for compatibility. # @returns [::Process::Status] The process exit status. def wait(timeout = nil) - begin - if @pid && @status.nil? - Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}) - _, @status = ::Process.wait2(@pid) - end - rescue Errno::ECHILD - @status = Status.new + if @pid && @status.nil? + Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}) + _, @status = ::Process.wait2(@pid) end Console.debug(self, "Process exited.", child: {process_id: @pid, status: @status}) From 149baccde28d9ceb14dbfd8081939385029044b9 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 20:44:31 +1200 Subject: [PATCH 12/20] Simplify child wait methods Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/forked.rb | 3 +- lib/async/container/threaded.rb | 50 ++++++++++++--------------------- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 2e8b1ad..fc7bdb6 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -235,9 +235,8 @@ def restart! # Wait for the child process to exit. # @asynchronous This method may block. # - # @parameter timeout [Numeric | Nil] Ignored; retained for compatibility. # @returns [::Process::Status] The process exit status. - def wait(timeout = nil) + def wait if @pid && @status.nil? Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}) _, @status = ::Process.wait2(@pid) diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 0b3f0fd..ebe504e 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -129,25 +129,11 @@ def initialize(name: nil, **options) super(**options) @status = nil + @joined = false @thread = yield(self) @thread.report_on_exception = false @thread.name = name - - @waiter = ::Thread.new do - begin - @thread.join - rescue Exit => exit - finished(exit.error) - rescue Interrupt - # Graceful shutdown. - finished - rescue Exception => error - finished(error) - else - finished - end - end end # Convert the child process to a hash, suitable for serialization. @@ -216,28 +202,28 @@ def restart! @thread.raise(Restart) end - # Wait for the thread to exit and return he exit status. + # Wait for the thread to exit and return the exit status. # @asynchronous This method may block. # - # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [Status] - def wait(timeout = nil) - if @waiter - Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}, timeout: timeout) + def wait + unless @joined + Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}) - if timeout - unless @waiter.join(timeout) - Console.warn(self, "Thread is blocking, sending kill signal...", child: {thread_id: @thread.object_id}, timeout: timeout) - self.kill! - @waiter.join - end + begin + @thread.join + rescue Exit => exit + finished(exit.error) + rescue Interrupt + # Graceful shutdown. + finished + rescue Exception => error + finished(error) else - until @waiter.join(0) - sleep(0.1) - end + finished + ensure + @joined = true end - - @waiter = nil end Console.debug(self, "Thread exited.", child: {thread_id: @thread.object_id, status: @status}) @@ -278,7 +264,7 @@ def to_s protected - # Invoked by the @waiter thread to indicate the outcome of the child thread. + # Record the outcome of the child thread and close the notification channel. def finished(error = nil) if error Console.error(self){error} From 4eb7ba3585bf22fb6f1d768dadf7ca1b389f98d7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 21:01:30 +1200 Subject: [PATCH 13/20] Document container group API Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/group.rb | 39 ++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index 1db7925..ab652d9 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -31,6 +31,8 @@ module Container # Async task context used for child supervisors and provides just enough # coordination for container-level wait, sleep and shutdown operations. class Group + # Initialize the child supervision registry. + # @parameter health_check_interval [Numeric | Nil] The interval used to wake waiters for periodic health checks. def initialize(health_check_interval: 1.0) @health_check_interval = health_check_interval @@ -43,33 +45,48 @@ def initialize(health_check_interval: 1.0) @thread = nil end + # @attribute [Numeric | Nil] The interval used to wake waiters for periodic health checks. attr :health_check_interval + # Generate a human-readable representation of the group. + # @returns [String] The group inspection string. def inspect "#<#{self.class} running=#{size}>" end + # Get the number of currently registered children. + # @returns [Integer] The number of running children. def size @mutex.synchronize{@children.size} end + # Check whether any supervisor tasks are still running. + # @returns [Boolean] Whether any supervisor tasks are running. def running? @mutex.synchronize{@supervisors.positive?} end + # Check whether any supervisor tasks are still running. + # @returns [Boolean] Whether any supervisor tasks are running. def any? running? end + # Check whether all supervisor tasks have stopped. + # @returns [Boolean] Whether no supervisor tasks are running. def empty? !running? end # Compatibility for older tests/code that inspected the implementation. + # @returns [Hash] A copy of the current child registration map. def running @mutex.synchronize{@children.dup} end + # Run a child supervisor block in the group's Async task context. + # @yields {...} The supervisor block to execute. + # @returns [Object] The queued supervisor job. def supervise(&block) @mutex.synchronize{@supervisors += 1} @@ -83,44 +100,66 @@ def supervise(&block) end end + # Register a child as running. + # @parameter child [Object] The child to register. + # @returns [Boolean] The child registration value. def insert(child) @mutex.synchronize{@children[child] = true} end + # Remove a child from the running set and wake waiters. + # @parameter child [Object] The child to remove. + # @returns [Object] The queued signal value. def delete(child) @mutex.synchronize{@children.delete(child)} signal! end + # Sleep until the group is signalled or the optional duration elapses. + # @parameter duration [Numeric | Nil] The maximum duration to sleep. + # @returns [Object | Nil] The queued signal value, or `nil` if the sleep timed out. def sleep(duration = nil) ::Thread.handle_interrupt(SignalException => :immediate) do @events.pop(timeout: duration) end end + # Wait until all supervisor tasks have stopped. + # @returns [Nil] def wait sleep while running? end + # Wake any waiters so they can re-check child health or state. + # @returns [Object] The queued signal value. def health_check! signal! end + # Send an interrupt signal to all registered children. + # @returns [Object] The result of iterating over the current children. def interrupt Console.info(self, "Sending interrupt to #{size} running children...") each_child(&:interrupt!) end + # Send a terminate signal to all registered children. + # @returns [Object] The result of iterating over the current children. def terminate Console.info(self, "Sending terminate to #{size} running children...") each_child(&:terminate!) end + # Send a kill signal to all registered children. + # @returns [Object] The result of iterating over the current children. def kill Console.info(self, "Sending kill to #{size} running children...") each_child(&:kill!) end + # Stop all registered children, escalating to kill if graceful shutdown does not complete. + # @parameter graceful [Boolean | Numeric] Whether to stop gracefully, or the graceful timeout duration. + # @returns [Nil] def stop(graceful = GRACEFUL_TIMEOUT) Console.debug(self, "Stopping all children...", graceful: graceful) From 1609b1af5ac822f452152279d34990702835783b Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 09:47:25 +1200 Subject: [PATCH 14/20] Simplify container supervision with Async Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- fixtures/async/container/a_container.rb | 39 ++++++++- gems.rb | 1 + lib/async/container.rb | 1 + lib/async/container/controller.rb | 103 +++++++++++++++++------- lib/async/container/forked.rb | 3 + lib/async/container/generic.rb | 34 ++++---- lib/async/container/group.rb | 30 +------ lib/async/container/hybrid.rb | 35 ++++---- test/async/container/controller.rb | 22 +++++ test/async/container/forked.rb | 9 ++- test/async/container/group.rb | 3 + test/async/container/hybrid.rb | 13 ++- test/async/container/notify/pipe.rb | 3 + test/async/container/policy.rb | 3 + test/async/container/threaded.rb | 3 + 15 files changed, 199 insertions(+), 103 deletions(-) diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index 23bc7d4..522f824 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -83,7 +83,7 @@ module Container Async do container.wait - end + end.wait output.close expect(input.read).to be == "." @@ -98,11 +98,42 @@ module Container end container.wait - end + end.wait output.close expect(input.read).to be == "." end + + it "can wait while a health monitor is active" do + container.spawn(health_check_timeout: 10.0) do |instance| + instance.ready! + end + + Async::Task.current.with_timeout(2.0) do + container.wait + end + + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "can start, wait until ready, and stop inside Sync" do + Sync do + begin + 2.times do |i| + container.spawn(name: "worker #{i}") do |instance| + instance.ready! + sleep + end + end + + container.wait_until_ready + + expect(container.group.running).to have_attributes(size: be == 2) + ensure + container.stop if container.running? + end + end + end end it "should be blocking" do @@ -257,15 +288,15 @@ def child_exit(container, child, status, name:, key:, **options) container.run do |instance| Async do instance.ready! + sleep 0.1 end end expect(container).to be(:running?) + container.stop(false) container.wait - container.stop - expect(container).not.to be(:running?) end end diff --git a/gems.rb b/gems.rb index b3a2c76..8426145 100644 --- a/gems.rb +++ b/gems.rb @@ -21,6 +21,7 @@ group :test do gem "sus" + gem "sus-fixtures-async" gem "covered" gem "rubocop" diff --git a/lib/async/container.rb b/lib/async/container.rb index bd0d47a..28b9b7a 100644 --- a/lib/async/container.rb +++ b/lib/async/container.rb @@ -3,6 +3,7 @@ # Released under the MIT License. # Copyright, 2017-2025, by Samuel Williams. +require_relative "container/best" require_relative "container/controller" # @namespace diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index 755f7c7..ad0b3ba 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -6,6 +6,8 @@ require_relative "error" require_relative "best" +require "async" + require_relative "statistics" require_relative "notify" require_relative "policy" @@ -30,6 +32,7 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop: @container = nil @signals = {} + @signal_queue = ::Thread::Queue.new self.trap(SIGHUP) do self.restart @@ -144,7 +147,7 @@ def restart # Wait for all child processes to enter the ready state. Console.info(self, "Waiting for startup...") - container.wait_until_ready + wait_until_ready(container) Console.info(self, "Finished startup.") if container.failed? @@ -160,7 +163,13 @@ def restart if old_container Console.info(self, "Stopping old container...") - old_container&.stop(@graceful_stop) + + begin + @stopping_container = old_container + old_container&.stop(@graceful_stop) + ensure + @stopping_container = nil + end end @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") @@ -188,7 +197,7 @@ def reload # Wait for all child processes to enter the ready state. Console.info(self, "Waiting for startup...") - @container.wait_until_ready + wait_until_ready(@container) Console.info(self, "Finished startup.") if @container.failed? @@ -202,24 +211,20 @@ def reload # Enter the controller run loop, trapping `SIGINT` and `SIGTERM`. def run - @notify&.status!("Initializing controller...") - - with_signal_handlers do - self.start + Sync do + @notify&.status!("Initializing controller...") - while @container&.running? - begin - @container.wait - rescue SignalException => exception - if handler = @signals[exception.signo] - begin - handler.call - rescue SetupError => error - Console.error(self, error) - end - else - raise + with_signal_handlers do + self.start + + while @container&.running? + process_signals(@container) + + if container = @container + container.sleep end + + process_signals(@container) if @container end end end @@ -231,29 +236,69 @@ def run self.stop(false) end + private def wait_until_ready(container) + @waiting_container = container + + begin + until container.status?(:ready) + process_signals(container, graceful: container.status?(:ready)) + + break unless container.running? + + container.sleep + end + ensure + @waiting_container = nil + end + end + + private def process_signals(container = @container, graceful: @graceful_stop) + while signal = @signal_queue.pop(timeout: 0) + if handler = @signals[signal] + begin + handler.call + rescue SetupError => error + Console.error(self, error) + end + elsif signal == SIGINT || signal == SIGTERM + target = @container || container + + if target.equal?(@container) + self.stop + else + target&.stop(graceful) + end + else + raise SignalException.new(signal) + end + end + end + private def with_signal_handlers # I thought this was the default... but it doesn't always raise an exception unless you do this explicitly. + wake_containers = proc do + @container&.group&.health_check! + @waiting_container&.group&.health_check! + @stopping_container&.group&.health_check! + end interrupt_action = Signal.trap(:INT) do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - # $stderr.puts "Received INT signal, interrupting...", caller - ::Thread.current.raise(Interrupt) + @signal_queue << SIGINT + wake_containers.call end # SIGTERM behaves the same as SIGINT by default. terminate_action = Signal.trap(:TERM) do - # $stderr.puts "Received TERM signal, interrupting...", caller - ::Thread.current.raise(Interrupt) # Same as SIGINT + @signal_queue << SIGTERM + wake_containers.call end hangup_action = Signal.trap(:HUP) do - # $stderr.puts "Received HUP signal, restarting...", caller - ::Thread.current.raise(Restart) + @signal_queue << SIGHUP + wake_containers.call end - ::Thread.handle_interrupt(SignalException => :never) do - yield - end + yield ensure # Restore the interrupt handler: Signal.trap(:INT, interrupt_action) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index fc7bdb6..88f485d 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -111,8 +111,11 @@ def self.fork(**options) # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code; Async deliberately masks SignalException. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(process) + + exit!(0) rescue Interrupt # Graceful exit. + exit!(0) rescue Exception => error Console.error(self, error) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index a41f286..529f034 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -362,22 +362,23 @@ def key?(key) def wait_for(child, state, age_clock, health_check_timeout, startup_timeout) parent = Async::Task.current result = Async::Queue.new - - waiter = parent.async do - begin - result << child.wait - rescue Exception => error - result << error - end - end + health = ::Thread::Queue.new if age_clock reader = parent.async do - read_notifications(child, state, age_clock) + read_notifications(child, state, age_clock, health) end monitor = if age_clock parent.async do - monitor_health(child, state, age_clock, health_check_timeout, startup_timeout) + monitor_health(child, state, age_clock, health, health_check_timeout, startup_timeout) + end + end + + waiter = parent.async do + begin + result << child.wait + rescue Exception => error + result << error end end @@ -391,7 +392,7 @@ def wait_for(child, state, age_clock, health_check_timeout, startup_timeout) waiter&.stop end - def read_notifications(child, state, age_clock) + def read_notifications(child, state, age_clock, health) while true child.in.wait_readable @@ -403,6 +404,7 @@ def read_notifications(child, state, age_clock) age_clock&.reset! end + health&.push(true) @group.health_check! else break @@ -414,18 +416,14 @@ def read_notifications(child, state, age_clock) # The notification pipe was closed while the child waiter was exiting. end - def monitor_health(child, state, age_clock, health_check_timeout, startup_timeout) + def monitor_health(child, state, age_clock, health, health_check_timeout, startup_timeout) while true if timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout) - sleep(timeout) + health.pop(timeout: timeout) break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) else - sleep(@group.health_check_interval || 1.0) - end - - if @group.health_check_interval - @group.health_check! + health.pop(timeout: @group.health_check_interval || 1.0) end end end diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index ab652d9..b5a1a11 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -40,9 +40,6 @@ def initialize(health_check_interval: 1.0) @children = {} @supervisors = 0 @events = ::Thread::Queue.new - - @jobs = ::Thread::Queue.new - @thread = nil end # @attribute [Numeric | Nil] The interval used to wake waiters for periodic health checks. @@ -86,11 +83,11 @@ def running # Run a child supervisor block in the group's Async task context. # @yields {...} The supervisor block to execute. - # @returns [Object] The queued supervisor job. + # @returns [Async::Task] The supervisor task. def supervise(&block) @mutex.synchronize{@supervisors += 1} - schedule do + Async::Task.current.async(transient: true) do begin block.call ensure @@ -182,29 +179,6 @@ def stop(graceful = GRACEFUL_TIMEOUT) private - def schedule(&block) - start_reactor - - @jobs << proc do |parent| - parent.async(&block) - end - end - - def start_reactor - return if @thread&.alive? - - @thread = ::Thread.new do - Sync do |parent| - while job = @jobs.pop - job.call(parent) - end - end - end - - @thread.report_on_exception = false - @thread.name = "async-container supervisor" - end - def each_child children = @mutex.synchronize{@children.keys} diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 8c6f4f5..da0df88 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -24,22 +24,29 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio forks.times do self.spawn(**options) do |instance| - container = Threaded.new + Signal.trap(:INT){exit!(0)} + Signal.trap(:TERM){exit!(0)} - container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) - - container.wait_until_ready - instance.ready! - - begin - container.wait - rescue Interrupt - # Gracefully interrupt child threads; parent process handles escalation. - container.interrupt - retry + Sync do + container = Threaded.new + + begin + container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) + + container.wait_until_ready + instance.ready! + + begin + container.wait + rescue Interrupt + container.interrupt + end + ensure + container&.stop(false) + end end - ensure - container.stop(false) + + exit!(0) end end diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 446155a..f744f13 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -7,8 +7,11 @@ require "async/container/controllers" require "async/container/notify/server" require "async/container/notify/socket" +require "sus/fixtures/async" describe Async::Container::Controller do + include Sus::Fixtures::Async::SchedulerContext + let(:controller) {subject.new} with "#to_s" do @@ -231,3 +234,22 @@ def controller.setup(container) end end end + +describe Async::Container::Controller, unique: "run without scheduler" do + let(:controller) {subject.new} + + with "#run" do + it "creates a top-level event loop" do + def controller.setup(container) + container.spawn do |instance| + instance.ready! + sleep(0.01) + end + end + + controller.run + + expect(controller).not.to be(:running?) + end + end +end diff --git a/test/async/container/forked.rb b/test/async/container/forked.rb index e2a47a0..bdf69db 100644 --- a/test/async/container/forked.rb +++ b/test/async/container/forked.rb @@ -7,8 +7,11 @@ require "async/container/best" require "async/container/forked" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Forked do + include Sus::Fixtures::Async::SchedulerContext + let(:container) {subject.new} it_behaves_like Async::Container::AContainer @@ -17,7 +20,7 @@ trigger = IO.pipe pids = IO.pipe - thread = Thread.new do + task = Async do container.async(restart: true) do trigger.first.gets pids.last.puts Process.pid.to_s @@ -31,8 +34,8 @@ _child_pid = pids.first.gets end - thread.kill - thread.join + task.stop + task.wait expect(container.statistics.spawns).to be == 1 expect(container.statistics.restarts).to be == 2 diff --git a/test/async/container/group.rb b/test/async/container/group.rb index d38e1e7..8d4af8f 100644 --- a/test/async/container/group.rb +++ b/test/async/container/group.rb @@ -4,8 +4,11 @@ # Copyright, 2025-2026, by Samuel Williams. require "async/container/group" +require "sus/fixtures/async" describe Async::Container::Group do + include Sus::Fixtures::Async::SchedulerContext + let(:group) {Async::Container::Group.new} class FakeChild diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index 27b8848..6ce6b68 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -6,8 +6,11 @@ require "async/container/hybrid" require "async/container/best" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Hybrid do + include Sus::Fixtures::Async::SchedulerContext + it_behaves_like Async::Container::AContainer it "should be multiprocess" do @@ -85,14 +88,10 @@ def exits_fork_on_single_signal(signal) Process.kill(signal, fork_pid) # The fork must drain its inner threads and exit, rather than respawning them forever: - 8.times do - reaped, _status = Process.waitpid2(fork_pid, Process::WNOHANG) - if reaped - exited = true - break - end + 20.times do + Process.kill(0, fork_pid) sleep(0.1) - rescue Errno::ECHILD + rescue Errno::ESRCH exited = true break end diff --git a/test/async/container/notify/pipe.rb b/test/async/container/notify/pipe.rb index 27fb9e9..9caeff3 100644 --- a/test/async/container/notify/pipe.rb +++ b/test/async/container/notify/pipe.rb @@ -6,8 +6,11 @@ require "async/container/controller" require "async/container/controllers" +require "sus/fixtures/async" describe Async::Container::Notify::Pipe do + include Sus::Fixtures::Async::SchedulerContext + let(:notify_script) {Async::Container::Controllers.path_for("notify")} it "receives notification of child status" do diff --git a/test/async/container/policy.rb b/test/async/container/policy.rb index 684af5f..4fb73bc 100644 --- a/test/async/container/policy.rb +++ b/test/async/container/policy.rb @@ -5,8 +5,11 @@ require "async/container/policy" require "async/container/best" +require "sus/fixtures/async" describe Async::Container::Policy do + include Sus::Fixtures::Async::SchedulerContext + let(:policy) {subject.new} with "interface" do diff --git a/test/async/container/threaded.rb b/test/async/container/threaded.rb index f6c078e..02de8c3 100644 --- a/test/async/container/threaded.rb +++ b/test/async/container/threaded.rb @@ -5,8 +5,11 @@ require "async/container/threaded" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Threaded do + include Sus::Fixtures::Async::SchedulerContext + it_behaves_like Async::Container::AContainer it "should not be multiprocess" do From 2a2b2a43bdee2617379e85363322bc99ab27c815 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 09:58:03 +1200 Subject: [PATCH 15/20] Use controller-owned signal wakeups Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/controller.rb | 77 ++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index ad0b3ba..d5aff99 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -7,6 +7,7 @@ require_relative "best" require "async" +require "async/queue" require_relative "statistics" require_relative "notify" @@ -33,6 +34,7 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop: @container = nil @signals = {} @signal_queue = ::Thread::Queue.new + @event_queue = ::Thread::Queue.new self.trap(SIGHUP) do self.restart @@ -164,12 +166,7 @@ def restart if old_container Console.info(self, "Stopping old container...") - begin - @stopping_container = old_container - old_container&.stop(@graceful_stop) - ensure - @stopping_container = nil - end + stop_container(old_container, @graceful_stop) end @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") @@ -221,7 +218,7 @@ def run process_signals(@container) if container = @container - container.sleep + controller_sleep(container) end process_signals(@container) if @container @@ -236,20 +233,49 @@ def run self.stop(false) end + private def stop_container(container, graceful) + parent = Async::Task.current + task = parent.async(transient: true) do + container.stop(graceful) + end + + while task.running? + process_signals(@container) + controller_sleep(container) + process_signals(@container) if @container + end + + task.wait + end + private def wait_until_ready(container) - @waiting_container = container + until container.status?(:ready) + process_signals(container, graceful: container.status?(:ready)) + + break unless container.running? + + controller_sleep(container) + end + end + + private def controller_sleep(container = nil) + parent = Async::Task.current + result = Async::Queue.new - begin - until container.status?(:ready) - process_signals(container, graceful: container.status?(:ready)) - - break unless container.running? - - container.sleep + event_task = parent.async(transient: true) do + result << @event_queue.pop + end + + container_task = if container + parent.async(transient: true) do + result << container.sleep end - ensure - @waiting_container = nil end + + result.pop + ensure + event_task&.stop + container_task&.stop end private def process_signals(container = @container, graceful: @graceful_stop) @@ -275,27 +301,22 @@ def run end private def with_signal_handlers - # I thought this was the default... but it doesn't always raise an exception unless you do this explicitly. - wake_containers = proc do - @container&.group&.health_check! - @waiting_container&.group&.health_check! - @stopping_container&.group&.health_check! + queue_signal = proc do |signal| + @signal_queue << signal + @event_queue << true end interrupt_action = Signal.trap(:INT) do - @signal_queue << SIGINT - wake_containers.call + queue_signal.call(SIGINT) end # SIGTERM behaves the same as SIGINT by default. terminate_action = Signal.trap(:TERM) do - @signal_queue << SIGTERM - wake_containers.call + queue_signal.call(SIGTERM) end hangup_action = Signal.trap(:HUP) do - @signal_queue << SIGHUP - wake_containers.call + queue_signal.call(SIGHUP) end yield From 8ca59e90e663859e98f4408b424f8bfb70d808bb Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 10:03:35 +1200 Subject: [PATCH 16/20] Store controller signals in event queue Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/controller.rb | 70 +++++++++++++------------------ 1 file changed, 30 insertions(+), 40 deletions(-) diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index d5aff99..ba14274 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -33,7 +33,6 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop: @container = nil @signals = {} - @signal_queue = ::Thread::Queue.new @event_queue = ::Thread::Queue.new self.trap(SIGHUP) do @@ -169,7 +168,9 @@ def restart stop_container(old_container, @graceful_stop) end - @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + if @container + @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + end rescue => error raise ensure @@ -215,13 +216,9 @@ def run self.start while @container&.running? - process_signals(@container) - if container = @container controller_sleep(container) end - - process_signals(@container) if @container end end end @@ -240,9 +237,7 @@ def run end while task.running? - process_signals(@container) controller_sleep(container) - process_signals(@container) if @container end task.wait @@ -250,73 +245,68 @@ def run private def wait_until_ready(container) until container.status?(:ready) - process_signals(container, graceful: container.status?(:ready)) - break unless container.running? - controller_sleep(container) + controller_sleep(container, graceful: container.status?(:ready)) end end - private def controller_sleep(container = nil) + private def controller_sleep(container = nil, graceful: @graceful_stop) parent = Async::Task.current result = Async::Queue.new event_task = parent.async(transient: true) do - result << @event_queue.pop + result << [:signal, @event_queue.pop] end container_task = if container parent.async(transient: true) do - result << container.sleep + result << [:container, container.sleep] end end - result.pop + source, signal = result.pop + + if source == :signal + process_signal(signal, container, graceful) + end ensure event_task&.stop container_task&.stop end - private def process_signals(container = @container, graceful: @graceful_stop) - while signal = @signal_queue.pop(timeout: 0) - if handler = @signals[signal] - begin - handler.call - rescue SetupError => error - Console.error(self, error) - end - elsif signal == SIGINT || signal == SIGTERM - target = @container || container - - if target.equal?(@container) - self.stop - else - target&.stop(graceful) - end + private def process_signal(signal, container = @container, graceful = @graceful_stop) + if handler = @signals[signal] + begin + handler.call + rescue SetupError => error + Console.error(self, error) + end + elsif signal == SIGINT || signal == SIGTERM + target = @container || container + + if target.equal?(@container) + self.stop else - raise SignalException.new(signal) + target&.stop(graceful) end + else + raise SignalException.new(signal) end end private def with_signal_handlers - queue_signal = proc do |signal| - @signal_queue << signal - @event_queue << true - end - interrupt_action = Signal.trap(:INT) do - queue_signal.call(SIGINT) + @event_queue << SIGINT end # SIGTERM behaves the same as SIGINT by default. terminate_action = Signal.trap(:TERM) do - queue_signal.call(SIGTERM) + @event_queue << SIGTERM end hangup_action = Signal.trap(:HUP) do - queue_signal.call(SIGHUP) + @event_queue << SIGHUP end yield From 2fcbd5878ae174c3732503efb7777bde11872d7e Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 12:00:06 +1200 Subject: [PATCH 17/20] Represent controller wakeups as events Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/controller.rb | 39 +++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index ba14274..ca7039b 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -18,12 +18,34 @@ module Container # Manages the life-cycle of one or more containers in order to support a persistent system. # e.g. a web server, job server or some other long running system. class Controller + class SignalEvent + def initialize(signal) + @signal = signal + end + + attr :signal + + def apply(controller, container: nil, graceful: controller.graceful_stop) + controller.__send__(:process_signal, @signal, container, graceful) + end + end + + module ContainerEvent + def self.apply(controller, **options) + # The container state has changed; callers will re-check their predicates. + end + end + SIGHUP = Signal.list["HUP"] SIGINT = Signal.list["INT"] SIGTERM = Signal.list["TERM"] SIGUSR1 = Signal.list["USR1"] SIGUSR2 = Signal.list["USR2"] + HANGUP_EVENT = SignalEvent.new(SIGHUP).freeze + INTERRUPT_EVENT = SignalEvent.new(SIGINT).freeze + TERMINATE_EVENT = SignalEvent.new(SIGTERM).freeze + # Initialize the controller. # @parameter notify [Notify::Client] A client used for process readiness notifications. def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true) @@ -256,20 +278,17 @@ def run result = Async::Queue.new event_task = parent.async(transient: true) do - result << [:signal, @event_queue.pop] + result << @event_queue.pop end container_task = if container parent.async(transient: true) do - result << [:container, container.sleep] + container.sleep + result << ContainerEvent end end - source, signal = result.pop - - if source == :signal - process_signal(signal, container, graceful) - end + result.pop.apply(self, container: container, graceful: graceful) ensure event_task&.stop container_task&.stop @@ -297,16 +316,16 @@ def run private def with_signal_handlers interrupt_action = Signal.trap(:INT) do - @event_queue << SIGINT + @event_queue << INTERRUPT_EVENT end # SIGTERM behaves the same as SIGINT by default. terminate_action = Signal.trap(:TERM) do - @event_queue << SIGTERM + @event_queue << TERMINATE_EVENT end hangup_action = Signal.trap(:HUP) do - @event_queue << SIGHUP + @event_queue << HANGUP_EVENT end yield From 422c5ff6d3931968ae5bac219802874601b22454 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 12:13:07 +1200 Subject: [PATCH 18/20] Prefer `version.rb` documentation. Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container.rb | 8 +------- lib/async/container/version.rb | 2 ++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/async/container.rb b/lib/async/container.rb index 28b9b7a..834cb53 100644 --- a/lib/async/container.rb +++ b/lib/async/container.rb @@ -5,10 +5,4 @@ require_relative "container/best" require_relative "container/controller" - -# @namespace -module Async - # @namespace - module Container - end -end +require_relative "container/version" diff --git a/lib/async/container/version.rb b/lib/async/container/version.rb index 91da604..7420c32 100644 --- a/lib/async/container/version.rb +++ b/lib/async/container/version.rb @@ -3,7 +3,9 @@ # Released under the MIT License. # Copyright, 2017-2026, by Samuel Williams. +# @namespace module Async + # @namespace module Container VERSION = "0.35.1" end From 69c9fbb35d981f1aaaba34be8bec2811570f1a39 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 12:54:38 +1200 Subject: [PATCH 19/20] Document controller event types Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/controller.rb | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index ca7039b..fb59d8b 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -18,19 +18,31 @@ module Container # Manages the life-cycle of one or more containers in order to support a persistent system. # e.g. a web server, job server or some other long running system. class Controller + # Represents a signal delivered through the controller event queue. class SignalEvent + # Initialize a signal event. + # @parameter signal [Integer] The signal number to process. def initialize(signal) @signal = signal end + # @attribute [Integer] The signal number to process. attr :signal + # Apply this signal event to the controller. + # @parameter controller [Controller] The controller which should process the signal. + # @parameter container [Generic | Nil] The container associated with the current wait operation. + # @parameter graceful [Boolean | Float] Whether to stop the container gracefully, or the duration to wait for graceful shutdown. def apply(controller, container: nil, graceful: controller.graceful_stop) controller.__send__(:process_signal, @signal, container, graceful) end end + # Represents a container state change in the controller event queue. module ContainerEvent + # Apply this container event to the controller. + # @parameter controller [Controller] The controller currently waiting for an event. + # @parameter options [Hash] Additional event context. def self.apply(controller, **options) # The container state has changed; callers will re-check their predicates. end From a8f7921a4cf69e28e90082b559dd4a9831284893 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 27 Jun 2026 13:23:17 +1200 Subject: [PATCH 20/20] Fix policy-driven container shutdown Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4 --- lib/async/container/generic.rb | 8 ++- lib/async/container/group.rb | 90 +++++++++++++++++++++++++++------- test/async/container/policy.rb | 31 ++++++++++++ 3 files changed, 111 insertions(+), 18 deletions(-) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index 529f034..0604674 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -173,7 +173,7 @@ def stop(timeout = true) @stopping = true @group.stop(timeout) - if @group.running? + if @group.running?(except: current_task) Console.warn(self, "Group is still running after stopping it!") else Console.info(self, "Group has stopped.") @@ -183,6 +183,12 @@ def stop(timeout = true) raise end + private def current_task + Async::Task.current + rescue RuntimeError + nil + end + protected def health_check_failed(child, age_clock, health_check_timeout) begin @policy.health_check_failed( diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index b5a1a11..8d5c3c3 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -38,8 +38,9 @@ def initialize(health_check_interval: 1.0) @mutex = Mutex.new @children = {} - @supervisors = 0 - @events = ::Thread::Queue.new + @supervisors = {} + @pending_events = 0 + @waiters = [] end # @attribute [Numeric | Nil] The interval used to wake waiters for periodic health checks. @@ -58,9 +59,10 @@ def size end # Check whether any supervisor tasks are still running. + # @parameter except [Async::Task | Nil] The supervisor task to ignore. # @returns [Boolean] Whether any supervisor tasks are running. - def running? - @mutex.synchronize{@supervisors.positive?} + def running?(except: nil) + running_supervisors?(except: except) end # Check whether any supervisor tasks are still running. @@ -85,13 +87,16 @@ def running # @yields {...} The supervisor block to execute. # @returns [Async::Task] The supervisor task. def supervise(&block) - @mutex.synchronize{@supervisors += 1} + parent = Async::Task.current - Async::Task.current.async(transient: true) do + parent.async(transient: true) do + task = Async::Task.current + @mutex.synchronize{@supervisors[task] = true} + begin block.call ensure - @mutex.synchronize{@supervisors -= 1} + @mutex.synchronize{@supervisors.delete(task)} signal! end end @@ -116,15 +121,29 @@ def delete(child) # @parameter duration [Numeric | Nil] The maximum duration to sleep. # @returns [Object | Nil] The queued signal value, or `nil` if the sleep timed out. def sleep(duration = nil) + events = ::Thread::Queue.new + + @mutex.synchronize do + if @pending_events.positive? + @pending_events -= 1 + return true + end + + @waiters << events + end + ::Thread.handle_interrupt(SignalException => :immediate) do - @events.pop(timeout: duration) + events.pop(timeout: duration) end + ensure + @mutex.synchronize{@waiters.delete(events)} if events end - # Wait until all supervisor tasks have stopped. + # Wait until all other supervisor tasks have stopped. + # @parameter except [Async::Task | Nil] The supervisor task to ignore while waiting. # @returns [Nil] - def wait - sleep while running? + def wait(except: current_supervisor) + sleep while running_supervisors?(except: except) end # Wake any waiters so they can re-check child health or state. @@ -159,21 +178,26 @@ def kill # @returns [Nil] def stop(graceful = GRACEFUL_TIMEOUT) Console.debug(self, "Stopping all children...", graceful: graceful) + except = current_supervisor if graceful interrupt graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true - wait_for_exit(Clock.start, graceful) + wait_for_children(Clock.start, graceful) end ensure - if running? + if size.positive? if graceful Console.warn(self, "Killing children after graceful shutdown failed...", size: size) end kill - wait + sleep while size.positive? + end + + if running_supervisors?(except: except) + wait(except: except) end end @@ -189,8 +213,8 @@ def each_child end end - def wait_for_exit(clock, timeout) - while running? + def wait_for_children(clock, timeout) + while size.positive? duration = timeout - clock.total break if duration.negative? @@ -199,8 +223,40 @@ def wait_for_exit(clock, timeout) end end + def current_supervisor + task = Async::Task.current + + @mutex.synchronize do + @supervisors.key?(task) ? task : nil + end + rescue RuntimeError + nil + end + + def running_supervisors?(except: nil) + @mutex.synchronize do + if except + @supervisors.any?{|task, _| task != except} + else + @supervisors.any? + end + end + end + def signal! - @events << true + waiters = @mutex.synchronize do + if @waiters.empty? + @pending_events += 1 + end + + @waiters.dup + end + + waiters.each do |events| + events << true + end + + return true end end end diff --git a/test/async/container/policy.rb b/test/async/container/policy.rb index 4fb73bc..ddff555 100644 --- a/test/async/container/policy.rb +++ b/test/async/container/policy.rb @@ -237,6 +237,37 @@ def child_exit(container, child, status, name:, key:, **options) expect(container.statistics.failures).to be == 1 end + it "can stop the container from child_exit" do + stop_policy = Class.new(Async::Container::Policy) do + def initialize + @stop_count = 0 + end + + attr :stop_count + + def child_exit(container, child, status, name:, key:, **options) + unless container.stopping? + @stop_count += 1 + container.stop + end + end + end.new + + container = Async::Container.best_container_class.new(policy: stop_policy) + + 3.times do |i| + container.spawn(name: "worker-#{i}") do |instance| + instance.ready! + exit(1) + end + end + + container.wait + + expect(stop_policy.stop_count).to be == 1 + expect(container).not.to be(:running?) + end + it "invokes callbacks for multiple children" do container = Async::Container.best_container_class.new(policy: tracking_policy)