Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions lib/async/container/forked.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class Child < Channel
class Instance < Notify::Pipe
# Wrap an instance around the {Process} instance from within the forked child.
# @parameter process [Process] The process intance to wrap.
def self.for(process)
instance = self.new(process.out)
def self.for(process, ordinal: nil)
instance = self.new(process.out, ordinal: ordinal)

# The child process won't be reading from the channel:
process.close_read
Expand All @@ -38,19 +38,24 @@ def self.for(process)
# Initialize the child process instance.
#
# @parameter io [IO] The IO object to use for communication.
def initialize(io)
super
def initialize(io, ordinal: nil)
super(io)

@name = nil
@ordinal = ordinal
end

# @returns [Integer | Nil] The container-scoped ordinal of this worker.
attr :ordinal

# Generate a hash representation of the process.
#
# @returns [Hash] The process as a hash, including `process_id` and `name`.
def as_json(...)
{
process_id: ::Process.pid,
name: @name,
ordinal: @ordinal,
}
end

Expand Down Expand Up @@ -98,9 +103,9 @@ def exec(*arguments, ready: true, **options)
# Fork a child process appropriate for a container.
#
# @returns [Process]
def self.fork(**options)
def self.fork(ordinal: nil, **options)
# $stderr.puts fork: caller
self.new(**options) do |process|
self.new(ordinal: ordinal, **options) do |process|
::Process.fork do
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
Expand All @@ -109,7 +114,7 @@ def self.fork(**options)

# This could be a configuration option:
::Thread.handle_interrupt(SignalException => :immediate) do
yield Instance.for(process)
yield Instance.for(process, ordinal: ordinal)
rescue Interrupt
# Graceful exit.
rescue Exception => error
Expand Down Expand Up @@ -138,10 +143,11 @@ def self.spawn(*arguments, name: nil, **options)

# Initialize the process.
# @parameter name [String] The name to use for the child process.
def initialize(name: nil, **options)
def initialize(name: nil, ordinal: nil, **options)
super(**options)

@name = name
@ordinal = ordinal
@status = nil
@pid = nil

Expand Down Expand Up @@ -185,6 +191,9 @@ def name= value
# @attribute [Integer] The process identifier.
attr :pid

# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
attr :ordinal

# A human readable representation of the process.
# @returns [String]
def inspect
Expand Down Expand Up @@ -265,8 +274,8 @@ def wait(timeout = 0.1)
# Start a named child process and execute the provided block in it.
# @parameter name [String] The name (title) of the child process.
# @parameter block [Proc] The block to execute in the child process.
def start(name, &block)
Child.fork(name: name, &block)
def start(name, ordinal: nil, &block)
Child.fork(name: name, ordinal: ordinal, &block)
end
end
end
Expand Down
12 changes: 10 additions & 2 deletions lib/async/container/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

require_relative "group"
require_relative "keyed"
require_relative "ordinals"
require_relative "statistics"
require_relative "policy"

Expand Down Expand Up @@ -45,7 +46,7 @@ def self.run(...)
#
# @parameter policy [Policy] The policy to use for managing child lifecycle events.
# @parameter options [Hash] Options passed to the {Group} instance.
def initialize(policy: Policy::DEFAULT, **options)
def initialize(policy: Policy::DEFAULT, ordinals: nil, **options)
@group = Group.new(**options)
@stopping = false

Expand All @@ -54,6 +55,7 @@ def initialize(policy: Policy::DEFAULT, **options)
@policy = policy
@statistics = @policy.make_statistics
@keyed = {}
@ordinals = ordinals || Ordinals::Sequential.new
end

# @attribute [Group] The group of running children instances.
Expand Down Expand Up @@ -221,13 +223,17 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
return false
end

# Allocate before the fiber so the closure captures the ordinal and it stays
# unchanged across a restart (which re-enters `start` in the same fiber).
ordinal = @ordinals.acquire

@statistics.spawn!

fiber do
until @stopping
Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)

child = self.start(name, &block)
child = self.start(name, ordinal: ordinal, &block)
state = insert(key, child)

# Notify policy of spawn
Expand Down Expand Up @@ -300,6 +306,8 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
break
end
end
ensure
@ordinals.release(ordinal)
end.resume

return true
Expand Down
10 changes: 8 additions & 2 deletions lib/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio

forks.times do
self.spawn(**options) do |instance|
container = Threaded.new
# Fork ordinals are unique and stable across restart; each fork owns a
# deterministic fixed range for its inner threaded workers.
first_ordinal = instance.ordinal * threads
ordinals = Ordinals::Fixed.range(first_ordinal, threads)
container = Threaded.new(ordinals: ordinals)

container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block)
container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker|
block.call(worker)
end

container.wait_until_ready
instance.ready!
Expand Down
117 changes: 117 additions & 0 deletions lib/async/container/ordinals.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "set"

module Async
module Container
# Allocates stable ordinal values for container workers.
module Ordinals
# Raised when no ordinal can be allocated.
class Exhausted < RuntimeError
end

# A sequential ordinal allocator with lowest-released ordinal reuse.
class Sequential
def initialize(initial = 0)
@next = initial
@free = Set.new
end

