@@ -20,6 +20,11 @@ class Ai extends Command
2020
2121 protected $ description = 'Interactive AI travel agent powered by a durable workflow ' ;
2222
23+ /**
24+ * @var array<string, int>
25+ */
26+ private array $ printedAssistantMessageSequences = [];
27+
2328 public function handle (): int
2429 {
2530 $ injectFailure = $ this ->option ('inject-failure ' );
@@ -97,24 +102,31 @@ public function handle(): int
97102 }
98103
99104 /**
100- * Poll the workflow's assistant message stream until one message arrives, then display it.
105+ * Poll the workflow's durable assistant projection and message stream until one message arrives, then display it.
101106 *
102107 * Uses attemptUpdate() so that v2 protocol-level rejections like
103108 * earlier_signal_pending (the signal we just sent has not been applied to
104109 * the workflow yet) are treated as "try again shortly" rather than fatal.
105110 */
106111 private function waitForMessage (WorkflowStub $ workflow , int $ timeoutSeconds = 120 ): bool
107112 {
108- $ elapsed = 0 ;
113+ $ deadline = time () + $ timeoutSeconds ;
109114
110- while ($ elapsed < $ timeoutSeconds ) {
115+ while (time () < $ deadline ) {
111116 $ workflow ->refresh ();
117+
118+ if ($ this ->printLatestAssistantMessage ($ workflow , onlyNew: true )) {
119+ $ workflow ->refresh ();
120+
121+ return ! $ workflow ->failed ();
122+ }
123+
112124 if ($ workflow ->failed ()) {
113125 return false ;
114126 }
115127
116128 if ($ workflow ->completed ()) {
117- return $ this ->printLatestAssistantMessage ($ workflow );
129+ return $ this ->printLatestAssistantMessage ($ workflow, onlyNew: true );
118130 }
119131
120132 $ result = $ workflow ->attemptUpdate ('receive ' );
@@ -123,15 +135,21 @@ private function waitForMessage(WorkflowStub $workflow, int $timeoutSeconds = 12
123135 $ message = $ result ->result ();
124136
125137 if ($ message !== null ) {
138+ if ($ this ->assistantMessageAlreadyPrinted ($ workflow , (string ) $ message )) {
139+ sleep (1 );
140+ continue ;
141+ }
142+
126143 $ this ->newLine ();
127144 $ this ->line ("<comment>Agent:</comment> {$ message }" );
145+ $ this ->rememberLatestAssistantMessageSequence ($ workflow );
128146 $ workflow ->refresh ();
129147
130148 return ! $ workflow ->failed ();
131149 }
132150 } elseif ($ result ->failed ()) {
133151 $ workflow ->refresh ();
134- if ($ workflow ->completed () && $ this ->printLatestAssistantMessage ($ workflow )) {
152+ if ($ workflow ->completed () && $ this ->printLatestAssistantMessage ($ workflow, onlyNew: true )) {
135153 return true ;
136154 }
137155
@@ -148,33 +166,47 @@ private function waitForMessage(WorkflowStub $workflow, int $timeoutSeconds = 12
148166 }
149167
150168 if ($ workflow ->completed ()) {
151- return $ this ->printLatestAssistantMessage ($ workflow );
169+ return $ this ->printLatestAssistantMessage ($ workflow, onlyNew: true );
152170 }
153171
154172 sleep (2 );
155- $ elapsed += 2 ;
156173 }
157174
158175 $ this ->error ('Timed out waiting for a response. ' );
159176
160177 return false ;
161178 }
162179
163- private function printLatestAssistantMessage (WorkflowStub $ workflow ): bool
180+ private function printLatestAssistantMessage (WorkflowStub $ workflow, bool $ onlyNew = false ): bool
164181 {
165- $ message = $ this ->latestAssistantMessage ($ workflow );
182+ $ record = $ this ->latestAssistantMessageRecord ($ workflow );
166183
167- if ($ message === null ) {
184+ if ($ record === null ) {
185+ return false ;
186+ }
187+
188+ if ($ onlyNew && $ record ['sequence ' ] <= $ this ->printedAssistantMessageSequence ($ workflow )) {
168189 return false ;
169190 }
170191
171192 $ this ->newLine ();
172- $ this ->line ("<comment>Agent:</comment> {$ message }" );
193+ $ this ->line ("<comment>Agent:</comment> {$ record ['content ' ]}" );
194+ $ this ->rememberAssistantMessageSequence ($ workflow , $ record ['sequence ' ]);
173195
174196 return true ;
175197 }
176198
177199 private function latestAssistantMessage (WorkflowStub $ workflow ): ?string
200+ {
201+ $ record = $ this ->latestAssistantMessageRecord ($ workflow );
202+
203+ return $ record ['content ' ] ?? null ;
204+ }
205+
206+ /**
207+ * @return array{content: string, sequence: int}|null
208+ */
209+ private function latestAssistantMessageRecord (WorkflowStub $ workflow ): ?array
178210 {
179211 $ messages = AiWorkflowMessage::query ()
180212 ->where ('workflow_id ' , $ workflow ->workflowId ())
@@ -188,7 +220,10 @@ private function latestAssistantMessage(WorkflowStub $workflow): ?string
188220 return null ;
189221 }
190222
191- return $ message ->content ;
223+ return [
224+ 'content ' => $ message ->content ,
225+ 'sequence ' => $ this ->assistantMessageSequence ($ message ->reference ),
226+ ];
192227 }
193228
194229 private function assistantMessageSequence (string $ reference ): int
@@ -204,6 +239,43 @@ private function assistantMessageSequence(string $reference): int
204239 return ctype_digit ($ sequence ) ? (int ) $ sequence : 0 ;
205240 }
206241
242+ private function printedAssistantMessageSequence (WorkflowStub $ workflow ): int
243+ {
244+ return $ this ->printedAssistantMessageSequences [$ this ->assistantMessageKey ($ workflow )] ?? 0 ;
245+ }
246+
247+ private function rememberLatestAssistantMessageSequence (WorkflowStub $ workflow ): void
248+ {
249+ $ record = $ this ->latestAssistantMessageRecord ($ workflow );
250+
251+ if ($ record !== null ) {
252+ $ this ->rememberAssistantMessageSequence ($ workflow , $ record ['sequence ' ]);
253+ }
254+ }
255+
256+ private function rememberAssistantMessageSequence (WorkflowStub $ workflow , int $ sequence ): void
257+ {
258+ $ key = $ this ->assistantMessageKey ($ workflow );
259+ $ this ->printedAssistantMessageSequences [$ key ] = max (
260+ $ sequence ,
261+ $ this ->printedAssistantMessageSequences [$ key ] ?? 0 ,
262+ );
263+ }
264+
265+ private function assistantMessageAlreadyPrinted (WorkflowStub $ workflow , string $ content ): bool
266+ {
267+ $ record = $ this ->latestAssistantMessageRecord ($ workflow );
268+
269+ return $ record !== null
270+ && $ record ['content ' ] === $ content
271+ && $ record ['sequence ' ] <= $ this ->printedAssistantMessageSequence ($ workflow );
272+ }
273+
274+ private function assistantMessageKey (WorkflowStub $ workflow ): string
275+ {
276+ return $ workflow ->workflowId ().': ' .($ workflow ->runId () ?? 'pending ' );
277+ }
278+
207279 private function waitForTerminalState (WorkflowStub $ workflow , int $ timeoutSeconds ): bool
208280 {
209281 $ deadline = time () + $ timeoutSeconds ;
0 commit comments