From fffa67a947476c7e3f515bf61290e363aeaacafe Mon Sep 17 00:00:00 2001 From: Udi Feldman Date: Mon, 1 Jun 2026 16:13:04 -0400 Subject: [PATCH 1/5] Add durable worker num and execution context Containers expose nothing to a worker about which worker it is; the Child::Instance handed to the run block carries only `name`. Code that needs a stable per-worker identifier (e.g. a prometheus-client-mmap pid_provider, which keys mmap files per process) has nothing to key on, and under Falcon/async-service the app never holds the run block itself. Such an identifier needs to be both durable across a restart (so metrics survive a re-fork instead of fragmenting) and bounded in cardinality (drawn from 0..N-1 rather than the open-ended PID space, so the file and series count stays constant). A recycled worker ordinal satisfies both. Add a container-scoped `num` allocated by Generic (a counter plus a Set free-list; idempotent release), captured in the spawn closure so it is unchanged when a `restart: true` worker re-enters `start`. Expose `num` and `kind` on Child::Instance, `instance_num` on the parent-side Child, and a `parent` link plus a `context` Frame stack built from the object graph. Hybrid links each inner thread worker to its fork, so a Hybrid thread can reach its durable process num via `instance.parent.num` with no process- or thread-global state. --- lib/async/container/context.rb | 31 +++++++ lib/async/container/forked.rb | 37 ++++++--- lib/async/container/generic.rb | 38 ++++++++- lib/async/container/hybrid.rb | 7 +- lib/async/container/threaded.rb | 37 ++++++--- test/async/container/context.rb | 64 +++++++++++++++ test/async/container/instance_num.rb | 118 +++++++++++++++++++++++++++ 7 files changed, 309 insertions(+), 23 deletions(-) create mode 100644 lib/async/container/context.rb create mode 100644 test/async/container/context.rb create mode 100644 test/async/container/instance_num.rb diff --git a/lib/async/container/context.rb b/lib/async/container/context.rb new file mode 100644 index 0000000..9c9d056 --- /dev/null +++ b/lib/async/container/context.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +module Async + module Container + # A single level of a worker's execution context. + # @attribute kind [Symbol] Either `:process` or `:thread`. + # @attribute num [Integer] The container-scoped ordinal of the worker at this level. + # @attribute name [String | Nil] The name the container was given for this worker. + Frame = Data.define(:kind, :num, :name) + + # Mixed into each container's `Child::Instance` to expose its place in the worker + # hierarchy. The frame stack is built from the object graph (this instance plus its + # parent chain), so worker code that holds the instance - e.g. an async-service + # `prepare!(instance)` hook - can read its durable worker number with no process or + # thread globals. + module Context + # The instance of the worker this one is nested inside, or `nil` at the top level. + # A {Hybrid} thread's parent is its fork; a plain {Forked}/{Threaded} worker has none. + attr_accessor :parent + + # The execution context as a {Frame} stack, outermost level first. + # @returns [Array(Frame)] e.g. `[process, thread]` for a Hybrid thread worker. + def context + (parent ? parent.context : []) + [Frame.new(kind: kind, num: num, name: name)] + end + end + end +end diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15cc05f..91a0db5 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -7,6 +7,7 @@ require_relative "generic" require_relative "channel" +require_relative "context" require_relative "notify/pipe" module Async @@ -22,10 +23,12 @@ def self.multiprocess? class Child < Channel # Represents a running child process from the point of view of the child process. class Instance < Notify::Pipe + include Context + # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process) - instance = self.new(process.out) + def self.for(process, instance_num: nil) + instance = self.new(process.out, num: instance_num) # The child process won't be reading from the channel: process.close_read @@ -38,10 +41,19 @@ def self.for(process) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io) - super + def initialize(io, num: nil) + super(io) @name = nil + @num = num + end + + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :num + + # @returns [Symbol] The kind of worker this instance represents. + def kind + :process end # Generate a hash representation of the process. @@ -51,6 +63,7 @@ def as_json(...) { process_id: ::Process.pid, name: @name, + num: @num, } end @@ -98,9 +111,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(**options) + def self.fork(instance_num: nil, **options) # $stderr.puts fork: caller - self.new(**options) do |process| + self.new(instance_num: instance_num, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -109,7 +122,7 @@ def self.fork(**options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process) + yield Instance.for(process, instance_num: instance_num) rescue Interrupt # Graceful exit. rescue Exception => error @@ -138,10 +151,11 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, **options) + def initialize(name: nil, instance_num: nil, **options) super(**options) @name = name + @instance_num = instance_num @status = nil @pid = nil @@ -185,6 +199,9 @@ def name= value # @attribute [Integer] The process identifier. attr :pid + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :instance_num + # A human readable representation of the process. # @returns [String] def inspect @@ -265,8 +282,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, instance_num: nil, &block) + Child.fork(name: name, instance_num: instance_num, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d7e1551..22af78d 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -54,6 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options) @policy = policy @statistics = @policy.make_statistics @keyed = {} + # Container-scoped allocation of worker ordinals (`instance_num`): a monotonic + # counter plus a free set, so a num released by a permanently exited worker is + # recycled, keeping the range compact (e.g. for multiprocess metric files). + @next_instance_num = 0 + @free_instance_nums = Set.new end # @attribute [Group] The group of running children instances. @@ -213,7 +218,7 @@ def stop(timeout = true) # @parameter key [Symbol] A key used for reloading child instances. # @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy. # @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy. - def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) + def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) @@ -221,13 +226,19 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu return false end + # Allocate before the fiber so the closure captures the num and it stays unchanged + # across a restart (which re-enters `start` in the same fiber). Only release a num + # we allocated ourselves, and only when the worker permanently exits. + owned = instance_num.nil? + instance_num ||= acquire_instance_num + @statistics.spawn! fiber do until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, &block) + child = self.start(name, instance_num: instance_num, &block) state = insert(key, child) # Notify policy of spawn @@ -300,11 +311,34 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu break end end + ensure + release_instance_num(instance_num) if owned end.resume return true end + # Allocate a container-scoped worker ordinal, recycling the lowest released num. + # Allocation runs on the single cooperative reactor thread (acquire in the run loop, + # release in the fiber's ensure), so no synchronisation is required. + protected def acquire_instance_num + unless @free_instance_nums.empty? + num = @free_instance_nums.min + @free_instance_nums.delete(num) + return num + end + + num = @next_instance_num + @next_instance_num += 1 + num + end + + # Return a worker ordinal to the free set once its worker has permanently exited. Using a + # Set makes release idempotent, so a double release can't hand the same num to two workers. + protected def release_instance_num(instance_num) + @free_instance_nums.add(instance_num) + end + # Run multiple instances of the same block in the container. # @parameter count [Integer] The number of instances to start. def run(count: Container.processor_count, **options, &block) diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 8c6f4f5..e23270f 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -26,7 +26,12 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio self.spawn(**options) do |instance| container = Threaded.new - container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) + # Link each inner thread worker to this fork, so the thread instance can reach + # the durable process num via `instance.parent` / `instance.context`. + container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker| + worker.parent = instance + block.call(worker) + end container.wait_until_ready instance.ready! diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index b1be095..5bc6af0 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -5,6 +5,7 @@ require_relative "generic" require_relative "channel" +require_relative "context" require_relative "notify/pipe" module Async @@ -41,10 +42,12 @@ def error # Represents a running child thread from the point of view of the child thread. class Instance < Notify::Pipe + include Context + # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread) - instance = self.new(thread.out) + def self.for(thread, instance_num: nil) + instance = self.new(thread.out, num: instance_num) return instance end @@ -52,10 +55,19 @@ def self.for(thread) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io) + def initialize(io, num: nil) @thread = ::Thread.current + @num = num - super + super(io) + end + + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :num + + # @returns [Symbol] The kind of worker this instance represents. + def kind + :thread end # Generate a hash representation of the thread. @@ -66,6 +78,7 @@ def as_json(...) process_id: ::Process.pid, thread_id: @thread.object_id, name: @thread.name, + num: @num, } end @@ -111,12 +124,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(**options) - self.new(**options) do |thread| + def self.fork(instance_num: nil, **options) + self.new(instance_num: instance_num, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread) + yield Instance.for(thread, instance_num: instance_num) end end end @@ -125,9 +138,10 @@ def self.fork(**options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, **options) + def initialize(name: nil, instance_num: nil, **options) super(**options) + @instance_num = instance_num @status = nil @thread = yield(self) @@ -150,6 +164,9 @@ def initialize(name: nil, **options) end end + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :instance_num + # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -286,8 +303,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, instance_num: nil, &block) + Child.fork(name: name, instance_num: instance_num, &block) end end end diff --git a/test/async/container/context.rb b/test/async/container/context.rb new file mode 100644 index 0000000..cd50056 --- /dev/null +++ b/test/async/container/context.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/hybrid" +require "async/container/best" + +# Have the worker serialise something about its instance to a pipe, one line per worker. +def report_from_worker(container, **run_options) + input, output = IO.pipe + container.run(**run_options) do |instance| + output.write(yield(instance) + "\n") + end + container.wait + output.close + input.read.lines.map(&:chomp) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Threaded do + it "exposes a single :thread frame and no parent via instance.context" do + reported = report_from_worker(subject.new, count: 1) do |instance| + "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + end + + expect(reported).to be == ["thread:0|parent=nil"] + end +end + +describe Async::Container::Forked do + it "exposes a single :process frame and no parent via instance.context" do + reported = report_from_worker(subject.new, count: 1) do |instance| + "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + end + + expect(reported).to be == ["process:0|parent=nil"] + end +end if Async::Container.fork? + +describe Async::Container::Hybrid do + it "exposes [:process, :thread] frames via instance.context" do + reported = report_from_worker(subject.new, count: 1, forks: 1, threads: 1) do |instance| + instance.context.map {|f| f.kind}.join(",") + end + + expect(reported).to be == ["process,thread"] + end + + it "reaches the durable forked num through instance.parent (not the thread num)" do + reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance| + "#{instance.kind}/#{instance.num} parent=#{instance.parent&.kind}/#{instance.parent&.num}" + end + + # Both workers are thread num 0 within their fork; the durable forked num is on the parent. + expect(reported.sort).to be == [ + "thread/0 parent=process/0", + "thread/0 parent=process/1", + ] + end +end if Async::Container.fork? diff --git a/test/async/container/instance_num.rb b/test/async/container/instance_num.rb new file mode 100644 index 0000000..af25ac2 --- /dev/null +++ b/test/async/container/instance_num.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/best" + +# Collect what each worker reports about its own instance context. +def collect_worker_context(container, count: 3, **options) + input, output = IO.pipe + container.run(count: count, **options) do |instance| + output.write("#{instance.num}:#{instance.kind}\n") + end + container.wait + output.close + input.read.lines.map(&:chomp) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Generic do + let(:container) {Async::Container::Threaded.new} + + with "#acquire_instance_num" do + it "assigns sequential nums starting at 0" do + expect(container.send(:acquire_instance_num)).to be == 0 + expect(container.send(:acquire_instance_num)).to be == 1 + expect(container.send(:acquire_instance_num)).to be == 2 + end + + it "reuses the lowest released num before extending the range" do + 3.times {container.send(:acquire_instance_num)} # => 0, 1, 2 + container.send(:release_instance_num, 1) + + expect(container.send(:acquire_instance_num)).to be == 1 # reused + expect(container.send(:acquire_instance_num)).to be == 3 # then extends + end + + it "does not hand out the same num twice when a num is released more than once" do + 2.times {container.send(:acquire_instance_num)} # => 0, 1 + container.send(:release_instance_num, 0) + container.send(:release_instance_num, 0) # double release must be idempotent + + expect(container.send(:acquire_instance_num)).to be == 0 # the recycled num + expect(container.send(:acquire_instance_num)).to be == 2 # not 0 again + end + end + + with "keyed reuse" do + it "does not allocate a num for a reused keyed child" do + container.spawn(key: :web) {sleep} # allocates 0, registers the key + reused = container.spawn(key: :web) {sleep} # mark? hit => returns before allocating + + expect(reused).to be == false + # If the second spawn had allocated, the next free num would be 2: + expect(container.send(:acquire_instance_num)).to be == 1 + ensure + container.stop(false) + end + end +end + +describe Async::Container::Threaded do + let(:container) {subject.new} + + it "exposes instance.num and instance.kind to the worker (kind: :thread)" do + reported = collect_worker_context(container, count: 3) + + nums = reported.map {|line| line.split(":").first.to_i} + kinds = reported.map {|line| line.split(":").last}.uniq + + expect(nums.sort).to be == [0, 1, 2] + expect(kinds).to be == ["thread"] + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "preserves instance.num across a restart" do + trigger = IO.pipe + nums = IO.pipe + + runner = Thread.new do + container.spawn(restart: true) do |instance| + nums.last.puts(instance.num.to_s) + trigger.first.gets # block until told to exit, then the worker restarts + end + container.wait + end + + reported = [] + 2.times do + reported << nums.first.gets.to_i + trigger.last.puts("die") + end + + runner.kill + runner.join + + # Same num allocated for both incarnations (num is captured outside the restart loop): + expect(reported).to be == [reported.first, reported.first] + end +end + +describe Async::Container::Forked do + let(:container) {subject.new} + + it "exposes instance.num and instance.kind to the worker (kind: :process)" do + reported = collect_worker_context(container, count: 3) + + nums = reported.map {|line| line.split(":").first.to_i} + kinds = reported.map {|line| line.split(":").last}.uniq + + expect(nums.sort).to be == [0, 1, 2] + expect(kinds).to be == ["process"] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork? From 3be05d0409042c70039619b44f9069cd13288cf7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 12:47:52 +1200 Subject: [PATCH 2/5] Simplify worker ordinal API Assisted-By: devx/c78f867c-4c73-40b2-a763-4a9332e15ef9 --- lib/async/container/context.rb | 31 ----- lib/async/container/forked.rb | 37 +++--- lib/async/container/generic.rb | 46 ++++--- lib/async/container/hybrid.rb | 2 +- lib/async/container/threaded.rb | 37 +++--- test/async/container/instance_num.rb | 118 ------------------ test/async/container/ordinal.rb | 118 ++++++++++++++++++ .../async/container/{context.rb => parent.rb} | 36 +++--- 8 files changed, 187 insertions(+), 238 deletions(-) delete mode 100644 lib/async/container/context.rb delete mode 100644 test/async/container/instance_num.rb create mode 100644 test/async/container/ordinal.rb rename test/async/container/{context.rb => parent.rb} (50%) diff --git a/lib/async/container/context.rb b/lib/async/container/context.rb deleted file mode 100644 index 9c9d056..0000000 --- a/lib/async/container/context.rb +++ /dev/null @@ -1,31 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2026, by Udi Oron. - -module Async - module Container - # A single level of a worker's execution context. - # @attribute kind [Symbol] Either `:process` or `:thread`. - # @attribute num [Integer] The container-scoped ordinal of the worker at this level. - # @attribute name [String | Nil] The name the container was given for this worker. - Frame = Data.define(:kind, :num, :name) - - # Mixed into each container's `Child::Instance` to expose its place in the worker - # hierarchy. The frame stack is built from the object graph (this instance plus its - # parent chain), so worker code that holds the instance - e.g. an async-service - # `prepare!(instance)` hook - can read its durable worker number with no process or - # thread globals. - module Context - # The instance of the worker this one is nested inside, or `nil` at the top level. - # A {Hybrid} thread's parent is its fork; a plain {Forked}/{Threaded} worker has none. - attr_accessor :parent - - # The execution context as a {Frame} stack, outermost level first. - # @returns [Array(Frame)] e.g. `[process, thread]` for a Hybrid thread worker. - def context - (parent ? parent.context : []) + [Frame.new(kind: kind, num: num, name: name)] - end - end - end -end diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 91a0db5..a9512a0 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -7,7 +7,6 @@ require_relative "generic" require_relative "channel" -require_relative "context" require_relative "notify/pipe" module Async @@ -23,12 +22,10 @@ def self.multiprocess? class Child < Channel # Represents a running child process from the point of view of the child process. class Instance < Notify::Pipe - include Context - # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process, instance_num: nil) - instance = self.new(process.out, num: instance_num) + def self.for(process, ordinal: nil) + instance = self.new(process.out, ordinal: ordinal) # The child process won't be reading from the channel: process.close_read @@ -41,20 +38,18 @@ def self.for(process, instance_num: nil) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io, num: nil) + def initialize(io, ordinal: nil) super(io) @name = nil - @num = num + @ordinal = ordinal end # @returns [Integer | Nil] The container-scoped ordinal of this worker. - attr :num + attr :ordinal - # @returns [Symbol] The kind of worker this instance represents. - def kind - :process - end + # @returns [Object | Nil] The worker this one is nested inside. + attr_accessor :parent # Generate a hash representation of the process. # @@ -63,7 +58,7 @@ def as_json(...) { process_id: ::Process.pid, name: @name, - num: @num, + ordinal: @ordinal, } end @@ -111,9 +106,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(instance_num: nil, **options) + def self.fork(ordinal: nil, **options) # $stderr.puts fork: caller - self.new(instance_num: instance_num, **options) do |process| + self.new(ordinal: ordinal, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -122,7 +117,7 @@ def self.fork(instance_num: nil, **options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process, instance_num: instance_num) + yield Instance.for(process, ordinal: ordinal) rescue Interrupt # Graceful exit. rescue Exception => error @@ -151,11 +146,11 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, instance_num: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) @name = name - @instance_num = instance_num + @ordinal = ordinal @status = nil @pid = nil @@ -200,7 +195,7 @@ def name= value attr :pid # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. - attr :instance_num + attr :ordinal # A human readable representation of the process. # @returns [String] @@ -282,8 +277,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, instance_num: nil, &block) - Child.fork(name: name, instance_num: instance_num, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index 22af78d..cebee88 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -54,11 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options) @policy = policy @statistics = @policy.make_statistics @keyed = {} - # Container-scoped allocation of worker ordinals (`instance_num`): a monotonic - # counter plus a free set, so a num released by a permanently exited worker is + # Container-scoped allocation of worker ordinals: a monotonic + # counter plus a free set, so an ordinal released by a permanently exited worker is # recycled, keeping the range compact (e.g. for multiprocess metric files). - @next_instance_num = 0 - @free_instance_nums = Set.new + @next_ordinal = 0 + @free_ordinals = Set.new end # @attribute [Group] The group of running children instances. @@ -218,7 +218,7 @@ def stop(timeout = true) # @parameter key [Symbol] A key used for reloading child instances. # @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy. # @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy. - def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block) + def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) @@ -226,11 +226,9 @@ def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_t return false end - # Allocate before the fiber so the closure captures the num and it stays unchanged - # across a restart (which re-enters `start` in the same fiber). Only release a num - # we allocated ourselves, and only when the worker permanently exits. - owned = instance_num.nil? - instance_num ||= acquire_instance_num + # Allocate before the fiber so the closure captures the ordinal and it stays + # unchanged across a restart (which re-enters `start` in the same fiber). + ordinal = acquire_ordinal @statistics.spawn! @@ -238,7 +236,7 @@ def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_t until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, instance_num: instance_num, &block) + child = self.start(name, ordinal: ordinal, &block) state = insert(key, child) # Notify policy of spawn @@ -312,31 +310,31 @@ def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_t end end ensure - release_instance_num(instance_num) if owned + release_ordinal(ordinal) end.resume return true end - # Allocate a container-scoped worker ordinal, recycling the lowest released num. + # Allocate a container-scoped worker ordinal, recycling the lowest released ordinal. # Allocation runs on the single cooperative reactor thread (acquire in the run loop, # release in the fiber's ensure), so no synchronisation is required. - protected def acquire_instance_num - unless @free_instance_nums.empty? - num = @free_instance_nums.min - @free_instance_nums.delete(num) - return num + protected def acquire_ordinal + unless @free_ordinals.empty? + ordinal = @free_ordinals.min + @free_ordinals.delete(ordinal) + return ordinal end - num = @next_instance_num - @next_instance_num += 1 - num + ordinal = @next_ordinal + @next_ordinal += 1 + ordinal end # Return a worker ordinal to the free set once its worker has permanently exited. Using a - # Set makes release idempotent, so a double release can't hand the same num to two workers. - protected def release_instance_num(instance_num) - @free_instance_nums.add(instance_num) + # Set makes release idempotent, so a double release can't hand the same ordinal to two workers. + protected def release_ordinal(ordinal) + @free_ordinals.add(ordinal) end # Run multiple instances of the same block in the container. diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index e23270f..d895540 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -27,7 +27,7 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio container = Threaded.new # Link each inner thread worker to this fork, so the thread instance can reach - # the durable process num via `instance.parent` / `instance.context`. + # the durable process ordinal via `instance.parent`. container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker| worker.parent = instance block.call(worker) diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 5bc6af0..06ba5b9 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -5,7 +5,6 @@ require_relative "generic" require_relative "channel" -require_relative "context" require_relative "notify/pipe" module Async @@ -42,12 +41,10 @@ def error # Represents a running child thread from the point of view of the child thread. class Instance < Notify::Pipe - include Context - # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread, instance_num: nil) - instance = self.new(thread.out, num: instance_num) + def self.for(thread, ordinal: nil) + instance = self.new(thread.out, ordinal: ordinal) return instance end @@ -55,20 +52,18 @@ def self.for(thread, instance_num: nil) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io, num: nil) + def initialize(io, ordinal: nil) @thread = ::Thread.current - @num = num + @ordinal = ordinal super(io) end # @returns [Integer | Nil] The container-scoped ordinal of this worker. - attr :num + attr :ordinal - # @returns [Symbol] The kind of worker this instance represents. - def kind - :thread - end + # @returns [Object | Nil] The worker this one is nested inside. + attr_accessor :parent # Generate a hash representation of the thread. # @@ -78,7 +73,7 @@ def as_json(...) process_id: ::Process.pid, thread_id: @thread.object_id, name: @thread.name, - num: @num, + ordinal: @ordinal, } end @@ -124,12 +119,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(instance_num: nil, **options) - self.new(instance_num: instance_num, **options) do |thread| + def self.fork(ordinal: nil, **options) + self.new(ordinal: ordinal, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread, instance_num: instance_num) + yield Instance.for(thread, ordinal: ordinal) end end end @@ -138,10 +133,10 @@ def self.fork(instance_num: nil, **options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, instance_num: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) - @instance_num = instance_num + @ordinal = ordinal @status = nil @thread = yield(self) @@ -165,7 +160,7 @@ def initialize(name: nil, instance_num: nil, **options) end # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. - attr :instance_num + attr :ordinal # Convert the child process to a hash, suitable for serialization. # @@ -303,8 +298,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, instance_num: nil, &block) - Child.fork(name: name, instance_num: instance_num, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/test/async/container/instance_num.rb b/test/async/container/instance_num.rb deleted file mode 100644 index af25ac2..0000000 --- a/test/async/container/instance_num.rb +++ /dev/null @@ -1,118 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2026, by Udi Oron. - -require "async/container/threaded" -require "async/container/forked" -require "async/container/best" - -# Collect what each worker reports about its own instance context. -def collect_worker_context(container, count: 3, **options) - input, output = IO.pipe - container.run(count: count, **options) do |instance| - output.write("#{instance.num}:#{instance.kind}\n") - end - container.wait - output.close - input.read.lines.map(&:chomp) -ensure - input&.close unless input&.closed? -end - -describe Async::Container::Generic do - let(:container) {Async::Container::Threaded.new} - - with "#acquire_instance_num" do - it "assigns sequential nums starting at 0" do - expect(container.send(:acquire_instance_num)).to be == 0 - expect(container.send(:acquire_instance_num)).to be == 1 - expect(container.send(:acquire_instance_num)).to be == 2 - end - - it "reuses the lowest released num before extending the range" do - 3.times {container.send(:acquire_instance_num)} # => 0, 1, 2 - container.send(:release_instance_num, 1) - - expect(container.send(:acquire_instance_num)).to be == 1 # reused - expect(container.send(:acquire_instance_num)).to be == 3 # then extends - end - - it "does not hand out the same num twice when a num is released more than once" do - 2.times {container.send(:acquire_instance_num)} # => 0, 1 - container.send(:release_instance_num, 0) - container.send(:release_instance_num, 0) # double release must be idempotent - - expect(container.send(:acquire_instance_num)).to be == 0 # the recycled num - expect(container.send(:acquire_instance_num)).to be == 2 # not 0 again - end - end - - with "keyed reuse" do - it "does not allocate a num for a reused keyed child" do - container.spawn(key: :web) {sleep} # allocates 0, registers the key - reused = container.spawn(key: :web) {sleep} # mark? hit => returns before allocating - - expect(reused).to be == false - # If the second spawn had allocated, the next free num would be 2: - expect(container.send(:acquire_instance_num)).to be == 1 - ensure - container.stop(false) - end - end -end - -describe Async::Container::Threaded do - let(:container) {subject.new} - - it "exposes instance.num and instance.kind to the worker (kind: :thread)" do - reported = collect_worker_context(container, count: 3) - - nums = reported.map {|line| line.split(":").first.to_i} - kinds = reported.map {|line| line.split(":").last}.uniq - - expect(nums.sort).to be == [0, 1, 2] - expect(kinds).to be == ["thread"] - expect(container.statistics).to have_attributes(failures: be == 0) - end - - it "preserves instance.num across a restart" do - trigger = IO.pipe - nums = IO.pipe - - runner = Thread.new do - container.spawn(restart: true) do |instance| - nums.last.puts(instance.num.to_s) - trigger.first.gets # block until told to exit, then the worker restarts - end - container.wait - end - - reported = [] - 2.times do - reported << nums.first.gets.to_i - trigger.last.puts("die") - end - - runner.kill - runner.join - - # Same num allocated for both incarnations (num is captured outside the restart loop): - expect(reported).to be == [reported.first, reported.first] - end -end - -describe Async::Container::Forked do - let(:container) {subject.new} - - it "exposes instance.num and instance.kind to the worker (kind: :process)" do - reported = collect_worker_context(container, count: 3) - - nums = reported.map {|line| line.split(":").first.to_i} - kinds = reported.map {|line| line.split(":").last}.uniq - - expect(nums.sort).to be == [0, 1, 2] - expect(kinds).to be == ["process"] - expect(container.statistics).to have_attributes(failures: be == 0) - end -end if Async::Container.fork? diff --git a/test/async/container/ordinal.rb b/test/async/container/ordinal.rb new file mode 100644 index 0000000..2ca6a85 --- /dev/null +++ b/test/async/container/ordinal.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/best" + +# Collect what each worker reports about its own ordinal. +def collect_worker_ordinals(container, count: 3, **options) + input, output = IO.pipe + container.run(count: count, **options) do |instance| + output.write("#{instance.ordinal}\n") + end + container.wait + output.close + input.read.lines.map(&:to_i) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Generic do + let(:container) {Async::Container::Threaded.new} + + with "#acquire_ordinal" do + it "assigns sequential ordinals starting at 0" do + expect(container.send(:acquire_ordinal)).to be == 0 + expect(container.send(:acquire_ordinal)).to be == 1 + expect(container.send(:acquire_ordinal)).to be == 2 + end + + it "reuses the lowest released ordinal before extending the range" do + 3.times{container.send(:acquire_ordinal)} # => 0, 1, 2 + container.send(:release_ordinal, 1) + + expect(container.send(:acquire_ordinal)).to be == 1 # reused + expect(container.send(:acquire_ordinal)).to be == 3 # then extends + end + + it "does not hand out the same ordinal twice when an ordinal is released more than once" do + 2.times{container.send(:acquire_ordinal)} # => 0, 1 + container.send(:release_ordinal, 0) + container.send(:release_ordinal, 0) # double release must be idempotent + + expect(container.send(:acquire_ordinal)).to be == 0 # the recycled ordinal + expect(container.send(:acquire_ordinal)).to be == 2 # not 0 again + end + end + + with "keyed reuse" do + it "does not allocate an ordinal for a reused keyed child" do + container.spawn(key: :web){sleep} # allocates 0, registers the key + reused = container.spawn(key: :web){sleep} # mark? hit => returns before allocating + + expect(reused).to be == false + # If the second spawn had allocated, the next free ordinal would be 2: + expect(container.send(:acquire_ordinal)).to be == 1 + ensure + container.stop(false) + end + end + + with "injected ordinals" do + it "does not accept an ordinal option via run" do + expect do + container.run(count: 2, ordinal: 7){sleep} + end.to raise_exception(ArgumentError, message: be =~ /unknown keyword: :ordinal/) + end + end +end + +describe Async::Container::Threaded do + let(:container) {subject.new} + + it "exposes instance.ordinal to the worker" do + ordinals = collect_worker_ordinals(container, count: 3) + + expect(ordinals.sort).to be == [0, 1, 2] + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "preserves instance.ordinal across a restart" do + trigger = IO.pipe + ordinals = IO.pipe + + runner = Thread.new do + container.spawn(restart: true) do |instance| + ordinals.last.puts(instance.ordinal.to_s) + trigger.first.gets # block until told to exit, then the worker restarts + end + container.wait + end + + reported = [] + 2.times do + reported << ordinals.first.gets.to_i + trigger.last.puts("die") + end + + runner.kill + runner.join + + # Same ordinal allocated for both incarnations (ordinal is captured outside the restart loop): + expect(reported).to be == [reported.first, reported.first] + end +end + +describe Async::Container::Forked do + let(:container) {subject.new} + + it "exposes instance.ordinal to the worker" do + ordinals = collect_worker_ordinals(container, count: 3) + + expect(ordinals.sort).to be == [0, 1, 2] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork? diff --git a/test/async/container/context.rb b/test/async/container/parent.rb similarity index 50% rename from test/async/container/context.rb rename to test/async/container/parent.rb index cd50056..12a900e 100644 --- a/test/async/container/context.rb +++ b/test/async/container/parent.rb @@ -22,43 +22,35 @@ def report_from_worker(container, **run_options) end describe Async::Container::Threaded do - it "exposes a single :thread frame and no parent via instance.context" do + it "has no parent" do reported = report_from_worker(subject.new, count: 1) do |instance| - "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + "ordinal=#{instance.ordinal} parent=#{instance.parent.inspect}" end - - expect(reported).to be == ["thread:0|parent=nil"] + + expect(reported).to be == ["ordinal=0 parent=nil"] end end describe Async::Container::Forked do - it "exposes a single :process frame and no parent via instance.context" do + it "has no parent" do reported = report_from_worker(subject.new, count: 1) do |instance| - "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + "ordinal=#{instance.ordinal} parent=#{instance.parent.inspect}" end - - expect(reported).to be == ["process:0|parent=nil"] + + expect(reported).to be == ["ordinal=0 parent=nil"] end end if Async::Container.fork? describe Async::Container::Hybrid do - it "exposes [:process, :thread] frames via instance.context" do - reported = report_from_worker(subject.new, count: 1, forks: 1, threads: 1) do |instance| - instance.context.map {|f| f.kind}.join(",") - end - - expect(reported).to be == ["process,thread"] - end - - it "reaches the durable forked num through instance.parent (not the thread num)" do + it "reaches the durable forked ordinal through instance.parent (not the thread ordinal)" do reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance| - "#{instance.kind}/#{instance.num} parent=#{instance.parent&.kind}/#{instance.parent&.num}" + "thread=#{instance.ordinal} parent=#{instance.parent&.ordinal}" end - - # Both workers are thread num 0 within their fork; the durable forked num is on the parent. + + # Both workers are thread ordinal 0 within their fork; the durable forked ordinal is on the parent. expect(reported.sort).to be == [ - "thread/0 parent=process/0", - "thread/0 parent=process/1", + "thread=0 parent=0", + "thread=0 parent=1", ] end end if Async::Container.fork? From 7365d064aa3f50a6b7506a8009463176e22fd1fa Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 14:06:19 +1200 Subject: [PATCH 3/5] Extract ordinal allocation policy Assisted-By: devx/c78f867c-4c73-40b2-a763-4a9332e15ef9 --- lib/async/container/forked.rb | 26 ++++--- lib/async/container/generic.rb | 39 ++-------- lib/async/container/hybrid.rb | 7 +- lib/async/container/ordinals.rb | 107 +++++++++++++++++++++++++ lib/async/container/threaded.rb | 26 ++++--- test/async/container/hybrid.rb | 9 ++- test/async/container/ordinal.rb | 134 +++++++++++++++++++++++++++----- test/async/container/parent.rb | 5 +- 8 files changed, 278 insertions(+), 75 deletions(-) create mode 100644 lib/async/container/ordinals.rb diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index a9512a0..effb706 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -24,8 +24,8 @@ class Child < Channel class Instance < Notify::Pipe # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process, ordinal: nil) - instance = self.new(process.out, ordinal: ordinal) + def self.for(process, ordinal: nil, ordinals: nil) + instance = self.new(process.out, ordinal: ordinal, ordinals: ordinals) # The child process won't be reading from the channel: process.close_read @@ -38,16 +38,20 @@ def self.for(process, ordinal: nil) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io, ordinal: nil) + def initialize(io, ordinal: nil, ordinals: nil) super(io) @name = nil @ordinal = ordinal + @ordinals = ordinals end # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal + # @returns [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. + attr :ordinals + # @returns [Object | Nil] The worker this one is nested inside. attr_accessor :parent @@ -106,9 +110,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(ordinal: nil, **options) + def self.fork(ordinal: nil, ordinals: nil, **options) # $stderr.puts fork: caller - self.new(ordinal: ordinal, **options) do |process| + self.new(ordinal: ordinal, ordinals: ordinals, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -117,7 +121,7 @@ def self.fork(ordinal: nil, **options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process, ordinal: ordinal) + yield Instance.for(process, ordinal: ordinal, ordinals: ordinals) rescue Interrupt # Graceful exit. rescue Exception => error @@ -146,11 +150,12 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, ordinal: nil, **options) + def initialize(name: nil, ordinal: nil, ordinals: nil, **options) super(**options) @name = name @ordinal = ordinal + @ordinals = ordinals @status = nil @pid = nil @@ -197,6 +202,9 @@ def name= value # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. attr :ordinal + # @attribute [Ordinals | Nil] An optional ordinal allocator assigned to the worker for implementation-detail child containers. + attr :ordinals + # A human readable representation of the process. # @returns [String] def inspect @@ -277,8 +285,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, ordinal: nil, &block) - Child.fork(name: name, ordinal: ordinal, &block) + def start(name, ordinal: nil, ordinals: nil, &block) + Child.fork(name: name, ordinal: ordinal, ordinals: ordinals, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index cebee88..d4c54d9 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -9,6 +9,7 @@ require_relative "group" require_relative "keyed" +require_relative "ordinals" require_relative "statistics" require_relative "policy" @@ -45,7 +46,7 @@ def self.run(...) # # @parameter policy [Policy] The policy to use for managing child lifecycle events. # @parameter options [Hash] Options passed to the {Group} instance. - def initialize(policy: Policy::DEFAULT, **options) + def initialize(policy: Policy::DEFAULT, ordinals: nil, **options) @group = Group.new(**options) @stopping = false @@ -54,11 +55,7 @@ def initialize(policy: Policy::DEFAULT, **options) @policy = policy @statistics = @policy.make_statistics @keyed = {} - # Container-scoped allocation of worker ordinals: a monotonic - # counter plus a free set, so an ordinal released by a permanently exited worker is - # recycled, keeping the range compact (e.g. for multiprocess metric files). - @next_ordinal = 0 - @free_ordinals = Set.new + @ordinals = ordinals || Ordinals::Sequential.new end # @attribute [Group] The group of running children instances. @@ -216,9 +213,10 @@ def stop(timeout = true) # @parameter name [String] The name of the child instance. # @parameter restart [Boolean] Whether to restart the child instance if it fails. # @parameter key [Symbol] A key used for reloading child instances. + # @parameter ordinals [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. # @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy. # @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy. - def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) + def spawn(name: nil, restart: false, key: nil, ordinals: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) @@ -228,7 +226,7 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu # Allocate before the fiber so the closure captures the ordinal and it stays # unchanged across a restart (which re-enters `start` in the same fiber). - ordinal = acquire_ordinal + ordinal = @ordinals.acquire @statistics.spawn! @@ -236,7 +234,7 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, ordinal: ordinal, &block) + child = self.start(name, ordinal: ordinal, ordinals: ordinals, &block) state = insert(key, child) # Notify policy of spawn @@ -310,33 +308,12 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu end end ensure - release_ordinal(ordinal) + @ordinals.release([ordinal]) end.resume return true end - # Allocate a container-scoped worker ordinal, recycling the lowest released ordinal. - # Allocation runs on the single cooperative reactor thread (acquire in the run loop, - # release in the fiber's ensure), so no synchronisation is required. - protected def acquire_ordinal - unless @free_ordinals.empty? - ordinal = @free_ordinals.min - @free_ordinals.delete(ordinal) - return ordinal - end - - ordinal = @next_ordinal - @next_ordinal += 1 - ordinal - end - - # Return a worker ordinal to the free set once its worker has permanently exited. Using a - # Set makes release idempotent, so a double release can't hand the same ordinal to two workers. - protected def release_ordinal(ordinal) - @free_ordinals.add(ordinal) - end - # Run multiple instances of the same block in the container. # @parameter count [Integer] The number of instances to start. def run(count: Container.processor_count, **options, &block) diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index d895540..9386950 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -21,10 +21,13 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio count ||= processor_count ** 2 forks ||= [processor_count, count].min threads ||= (count / forks).ceil + worker_ordinals = Ordinals::Sequential.new forks.times do - self.spawn(**options) do |instance| - container = Threaded.new + ordinals = worker_ordinals.reserve(threads) + + self.spawn(ordinals: ordinals, **options) do |instance| + container = Threaded.new(ordinals: instance.ordinals) # Link each inner thread worker to this fork, so the thread instance can reach # the durable process ordinal via `instance.parent`. diff --git a/lib/async/container/ordinals.rb b/lib/async/container/ordinals.rb new file mode 100644 index 0000000..a83dfb0 --- /dev/null +++ b/lib/async/container/ordinals.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "set" + +module Async + module Container + # Allocates stable ordinal values for container workers. + module Ordinals + # Raised when no ordinal can be allocated. + class Exhausted < RuntimeError + end + + # Base class for ordinal allocators. + class Allocator + # Reserve a fixed pool of ordinals from this allocator. + # @parameter count [Integer] The number of ordinals to reserve. + # @returns [Fixed] The reserved ordinals as a fixed allocator. + def reserve(count) + ordinals = [] + + count.times do + ordinals << acquire + end + + return Fixed.new(ordinals) + rescue + release(ordinals) unless ordinals.empty? + raise + end + end + + # A sequential ordinal allocator with lowest-released ordinal reuse. + class Sequential < Allocator + def initialize(initial = 0) + @next = initial + @free = Set.new + end + + # Allocate the next available ordinal. + def acquire + unless @free.empty? + ordinal = @free.min + @free.delete(ordinal) + return ordinal + end + + ordinal = @next + @next += 1 + return ordinal + end + + # Return ordinals to the allocator. + # @parameter ordinals [Enumerable(Integer)] The ordinals to release. + def release(ordinals) + ordinals.each do |ordinal| + @free.add(ordinal) + end + end + end + + # An allocator backed by a fixed set of ordinals. + class Fixed < Allocator + include Enumerable + + def initialize(ordinals) + @ordinals = ordinals.to_a.freeze + @set = @ordinals.to_set.freeze + @free = @ordinals.to_set + end + + # @attribute [Array(Integer)] The ordinals managed by this allocator. + attr :ordinals + + # Enumerate the ordinals managed by this allocator. + def each(&block) + @ordinals.each(&block) + end + + # Allocate the lowest available ordinal from the fixed pool. + def acquire + unless @free.empty? + ordinal = @free.min + @free.delete(ordinal) + return ordinal + end + + raise Exhausted, "No ordinals available!" + end + + # Return ordinals to the fixed pool. + # @parameter ordinals [Enumerable(Integer)] The ordinals to release. + def release(ordinals) + ordinals.each do |ordinal| + unless @set.include?(ordinal) + raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!" + end + + @free.add(ordinal) + end + end + end + end + end +end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 06ba5b9..4b199a0 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -43,8 +43,8 @@ def error class Instance < Notify::Pipe # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread, ordinal: nil) - instance = self.new(thread.out, ordinal: ordinal) + def self.for(thread, ordinal: nil, ordinals: nil) + instance = self.new(thread.out, ordinal: ordinal, ordinals: ordinals) return instance end @@ -52,9 +52,10 @@ def self.for(thread, ordinal: nil) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io, ordinal: nil) + def initialize(io, ordinal: nil, ordinals: nil) @thread = ::Thread.current @ordinal = ordinal + @ordinals = ordinals super(io) end @@ -62,6 +63,9 @@ def initialize(io, ordinal: nil) # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal + # @returns [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. + attr :ordinals + # @returns [Object | Nil] The worker this one is nested inside. attr_accessor :parent @@ -119,12 +123,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(ordinal: nil, **options) - self.new(ordinal: ordinal, **options) do |thread| + def self.fork(ordinal: nil, ordinals: nil, **options) + self.new(ordinal: ordinal, ordinals: ordinals, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread, ordinal: ordinal) + yield Instance.for(thread, ordinal: ordinal, ordinals: ordinals) end end end @@ -133,10 +137,11 @@ def self.fork(ordinal: nil, **options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, ordinal: nil, **options) + def initialize(name: nil, ordinal: nil, ordinals: nil, **options) super(**options) @ordinal = ordinal + @ordinals = ordinals @status = nil @thread = yield(self) @@ -162,6 +167,9 @@ def initialize(name: nil, ordinal: nil, **options) # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. attr :ordinal + # @attribute [Ordinals | Nil] An optional ordinal allocator assigned to the worker for implementation-detail child containers. + attr :ordinals + # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -298,8 +306,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, ordinal: nil, &block) - Child.fork(name: name, ordinal: ordinal, &block) + def start(name, ordinal: nil, ordinals: nil, &block) + Child.fork(name: name, ordinal: ordinal, ordinals: ordinals, &block) end end end diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index 27b8848..8f21976 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -19,6 +19,9 @@ interrupt_count = 0 threaded_class = Class.new + threaded_class.define_method(:initialize) do |**options| + @options = options + end threaded_class.define_method(:run) do |**options, &block| self end @@ -38,8 +41,12 @@ end container_class = Class.new(subject) do - def spawn(**options, &block) + def spawn(ordinals: nil, **options, &block) instance = Object.new + instance.define_singleton_method(:ordinals) do + ordinals + end + def instance.ready! end diff --git a/test/async/container/ordinal.rb b/test/async/container/ordinal.rb index 2ca6a85..22953d2 100644 --- a/test/async/container/ordinal.rb +++ b/test/async/container/ordinal.rb @@ -5,6 +5,8 @@ require "async/container/threaded" require "async/container/forked" +require "async/container/hybrid" +require "async/container/ordinals" require "async/container/best" # Collect what each worker reports about its own ordinal. @@ -23,41 +25,41 @@ def collect_worker_ordinals(container, count: 3, **options) describe Async::Container::Generic do let(:container) {Async::Container::Threaded.new} - with "#acquire_ordinal" do + with "ordinal allocation" do it "assigns sequential ordinals starting at 0" do - expect(container.send(:acquire_ordinal)).to be == 0 - expect(container.send(:acquire_ordinal)).to be == 1 - expect(container.send(:acquire_ordinal)).to be == 2 - end - - it "reuses the lowest released ordinal before extending the range" do - 3.times{container.send(:acquire_ordinal)} # => 0, 1, 2 - container.send(:release_ordinal, 1) + ordinals = collect_worker_ordinals(container, count: 3) - expect(container.send(:acquire_ordinal)).to be == 1 # reused - expect(container.send(:acquire_ordinal)).to be == 3 # then extends + expect(ordinals.sort).to be == [0, 1, 2] end - it "does not hand out the same ordinal twice when an ordinal is released more than once" do - 2.times{container.send(:acquire_ordinal)} # => 0, 1 - container.send(:release_ordinal, 0) - container.send(:release_ordinal, 0) # double release must be idempotent + it "reuses the lowest released ordinal before extending the range" do + first = collect_worker_ordinals(container, count: 3) + second = collect_worker_ordinals(container, count: 1) - expect(container.send(:acquire_ordinal)).to be == 0 # the recycled ordinal - expect(container.send(:acquire_ordinal)).to be == 2 # not 0 again + expect(first.sort).to be == [0, 1, 2] + expect(second).to be == [0] end end with "keyed reuse" do it "does not allocate an ordinal for a reused keyed child" do - container.spawn(key: :web){sleep} # allocates 0, registers the key + input, output = IO.pipe + + container.spawn(key: :web) do |instance| + output.puts(instance.ordinal) + sleep + end + + expect(input.gets.to_i).to be == 0 + reused = container.spawn(key: :web){sleep} # mark? hit => returns before allocating expect(reused).to be == false - # If the second spawn had allocated, the next free ordinal would be 2: - expect(container.send(:acquire_ordinal)).to be == 1 + expect(container.instance_variable_get(:@ordinals).acquire).to be == 1 ensure container.stop(false) + input&.close unless input&.closed? + output&.close unless output&.closed? end end @@ -70,6 +72,68 @@ def collect_worker_ordinals(container, count: 3, **options) end end +describe Async::Container::Ordinals::Sequential do + let(:ordinals) {subject.new} + + it "assigns sequential ordinals starting at 0" do + expect(ordinals.acquire).to be == 0 + expect(ordinals.acquire).to be == 1 + expect(ordinals.acquire).to be == 2 + end + + it "can start at an initial ordinal" do + ordinals = subject.new(10) + + expect(ordinals.acquire).to be == 10 + expect(ordinals.acquire).to be == 11 + end + + it "reuses the lowest released ordinal before extending the range" do + 3.times{ordinals.acquire} # => 0, 1, 2 + ordinals.release([1]) + + expect(ordinals.acquire).to be == 1 # reused + expect(ordinals.acquire).to be == 3 # then extends + end + + it "does not hand out the same ordinal twice when an ordinal is released more than once" do + 2.times{ordinals.acquire} # => 0, 1 + ordinals.release([0]) + ordinals.release([0]) # double release must be idempotent + + expect(ordinals.acquire).to be == 0 # the recycled ordinal + expect(ordinals.acquire).to be == 2 # not 0 again + end + + it "reserves ordinals as a fixed allocator" do + reserved = ordinals.reserve(3) + + expect(reserved).to be_a(Async::Container::Ordinals::Fixed) + expect(reserved.to_a).to be == [0, 1, 2] + expect(ordinals.acquire).to be == 3 + end +end + +describe Async::Container::Ordinals::Fixed do + let(:ordinals) {subject.new([5, 7])} + + it "allocates from the fixed pool" do + expect(ordinals.acquire).to be == 5 + expect(ordinals.acquire).to be == 7 + expect{ordinals.acquire}.to raise_exception(Async::Container::Ordinals::Exhausted) + end + + it "can release ordinals back to the fixed pool" do + expect(ordinals.acquire).to be == 5 + ordinals.release([5]) + expect(ordinals.acquire).to be == 5 + end + + it "rejects ordinals outside the fixed pool" do + expect{ordinals.release([6])}.to raise_exception(ArgumentError) + end +end + describe Async::Container::Threaded do let(:container) {subject.new} @@ -104,6 +168,25 @@ def collect_worker_ordinals(container, count: 3, **options) # Same ordinal allocated for both incarnations (ordinal is captured outside the restart loop): expect(reported).to be == [reported.first, reported.first] end + + it "does not inherit ordinals into independently managed child containers" do + input, output = IO.pipe + + container.run(count: 1) do |instance| + child = subject.new + child_ordinals = collect_worker_ordinals(child, count: 2) + + output.puts("parent=#{instance.ordinal} child=#{child_ordinals.sort.join(",")}") + end + + container.wait + output.close + reported = input.read.lines.map(&:chomp) + + expect(reported).to be == ["parent=0 child=0,1"] + ensure + input&.close unless input&.closed? + end end describe Async::Container::Forked do @@ -116,3 +199,14 @@ def collect_worker_ordinals(container, count: 3, **options) expect(container.statistics).to have_attributes(failures: be == 0) end end if Async::Container.fork? + +describe Async::Container::Hybrid do + let(:container) {subject.new} + + it "assigns unique worker ordinals across forked threaded workers" do + ordinals = collect_worker_ordinals(container, count: 4, forks: 2, threads: 2) + + expect(ordinals.sort).to be == [0, 1, 2, 3] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork? diff --git a/test/async/container/parent.rb b/test/async/container/parent.rb index 12a900e..23b1946 100644 --- a/test/async/container/parent.rb +++ b/test/async/container/parent.rb @@ -42,15 +42,14 @@ def report_from_worker(container, **run_options) end if Async::Container.fork? describe Async::Container::Hybrid do - it "reaches the durable forked ordinal through instance.parent (not the thread ordinal)" do + it "reaches the durable forked ordinal through instance.parent" do reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance| "thread=#{instance.ordinal} parent=#{instance.parent&.ordinal}" end - # Both workers are thread ordinal 0 within their fork; the durable forked ordinal is on the parent. expect(reported.sort).to be == [ "thread=0 parent=0", - "thread=0 parent=1", + "thread=1 parent=1", ] end end if Async::Container.fork? From 2fee172ef93e32f5d21d727c419907703886238e Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 14:27:35 +1200 Subject: [PATCH 4/5] Anchor hybrid worker ordinals Assisted-By: devx/c78f867c-4c73-40b2-a763-4a9332e15ef9 --- lib/async/container/forked.rb | 26 +++++++++----------------- lib/async/container/generic.rb | 5 ++--- lib/async/container/hybrid.rb | 9 ++++----- lib/async/container/ordinals.rb | 9 ++++----- lib/async/container/threaded.rb | 26 +++++++++----------------- test/async/container/hybrid.rb | 6 +++--- 6 files changed, 31 insertions(+), 50 deletions(-) diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index effb706..a9512a0 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -24,8 +24,8 @@ class Child < Channel class Instance < Notify::Pipe # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process, ordinal: nil, ordinals: nil) - instance = self.new(process.out, ordinal: ordinal, ordinals: ordinals) + def self.for(process, ordinal: nil) + instance = self.new(process.out, ordinal: ordinal) # The child process won't be reading from the channel: process.close_read @@ -38,20 +38,16 @@ def self.for(process, ordinal: nil, ordinals: nil) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io, ordinal: nil, ordinals: nil) + def initialize(io, ordinal: nil) super(io) @name = nil @ordinal = ordinal - @ordinals = ordinals end # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal - # @returns [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. - attr :ordinals - # @returns [Object | Nil] The worker this one is nested inside. attr_accessor :parent @@ -110,9 +106,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(ordinal: nil, ordinals: nil, **options) + def self.fork(ordinal: nil, **options) # $stderr.puts fork: caller - self.new(ordinal: ordinal, ordinals: ordinals, **options) do |process| + self.new(ordinal: ordinal, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -121,7 +117,7 @@ def self.fork(ordinal: nil, ordinals: nil, **options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process, ordinal: ordinal, ordinals: ordinals) + yield Instance.for(process, ordinal: ordinal) rescue Interrupt # Graceful exit. rescue Exception => error @@ -150,12 +146,11 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, ordinal: nil, ordinals: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) @name = name @ordinal = ordinal - @ordinals = ordinals @status = nil @pid = nil @@ -202,9 +197,6 @@ def name= value # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. attr :ordinal - # @attribute [Ordinals | Nil] An optional ordinal allocator assigned to the worker for implementation-detail child containers. - attr :ordinals - # A human readable representation of the process. # @returns [String] def inspect @@ -285,8 +277,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, ordinal: nil, ordinals: nil, &block) - Child.fork(name: name, ordinal: ordinal, ordinals: ordinals, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d4c54d9..da979dd 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -213,10 +213,9 @@ def stop(timeout = true) # @parameter name [String] The name of the child instance. # @parameter restart [Boolean] Whether to restart the child instance if it fails. # @parameter key [Symbol] A key used for reloading child instances. - # @parameter ordinals [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. # @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy. # @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy. - def spawn(name: nil, restart: false, key: nil, ordinals: nil, health_check_timeout: nil, startup_timeout: nil, &block) + def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) @@ -234,7 +233,7 @@ def spawn(name: nil, restart: false, key: nil, ordinals: nil, health_check_timeo until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, ordinal: ordinal, ordinals: ordinals, &block) + child = self.start(name, ordinal: ordinal, &block) state = insert(key, child) # Notify policy of spawn diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 9386950..1c0c12e 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -21,13 +21,12 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio count ||= processor_count ** 2 forks ||= [processor_count, count].min threads ||= (count / forks).ceil - worker_ordinals = Ordinals::Sequential.new forks.times do - ordinals = worker_ordinals.reserve(threads) - - self.spawn(ordinals: ordinals, **options) do |instance| - container = Threaded.new(ordinals: instance.ordinals) + self.spawn(**options) do |instance| + first_ordinal = instance.ordinal * threads + ordinals = Ordinals::Fixed.new(first_ordinal...(first_ordinal + threads)) + container = Threaded.new(ordinals: ordinals) # Link each inner thread worker to this fork, so the thread instance can reach # the durable process ordinal via `instance.parent`. diff --git a/lib/async/container/ordinals.rb b/lib/async/container/ordinals.rb index a83dfb0..9241c90 100644 --- a/lib/async/container/ordinals.rb +++ b/lib/async/container/ordinals.rb @@ -66,12 +66,11 @@ class Fixed < Allocator include Enumerable def initialize(ordinals) - @ordinals = ordinals.to_a.freeze - @set = @ordinals.to_set.freeze - @free = @ordinals.to_set + @ordinals = ordinals.to_set.freeze + @free = @ordinals.dup end - # @attribute [Array(Integer)] The ordinals managed by this allocator. + # @attribute [Set(Integer)] The ordinals managed by this allocator. attr :ordinals # Enumerate the ordinals managed by this allocator. @@ -94,7 +93,7 @@ def acquire # @parameter ordinals [Enumerable(Integer)] The ordinals to release. def release(ordinals) ordinals.each do |ordinal| - unless @set.include?(ordinal) + unless @ordinals.include?(ordinal) raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!" end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 4b199a0..06ba5b9 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -43,8 +43,8 @@ def error class Instance < Notify::Pipe # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread, ordinal: nil, ordinals: nil) - instance = self.new(thread.out, ordinal: ordinal, ordinals: ordinals) + def self.for(thread, ordinal: nil) + instance = self.new(thread.out, ordinal: ordinal) return instance end @@ -52,10 +52,9 @@ def self.for(thread, ordinal: nil, ordinals: nil) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io, ordinal: nil, ordinals: nil) + def initialize(io, ordinal: nil) @thread = ::Thread.current @ordinal = ordinal - @ordinals = ordinals super(io) end @@ -63,9 +62,6 @@ def initialize(io, ordinal: nil, ordinals: nil) # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal - # @returns [Ordinals | Nil] An optional ordinal allocator assigned to this worker for implementation-detail child containers. - attr :ordinals - # @returns [Object | Nil] The worker this one is nested inside. attr_accessor :parent @@ -123,12 +119,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(ordinal: nil, ordinals: nil, **options) - self.new(ordinal: ordinal, ordinals: ordinals, **options) do |thread| + def self.fork(ordinal: nil, **options) + self.new(ordinal: ordinal, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread, ordinal: ordinal, ordinals: ordinals) + yield Instance.for(thread, ordinal: ordinal) end end end @@ -137,11 +133,10 @@ def self.fork(ordinal: nil, ordinals: nil, **options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, ordinal: nil, ordinals: nil, **options) + def initialize(name: nil, ordinal: nil, **options) super(**options) @ordinal = ordinal - @ordinals = ordinals @status = nil @thread = yield(self) @@ -167,9 +162,6 @@ def initialize(name: nil, ordinal: nil, ordinals: nil, **options) # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. attr :ordinal - # @attribute [Ordinals | Nil] An optional ordinal allocator assigned to the worker for implementation-detail child containers. - attr :ordinals - # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -306,8 +298,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, ordinal: nil, ordinals: nil, &block) - Child.fork(name: name, ordinal: ordinal, ordinals: ordinals, &block) + def start(name, ordinal: nil, &block) + Child.fork(name: name, ordinal: ordinal, &block) end end end diff --git a/test/async/container/hybrid.rb b/test/async/container/hybrid.rb index 8f21976..280b547 100644 --- a/test/async/container/hybrid.rb +++ b/test/async/container/hybrid.rb @@ -41,10 +41,10 @@ end container_class = Class.new(subject) do - def spawn(ordinals: nil, **options, &block) + def spawn(**options, &block) instance = Object.new - instance.define_singleton_method(:ordinals) do - ordinals + instance.define_singleton_method(:ordinal) do + 0 end def instance.ready! From 44f0baa057ebdc986c498f0a2cc19b1de85067fc Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 26 Jun 2026 14:44:27 +1200 Subject: [PATCH 5/5] Simplify ordinal allocation API Assisted-By: devx/c78f867c-4c73-40b2-a763-4a9332e15ef9 --- lib/async/container/forked.rb | 3 -- lib/async/container/generic.rb | 2 +- lib/async/container/hybrid.rb | 7 ++- lib/async/container/ordinals.rb | 81 +++++++++++++++++++-------------- lib/async/container/threaded.rb | 3 -- test/async/container/ordinal.rb | 43 ++++++++++++++--- test/async/container/parent.rb | 55 ---------------------- 7 files changed, 86 insertions(+), 108 deletions(-) delete mode 100644 test/async/container/parent.rb diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index a9512a0..fe64368 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -48,9 +48,6 @@ def initialize(io, ordinal: nil) # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal - # @returns [Object | Nil] The worker this one is nested inside. - attr_accessor :parent - # Generate a hash representation of the process. # # @returns [Hash] The process as a hash, including `process_id` and `name`. diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index da979dd..b51e773 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -307,7 +307,7 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu end end ensure - @ordinals.release([ordinal]) + @ordinals.release(ordinal) end.resume return true diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 1c0c12e..b56c53c 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -24,14 +24,13 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio forks.times do self.spawn(**options) do |instance| + # Fork ordinals are unique and stable across restart; each fork owns a + # deterministic fixed range for its inner threaded workers. first_ordinal = instance.ordinal * threads - ordinals = Ordinals::Fixed.new(first_ordinal...(first_ordinal + threads)) + ordinals = Ordinals::Fixed.range(first_ordinal, threads) container = Threaded.new(ordinals: ordinals) - # Link each inner thread worker to this fork, so the thread instance can reach - # the durable process ordinal via `instance.parent`. container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker| - worker.parent = instance block.call(worker) end diff --git a/lib/async/container/ordinals.rb b/lib/async/container/ordinals.rb index 9241c90..40ad80c 100644 --- a/lib/async/container/ordinals.rb +++ b/lib/async/container/ordinals.rb @@ -13,30 +13,21 @@ module Ordinals class Exhausted < RuntimeError end - # Base class for ordinal allocators. - class Allocator + # A sequential ordinal allocator with lowest-released ordinal reuse. + class Sequential + def initialize(initial = 0) + @next = initial + @free = Set.new + end + # Reserve a fixed pool of ordinals from this allocator. # @parameter count [Integer] The number of ordinals to reserve. # @returns [Fixed] The reserved ordinals as a fixed allocator. def reserve(count) - ordinals = [] - - count.times do - ordinals << acquire - end + first = @next + @next += count - return Fixed.new(ordinals) - rescue - release(ordinals) unless ordinals.empty? - raise - end - end - - # A sequential ordinal allocator with lowest-released ordinal reuse. - class Sequential < Allocator - def initialize(initial = 0) - @next = initial - @free = Set.new + return Fixed.new(first...@next) end # Allocate the next available ordinal. @@ -52,19 +43,25 @@ def acquire return ordinal end - # Return ordinals to the allocator. - # @parameter ordinals [Enumerable(Integer)] The ordinals to release. - def release(ordinals) - ordinals.each do |ordinal| - @free.add(ordinal) - end + # Return an ordinal to the allocator. + # @parameter ordinal [Integer] The ordinal to release. + def release(ordinal) + @free.add(ordinal) end end # An allocator backed by a fixed set of ordinals. - class Fixed < Allocator + class Fixed include Enumerable + # Create a fixed pool from a contiguous range of ordinals. + # @parameter initial [Integer] The first ordinal in the range. + # @parameter count [Integer] The number of ordinals in the range. + # @returns [Fixed] The fixed allocator for the range. + def self.range(initial, count) + self.new(initial...(initial + count)) + end + def initialize(ordinals) @ordinals = ordinals.to_set.freeze @free = @ordinals.dup @@ -78,6 +75,22 @@ def each(&block) @ordinals.each(&block) end + # Reserve a fixed pool of ordinals from this allocator. + # @parameter count [Integer] The number of ordinals to reserve. + # @returns [Fixed] The reserved ordinals as a fixed allocator. + def reserve(count) + if count > @free.size + raise Exhausted, "No ordinals available!" + end + + ordinals = @free.min(count) + ordinals.each do |ordinal| + @free.delete(ordinal) + end + + return Fixed.new(ordinals) + end + # Allocate the lowest available ordinal from the fixed pool. def acquire unless @free.empty? @@ -89,16 +102,14 @@ def acquire raise Exhausted, "No ordinals available!" end - # Return ordinals to the fixed pool. - # @parameter ordinals [Enumerable(Integer)] The ordinals to release. - def release(ordinals) - ordinals.each do |ordinal| - unless @ordinals.include?(ordinal) - raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!" - end - - @free.add(ordinal) + # Return an ordinal to the fixed pool. + # @parameter ordinal [Integer] The ordinal to release. + def release(ordinal) + unless @ordinals.include?(ordinal) + raise ArgumentError, "Cannot release ordinal #{ordinal.inspect} to #{self.class}!" end + + @free.add(ordinal) end end end diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index 06ba5b9..9ef0c64 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -62,9 +62,6 @@ def initialize(io, ordinal: nil) # @returns [Integer | Nil] The container-scoped ordinal of this worker. attr :ordinal - # @returns [Object | Nil] The worker this one is nested inside. - attr_accessor :parent - # Generate a hash representation of the thread. # # @returns [Hash] The thread as a hash, including `process_id`, `thread_id`, and `name`. diff --git a/test/async/container/ordinal.rb b/test/async/container/ordinal.rb index 22953d2..c8a7a3b 100644 --- a/test/async/container/ordinal.rb +++ b/test/async/container/ordinal.rb @@ -90,7 +90,7 @@ def collect_worker_ordinals(container, count: 3, **options) it "reuses the lowest released ordinal before extending the range" do 3.times{ordinals.acquire} # => 0, 1, 2 - ordinals.release([1]) + ordinals.release(1) expect(ordinals.acquire).to be == 1 # reused expect(ordinals.acquire).to be == 3 # then extends @@ -98,8 +98,8 @@ def collect_worker_ordinals(container, count: 3, **options) it "does not hand out the same ordinal twice when an ordinal is released more than once" do 2.times{ordinals.acquire} # => 0, 1 - ordinals.release([0]) - ordinals.release([0]) # double release must be idempotent + ordinals.release(0) + ordinals.release(0) # double release must be idempotent expect(ordinals.acquire).to be == 0 # the recycled ordinal expect(ordinals.acquire).to be == 2 # not 0 again @@ -117,6 +117,12 @@ def collect_worker_ordinals(container, count: 3, **options) describe Async::Container::Ordinals::Fixed do let(:ordinals) {subject.new([5, 7])} + it "creates a fixed allocator from a range" do + ordinals = subject.range(5, 3) + + expect(ordinals.to_a).to be == [5, 6, 7] + end + it "allocates from the fixed pool" do expect(ordinals.acquire).to be == 5 expect(ordinals.acquire).to be == 7 @@ -125,12 +131,27 @@ def collect_worker_ordinals(container, count: 3, **options) it "can release ordinals back to the fixed pool" do expect(ordinals.acquire).to be == 5 - ordinals.release([5]) + ordinals.release(5) expect(ordinals.acquire).to be == 5 end + it "reserves ordinals as a fixed allocator" do + reserved = ordinals.reserve(2) + + expect(reserved).to be_a(Async::Container::Ordinals::Fixed) + expect(reserved.to_a).to be == [5, 7] + expect{ordinals.acquire}.to raise_exception(Async::Container::Ordinals::Exhausted) + end + + it "does not partially reserve ordinals if the pool is too small" do + expect{ordinals.reserve(3)}.to raise_exception(Async::Container::Ordinals::Exhausted) + + expect(ordinals.acquire).to be == 5 + expect(ordinals.acquire).to be == 7 + end + it "rejects ordinals outside the fixed pool" do - expect{ordinals.release([6])}.to raise_exception(ArgumentError) + expect{ordinals.release(6)}.to raise_exception(ArgumentError) end end @@ -176,14 +197,14 @@ def collect_worker_ordinals(container, count: 3, **options) child = subject.new child_ordinals = collect_worker_ordinals(child, count: 2) - output.puts("parent=#{instance.ordinal} child=#{child_ordinals.sort.join(",")}") + output.puts("outer=#{instance.ordinal} child=#{child_ordinals.sort.join(",")}") end container.wait output.close reported = input.read.lines.map(&:chomp) - expect(reported).to be == ["parent=0 child=0,1"] + expect(reported).to be == ["outer=0 child=0,1"] ensure input&.close unless input&.closed? end @@ -209,4 +230,12 @@ def collect_worker_ordinals(container, count: 3, **options) expect(ordinals.sort).to be == [0, 1, 2, 3] expect(container.statistics).to have_attributes(failures: be == 0) end + + it "anchors worker ordinal ranges to the fork ordinal" do + container = subject.new(ordinals: Async::Container::Ordinals::Sequential.new(3)) + ordinals = collect_worker_ordinals(container, count: 4, forks: 2, threads: 2) + + expect(ordinals.sort).to be == [6, 7, 8, 9] + expect(container.statistics).to have_attributes(failures: be == 0) + end end if Async::Container.fork? diff --git a/test/async/container/parent.rb b/test/async/container/parent.rb deleted file mode 100644 index 23b1946..0000000 --- a/test/async/container/parent.rb +++ /dev/null @@ -1,55 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2026, by Udi Oron. - -require "async/container/threaded" -require "async/container/forked" -require "async/container/hybrid" -require "async/container/best" - -# Have the worker serialise something about its instance to a pipe, one line per worker. -def report_from_worker(container, **run_options) - input, output = IO.pipe - container.run(**run_options) do |instance| - output.write(yield(instance) + "\n") - end - container.wait - output.close - input.read.lines.map(&:chomp) -ensure - input&.close unless input&.closed? -end - -describe Async::Container::Threaded do - it "has no parent" do - reported = report_from_worker(subject.new, count: 1) do |instance| - "ordinal=#{instance.ordinal} parent=#{instance.parent.inspect}" - end - - expect(reported).to be == ["ordinal=0 parent=nil"] - end -end - -describe Async::Container::Forked do - it "has no parent" do - reported = report_from_worker(subject.new, count: 1) do |instance| - "ordinal=#{instance.ordinal} parent=#{instance.parent.inspect}" - end - - expect(reported).to be == ["ordinal=0 parent=nil"] - end -end if Async::Container.fork? - -describe Async::Container::Hybrid do - it "reaches the durable forked ordinal through instance.parent" do - reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance| - "thread=#{instance.ordinal} parent=#{instance.parent&.ordinal}" - end - - expect(reported.sort).to be == [ - "thread=0 parent=0", - "thread=1 parent=1", - ] - end -end if Async::Container.fork?