diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index 7eeee5707..8981b3161 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -26,6 +26,7 @@ use Temporal\Internal\Transport\Request\Cancel; use Temporal\Internal\Workflow\ScopeContext; use Temporal\Internal\Workflow\WorkflowContext; +use Temporal\Worker\FeatureFlags; use Temporal\Worker\LoopInterface; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Workflow; @@ -87,6 +88,8 @@ class Scope implements CancellationScopeInterface, Destroyable private bool $detached = false; private bool $cancelled = false; + private bool $closed = false; + private ?\Throwable $cancelReason = null; public function __construct( ServiceContainer $services, @@ -191,7 +194,7 @@ public function attach(\Generator $generator): self public function onCancel(callable $then): self { - $this->onCancel[++$this->cancelID] = $then; + $this->addOnCancel($then); return $this; } @@ -217,6 +220,7 @@ public function cancel(?\Throwable $reason = null): void } $this->cancelled = true; + $this->cancelReason = $reason; foreach ($this->onCancel as $i => $handler) { $this->makeCurrent(); @@ -280,12 +284,9 @@ public function always(callable $onFulfilledOrRejected): PromiseInterface */ public function onAwait(Deferred $deferred): void { - $this->onCancel[++$this->cancelID] = static function (?\Throwable $e = null) use ($deferred): void { + $cancelID = $this->addOnCancel(static function (?\Throwable $e = null) use ($deferred): void { $deferred->reject($e ?? new CanceledFailure('')); - }; - - $cancelID = $this->cancelID; - + }); // do not cancel already complete promises $cleanup = function () use ($cancelID): void { @@ -329,8 +330,7 @@ protected function createScope( $scope->layer = $layer; } - $cancelID = ++$this->cancelID; - $this->onCancel[$cancelID] = $scope->cancel(...); + $cancelID = $this->addOnCancel($scope->cancel(...)); $scope->onClose( function () use ($cancelID): void { @@ -371,7 +371,7 @@ protected function callSignalOrUpdateHandler(callable $handler, ValuesInterface protected function onRequest(RequestInterface $request, PromiseInterface $promise, bool $cancellable = true): void { - $this->onCancel[++$this->cancelID] = function (?\Throwable $reason = null) use ($request, $cancellable): void { + $cancelID = $this->addOnCancel(function (?\Throwable $reason = null) use ($request, $cancellable): void { $client = $this->context->getClient(); if ($reason instanceof DestructMemorizedInstanceException) { // memory flush @@ -390,9 +390,7 @@ protected function onRequest(RequestInterface $request, PromiseInterface $promis } $client->request(new Cancel($request->getID()), $this->scopeContext); - }; - - $cancelID = $this->cancelID; + }, $cancellable); // do not cancel already complete promises $cleanup = function () use ($cancelID): void { @@ -460,10 +458,27 @@ protected function next(): void } } + private function addOnCancel(callable $handler, bool $cancellable = true): int + { + $id = ++$this->cancelID; + + if (FeatureFlags::$propagateCancellationToNewScopes && $this->cancelled && $cancellable) { + $this->makeCurrent(); + $handler($this->cancelReason); + return $id; + } + + $this->onCancel[$id] = $handler; + return $id; + } + private function nextPromise(PromiseInterface $promise): void { if ($promise instanceof CancellationScopeInterface && $promise->isCancelled()) { - $this->handleError(new CanceledFailure('')); + $reason = FeatureFlags::$propagateCancellationToNewScopes && $promise instanceof self + ? $promise->cancelReason + : null; + $this->handleError($reason ?? new CanceledFailure('')); return; } @@ -525,6 +540,11 @@ private function handleError(\Throwable $e): void private function onException(\Throwable $e): void { + if ($this->closed) { + return; + } + + $this->closed = true; $this->deferred->reject($e); $this->makeCurrent(); @@ -537,6 +557,11 @@ private function onException(\Throwable $e): void private function onResult(mixed $result): void { + if ($this->closed) { + return; + } + + $this->closed = true; $this->deferred->resolve($result); $this->makeCurrent(); diff --git a/src/Worker/FeatureFlags.php b/src/Worker/FeatureFlags.php index d814149c8..66557df0b 100644 --- a/src/Worker/FeatureFlags.php +++ b/src/Worker/FeatureFlags.php @@ -64,4 +64,18 @@ final class FeatureFlags * @since SDK 2.17.0 */ public static bool $warnOnActivityMethodWithoutAttribute = true; + + /** + * Make scope cancellation sticky: a nested scope, an await or an onCancel handler registered + * after the surrounding scope was already cancelled is notified immediately instead of being + * missed. Set to TRUE to enable this behavior. + * + * When FALSE (default), the previous behavior is kept: cancel handlers registered after the + * cancellation are never invoked. + * + * @experimental + * @since SDK 2.18.0 + * @link https://github.com/temporalio/sdk-php/issues/769 + */ + public static bool $propagateCancellationToNewScopes = false; } diff --git a/tests/Acceptance/App/RuntimeBuilder.php b/tests/Acceptance/App/RuntimeBuilder.php index 64086f205..b7f502414 100644 --- a/tests/Acceptance/App/RuntimeBuilder.php +++ b/tests/Acceptance/App/RuntimeBuilder.php @@ -88,6 +88,7 @@ public static function init(): void FeatureFlags::$workflowDeferredHandlerStart = true; FeatureFlags::$cancelAbandonedChildWorkflows = false; FeatureFlags::$warnOnActivityMethodWithoutAttribute = true; + FeatureFlags::$propagateCancellationToNewScopes = true; } /** diff --git a/tests/Acceptance/Extra/Workflow/CancelPropagationTest.php b/tests/Acceptance/Extra/Workflow/CancelPropagationTest.php new file mode 100644 index 000000000..69668af0f --- /dev/null +++ b/tests/Acceptance/Extra/Workflow/CancelPropagationTest.php @@ -0,0 +1,275 @@ +cancel(); + + $log = $stub->getResult(timeout: 10); + + $this->assertSame( + [ + 'root cancelled', + 'nested inherited cancel', + 'await failed fast', + ], + $log, + ); + } + + #[Test] + public function scopeCancelledAtBirthRunsFinallyAndPropagatesOnce( + #[Stub('Extra_Workflow_CancelOnCloseOnce')] WorkflowStubInterface $stub, + ): void { + $stub->cancel(); + + $log = $stub->getResult(timeout: 10); + + $this->assertSame( + [ + 'root cancelled', + 'child cleanup', + 'child caught', + ], + $log, + ); + } + + #[Test] + public function onCancelHandlerAttachedAfterCancelFiresImmediately( + #[Stub('Extra_Workflow_CancelOnCancelHook')] WorkflowStubInterface $stub, + ): void { + $stub->cancel(); + + $log = $stub->getResult(timeout: 10); + + $this->assertSame( + [ + 'root cancelled', + 'oncancel fired', + ], + $log, + ); + } + + #[Test] + public function detachedScopeStartedAfterCancelDoesNotInheritCancel( + #[Stub('Extra_Workflow_CancelDetachedSurvives')] WorkflowStubInterface $stub, + ): void { + $stub->cancel(); + + $log = $stub->getResult(timeout: 10); + + $this->assertSame( + [ + 'root cancelled', + 'detached cancelled: false', + 'detached completed', + ], + $log, + ); + } + + /** + * Faithful replica of the reproduction attached to issue #769: + * a nested scope and an await registered after the scope was cancelled. + */ + #[Test] + public function issue769ReproductionReportsCancelledNestedScopeAndFailFastAwait( + #[Stub('Extra_Workflow_Issue769')] WorkflowStubInterface $stub, + ): void { + $stub->cancel(); + + $log = $stub->getResult(timeout: 10); + + $this->assertSame( + [ + 'start: true', + 'timer in nested scope: true', + 'await: true', + 'await threw: true', + ], + $log, + ); + } +} + +#[WorkflowInterface] +class TestWorkflow +{ + private array $log = []; + + #[WorkflowMethod(name: 'Extra_Workflow_CancelPropagation')] + public function handle() + { + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + $this->log[] = 'root cancelled'; + } + + try { + yield (function () { + yield Workflow::timer(1); + })(); + $this->log[] = 'nested timer completed'; + } catch (CanceledFailure) { + $this->log[] = 'nested inherited cancel'; + } + + try { + yield Workflow::await(static fn(): bool => false); + $this->log[] = 'await returned'; + } catch (CanceledFailure) { + $this->log[] = 'await failed fast'; + } + + return $this->log; + } +} + +#[WorkflowInterface] +class CleanupOnceWorkflow +{ + private array $log = []; + + #[WorkflowMethod(name: 'Extra_Workflow_CancelOnCloseOnce')] + public function handle() + { + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + $this->log[] = 'root cancelled'; + } + + try { + yield (function () { + try { + yield Workflow::timer(1); + $this->log[] = 'child timer done'; + } finally { + $this->log[] = 'child cleanup'; + } + })(); + } catch (CanceledFailure) { + $this->log[] = 'child caught'; + } + + return $this->log; + } +} + +#[WorkflowInterface] +class CancelOnCancelHookWorkflow +{ + private array $log = []; + + #[WorkflowMethod(name: 'Extra_Workflow_CancelOnCancelHook')] + public function start() + { + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + $this->log[] = 'root cancelled'; + } + + Workflow::async(function () { + yield Workflow::timer(1); + })->onCancel(function (): void { + $this->log[] = 'oncancel fired'; + }); + + return $this->log; + } +} + +#[WorkflowInterface] +class DetachedSurvivesCancelWorkflow +{ + private array $log = []; + + #[WorkflowMethod(name: 'Extra_Workflow_CancelDetachedSurvives')] + public function start() + { + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + $this->log[] = 'root cancelled'; + } + + $detached = Workflow::asyncDetached(function () { + yield Workflow::timer(1); + return 'detached completed'; + }); + + $this->log[] = 'detached cancelled: ' . ($detached->isCancelled() ? 'true' : 'false'); + $this->log[] = yield $detached; + + return $this->log; + } +} + +#[WorkflowInterface] +class Issue769Workflow +{ + private array $log = []; + + #[WorkflowMethod(name: 'Extra_Workflow_Issue769')] + public function start() + { + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + } + + $this->record('start'); + + try { + yield $this->doSomething(); + } catch (CanceledFailure) { + } + + $this->record('await'); + + $awaitThrew = false; + try { + yield Workflow::await(static fn(): bool => false); + } catch (CanceledFailure) { + $awaitThrew = true; + } + $this->log[] = 'await threw: ' . ($awaitThrew ? 'true' : 'false'); + + return $this->log; + } + + private function doSomething(): \Generator + { + $this->record('timer in nested scope'); + yield Workflow::timer(1); + } + + private function record(string $location): void + { + $context = Workflow::getCurrentContext(); + $isCancelled = (new \ReflectionProperty($context::class, 'scope')) + ->getValue($context) + ->isCancelled(); + $this->log[] = $location . ': ' . ($isCancelled ? 'true' : 'false'); + } +} diff --git a/tests/Acceptance/Extra/Workflow/MutexRunLockedTest.php b/tests/Acceptance/Extra/Workflow/MutexRunLockedTest.php index 576259480..62a92eced 100644 --- a/tests/Acceptance/Extra/Workflow/MutexRunLockedTest.php +++ b/tests/Acceptance/Extra/Workflow/MutexRunLockedTest.php @@ -30,6 +30,7 @@ public function runLockedWithGeneratorAndAwait( $this->assertTrue($result[1], 'The function inside runLocked mist wait for signal'); $this->assertTrue($result[2], 'Mutex must be locked during runLocked'); $this->assertNull($result[3], 'No exception must be thrown'); + $this->assertFalse($result[4], 'The trailed runLocked must not run: the permanent lock is held'); } #[Test] @@ -44,6 +45,7 @@ public function runLockedAndCancel( $this->assertTrue($result[0], 'Mutex must be unlocked after runLocked is cancelled'); $this->assertNull($result[2], 'Mutex must be locked during runLocked'); $this->assertSame(CanceledFailure::class, $result[3], 'CanceledFailure must be thrown'); + $this->assertTrue($result[4], 'Cancelling the outer runLocked releases the inner permanent lock, so the trailed runLocked runs'); } } @@ -82,11 +84,7 @@ public function handle(): \Generator }), ); - // The last runLocked must not be executed because there a permanent lock - // that was created inside the first runLocked - $trailed and throw new \Exception('The trailed runLocked must not be executed.'); - - return [$this->unlocked, $this->unblock, $result, $exception]; + return [$this->unlocked, $this->unblock, $result, $exception, $trailed]; } #[Workflow\SignalMethod] diff --git a/tests/Unit/Internal/Workflow/Process/ScopeCancellationFlagTestCase.php b/tests/Unit/Internal/Workflow/Process/ScopeCancellationFlagTestCase.php new file mode 100644 index 000000000..f35a19c76 --- /dev/null +++ b/tests/Unit/Internal/Workflow/Process/ScopeCancellationFlagTestCase.php @@ -0,0 +1,80 @@ +flagBackup = FeatureFlags::$propagateCancellationToNewScopes; + } + + protected function tearDown(): void + { + FeatureFlags::$propagateCancellationToNewScopes = $this->flagBackup; + } + + #[Test] + public function onCancelHandlerRegisteredAfterCancelIsMissedWhenFlagDisabled(): void + { + FeatureFlags::$propagateCancellationToNewScopes = false; + + $scope = new CancelProbeScope( + $this->createMock(WorkflowContext::class), + $this->createMock(ScopeContext::class), + ); + $scope->cancel(); + + $fired = false; + $scope->onCancel(static function () use (&$fired): void { + $fired = true; + }); + + self::assertFalse($fired, 'With the flag disabled the late onCancel handler must not fire'); + } + + #[Test] + public function onCancelHandlerRegisteredAfterCancelFiresImmediatelyWhenFlagEnabled(): void + { + FeatureFlags::$propagateCancellationToNewScopes = true; + + $reason = new \RuntimeException('stop'); + $scope = new CancelProbeScope( + $this->createMock(WorkflowContext::class), + $this->createMock(ScopeContext::class), + ); + $scope->cancel($reason); + + $received = null; + $scope->onCancel(static function (?\Throwable $e = null) use (&$received): void { + $received = $e; + }); + + self::assertSame($reason, $received, 'With the flag enabled the late onCancel handler must fire with the cancel reason'); + } +} + +final class CancelProbeScope extends Scope +{ + public function __construct(WorkflowContext $context, ScopeContext $scopeContext) + { + $this->context = $context; + $this->scopeContext = $scopeContext; + } + + protected function makeCurrent(): void + { + // no-op: avoid the global Workflow context facade in isolation + } +} diff --git a/tests/Unit/Internal/Workflow/Process/ScopeOnRequestCancelTestCase.php b/tests/Unit/Internal/Workflow/Process/ScopeOnRequestCancelTestCase.php new file mode 100644 index 000000000..e71963306 --- /dev/null +++ b/tests/Unit/Internal/Workflow/Process/ScopeOnRequestCancelTestCase.php @@ -0,0 +1,62 @@ +cancel() on the just-queued + * command and silently strip the completion, leaving the workflow unable to finish. + */ +final class ScopeOnRequestCancelTestCase extends TestCase +{ + #[Test] + public function nonCancellableRequestInCancelledScopeKeepsQueuedCommand(): void + { + $client = $this->createMock(ClientInterface::class); + $client->method('isQueued')->willReturn(true); + $client->expects($this->never())->method('cancel'); + $client->expects($this->never())->method('request'); + + $context = $this->createMock(WorkflowContext::class); + $context->method('getClient')->willReturn($client); + + $request = $this->createMock(RequestInterface::class); + $request->method('getID')->willReturn(42); + + $scope = new OnRequestProbeScope($context, $this->createMock(ScopeContext::class)); + $scope->cancel(); + + $scope->callOnRequest($request, $this->createMock(PromiseInterface::class), cancellable: false); + } +} + +final class OnRequestProbeScope extends Scope +{ + public function __construct(WorkflowContext $context, ScopeContext $scopeContext) + { + $this->context = $context; + $this->scopeContext = $scopeContext; + } + + public function callOnRequest(RequestInterface $request, PromiseInterface $promise, bool $cancellable = true): void + { + $this->onRequest($request, $promise, $cancellable); + } + + protected function makeCurrent(): void + { + // no-op: avoid the global Workflow context facade in isolation + } +}