Skip to content

Commit 34db040

Browse files
Simplify worker ordinal API
Assisted-By: devx/c78f867c-4c73-40b2-a763-4a9332e15ef9
1 parent fffa67a commit 34db040

8 files changed

Lines changed: 184 additions & 235 deletions

File tree

lib/async/container/context.rb

Lines changed: 0 additions & 31 deletions
This file was deleted.

lib/async/container/forked.rb

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
require_relative "generic"
99
require_relative "channel"
10-
require_relative "context"
1110
require_relative "notify/pipe"
1211

1312
module Async
@@ -23,12 +22,10 @@ def self.multiprocess?
2322
class Child < Channel
2423
# Represents a running child process from the point of view of the child process.
2524
class Instance < Notify::Pipe
26-
include Context
27-
2825
# Wrap an instance around the {Process} instance from within the forked child.
2926
# @parameter process [Process] The process intance to wrap.
30-
def self.for(process, instance_num: nil)
31-
instance = self.new(process.out, num: instance_num)
27+
def self.for(process, ordinal: nil)
28+
instance = self.new(process.out, ordinal: ordinal)
3229

3330
# The child process won't be reading from the channel:
3431
process.close_read
@@ -41,20 +38,18 @@ def self.for(process, instance_num: nil)
4138
# Initialize the child process instance.
4239
#
4340
# @parameter io [IO] The IO object to use for communication.
44-
def initialize(io, num: nil)
41+
def initialize(io, ordinal: nil)
4542
super(io)
4643

4744
@name = nil
48-
@num = num
45+
@ordinal = ordinal
4946
end
5047

5148
# @returns [Integer | Nil] The container-scoped ordinal of this worker.
52-
attr :num
49+
attr :ordinal
5350

54-
# @returns [Symbol] The kind of worker this instance represents.
55-
def kind
56-
:process
57-
end
51+
# @returns [Object | Nil] The worker this one is nested inside.
52+
attr_accessor :parent
5853

5954
# Generate a hash representation of the process.
6055
#
@@ -63,7 +58,7 @@ def as_json(...)
6358
{
6459
process_id: ::Process.pid,
6560
name: @name,
66-
num: @num,
61+
ordinal: @ordinal,
6762
}
6863
end
6964

@@ -111,9 +106,9 @@ def exec(*arguments, ready: true, **options)
111106
# Fork a child process appropriate for a container.
112107
#
113108
# @returns [Process]
114-
def self.fork(instance_num: nil, **options)
109+
def self.fork(ordinal: nil, **options)
115110
# $stderr.puts fork: caller
116-
self.new(instance_num: instance_num, **options) do |process|
111+
self.new(ordinal: ordinal, **options) do |process|
117112
::Process.fork do
118113
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
119114
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
@@ -122,7 +117,7 @@ def self.fork(instance_num: nil, **options)
122117

123118
# This could be a configuration option:
124119
::Thread.handle_interrupt(SignalException => :immediate) do
125-
yield Instance.for(process, instance_num: instance_num)
120+
yield Instance.for(process, ordinal: ordinal)
126121
rescue Interrupt
127122
# Graceful exit.
128123
rescue Exception => error
@@ -151,11 +146,11 @@ def self.spawn(*arguments, name: nil, **options)
151146

152147
# Initialize the process.
153148
# @parameter name [String] The name to use for the child process.
154-
def initialize(name: nil, instance_num: nil, **options)
149+
def initialize(name: nil, ordinal: nil, **options)
155150
super(**options)
156151

157152
@name = name
158-
@instance_num = instance_num
153+
@ordinal = ordinal
159154
@status = nil
160155
@pid = nil
161156

@@ -200,7 +195,7 @@ def name= value
200195
attr :pid
201196

202197
# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
203-
attr :instance_num
198+
attr :ordinal
204199

