Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3007892
Simplify container supervision using Async
samuel-williams-shopify Jun 26, 2026
57e9e4b
Use child wait as supervision source of truth
samuel-williams-shopify Jun 26, 2026
d1155de
Simplify threaded child interrupt handling
samuel-williams-shopify Jun 26, 2026
6f3ed77
Restore child interrupt mask reset
samuel-williams-shopify Jun 26, 2026
114fe35
Revert interrupt handling detour
samuel-williams-shopify Jun 26, 2026
750e024
Rename Async compatibility test context
samuel-williams-shopify Jun 26, 2026
36d6fcd
Document child fork interrupt handling
samuel-williams-shopify Jun 26, 2026
5f89e73
Avoid hard-wrapped child fork comments
samuel-williams-shopify Jun 26, 2026
64e37ba
Format Thread.new in child comments
samuel-williams-shopify Jun 26, 2026
93c2eea
Clarify child signal handling comments
samuel-williams-shopify Jun 26, 2026
59805b9
Let ECHILD propagate from forked wait
samuel-williams-shopify Jun 26, 2026
149bacc
Simplify child wait methods
samuel-williams-shopify Jun 26, 2026
4eb7ba3
Document container group API
samuel-williams-shopify Jun 26, 2026
1609b1a
Simplify container supervision with Async
samuel-williams-shopify Jun 26, 2026
2a2b2a4
Use controller-owned signal wakeups
samuel-williams-shopify Jun 26, 2026
8ca59e9
Store controller signals in event queue
samuel-williams-shopify Jun 26, 2026
2fcbd58
Represent controller wakeups as events
samuel-williams-shopify Jun 27, 2026
422c5ff
Prefer `version.rb` documentation.
samuel-williams-shopify Jun 27, 2026
69c9fbb
Document controller event types
samuel-williams-shopify Jun 27, 2026
a8f7921
Fix policy-driven container shutdown
samuel-williams-shopify Jun 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 65 additions & 2 deletions fixtures/async/container/a_container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,69 @@ module Container
end
end

# Shared examples covering use of the container from inside Async/Sync blocks.
# Each example drives the `container` under test (provided by the enclosing shared context).
with "Async{}" do
# The child is spawned before any Async block exists; only the wait happens inside a task.
it "can wait inside an Async task after spawning outside Async" do
input, output = IO.pipe

container.spawn do
output.write(".")
end

Async do
container.wait
end.wait

# Close this side's write end before reading, so that `input.read` can reach end-of-file.
output.close
expect(input.read).to be == "."
end

# Both the spawn and the wait happen within one Async task.
it "can spawn and wait inside the same Async task" do
input, output = IO.pipe

Async do
container.spawn do
output.write(".")
end

container.wait
end.wait

# Close this side's write end before reading, so that `input.read` can reach end-of-file.
output.close
expect(input.read).to be == "."
end

# The child is spawned with a health check timeout, reports ready, and then returns.
it "can wait while a health monitor is active" do
container.spawn(health_check_timeout: 10.0) do |instance|
instance.ready!
end

# NOTE(review): `Async::Task.current` assumes the example itself already runs inside a task (presumably via sus-fixtures-async) - confirm.
# The 2 second timeout bounds the wait, which is shorter than the 10 second health check timeout above.
Async::Task.current.with_timeout(2.0) do
container.wait
end

expect(container.statistics).to have_attributes(failures: be == 0)
end

# Full life-cycle inside `Sync`: spawn two workers, wait for readiness, then stop.
it "can start, wait until ready, and stop inside Sync" do
Sync do
begin
2.times do |i|
container.spawn(name: "worker #{i}") do |instance|
instance.ready!
# Sleep with no duration so the worker keeps running after it reports ready.
sleep
end
end

container.wait_until_ready

expect(container.group.running).to have_attributes(size: be == 2)
ensure
# Always stop the sleeping workers, even if an expectation above failed.
container.stop if container.running?
end
end
end
end

it "should be blocking" do
skip "Fiber.blocking? is not supported!" unless Fiber.respond_to?(:blocking?)

Expand Down Expand Up @@ -225,15 +288,15 @@ def child_exit(container, child, status, name:, key:, **options)
container.run do |instance|
Async do
instance.ready!
sleep 0.1
end
end

expect(container).to be(:running?)

container.stop(false)
container.wait

container.stop

expect(container).not.to be(:running?)
end
end
Expand Down
1 change: 1 addition & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

group :test do
gem "sus"
gem "sus-fixtures-async"
gem "covered"

gem "rubocop"
Expand Down
9 changes: 2 additions & 7 deletions lib/async/container.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
# Released under the MIT License.
# Copyright, 2017-2025, by Samuel Williams.

require_relative "container/best"
require_relative "container/controller"

