Skip to content

Commit d36f1c4

Browse files
[cross-repo from server#227] TD-S111: External-worker signals are rejected as unknown_signal in polyglot runs (#593)
1 parent 10cbca7 commit d36f1c4

5 files changed

Lines changed: 288 additions & 9 deletions

File tree

src/V2/CommandResult.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,13 @@ public function message(): ?string
152152
{
153153
return $this->command->commandMessage();
154154
}
155+
156+
/**
157+
* @param list<string> $keys
158+
* @return array<string, mixed>
159+
*/
160+
public function payloadValues(array $keys): array
161+
{
162+
return $this->command->payloadValues($keys);
163+
}
155164
}

src/V2/Models/WorkflowCommand.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,29 @@ public function commandMessage(): ?string
444444
return is_string($message) && $message !== '' ? $message : null;
445445
}
446446

447+
/**
448+
* @param list<string> $keys
449+
* @return array<string, mixed>
450+
*/
451+
public function payloadValues(array $keys): array
452+
{
453+
$payload = $this->payloadData();
454+
455+
if (! is_array($payload)) {
456+
return [];
457+
}
458+
459+
$values = [];
460+
461+
foreach ($keys as $key) {
462+
if (array_key_exists($key, $payload)) {
463+
$values[$key] = $payload[$key];
464+
}
465+
}
466+
467+
return $values;
468+
}
469+
447470
/**
448471
* @return array<string, mixed>|null
449472
*/

src/V2/Support/CommandResponse.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,30 @@ public static function payload(CommandResult $result, ?string $workflowType = nu
3232
'validation_errors' => $result->validationErrors(),
3333
];
3434