205200
# A human readable representation of the process.
206201
# @returns [String]
@@ -282,8 +277,8 @@ def wait(timeout = 0.1)
282277
# Start a named child process and execute the provided block in it.
283278
# @parameter name [String] The name (title) of the child process.
284279
# @parameter block [Proc] The block to execute in the child process.
285-
def start(name, instance_num: nil, &block)
286-
Child.fork(name: name, instance_num: instance_num, &block)
280+
def start(name, ordinal: nil, &block)
281+
Child.fork(name: name, ordinal: ordinal, &block)
287282
end
288283
end
289284
end

lib/async/container/generic.rb

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options)
5454
@policy = policy
5555
@statistics = @policy.make_statistics
5656
@keyed = {}
57-
# Container-scoped allocation of worker ordinals (`instance_num`): a monotonic
58-
# counter plus a free set, so a num released by a permanently exited worker is
57+
# Container-scoped allocation of worker ordinals: a monotonic
58+
# counter plus a free set, so an ordinal released by a permanently exited worker is
5959
# recycled, keeping the range compact (e.g. for multiprocess metric files).
60-
@next_instance_num = 0
61-
@free_instance_nums = Set.new
60+
@next_ordinal = 0
61+
@free_ordinals = Set.new
6262
end
6363

6464
# @attribute [Group] The group of running children instances.
@@ -218,27 +218,25 @@ def stop(timeout = true)
218218
# @parameter key [Symbol] A key used for reloading child instances.
219219
# @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.
220220
# @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy.
221-
def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block)
221+
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block)
222222
name ||= UNNAMED
223223

224224
if mark?(key)
225225
Console.debug(self, "Reusing existing child.", child: {key: key, name: name})
226226
return false
227227
end
228228

229-
# Allocate before the fiber so the closure captures the num and it stays unchanged
230-
# across a restart (which re-enters `start` in the same fiber). Only release a num
231-
# we allocated ourselves, and only when the worker permanently exits.
232-
owned = instance_num.nil?
233-
instance_num ||= acquire_instance_num
229+
# Allocate before the fiber so the closure captures the ordinal and it stays
230+
# unchanged across a restart (which re-enters `start` in the same fiber).
231+
ordinal = acquire_ordinal
234232

235233
@statistics.spawn!
236234

237235
fiber do
238236
until @stopping
239237
Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)
240238

241-
child = self.start(name, instance_num: instance_num, &block)
239+
child = self.start(name, ordinal: ordinal, &block)
242240
state = insert(key, child)
243241

244242
# Notify policy of spawn
@@ -312,31 +310,31 @@ def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_t
312310
end
313311
end
314312
ensure
315-
release_instance_num(instance_num) if owned
313+
release_ordinal(ordinal)
316314
end.resume
317315

318316
return true
319317
end
320318

321-
# Allocate a container-scoped worker ordinal, recycling the lowest released num.
319+
# Allocate a container-scoped worker ordinal, recycling the lowest released ordinal.
322320
# Allocation runs on the single cooperative reactor thread (acquire in the run loop,
323321
# release in the fiber's ensure), so no synchronisation is required.
324-
protected def acquire_instance_num
325-
unless @free_instance_nums.empty?
326-
num = @free_instance_nums.min
327-
@free_instance_nums.delete(num)
328-
return num
322+
protected def acquire_ordinal
323+
unless @free_ordinals.empty?
324+
ordinal = @free_ordinals.min
325+
@free_ordinals.delete(ordinal)
326+
return ordinal
329327
end
330328

331-
num = @next_instance_num
332-
@next_instance_num += 1
333-
num
329+
ordinal = @next_ordinal
330+
@next_ordinal += 1
331+
ordinal
334332
end
335333

336334
# Return a worker ordinal to the free set once its worker has permanently exited. Using a
337-
# Set makes release idempotent, so a double release can't hand the same num to two workers.
338-
protected def release_instance_num(instance_num)
339-
@free_instance_nums.add(instance_num)
335+
# Set makes release idempotent, so a double release can't hand the same ordinal to two workers.
336+
protected def release_ordinal(ordinal)
337+
@free_ordinals.add(ordinal)
340338
end
341339

342340
# Run multiple instances of the same block in the container.

lib/async/container/hybrid.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio
2727
container = Threaded.new
2828

2929
# Link each inner thread worker to this fork, so the thread instance can reach
30-
# the durable process num via `instance.parent` / `instance.context`.
30+
# the durable process ordinal via `instance.parent`.
3131
container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker|
3232
worker.parent = instance
3333
block.call(worker)

lib/async/container/threaded.rb

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
require_relative "generic"
77
require_relative "channel"
8-
require_relative "context"
98
require_relative "notify/pipe"
109

1110
module Async
@@ -42,33 +41,29 @@ def error
4241

4342
# Represents a running child thread from the point of view of the child thread.
4443
class Instance < Notify::Pipe
45-
include Context
46-
4744
# Wrap an instance around the {Thread} instance from within the threaded child.
4845
# @parameter thread [Thread] The thread intance to wrap.
49-
def self.for(thread, instance_num: nil)
50-
instance = self.new(thread.out, num: instance_num)
46+
def self.for(thread, ordinal: nil)
47+
instance = self.new(thread.out, ordinal: ordinal)
5148

5249
return instance
5350
end
5451

5552
# Initialize the child thread instance.
5653
#
5754
# @parameter io [IO] The IO object to use for communication with the parent.
58-
def initialize(io, num: nil)
55+
def initialize(io, ordinal: nil)
5956
@thread = ::Thread.current
60-
@num = num
57+
@ordinal = ordinal
6158

6259
super(io)
6360
end
6461

6562
# @returns [Integer | Nil] The container-scoped ordinal of this worker.
66-
attr :num
63+
attr :ordinal
6764

68-
# @returns [Symbol] The kind of worker this instance represents.
69-
def kind
70-
:thread
71-
end
65+
# @returns [Object | Nil] The worker this one is nested inside.
66+
attr_accessor :parent
7267

7368
# Generate a hash representation of the thread.
7469
#
@@ -78,7 +73,7 @@ def as_json(...)
7873
process_id: ::Process.pid,
7974
thread_id: @thread.object_id,
8075
name: @thread.name,
81-
num: @num,
76+
ordinal: @ordinal,
8277
}
8378
end
8479

@@ -124,12 +119,12 @@ def exec(*arguments, ready: true, **options)
124119
# Start a new child thread and execute the provided block in it.
125120
#
126121
# @parameter options [Hash] Additional options to to the new child instance.
127-
def self.fork(instance_num: nil, **options)
128-
self.new(instance_num: instance_num, **options) do |thread|
122+
def self.fork(ordinal: nil, **options)
123+
self.new(ordinal: ordinal, **options) do |thread|
129124
::Thread.new do
130125
# This could be a configuration option (see forked implementation too):
131126
::Thread.handle_interrupt(SignalException => :immediate) do
132-
yield Instance.for(thread, instance_num: instance_num)
127+
yield Instance.for(thread, ordinal: ordinal)
133128
end
134129
end
135130
end
@@ -138,10 +133,10 @@ def self.fork(instance_num: nil, **options)
138133
# Initialize the thread.
139134
#
140135
# @parameter name [String] The name to use for the child thread.
141-
def initialize(name: nil, instance_num: nil, **options)
136+
def initialize(name: nil, ordinal: nil, **options)
142137
super(**options)
143138

144-
@instance_num = instance_num
139+
@ordinal = ordinal
145140
@status = nil
146141

147142
@thread = yield(self)
@@ -165,7 +160,7 @@ def initialize(name: nil, instance_num: nil, **options)
165160
end
166161

167162
# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
168-
attr :instance_num
163+
attr :ordinal
169164

170165
# Convert the child process to a hash, suitable for serialization.
171166
#
@@ -303,8 +298,8 @@ def finished(error = nil)
303298
# Start a named child thread and execute the provided block in it.
304299
# @parameter name [String] The name (title) of the child process.
305300
# @parameter block [Proc] The block to execute in the child process.
306-
def start(name, instance_num: nil, &block)
307-
Child.fork(name: name, instance_num: instance_num, &block)
301+
def start(name, ordinal: nil, &block)
302+
Child.fork(name: name, ordinal: ordinal, &block)
308303
end
309304
end
310305
end

0 commit comments

Comments
 (0)