Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions lib/async/stop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "fiber"
require "console"

module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
# Represents the source of the stop operation.
class Cause < Exception
if RUBY_VERSION >= "3.4"
# @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller.
def self.backtrace
caller_locations(2..-1)
end
else
# @returns [Array(String)] The backtrace of the caller.
def self.backtrace
caller(2..-1)
end
end

# Create a new cause of the stop operation, with the given message.
#
# @parameter message [String] The error message.
# @returns [Cause] The cause of the stop operation.
def self.for(message = "Task was stopped")
instance = self.new(message)
instance.set_backtrace(self.backtrace)
return instance
end
end

if RUBY_VERSION < "3.5"
# Create a new stop operation.
#
# This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}
#
# @parameter message [String | Hash] The error message or a hash containing the cause.
def initialize(message = "Task was stopped")
if message.is_a?(Hash)
@cause = message[:cause]
message = "Task was stopped"
end

super(message)
end

# @returns [Exception] The cause of the stop operation.
#
# This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}, we explicitly capture the cause here.
def cause
super || @cause
end
end

# Used to defer stopping the current task until later.
class Later
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
# @parameter cause [Exception] The cause of the stop operation.
def initialize(task, cause = nil)
@task = task
@cause = cause
end

# @returns [Boolean] Whether the task is alive.
def alive?
true
end

# Transfer control to the operation - this will stop the task.
def transfer
@task.stop(false, cause: @cause)
end
end
end
end
46 changes: 15 additions & 31 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,11 @@

require_relative "node"
require_relative "condition"
require_relative "stop"

Fiber.attr_accessor :async_task

module Async
# Raised when a task is explicitly stopped.
class Stop < Exception
# Used to defer stopping the current task until later.
class Later
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
def initialize(task)
@task = task
end

# @returns [Boolean] Whether the task is alive.
def alive?
true
end

# Transfer control to the operation - this will stop the task.
def transfer
@task.stop
end
end
end

# Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`.
# @public Since *Async v1*.
class TimeoutError < StandardError
Expand Down Expand Up @@ -271,7 +249,13 @@ def wait
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
#
# @parameter later [Boolean] Whether to stop the task later, or immediately.
def stop(later = false)
# @parameter cause [Exception] The cause of the stop operation.
def stop(later = false, cause: $!)
# If no cause is given, we generate one from the current call stack:
unless cause
cause = Stop::Cause.for("Stopping task!")
end

if self.stopped?
# If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry.
return stopped!
Expand All @@ -285,27 +269,27 @@ def stop(later = false)
# If we are deferring stop...
if @defer_stop == false
# Don't stop now... but update the state so we know we need to stop later.
@defer_stop = true
@defer_stop = cause
return false
end

if self.current?
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
if later
# If the fiber is the current fiber and we want to stop it later, schedule it:
Fiber.scheduler.push(Stop::Later.new(self))
Fiber.scheduler.push(Stop::Later.new(self, cause))
else
# Otherwise, raise the exception directly:
raise Stop, "Stopping current task!"
raise Stop, "Stopping current task!", cause: cause
end
else
# If the fiber is not curent, we can raise the exception directly:
begin
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
Fiber.scheduler.raise(@fiber, Stop)
Fiber.scheduler.raise(@fiber, Stop, cause: cause)
rescue FiberError
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
Fiber.scheduler.push(Stop::Later.new(self))
Fiber.scheduler.push(Stop::Later.new(self, cause))
end
end
else
Expand Down Expand Up @@ -345,7 +329,7 @@ def defer_stop

# If we were asked to stop, we should do so now:
if defer_stop
raise Stop, "Stopping current task (was deferred)!"
raise Stop, "Stopping current task (was deferred)!", cause: defer_stop
end
end
else
Expand All @@ -356,7 +340,7 @@ def defer_stop

# @returns [Boolean] Whether stop has been deferred.
def stop_deferred?
@defer_stop
!!@defer_stop
end

# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.
Expand Down
44 changes: 43 additions & 1 deletion test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,48 @@
expect(transient).to be(:running?)
end.wait
end

it "can stop a task from within with a cause" do
error = nil

cause = Async::Stop::Cause.for("boom")

task = reactor.async do |task|
begin
task.stop(cause: cause)
rescue Async::Stop => error
raise
end
end

reactor.run

expect(task).to be(:stopped?)
expect(error).to be_a(Async::Stop)
expect(error.cause).to be == cause
end

it "can stop a task from outside with a cause" do
error = nil

cause = RuntimeError.new("boom")

task = reactor.async do |task|
begin
task.yield
rescue Async::Stop => error
raise
end
end

task.stop(cause: cause)

reactor.run

expect(task).to be(:stopped?)
expect(error).to be_a(Async::Stop)
expect(error.cause).to be == cause
end
end

with "#sleep" do
Expand Down Expand Up @@ -923,7 +965,7 @@ def sleep_forever

reactor.run_once(0)

expect(child_task.stop_deferred?).to be == nil
expect(child_task.stop_deferred?).to be == false
end
end

Expand Down
Loading