@@ -15,6 +15,11 @@ module Async
1515 #
1616 # @public Since *Async v1*.
1717 class Queue
18+ # An error raised when trying to enqueue items to a closed queue.
19+ # @public Since *Async v2.24*.
20+ class ClosedError < RuntimeError
21+ end
22+
1823 # Create a new queue.
1924 #
2025 # @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
@@ -26,6 +31,7 @@ def initialize(parent: nil, available: Notification.new)
2631 @available = available
2732 end
2833
34+ # Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
2935 def close
3036 @closed = true
3137
@@ -61,6 +67,10 @@ def <<(item)
6167
6268 # Add multiple items to the queue.
6369 def enqueue ( *items )
70+ if @closed
71+ raise ClosedError , "Cannot enqueue items to a closed queue."
72+ end
73+
6474 @items . concat ( items )
6575
6676 @available . signal unless self . empty?
@@ -133,6 +143,14 @@ def initialize(limit = 1, full: Notification.new, **options)
133143 # @attribute [Integer] The maximum number of items that can be enqueued.
134144 attr :limit
135145
146+ def close
147+ super
148+
149+ while @full . waiting?
150+ @full . signal ( nil )
151+ end
152+ end
153+
136154 # @returns [Boolean] Whether trying to enqueue an item would block.
137155 def limited?
138156 @items . size >= @limit
@@ -162,6 +180,10 @@ def enqueue(*items)
162180 @full . wait
163181 end
164182
183+ if @closed
184+ raise ClosedError , "Cannot enqueue items to a closed queue."
185+ end
186+
165187 available = @limit - @items . size
166188 @items . concat ( items . shift ( available ) )
167189
0 commit comments