Skip to content

Commit fffa67a

Browse files
committed
Add durable worker num and execution context
Containers expose nothing to a worker about which worker it is; the Child::Instance handed to the run block carries only `name`. Code that needs a stable per-worker identifier (e.g. a prometheus-client-mmap pid_provider, which keys mmap files per process) has nothing to key on, and under Falcon/async-service the app never holds the run block itself. Such an identifier needs to be both durable across a restart (so metrics survive a re-fork instead of fragmenting) and bounded in cardinality (drawn from 0..N-1 rather than the open-ended PID space, so the file and series count stays constant). A recycled worker ordinal satisfies both. Add a container-scoped `num` allocated by Generic (a counter plus a Set free-list; idempotent release), captured in the spawn closure so it is unchanged when a `restart: true` worker re-enters `start`. Expose `num` and `kind` on Child::Instance, `instance_num` on the parent-side Child, and a `parent` link plus a `context` Frame stack built from the object graph. Hybrid links each inner thread worker to its fork, so a Hybrid thread can reach its durable process num via `instance.parent.num` with no process- or thread-global state.
1 parent cb5a852 commit fffa67a

7 files changed

Lines changed: 309 additions & 23 deletions

File tree

lib/async/container/context.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Udi Oron.
5+
6+
module Async
  module Container
    # Describes one level of the execution context a worker is running in.
    # @attribute kind [Symbol] Either `:process` or `:thread`.
    # @attribute num [Integer] The container-scoped ordinal of the worker at this level.
    # @attribute name [String | Nil] The name the container was given for this worker.
    Frame = Data.define(:kind, :num, :name)

    # Mixin for a container's `Child::Instance` that exposes where the worker sits in the
    # worker hierarchy. The including class must respond to `kind`, `num` and `name`; the
    # frame stack is derived purely from this instance and its chain of parents, so code
    # holding the instance needs no process or thread globals to find its worker number.
    module Context
      # The instance this worker is nested inside, or `nil` when it is a top-level worker.
      # {Hybrid} assigns the fork's instance to each of its thread workers.
      attr_accessor :parent

      # Build the execution context as a stack of frames, outermost level first.
      # @returns [Array(Frame)] e.g. `[process, thread]` for a Hybrid thread worker.
      def context
        enclosing = parent

        # Resolve the enclosing levels first so that they precede this worker's own frame.
        outer_frames = if enclosing
          enclosing.context
        else
          []
        end

        outer_frames + [Frame.new(kind:, num:, name:)]
      end
    end
  end
end

lib/async/container/forked.rb

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
require_relative "generic"
99
require_relative "channel"
10+
require_relative "context"
1011
require_relative "notify/pipe"
1112

1213
module Async
@@ -22,10 +23,12 @@ def self.multiprocess?
2223
class Child < Channel
2324
# Represents a running child process from the point of view of the child process.
2425
class Instance < Notify::Pipe
26+
include Context
27+
2528
# Wrap an instance around the {Process} instance from within the forked child.
2629
# @parameter process [Process] The process instance to wrap.
27-
def self.for(process)
28-
instance = self.new(process.out)
30+
def self.for(process, instance_num: nil)
31+
instance = self.new(process.out, num: instance_num)
2932

3033
# The child process won't be reading from the channel:
3134
process.close_read
@@ -38,10 +41,19 @@ def self.for(process)
3841
# Initialize the child process instance.
3942
#
4043
# @parameter io [IO] The IO object to use for communication.
41-
def initialize(io)
42-
super
44+
def initialize(io, num: nil)
45+
super(io)
4346

4447
@name = nil
48+
@num = num
49+
end
50+
51+
# @returns [Integer | Nil] The container-scoped ordinal of this worker.
52+
attr :num
53+
54+
# @returns [Symbol] The kind of worker this instance represents.
55+
def kind
56+
:process
4557
end
4658

4759
# Generate a hash representation of the process.
@@ -51,6 +63,7 @@ def as_json(...)
5163
{
5264
process_id: ::Process.pid,
5365
name: @name,
66+
num: @num,
5467
}
5568
end
5669