# Reserve a fixed pool of ordinals from this allocator.
# @parameter count [Integer] The number of ordinals to reserve.
# @returns [Fixed] The reserved ordinals as a fixed allocator.
def reserve(count)
first = @next
@next += count

return Fixed.new(first...@next)
end

# Allocate the next available ordinal.
def acquire
unless @free.empty?
ordinal = @free.min
@free.delete(ordinal)
return ordinal
end

ordinal = @next
@next += 1
return ordinal
end

# Return an ordinal to the allocator.
# @parameter ordinal [Integer] The ordinal to release.
def release(ordinal)
@free.add(ordinal)
end
end

# An allocator backed by a fixed set of ordinals.
class Fixed
include Enumerable

# Create a fixed pool from a contiguous range of ordinals.
# @parameter initial [Integer] The first ordinal in the range.
# @parameter count [Integer] The number of ordinals in the range.
# @returns [Fixed] The fixed allocator for the range.
def self.range(initial, count)
self.new(initial...(initial + count))
end

def initialize(ordinals)
@ordinals = ordinals.to_set.freeze
@free = @ordinals.dup
end

# @attribute [Set(Integer)] The ordinals managed by this allocator.
attr :ordinals

# Enumerate the ordinals managed by this allocator.
def each(&block)
@ordinals.each(&block)
end

# Reserve a fixed pool of ordinals from this allocator.
# @parameter count [Integer] The number of ordinals to reserve.
# @returns [Fixed] The reserved ordinals as a fixed allocator.
def reserve(count)
if count > @free.size
raise Exhausted, "No ordinals available!"
end

ordinals = @free.min(count)
ordinals.each do |ordinal|
@free.delete(ordinal)
end

return Fixed.new(ordinals)
end

# Allocate the lowest available ordinal from the fixed pool.
def acquire
unless @free.empty?
ordinal = @free.min
@free.delete(ordinal)
return ordinal
end

raise Exhausted, "No ordinals available!"
end

# Return an ordinal to the fixed pool.
# @parameter ordinal [Integer] The ordinal to release.
def release(ordinal)
unless @ordinals.include?(ordinal)
raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!"
end

@free.add(ordinal)
end
end
end
end
end
29 changes: 19 additions & 10 deletions lib/async/container/threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,25 @@ def error
class Instance < Notify::Pipe
# Wrap an instance around the {Thread} instance from within the threaded child.
# @parameter thread [Thread] The thread intance to wrap.
def self.for(thread)
instance = self.new(thread.out)
def self.for(thread, ordinal: nil)
instance = self.new(thread.out, ordinal: ordinal)

return instance
end

# Initialize the child thread instance.
#
# @parameter io [IO] The IO object to use for communication with the parent.
def initialize(io)
def initialize(io, ordinal: nil)
@thread = ::Thread.current
@ordinal = ordinal

super
super(io)
end

# @returns [Integer | Nil] The container-scoped ordinal of this worker.
attr :ordinal

# Generate a hash representation of the thread.
#
# @returns [Hash] The thread as a hash, including `process_id`, `thread_id`, and `name`.
Expand All @@ -66,6 +70,7 @@ def as_json(...)
process_id: ::Process.pid,
thread_id: @thread.object_id,
name: @thread.name,
ordinal: @ordinal,
}
end

Expand Down Expand Up @@ -111,12 +116,12 @@ def exec(*arguments, ready: true, **options)
# Start a new child thread and execute the provided block in it.
#
# @parameter options [Hash] Additional options to to the new child instance.
def self.fork(**options)
self.new(**options) do |thread|
def self.fork(ordinal: nil, **options)
self.new(ordinal: ordinal, **options) do |thread|
::Thread.new do
# This could be a configuration option (see forked implementation too):
::Thread.handle_interrupt(SignalException => :immediate) do
yield Instance.for(thread)
yield Instance.for(thread, ordinal: ordinal)
end
end
end
Expand All @@ -125,9 +130,10 @@ def self.fork(**options)
# Initialize the thread.
#
# @parameter name [String] The name to use for the child thread.
def initialize(name: nil, **options)
def initialize(name: nil, ordinal: nil, **options)
super(**options)

@ordinal = ordinal
@status = nil

@thread = yield(self)
Expand All @@ -150,6 +156,9 @@ def initialize(name: nil, **options)
end
end

# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
attr :ordinal

# Convert the child process to a hash, suitable for serialization.
#
# @returns [Hash] The request as a hash.
Expand Down Expand Up @@ -286,8 +295,8 @@ def finished(error = nil)
# Start a named child thread and execute the provided block in it.
# @parameter name [String] The name (title) of the child process.
# @parameter block [Proc] The block to execute in the child process.
def start(name, &block)
Child.fork(name: name, &block)
def start(name, ordinal: nil, &block)
Child.fork(name: name, ordinal: ordinal, &block)
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions test/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
interrupt_count = 0

threaded_class = Class.new
threaded_class.define_method(:initialize) do |**options|
@options = options
end
threaded_class.define_method(:run) do |**options, &block|
self
end
Expand All @@ -40,6 +43,10 @@
container_class = Class.new(subject) do
def spawn(**options, &block)
instance = Object.new
instance.define_singleton_method(:ordinal) do
0
end

def instance.ready!
end

Expand Down
Loading
Loading