Skip to content

Commit 3176fed

Browse files
committed
feat: allow UseExisting conflicts policy
1 parent 487887e commit 3176fed

6 files changed

Lines changed: 319 additions & 0 deletions

File tree

src/Client/WorkflowOptions.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
use Temporal\Common\Versioning\VersioningOverride;
2727
use Temporal\Common\WorkflowIdConflictPolicy;
2828
use Temporal\DataConverter\DataConverterInterface;
29+
use Temporal\Internal\Client\OnConflictOptions;
2930
use Temporal\Internal\Marshaller\Meta\Marshal;
3031
use Temporal\Internal\Marshaller\Type\ArrayType;
3132
use Temporal\Internal\Marshaller\Type\CronType;
@@ -193,6 +194,9 @@ final class WorkflowOptions extends Options
193194
#[Marshal(name: 'VersioningOverride')]
194195
public ?VersioningOverride $versioningOverride = null;
195196

197+
/** @internal */
198+
public ?OnConflictOptions $onConflictOptions = null;
199+
196200
/**
197201
* @throws \Exception
198202
*/
@@ -612,4 +616,16 @@ public function withPriority(Priority $priority): self
612616
$self->priority = $priority;
613617
return $self;
614618
}
619+
620+
/**
621+
* @internal
622+
* @return $this
623+
*/
624+
#[Pure]
625+
public function withOnConflictOptionsInternal(?OnConflictOptions $options): self
626+
{
627+
$self = clone $this;
628+
$self->onConflictOptions = $options;
629+
return $self;
630+
}
615631
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Internal\Client;
13+
14+
/**
15+
* Workflow-id conflict policy options: what to attach to the existing run when
16+
* a start request conflicts. Serialized to {@see \Temporal\Api\Workflow\V1\OnConflictOptions}.
17+
*
18+
* @internal
19+
* @psalm-immutable
20+
*/
21+
final class OnConflictOptions
22+
{
23+
public function __construct(
24+
public readonly bool $attachRequestId = true,
25+
public readonly bool $attachCompletionCallbacks = true,
26+
public readonly bool $attachLinks = true,
27+
) {}
28+
}

src/Internal/Client/WorkflowStarter.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Temporal\Internal\Client;
1313

1414
use Temporal\Api\Common\V1\WorkflowType;
15+
use Temporal\Api\Workflow\V1\OnConflictOptions as OnConflictOptionsProto;
1516
use Temporal\Api\Deployment\V1\WorkerDeploymentVersion;
1617
use Temporal\Api\Errordetails\V1\MultiOperationExecutionFailure;
1718
use Temporal\Api\Errordetails\V1\WorkflowExecutionAlreadyStartedFailure;
@@ -32,6 +33,7 @@
3233
use Temporal\Client\Update\UpdateHandle;
3334
use Temporal\Client\Update\UpdateOptions;
3435
use Temporal\Client\WorkflowOptions;
36+
use Temporal\Common\WorkflowIdConflictPolicy;
3537
use Temporal\Common\Uuid;
3638
use Temporal\Common\Versioning\VersioningBehavior;
3739
use Temporal\DataConverter\DataConverterInterface;
@@ -47,6 +49,7 @@
4749
use Temporal\Interceptor\WorkflowClient\UpdateWithStartInput;
4850
use Temporal\Interceptor\WorkflowClient\UpdateWithStartOutput;
4951
use Temporal\Interceptor\WorkflowClientCallsInterceptor;
52+
use Temporal\Internal\Client\OnConflictOptions;
5053
use Temporal\Internal\Interceptor\Pipeline;
5154
use Temporal\Internal\Support\DateInterval;
5255
use Temporal\Workflow\WorkflowExecution;
@@ -276,6 +279,15 @@ function (UpdateWithStartInput $input): UpdateWithStartOutput {
276279
);
277280
}
278281