@@ -98,9 +111,9 @@ def exec(*arguments, ready: true, **options)
98111
# Fork a child process appropriate for a container.
99112
#
100113
# @returns [Process]
101-
def self.fork(**options)
114+
def self.fork(instance_num: nil, **options)
102115
# $stderr.puts fork: caller
103-
self.new(**options) do |process|
116+
self.new(instance_num: instance_num, **options) do |process|
104117
::Process.fork do
105118
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
106119
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
@@ -109,7 +122,7 @@ def self.fork(**options)
109122

110123
# This could be a configuration option:
111124
::Thread.handle_interrupt(SignalException => :immediate) do
112-
yield Instance.for(process)
125+
yield Instance.for(process, instance_num: instance_num)
113126
rescue Interrupt
114127
# Graceful exit.
115128
rescue Exception => error
@@ -138,10 +151,11 @@ def self.spawn(*arguments, name: nil, **options)
138151

139152
# Initialize the process.
140153
# @parameter name [String] The name to use for the child process.
141-
def initialize(name: nil, **options)
154+
def initialize(name: nil, instance_num: nil, **options)
142155
super(**options)
143156

144157
@name = name
158+
@instance_num = instance_num
145159
@status = nil
146160
@pid = nil
147161

@@ -185,6 +199,9 @@ def name= value
185199
# @attribute [Integer] The process identifier.
186200
attr :pid
187201

202+
# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
203+
attr :instance_num
204+
188205
# A human readable representation of the process.
189206
# @returns [String]
190207
def inspect
@@ -265,8 +282,8 @@ def wait(timeout = 0.1)
265282
# Start a named child process and execute the provided block in it.
266283
# @parameter name [String] The name (title) of the child process.
267284
# @parameter block [Proc] The block to execute in the child process.
268-
def start(name, &block)
269-
Child.fork(name: name, &block)
285+
def start(name, instance_num: nil, &block)
286+
Child.fork(name: name, instance_num: instance_num, &block)
270287
end
271288
end
272289
end

lib/async/container/generic.rb

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options)
5454
@policy = policy
5555
@statistics = @policy.make_statistics
5656
@keyed = {}
57+
# Container-scoped allocation of worker ordinals (`instance_num`): a monotonic
58+
# counter plus a free set, so a num released by a permanently exited worker is
59+
# recycled, keeping the range compact (e.g. for multiprocess metric files).
60+
@next_instance_num = 0
61+
@free_instance_nums = Set.new
5762
end
5863

5964
# @attribute [Group] The group of running children instances.
@@ -213,21 +218,27 @@ def stop(timeout = true)
213218
# @parameter key [Symbol] A key used for reloading child instances.
214219
# @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.
215220
# @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy.
216-
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block)
221+
def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block)
217222
name ||= UNNAMED
218223

219224
if mark?(key)
220225
Console.debug(self, "Reusing existing child.", child: {key: key, name: name})
221226
return false
222227
end
223228

229+
# Allocate before the fiber so the closure captures the num and it stays unchanged
230+
# across a restart (which re-enters `start` in the same fiber). Only release a num
231+
# we allocated ourselves, and only when the worker permanently exits.
232+
owned = instance_num.nil?
233+
instance_num ||= acquire_instance_num
234+
224235
@statistics.spawn!
225236

226237
fiber do
227238
until @stopping
228239
Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)
229240

230-
child = self.start(name, &block)
241+
child = self.start(name, instance_num: instance_num, &block)
231242
state = insert(key, child)
232243

233244
# Notify policy of spawn
@@ -300,11 +311,34 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
300311
break
301312
end
302313
end
314+
ensure
315+
release_instance_num(instance_num) if owned
303316
end.resume
304317

305318
return true
306319
end
307320

