Skip to content

Commit a1d1004

Browse files
committed
Add support for stop(cause:).
1 parent 8af530a commit a1d1004

2 files changed

Lines changed: 69 additions & 11 deletions

File tree

lib/async/task.rb

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,45 @@
1919
module Async
2020
# Raised when a task is explicitly stopped.
2121
class Stop < Exception
22+
# Represents the source of the stop operation.
23+
class Cause < Exception
24+
if RUBY_VERSION >= "3.4"
25+
# @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller.
26+
def self.backtrace
27+
caller_locations(2..-1)
28+
end
29+
else
30+
# @returns [Array(String)] The backtrace of the caller.
31+
def self.backtrace
32+
caller(2..-1)
33+
end
34+
end
35+
36+
# Create a new cause of the stop operation, with the given message.
37+
#
38+
# @parameter message [String] The error message.
39+
# @returns [Cause] The cause of the stop operation.
40+
def self.for(message = "Task was stopped")
41+
instance = self.new(message)
42+
instance.set_backtrace(self.backtrace)
43+
return instance
44+
end
45+
end
46+
47+
# Create a new stop operation.
48+
def initialize(message = "Task was stopped")
49+
super(message)
50+
end
51+
2252
# Used to defer stopping the current task until later.
2353
class Later
2454
# Create a new stop later operation.
2555
#
2656
# @parameter task [Task] The task to stop later.
27-
def initialize(task)
57+
# @parameter cause [Exception] The cause of the stop operation.
58+
def initialize(task, cause = nil)
2859
@task = task
60+
@cause = cause
2961
end
3062

3163
# @returns [Boolean] Whether the task is alive.
@@ -35,7 +67,7 @@ def alive?
3567

3668
# Transfer control to the operation - this will stop the task.
3769
def transfer
38-
@task.stop
70+
@task.stop(false, cause: @cause)
3971
end
4072
end
4173
end
@@ -271,7 +303,13 @@ def wait
271303
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
272304
#
273305
# @parameter later [Boolean] Whether to stop the task later, or immediately.
274-
def stop(later = false)
306+
# @parameter cause [Exception] The cause of the stop operation.
307+
def stop(later = false, cause: $!)
308+
# If no cause is given, we generate one from the current call stack:
309+
unless cause
310+
cause = Stop::Cause.for("Stopping task!")
311+
end
312+
275313
if self.stopped?
276314
# If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry.
277315
return stopped!
@@ -285,27 +323,27 @@ def stop(later = false)
285323
# If we are deferring stop...
286324
if @defer_stop == false
287325
# Don't stop now... but update the state so we know we need to stop later.
288-
@defer_stop = true
326+
@defer_stop = cause
289327
return false
290328
end
291329

292330
if self.current?
293331
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
294332
if later
295333
# If the fiber is the current fiber and we want to stop it later, schedule it:
296-
Fiber.scheduler.push(Stop::Later.new(self))
334+
Fiber.scheduler.push(Stop::Later.new(self, cause))
297335
else
298336
# Otherwise, raise the exception directly:
299-
raise Stop, "Stopping current task!"
337+
raise Stop, "Stopping current task!", cause: cause
300338
end
301339
else
302340
# If the fiber is not curent, we can raise the exception directly:
303341
begin
304342
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
305-
Fiber.scheduler.raise(@fiber, Stop)
343+
Fiber.scheduler.raise(@fiber, Stop, cause: cause)
306344
rescue FiberError
307345
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
308-
Fiber.scheduler.push(Stop::Later.new(self))
346+
Fiber.scheduler.push(Stop::Later.new(self, cause))
309347
end
310348
end
311349
else
@@ -345,7 +383,7 @@ def defer_stop
345383

346384
# If we were asked to stop, we should do so now:
347385
if defer_stop
348-
raise Stop, "Stopping current task (was deferred)!"
386+
raise Stop, "Stopping current task (was deferred)!", cause: defer_stop
349387
end
350388
end
351389
else
@@ -356,7 +394,7 @@ def defer_stop
356394

357395
# @returns [Boolean] Whether stop has been deferred.
358396
def stop_deferred?
359-
@defer_stop
397+
!!@defer_stop
360398
end
361399

362400
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.

test/async/task.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,26 @@
552552
expect(transient).to be(:running?)
553553
end.wait
554554
end
555+
556+
it "can stop a task and provide a cause" do
557+
error = nil
558+
559+
cause = Async::Stop::Cause.for("boom")
560+
561+
task = reactor.async do |task|
562+
begin
563+
task.stop(cause: cause)
564+
rescue Async::Stop => error
565+
raise
566+
end
567+
end
568+
569+
reactor.run
570+
571+
expect(task).to be(:stopped?)
572+
expect(error).to be_a(Async::Stop)
573+
expect(error.cause).to be == cause
574+
end
555575
end
556576

557577
with "#sleep" do
@@ -923,7 +943,7 @@ def sleep_forever
923943

924944
reactor.run_once(0)
925945

926-
expect(child_task.stop_deferred?).to be == nil
946+
expect(child_task.stop_deferred?).to be == false
927947
end
928948
end
929949

0 commit comments

Comments
 (0)