282+
private static function onConflictOptionsToProto(OnConflictOptions $options): OnConflictOptionsProto
283+
{
284+
$proto = new OnConflictOptionsProto();
285+
$proto->setAttachRequestId($options->attachRequestId);
286+
$proto->setAttachCompletionCallbacks($options->attachCompletionCallbacks);
287+
$proto->setAttachLinks($options->attachLinks);
288+
return $proto;
289+
}
290+
279291
/**
280292
* @param StartWorkflowExecutionRequest|SignalWithStartWorkflowExecutionRequest $request
281293
* use {@see configureExecutionRequest()} to prepare request
@@ -296,6 +308,10 @@ private function executeRequest(
296308
\assert($f instanceof WorkflowExecutionAlreadyStartedFailure);
297309
$execution = new WorkflowExecution($request->getWorkflowId(), $f->getRunId());
298310

311+
if ($request->getWorkflowIdConflictPolicy() === WorkflowIdConflictPolicy::UseExisting->value) {
312+
return $execution;
313+
}
314+
299315
throw new WorkflowExecutionAlreadyStartedException(
300316
$execution,
301317
$request->getWorkflowType()->getName(),
@@ -395,6 +411,10 @@ private function configureExecutionRequest(
395411

396412
if ($req instanceof StartWorkflowExecutionRequest) {
397413
$req->setRequestEagerExecution($options->eagerStart);
414+
415+
if ($options->onConflictOptions !== null) {
416+
$req->setOnConflictOptions(self::onConflictOptionsToProto($options->onConflictOptions));
417+
}
398418
}
399419

400420
if (!$input->arguments->isEmpty()) {

tests/Acceptance/Extra/Update/UpdateWithStartTest.php

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Temporal\Client\Update\UpdateOptions;
1111
use Temporal\Client\WorkflowClientInterface;
1212
use Temporal\Client\WorkflowOptions;
13+
use Temporal\Common\WorkflowIdConflictPolicy;
1314
use Temporal\Exception\Client\WorkflowExecutionAlreadyStartedException;
1415
use Temporal\Exception\Client\WorkflowFailedException;
1516
use Temporal\Exception\Client\WorkflowServiceException;
@@ -138,6 +139,96 @@ public function failOnReuseExistingWorkflowId(
138139
$stub1->signal('exit');
139140
}
140141
}
142+
143+
#[Test]
144+
public function useExistingReturnsRunningExecution(
145+
WorkflowClientInterface $client,
146+
Feature $feature,
147+
): void {
148+
$id = Uuid::uuid7()->__toString();
149+
150+
$first = $client->newUntypedWorkflowStub(
151+
'Extra_Update_UseExisting',
152+
WorkflowOptions::new()->withTaskQueue($feature->taskQueue)->withWorkflowId($id),
153+
);
154+
$client->start($first);
155+
$firstRunId = $first->getExecution()->getRunID();
156+
157+
$second = $client->newUntypedWorkflowStub(
158+
'Extra_Update_UseExisting',
159+
WorkflowOptions::new()
160+
->withTaskQueue($feature->taskQueue)
161+
->withWorkflowId($id)
162+
->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting),
163+
);
164+
165+
try {
166+
$client->start($second);
167+
168+
$this->assertSame($id, $second->getExecution()->getID());
169+
$this->assertSame(
170+
$firstRunId,
171+
$second->getExecution()->getRunID(),
172+
'UseExisting must resolve to the already-running execution instead of throwing',
173+
);
174+
} finally {
175+
$first->signal('exit');
176+
}
177+
}
178+
179+
#[Test]
180+
public function failPolicyThrowsOnRunningWorkflowId(
181+
WorkflowClientInterface $client,
182+
Feature $feature,
183+
): void {
184+
$id = Uuid::uuid7()->__toString();
185+
186+
$first = $client->newUntypedWorkflowStub(
187+
'Extra_Update_UseExisting',
188+
WorkflowOptions::new()->withTaskQueue($feature->taskQueue)->withWorkflowId($id),
189+
);
190+
$client->start($first);
191+
192+
$second = $client->newUntypedWorkflowStub(
193+
'Extra_Update_UseExisting',
194+
WorkflowOptions::new()
195+
->withTaskQueue($feature->taskQueue)
196+
->withWorkflowId($id)
197+
->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::Fail),
198+
);
199+
200+
try {
201+
$this->expectException(WorkflowExecutionAlreadyStartedException::class);
202+
$client->start($second);
203+
} finally {
204+
$first->signal('exit');
205+
}
206+
}
207+
208+
#[Test]
209+
public function useExistingStartsFreshWhenNoneRunning(
210+
WorkflowClientInterface $client,
211+
Feature $feature,
212+
): void {
213+
$id = Uuid::uuid7()->__toString();
214+
215+
$stub = $client->newUntypedWorkflowStub(
216+
'Extra_Update_UseExisting',
217+
WorkflowOptions::new()
218+
->withTaskQueue($feature->taskQueue)
219+
->withWorkflowId($id)
220+
->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting),
221+
);
222+
223+
try {
224+
$client->start($stub);
225+
226+
$this->assertSame($id, $stub->getExecution()->getID());
227+
$this->assertNotSame('', $stub->getExecution()->getRunID());
228+
} finally {
229+
$stub->signal('exit');
230+
}
231+
}
141232
}
142233

143234
#[WorkflowInterface]
@@ -195,3 +286,22 @@ public function __construct(
195286
public int $length = 0,
196287
) {}
197288
}
289+
290+
#[WorkflowInterface]
291+
class UseExistingWorkflow
292+
{
293+
private bool $exit = false;
294+
295+
#[WorkflowMethod(name: 'Extra_Update_UseExisting')]
296+
public function handle(): \Generator
297+
{
298+
yield Workflow::await(fn(): bool => $this->exit);
299+
return 'done';
300+
}
301+
302+
#[Workflow\SignalMethod]
303+
public function exit(): void
304+
{
305+
$this->exit = true;
306+
}
307+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/**
4+
* This file is part of Temporal package.
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*/
9+
10+
declare(strict_types=1);
11+
12+
namespace Temporal\Tests\Unit\Internal\Client;
13+
14+
use PHPUnit\Framework\Attributes\CoversClass;
15+
use PHPUnit\Framework\TestCase;
16+
use Temporal\Internal\Client\OnConflictOptions;
17+
18+
#[CoversClass(OnConflictOptions::class)]
19+
final class OnConflictOptionsTestCase extends TestCase
20+
{
21+
public function testDefaultsAreAllTrue(): void
22+
{
23+
$options = new OnConflictOptions();
24+
25+
self::assertTrue($options->attachRequestId);
26+
self::assertTrue($options->attachCompletionCallbacks);
27+
self::assertTrue($options->attachLinks);
28+
}
29+
30+
public function testAcceptsExplicitFlags(): void
31+
{
32+
$options = new OnConflictOptions(
33+
attachRequestId: false,
34+
attachCompletionCallbacks: false,
35+
attachLinks: false,
36+
);
37+
38+
self::assertFalse($options->attachRequestId);
39+
self::assertFalse($options->attachCompletionCallbacks);
40+
self::assertFalse($options->attachLinks);
41+
}
42+
}

0 commit comments

Comments
 (0)