Skip to content

Commit e92bda0

Browse files
Clarify durable message stream consumption
Clarify durable message stream consumption
1 parent e48ac38 commit e92bda0

4 files changed

Lines changed: 152 additions & 9 deletions

File tree

docs/api-stability.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,25 @@ preference; both produce identical `Support\*` Call value objects.
102102
Adding new static methods to the facade is an additive (non-breaking)
103103
change. Removing or renaming a documented method is a major change.
104104

105+
## Durable Message Stream Contract
106+
107+
The v2 durable message service is the stable lower-level contract backing
108+
signals, updates, workflow-to-workflow messages, and repeated human-input
109+
flows:
110+
111+
- `MessageService::sendMessage()` creates paired outbound and inbound
112+
`workflow_messages` rows with one reserved instance sequence.
113+
- `MessageService::peekMessages()` and `receiveMessages()` read pending
114+
inbound messages after the run cursor. They are intentionally read-only:
115+
they do not mark messages consumed and do not advance the cursor.
116+
- `MessageService::consumeMessage()` and `consumeMessages()` are the only
117+
message-service APIs that mark messages consumed and advance the cursor.
118+
Batch consumption is same-stream only; mixed-stream batches are rejected so
119+
each `MessageCursorAdvanced` event names exactly one `stream_key`.
120+
- `MessageService::transferMessagesToContinuedRun()` moves pending inbound
121+
messages and the cursor position from the closing run to the continued run.
122+
Consumed messages stay attached to the original run as historical record.
123+
105124
## Continue-As-New Interleaving Contract
106125

107126
Continue-as-new keeps one logical workflow instance while closing one run

docs/workflow-messages-architecture.md

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,17 @@ Stream B: msg_seq_1, msg_seq_2, msg_seq_3
182182

183183
### Cursor Position
184184

185-
Each workflow run tracks a cursor position per stream:
185+
Each workflow run tracks one monotonic cursor position across the instance
186+
message sequence:
186187
- `workflow_runs.message_cursor_position` - last consumed sequence
187188
- Default: 0 (no messages consumed yet)
188189
- Cursor only moves forward (monotonic)
189190

191+
Stream keys scope receive and consume queries, but cursor advancement is still
192+
against the shared instance sequence. A single batch consume must therefore
193+
contain messages from one stream only; mixed-stream batches are rejected so the
194+
`MessageCursorAdvanced` event has one unambiguous `stream_key`.
195+
190196
### Cursor Advancement
191197

192198
When a message is consumed:
@@ -198,9 +204,10 @@ When a message is consumed:
198204

199205
**Idempotency:** `advanceCursor()` returns null if cursor already at or past requested position.
200206

201-
### Receiving Messages
207+
### Peeking And Receiving Messages
202208

203209
```php
210+
$messages = $messageService->peekMessages($run, $streamKey);
204211
$messages = $messageService->receiveMessages($run, $streamKey);
205212
```
206213

@@ -211,7 +218,11 @@ Returns messages where:
211218
- `consume_state = 'pending'`
212219
- `sequence > $run->message_cursor_position`
213220

214-
**Only unconsumed messages after cursor are returned.**
221+
`peekMessages()` and `receiveMessages()` are read-only. They return only
222+
unconsumed messages after the cursor, but they do not mark messages consumed and
223+
they do not advance the cursor. Cursor movement is explicit and happens only
224+
through `consumeMessage()` or `consumeMessages()` after workflow code has
225+
durably recorded how it handled the messages.
215226

216227
## Continue-As-New Handoff
217228

@@ -341,7 +352,14 @@ class MessageService
341352
?\DateTimeInterface $expiresAt = null,
342353
): WorkflowMessage;
343354