# Root namespace under which this library defines `Async::Container`.
# @namespace
module Async
# Namespace for the container implementation; it is empty here and filled in by the files this file requires.
# @namespace
module Container
end
end
require_relative "container/version"
6 changes: 4 additions & 2 deletions lib/async/container/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def initialize(timeout: 1.0)

# Close the input end of the pipe.
# This is safe to call more than once: the pipe is only closed if it is still open.
# @returns [Nil]
def close_read
	@in.close unless @in.closed?
rescue IOError, Errno::EBADF
	# Best-effort close: the descriptor may already have been invalidated elsewhere, which is not an error for the caller.
	nil
end

# Close the output end of the pipe.
# This is safe to call more than once: the pipe is only closed if it is still open.
# @returns [Nil]
def close_write
	@out.close unless @out.closed?
rescue IOError, Errno::EBADF
	# Best-effort close: the descriptor may already have been invalidated elsewhere, which is not an error for the caller.
	nil
end

# Close both ends of the pipe.
Expand Down
151 changes: 119 additions & 32 deletions lib/async/container/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
require_relative "error"
require_relative "best"

require "async"
require "async/queue"

require_relative "statistics"
require_relative "notify"
require_relative "policy"
Expand All @@ -15,12 +18,46 @@ module Container
# Manages the life-cycle of one or more containers in order to support a persistent system.
# e.g. a web server, job server or some other long running system.
class Controller
# An event which carries a signal number through the controller event queue.
class SignalEvent
	# @attribute [Integer] The signal number carried by this event.
	attr :signal
	
	# Create an event for the given signal.
	# @parameter signal [Integer] The signal number to process.
	def initialize(signal)
		@signal = signal
	end
	
	# Hand the signal over to the controller for processing.
	# @parameter controller [Controller] The controller which should process the signal.
	# @parameter container [Generic | Nil] The container associated with the current wait operation.
	# @parameter graceful [Boolean | Float] Whether to stop the container gracefully, or the duration to wait for graceful shutdown. Defaults to the controller's own `graceful_stop` setting.
	def apply(controller, container: nil, graceful: controller.graceful_stop)
		# `process_signal` is private on the controller, hence the explicit dispatch.
		signal_number = @signal
		
		controller.__send__(:process_signal, signal_number, container, graceful)
	end
end

# A marker event which stands for a container state change in the controller event queue.
module ContainerEvent
	class << self
		# Apply this container event to the controller.
		# @parameter controller [Controller] The controller currently waiting for an event.
		# @parameter options [Hash] Additional event context.
		# @returns [Nil]
		def apply(controller, **options)
			# Nothing to do: the event only wakes the waiter, and callers re-check their own predicates afterwards.
			nil
		end
	end
end

# Signal numbers, looked up by name from the platform's signal table.
# A lookup yields `nil` if the platform does not define the named signal.
SIGHUP = Signal.list["HUP"]
SIGINT = Signal.list["INT"]
SIGTERM = Signal.list["TERM"]
SIGUSR1 = Signal.list["USR1"]
SIGUSR2 = Signal.list["USR2"]

# Shared, frozen events for the signals handled via the controller event queue.
# Presumably preallocated so that signal trap handlers only have to enqueue an existing object - confirm.
HANGUP_EVENT = SignalEvent.new(SIGHUP).freeze
INTERRUPT_EVENT = SignalEvent.new(SIGINT).freeze
TERMINATE_EVENT = SignalEvent.new(SIGTERM).freeze

# Initialize the controller.
# @parameter notify [Notify::Client] A client used for process readiness notifications.
def initialize(notify: Notify.open!, container_class: Container, graceful_stop: true)
Expand All @@ -30,6 +67,7 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop:

@container = nil
@signals = {}
@event_queue = ::Thread::Queue.new

self.trap(SIGHUP) do
self.restart
Expand Down Expand Up @@ -144,7 +182,7 @@ def restart

# Wait for all child processes to enter the ready state.
Console.info(self, "Waiting for startup...")
container.wait_until_ready
wait_until_ready(container)
Console.info(self, "Finished startup.")

if container.failed?
Expand All @@ -160,10 +198,13 @@ def restart

if old_container
Console.info(self, "Stopping old container...")
old_container&.stop(@graceful_stop)

stop_container(old_container, @graceful_stop)
end

@notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.")
if @container
@notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.")
end
rescue => error
raise
ensure
Expand All @@ -188,7 +229,7 @@ def reload

# Wait for all child processes to enter the ready state.
Console.info(self, "Waiting for startup...")
@container.wait_until_ready
wait_until_ready(@container)
Console.info(self, "Finished startup.")

if @container.failed?
Expand All @@ -202,23 +243,15 @@ def reload

# Enter the controller run loop, trapping `SIGINT` and `SIGTERM`.
def run
@notify&.status!("Initializing controller...")

with_signal_handlers do
self.start
Sync do
@notify&.status!("Initializing controller...")

