Skip to content

Commit 3fc54aa

Browse files
Add first-class durable message stream authoring API
Add durable message stream authoring API
1 parent 886f185 commit 3fc54aa

7 files changed

Lines changed: 504 additions & 31 deletions

File tree

docs/api-stability.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,30 @@ omitted a null value. Consumers must continue to accept missing optional
214214
keys indefinitely. Producers must not rename, remove, or change the type
215215
or meaning of an existing key.
216216

217+
## Durable Message Stream Authoring API
218+
219+
The first-class v2 inbox/outbox surface is `Workflow\V2\MessageStream`, opened
220+
from `Workflow::messages()`, `Workflow::inbox()`, `Workflow::outbox()`, or
221+
`MessageService::stream()`.
222+
223+
Stable methods:
224+
225+
- `key(): string`
226+
- `cursor(): int`
227+
- `hasPending(): bool`
228+
- `pendingCount(): int`
229+
- `peek(int $limit = 100): Collection`
230+
- `receive(int $limit = 1, ?int $consumedBySequence = null): Collection`
231+
- `receiveOne(?int $consumedBySequence = null): ?WorkflowMessage`
232+
- `sendReference(string $targetInstanceId, ?string $payloadReference = null, MessageChannel|string $channel = MessageChannel::WorkflowMessage, ?string $correlationId = null, ?string $idempotencyKey = null, array $metadata = [], ?DateTimeInterface $expiresAt = null): WorkflowMessage`
233+
234+
`peek()` is non-mutating. `receive()` and `receiveOne()` consume pending inbound
235+
messages and advance the durable cursor; they must be associated with a
236+
positive workflow history sequence, either from the workflow base class or an
237+
explicit runtime/control-plane caller. `sendReference()` stores a payload
238+
reference and routing metadata only; inline payload storage is not part of the
239+
stable contract.
240+
217241
### `VersionMarkerRecorded`
218242

219243
This marker records the result of `Workflow::getVersion()`, `Workflow::patched()`,

docs/workflow-messages-architecture.md

Lines changed: 74 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,11 @@ Stream B: msg_seq_1, msg_seq_2, msg_seq_3
170170

171171
**Sequence reservation:**
172172
1. Sender calls `sendMessage()`
173-
2. MessageService locks target instance (`SELECT ... FOR UPDATE`)
174-
3. `MessageStreamCursor::reserveNextSequence()` increments `instance.last_message_sequence`
175-
4. Both inbound and outbound messages get same sequence number
176-
5. Lock released on transaction commit
173+
2. MessageService locks sender and target instances (`SELECT ... FOR UPDATE`)
174+
3. `MessageStreamCursor::reserveNextSequence()` increments each instance's `last_message_sequence`
175+
4. The outbound row gets the sender instance's next sequence number
176+
5. The inbound row gets the target instance's next sequence number
177+
6. Lock released on transaction commit
177178

178179
**Ordering guarantee:** Unique constraint prevents duplicate sequences in same stream.
179180

@@ -273,11 +274,60 @@ enum MessageConsumeState: string
273274

274275
## API Surface
275276