35+
$payloadFields = $result->rejected()
36+
? [
37+
'message',
38+
'command_contract_source',
39+
'command_contract_backfill_needed',
40+
'command_contract_backfill_available',
41+
'declared_signals',
42+
'signal_admission',
43+
]
44+
: [];
45+
$payloadValues = $payloadFields === [] ? [] : $result->payloadValues($payloadFields);
46+
47+
foreach ($payloadFields as $field) {
48+
$value = $payloadValues[$field] ?? null;
49+
50+
if ($field === 'message' && (! is_string($value) || $value === '')) {
51+
continue;
52+
}
53+
54+
if ($value !== null) {
55+
$payload[$field] = $value;
56+
}
57+
}
58+
3559
if ($result instanceof SignalWithStartResult) {
3660
$payload['start_command_id'] = $result->startCommandId();
3761
$payload['start_command_sequence'] = $result->startCommandSequence();

src/V2/WorkflowStub.php

Lines changed: 160 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2261,14 +2261,22 @@ private function attemptSignalInternal(
22612261

22622262
$this->loadLockedRunRelations($run, $instance);
22632263

2264-
if (! RunCommandContract::hasSignal($run, $name)) {
2264+
$signalAdmission = $this->signalAdmissionForRun($run, $name);
2265+
2266+
if (($signalAdmission['allowed'] ?? false) !== true) {
22652267
$command = $this->rejectCommand(
22662268
$instance,
22672269
$run,
22682270
CommandType::Signal,
22692271
'unknown_signal',
22702272
$this->commandTargetScope(),
2271-
$this->signalCommandPayloadAttributes($name, $arguments, [], $payloadCodec),
2273+
$this->signalCommandPayloadAttributes(
2274+
$name,
2275+
$arguments,
2276+
[],
2277+
$payloadCodec,
2278+
$signalAdmission['payload'] ?? [],
2279+
),
22722280
);
22732281
$this->recordRejectedSignal($command, $name, $arguments);
22742282

@@ -2462,14 +2470,19 @@ private function attemptSignalWithStartInternal(
24622470

24632471
$this->loadLockedRunRelations($run, $instance);
24642472

2465-
if (! RunCommandContract::hasSignal($run, $name)) {
2473+
$signalAdmission = $this->signalAdmissionForRun($run, $name);
2474+
2475+
if (($signalAdmission['allowed'] ?? false) !== true) {
24662476
$signalCommand = $this->rejectSignalCommandForContext(
24672477
$commandContext,
24682478
$instance,
24692479
$run,
24702480
$name,
24712481
$signalArguments,
24722482
'unknown_signal',
2483+
[],
2484+
$signalAdmission['message'] ?? null,
2485+
$signalAdmission['payload'] ?? [],
24732486
);
24742487

24752488
return;
@@ -3381,6 +3394,7 @@ private function rejectSignalCommandForContext(
33813394
string $reason,
33823395
array $validationErrors = [],
33833396
?string $message = null,
3397+
array $extraPayload = [],
33843398
): WorkflowCommand {
33853399
/** @var WorkflowCommand $command */
33863400
$command = WorkflowCommand::record($instance, $run, $this->commandAttributesForContext(
@@ -3398,12 +3412,15 @@ private function rejectSignalCommandForContext(
33983412
$arguments,
33993413
$validationErrors,
34003414
null,
3401-
$message === null
3402-
? []
3403-
: [
3404-
'reason' => $reason,
3405-
'message' => $message,
3406-
],
3415+
array_merge(
3416+
$extraPayload,
3417+
$message === null
3418+
? []
3419+
: [
3420+
'reason' => $reason,
3421+
'message' => $message,
3422+
],
3423+
),
34073424
),
34083425
],
34093426
));
@@ -3619,6 +3636,140 @@ private function validatedSignalArgumentsForRun(WorkflowRun $run, string $signal
36193636
: $this->normalizeNamedCommandArguments($contract, $arguments);
36203637
}
36213638

3639+
/**
3640+
* @return array{
3641+
* allowed: bool,
3642+
* payload?: array<string, mixed>,
3643+
* message?: string
3644+
* }
3645+
*/
3646+
private function signalAdmissionForRun(WorkflowRun $run, string $signalName): array
3647+
{
3648+
$contract = RunCommandContract::forRun($run);
3649+
$diagnostics = $this->signalContractDiagnostics($contract);
3650+
3651+
if (in_array($signalName, $contract['signals'], true)) {
3652+
return [
3653+
'allowed' => true,
3654+
'payload' => [
3655+
...$diagnostics,
3656+
'signal_admission' => 'declared_signal',
3657+
],
3658+
];
3659+
}
3660+
3661+
if (($contract['source'] ?? null) !== RunCommandContract::SOURCE_DURABLE_HISTORY) {
3662+
try {
3663+
$workflowClass = TypeRegistry::resolveWorkflowClass($run->workflow_class, $run->workflow_type);
3664+
} catch (LogicException) {
3665+
return [
3666+
'allowed' => true,
3667+
'payload' => [
3668+
...$diagnostics,
3669+
'signal_admission' => 'external_contract_unavailable',
3670+
],
3671+
];
3672+
}
3673+
3674+
if (WorkflowDefinition::hasSignal($workflowClass, $signalName)) {
3675+
return [
3676+
'allowed' => true,
3677+
'payload' => [
3678+
...$diagnostics,
3679+
'signal_admission' => 'loadable_workflow_class',
3680+
],
3681+
];
3682+
}
3683+
3684+
$diagnostics = [
3685+
...$diagnostics,
3686+
'signal_admission' => 'handler_not_declared_in_loadable_class',
3687+
];
3688+
$message = $this->unknownSignalMessage($run, $signalName, $diagnostics);
3689+
3690+
return [
3691+
'allowed' => false,
3692+
'payload' => [
3693+
...$diagnostics,
3694+
'reason' => 'unknown_signal',
3695+
'message' => $message,
3696+
],
3697+
'message' => $message,
3698+
];
3699+
}
3700+
3701+
$diagnostics = [
3702+
...$diagnostics,
3703+
'signal_admission' => 'handler_not_declared',
3704+
];
3705+
$message = $this->unknownSignalMessage($run, $signalName, $diagnostics);
3706+
3707+
return [
3708+
'allowed' => false,
3709+
'payload' => [
3710+
...$diagnostics,
3711+
'reason' => 'unknown_signal',
3712+
'message' => $message,
3713+
],
3714+
'message' => $message,
3715+
];
3716+
}
3717+
3718+
/**
3719+
* @param array<string, mixed> $contract
3720+
* @return array{
3721+
* command_contract_source: string|null,
3722+
* command_contract_backfill_needed: bool,
3723+
* command_contract_backfill_available: bool,
3724+
* declared_signals: list<string>
3725+
* }
3726+
*/
3727+
private function signalContractDiagnostics(array $contract): array
3728+
{
3729+
$signals = $contract['signals'] ?? [];
3730+
3731+
return [
3732+
'command_contract_source' => is_string($contract['source'] ?? null)
3733+
? $contract['source']
3734+
: null,
3735+
'command_contract_backfill_needed' => ($contract['backfill_needed'] ?? false) === true,
3736+
'command_contract_backfill_available' => ($contract['backfill_available'] ?? false) === true,
3737+
'declared_signals' => is_array($signals)
3738+
? array_values(array_filter($signals, static fn (mixed $signal): bool => is_string($signal) && $signal !== ''))
3739+
: [],
3740+
];
3741+
}
3742+
3743+
/**
3744+
* @param array<string, mixed> $diagnostics
3745+
*/
3746+
private function unknownSignalMessage(WorkflowRun $run, string $signalName, array $diagnostics): string
3747+
{
3748+
$declaredSignals = $diagnostics['declared_signals'] ?? [];
3749+
$declaredSummary = is_array($declaredSignals) && $declaredSignals !== []
3750+
? implode(', ', $declaredSignals)
3751+
: 'none';
3752+
$contractSource = is_string($diagnostics['command_contract_source'] ?? null)
3753+
? $diagnostics['command_contract_source']
3754+
: RunCommandContract::SOURCE_UNAVAILABLE;
3755+
$admission = is_string($diagnostics['signal_admission'] ?? null)
3756+
? $diagnostics['signal_admission']
3757+
: 'handler_not_declared';
3758+
3759+
$detail = $admission === 'handler_not_declared'
3760+
? 'the durable command contract does not declare that handler'
3761+
: 'the server did not receive durable signal declarations and the loadable workflow class does not declare that handler';
3762+
3763+
return sprintf(
3764+
'Workflow signal [%s] is unknown for workflow type [%s]: %s. Command contract source [%s], declared signals [%s].',
3765+
$signalName,
3766+
$run->workflow_type,
3767+
$detail,
3768+
$contractSource,
3769+
$declaredSummary,
3770+
);
3771+
}
3772+
36223773
/**
36233774
* @param array<int|string, mixed> $arguments
36243775
* @return array{arguments: list<mixed>, validation_errors: array<string, list<string>>}

tests/Feature/V2/V2WorkflowControlPlaneTest.php

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,53 @@ public function testStartWithTypeKeyOnly(): void
133133
$this->assertArrayNotHasKey('declared_signals', $startedEvent->payload);
134134
}
135135

136+
public function testSignalForTypeKeyOnlyRunIsAcceptedWhenCommandContractUnavailable(): void
137+
{
138+
$start = $this->controlPlane->start('remote-signal-workflow', 'ctrl-plane-remote-signal-1', [
139+
'arguments' => Serializer::serializeWithCodec(CodecRegistry::defaultCodec(), []),
140+
'connection' => 'redis',
141+
'queue' => 'polyglot-shared',
142+
]);
143+
144+
$this->assertTrue($start['started']);
145+
146+
/** @var WorkflowRun $run */
147+
$run = WorkflowRun::query()->findOrFail($start['workflow_run_id']);
148+
149+
WorkflowTask::query()
150+
->where('workflow_run_id', $run->id)
151+
->update(['status' => TaskStatus::Completed->value]);
152+
153+
$run->forceFill([
154+
'status' => RunStatus::Waiting->value,
155+
])->save();
156+
157+
$signal = $this->controlPlane->signal('ctrl-plane-remote-signal-1', 'polyglot-signal', [
158+
'arguments' => ['delivered'],
159+
'strict_configured_type_validation' => true,
160+
]);
161+
162+
$this->assertTrue($signal['accepted']);
163+
$this->assertSame(202, $signal['status']);
164+
$this->assertSame('signal_received', $signal['outcome']);
165+
$this->assertNull($signal['reason']);
166+
167+
$signalReceived = WorkflowHistoryEvent::query()
168+
->where('workflow_run_id', $run->id)
169+
->where('event_type', HistoryEventType::SignalReceived->value)
170+
->first();
171+
$this->assertNotNull($signalReceived);
172+
$this->assertSame('polyglot-signal', $signalReceived->payload['signal_name'] ?? null);
173+
174+
$resumeTask = WorkflowTask::query()
175+
->where('workflow_run_id', $run->id)
176+
->where('task_type', TaskType::Workflow->value)
177+
->where('status', TaskStatus::Ready->value)
178+
->first();
179+
$this->assertNotNull($resumeTask);
180+
$this->assertSame('polyglot-shared', $resumeTask->queue);
181+
}
182+
136183
public function testStartUsesHistoryProjectionRoleBindingForNewRunProjection(): void
137184
{
138185
$customRole = new class(new DefaultHistoryProjectionRole()) implements HistoryProjectionRole {
@@ -732,6 +779,31 @@ public function testSignalSupportsCommandContextAndDetailedResponsePayload(): vo
732779
$this->assertSame('default', $command->commandContext()['server']['namespace'] ?? null);
733780
}
734781

782+
public function testUnknownSignalRejectionIncludesCommandContractDiagnostics(): void
783+
{
784+
config()->set('workflows.v2.types.workflows', [
785+
'test-signal-workflow' => TestSignalWorkflow::class,
786+
]);
787+
788+
$this->controlPlane->start('test-signal-workflow', 'ctrl-plane-sig-unknown-detail', [
789+
'queue' => 'default',
790+
]);
791+
792+
$result = $this->controlPlane->signal('ctrl-plane-sig-unknown-detail', 'missing-signal', [
793+
'strict_configured_type_validation' => true,
794+
]);
795+
796+
$this->assertFalse($result['accepted']);
797+
$this->assertSame(404, $result['status']);
798+
$this->assertSame('unknown_signal', $result['reason']);
799+
$this->assertSame('rejected_unknown_signal', $result['outcome']);
800+
$this->assertSame('durable_history', $result['command_contract_source']);
801+
$this->assertFalse($result['command_contract_backfill_needed']);
802+
$this->assertSame(['name-provided'], $result['declared_signals']);
803+
$this->assertSame('handler_not_declared', $result['signal_admission']);
804+
$this->assertStringContainsString('durable command contract does not declare', $result['message']);
805+
}
806+
735807
// ── Query happy path ────────────────────────────────────────────
736808

737809
public function testQueryReturnsResult(): void

0 commit comments

Comments
 (0)