Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Temporal\Internal\Transport\Request\Cancel;
use Temporal\Internal\Workflow\ScopeContext;
use Temporal\Internal\Workflow\WorkflowContext;
use Temporal\Worker\FeatureFlags;
use Temporal\Worker\LoopInterface;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow;
Expand Down Expand Up @@ -87,6 +88,8 @@ class Scope implements CancellationScopeInterface, Destroyable

private bool $detached = false;
private bool $cancelled = false;
private bool $closed = false;
private ?\Throwable $cancelReason = null;

public function __construct(
ServiceContainer $services,
Expand Down Expand Up @@ -191,7 +194,7 @@ public function attach(\Generator $generator): self

public function onCancel(callable $then): self
{
$this->onCancel[++$this->cancelID] = $then;
$this->addOnCancel($then);
return $this;
}

Expand All @@ -217,6 +220,7 @@ public function cancel(?\Throwable $reason = null): void
}

$this->cancelled = true;
$this->cancelReason = $reason;

foreach ($this->onCancel as $i => $handler) {
$this->makeCurrent();
Expand Down Expand Up @@ -280,12 +284,9 @@ public function always(callable $onFulfilledOrRejected): PromiseInterface
*/
public function onAwait(Deferred $deferred): void
{
$this->onCancel[++$this->cancelID] = static function (?\Throwable $e = null) use ($deferred): void {
$cancelID = $this->addOnCancel(static function (?\Throwable $e = null) use ($deferred): void {
$deferred->reject($e ?? new CanceledFailure(''));
};

$cancelID = $this->cancelID;

});

// do not cancel already complete promises
$cleanup = function () use ($cancelID): void {
Expand Down Expand Up @@ -329,8 +330,7 @@ protected function createScope(
$scope->layer = $layer;
}

$cancelID = ++$this->cancelID;
$this->onCancel[$cancelID] = $scope->cancel(...);
$cancelID = $this->addOnCancel($scope->cancel(...));

$scope->onClose(
function () use ($cancelID): void {
Expand Down Expand Up @@ -371,7 +371,7 @@ protected function callSignalOrUpdateHandler(callable $handler, ValuesInterface

protected function onRequest(RequestInterface $request, PromiseInterface $promise, bool $cancellable = true): void
{
$this->onCancel[++$this->cancelID] = function (?\Throwable $reason = null) use ($request, $cancellable): void {
$cancelID = $this->addOnCancel(function (?\Throwable $reason = null) use ($request, $cancellable): void {
Comment thread
xepozz marked this conversation as resolved.
$client = $this->context->getClient();
if ($reason instanceof DestructMemorizedInstanceException) {
// memory flush
Expand All @@ -390,9 +390,7 @@ protected function onRequest(RequestInterface $request, PromiseInterface $promis
}

$client->request(new Cancel($request->getID()), $this->scopeContext);
};

$cancelID = $this->cancelID;
}, $cancellable);

// do not cancel already complete promises
$cleanup = function () use ($cancelID): void {
Expand Down Expand Up @@ -460,10 +458,27 @@ protected function next(): void
}
}

private function addOnCancel(callable $handler, bool $cancellable = true): int
{
$id = ++$this->cancelID;

if (FeatureFlags::$propagateCancellationToNewScopes && $this->cancelled && $cancellable) {
$this->makeCurrent();
$handler($this->cancelReason);
return $id;
}

$this->onCancel[$id] = $handler;
return $id;
}

private function nextPromise(PromiseInterface $promise): void
{
if ($promise instanceof CancellationScopeInterface && $promise->isCancelled()) {
$this->handleError(new CanceledFailure(''));
$reason = FeatureFlags::$propagateCancellationToNewScopes && $promise instanceof self
? $promise->cancelReason
: null;
$this->handleError($reason ?? new CanceledFailure(''));
return;
}

Expand Down Expand Up @@ -525,6 +540,11 @@ private function handleError(\Throwable $e): void

private function onException(\Throwable $e): void
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->deferred->reject($e);

$this->makeCurrent();
Expand All @@ -537,6 +557,11 @@ private function onException(\Throwable $e): void

private function onResult(mixed $result): void
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->deferred->resolve($result);

$this->makeCurrent();
Expand Down
14 changes: 14 additions & 0 deletions src/Worker/FeatureFlags.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,18 @@ final class FeatureFlags
* @since SDK 2.17.0
*/
public static bool $warnOnActivityMethodWithoutAttribute = true;

/**
* Make scope cancellation sticky: a nested scope, an await or an onCancel handler registered
* after the surrounding scope was already cancelled is notified immediately instead of being
* missed. Set to TRUE to enable this behavior.
*
* When FALSE (default), the previous behavior is kept: cancel handlers registered after the
* cancellation are never invoked.
*
* @experimental
* @since SDK 2.18.0
* @link https://github.com/temporalio/sdk-php/issues/769
*/
public static bool $propagateCancellationToNewScopes = false;
}
1 change: 1 addition & 0 deletions tests/Acceptance/App/RuntimeBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public static function init(): void
FeatureFlags::$workflowDeferredHandlerStart = true;
FeatureFlags::$cancelAbandonedChildWorkflows = false;
FeatureFlags::$warnOnActivityMethodWithoutAttribute = true;
FeatureFlags::$propagateCancellationToNewScopes = true;
}

/**
Expand Down
Loading