321+
# Hand out a container-scoped worker ordinal.
#
# The lowest previously released ordinal is reused when one is available; otherwise the
# monotonic counter supplies a fresh one. No locking is performed: `spawn` acquires before
# it starts its fiber and that fiber's `ensure` releases, which is expected to all happen
# on the container's single cooperative reactor thread.
#
# @returns [Integer] The ordinal now owned by the caller.
protected def acquire_instance_num
  if @free_instance_nums.empty?
    # Nothing to recycle, so extend the range by one.
    fresh = @next_instance_num
    @next_instance_num = fresh + 1
    return fresh
  end

  # Recycle the lowest ordinal first so the range in use stays compact.
  recycled = @free_instance_nums.min
  @free_instance_nums.delete(recycled)
  recycled
end
335+
336+
# Give a worker ordinal back to the free set once its worker has permanently exited.
#
# The free list is a Set, so releasing the same ordinal twice is harmless: it is stored
# once and can therefore never be handed to two workers.
#
# @parameter instance_num [Integer] The ordinal to make available for reuse.
protected def release_instance_num(instance_num)
  @free_instance_nums << instance_num
end
341+
308342
# Run multiple instances of the same block in the container.
309343
# @parameter count [Integer] The number of instances to start.
310344
def run(count: Container.processor_count, **options, &block)

lib/async/container/hybrid.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio
2626
self.spawn(**options) do |instance|
2727
container = Threaded.new
2828

29-
container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block)
29+
# Link each inner thread worker to this fork, so the thread instance can reach
30+
# the durable process num via `instance.parent` / `instance.context`.
31+
container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker|
32+
worker.parent = instance
33+
block.call(worker)
34+
end
3035

3136
container.wait_until_ready
3237
instance.ready!

lib/async/container/threaded.rb

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
require_relative "generic"
77
require_relative "channel"
8+
require_relative "context"
89
require_relative "notify/pipe"
910

1011
module Async
@@ -41,21 +42,32 @@ def error
4142

4243
# Represents a running child thread from the point of view of the child thread.
4344
class Instance < Notify::Pipe
45+
include Context
46+
4447
# Wrap an instance around the {Thread} instance from within the threaded child.
4548
# @parameter thread [Thread] The thread instance to wrap.
46-
def self.for(thread)
47-
instance = self.new(thread.out)
49+
def self.for(thread, instance_num: nil)
50+
instance = self.new(thread.out, num: instance_num)
4851

4952
return instance
5053
end
5154

5255
# Initialize the child thread instance.
5356
#
5457
# @parameter io [IO] The IO object to use for communication with the parent.
55-
def initialize(io)
58+
def initialize(io, num: nil)
5659
@thread = ::Thread.current
60+
@num = num
5761

58-
super
62+
super(io)
63+
end
64+
65+
# @returns [Integer | Nil] The container-scoped ordinal of this worker.
66+
attr :num
67+
68+
# @returns [Symbol] The kind of worker this instance represents.
69+
def kind
70+
:thread
5971
end
6072

6173
# Generate a hash representation of the thread.
@@ -66,6 +78,7 @@ def as_json(...)
6678
process_id: ::Process.pid,
6779
thread_id: @thread.object_id,
6880
name: @thread.name,
81+
num: @num,
6982
}
7083
end
7184

@@ -111,12 +124,12 @@ def exec(*arguments, ready: true, **options)
111124
# Start a new child thread and execute the provided block in it.
112125
#
113126
# @parameter options [Hash] Additional options to pass to the new child instance.
114-
def self.fork(**options)
115-
self.new(**options) do |thread|
127+
def self.fork(instance_num: nil, **options)
128+
self.new(instance_num: instance_num, **options) do |thread|
116129
::Thread.new do
117130
# This could be a configuration option (see forked implementation too):
118131
::Thread.handle_interrupt(SignalException => :immediate) do
119-
yield Instance.for(thread)
132+
yield Instance.for(thread, instance_num: instance_num)
120133
end
121134
end
122135
end
@@ -125,9 +138,10 @@ def self.fork(**options)
125138
# Initialize the thread.
126139
#
127140
# @parameter name [String] The name to use for the child thread.
128-
def initialize(name: nil, **options)
141+
def initialize(name: nil, instance_num: nil, **options)
129142
super(**options)
130143

