diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15cc05f..fe64368 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -24,8 +24,8 @@ class Child < Channel class Instance < Notify::Pipe # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process) - instance = self.new(process.out) + def self.for(process, ordinal: nil) + instance = self.new(process.out, ordinal: ordinal) # The child process won't be reading from the channel: process.close_read @@ -38,12 +38,16 @@ def self.for(process) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io) - super + def initialize(io, ordinal: nil) + super(io) @name = nil + @ordinal = ordinal end + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :ordinal + # Generate a hash representation of the process. # # @returns [Hash] The process as a hash, including `process_id` and `name`. @@ -51,6 +55,7 @@ def as_json(...) { process_id: ::Process.pid, name: @name, + ordinal: @ordinal, } end @@ -98,9 +103,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(**options) + def self.fork(ordinal: nil, **options) # $stderr.puts fork: caller - self.new(**options) do |process| + self.new(ordinal: ordinal, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -109,7 +114,7 @@ def self.fork(**options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process) + yield Instance.for(process, ordinal: ordinal) rescue Interrupt # Graceful exit. rescue Exception => error @@ -138,10 +143,11 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) @name = name + @ordinal = ordinal @status = nil @pid = nil @@ -185,6 +191,9 @@ def name= value # @attribute [Integer] The process identifier. attr :pid + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :ordinal + # A human readable representation of the process. # @returns [String] def inspect @@ -265,8 +274,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d7e1551..b51e773 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -9,6 +9,7 @@ require_relative "group" require_relative "keyed" +require_relative "ordinals" require_relative "statistics" require_relative "policy" @@ -45,7 +46,7 @@ def self.run(...) # # @parameter policy [Policy] The policy to use for managing child lifecycle events. # @parameter options [Hash] Options passed to the {Group} instance. - def initialize(policy: Policy::DEFAULT, **options) + def initialize(policy: Policy::DEFAULT, ordinals: nil, **options) @group = Group.new(**options) @stopping = false @@ -54,6 +55,7 @@ def initialize(policy: Policy::DEFAULT, **options) @policy = policy @statistics = @policy.make_statistics @keyed = {} + @ordinals = ordinals || Ordinals::Sequential.new end # @attribute [Group] The group of running children instances. @@ -221,13 +223,17 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu return false end + # Allocate before the fiber so the closure captures the ordinal and it stays + # unchanged across a restart (which re-enters `start` in the same fiber). + ordinal = @ordinals.acquire + @statistics.spawn! fiber do until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, &block) + child = self.start(name, ordinal: ordinal, &block) state = insert(key, child) # Notify policy of spawn @@ -300,6 +306,8 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu break end end + ensure + @ordinals.release(ordinal) end.resume return true diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 8c6f4f5..b56c53c 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -24,9 +24,15 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio forks.times do self.spawn(**options) do |instance| - container = Threaded.new + # Fork ordinals are unique and stable across restart; each fork owns a + # deterministic fixed range for its inner threaded workers. + first_ordinal = instance.ordinal * threads + ordinals = Ordinals::Fixed.range(first_ordinal, threads) + container = Threaded.new(ordinals: ordinals) - container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) + container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker| + block.call(worker) + end container.wait_until_ready instance.ready! diff --git a/lib/async/container/ordinals.rb b/lib/async/container/ordinals.rb new file mode 100644 index 0000000..40ad80c --- /dev/null +++ b/lib/async/container/ordinals.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "set" + +module Async + module Container + # Allocates stable ordinal values for container workers. + module Ordinals + # Raised when no ordinal can be allocated. + class Exhausted < RuntimeError + end + + # A sequential ordinal allocator with lowest-released ordinal reuse. + class Sequential + def initialize(initial = 0) + @next = initial + @free = Set.new + end + + # Reserve a fixed pool of ordinals from this allocator. + # @parameter count [Integer] The number of ordinals to reserve. + # @returns [Fixed] The reserved ordinals as a fixed allocator. + def reserve(count) + first = @next + @next += count + + return Fixed.new(first...@next) + end + + # Allocate the next available ordinal. + def acquire + unless @free.empty? + ordinal = @free.min + @free.delete(ordinal) + return ordinal + end + + ordinal = @next + @next += 1 + return ordinal + end + + # Return an ordinal to the allocator. + # @parameter ordinal [Integer] The ordinal to release. + def release(ordinal) + @free.add(ordinal) + end + end + + # An allocator backed by a fixed set of ordinals. + class Fixed + include Enumerable + + # Create a fixed pool from a contiguous range of ordinals. + # @parameter initial [Integer] The first ordinal in the range. + # @parameter count [Integer] The number of ordinals in the range. + # @returns [Fixed] The fixed allocator for the range. + def self.range(initial, count) + self.new(initial...(initial + count)) + end + + def initialize(ordinals) + @ordinals = ordinals.to_set.freeze + @free = @ordinals.dup + end + + # @attribute [Set(Integer)] The ordinals managed by this allocator. + attr :ordinals + + # Enumerate the ordinals managed by this allocator. + def each(&block) + @ordinals.each(&block) + end + + # Reserve a fixed pool of ordinals from this allocator. + # @parameter count [Integer] The number of ordinals to reserve. + # @returns [Fixed] The reserved ordinals as a fixed allocator. + def reserve(count) + if count > @free.size + raise Exhausted, "No ordinals available!" + end + + ordinals = @free.min(count) + ordinals.each do |ordinal| + @free.delete(ordinal) + end + + return Fixed.new(ordinals) + end + + # Allocate the lowest available ordinal from the fixed pool. + def acquire + unless @free.empty? + ordinal = @free.min + @free.delete(ordinal) + return ordinal + end + + raise Exhausted, "No ordinals available!" + end + + # Return an ordinal to the fixed pool. + # @parameter ordinal [Integer] The ordinal to release. + def release(ordinal) + unless @ordinals.include?(ordinal) + raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!" + end + + @free.add(ordinal) + end + end + end + end +end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index b1be095..9ef0c64 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -43,8 +43,8 @@ def error class Instance < Notify::Pipe # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread) - instance = self.new(thread.out) + def self.for(thread, ordinal: nil) + instance = self.new(thread.out, ordinal: ordinal) return instance end @@ -52,12 +52,16 @@ def self.for(thread) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io) + def initialize(io, ordinal: nil) @thread = ::Thread.current + @ordinal = ordinal - super + super(io) end + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :ordinal + # Generate a hash representation of the thread. # # @returns [Hash] The thread as a hash, including `process_id`, `thread_id`, and `name`. @@ -66,6 +70,7 @@ def as_json(...) process_id: ::Process.pid, thread_id: @thread.object_id, name: @thread.name, + ordinal: @ordinal, } end @@ -111,12 +116,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(**options) - self.new(**options) do |thread| + def self.fork(ordinal: nil, **options) + self.new(ordinal: ordinal, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread) + yield Instance.for(thread, ordinal: ordinal) end end end @@ -125,9 +130,10 @@ def self.fork(**options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) + @ordinal = ordinal @status = nil @thread = yield(self) @@ -150,6 +156,9 @@ def initialize(name: nil, **options) end end + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :ordinal + # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -286,8 +295,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index 27b8848..280b547 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -19,6 +19,9 @@ interrupt_count = 0 threaded_class = Class.new + threaded_class.define_method(:initialize) do |**options| + @options = options + end threaded_class.define_method(:run) do |**options, &block| self end @@ -40,6 +43,10 @@ container_class = Class.new(subject) do def spawn(**options, &block) instance = Object.new + instance.define_singleton_method(:ordinal) do + 0 + end + def instance.ready! end diff --git a/test/async/container/ordinal.rb b/test/async/container/ordinal.rb new file mode 100644 index 0000000..c8a7a3b --- /dev/null +++ b/test/async/container/ordinal.rb @@ -0,0 +1,241 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/hybrid" +require "async/container/ordinals" +require "async/container/best" + +# Collect what each worker reports about its own ordinal. +def collect_worker_ordinals(container, count: 3, **options) + input, output = IO.pipe + container.run(count: count, **options) do |instance| + output.write("#{instance.ordinal}\n") + end + container.wait + output.close + input.read.lines.map(&:to_i) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Generic do + let(:container) {Async::Container::Threaded.new} + + with "ordinal allocation" do + it "assigns sequential ordinals starting at 0" do + ordinals = collect_worker_ordinals(container, count: 3) + + expect(ordinals.sort).to be == [0, 1, 2] + end + + it "reuses the lowest released ordinal before extending the range" do + first = collect_worker_ordinals(container, count: 3) + second = collect_worker_ordinals(container, count: 1) + + expect(first.sort).to be == [0, 1, 2] + expect(second).to be == [0] + end + end + + with "keyed reuse" do + it "does not allocate an ordinal for a reused keyed child" do + input, output = IO.pipe + + container.spawn(key: :web) do |instance| + output.puts(instance.ordinal) + sleep + end + + expect(input.gets.to_i).to be == 0 + + reused = container.spawn(key: :web){sleep} # mark? hit => returns before allocating + + expect(reused).to be == false + expect(container.instance_variable_get(:@ordinals).acquire).to be == 1 + ensure + container.stop(false) + input&.close unless input&.closed? + output&.close unless output&.closed? + end + end + + with "injected ordinals" do + it "does not accept an ordinal option via run" do + expect do + container.run(count: 2, ordinal: 7){sleep} + end.to raise_exception(ArgumentError, message: be =~ /unknown keyword: :ordinal/) + end + end +end + +describe Async::Container::Ordinals::Sequential do + let(:ordinals) {subject.new} + + it "assigns sequential ordinals starting at 0" do + expect(ordinals.acquire).to be == 0 + expect(ordinals.acquire).to be == 1 + expect(ordinals.acquire).to be == 2 + end + + it "can start at an initial ordinal" do + ordinals = subject.new(10) + + expect(ordinals.acquire).to be == 10 + expect(ordinals.acquire).to be == 11 + end + + it "reuses the lowest released ordinal before extending the range" do + 3.times{ordinals.acquire} # => 0, 1, 2 + ordinals.release(1) + + expect(ordinals.acquire).to be == 1 # reused + expect(ordinals.acquire).to be == 3 # then extends + end + + it "does not hand out the same ordinal twice when an ordinal is released more than once" do + 2.times{ordinals.acquire} # => 0, 1 + ordinals.release(0) + ordinals.release(0) # double release must be idempotent + + expect(ordinals.acquire).to be == 0 # the recycled ordinal + expect(ordinals.acquire).to be == 2 # not 0 again + end + + it "reserves ordinals as a fixed allocator" do + reserved = ordinals.reserve(3) + + expect(reserved).to be_a(Async::Container::Ordinals::Fixed) + expect(reserved.to_a).to be == [0, 1, 2] + expect(ordinals.acquire).to be == 3 + end +end + +describe Async::Container::Ordinals::Fixed do + let(:ordinals) {subject.new([5, 7])} + + it "creates a fixed allocator from a range" do + ordinals = subject.range(5, 3) + + expect(ordinals.to_a).to be == [5, 6, 7] + end + + it "allocates from the fixed pool" do + expect(ordinals.acquire).to be == 5 + expect(ordinals.acquire).to be == 7 + expect{ordinals.acquire}.to raise_exception(Async::Container::Ordinals::Exhausted) + end + + it "can release ordinals back to the fixed pool" do + expect(ordinals.acquire).to be == 5 + ordinals.release(5) + expect(ordinals.acquire).to be == 5 + end + + it "reserves ordinals as a fixed allocator" do + reserved = ordinals.reserve(2) + + expect(reserved).to be_a(Async::Container::Ordinals::Fixed) + expect(reserved.to_a).to be == [5, 7] + expect{ordinals.acquire}.to raise_exception(Async::Container::Ordinals::Exhausted) + end + + it "does not partially reserve ordinals if the pool is too small" do + expect{ordinals.reserve(3)}.to raise_exception(Async::Container::Ordinals::Exhausted) + + expect(ordinals.acquire).to be == 5 + expect(ordinals.acquire).to be == 7 + end + + it "rejects ordinals outside the fixed pool" do + expect{ordinals.release(6)}.to raise_exception(ArgumentError) + end +end + +describe Async::Container::Threaded do + let(:container) {subject.new} + + it "exposes instance.ordinal to the worker" do + ordinals = collect_worker_ordinals(container, count: 3) + + expect(ordinals.sort).to be == [0, 1, 2] + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "preserves instance.ordinal across a restart" do + trigger = IO.pipe + ordinals = IO.pipe + + runner = Thread.new do + container.spawn(restart: true) do |instance| + ordinals.last.puts(instance.ordinal.to_s) + trigger.first.gets # block until told to exit, then the worker restarts + end + container.wait + end + + reported = [] + 2.times do + reported << ordinals.first.gets.to_i + trigger.last.puts("die") + end + + runner.kill + runner.join + + # Same ordinal allocated for both incarnations (ordinal is captured outside the restart loop): + expect(reported).to be == [reported.first, reported.first] + end + + it "does not inherit ordinals into independently managed child containers" do + input, output = IO.pipe + + container.run(count: 1) do |instance| + child = subject.new + child_ordinals = collect_worker_ordinals(child, count: 2) + + output.puts("outer=#{instance.ordinal} child=#{child_ordinals.sort.join(",")}") + end + + container.wait + output.close + reported = input.read.lines.map(&:chomp) + + expect(reported).to be == ["outer=0 child=0,1"] + ensure + input&.close unless input&.closed? + end +end + +describe Async::Container::Forked do + let(:container) {subject.new} + + it "exposes instance.ordinal to the worker" do + ordinals = collect_worker_ordinals(container, count: 3) + + expect(ordinals.sort).to be == [0, 1, 2] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork? + +describe Async::Container::Hybrid do + let(:container) {subject.new} + + it "assigns unique worker ordinals across forked threaded workers" do + ordinals = collect_worker_ordinals(container, count: 4, forks: 2, threads: 2) + + expect(ordinals.sort).to be == [0, 1, 2, 3] + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "anchors worker ordinal ranges to the fork ordinal" do + container = subject.new(ordinals: Async::Container::Ordinals::Sequential.new(3)) + ordinals = collect_worker_ordinals(container, count: 4, forks: 2, threads: 2) + + expect(ordinals.sort).to be == [6, 7, 8, 9] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork?