Skip to content

Commit 4e30cd9

Browse files
committed
Better types.
1 parent b7ad219 commit 4e30cd9

7 files changed

Lines changed: 34 additions & 29 deletions

File tree

lib/async/barrier.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module Async
1313
# @public Since *Async v1*.
1414
class Barrier
1515
# Initialize the barrier.
16-
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
16+
# @parameter parent [_Asyncable?] The parent for holding any children tasks.
1717
# @public Since *Async v1*.
1818
def initialize(parent: nil)
1919
@tasks = List.new
@@ -32,16 +32,17 @@ def initialize(task)
3232

3333
private_constant :TaskNode
3434

35-
# Number of tasks being held by the barrier.
35+
# @returns [Integer] Number of tasks being held by the barrier.
3636
def size
3737
@tasks.size
3838
end
3939

40-
# All tasks which have been invoked into the barrier.
40+
# @attribute [Array(Task)] All tasks which have been invoked into the barrier.
4141
attr :tasks
4242

4343
# Execute a child task and add it to the barrier.
4444
# @asynchronous Executes the given block concurrently.
45+
# @rbs [T] (*untyped, parent: _Asyncable, **untyped) {(Task, *untyped) -> T} -> Task[T]
4546
def async(*arguments, parent: (@parent or Task.current), **options, &block)
4647
waiting = nil
4748

@@ -54,8 +55,7 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block)
5455
end
5556
end
5657

57-
# Whether there are any tasks being held by the barrier.
58-
# @returns [Boolean]
58+
# @returns [Boolean] Whether there are any tasks being held by the barrier.
5959
def empty?
6060
@tasks.empty?
6161
end
@@ -65,6 +65,7 @@ def empty?
6565
# @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
6666
#
6767
# @asynchronous Will wait for tasks to finish executing.
68+
# @rbs () ?{(Task) -> untyped} -> void
6869
def wait
6970
while !@tasks.empty?
7071
# Wait for a task to finish (we get the task node):
@@ -88,6 +89,7 @@ def wait
8889

8990
# Stop all tasks held by the barrier.
9091
# @asynchronous May wait for tasks to finish executing.
92+
# @rbs () -> void
9193
def stop
9294
@tasks.each do |waiting|
9395
waiting.task.stop

lib/async/idler.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class Idler
1212
#
1313
# @parameter maximum_load [Numeric] The maximum load before we start shedding work.
1414
# @parameter backoff [Numeric] The initial backoff time, used for delaying work.
15-
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
15+
# @parameter parent [Interface(Asyncable) | Nil] The parent task to use for async operations.
1616
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
1717
@maximum_load = maximum_load
1818
@backoff = backoff
@@ -24,7 +24,7 @@ def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
2424
# @asynchronous Executes the given block concurrently.
2525
#
2626
# @parameter arguments [Array] The arguments to pass to the block.
27-
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
27+
# @parameter parent [Interface(Asyncable) | Nil] The parent task to use for async operations.
2828
# @parameter options [Hash] The options to pass to the task.
2929
# @yields {|task| ...} When the system is idle, the block will be executed in a new task.
3030
def async(*arguments, parent: (@parent or Task.current), **options, &block)

lib/async/queue.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class ClosedError < RuntimeError
2323

2424
# Create a new queue.
2525
#
26-
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
26+
# @parameter parent [Interface(Asyncable) | Nil] The parent task to use for async operations.
2727
# @parameter available [Notification] The notification to use for signaling when items are available.
2828
def initialize(parent: nil, available: Notification.new)
2929
@items = []
@@ -104,7 +104,7 @@ def pop
104104
# @asynchronous Executes the given block concurrently for each item.
105105
#
106106
# @parameter arguments [Array] The arguments to pass to the block.
107-
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
107+
# @parameter parent [Interface(Asyncable) | Nil] The parent task to use for async operations.
108108
# @parameter options [Hash] The options to pass to the task.
109109
# @yields {|task| ...} When the system is idle, the block will be executed in a new task.
110110
def async(parent: (@parent or Task.current), **options, &block)

lib/async/task.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ def self.current
370370
end
371371

372372
# Check if there is a task defined for the current fiber.
373-
# @returns [Interface(:async) | Nil]
373+
# @returns [Interface(Asyncable) | Nil]
374374
def self.current?
375375
Fiber.current.async_task
376376
end

lib/async/waiter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ module Async
1010
class Waiter
1111
# Create a waiter instance.
1212
#
13-
# @parameter parent [Interface(:async) | Nil] The parent task to use for asynchronous operations.
13+
# @parameter parent [Interface(Asyncable) | Nil] The parent task to use for asynchronous operations.
1414
# @parameter finished [::Async::Condition] The condition to signal when a task completes.
1515
def initialize(parent: nil, finished: Async::Condition.new)
1616
warn("`Async::Waiter` is deprecated, use `Async::Barrier` instead.", uplevel: 1, category: :deprecated) if $VERBOSE

sig/async.rbs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,20 @@ module Async
33
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
44
class Barrier
55
# Initialize the barrier.
6-
public def initialize: (Task | Semaphore | nil parent) -> void
6+
public def initialize: (_Asyncable? parent) -> void
77

8-
# Number of tasks being held by the barrier.
9-
public def size: () -> untyped
8+
public def size: () -> Integer
109

1110
# Execute a child task and add it to the barrier.
12-
public def async: () -> untyped
11+
public def async: [T] (*untyped, parent: _Asyncable, **untyped) { (Task, *untyped) -> T } -> Task[T]
1312

14-
# Whether there are any tasks being held by the barrier.
1513
public def empty?: () -> bool
1614

1715
# Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
18-
public def wait: () { () -> void } -> untyped
16+
public def wait: () ?{ (Task) -> untyped } -> void
1917

2018
# Stop all tasks held by the barrier.
21-
public def stop: () -> untyped
19+
public def stop: () -> void
2220
end
2321

2422
# A convenient wrapper around the internal monotonic clock.
@@ -91,10 +89,10 @@ module Async
9189
# A load balancing mechanism that can be used process work when the system is idle.
9290
class Idler
9391
# Create a new idler.
94-
public def initialize: (Numeric maximum_load, Numeric backoff, Interface parent) -> void
92+
public def initialize: (Numeric maximum_load, Numeric backoff, Interface[Asyncable] | nil parent) -> void
9593

9694
# Wait until the system is idle, then execute the given block in a new task.
97-
public def async: (Types::Array arguments, Interface parent, Types::Hash options) { () -> void } -> untyped
95+
public def async: (Array arguments, Interface[Asyncable] | nil parent, Hash options) { () -> void } -> untyped
9896

9997
# Wait until the system is idle, according to the maximum load specified.
10098
#
@@ -122,7 +120,7 @@ module Async
122120
# Add multiple items to the queue.
123121
#
124122
# If the queue is full, this method will block until there is space available.
125-
public def enqueue: (Types::Array items) -> untyped
123+
public def enqueue: (Array items) -> untyped
126124

127125
# Remove and return the next item from the queue.
128126
#
@@ -251,7 +249,7 @@ module Async
251249
public def consume: () -> untyped
252250

253251
# Traverse the task tree.
254-
public def traverse: () { () -> void } -> Types::Enumerator
252+
public def traverse: () { () -> void } -> Enumerator
255253

256254
# Immediately terminate all children tasks, including transient tasks. Internally invokes `stop(false)` on all children. This should be considered a last ditch effort and is used when closing the scheduler.
257255
public def terminate: () -> untyped
@@ -277,7 +275,7 @@ module Async
277275
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
278276
class Queue
279277
# Create a new queue.
280-
public def initialize: (Interface parent, Notification available) -> void
278+
public def initialize: (Interface[Asyncable] | nil parent, Notification available) -> void
281279

282280
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
283281
public def close: () -> untyped
@@ -302,7 +300,7 @@ module Async
302300
public def pop: () -> untyped
303301

304302
# Process each item in the queue.
305-
public def async: (Types::Array arguments, Interface parent, Types::Hash options) { () -> void } -> untyped
303+
public def async: (Array arguments, Interface[Asyncable] | nil parent, Hash options) { () -> void } -> untyped
306304

307305
# Enumerate each item in the queue.
308306
public def each: () -> untyped
@@ -365,15 +363,15 @@ module Async
365363
public def yield: () -> untyped
366364

367365
# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
368-
public def push: (Fiber | Object fiber) -> self
366+
public def push: (Any[Fiber, Object] fiber) -> self
369367

370368
# Raise an exception on a specified fiber with the given arguments.
371369
#
372370
# This internally schedules the current fiber to be ready, before raising the exception, so that it will later resume execution.
373-
public def raise: (Fiber fiber, Types::Array `*arguments`) -> untyped
371+
public def raise: (Fiber fiber, Array `*arguments`) -> untyped
374372

375373
# Resume execution of the specified fiber.
376-
public def resume: (Fiber fiber, Types::Array arguments) -> untyped
374+
public def resume: (Fiber fiber, Array arguments) -> untyped
377375

378376
# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
379377
public def block: (Object blocker, Float | nil timeout) -> untyped
@@ -567,7 +565,7 @@ module Async
567565
public def self.current: () -> Task
568566

569567
# Check if there is a task defined for the current fiber.
570-
public def self.current?: () -> Interface
568+
public def self.current?: () -> (Interface[Asyncable] | nil)
571569

572570
public def current?: () -> bool
573571

@@ -638,7 +636,7 @@ module Async
638636
# A composable synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore} and/or {Barrier}.
639637
class Waiter
640638
# Create a waiter instance.
641-
public def initialize: (Interface parent, ::Async::Condition finished) -> void
639+
public def initialize: (Interface[Asyncable] | nil parent, ::Async::Condition finished) -> void
642640

643641
# Execute a child task and add it to the waiter.
644642
public def async: () -> untyped

sig/async/asyncable.rbs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module Async
2+
interface _Asyncable
3+
def async: [T] (*untyped, **untyped) { (Task) -> T } -> Task[T]
4+
end
5+
end

0 commit comments

Comments
 (0)