277+
### MessageStream
278+
279+
Workflow authors and adapters should use `Workflow\V2\MessageStream` as the
280+
named v2 inbox/outbox contract instead of constructing the legacy
281+
`Workflow\Inbox` or `Workflow\Outbox` helpers. A stream is bound to one
282+
`WorkflowRun` and one `stream_key`.
283+
284+
```php
285+
// From workflow code:
286+
$messages = $this->inbox('chat')->peek();
287+
$message = $this->inbox('chat')->receiveOne();
288+
289+
// From runtime/control-plane code that already owns a run:
290+
$stream = app(MessageService::class)->stream($run, 'chat', $historySequence);
291+
$stream->sendReference(
292+
targetInstanceId: $targetInstanceId,
293+
payloadReference: $payloadReference,
294+
correlationId: $requestId,
295+
);
296+
```
297+
298+
Contract:
299+
300+
- `peek($limit)` is read-only and returns pending inbound messages after the
301+
run cursor.
302+
- `receive($limit, $consumedBySequence)` consumes pending inbound messages,
303+
stamps each row with the workflow history sequence that performed the
304+
receive, and advances the cursor to the highest consumed message sequence.
305+
- `receiveOne()` is `receive(1)`.
306+
- `sendReference()` sends an outbound message and creates the corresponding
307+
target inbound row. The table stores a payload reference, not arbitrary
308+
inline payload bytes.
309+
- `Workflow::messages()`, `Workflow::inbox()`, and `Workflow::outbox()` all
310+
open this same stream facade for the current run. `inbox()` and `outbox()`
311+
are semantic aliases for author readability, not separate storage models.
312+
313+
Receive/consume operations require a positive workflow history sequence. The
314+
workflow base class supplies the current visible sequence when it opens a
315+
stream; lower-level callers must pass the sequence explicitly so replay,
316+
history export, and cursor diagnostics can identify the workflow step that
317+
consumed the messages.
318+
276319
### MessageService
277320