while @container&.running?
begin
@container.wait
rescue SignalException => exception
if handler = @signals[exception.signo]
begin
handler.call
rescue SetupError => error
Console.error(self, error)
end
else
raise
with_signal_handlers do
self.start

while @container&.running?
if container = @container
controller_sleep(container)
end
end
end
Expand All @@ -231,29 +264,83 @@ def run
self.stop(false)
end

private def with_signal_handlers
# I thought this was the default... but it doesn't always raise an exception unless you do this explicitly.
# Stop the given container in a child task, while continuing to process controller events until the stop completes.
# @parameter container [Generic] The container to stop.
# @parameter graceful [Boolean | Float] Passed through to the container's `stop`.
# @returns [Object] The result of waiting on the stop task.
private def stop_container(container, graceful)
	stopper = Async::Task.current.async(transient: true) {container.stop(graceful)}
	
	# Keep servicing events (signals, container state changes) for as long as the stop is in progress.
	controller_sleep(container) while stopper.running?
	
	stopper.wait
end

# Wait until the container reports the `:ready` status, processing controller events in the meantime.
# Gives up as soon as the container is no longer running.
# @parameter container [Generic] The container to wait for.
# @returns [Nil]
private def wait_until_ready(container)
	while !container.status?(:ready) && container.running?
		# The graceful flag is taken from the readiness status at this moment.
		# NOTE(review): this looks like it is normally falsy while still waiting, i.e. signals during startup stop non-gracefully - confirm intent.
		controller_sleep(container, graceful: container.status?(:ready))
	end
end

# Block the current task until either a controller event arrives on `@event_queue`, or the container wakes from `sleep`, then apply that one event.
# @parameter container [Generic | Nil] The container to watch for state changes; if nil, only queued events are awaited.
# @parameter graceful [Boolean | Float] Passed to the event's `apply`, together with the container.
private def controller_sleep(container = nil, graceful: @graceful_stop)
parent = Async::Task.current
# Local queue that receives whichever event source fires first.
result = Async::Queue.new

# Forward the next queued controller event (e.g. a signal event) into the result queue.
event_task = parent.async(transient: true) do
result << @event_queue.pop
end

# If there is a container, also wake up when its `sleep` returns, reporting that as a `ContainerEvent`.
container_task = if container
parent.async(transient: true) do
container.sleep
result << ContainerEvent
end
end

# Only the first event is taken and applied.
# NOTE(review): if both tasks manage to push before this pop returns, the second item stays in the local queue and is discarded - confirm that a signal event cannot be lost this way.
result.pop.apply(self, container: container, graceful: graceful)
ensure
# Stop whichever watcher did not fire; the safe navigation covers tasks that were never created.
event_task&.stop
container_task&.stop
end

# Process a signal which was delivered through the controller event queue.
# A handler registered in `@signals` takes priority; otherwise `SIGINT` and `SIGTERM` stop the controller or the given container; any other signal is raised as a `SignalException`.
# @parameter signal [Integer] The signal number to process.
# @parameter container [Generic | Nil] The container associated with the current wait operation.
# @parameter graceful [Boolean | Float] Passed to the container's `stop` when the given container (rather than the controller) is stopped.
# @raises [SignalException] If the signal has no registered handler and is neither `SIGINT` nor `SIGTERM`.
private def process_signal(signal, container = @container, graceful = @graceful_stop)
	registered = @signals[signal]
	
	if registered
		return begin
			registered.call
		rescue SetupError => error
			# A failed setup is reported rather than propagated.
			Console.error(self, error)
		end
	end
	
	if signal != SIGINT && signal != SIGTERM
		raise SignalException.new(signal)
	end
	
	# Prefer the controller's current container; fall back to the one being waited on.
	target = @container || container
	
	# When the target is the controller's own container, stop the whole controller instead.
	return self.stop if target.equal?(@container)
	
	target&.stop(graceful)
end

private def with_signal_handlers
interrupt_action = Signal.trap(:INT) do
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
# $stderr.puts "Received INT signal, interrupting...", caller
::Thread.current.raise(Interrupt)
@event_queue << INTERRUPT_EVENT
end

# SIGTERM behaves the same as SIGINT by default.
terminate_action = Signal.trap(:TERM) do
# $stderr.puts "Received TERM signal, interrupting...", caller
::Thread.current.raise(Interrupt) # Same as SIGINT
@event_queue << TERMINATE_EVENT
end

hangup_action = Signal.trap(:HUP) do
# $stderr.puts "Received HUP signal, restarting...", caller
::Thread.current.raise(Restart)
@event_queue << HANGUP_EVENT
end

::Thread.handle_interrupt(SignalException => :never) do
yield
end
yield
ensure
# Restore the interrupt handler:
Signal.trap(:INT, interrupt_action)
Expand Down
Loading
Loading