Skip to content

Commit 18864bd

Browse files
TD-092: soft-warn payload/memo/search-attribute size before server rejection
Add warnApproachingPayloadSize/MemoSize/SearchAttributeSize helpers to StructuralLimits, mirroring the existing count-based warn helpers, and wire them into every payload-producing call site so operators see a structured Laravel log warning before the hard guard trips (or before the server rejects an oversized payload). Warn sites: - WorkflowExecutor: activity input, child workflow input, continueAsNew arguments, workflow output, memo upsert, search-attribute upsert. - WorkflowStub: accepted signal arguments, accepted update arguments. - ActivityOutcomeRecorder: successful activity result. Each warning carries the run, workflow_type, payload_site, and the responsible class/name so the signal is actionable. Uses the same DEFAULT_WARNING_THRESHOLD_PERCENT as the count-based warnings. Closes #445.
1 parent e3bcf9a commit 18864bd

5 files changed

Lines changed: 286 additions & 37 deletions

File tree

src/V2/Support/ActivityOutcomeRecorder.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ public static function record(
150150
$serializedResult = self::serializeWithCodec($result, $codec, $runCodec);
151151
$resultCodec = self::payloadCodec($codec, $runCodec);
152152

153+
StructuralLimits::logWarning(
154+
StructuralLimits::warnApproachingPayloadSize($serializedResult),
155+
[
156+
'workflow_run_id' => $run->id,
157+
'workflow_type' => $run->workflow_type,
158+
'payload_site' => 'activity_output',
159+
'activity_class' => $lockedExecution->activity_class,
160+
'activity_type' => $lockedExecution->activity_type,
161+
'activity_execution_id' => $lockedExecution->id,
162+
],
163+
);
164+
153165
$lockedExecution->forceFill([
154166
'status' => ActivityStatus::Completed,
155167
'result' => $serializedResult,

src/V2/Support/StructuralLimits.php

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Workflow\V2\Support;
66

7+
use Illuminate\Support\Facades\Log;
78
use Workflow\V2\Enums\ActivityStatus;
89
use Workflow\V2\Enums\RunStatus;
910
use Workflow\V2\Enums\SignalStatus;
@@ -421,6 +422,61 @@ public static function warnApproachingCommandBatch(int $batchSize): ?array
421422
return self::checkApproaching(StructuralLimitKind::CommandBatchSize, $batchSize);
422423
}
423424

425+
/**
426+
* @return array{limit_kind: string, current: int, limit: int, threshold_percent: int, utilization_percent: int}|null
427+
*/
428+
public static function warnApproachingPayloadSize(string $serialized): ?array
429+
{
430+
return self::checkApproaching(StructuralLimitKind::PayloadSize, strlen($serialized));
431+
}
432+
433+
/**
434+
* @return array{limit_kind: string, current: int, limit: int, threshold_percent: int, utilization_percent: int}|null
435+
*/
436+
public static function warnApproachingMemoSize(string $serialized): ?array
437+
{
438+
return self::checkApproaching(StructuralLimitKind::MemoSize, strlen($serialized));
439+
}
440+
441+
/**
442+
* @return array{limit_kind: string, current: int, limit: int, threshold_percent: int, utilization_percent: int}|null
443+
*/
444+
public static function warnApproachingSearchAttributeSize(string $serialized): ?array
445+
{
446+
return self::checkApproaching(StructuralLimitKind::SearchAttributeSize, strlen($serialized));
447+
}
448+
449+
/**
450+
* Emit a structured Laravel log warning when a soft-limit check
451+
* returns a snapshot. Call sites pass the result of
452+
* checkApproaching / warnApproaching* directly; null is a no-op so
453+
* callers don't need to branch.
454+
*
455+
* @param array{limit_kind: string, current: int, limit: int, threshold_percent: int, utilization_percent: int}|null $warning
456+
* @param array<string, mixed> $context
457+
*/
458+
public static function logWarning(?array $warning, array $context = []): void
459+
{
460+
if ($warning === null) {
461+
return;
462+
}
463+
464+
Log::warning(sprintf(
465+
'[Durable Workflow] Approaching structural limit [%s]: %d / %d (%d%% utilization, warning at %d%%).',
466+
$warning['limit_kind'],
467+
$warning['current'],
468+
$warning['limit'],
469+
$warning['utilization_percent'],
470+
$warning['threshold_percent'],
471+
), array_merge([
472+
'limit_kind' => $warning['limit_kind'],
473+
'current' => $warning['current'],
474+
'limit' => $warning['limit'],
475+
'utilization_percent' => $warning['utilization_percent'],
476+
'threshold_percent' => $warning['threshold_percent'],
477+
], $context));
478+
}
479+
424480
/**
425481
* Return the configured hard limit for a given kind, or 0 if disabled.
426482
*/

src/V2/Support/WorkflowExecutor.php

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,11 @@ private function scheduleActivity(
13461346
// a stored per-execution codec column. See #429.
13471347
$argumentsCodec = Serializer::chooseCodecForData($run->payload_codec, $activityCall->arguments);
13481348
$serializedArguments = Serializer::serializeWithCodec($argumentsCodec, $activityCall->arguments);
1349+
$this->logApproachingLimit(
1350+
StructuralLimits::warnApproachingPayloadSize($serializedArguments),
1351+
$run,
1352+
['payload_site' => 'activity_input', 'activity_class' => $activityCall->activity],
1353+
);
13491354
StructuralLimits::guardPayloadSize($serializedArguments);
13501355

13511356
/** @var ActivityExecution $execution */
@@ -1552,6 +1557,11 @@ private function scheduleChildWorkflow(
15521557
// matches what the blob was serialized with.
15531558
$childCodec = Serializer::chooseCodecForData($preferredChildCodec, $metadata->arguments);
15541559
$serializedChildArguments = Serializer::serializeWithCodec($childCodec, $metadata->arguments);
1560+
$this->logApproachingLimit(
1561+
StructuralLimits::warnApproachingPayloadSize($serializedChildArguments),
1562+
$run,
1563+
['payload_site' => 'child_workflow_input', 'child_workflow_class' => $childWorkflowCall->workflow],
1564+
);
15551565
StructuralLimits::guardPayloadSize($serializedChildArguments);
15561566

15571567
/** @var WorkflowInstance $childInstance */
@@ -2233,6 +2243,13 @@ private function continueAsNew(
22332243
->addSeconds((int) $runTimeoutSeconds)
22342244
: null;
22352245

2246+
$continueAsNewArguments = Serializer::serializeWithCodec($run->payload_codec, $continueAsNew->arguments);
2247+
$this->logApproachingLimit(
2248+
StructuralLimits::warnApproachingPayloadSize($continueAsNewArguments),
2249+
$run,
2250+
['payload_site' => 'continue_as_new_input', 'target_workflow_class' => $workflowClass],
2251+
);
2252+
22362253
/** @var WorkflowRun $continuedRun */
22372254
$continuedRun = WorkflowRun::query()->create([
22382255
'workflow_instance_id' => $run->workflow_instance_id,
@@ -2250,7 +2267,7 @@ private function continueAsNew(
22502267
'status' => RunStatus::Pending->value,
22512268
'compatibility' => $run->compatibility,
22522269
'payload_codec' => $run->payload_codec,
2253-
'arguments' => Serializer::serializeWithCodec($run->payload_codec, $continueAsNew->arguments),
2270+
'arguments' => $continueAsNewArguments,
22542271
'connection' => $run->connection,
22552272
'queue' => $run->queue,
22562273
'started_at' => $now,
@@ -2476,10 +2493,17 @@ private function continueAsNew(
24762493

24772494
private function completeRun(WorkflowRun $run, WorkflowTask $task, mixed $result): void
24782495
{
2496+
$serializedOutput = Serializer::serializeWithCodec($run->payload_codec, $result);
2497+
$this->logApproachingLimit(
2498+
StructuralLimits::warnApproachingPayloadSize($serializedOutput),
2499+
$run,
2500+
['payload_site' => 'workflow_output'],
2501+
);
2502+
24792503
$run->forceFill([
24802504
'status' => RunStatus::Completed,
24812505
'closed_reason' => 'completed',
2482-
'output' => Serializer::serializeWithCodec($run->payload_codec, $result),
2506+
'output' => $serializedOutput,
24832507
'closed_at' => now(),
24842508
'last_progress_at' => now(),
24852509
])->save();
@@ -3724,7 +3748,13 @@ private function recordSearchAttributesUpserted(
37243748
}
37253749

37263750
ksort($merged);
3727-
StructuralLimits::guardSearchAttributeSize(json_encode($merged, JSON_THROW_ON_ERROR));
3751+
$serializedSearchAttributes = json_encode($merged, JSON_THROW_ON_ERROR);
3752+
$this->logApproachingLimit(
3753+
StructuralLimits::warnApproachingSearchAttributeSize($serializedSearchAttributes),
3754+
$run,
3755+
['payload_site' => 'search_attributes'],
3756+
);
3757+
StructuralLimits::guardSearchAttributeSize($serializedSearchAttributes);
37283758

37293759
$run->search_attributes = $merged;
37303760
$run->save();
@@ -3795,7 +3825,13 @@ private function recordMemoUpserted(
37953825

37963826
ksort($merged);
37973827

3798-
StructuralLimits::guardMemoSize(json_encode($merged, JSON_THROW_ON_ERROR));
3828+
$serializedMemo = json_encode($merged, JSON_THROW_ON_ERROR);
3829+
$this->logApproachingLimit(
3830+
StructuralLimits::warnApproachingMemoSize($serializedMemo),
3831+
$run,
3832+
['payload_site' => 'memo'],
3833+
);
3834+
StructuralLimits::guardMemoSize($serializedMemo);
37993835

38003836
$run->memo = $merged;
38013837
$run->save();
@@ -4484,34 +4520,19 @@ private static function parallelGroupPath(?array $parallelMetadata): ?array
44844520
}
44854521

44864522
/**
4487-
* Log a structured warning when a count-based resource is approaching
4488-
* its hard structural limit. Callers pass the result of one of the
4489-
* `StructuralLimits::warnApproaching*()` helpers — null means "safe,
4490-
* no warning needed."
4523+
* Log a structured warning when a count- or size-based resource is
4524+
* approaching its hard structural limit. Callers pass the result of
4525+
* one of the `StructuralLimits::warnApproaching*()` helpers — null
4526+
* means "safe, no warning needed."
44914527
*
44924528
* @param array{limit_kind: string, current: int, limit: int, threshold_percent: int, utilization_percent: int}|null $warning
4529+
* @param array<string, mixed> $extraContext
44934530
*/
4494-
private function logApproachingLimit(?array $warning, WorkflowRun $run): void
4531+
private function logApproachingLimit(?array $warning, WorkflowRun $run, array $extraContext = []): void
44954532
{
4496-
if ($warning === null) {
4497-
return;
4498-
}
4499-
4500-
Log::warning(sprintf(
4501-
'[Durable Workflow] Run %d approaching structural limit [%s]: %d / %d (%d%% utilization, warning at %d%%).',
4502-
$run->id,
4503-
$warning['limit_kind'],
4504-
$warning['current'],
4505-
$warning['limit'],
4506-
$warning['utilization_percent'],
4507-
$warning['threshold_percent'],
4508-
), [
4533+
StructuralLimits::logWarning($warning, array_merge([
45094534
'workflow_run_id' => $run->id,
45104535
'workflow_type' => $run->workflow_type,
4511-
'limit_kind' => $warning['limit_kind'],
4512-
'current' => $warning['current'],
4513-
'limit' => $warning['limit'],
4514-
'utilization_percent' => $warning['utilization_percent'],
4515-
]);
4536+
], $extraContext));
45164537
}
45174538
}

src/V2/WorkflowStub.php

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1857,6 +1857,19 @@ private function recordAcceptedUpdateWithArguments(string $method, array $argume
18571857
'accepted_at' => now(),
18581858
], $updateCommandAttributes)));
18591859

1860+
$updateCodec = $run->payload_codec ?? CodecRegistry::defaultCodec();
1861+
$serializedUpdateArguments = Serializer::serializeWithCodec($updateCodec, $arguments);
1862+
1863+
StructuralLimits::logWarning(
1864+
StructuralLimits::warnApproachingPayloadSize($serializedUpdateArguments),
1865+
[
1866+
'workflow_run_id' => $run->id,
1867+
'workflow_type' => $run->workflow_type,
1868+
'payload_site' => 'update_input',
1869+
'update_name' => $updateName,
1870+
],
1871+
);
1872+
18601873
/** @var WorkflowUpdate $update */
18611874
$update = WorkflowUpdate::query()->create([
18621875
'workflow_command_id' => $command->id,
@@ -1868,11 +1881,8 @@ private function recordAcceptedUpdateWithArguments(string $method, array $argume
18681881
'update_name' => $updateName,
18691882
'status' => UpdateStatus::Accepted->value,
18701883
'command_sequence' => $command->command_sequence,
1871-
'payload_codec' => $run->payload_codec ?? CodecRegistry::defaultCodec(),
1872-
'arguments' => Serializer::serializeWithCodec(
1873-
$run->payload_codec ?? CodecRegistry::defaultCodec(),
1874-
$arguments
1875-
),
1884+
'payload_codec' => $updateCodec,
1885+
'arguments' => $serializedUpdateArguments,
18761886
'accepted_at' => $command->accepted_at,
18771887
]);
18781888

@@ -1882,10 +1892,7 @@ private function recordAcceptedUpdateWithArguments(string $method, array $argume
18821892
'workflow_instance_id' => $instance->id,
18831893
'workflow_run_id' => $run->id,
18841894
'update_name' => $updateName,
1885-
'arguments' => Serializer::serializeWithCodec(
1886-
$run->payload_codec ?? CodecRegistry::defaultCodec(),
1887-
$arguments
1888-
),
1895+
'arguments' => $serializedUpdateArguments,
18891896
], null, $command);
18901897

18911898
$resumeTask = $this->readyWorkflowTaskForDispatch($run->id);
@@ -3099,6 +3106,17 @@ private function recordAcceptedSignal(
30993106
?string $payloadBlob = null,
31003107
): WorkflowSignal {
31013108
$codec = $payloadCodec ?? $run->payload_codec ?? CodecRegistry::defaultCodec();
3109+
$serializedArguments = $payloadBlob ?? Serializer::serializeWithCodec($codec, $arguments);
3110+
3111+
StructuralLimits::logWarning(
3112+
StructuralLimits::warnApproachingPayloadSize($serializedArguments),
3113+
[
3114+
'workflow_run_id' => $run->id,
3115+
'workflow_type' => $run->workflow_type,
3116+
'payload_site' => 'signal_input',
3117+
'signal_name' => $name,
3118+
],
3119+
);
31023120

31033121
/** @var WorkflowSignal $signal */
31043122
$signal = WorkflowSignal::query()->create([
@@ -3114,7 +3132,7 @@ private function recordAcceptedSignal(
31143132
'outcome' => $command->outcome?->value,
31153133
'command_sequence' => $command->command_sequence,
31163134
'payload_codec' => $codec,
3117-
'arguments' => $payloadBlob ?? Serializer::serializeWithCodec($codec, $arguments),
3135+
'arguments' => $serializedArguments,
31183136
'received_at' => $command->accepted_at,
31193137
]);
31203138

0 commit comments

Comments
 (0)