344-
// Receive unconsumed inbound messages
355+
// Peek unconsumed inbound messages without consuming or moving the cursor.
356+
public function peekMessages(
357+
WorkflowRun $run,
358+
?string $streamKey = null,
359+
int $limit = 100,
360+
): Collection<WorkflowMessage>;
361+
362+
// Compatibility alias for peekMessages().
345363
public function receiveMessages(
346364
WorkflowRun $run,
347365
?string $streamKey = null,
@@ -355,7 +373,14 @@ class MessageService
355373
int $consumedBySequence,
356374
): void;
357375

358-
// Consume multiple messages (cursor advances to highest sequence)
376+
// Consume multiple same-stream messages (cursor advances to highest sequence)
377+
public function consumeMessages(
378+
WorkflowRun $run,
379+
array $messages,
380+
int $consumedBySequence,
381+
): void;
382+
383+
// Compatibility alias for consumeMessages().
359384
public function consumeMessageBatch(
360385
WorkflowRun $run,
361386
array $messages,

src/V2/Support/MessageService.php

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,19 @@ public function sendMessage(
136136
}
137137

138138
/**
139-
* Receive unconsumed inbound messages for a workflow run.
139+
* Peek at unconsumed inbound messages for a workflow run.
140+
*
141+
* Peeking is read-only: it never marks messages consumed and never advances
142+
* the run cursor. Call {@see consumeMessage()} or {@see consumeMessages()}
143+
* after workflow code has durably recorded how the messages were handled.
140144
*
141145
* @param WorkflowRun $run The receiving workflow run
142146
* @param string|null $streamKey Stream key (defaults to instance stream)
143147
* @param int $limit Maximum number of messages to fetch
144148
*
145149
* @return \Illuminate\Database\Eloquent\Collection<WorkflowMessage>
146150
*/
147-
public function receiveMessages(
151+
public function peekMessages(
148152
WorkflowRun $run,
149153
?string $streamKey = null,
150154
int $limit = 100,
@@ -155,6 +159,26 @@ public function receiveMessages(
155159
return WorkflowMessage::getUnconsumedForStream($run, $streamKey, $afterSequence, $limit);
156160
}
157161

162+
/**
163+
* Compatibility alias for {@see peekMessages()}.
164+
*
165+
* "Receive" in the v2 message stream contract means "read pending
166+
* messages without consuming them". Cursor movement remains explicit.
167+
*
168+
* @param WorkflowRun $run The receiving workflow run
169+
* @param string|null $streamKey Stream key (defaults to instance stream)
170+
* @param int $limit Maximum number of messages to fetch
171+
*
172+
* @return \Illuminate\Database\Eloquent\Collection<WorkflowMessage>
173+
*/
174+
public function receiveMessages(
175+
WorkflowRun $run,
176+
?string $streamKey = null,
177+
int $limit = 100,
178+
): \Illuminate\Database\Eloquent\Collection {
179+
return $this->peekMessages($run, $streamKey, $limit);
180+
}
181+
158182
/**
159183
* Consume a message and advance the cursor.
160184
*
@@ -196,7 +220,7 @@ public function consumeMessage(WorkflowRun $run, WorkflowMessage $message, int $
196220
* @param array<WorkflowMessage> $messages Messages to consume
197221
* @param int $consumedBySequence The history sequence that consumed these messages
198222
*/
199-
public function consumeMessageBatch(WorkflowRun $run, array $messages, int $consumedBySequence): void
223+
public function consumeMessages(WorkflowRun $run, array $messages, int $consumedBySequence): void
200224
{
201225
if (empty($messages)) {
202226
return;
@@ -233,11 +257,19 @@ public function consumeMessageBatch(WorkflowRun $run, array $messages, int $cons
233257
));
234258
}
235259

260+
if ($streamKey !== null && $message->stream_key !== $streamKey) {
261+
throw new InvalidArgumentException(sprintf(
262+
'Cannot consume messages from multiple streams in one batch (%s, %s).',
263+
$streamKey,
264+
$message->stream_key,
265+
));
266+
}
267+
268+
$streamKey ??= $message->stream_key;
236269
$message->markConsumed($consumedBySequence);
237270

238271
if ($message->sequence > $maxSequence) {
239272
$maxSequence = $message->sequence;
240-
$streamKey = $message->stream_key;
241273
}
242274
}
243275

@@ -248,6 +280,18 @@ public function consumeMessageBatch(WorkflowRun $run, array $messages, int $cons
248280
});
249281
}
250282

