Skip to content

Commit ff085f4

Browse files
Accept open_condition_wait command from external workers
Adds external-worker support for durable wait_condition. The engine-side ConditionWait machinery (HistoryEventType::ConditionWaitOpened, condition timeout timers) already existed for in-process PHP workflows; this exposes it to the worker protocol. WorkflowCommandNormalizer accepts open_condition_wait with optional condition_key, condition_definition_fingerprint, and timeout_seconds. DefaultWorkflowTaskBridge applies the command by recording ConditionWaitOpened and, when timeout_seconds > 0, scheduling a timer with timer_kind=condition_timeout (matching the in-process executor's shape so ConditionWaits/RunWaitView/HistoryTimeline keep working unchanged). Closes one leg of #397. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a71304a commit ff085f4

4 files changed

Lines changed: 337 additions & 3 deletions

File tree

src/V2/Support/DefaultWorkflowTaskBridge.php

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ final class DefaultWorkflowTaskBridge implements WorkflowTaskBridge
4848
'record_side_effect',
4949
'record_version_marker',
5050
'upsert_search_attributes',
51+
'open_condition_wait',
5152
];
5253

5354
public function __construct(
@@ -919,6 +920,7 @@ private function applyNonTerminalCommand(
919920
'record_side_effect' => $this->applyRecordSideEffect($run, $task, $command, $sequence),
920921
'record_version_marker' => $this->applyRecordVersionMarker($run, $task, $command, $sequence),
921922
'upsert_search_attributes' => $this->applyUpsertSearchAttributes($run, $task, $command, $sequence),
923+
'open_condition_wait' => $this->applyOpenConditionWait($run, $task, $command, $sequence, $createdTaskIds),
922924
default => $sequence,
923925
};
924926
}
@@ -1312,6 +1314,88 @@ private function applyStartTimer(
13121314
return $sequence + 1;
13131315
}
13141316