278321
```php
279322
class MessageService
280323
{
324+
// Open the first-class stream facade
325+
public function stream(
326+
WorkflowRun $run,
327+
?string $streamKey = null,
328+
?int $defaultConsumedBySequence = null,
329+
): MessageStream;
330+
281331
// Send outbound message
282332
public function sendMessage(
283333
WorkflowRun $sourceRun,
@@ -458,31 +508,32 @@ foreach ($webhooks as $webhook) {
458508
```
459509
[Send Phase]
460510
1. Sender calls sendMessage()
461-
2. MessageService locks target instance
462-
3. Reserves next sequence number
463-
4. Creates outbound message (sender's record)
464-
5. Creates inbound message (receiver's record)
465-
6. Transaction commits
511+
2. MessageService locks sender and target instances
512+
3. Reserves the sender stream's next sequence for the outbound row
513+
4. Reserves the target stream's next sequence for the inbound row
514+
5. Creates outbound message (sender's record)
515+
6. Creates inbound message (receiver's record)
516+
7. Transaction commits
466517
467518
[Receive Phase]
468-
7. Receiver calls receiveMessages()
469-
8. Fetches pending inbound messages after cursor
470-
9. Returns messages in sequence order
519+
8. Receiver calls receiveMessages()
520+
9. Fetches pending inbound messages after cursor
521+
10. Returns messages in sequence order
471522
472523
[Consume Phase]
473-
10. Receiver processes messages
474-
11. Calls consumeMessage() or consumeMessageBatch()
475-
12. Messages marked as consumed
476-
13. Cursor advanced to consumed sequence
477-
14. MessageCursorAdvanced history event recorded
524+
11. Receiver processes messages
525+
12. Calls consumeMessage() or consumeMessageBatch()
526+
13. Messages marked as consumed
527+
14. Cursor advanced to consumed sequence
528+
15. MessageCursorAdvanced history event recorded
478529
479530
[Continue-As-New Phase]
480-
15. Closing run has cursor at position N
481-
16. Pending messages (sequence > N) still exist
482-
17. transferMessagesToContinuedRun() called
483-
18. Pending messages workflow_run_id updated to new run
484-
19. Cursor position transferred
485-
20. New run continues from position N
531+
16. Closing run has cursor at position N
532+
17. Pending messages (sequence > N) still exist
533+
18. transferMessagesToContinuedRun() called
534+
19. Pending messages workflow_run_id updated to new run
535+
20. Cursor position transferred
536+
21. New run continues from position N
486537
```
487538

488539
## Integration with MessageStreamCursor

src/V2/MessageStream.php

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2;
6+
7+
use DateTimeInterface;
8+
use Illuminate\Database\Eloquent\Collection;
9+
use InvalidArgumentException;
10+
use Workflow\V2\Enums\MessageChannel;
11+
use Workflow\V2\Models\WorkflowMessage;
12+
use Workflow\V2\Models\WorkflowRun;
13+
use Workflow\V2\Support\MessageService;
14+
use Workflow\V2\Support\MessageStreamCursor;
15+
16+
/**
17+
* First-class v2 durable message stream facade.
18+
*
19+
* @api Stable v2 authoring API for repeated human input and workflow-to-workflow message streams.
20+
*/
21+
final class MessageStream
22+
{
23+
public function __construct(
24+
private readonly WorkflowRun $run,
25+
private readonly string $streamKey,
26+
private readonly MessageService $messages = new MessageService(),
27+
private readonly ?int $defaultConsumedBySequence = null,
28+
) {
29+
if ($streamKey === '') {
30+
throw new InvalidArgumentException('Message stream key must not be empty.');
31+
}
32+
}
33+
34+
public static function forRun(
35+
WorkflowRun $run,
36+
?string $streamKey = null,
37+
?MessageService $messages = null,
38+
?int $defaultConsumedBySequence = null,
39+
): self {
40+
return new self(
41+
$run,
42+
$streamKey ?? MessageStreamCursor::defaultStreamKey($run),
43+
$messages ?? new MessageService(),
44+
$defaultConsumedBySequence,
45+
);
46+
}
47+
48+
public function key(): string
49+
{
50+
return $this->streamKey;
51+
}
52+
53+
public function cursor(): int
54+
{
55+
return MessageStreamCursor::positionForRun($this->run);
56+
}
57+
58+
public function hasPending(): bool
59+
{
60+
return $this->messages->hasUnconsumedMessages($this->run, $this->streamKey);
61+
}
62+
63+
public function pendingCount(): int
64+
{
65+
return $this->messages->getUnconsumedCount($this->run, $this->streamKey);
66+
}
67+
68+
/**
69+
* Inspect pending inbound messages without consuming them.
70+
*
71+
* @return Collection<int, WorkflowMessage>
72+
*/
73+
public function peek(int $limit = 100): Collection
74+
{
75+
return $this->messages->receiveMessages($this->run, $this->streamKey, $this->positiveLimit($limit));
76+
}
77+
78+
/**
79+
* Receive and consume pending inbound messages.
80+
*
81+
* @return Collection<int, WorkflowMessage>
82+
*/
83+
public function receive(int $limit = 1, ?int $consumedBySequence = null): Collection
84+
{
85+
$messages = $this->peek($limit);
86+
87+
if ($messages->isEmpty()) {
88+
return $messages;
89+
}
90+
91+
$this->messages->consumeMessageBatch(
92+
$this->run,
93+
$messages->all(),
94+
$this->consumedBySequence($consumedBySequence),
95+
);
96+
97+
return $messages;
98+
}
99+
100+
public function receiveOne(?int $consumedBySequence = null): ?WorkflowMessage
101+
{
102+
/** @var WorkflowMessage|null $message */
103+
$message = $this->receive(1, $consumedBySequence)
104+
->first();
105+
106+
return $message;
107+
}
108+
109+
/**
110+
* Send a payload-reference message to another workflow instance.
111+
*
112+
* Message payload bytes live outside `workflow_messages`; this method
113+
* stores the durable pointer plus stream ordering/correlation metadata.
114+
*
115+
* @param array<string, mixed> $metadata
116+
*/
117+
public function sendReference(
118+
string $targetInstanceId,
119+
?string $payloadReference = null,
120+
MessageChannel|string $channel = MessageChannel::WorkflowMessage,
121+
?string $correlationId = null,
122+
?string $idempotencyKey = null,
123+
array $metadata = [],
124+
?DateTimeInterface $expiresAt = null,
125+
): WorkflowMessage {
126+
return $this->messages->sendMessage(
127+
$this->run,
128+
$channel,
129+
$targetInstanceId,
130+
$payloadReference,
131+
$this->streamKey,
132+
$correlationId,
133+
$idempotencyKey,
134+
$metadata,
135+
$expiresAt,
136+
);
137+
}
138+
139+
private function consumedBySequence(?int $override): int
140+
{
141+
$sequence = $override ?? $this->defaultConsumedBySequence;
142+
143+
if ($sequence === null || $sequence < 1) {
144+
throw new InvalidArgumentException(
145+
'Receiving a durable message requires a positive workflow history sequence.',
146+
);
147+
}
148+
149+
return $sequence;
150+
}
151+
152+
private function positiveLimit(int $limit): int
153+
{
154+
if ($limit < 1) {
155+
throw new InvalidArgumentException('Message receive limit must be at least 1.');
156+
}
157+
158+
return $limit;
159+
}
160+
}

src/V2/Support/MessageService.php

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,21 @@
99
use Workflow\V2\Enums\MessageChannel;
1010
use Workflow\V2\Enums\MessageConsumeState;
1111
use Workflow\V2\Enums\MessageDirection;
12+
use Workflow\V2\MessageStream;
1213
use Workflow\V2\Models\WorkflowInstance;
1314
use Workflow\V2\Models\WorkflowMessage;
1415
use Workflow\V2\Models\WorkflowRun;
1516

1617
class MessageService
1718
{
19+
public function stream(
20+
WorkflowRun $run,
21+
?string $streamKey = null,
22+
?int $defaultConsumedBySequence = null,
23+
): MessageStream {
24+
return MessageStream::forRun($run, $streamKey, $this, $defaultConsumedBySequence);
25+
}
26+
1827
/**
1928
* Send an outbound message from a workflow run.
2029
*
@@ -55,13 +64,27 @@ public function sendMessage(
5564
$metadata,
5665
$expiresAt,
5766
): WorkflowMessage {
58-
// Lock target instance to reserve sequence
59-
$targetInstance = WorkflowInstance::where('id', $targetInstanceId)
67+
$instanceIds = array_values(array_unique([$sourceRun->workflow_instance_id, $targetInstanceId]));
68+
sort($instanceIds);
69+
70+
/** @var \Illuminate\Support\Collection<string, WorkflowInstance> $instances */
71+
$instances = WorkflowInstance::whereIn('id', $instanceIds)
72+
->orderBy('id')
6073
->lockForUpdate()
61-
->firstOrFail();
74+
->get()
75+
->keyBy('id');
76+
77+
/** @var WorkflowInstance $sourceInstance */
78+
$sourceInstance = $instances->get($sourceRun->workflow_instance_id)
79+
?? WorkflowInstance::where('id', $sourceRun->workflow_instance_id)->firstOrFail();
80+
/** @var WorkflowInstance $targetInstance */
81+
$targetInstance = $instances->get($targetInstanceId)
82+
?? WorkflowInstance::where('id', $targetInstanceId)->firstOrFail();
6283

63-
// Reserve next sequence number
64-
$sequence = MessageStreamCursor::reserveNextSequence($targetInstance);
84+
$outboundSequence = MessageStreamCursor::reserveNextSequence($sourceInstance);
85+
$inboundSequence = $sourceRun->workflow_instance_id === $targetInstanceId
86+
? MessageStreamCursor::reserveNextSequence($sourceInstance)
87+
: MessageStreamCursor::reserveNextSequence($targetInstance);
6588

6689
// Create outbound message
6790
$message = new WorkflowMessage([
@@ -70,7 +93,7 @@ public function sendMessage(
7093
'direction' => MessageDirection::Outbound,
7194
'channel' => $channel,
7295
'stream_key' => $streamKey,
73-
'sequence' => $sequence,
96+
'sequence' => $outboundSequence,
7497
'source_workflow_instance_id' => $sourceRun->workflow_instance_id,
7598
'source_workflow_run_id' => $sourceRun->id,
7699
'target_workflow_instance_id' => $targetInstanceId,
@@ -94,7 +117,7 @@ public function sendMessage(
94117
'direction' => MessageDirection::Inbound,
95118
'channel' => $channel,
96119
'stream_key' => $streamKey,
97-
'sequence' => $sequence,
120+
'sequence' => $inboundSequence,
98121
'source_workflow_instance_id' => $sourceRun->workflow_instance_id,
99122
'source_workflow_run_id' => $sourceRun->id,
100123
'target_workflow_instance_id' => $targetInstanceId,

0 commit comments

Comments
 (0)