From 32a048d60f67ddda7f44275618307ad92eaaaba3 Mon Sep 17 00:00:00 2001 From: Dmitriy Derepko Date: Tue, 7 Apr 2026 22:02:39 +0400 Subject: [PATCH 1/3] feat: add automatic retry mechanism for schedule updates with conflict token retries --- src/Client/Schedule/ScheduleHandle.php | 124 +++++++++-------- .../Unit/Schedule/ScheduleHandleTestCase.php | 125 +++++++++++++++++- 2 files changed, 192 insertions(+), 57 deletions(-) diff --git a/src/Client/Schedule/ScheduleHandle.php b/src/Client/Schedule/ScheduleHandle.php index 6f9151b58..34616023c 100644 --- a/src/Client/Schedule/ScheduleHandle.php +++ b/src/Client/Schedule/ScheduleHandle.php @@ -17,12 +17,14 @@ use Temporal\Client\ClientOptions; use Temporal\Client\Common\ClientContextTrait; use Temporal\Client\GRPC\ServiceClientInterface; +use Temporal\Client\GRPC\StatusCode; use Temporal\Client\Schedule\Info\ScheduleDescription; use Temporal\Client\Schedule\Policy\ScheduleOverlapPolicy; use Temporal\Client\Schedule\Update\ScheduleUpdate; use Temporal\Client\Schedule\Update\ScheduleUpdateInput; use Temporal\Common\Uuid; use Temporal\DataConverter\DataConverterInterface; +use Temporal\Exception\Client\ServiceClientException; use Temporal\Exception\InvalidArgumentException; use Temporal\Internal\Mapper\ScheduleMapper; use Temporal\Internal\Marshaller\MarshallerInterface; @@ -32,6 +34,11 @@ final class ScheduleHandle { use ClientContextTrait; + /** + * Maximum number of retry attempts for conflict token mismatches when using a closure updater. + */ + private const CONFLICT_TOKEN_MAX_RETRIES = 10; + public function __construct( ServiceClientInterface $client, private readonly ClientOptions $clientOptions, @@ -55,32 +62,9 @@ public function getID(): string /** * Update the Schedule. * - * Examples: - * - * Add a search attribute to the schedule: - * ``` - * $handle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { - * return ScheduleUpdate::new($input->description->schedule) - * ->withSearchAttributes($input->description->searchAttributes - * ->withValue('foo', 'bar'), - * ->withValue('bar', 42), - * ); - * }); - * ``` - * - * Pause a described schedule: - * ``` - * $description = $handle->describe(); - * $schedule = $description->schedule; - * $handle->update( - * $schedule - * ->withState($schedule->state->withPaused(true)), - * $description->conflictToken, - * ); - * ``` - * - * NOTE: If two Update calls are made in parallel to the same Schedule there is the potential - * for a race condition. Use $conflictToken to avoid this. + * NOTE: When using a closure, the update will automatically retry (up to 10 times) if a conflict + * token mismatch occurs, re-fetching the schedule description each time. When passing a Schedule + * directly, no retry is performed and the caller is responsible for handling conflicts. * * @param Schedule|\Closure(ScheduleUpdateInput): ScheduleUpdate $schedule The new Schedule to update to or * a closure that will be passed the current ScheduleDescription and should return a ScheduleUpdate. @@ -93,37 +77,12 @@ public function update( Schedule|\Closure $schedule, ?string $conflictToken = null, ): void { - $request = (new UpdateScheduleRequest()) - ->setScheduleId($this->id) - ->setNamespace($this->namespace) - ->setConflictToken((string) $conflictToken) - ->setIdentity($this->clientOptions->identity) - ->setRequestId(Uuid::v4()); - if ($schedule instanceof \Closure) { - $description = $this->describe(); - $update = $schedule(new ScheduleUpdateInput($description)); - $update instanceof ScheduleUpdate or throw new InvalidArgumentException( - 'Closure for the schedule update method must return a ScheduleUpdate.', - ); - - $schedule = $update->schedule; - - // Search attributes - if ($update->searchAttributes !== null) { - $update->searchAttributes->setDataConverter($this->converter); - $payloads = $update->searchAttributes->toPayloadArray(); - $encodedSa = (new SearchAttributes())->setIndexedFields($payloads); - $request->setSearchAttributes($encodedSa); - } + $this->updateWithClosure($schedule); + return; } - $mapper = new ScheduleMapper($this->converter, $this->marshaller); - $scheduleMessage = $mapper->toMessage($schedule); - $request->setSchedule($scheduleMessage); - - - $this->client->UpdateSchedule($request); + $this->doUpdate($schedule, $conflictToken); } /** @@ -250,6 +209,63 @@ public function delete(): void $this->client->DeleteSchedule($request); } + private function updateWithClosure(\Closure $updater): void + { + for ($attempt = 0; $attempt < self::CONFLICT_TOKEN_MAX_RETRIES; $attempt++) { + $description = $this->describe(); + $update = $updater(new ScheduleUpdateInput($description)); + $update instanceof ScheduleUpdate or throw new InvalidArgumentException( + 'Closure for the schedule update method must return a ScheduleUpdate.', + ); + + try { + $this->doUpdate($update->schedule, $description->conflictToken, $update); + return; + } catch (ServiceClientException $e) { + if ($e->getCode() !== StatusCode::FAILED_PRECONDITION + || !\str_contains($e->getMessage(), 'conflict token') + ) { + throw $e; + } + + // Conflict token mismatch — retry with a fresh describe + } + } + + throw new ServiceClientException((object) [ + 'code' => StatusCode::FAILED_PRECONDITION, + 'details' => \sprintf( + 'Schedule update conflict token mismatch after %d retries', + self::CONFLICT_TOKEN_MAX_RETRIES, + ), + 'metadata' => [], + ]); + } + + private function doUpdate(Schedule $schedule, ?string $conflictToken, ?ScheduleUpdate $update = null): void + { + $request = (new UpdateScheduleRequest()) + ->setScheduleId($this->id) + ->setNamespace($this->namespace) + ->setConflictToken((string) $conflictToken) + ->setIdentity($this->clientOptions->identity) + ->setRequestId(Uuid::v4()); + + // Search attributes from closure-based update + if ($update?->searchAttributes !== null) { + $update->searchAttributes->setDataConverter($this->converter); + $payloads = $update->searchAttributes->toPayloadArray(); + $encodedSa = (new SearchAttributes())->setIndexedFields($payloads); + $request->setSearchAttributes($encodedSa); + } + + $mapper = new ScheduleMapper($this->converter, $this->marshaller); + $scheduleMessage = $mapper->toMessage($schedule); + $request->setSchedule($scheduleMessage); + + $this->client->UpdateSchedule($request); + } + private function patch(SchedulePatch $patch): PatchScheduleRequest { return (new PatchScheduleRequest()) diff --git a/tests/Unit/Schedule/ScheduleHandleTestCase.php b/tests/Unit/Schedule/ScheduleHandleTestCase.php index b81de5baf..fa7256871 100644 --- a/tests/Unit/Schedule/ScheduleHandleTestCase.php +++ b/tests/Unit/Schedule/ScheduleHandleTestCase.php @@ -19,6 +19,7 @@ use Temporal\Api\Workflowservice\V1\UpdateScheduleResponse; use Temporal\Client\ClientOptions; use Temporal\Client\GRPC\ServiceClientInterface; +use Temporal\Client\GRPC\StatusCode; use Temporal\Client\Schedule\BackfillPeriod; use PHPUnit\Framework\TestCase; use Temporal\Client\Schedule\Policy\ScheduleOverlapPolicy; @@ -29,6 +30,7 @@ use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; use Temporal\DataConverter\EncodedCollection; +use Temporal\Exception\Client\ServiceClientException; use Temporal\Exception\InvalidArgumentException; use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory; use Temporal\Internal\Marshaller\Marshaller; @@ -238,7 +240,7 @@ public function testUpdateUsingClosureDefaults(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('DescribeSchedule') - ->willReturn((new DescribeScheduleResponse())); + ->willReturn((new DescribeScheduleResponse())->setConflictToken('describe-conflict-token')); $clientMock->expects($this->once()) ->method('UpdateSchedule') ->with($this->callback(fn (UpdateScheduleRequest $request) => $testContext->request = $request or true)) @@ -250,13 +252,14 @@ public function testUpdateUsingClosureDefaults(): void $sa = EncodedCollection::fromValues(['foo' => 'bar']); return ScheduleUpdate::new($schedule) ->withSearchAttributes($sa); - }, 'test-conflict-token'); + }); $this->assertTrue(isset($testContext->request)); $this->assertSame('default', $testContext->request->getNamespace()); $this->assertSame('test-id', $testContext->request->getScheduleId()); $this->assertSame('test-identity', $testContext->request->getIdentity()); - $this->assertSame('test-conflict-token', $testContext->request->getConflictToken()); + // Conflict token should come from describe(), not from the argument + $this->assertSame('describe-conflict-token', $testContext->request->getConflictToken()); $this->assertNotNull($testContext->request->getRequestId()); $this->assertNotNull($testContext->request->getSchedule()); // Search attributes @@ -267,6 +270,122 @@ public function testUpdateUsingClosureDefaults(): void $this->assertSame(['foo' => 'bar'], $sa); } + public function testUpdateWithClosureRetriesOnConflictTokenMismatch(): void + { + $testContext = new class { + public int $describeCount = 0; + public int $updateCount = 0; + public UpdateScheduleRequest $request; + }; + + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock->expects($this->exactly(3)) + ->method('DescribeSchedule') + ->willReturnCallback(function () use ($testContext) { + $testContext->describeCount++; + return (new DescribeScheduleResponse()) + ->setConflictToken('token-' . $testContext->describeCount); + }); + $clientMock->expects($this->exactly(3)) + ->method('UpdateSchedule') + ->willReturnCallback(function (UpdateScheduleRequest $request) use ($testContext) { + $testContext->updateCount++; + $testContext->request = $request; + // Fail first 2 attempts with conflict token mismatch + if ($testContext->updateCount < 3) { + throw new ServiceClientException((object) [ + 'code' => StatusCode::FAILED_PRECONDITION, + 'details' => 'mismatched conflict token', + 'metadata' => [], + ]); + } + return new UpdateScheduleResponse(); + }); + + $scheduleHandle = $this->createScheduleHandle(client: $clientMock); + + $closureCallCount = 0; + $scheduleHandle->update(function (ScheduleUpdateInput $input) use (&$closureCallCount): ScheduleUpdate { + $closureCallCount++; + return ScheduleUpdate::new(Schedule::new()); + }); + + $this->assertSame(3, $testContext->describeCount); + $this->assertSame(3, $testContext->updateCount); + $this->assertSame(3, $closureCallCount); + // Last request should have the token from the 3rd describe + $this->assertSame('token-3', $testContext->request->getConflictToken()); + } + + public function testUpdateWithClosureThrowsNonConflictError(): void + { + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock->expects($this->once()) + ->method('DescribeSchedule') + ->willReturn((new DescribeScheduleResponse())->setConflictToken('token')); + $clientMock->expects($this->once()) + ->method('UpdateSchedule') + ->willThrowException(new ServiceClientException((object) [ + 'code' => StatusCode::NOT_FOUND, + 'details' => 'schedule not found', + 'metadata' => [], + ])); + + $scheduleHandle = $this->createScheduleHandle(client: $clientMock); + + $this->expectException(ServiceClientException::class); + $this->expectExceptionCode(StatusCode::NOT_FOUND); + + $scheduleHandle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + return ScheduleUpdate::new(Schedule::new()); + }); + } + + public function testUpdateWithClosureExhaustsRetries(): void + { + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock->expects($this->exactly(10)) + ->method('DescribeSchedule') + ->willReturn((new DescribeScheduleResponse())->setConflictToken('stale-token')); + $clientMock->expects($this->exactly(10)) + ->method('UpdateSchedule') + ->willThrowException(new ServiceClientException((object) [ + 'code' => StatusCode::FAILED_PRECONDITION, + 'details' => 'mismatched conflict token', + 'metadata' => [], + ])); + + $scheduleHandle = $this->createScheduleHandle(client: $clientMock); + + $this->expectException(ServiceClientException::class); + $this->expectExceptionCode(StatusCode::FAILED_PRECONDITION); + + $scheduleHandle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + return ScheduleUpdate::new(Schedule::new()); + }); + } + + public function testUpdateWithDirectScheduleDoesNotRetryOnConflict(): void + { + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock->expects($this->never()) + ->method('DescribeSchedule'); + $clientMock->expects($this->once()) + ->method('UpdateSchedule') + ->willThrowException(new ServiceClientException((object) [ + 'code' => StatusCode::FAILED_PRECONDITION, + 'details' => 'mismatched conflict token', + 'metadata' => [], + ])); + + $scheduleHandle = $this->createScheduleHandle(client: $clientMock); + + $this->expectException(ServiceClientException::class); + $this->expectExceptionCode(StatusCode::FAILED_PRECONDITION); + + $scheduleHandle->update(Schedule::new(), 'old-token'); + } + public function testDescribe(): void { $testContext = new class { From 64cf224c4acc53c1b8fe29bc0768965380cb7973 Mon Sep 17 00:00:00 2001 From: Dmitriy Derepko Date: Sat, 11 Apr 2026 20:30:09 +0400 Subject: [PATCH 2/3] feat: add CHASM scheduler tests and update dependencies for Temporal --- dload.xml | 4 +- .../App/Runtime/TemporalStarter.php | 2 + .../Schedule/ScheduleConflictTokenTest.php | 242 ++++++++++++++++++ .../Unit/Schedule/ScheduleHandleTestCase.php | 2 +- 4 files changed, 247 insertions(+), 3 deletions(-) create mode 100644 tests/Acceptance/Extra/Schedule/ScheduleConflictTokenTest.php diff --git a/dload.xml b/dload.xml index 0b70f0a64..752f25294 100644 --- a/dload.xml +++ b/dload.xml @@ -4,8 +4,8 @@ temp-dir="./runtime" > - - + + diff --git a/tests/Acceptance/App/Runtime/TemporalStarter.php b/tests/Acceptance/App/Runtime/TemporalStarter.php index 4c52c34a2..894751fe2 100644 --- a/tests/Acceptance/App/Runtime/TemporalStarter.php +++ b/tests/Acceptance/App/Runtime/TemporalStarter.php @@ -32,6 +32,8 @@ public function start(): void '--dynamic-config-value', 'frontend.activityAPIsEnabled=true', '--dynamic-config-value', 'frontend.workerVersioningWorkflowAPIs=true', '--dynamic-config-value', 'system.enableDeploymentVersions=true', + '--dynamic-config-value', 'history.enableChasm=true', + '--dynamic-config-value', 'history.enableCHASMSchedulerCreation=true', ], searchAttributes: [ 'foo' => ValueType::Text->value, diff --git a/tests/Acceptance/Extra/Schedule/ScheduleConflictTokenTest.php b/tests/Acceptance/Extra/Schedule/ScheduleConflictTokenTest.php new file mode 100644 index 000000000..908dce55c --- /dev/null +++ b/tests/Acceptance/Extra/Schedule/ScheduleConflictTokenTest.php @@ -0,0 +1,242 @@ +createSchedule( + Schedule::new() + ->withAction(StartWorkflowAction::new('TestWorkflow')) + ->withSpec(ScheduleSpec::new()->withStartTime('+1 hour')), + ScheduleOptions::new()->withMemo(['key' => 'initial']), + ); + + try { + $description = $handle->describe(); + $token = $description->conflictToken; + self::assertNotSame('', $token); + + // Update using the fresh conflict token + $handle->update( + $description->schedule->withAction( + $description->schedule->action->withMemo(['key' => 'updated']), + ), + $token, + ); + + $actual = $handle->describe()->schedule->action; + self::assertInstanceOf(StartWorkflowAction::class, $actual); + self::assertSame('updated', $actual->memo->getValue('key')); + } finally { + $handle->delete(); + } + } + + #[Test] + public function updateWithStaleConflictTokenIsRejected( + ScheduleClientInterface $client, + ): void { + $handle = $client->createSchedule( + Schedule::new() + ->withAction(StartWorkflowAction::new('TestWorkflow')) + ->withSpec(ScheduleSpec::new()->withStartTime('+1 hour')), + ScheduleOptions::new(), + ); + + try { + // Capture the initial token — will become stale after the next update. + $staleToken = $handle->describe()->conflictToken; + self::assertNotSame('', $staleToken); + + // First update (no token): applies and advances the server-side token. + $handle->update( + $handle->describe()->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['step' => 'first']), + ), + ); + \sleep(1); + + // Second update with the stale token: CHASM scheduler rejects with FAILED_PRECONDITION. + $exception = null; + try { + $handle->update( + $handle->describe()->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['step' => 'second']), + ), + $staleToken, + ); + } catch (ServiceClientException $e) { + $exception = $e; + } + + self::assertNotNull($exception, 'Expected stale conflict token to be rejected'); + self::assertSame(StatusCode::FAILED_PRECONDITION, $exception->getCode()); + self::assertStringContainsString('conflict token', $exception->getMessage()); + + // Sanity check: the schedule still reflects the first update, not the second + $current = $handle->describe()->schedule->action; + self::assertInstanceOf(StartWorkflowAction::class, $current); + self::assertSame('first', $current->memo->getValue('step')); + } finally { + $handle->delete(); + } + } + + #[Test] + public function updateWithEmptyTokenAlwaysApplies( + ScheduleClientInterface $client, + ): void { + $handle = $client->createSchedule( + Schedule::new() + ->withAction(StartWorkflowAction::new('TestWorkflow')) + ->withSpec(ScheduleSpec::new()->withStartTime('+1 hour')), + ScheduleOptions::new(), + ); + + try { + // Two back-to-back updates without a token. Both must apply — passing no token + // bypasses the optimistic check entirely. + $handle->update( + $handle->describe()->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['n' => '1']), + ), + ); + \sleep(1); + + $handle->update( + $handle->describe()->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['n' => '2']), + ), + ); + \sleep(1); + + $current = $handle->describe()->schedule->action; + self::assertInstanceOf(StartWorkflowAction::class, $current); + self::assertSame('2', $current->memo->getValue('n')); + } finally { + $handle->delete(); + } + } + + #[Test] + public function conflictTokenChangesAfterEachUpdate( + ScheduleClientInterface $client, + ): void { + $handle = $client->createSchedule( + Schedule::new() + ->withAction(StartWorkflowAction::new('TestWorkflow')) + ->withSpec(ScheduleSpec::new()->withStartTime('+1 hour')), + ScheduleOptions::new(), + ); + + try { + $tokenBefore = $handle->describe()->conflictToken; + + $handle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + return ScheduleUpdate::new( + $input->description->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['v' => '1']), + ), + ); + }); + + $tokenAfter = $handle->describe()->conflictToken; + + self::assertNotSame('', $tokenBefore); + self::assertNotSame('', $tokenAfter); + self::assertNotSame( + $tokenBefore, + $tokenAfter, + 'Conflict token must change after a successful update', + ); + } finally { + $handle->delete(); + } + } + + #[Test] + public function closureUpdateUsesFreshTokenFromEachDescribe( + ScheduleClientInterface $client, + ): void { + $handle = $client->createSchedule( + Schedule::new() + ->withAction(StartWorkflowAction::new('TestWorkflow')) + ->withSpec(ScheduleSpec::new()->withStartTime('+1 hour')), + ScheduleOptions::new(), + ); + + // Second handle to the same schedule acts as a concurrent writer. + $concurrent = $client->getHandle($handle->getID()); + + try { + // Advance the server token out-of-band. The closure-based update below must + // still apply successfully: updateWithClosure() fetches a fresh description + // (and therefore a fresh token) on every invocation, so the token it sends + // always matches the current server state. + $concurrent->update( + $concurrent->describe()->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['by' => 'concurrent']), + ), + ); + \sleep(1); + + $closureCalls = 0; + $handle->update(function (ScheduleUpdateInput $input) use (&$closureCalls): ScheduleUpdate { + ++$closureCalls; + return ScheduleUpdate::new( + $input->description->schedule->withAction( + StartWorkflowAction::new('TestWorkflow')->withMemo(['by' => 'closure']), + ), + ); + }); + \sleep(1); + + self::assertSame( + 1, + $closureCalls, + 'Closure should run once when there is no concurrent update happening during the call', + ); + + $final = $handle->describe()->schedule->action; + self::assertInstanceOf(StartWorkflowAction::class, $final); + self::assertSame( + 'closure', + $final->memo->getValue('by'), + 'Closure update should overwrite the earlier concurrent update', + ); + } finally { + $handle->delete(); + } + } +} diff --git a/tests/Unit/Schedule/ScheduleHandleTestCase.php b/tests/Unit/Schedule/ScheduleHandleTestCase.php index fa7256871..a507ee83c 100644 --- a/tests/Unit/Schedule/ScheduleHandleTestCase.php +++ b/tests/Unit/Schedule/ScheduleHandleTestCase.php @@ -252,7 +252,7 @@ public function testUpdateUsingClosureDefaults(): void $sa = EncodedCollection::fromValues(['foo' => 'bar']); return ScheduleUpdate::new($schedule) ->withSearchAttributes($sa); - }); + }, 'argument-conflict-token'); $this->assertTrue(isset($testContext->request)); $this->assertSame('default', $testContext->request->getNamespace()); From c2e66ceed17efb6a8fd1e1051c51d6ddca8b191d Mon Sep 17 00:00:00 2001 From: Dmitriy Derepko Date: Sat, 11 Apr 2026 21:28:27 +0400 Subject: [PATCH 3/3] feat: enhance schedule update handling with conflict token logic and refine tests --- src/Client/Schedule/ScheduleHandle.php | 85 ++++++++++++++++--- .../Unit/Schedule/ScheduleHandleTestCase.php | 58 ++++++++----- 2 files changed, 113 insertions(+), 30 deletions(-) diff --git a/src/Client/Schedule/ScheduleHandle.php b/src/Client/Schedule/ScheduleHandle.php index 34616023c..4d9af9718 100644 --- a/src/Client/Schedule/ScheduleHandle.php +++ b/src/Client/Schedule/ScheduleHandle.php @@ -39,6 +39,16 @@ final class ScheduleHandle */ private const CONFLICT_TOKEN_MAX_RETRIES = 10; + /** + * Substring matched against a {@see ServiceClientException} message to detect a conflict + * token mismatch returned by the CHASM scheduler. The full server error is + * `serviceerror.NewFailedPrecondition("mismatched conflict token")` — see + * `chasm/lib/scheduler/scheduler.go` (`ErrConflictTokenMismatch`) in temporalio/temporal. + * The legacy V1 signal-based scheduler silently drops the update instead and does not + * produce this error at all. + */ + private const CONFLICT_TOKEN_ERROR_MARKER = 'conflict token'; + public function __construct( ServiceClientInterface $client, private readonly ClientOptions $clientOptions, @@ -62,22 +72,77 @@ public function getID(): string /** * Update the Schedule. * - * NOTE: When using a closure, the update will automatically retry (up to 10 times) if a conflict - * token mismatch occurs, re-fetching the schedule description each time. When passing a Schedule - * directly, no retry is performed and the caller is responsible for handling conflicts. + * There are two forms: + * + * - **Closure form** — the closure receives a {@see ScheduleUpdateInput} carrying the current + * {@see ScheduleDescription} and must return a {@see ScheduleUpdate}. The SDK automatically + * fetches a fresh description on every attempt and uses its conflict token, so concurrent + * updates from other clients are retried transparently up to {@see self::CONFLICT_TOKEN_MAX_RETRIES} + * times. If all retries are exhausted, a {@see ServiceClientException} with + * {@see StatusCode::FAILED_PRECONDITION} is raised. + * + * - **Direct form** — a pre-built {@see Schedule} is sent as-is. The optional `$conflictToken` + * argument is the opaque value from {@see ScheduleDescription::$conflictToken}; if supplied, + * the server rejects the update when the schedule has been modified since the describe that + * produced the token. No retry is performed; the caller handles conflicts. + * + * **IMPORTANT:** The closure may be invoked multiple times (once per retry), so it MUST be + * idempotent and free of side effects outside of returning the {@see ScheduleUpdate}. + * Do not increment counters, log business events, or mutate external state from inside it. * - * @param Schedule|\Closure(ScheduleUpdateInput): ScheduleUpdate $schedule The new Schedule to update to or - * a closure that will be passed the current ScheduleDescription and should return a ScheduleUpdate. - * @param string|null $conflictToken Can be the value of {@see ScheduleDescription::$conflictToken}, - * which will cause this request to fail if the schedule has been modified - * between the {@see self::describe()} and this Update. - * If missing, the schedule will be updated unconditionally. + * Examples: + * + * Add a search attribute using the closure form (auto-retries on conflict): + * ``` + * $handle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + * return ScheduleUpdate::new($input->description->schedule) + * ->withSearchAttributes( + * $input->description->searchAttributes + * ->withValue('foo', 'bar') + * ->withValue('bar', 42), + * ); + * }); + * ``` + * + * Pause a described schedule with an explicit conflict token (no retry): + * ``` + * $description = $handle->describe(); + * $schedule = $description->schedule; + * $handle->update( + * $schedule->withState($schedule->state->withPaused(true)), + * $description->conflictToken, + * ); + * ``` + * + * @param Schedule|\Closure(ScheduleUpdateInput): ScheduleUpdate $schedule The new Schedule to + * update to, or an idempotent closure that will be called with the current + * {@see ScheduleUpdateInput} and must return a {@see ScheduleUpdate}. + * @param string|null $conflictToken Only valid with the direct form. Can be the value of + * {@see ScheduleDescription::$conflictToken}, causing the request to fail if the + * schedule has been modified between the {@see self::describe()} and this update. + * If missing, the schedule will be updated unconditionally. MUST be `null` when + * `$schedule` is a closure — in the closure form the token is managed internally by + * the retry loop; passing a non-null value throws {@see InvalidArgumentException}. + * + * @throws InvalidArgumentException When a non-null `$conflictToken` is passed together with a + * closure `$schedule`. + * @throws ServiceClientException On a non-retryable server error, or after retries are + * exhausted in the closure form. */ public function update( Schedule|\Closure $schedule, ?string $conflictToken = null, ): void { if ($schedule instanceof \Closure) { + if ($conflictToken !== null) { + throw new InvalidArgumentException( + 'Passing a conflict token together with a closure updater is not supported: ' + . 'in closure form the token is fetched from describe() on every retry. ' + . 'Use the direct form `update(Schedule, ?string $conflictToken)` if you need ' + . 'to pin the update to a specific token.', + ); + } + $this->updateWithClosure($schedule); return; } @@ -223,7 +288,7 @@ private function updateWithClosure(\Closure $updater): void return; } catch (ServiceClientException $e) { if ($e->getCode() !== StatusCode::FAILED_PRECONDITION - || !\str_contains($e->getMessage(), 'conflict token') + || !\str_contains($e->getMessage(), self::CONFLICT_TOKEN_ERROR_MARKER) ) { throw $e; } diff --git a/tests/Unit/Schedule/ScheduleHandleTestCase.php b/tests/Unit/Schedule/ScheduleHandleTestCase.php index a507ee83c..579bad11b 100644 --- a/tests/Unit/Schedule/ScheduleHandleTestCase.php +++ b/tests/Unit/Schedule/ScheduleHandleTestCase.php @@ -59,7 +59,7 @@ public function testDelete(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('DeleteSchedule') - ->with($this->callback(fn (DeleteScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(DeleteScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new DeleteScheduleResponse()); $scheduleHandle = $this->createScheduleHandle( @@ -83,7 +83,7 @@ public function testTrigger(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('PatchSchedule') - ->with($this->callback(fn (PatchScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(PatchScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new PatchScheduleResponse()); $scheduleHandle = $this->createScheduleHandle( @@ -109,7 +109,7 @@ public function testPause(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('PatchSchedule') - ->with($this->callback(fn (PatchScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(PatchScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new PatchScheduleResponse()); $scheduleHandle = $this->createScheduleHandle( @@ -142,7 +142,7 @@ public function testUnpause(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('PatchSchedule') - ->with($this->callback(fn (PatchScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(PatchScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new PatchScheduleResponse()); $scheduleHandle = $this->createScheduleHandle( @@ -178,7 +178,7 @@ public function testBackfill(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('PatchSchedule') - ->with($this->callback(fn (PatchScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(PatchScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new PatchScheduleResponse()); $scheduleHandle = $this->createScheduleHandle( client: $clientMock, @@ -216,7 +216,7 @@ public function testUpdate(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('UpdateSchedule') - ->with($this->callback(fn (UpdateScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(UpdateScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new UpdateScheduleResponse()); $scheduleHandle = $this->createScheduleHandle(client: $clientMock); @@ -243,22 +243,22 @@ public function testUpdateUsingClosureDefaults(): void ->willReturn((new DescribeScheduleResponse())->setConflictToken('describe-conflict-token')); $clientMock->expects($this->once()) ->method('UpdateSchedule') - ->with($this->callback(fn (UpdateScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(UpdateScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn(new UpdateScheduleResponse()); $scheduleHandle = $this->createScheduleHandle(client: $clientMock); - $scheduleHandle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + $scheduleHandle->update(static function (ScheduleUpdateInput $input): ScheduleUpdate { $schedule = Schedule::new(); $sa = EncodedCollection::fromValues(['foo' => 'bar']); return ScheduleUpdate::new($schedule) ->withSearchAttributes($sa); - }, 'argument-conflict-token'); + }); $this->assertTrue(isset($testContext->request)); $this->assertSame('default', $testContext->request->getNamespace()); $this->assertSame('test-id', $testContext->request->getScheduleId()); $this->assertSame('test-identity', $testContext->request->getIdentity()); - // Conflict token should come from describe(), not from the argument + // Conflict token must come from describe() — it is the only source in closure mode $this->assertSame('describe-conflict-token', $testContext->request->getConflictToken()); $this->assertNotNull($testContext->request->getRequestId()); $this->assertNotNull($testContext->request->getSchedule()); @@ -270,6 +270,23 @@ public function testUpdateUsingClosureDefaults(): void $this->assertSame(['foo' => 'bar'], $sa); } + public function testUpdateWithClosureAndExplicitConflictTokenThrows(): void + { + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock->expects($this->never())->method('DescribeSchedule'); + $clientMock->expects($this->never())->method('UpdateSchedule'); + + $scheduleHandle = $this->createScheduleHandle(client: $clientMock); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Passing a conflict token together with a closure updater is not supported'); + + $scheduleHandle->update( + static fn(ScheduleUpdateInput $input): ScheduleUpdate => ScheduleUpdate::new(Schedule::new()), + 'explicit-token', + ); + } + public function testUpdateWithClosureRetriesOnConflictTokenMismatch(): void { $testContext = new class { @@ -281,14 +298,14 @@ public function testUpdateWithClosureRetriesOnConflictTokenMismatch(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->exactly(3)) ->method('DescribeSchedule') - ->willReturnCallback(function () use ($testContext) { + ->willReturnCallback(static function () use ($testContext) { $testContext->describeCount++; return (new DescribeScheduleResponse()) ->setConflictToken('token-' . $testContext->describeCount); }); $clientMock->expects($this->exactly(3)) ->method('UpdateSchedule') - ->willReturnCallback(function (UpdateScheduleRequest $request) use ($testContext) { + ->willReturnCallback(static function (UpdateScheduleRequest $request) use ($testContext) { $testContext->updateCount++; $testContext->request = $request; // Fail first 2 attempts with conflict token mismatch @@ -305,7 +322,7 @@ public function testUpdateWithClosureRetriesOnConflictTokenMismatch(): void $scheduleHandle = $this->createScheduleHandle(client: $clientMock); $closureCallCount = 0; - $scheduleHandle->update(function (ScheduleUpdateInput $input) use (&$closureCallCount): ScheduleUpdate { + $scheduleHandle->update(static function (ScheduleUpdateInput $input) use (&$closureCallCount): ScheduleUpdate { $closureCallCount++; return ScheduleUpdate::new(Schedule::new()); }); @@ -336,7 +353,7 @@ public function testUpdateWithClosureThrowsNonConflictError(): void $this->expectException(ServiceClientException::class); $this->expectExceptionCode(StatusCode::NOT_FOUND); - $scheduleHandle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + $scheduleHandle->update(static function (ScheduleUpdateInput $input): ScheduleUpdate { return ScheduleUpdate::new(Schedule::new()); }); } @@ -360,7 +377,7 @@ public function testUpdateWithClosureExhaustsRetries(): void $this->expectException(ServiceClientException::class); $this->expectExceptionCode(StatusCode::FAILED_PRECONDITION); - $scheduleHandle->update(function (ScheduleUpdateInput $input): ScheduleUpdate { + $scheduleHandle->update(static function (ScheduleUpdateInput $input): ScheduleUpdate { return ScheduleUpdate::new(Schedule::new()); }); } @@ -395,7 +412,7 @@ public function testDescribe(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('DescribeSchedule') - ->with($this->callback(fn (DescribeScheduleRequest $request) => $testContext->request = $request or true)) + ->with($this->callback(static fn(DescribeScheduleRequest $request) => $testContext->request = $request or true)) ->willReturn((new DescribeScheduleResponse())->setConflictToken('test-conflict-token')); $scheduleHandle = $this->createScheduleHandle( client: $clientMock, @@ -425,10 +442,11 @@ public function testListScheduleMatchingTimes(): void $clientMock = $this->createMock(ServiceClientInterface::class); $clientMock->expects($this->once()) ->method('ListScheduleMatchingTimes') - ->with($this->callback(fn (ListSchedulesRequest $request) => $testContext->request = $request or true)) - ->willReturn((new ListScheduleMatchingTimesResponse()) - ->setStartTime(array_map(static fn(\DateTimeInterface $dateTime) => (new \Google\Protobuf\Timestamp()) - ->setSeconds($dateTime->getTimestamp()), $resultList)) + ->with($this->callback(static fn(ListSchedulesRequest $request) => $testContext->request = $request or true)) + ->willReturn( + (new ListScheduleMatchingTimesResponse()) + ->setStartTime(\array_map(static fn(\DateTimeInterface $dateTime) => (new \Google\Protobuf\Timestamp()) + ->setSeconds($dateTime->getTimestamp()), $resultList)), ); $scheduleHandle = $this->createScheduleHandle( client: $clientMock,