283+
/**
284+
* Compatibility alias for {@see consumeMessages()}.
285+
*
286+
* @param WorkflowRun $run The consuming workflow run
287+
* @param array<WorkflowMessage> $messages Messages to consume
288+
* @param int $consumedBySequence The history sequence that consumed these messages
289+
*/
290+
public function consumeMessageBatch(WorkflowRun $run, array $messages, int $consumedBySequence): void
291+
{
292+
$this->consumeMessages($run, $messages, $consumedBySequence);
293+
}
294+
251295
/**
252296
* Get count of unconsumed messages for a stream.
253297
*

tests/Unit/V2/MessageServiceTest.php

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,26 @@ public function testItReceivesUnconsumedInboundMessages(): void
148148
$this->assertEquals([1, 2, 3], $sequences);
149149
}
150150

151+
public function testPeekMessagesIsReadOnlyAndReceiveIsCompatibilityAlias(): void
152+
{
153+
$sourceRun = $this->createRun();
154+
$targetRun = $this->createRun();
155+
156+
$this->service->sendMessage($sourceRun, MessageChannel::Signal, $targetRun->workflow_instance_id);
157+
$this->service->sendMessage($sourceRun, MessageChannel::Signal, $targetRun->workflow_instance_id);
158+
159+
$peeked = $this->service->peekMessages($targetRun);
160+
$received = $this->service->receiveMessages($targetRun);
161+
162+
$this->assertEquals($peeked->pluck('id')->all(), $received->pluck('id')->all());
163+
164+
$targetRun->refresh();
165+
$this->assertSame(0, (int) $targetRun->message_cursor_position);
166+
$this->assertTrue($peeked->every(
167+
static fn (WorkflowMessage $message): bool => $message->consume_state === MessageConsumeState::Pending
168+
));
169+
}
170+
151171
public function testItConsumesMessageAndAdvancesCursor(): void
152172
{
153173
$sourceRun = $this->createRun();
@@ -202,6 +222,41 @@ public function testItConsumesMessageBatchAndAdvancesToHighestSequence(): void
202222
$this->assertEquals(3, $targetRun->message_cursor_position);
203223
}
204224

225+
public function testItRejectsMixedStreamBatchConsumption(): void
226+
{
227+
$sourceRun = $this->createRun();
228+
$targetRun = $this->createRun();
229+
230+
$this->service->sendMessage(
231+
$sourceRun,
232+
MessageChannel::Signal,
233+
$targetRun->workflow_instance_id,
234+
null,
235+
'stream_a'
236+
);
237+
$this->service->sendMessage(
238+
$sourceRun,
239+
MessageChannel::Signal,
240+
$targetRun->workflow_instance_id,
241+
null,
242+
'stream_b'
243+
);
244+
245+
$messages = [
246+
$this->service->receiveMessages($targetRun, 'stream_a')
247+
->first(),
248+
$this->service->receiveMessages($targetRun, 'stream_b')
249+
->first(),
250+
];
251+
252+
$this->expectException(InvalidArgumentException::class);
253+
$this->expectExceptionMessage(
254+
'Cannot consume messages from multiple streams in one batch (stream_a, stream_b).'
255+
);
256+
257+
$this->service->consumeMessages($targetRun, $messages, 10);
258+
}
259+
205260
public function testItOnlyReceivesMessagesAfterCursorPosition(): void
206261
{
207262
$sourceRun = $this->createRun();

0 commit comments

Comments
 (0)