diff --git a/fixtures/async/container/a_container.rb b/fixtures/async/container/a_container.rb index b3ceb16..522f824 100644 --- a/fixtures/async/container/a_container.rb +++ b/fixtures/async/container/a_container.rb @@ -73,6 +73,69 @@ module Container end end + with "Async{}" do + it "can wait inside an Async task after spawning outside Async" do + input, output = IO.pipe + + container.spawn do + output.write(".") + end + + Async do + container.wait + end.wait + + output.close + expect(input.read).to be == "." + end + + it "can spawn and wait inside the same Async task" do + input, output = IO.pipe + + Async do + container.spawn do + output.write(".") + end + + container.wait + end.wait + + output.close + expect(input.read).to be == "." + end + + it "can wait while a health monitor is active" do + container.spawn(health_check_timeout: 10.0) do |instance| + instance.ready! + end + + Async::Task.current.with_timeout(2.0) do + container.wait + end + + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "can start, wait until ready, and stop inside Sync" do + Sync do + begin + 2.times do |i| + container.spawn(name: "worker #{i}") do |instance| + instance.ready! + sleep + end + end + + container.wait_until_ready + + expect(container.group.running).to have_attributes(size: be == 2) + ensure + container.stop if container.running? + end + end + end + end + it "should be blocking" do skip "Fiber.blocking? is not supported!" unless Fiber.respond_to?(:blocking?) @@ -225,15 +288,15 @@ def child_exit(container, child, status, name:, key:, **options) container.run do |instance| Async do instance.ready! + sleep 0.1 end end expect(container).to be(:running?) + container.stop(false) container.wait - container.stop - expect(container).not.to be(:running?) end end diff --git a/gems.rb b/gems.rb index b3a2c76..8426145 100644 --- a/gems.rb +++ b/gems.rb @@ -21,6 +21,7 @@ group :test do gem "sus" + gem "sus-fixtures-async" gem "covered" gem "rubocop" diff --git a/lib/async/container.rb b/lib/async/container.rb index bd0d47a..834cb53 100644 --- a/lib/async/container.rb +++ b/lib/async/container.rb @@ -3,11 +3,6 @@ # Released under the MIT License. # Copyright, 2017-2025, by Samuel Williams. +require_relative "container/best" require_relative "container/controller" - -# @namespace -module Async - # @namespace - module Container - end -end +require_relative "container/version" diff --git a/lib/async/container/channel.rb b/lib/async/container/channel.rb index d207442..30283fd 100644 --- a/lib/async/container/channel.rb +++ b/lib/async/container/channel.rb @@ -25,12 +25,14 @@ def initialize(timeout: 1.0) # Close the input end of the pipe. def close_read - @in.close + @in.close unless @in.closed? + rescue IOError, Errno::EBADF end # Close the output end of the pipe. def close_write - @out.close + @out.close unless @out.closed? + rescue IOError, Errno::EBADF end # Close both ends of the pipe. diff --git a/lib/async/container/controller.rb b/lib/async/container/controller.rb index 755f7c7..fb59d8b 100644 --- a/lib/async/container/controller.rb +++ b/lib/async/container/controller.rb @@ -6,6 +6,9 @@ require_relative "error" require_relative "best" +require "async" +require "async/queue" + require_relative "statistics" require_relative "notify" require_relative "policy" @@ -15,12 +18,46 @@ module Container # Manages the life-cycle of one or more containers in order to support a persistent system. # e.g. a web server, job server or some other long running system. class Controller + # Represents a signal delivered through the controller event queue. + class SignalEvent + # Initialize a signal event. + # @parameter signal [Integer] The signal number to process. + def initialize(signal) + @signal = signal + end + + # @attribute [Integer] The signal number to process. + attr :signal + + # Apply this signal event to the controller. + # @parameter controller [Controller] The controller which should process the signal. + # @parameter container [Generic | Nil] The container associated with the current wait operation. + # @parameter graceful [Boolean | Float] Whether to stop the container gracefully, or the duration to wait for graceful shutdown. + def apply(controller, container: nil, graceful: controller.graceful_stop) + controller.__send__(:process_signal, @signal, container, graceful) + end + end + + # Represents a container state change in the controller event queue. + module ContainerEvent + # Apply this container event to the controller. + # @parameter controller [Controller] The controller currently waiting for an event. + # @parameter options [Hash] Additional event context. + def self.apply(controller, **options) + # The container state has changed; callers will re-check their predicates. + end + end + SIGHUP = Signal.list["HUP"] SIGINT = Signal.list["INT"] SIGTERM = Signal.list["TERM"] SIGUSR1 = Signal.list["USR1"] SIGUSR2 = Signal.list["USR2"] + HANGUP_EVENT = SignalEvent.new(SIGHUP).freeze + INTERRUPT_EVENT = SignalEvent.new(SIGINT).freeze + TERMINATE_EVENT = SignalEvent.new(SIGTERM).freeze + # Initialize the controller. # @parameter notify [Notify::Client] A client used for process readiness notifications. def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true) @@ -30,6 +67,7 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop: @container = nil @signals = {} + @event_queue = ::Thread::Queue.new self.trap(SIGHUP) do self.restart @@ -144,7 +182,7 @@ def restart # Wait for all child processes to enter the ready state. Console.info(self, "Waiting for startup...") - container.wait_until_ready + wait_until_ready(container) Console.info(self, "Finished startup.") if container.failed? @@ -160,10 +198,13 @@ def restart if old_container Console.info(self, "Stopping old container...") - old_container&.stop(@graceful_stop) + + stop_container(old_container, @graceful_stop) end - @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + if @container + @notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.") + end rescue => error raise ensure @@ -188,7 +229,7 @@ def reload # Wait for all child processes to enter the ready state. Console.info(self, "Waiting for startup...") - @container.wait_until_ready + wait_until_ready(@container) Console.info(self, "Finished startup.") if @container.failed? @@ -202,23 +243,15 @@ def reload # Enter the controller run loop, trapping `SIGINT` and `SIGTERM`. def run - @notify&.status!("Initializing controller...") - - with_signal_handlers do - self.start + Sync do + @notify&.status!("Initializing controller...") - while @container&.running? - begin - @container.wait - rescue SignalException => exception - if handler = @signals[exception.signo] - begin - handler.call - rescue SetupError => error - Console.error(self, error) - end - else - raise + with_signal_handlers do + self.start + + while @container&.running? + if container = @container + controller_sleep(container) end end end @@ -231,29 +264,83 @@ def run self.stop(false) end - private def with_signal_handlers - # I thought this was the default... but it doesn't always raise an exception unless you do this explicitly. + private def stop_container(container, graceful) + parent = Async::Task.current + task = parent.async(transient: true) do + container.stop(graceful) + end + + while task.running? + controller_sleep(container) + end + + task.wait + end + + private def wait_until_ready(container) + until container.status?(:ready) + break unless container.running? + + controller_sleep(container, graceful: container.status?(:ready)) + end + end + + private def controller_sleep(container = nil, graceful: @graceful_stop) + parent = Async::Task.current + result = Async::Queue.new + event_task = parent.async(transient: true) do + result << @event_queue.pop + end + + container_task = if container + parent.async(transient: true) do + container.sleep + result << ContainerEvent + end + end + + result.pop.apply(self, container: container, graceful: graceful) + ensure + event_task&.stop + container_task&.stop + end + + private def process_signal(signal, container = @container, graceful = @graceful_stop) + if handler = @signals[signal] + begin + handler.call + rescue SetupError => error + Console.error(self, error) + end + elsif signal == SIGINT || signal == SIGTERM + target = @container || container + + if target.equal?(@container) + self.stop + else + target&.stop(graceful) + end + else + raise SignalException.new(signal) + end + end + + private def with_signal_handlers interrupt_action = Signal.trap(:INT) do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - # $stderr.puts "Received INT signal, interrupting...", caller - ::Thread.current.raise(Interrupt) + @event_queue << INTERRUPT_EVENT end # SIGTERM behaves the same as SIGINT by default. terminate_action = Signal.trap(:TERM) do - # $stderr.puts "Received TERM signal, interrupting...", caller - ::Thread.current.raise(Interrupt) # Same as SIGINT + @event_queue << TERMINATE_EVENT end hangup_action = Signal.trap(:HUP) do - # $stderr.puts "Received HUP signal, restarting...", caller - ::Thread.current.raise(Restart) + @event_queue << HANGUP_EVENT end - ::Thread.handle_interrupt(SignalException => :never) do - yield - end + yield ensure # Restore the interrupt handler: Signal.trap(:INT, interrupt_action) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15cc05f..88f485d 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -101,23 +101,28 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) # $stderr.puts fork: caller self.new(**options) do |process| - ::Process.fork do - # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. - Signal.trap(:INT){::Thread.current.raise(Interrupt)} - Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. - Signal.trap(:HUP){::Thread.current.raise(Restart)} - - # This could be a configuration option: - ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process) - rescue Interrupt - # Graceful exit. - rescue Exception => error - Console.error(self, error) + # Fork from a dedicated thread so the child does not inherit the parent fiber scheduler or the current caller's fiber stack; only this short-lived thread is copied into the child process. + ::Thread.new do + ::Process.fork do + Signal.trap(:INT){::Thread.current.raise(Interrupt)} + Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT. + Signal.trap(:HUP){::Thread.current.raise(Restart)} - exit!(1) + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code; Async deliberately masks SignalException. + ::Thread.handle_interrupt(SignalException => :immediate) do + yield Instance.for(process) + + exit!(0) + rescue Interrupt + # Graceful exit. + exit!(0) + rescue Exception => error + Console.error(self, error) + + exit!(1) + end end - end + end.value end end @@ -233,27 +238,11 @@ def restart! # Wait for the child process to exit. # @asynchronous This method may block. # - # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [::Process::Status] The process exit status. - def wait(timeout = 0.1) + def wait if @pid && @status.nil? - Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout) - - _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - - if @status.nil? - sleep(timeout) if timeout - - _, @status = ::Process.wait2(@pid, ::Process::WNOHANG) - - if @status.nil? - Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout) - self.kill! - - # Wait for the process to exit: - _, @status = ::Process.wait2(@pid) - end - end + Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}) + _, @status = ::Process.wait2(@pid) end Console.debug(self, "Process exited.", child: {process_id: @pid, status: @status}) diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d7e1551..0604674 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -5,7 +5,9 @@ # Copyright, 2025, by Marc-André Cournoyer. require "etc" +require "io/wait" require "async/clock" +require "async/queue" require_relative "group" require_relative "keyed" @@ -144,8 +146,6 @@ def wait_until_ready end end - self.sleep - if self.status?(:ready) Console.debug(self) do |buffer| buffer.puts "All ready:" @@ -156,6 +156,8 @@ def wait_until_ready return true end + + self.sleep end end @@ -171,7 +173,7 @@ def stop(timeout = true) @stopping = true @group.stop(timeout) - if @group.running? + if @group.running?(except: current_task) Console.warn(self, "Group is still running after stopping it!") else Console.info(self, "Group has stopped.") @@ -181,6 +183,12 @@ def stop(timeout = true) raise end + private def current_task + Async::Task.current + rescue RuntimeError + nil + end + protected def health_check_failed(child, age_clock, health_check_timeout) begin @policy.health_check_failed( @@ -222,13 +230,22 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu end @statistics.spawn! + started = ::Thread::Queue.new - fiber do + @group.supervise do + first = true + until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) child = self.start(name, &block) state = insert(key, child) + @group.insert(child) + + if first + started << true + first = false + end # Notify policy of spawn begin @@ -247,36 +264,11 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu status = nil begin - status = @group.wait_for(child) do |message| - case message - when :health_check! - if state[:ready] - # If a health check timeout is specified, we will monitor the child process and terminate it if it does not update its state within the specified time. - if health_check_timeout - if health_check_timeout < age_clock.total - health_check_failed(child, age_clock, health_check_timeout) - end - end - else - # If a startup timeout is specified, we will monitor the child process and terminate it if it does not become ready within the specified time. - if startup_timeout - if startup_timeout < age_clock.total - startup_failed(child, age_clock, startup_timeout) - end - end - end - else - state.update(message) - - # Reset the age clock if the child has become ready: - if state[:ready] - age_clock&.reset! - end - end - end + status = wait_for(child, state, age_clock, health_check_timeout, startup_timeout) rescue => error Console.error(self, "Error during child process management!", exception: error, stopping: @stopping) ensure + @group.delete(child) delete(key, child) end @@ -300,7 +292,17 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu break end end - end.resume + rescue => error + started << error if first + raise + ensure + started << false if first + end + + case result = started.pop + when Exception + raise result + end return true end @@ -363,6 +365,109 @@ def key?(key) protected + def wait_for(child, state, age_clock, health_check_timeout, startup_timeout) + parent = Async::Task.current + result = Async::Queue.new + health = ::Thread::Queue.new if age_clock + + reader = parent.async do + read_notifications(child, state, age_clock, health) + end + + monitor = if age_clock + parent.async do + monitor_health(child, state, age_clock, health, health_check_timeout, startup_timeout) + end + end + + waiter = parent.async do + begin + result << child.wait + rescue Exception => error + result << error + end + end + + status = result.pop + raise status if status.is_a?(Exception) + + return status + ensure + reader&.stop + monitor&.stop + waiter&.stop + end + + def read_notifications(child, state, age_clock, health) + while true + child.in.wait_readable + + if message = child.receive + state.update(message) + + # Reset the age clock if the child has become ready: + if state[:ready] + age_clock&.reset! + end + + health&.push(true) + @group.health_check! + else + break + end + end + rescue IO::TimeoutError + retry + rescue IOError, Errno::EBADF + # The notification pipe was closed while the child waiter was exiting. + end + + def monitor_health(child, state, age_clock, health, health_check_timeout, startup_timeout) + while true + if timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout) + health.pop(timeout: timeout) + + break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + else + health.pop(timeout: @group.health_check_interval || 1.0) + end + end + end + + def next_timeout(state, age_clock, health_check_timeout, startup_timeout) + timeout = if state[:ready] + health_check_timeout + else + startup_timeout + end + + return unless timeout + + remaining = timeout - age_clock.total + + if remaining.positive? + remaining + else + 0 + end + end + + def check_timeout(child, state, age_clock, health_check_timeout, startup_timeout) + if state[:ready] + if health_check_timeout && health_check_timeout < age_clock.total + health_check_failed(child, age_clock, health_check_timeout) + return true + end + else + if startup_timeout && startup_timeout < age_clock.total + startup_failed(child, age_clock, startup_timeout) + return true + end + end + + return false + end + # Register the child (value) as running. def insert(key, child) if key @@ -385,17 +490,6 @@ def delete(key, child) @state.delete(child) end - private - - if Fiber.respond_to?(:blocking?) - def fiber(&block) - Fiber.new(blocking: true, &block) - end - else - def fiber(&block) - Fiber.new(&block) - end - end end end end diff --git a/lib/async/container/group.rb b/lib/async/container/group.rb index fb665f8..8d5c3c3 100644 --- a/lib/async/container/group.rb +++ b/lib/async/container/group.rb @@ -3,7 +3,7 @@ # Released under the MIT License. # Copyright, 2018-2026, by Samuel Williams. -require "fiber" +require "async" require "async/clock" require_relative "error" @@ -14,9 +14,9 @@ module Container GRACEFUL_TIMEOUT = ENV.fetch("ASYNC_CONTAINER_GRACEFUL_TIMEOUT", "true").then do |value| case value when "true" - true # Default timeout for graceful termination. + true when "false" - false # Immediately kill the processes. + false else value.to_f end @@ -25,228 +25,238 @@ module Container # The default timeout for graceful termination. DEFAULT_GRACEFUL_TIMEOUT = 10.0 - # Manages a group of running processes. + # Internal child supervision registry. + # + # This object intentionally does not model a public collection. It owns the + # Async task context used for child supervisors and provides just enough + # coordination for container-level wait, sleep and shutdown operations. class Group - # Initialize an empty group. - # - # @parameter health_check_interval [Numeric | Nil] The (biggest) interval at which health checks are performed. + # Initialize the child supervision registry. + # @parameter health_check_interval [Numeric | Nil] The interval used to wake waiters for periodic health checks. def initialize(health_check_interval: 1.0) @health_check_interval = health_check_interval - # The running fibers, indexed by IO: - @running = {} + @mutex = Mutex.new + @children = {} + @supervisors = {} + @pending_events = 0 + @waiters = [] end - # @returns [String] A human-readable representation of the group. + # @attribute [Numeric | Nil] The interval used to wake waiters for periodic health checks. + attr :health_check_interval + + # Generate a human-readable representation of the group. + # @returns [String] The group inspection string. def inspect - "#<#{self.class} running=#{@running.size}>" + "#<#{self.class} running=#{size}>" end - # @attribute [Hash(IO, Fiber)] the running tasks, indexed by IO. - attr :running - - # @returns [Integer] The number of running processes. + # Get the number of currently registered children. + # @returns [Integer] The number of running children. def size - @running.size + @mutex.synchronize{@children.size} end - # Whether the group contains any running processes. - # @returns [Boolean] - def running? - @running.any? + # Check whether any supervisor tasks are still running. + # @parameter except [Async::Task | Nil] The supervisor task to ignore. + # @returns [Boolean] Whether any supervisor tasks are running. + def running?(except: nil) + running_supervisors?(except: except) end - # Whether the group contains any running processes. - # @returns [Boolean] + # Check whether any supervisor tasks are still running. + # @returns [Boolean] Whether any supervisor tasks are running. def any? - @running.any? + running? end - # Whether the group is empty. - # @returns [Boolean] + # Check whether all supervisor tasks have stopped. + # @returns [Boolean] Whether no supervisor tasks are running. def empty? - @running.empty? + !running? end - # Sleep for at most the specified duration until some state change occurs. - def sleep(duration) - self.wait_for_children(duration) + # Compatibility for older tests/code that inspected the implementation. + # @returns [Hash] A copy of the current child registration map. + def running + @mutex.synchronize{@children.dup} end - # Begin any outstanding queued processes and wait for them indefinitely. - def wait - with_health_checks do |duration| - self.wait_for_children(duration) + # Run a child supervisor block in the group's Async task context. + # @yields {...} The supervisor block to execute. + # @returns [Async::Task] The supervisor task. + def supervise(&block) + parent = Async::Task.current + + parent.async(transient: true) do + task = Async::Task.current + @mutex.synchronize{@supervisors[task] = true} + + begin + block.call + ensure + @mutex.synchronize{@supervisors.delete(task)} + signal! + end end end - private def with_health_checks - if @health_check_interval - health_check_clock = Clock.start - - while self.running? - duration = [@health_check_interval - health_check_clock.total, 0].max - - yield duration - - if health_check_clock.total > @health_check_interval - self.health_check! - health_check_clock.reset! - end - end - else - while self.running? - yield nil + # Register a child as running. + # @parameter child [Object] The child to register. + # @returns [Boolean] The child registration value. + def insert(child) + @mutex.synchronize{@children[child] = true} + end + + # Remove a child from the running set and wake waiters. + # @parameter child [Object] The child to remove. + # @returns [Object] The queued signal value. + def delete(child) + @mutex.synchronize{@children.delete(child)} + signal! + end + + # Sleep until the group is signalled or the optional duration elapses. + # @parameter duration [Numeric | Nil] The maximum duration to sleep. + # @returns [Object | Nil] The queued signal value, or `nil` if the sleep timed out. + def sleep(duration = nil) + events = ::Thread::Queue.new + + @mutex.synchronize do + if @pending_events.positive? + @pending_events -= 1 + return true end + + @waiters << events end + + ::Thread.handle_interrupt(SignalException => :immediate) do + events.pop(timeout: duration) + end + ensure + @mutex.synchronize{@waiters.delete(events)} if events end - private def each_running(&block) - # We create a copy of the values here, in case the block modifies the running set: - @running.values.each(&block) + # Wait until all other supervisor tasks have stopped. + # @parameter except [Async::Task | Nil] The supervisor task to ignore while waiting. + # @returns [Nil] + def wait(except: current_supervisor) + sleep while running_supervisors?(except: except) end - # Perform a health check on all running processes. + # Wake any waiters so they can re-check child health or state. + # @returns [Object] The queued signal value. def health_check! - each_running do |fiber| - fiber.resume(:health_check!) - end + signal! end - # Interrupt all running processes. - # This resumes the controlling fiber with an instance of {Interrupt}. + # Send an interrupt signal to all registered children. + # @returns [Object] The result of iterating over the current children. def interrupt - Console.info(self, "Sending interrupt to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Interrupt) - end + Console.info(self, "Sending interrupt to #{size} running children...") + each_child(&:interrupt!) end - # Terminate all running processes. - # This resumes the controlling fiber with an instance of {Terminate}. + # Send a terminate signal to all registered children. + # @returns [Object] The result of iterating over the current children. def terminate - Console.info(self, "Sending terminate to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Terminate) - end + Console.info(self, "Sending terminate to #{size} running children...") + each_child(&:terminate!) end - # Kill all running processes. - # This resumes the controlling fiber with an instance of {Kill}. + # Send a kill signal to all registered children. + # @returns [Object] The result of iterating over the current children. def kill - Console.info(self, "Sending kill to #{@running.size} running processes...") - each_running do |fiber| - fiber.resume(Kill) - end + Console.info(self, "Sending kill to #{size} running children...") + each_child(&:kill!) end - private def wait_for_exit(clock, timeout) - while self.any? - duration = timeout - clock.total - - if duration >= 0 - self.wait_for_children(duration) - else - self.wait_for_children(0) - break - end - end - end - - # Stop all child processes with a multi-phase shutdown sequence. - # - # A graceful shutdown performs the following sequence: - # 1. Send SIGINT and wait up to `graceful` seconds if specified. - # 2. Send SIGKILL and wait indefinitely for process cleanup. - # - # If `graceful` is true, default to `DEFAULT_GRACEFUL_TIMEOUT` (10 seconds). - # If `graceful` is false, skip the SIGINT phase and go directly to SIGKILL. - # - # @parameter graceful [Boolean | Numeric] Whether to send SIGINT first or skip directly to SIGKILL. + # Stop all registered children, escalating to kill if graceful shutdown does not complete. + # @parameter graceful [Boolean | Numeric] Whether to stop gracefully, or the graceful timeout duration. + # @returns [Nil] def stop(graceful = GRACEFUL_TIMEOUT) - Console.debug(self, "Stopping all processes...", graceful: graceful) + Console.debug(self, "Stopping all children...", graceful: graceful) + except = current_supervisor - # If a timeout is specified, interrupt the children first: if graceful - # Send SIGINT to the children: - self.interrupt - - if graceful == true - graceful = DEFAULT_GRACEFUL_TIMEOUT - end - - clock = Clock.start + interrupt - # Wait for the children to exit: - self.wait_for_exit(clock, graceful) + graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true + wait_for_children(Clock.start, graceful) end ensure - # Do our best to clean up the children: - if any? + if size.positive? if graceful - Console.warn(self, "Killing processes after graceful shutdown failed...", size: self.size, clock: clock) + Console.warn(self, "Killing children after graceful shutdown failed...", size: size) end - self.kill - self.wait + kill + sleep while size.positive? + end + + if running_supervisors?(except: except) + wait(except: except) end end - # Wait for a message in the specified {Channel}. - def wait_for(channel) - io = channel.in - - @running[io] = Fiber.current + private + + def each_child + children = @mutex.synchronize{@children.keys} - while @running.key?(io) - # Wait for some event on the channel: - result = Fiber.yield - - if result == Interrupt - channel.interrupt! - elsif result == Terminate - channel.terminate! - elsif result == Kill - channel.kill! - elsif result - yield result - elsif message = channel.receive - yield message - else - # Wait for the channel to exit: - return channel.wait - end + children.each do |child| + yield child + rescue Errno::ESRCH + # The child has already exited. end - ensure - @running.delete(io) end - protected + def wait_for_children(clock, timeout) + while size.positive? + duration = timeout - clock.total + + break if duration.negative? + + sleep(duration) + end + end - def wait_for_children(duration = nil) - # This log is a bit noisy and doesn't really provide a lot of useful information: - Console.debug(self, "Waiting for children...", duration: duration, running: @running) + def current_supervisor + task = Async::Task.current - unless @running.empty? - # Maybe consider using a proper event loop here: - if ready = self.select(duration) - ready.each do |io| - if fiber = @running[io] - # This method can be re-entered. While resuming a fiber, a policy hook may be invoked, which may invoke operations on the container. In that case, select may be called again on the same set of waiting fibers. On returning, those fibers may have already completed and removed themselves from @running, so we need to check for that. - fiber.resume - end - end + @mutex.synchronize do + @supervisors.key?(task) ? task : nil + end + rescue RuntimeError + nil + end + + def running_supervisors?(except: nil) + @mutex.synchronize do + if except + @supervisors.any?{|task, _| task != except} + else + @supervisors.any? end end end - # Wait for a child process to exit OR a signal to be received. - def select(duration) - ::Thread.handle_interrupt(SignalException => :immediate) do - readable, _, _ = ::IO.select(@running.keys, nil, nil, duration) + def signal! + waiters = @mutex.synchronize do + if @waiters.empty? + @pending_events += 1 + end - return readable + @waiters.dup end + + waiters.each do |events| + events << true + end + + return true end end end diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 8c6f4f5..da0df88 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -24,22 +24,29 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio forks.times do self.spawn(**options) do |instance| - container = Threaded.new + Signal.trap(:INT){exit!(0)} + Signal.trap(:TERM){exit!(0)} - container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) - - container.wait_until_ready - instance.ready! - - begin - container.wait - rescue Interrupt - # Gracefully interrupt child threads; parent process handles escalation. - container.interrupt - retry + Sync do + container = Threaded.new + + begin + container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) + + container.wait_until_ready + instance.ready! + + begin + container.wait + rescue Interrupt + container.interrupt + end + ensure + container&.stop(false) + end end - ensure - container.stop(false) + + exit!(0) end end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index b1be095..ebe504e 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -114,7 +114,7 @@ def exec(*arguments, ready: true, **options) def self.fork(**options) self.new(**options) do |thread| ::Thread.new do - # This could be a configuration option (see forked implementation too): + # CRuby inherits the `Thread.handle_interrupt` mask stack across `Thread.new`, so reset signal delivery before running user code; Async deliberately masks SignalException. ::Thread.handle_interrupt(SignalException => :immediate) do yield Instance.for(thread) end @@ -129,25 +129,11 @@ def initialize(name: nil, **options) super(**options) @status = nil + @joined = false @thread = yield(self) @thread.report_on_exception = false @thread.name = name - - @waiter = ::Thread.new do - begin - @thread.join - rescue Exit => exit - finished(exit.error) - rescue Interrupt - # Graceful shutdown. - finished - rescue Exception => error - finished(error) - else - finished - end - end end # Convert the child process to a hash, suitable for serialization. @@ -216,22 +202,28 @@ def restart! @thread.raise(Restart) end - # Wait for the thread to exit and return he exit status. + # Wait for the thread to exit and return the exit status. # @asynchronous This method may block. # - # @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination. # @returns [Status] - def wait(timeout = 0.1) - if @waiter - Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}, timeout: timeout) + def wait + unless @joined + Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}) - unless @waiter.join(timeout) - Console.warn(self, "Thread is blocking, sending kill signal...", child: {thread_id: @thread.object_id}, timeout: timeout) - self.kill! - @waiter.join + begin + @thread.join + rescue Exit => exit + finished(exit.error) + rescue Interrupt + # Graceful shutdown. + finished + rescue Exception => error + finished(error) + else + finished + ensure + @joined = true end - - @waiter = nil end Console.debug(self, "Thread exited.", child: {thread_id: @thread.object_id, status: @status}) @@ -272,7 +264,7 @@ def to_s protected - # Invoked by the @waiter thread to indicate the outcome of the child thread. + # Record the outcome of the child thread and close the notification channel. def finished(error = nil) if error Console.error(self){error} diff --git a/lib/async/container/version.rb b/lib/async/container/version.rb index 91da604..7420c32 100644 --- a/lib/async/container/version.rb +++ b/lib/async/container/version.rb @@ -3,7 +3,9 @@ # Released under the MIT License. # Copyright, 2017-2026, by Samuel Williams. +# @namespace module Async + # @namespace module Container VERSION = "0.35.1" end diff --git a/test/async/container/controller.rb b/test/async/container/controller.rb index 446155a..f744f13 100644 --- a/test/async/container/controller.rb +++ b/test/async/container/controller.rb @@ -7,8 +7,11 @@ require "async/container/controllers" require "async/container/notify/server" require "async/container/notify/socket" +require "sus/fixtures/async" describe Async::Container::Controller do + include Sus::Fixtures::Async::SchedulerContext + let(:controller) {subject.new} with "#to_s" do @@ -231,3 +234,22 @@ def controller.setup(container) end end end + +describe Async::Container::Controller, unique: "run without scheduler" do + let(:controller) {subject.new} + + with "#run" do + it "creates a top-level event loop" do + def controller.setup(container) + container.spawn do |instance| + instance.ready! + sleep(0.01) + end + end + + controller.run + + expect(controller).not.to be(:running?) + end + end +end diff --git a/test/async/container/forked.rb b/test/async/container/forked.rb index e2a47a0..bdf69db 100644 --- a/test/async/container/forked.rb +++ b/test/async/container/forked.rb @@ -7,8 +7,11 @@ require "async/container/best" require "async/container/forked" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Forked do + include Sus::Fixtures::Async::SchedulerContext + let(:container) {subject.new} it_behaves_like Async::Container::AContainer @@ -17,7 +20,7 @@ trigger = IO.pipe pids = IO.pipe - thread = Thread.new do + task = Async do container.async(restart: true) do trigger.first.gets pids.last.puts Process.pid.to_s @@ -31,8 +34,8 @@ _child_pid = pids.first.gets end - thread.kill - thread.join + task.stop + task.wait expect(container.statistics.spawns).to be == 1 expect(container.statistics.restarts).to be == 2 diff --git a/test/async/container/group.rb b/test/async/container/group.rb index 11cbfe5..8d4af8f 100644 --- a/test/async/container/group.rb +++ b/test/async/container/group.rb @@ -4,28 +4,44 @@ # Copyright, 2025-2026, by Samuel Williams. require "async/container/group" -require "async/container/channel" +require "sus/fixtures/async" describe Async::Container::Group do + include Sus::Fixtures::Async::SchedulerContext + let(:group) {Async::Container::Group.new} + class FakeChild + def initialize + @events = [] + end + + attr :events + + def interrupt! + @events << :interrupt + end + + def terminate! + @events << :terminate + end + + def kill! + @events << :kill + end + end + with "#size" do it "returns zero for empty group" do expect(group.size).to be == 0 end - it "returns the number of running processes" do - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - fiber1 = Fiber.new{Fiber.yield} - fiber2 = Fiber.new{Fiber.yield} - - fiber1.resume - fiber2.resume + it "returns the number of registered children" do + child1 = FakeChild.new + child2 = FakeChild.new - group.running[channel1.in] = fiber1 - group.running[channel2.in] = fiber2 + group.insert(child1) + group.insert(child2) expect(group.size).to be == 2 end @@ -36,30 +52,20 @@ expect(group).not.to be(:running?) end - it "returns true when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume + it "returns true while a supervisor is running" do + queue = Thread::Queue.new + release = Thread::Queue.new - group.running[channel.in] = fiber + group.supervise do + queue << :running + release.pop + end + expect(queue.pop).to be == :running expect(group).to be(:running?) - end - end - - with "#any?" do - it "returns false for empty group" do - expect(group).not.to be(:any?) - end - - it "returns true when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - group.running[channel.in] = fiber - - expect(group).to be(:any?) + release << true + group.wait end end @@ -67,16 +73,6 @@ it "returns true for empty group" do expect(group).to be(:empty?) end - - it "returns false when processes are running" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - - group.running[channel.in] = fiber - - expect(group).not.to be(:empty?) - end end with "#inspect" do @@ -85,219 +81,68 @@ expect(group.inspect).to be =~ /running=0/ end - it "shows the number of running processes" do - channel = Async::Container::Channel.new - fiber = Fiber.new{Fiber.yield} - fiber.resume - - group.running[channel.in] = fiber + it "shows the number of registered children" do + child = FakeChild.new + group.insert(child) expect(group.inspect).to be =~ /running=1/ end end - with "#health_check!" do - it "resumes all fibers with :health_check! message" do - messages = [] + with "bulk child control" do + it "interrupts all registered children" do + child1 = FakeChild.new + child2 = FakeChild.new - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end - - group.health_check! + group.insert(child1) + group.insert(child2) + group.interrupt - expect(messages).to be == [:health_check!, :health_check!] + expect(child1.events).to be == [:interrupt] + expect(child2.events).to be == [:interrupt] end - it "does nothing for empty group" do - expect do - group.health_check! - end.not.to raise_exception - end - end - - with "#interrupt" do - it "resumes all fibers with Interrupt" do - messages = [] + it "terminates all registered children" do + child = FakeChild.new - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end - - group.interrupt + group.insert(child) + group.terminate - expect(messages).to be == [Async::Container::Interrupt, Async::Container::Interrupt] + expect(child.events).to be == [:terminate] end - end - - with "#terminate" do - it "resumes all fibers with Terminate" do - messages = [] - - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber - end + + it "kills all registered children" do + child = FakeChild.new - group.terminate + group.insert(child) + group.kill - expect(messages).to be == [Async::Container::Terminate, Async::Container::Terminate] + expect(child.events).to be == [:kill] end end - with "#kill" do - it "resumes all fibers with Kill" do - messages = [] - - 2.times do - channel = Async::Container::Channel.new - fiber = Fiber.new do - result = Fiber.yield - messages << result - end - - fiber.resume - group.running[channel.in] = fiber + with "#sleep" do + it "wakes when child state changes" do + thread = Thread.new do + group.sleep + :woken end - group.kill + group.health_check! - expect(messages).to be == [Async::Container::Kill, Async::Container::Kill] + expect(thread.value).to be == :woken end end - # Regression test for a bug where restarting a child during health check caused - # "RuntimeError: can't add a new key into hash during iteration" - # - # The scenario: - # - A container spawns children with `restart: true` and `health_check_timeout: N` - # - health_check! calls @running.each_value { |fiber| fiber.resume(:health_check!) } - # - A resumed fiber detects health check failure and kills the child - # - The spawn fiber's while loop continues (restart: true) and calls wait_for with a new child - # - wait_for tries to add to @running while health_check! is still iterating - # - This used to cause: RuntimeError: can't add a new key into hash during iteration - it "can restart child during health_check! iteration without error" do - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - # Simulate the spawn fiber that restarts on health check failure - restart = true - fiber = Fiber.new do - while restart - result = Fiber.yield # Wait to be resumed - - if result == :health_check! - # Health check failed! Simulate the restart logic: - # The wait_for will return (simulated by breaking from this iteration) - # and the while loop continues, creating a new child - - # Simulate: child.kill! happens, wait_for returns - # Now the while loop continues and calls wait_for with new child - Fiber.new do - group.wait_for(channel2) do |msg| - # New child waiting - end - end.resume - - restart = false # Only do this once for the test - end + with "#wait" do + it "waits until all supervisors finish" do + group.supervise do + sleep(0.01) end - end - - # Start the fiber and add it to @running (simulating first wait_for call) - fiber.resume - group.running[channel1.in] = fiber - - # The fix ensures this doesn't raise RuntimeError during iteration - expect do - group.health_check! - end.not.to raise_exception - end - - # Regression test with multiple children where one restarts during health check - it "can handle one of multiple children restarting during health check" do - # Create two children, both with restart capability - 2.times do |i| - channel = Async::Container::Channel.new - fiber = Fiber.new do - iteration = 0 - loop do - iteration += 1 - result = Fiber.yield - - # First child fails health check on first iteration - if i == 0 && iteration == 1 && result == :health_check! - # Simulate health check failure and restart: - # Kill the old child, create new one - new_channel = Async::Container::Channel.new - - # This mimics what happens in spawn's while @running loop - # after wait_for returns due to child being killed - group.wait_for(new_channel) do |msg| - # New child process - end - - break # Exit this child's loop - end - end - end + group.wait - fiber.resume - group.running[channel.in] = fiber + expect(group).not.to be(:running?) end - - # The fix ensures this doesn't raise RuntimeError when the first fiber restarts - expect do - group.health_check! - end.not.to raise_exception - end - - it "handles nil fiber in @running during iteration (re-entrance scenario)" do - # This test simulates a scenario where: - # 1. IO.select returns [io1, io2] - # 2. While resuming fiber for io1, a re-entrant call completes fiber for io2 - # 3. When iteration continues to io2, @running[io2] is nil - # Without defensive check (&.), this would crash with NoMethodError - - channel1 = Async::Container::Channel.new - channel2 = Async::Container::Channel.new - - fiber1 = Fiber.new{group.running.delete(channel2.in)} - fiber2 = Fiber.new{Fiber.yield} - - fiber2.resume - - group.running[channel1.in] = fiber1 - group.running[channel2.in] = fiber2 - - # Mock select to return both channels: - expect(group).to receive(:select).and_return([channel1.in, channel2.in]) - - # This should not crash due to &. operator: - group.sleep(0) - - # Verify fiber2 was removed - expect(group.running.key?(channel2.in)).to be == false end end diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index 27b8848..6ce6b68 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -6,8 +6,11 @@ require "async/container/hybrid" require "async/container/best" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Hybrid do + include Sus::Fixtures::Async::SchedulerContext + it_behaves_like Async::Container::AContainer it "should be multiprocess" do @@ -85,14 +88,10 @@ def exits_fork_on_single_signal(signal) Process.kill(signal, fork_pid) # The fork must drain its inner threads and exit, rather than respawning them forever: - 8.times do - reaped, _status = Process.waitpid2(fork_pid, Process::WNOHANG) - if reaped - exited = true - break - end + 20.times do + Process.kill(0, fork_pid) sleep(0.1) - rescue Errno::ECHILD + rescue Errno::ESRCH exited = true break end diff --git a/test/async/container/notify/pipe.rb b/test/async/container/notify/pipe.rb index 27fb9e9..9caeff3 100644 --- a/test/async/container/notify/pipe.rb +++ b/test/async/container/notify/pipe.rb @@ -6,8 +6,11 @@ require "async/container/controller" require "async/container/controllers" +require "sus/fixtures/async" describe Async::Container::Notify::Pipe do + include Sus::Fixtures::Async::SchedulerContext + let(:notify_script) {Async::Container::Controllers.path_for("notify")} it "receives notification of child status" do diff --git a/test/async/container/policy.rb b/test/async/container/policy.rb index 684af5f..ddff555 100644 --- a/test/async/container/policy.rb +++ b/test/async/container/policy.rb @@ -5,8 +5,11 @@ require "async/container/policy" require "async/container/best" +require "sus/fixtures/async" describe Async::Container::Policy do + include Sus::Fixtures::Async::SchedulerContext + let(:policy) {subject.new} with "interface" do @@ -234,6 +237,37 @@ def child_exit(container, child, status, name:, key:, **options) expect(container.statistics.failures).to be == 1 end + it "can stop the container from child_exit" do + stop_policy = Class.new(Async::Container::Policy) do + def initialize + @stop_count = 0 + end + + attr :stop_count + + def child_exit(container, child, status, name:, key:, **options) + unless container.stopping? + @stop_count += 1 + container.stop + end + end + end.new + + container = Async::Container.best_container_class.new(policy: stop_policy) + + 3.times do |i| + container.spawn(name: "worker-#{i}") do |instance| + instance.ready! + exit(1) + end + end + + container.wait + + expect(stop_policy.stop_count).to be == 1 + expect(container).not.to be(:running?) + end + it "invokes callbacks for multiple children" do container = Async::Container.best_container_class.new(policy: tracking_policy) diff --git a/test/async/container/threaded.rb b/test/async/container/threaded.rb index f6c078e..02de8c3 100644 --- a/test/async/container/threaded.rb +++ b/test/async/container/threaded.rb @@ -5,8 +5,11 @@ require "async/container/threaded" require "async/container/a_container" +require "sus/fixtures/async" describe Async::Container::Threaded do + include Sus::Fixtures::Async::SchedulerContext + it_behaves_like Async::Container::AContainer it "should not be multiprocess" do