1317+
/**
1318+
* @param array{
1319+
* type: string,
1320+
* condition_key?: string|null,
1321+
* condition_definition_fingerprint?: string|null,
1322+
* timeout_seconds?: int|null
1323+
* } $command
1324+
* @param list<string> $createdTaskIds
1325+
*/
1326+
private function applyOpenConditionWait(
1327+
WorkflowRun $run,
1328+
WorkflowTask $task,
1329+
array $command,
1330+
int $sequence,
1331+
array &$createdTaskIds,
1332+
): int {
1333+
$conditionKey = self::nonEmptyString($command['condition_key'] ?? null);
1334+
$conditionDefinitionFingerprint = self::nonEmptyString(
1335+
$command['condition_definition_fingerprint'] ?? null,
1336+
);
1337+
$timeoutSeconds = is_int($command['timeout_seconds'] ?? null) && $command['timeout_seconds'] >= 0
1338+
? (int) $command['timeout_seconds']
1339+
: null;
1340+
1341+
$waitId = (string) Str::ulid();
1342+
1343+
WorkflowHistoryEvent::record($run, HistoryEventType::ConditionWaitOpened, array_filter([
1344+
'condition_wait_id' => $waitId,
1345+
'condition_key' => $conditionKey,
1346+
'condition_definition_fingerprint' => $conditionDefinitionFingerprint,
1347+
'sequence' => $sequence,
1348+
'timeout_seconds' => $timeoutSeconds,
1349+
], static fn (mixed $value): bool => $value !== null), $task);
1350+
1351+
if ($timeoutSeconds !== null && $timeoutSeconds > 0) {
1352+
$fireAt = now()
1353+
->addSeconds($timeoutSeconds);
1354+
1355+
/** @var WorkflowTimer $timer */
1356+
$timer = WorkflowTimer::query()->create([
1357+
'workflow_run_id' => $run->id,
1358+
'sequence' => $sequence,
1359+
'status' => TimerStatus::Pending->value,
1360+
'delay_seconds' => $timeoutSeconds,
1361+
'fire_at' => $fireAt,
1362+
]);
1363+
1364+
WorkflowHistoryEvent::record($run, HistoryEventType::TimerScheduled, array_filter([
1365+
'timer_id' => $timer->id,
1366+
'sequence' => $sequence,
1367+
'delay_seconds' => $timer->delay_seconds,
1368+
'fire_at' => $timer->fire_at?->toJSON(),
1369+
'timer_kind' => 'condition_timeout',
1370+
'condition_wait_id' => $waitId,
1371+
'condition_key' => $conditionKey,
1372+
'condition_definition_fingerprint' => $conditionDefinitionFingerprint,
1373+
], static fn (mixed $value): bool => $value !== null), $task);
1374+
1375+
/** @var WorkflowTask $timerTask */
1376+
$timerTask = WorkflowTask::query()->create([
1377+
'workflow_run_id' => $run->id,
1378+
'namespace' => $run->namespace,
1379+
'task_type' => TaskType::Timer->value,
1380+
'status' => TaskStatus::Ready->value,
1381+
'available_at' => $fireAt,
1382+
'payload' => array_filter([
1383+
'timer_id' => $timer->id,
1384+
'condition_wait_id' => $waitId,
1385+
'condition_key' => $conditionKey,
1386+
'condition_definition_fingerprint' => $conditionDefinitionFingerprint,
1387+
], static fn (mixed $value): bool => $value !== null),
1388+
'connection' => $run->connection,
1389+
'queue' => $run->queue,
1390+
'compatibility' => $run->compatibility,
1391+
]);
1392+
1393+
$createdTaskIds[] = $timerTask->id;
1394+
}
1395+
1396+
return $sequence + 1;
1397+
}
1398+
13151399
/**
13161400
* @param array{
13171401
* type: string,
@@ -2176,10 +2260,34 @@ private static function normalizeCommand(array $command): ?array
21762260
'record_side_effect' => self::normalizeRecordSideEffectCommand($command),
21772261
'record_version_marker' => self::normalizeRecordVersionMarkerCommand($command),
21782262
'upsert_search_attributes' => self::normalizeUpsertSearchAttributesCommand($command),
2263+
'open_condition_wait' => self::normalizeOpenConditionWaitCommand($command),
21792264
default => null,
21802265
};
21812266
}
21822267

2268+
/**
2269+
* @param array<string, mixed> $command
2270+
* @return array{
2271+
* type: string,
2272+
* condition_key?: string,
2273+
* condition_definition_fingerprint?: string,
2274+
* timeout_seconds?: int
2275+
* }
2276+
*/
2277+
private static function normalizeOpenConditionWaitCommand(array $command): array
2278+
{
2279+
$timeoutSeconds = $command['timeout_seconds'] ?? null;
2280+
2281+
return array_filter([
2282+
'type' => 'open_condition_wait',
2283+
'condition_key' => self::normalizeOptionalString($command['condition_key'] ?? null),
2284+
'condition_definition_fingerprint' => self::normalizeOptionalString(
2285+
$command['condition_definition_fingerprint'] ?? null,
2286+
),
2287+
'timeout_seconds' => is_int($timeoutSeconds) && $timeoutSeconds >= 0 ? $timeoutSeconds : null,
2288+
], static fn (mixed $value): bool => $value !== null);
2289+
}
2290+
21832291
/**
21842292
* @param array<string, mixed> $command
21852293
* @return array{type: string, message: string, exception_class?: string, exception_type?: string}|null

src/V2/Support/WorkflowCommandNormalizer.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,38 @@ public static function normalize(array $commands): array
313313
continue;
314314
}
315315

316+
if ($type === 'open_condition_wait') {
317+
$conditionKey = self::optionalCommandString($command, 'condition_key', $index, $errors);
318+
$conditionDefinitionFingerprint = self::optionalCommandString(
319+
$command,
320+
'condition_definition_fingerprint',
321+
$index,
322+
$errors,
323+
);
324+
$timeoutSeconds = null;
325+
326+
if (array_key_exists('timeout_seconds', $command) && $command['timeout_seconds'] !== null) {
327+
if (! is_int($command['timeout_seconds']) || (int) $command['timeout_seconds'] < 0) {
328+
$errors["commands.{$index}.timeout_seconds"] = [
329+
'Open condition wait timeout_seconds must be a non-negative integer when provided.',
330+
];
331+
332+
continue;
333+
}
334+
335+
$timeoutSeconds = (int) $command['timeout_seconds'];
336+
}
337+
338+
$normalized[] = array_filter([
339+
'type' => $type,
340+
'condition_key' => $conditionKey,
341+
'condition_definition_fingerprint' => $conditionDefinitionFingerprint,
342+
'timeout_seconds' => $timeoutSeconds,
343+
], static fn (mixed $value): bool => $value !== null);
344+
345+
continue;
346+
}
347+
316348
$errors["commands.{$index}.type"] = [
317349
sprintf('Workflow task command type [%s] is not supported by the server yet.', $type),
318350
];

tests/Feature/V2/V2WorkflowTaskBridgeTest.php

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,127 @@ public function testTimerTaskCreatesResumeTaskWithTimerContext(): void
16181618
$this->assertSame(HistoryEventType::TimerFired->value, $resumeTask->payload['workflow_event_type'] ?? null);
16191619
}
16201620

1621+
public function testCompleteOpenConditionWaitWithoutTimeoutRecordsEventAndMarksWaiting(): void
1622+
{
1623+
$run = $this->createWaitingRun();
1624+
1625+
/** @var WorkflowTask $task */
1626+
$task = $this->createLeasedTask($run);
1627+
1628+
$result = $this->bridge->complete($task->id, [
1629+
[
1630+
'type' => 'open_condition_wait',
1631+
'condition_key' => 'order-ready',
1632+
],
1633+
]);
1634+
1635+
$this->assertTrue($result['completed']);
1636+
$this->assertSame('waiting', $result['run_status']);
1637+
$this->assertSame([], $result['created_task_ids']);
1638+
1639+
$run->refresh();
1640+
$this->assertSame(RunStatus::Waiting, $run->status);
1641+
1642+
$event = WorkflowHistoryEvent::query()
1643+
->where('workflow_run_id', $run->id)
1644+
->where('event_type', HistoryEventType::ConditionWaitOpened->value)
1645+
->first();
1646+
1647+
$this->assertNotNull($event);
1648+
$this->assertSame('order-ready', $event->payload['condition_key'] ?? null);
1649+
$this->assertIsString($event->payload['condition_wait_id'] ?? null);
1650+
$this->assertSame(1, $event->payload['sequence'] ?? null);
1651+
$this->assertArrayNotHasKey('timeout_seconds', $event->payload);
1652+
1653+
$this->assertSame(0, WorkflowTimer::query()->where('workflow_run_id', $run->id)->count());
1654+
}
1655+
1656+
public function testCompleteOpenConditionWaitWithTimeoutSchedulesConditionTimeoutTimer(): void
1657+
{
1658+
$run = $this->createWaitingRun();
1659+
1660+
/** @var WorkflowTask $task */
1661+
$task = $this->createLeasedTask($run);
1662+
1663+
$result = $this->bridge->complete($task->id, [
1664+
[
1665+
'type' => 'open_condition_wait',
1666+
'condition_key' => 'payment-cleared',
1667+
'condition_definition_fingerprint' => 'fp-1',
1668+
'timeout_seconds' => 45,
1669+
],
1670+
]);
1671+
1672+
$this->assertTrue($result['completed']);
1673+
$this->assertSame('waiting', $result['run_status']);
1674+
$this->assertCount(1, $result['created_task_ids']);
1675+
1676+
$opened = WorkflowHistoryEvent::query()
1677+
->where('workflow_run_id', $run->id)
1678+
->where('event_type', HistoryEventType::ConditionWaitOpened->value)
1679+
->firstOrFail();
1680+
1681+
$this->assertSame('payment-cleared', $opened->payload['condition_key'] ?? null);
1682+
$this->assertSame('fp-1', $opened->payload['condition_definition_fingerprint'] ?? null);
1683+
$this->assertSame(45, $opened->payload['timeout_seconds'] ?? null);
1684+
1685+
$waitId = $opened->payload['condition_wait_id'] ?? null;
1686+
$this->assertIsString($waitId);
1687+
1688+
$timer = WorkflowTimer::query()
1689+
->where('workflow_run_id', $run->id)
1690+
->firstOrFail();
1691+
1692+
$this->assertSame(TimerStatus::Pending, $timer->status);
1693+
$this->assertSame(45, $timer->delay_seconds);
1694+
$this->assertSame(1, $timer->sequence);
1695+
1696+
$scheduled = WorkflowHistoryEvent::query()
1697+
->where('workflow_run_id', $run->id)
1698+
->where('event_type', HistoryEventType::TimerScheduled->value)
1699+
->firstOrFail();
1700+
1701+
$this->assertSame('condition_timeout', $scheduled->payload['timer_kind'] ?? null);
1702+
$this->assertSame($waitId, $scheduled->payload['condition_wait_id'] ?? null);
1703+
$this->assertSame('payment-cleared', $scheduled->payload['condition_key'] ?? null);
1704+
$this->assertSame('fp-1', $scheduled->payload['condition_definition_fingerprint'] ?? null);
1705+
1706+
$timerTask = WorkflowTask::query()
1707+
->where('workflow_run_id', $run->id)
1708+
->where('task_type', TaskType::Timer->value)
1709+
->firstOrFail();
1710+
1711+
$this->assertSame($timer->id, $timerTask->payload['timer_id'] ?? null);
1712+
$this->assertSame($waitId, $timerTask->payload['condition_wait_id'] ?? null);
1713+
$this->assertSame('payment-cleared', $timerTask->payload['condition_key'] ?? null);
1714+
}
1715+
1716+
public function testCompleteOpenConditionWaitWithZeroTimeoutDoesNotScheduleTimer(): void
1717+
{
1718+
$run = $this->createWaitingRun();
1719+
1720+
/** @var WorkflowTask $task */
1721+
$task = $this->createLeasedTask($run);
1722+
1723+
$result = $this->bridge->complete($task->id, [
1724+
[
1725+
'type' => 'open_condition_wait',
1726+
'timeout_seconds' => 0,
1727+
],
1728+
]);
1729+
1730+
$this->assertTrue($result['completed']);
1731+
$this->assertSame([], $result['created_task_ids']);
1732+
1733+
$opened = WorkflowHistoryEvent::query()
1734+
->where('workflow_run_id', $run->id)
1735+
->where('event_type', HistoryEventType::ConditionWaitOpened->value)
1736+
->firstOrFail();
1737+
1738+
$this->assertSame(0, $opened->payload['timeout_seconds'] ?? null);
1739+
$this->assertSame(0, WorkflowTimer::query()->where('workflow_run_id', $run->id)->count());
1740+
}
1741+
16211742
public function testCompleteStartsChildWorkflow(): void
16221743
{
16231744
$run = $this->createWaitingRun();
@@ -1795,7 +1916,7 @@ public function testCompleteStartsChildWorkflowSnapshotsRetryPolicyAndTimeouts()
17951916
->where('sequence', 1)
17961917
->firstOrFail();
17971918

1798-
$this->assertSame([
1919+
$this->assertSameJsonObject([
17991920
'snapshot_version' => 1,
18001921
'max_attempts' => 3,
18011922
'backoff_seconds' => [2, 8],
@@ -1999,7 +2120,7 @@ public function testChildWorkflowCompletionCreatesParentResumeTaskWithChildConte
19992120
->where('status', TaskStatus::Ready->value)
20002121
->firstOrFail();
20012122

2002-
$this->assertSame([
2123+
$this->assertSameJsonObject([
20032124
'workflow_wait_kind' => 'child',
20042125
'open_wait_id' => sprintf('child:%s', $link->id),
20052126
'resume_source_kind' => 'child_workflow_run',
@@ -2241,7 +2362,7 @@ public function testScheduleActivitySnapshotsExternalRetryPolicyAndTimeouts(): v
22412362
->where('workflow_run_id', $run->id)
22422363
->firstOrFail();
22432364

2244-
$this->assertSame([
2365+
$this->assertSameJsonObject([
22452366
'snapshot_version' => 1,
22462367
'max_attempts' => 4,
22472368
'backoff_seconds' => [1, 5, 30],

0 commit comments

Comments
 (0)