144+
@instance_num = instance_num
131145
@status = nil
132146

133147
@thread = yield(self)
@@ -150,6 +164,9 @@ def initialize(name: nil, **options)
150164
end
151165
end
152166

167+
# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
168+
attr :instance_num
169+
153170
# Convert the child process to a hash, suitable for serialization.
154171
#
155172
# @returns [Hash] The request as a hash.
@@ -286,8 +303,8 @@ def finished(error = nil)
286303
# Start a named child thread and execute the provided block in it.
287304
# @parameter name [String] The name (title) of the child thread.
288305
# @parameter block [Proc] The block to execute in the child thread.
289-
def start(name, &block)
290-
Child.fork(name: name, &block)
306+
def start(name, instance_num: nil, &block)
307+
Child.fork(name: name, instance_num: instance_num, &block)
291308
end
292309
end
293310
end

test/async/container/context.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Udi Oron.
5+
6+
require "async/container/threaded"
7+
require "async/container/forked"
8+
require "async/container/hybrid"
9+
require "async/container/best"
10+
11+
# Run the container and collect what each worker reports about its instance.
#
# Every worker writes the string returned by the given block to a shared pipe, one line per
# worker. Once the container has finished, the lines are read back in the calling process.
# The pipe is opened with the block form of `IO.pipe`, so both ends are closed even when
# starting or waiting on the container raises.
#
# @parameter container [Object] The container to run; it must respond to `run` and `wait`.
# @parameter run_options [Hash] Options forwarded to `container.run`.
# @yields {|instance| ...} Called in each worker with its instance; must return a String.
# @returns [Array(String)] The reported lines, without trailing newlines.
def report_from_worker(container, **run_options)
  IO.pipe do |input, output|
    container.run(**run_options) do |instance|
      output.write(yield(instance) + "\n")
    end

    container.wait

    # Close our write end before reading, otherwise `read` would never see end-of-file.
    output.close

    input.read.lines.map(&:chomp)
  end
end
23+
24+
describe Async::Container::Threaded do
  # A plain threaded worker is top-level: one `:thread` frame and no parent.
  it "exposes a single :thread frame and no parent via instance.context" do
    container = subject.new

    lines = report_from_worker(container, count: 1) do |worker|
      frames = worker.context.map {|frame| "#{frame.kind}:#{frame.num}"}
      "#{frames.join(",")}|parent=#{worker.parent.inspect}"
    end

    expect(lines).to be == ["thread:0|parent=nil"]
  end
end
33+
34+
describe Async::Container::Forked do
  # A plain forked worker is top-level: one `:process` frame and no parent.
  it "exposes a single :process frame and no parent via instance.context" do
    container = subject.new

    lines = report_from_worker(container, count: 1) do |worker|
      frames = worker.context.map {|frame| "#{frame.kind}:#{frame.num}"}
      "#{frames.join(",")}|parent=#{worker.parent.inspect}"
    end

    expect(lines).to be == ["process:0|parent=nil"]
  end
end if Async::Container.fork?
43+
44+
describe Async::Container::Hybrid do
  # A hybrid thread worker is nested in a fork, so its stack has two levels.
  it "exposes [:process, :thread] frames via instance.context" do
    container = subject.new

    lines = report_from_worker(container, count: 1, forks: 1, threads: 1) do |worker|
      worker.context.map(&:kind).join(",")
    end

    expect(lines).to be == ["process,thread"]
  end

  it "reaches the durable forked num through instance.parent (not the thread num)" do
    container = subject.new

    lines = report_from_worker(container, count: 2, forks: 2, threads: 1) do |worker|
      fork_instance = worker.parent
      "#{worker.kind}/#{worker.num} parent=#{fork_instance&.kind}/#{fork_instance&.num}"
    end

    # Each fork runs one thread, numbered 0 inside that fork; only the parent's num tells the forks apart.
    expected = [
      "thread/0 parent=process/0",
      "thread/0 parent=process/1",
    ]

    expect(lines.sort).to be == expected
  end
end if Async::Container.fork?

0 commit comments

Comments
 (0)