Skip to content

Commit 9438339

Browse files
committed
refactor: enhance stream processing in Adapter class
- Introduced methods for managing streaming buffers and processing lines from chunks. - Added constants and properties to handle incomplete SSE fragments. - Updated adapter implementations (Anthropic, Deepseek, Gemini, OpenAI, Perplexity, XAI) to utilize new stream processing methods, improving code clarity and maintainability.
1 parent 06064fd commit 9438339

7 files changed

Lines changed: 202 additions & 172 deletions

File tree

src/Agents/Adapter.php

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@
22

33
namespace Utopia\Agents;
44

5+
use Utopia\Fetch\Chunk;
6+
57
abstract class Adapter
68
{
9+
/**
10+
* Upper bound for retained incomplete SSE fragments.
11+
*/
12+
protected const STREAM_BUFFER_MAX_BYTES = 1048576;
13+
714
/**
815
* The agent instance
916
*/
@@ -34,6 +41,11 @@ abstract class Adapter
3441
*/
3542
protected int $timeout = 90000;
3643

44+
/**
45+
* Carries incomplete SSE line fragments between chunks.
46+
*/
47+
protected string $streamBuffer = '';
48+
3749
/**
3850
* Get the adapter name
3951
*/
@@ -216,4 +228,98 @@ public function getTimeout(): int
216228
{
217229
return $this->timeout;
218230
}
231+
232+
/**
233+
* Reset streaming buffer before/after stream lifecycle.
234+
*/
235+
protected function beginStreamProcessing(): void
236+
{
237+
$this->streamBuffer = '';
238+
}
239+
240+
/**
241+
* Reset streaming buffer before/after stream lifecycle.
242+
*/
243+
protected function endStreamProcessing(): void
244+
{
245+
$this->streamBuffer = '';
246+
}
247+
248+
/**
249+
* @return array{0: string, 1: array<int, string>}
250+
*/
251+
protected function prepareStreamLines(Chunk $chunk): array
252+
{
253+
$data = $this->streamBuffer.$chunk->getData();
254+
$lines = explode("\n", $data);
255+
256+
if ($data !== '' && ! str_ends_with($data, "\n")) {
257+
$this->streamBuffer = array_pop($lines) ?? '';
258+
if (strlen($this->streamBuffer) > self::STREAM_BUFFER_MAX_BYTES) {
259+
$this->streamBuffer = substr($this->streamBuffer, -self::STREAM_BUFFER_MAX_BYTES);
260+
}
261+
} else {
262+
$this->streamBuffer = '';
263+
}
264+
265+
return [$data, $lines];
266+
}
267+
268+
/**
269+
* Decode a standard SSE "data: {json}" line.
270+
*
271+
* @return array<string, mixed>|null
272+
*/
273+
protected function decodeSseJsonLine(string $line): ?array
274+
{
275+
if (trim($line) === '') {
276+
return null;
277+
}
278+
279+
if (! str_starts_with($line, 'data: ')) {
280+
return null;
281+
}
282+
283+
$payload = substr($line, 6);
284+
if (trim($payload) === '[DONE]') {
285+
return null;
286+
}
287+
288+
$json = json_decode($payload, true);
289+
290+
return is_array($json) ? $json : null;
291+
}
292+
293+
/**
294+
* Decode either raw JSON lines or SSE "data: {json}" lines.
295+
*
296+
* @return array<string, mixed>|null
297+
*/
298+
protected function decodeJsonOrSseLine(string $line): ?array
299+
{
300+
if (trim($line) === '') {
301+
return null;
302+
}
303+
304+
$payload = str_starts_with($line, 'data: ') ? substr($line, 6) : $line;
305+
if (trim($payload) === '[DONE]') {
306+
return null;
307+
}
308+
309+
$json = json_decode($payload, true);
310+
311+
return is_array($json) ? $json : null;
312+
}
313+
314+
protected function appendStreamToken(string &$block, string $token, ?callable $listener): void
315+
{
316+
if ($token === '') {
317+
return;
318+
}
319+
320+
$block .= $token;
321+
if ($listener !== null) {
322+
$listener($token);
323+
}
324+
}
219325
}

src/Agents/Adapters/Anthropic.php

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,21 @@ public function send(array $messages, ?callable $listener = null): Message
180180

181181
$content = '';
182182
if ($payload['stream']) {
183-
$response = $client->fetch(
184-
'https://api.anthropic.com/v1/messages',
185-
Client::METHOD_POST,
186-
$payload,
187-
[],
188-
function ($chunk) use (&$content, $listener) {
189-
/** @var Chunk $chunk */
190-
$content .= $this->process($chunk, $listener);
191-
}
192-
);
183+
$this->beginStreamProcessing();
184+
try {
185+
$response = $client->fetch(
186+
'https://api.anthropic.com/v1/messages',
187+
Client::METHOD_POST,
188+
$payload,
189+
[],
190+
function ($chunk) use (&$content, $listener) {
191+
/** @var Chunk $chunk */
192+
$content .= $this->process($chunk, $listener);
193+
}
194+
);
195+
} finally {
196+
$this->endStreamProcessing();
197+
}
193198
} else {
194199
$response = $client->fetch(
195200
'https://api.anthropic.com/v1/messages',
@@ -247,18 +252,10 @@ function ($chunk) use (&$content, $listener) {
247252
protected function process(Chunk $chunk, ?callable $listener): string
248253
{
249254
$block = '';
250-
$data = $chunk->getData();
251-
$lines = explode("\n", $data);
255+
[, $lines] = $this->prepareStreamLines($chunk);
252256

253257
foreach ($lines as $line) {
254-
if (empty(trim($line))) {
255-
continue;
256-
}
257-
258-
// Check if line starts with "data: " prefix and remove it, otherwise use the line as-is
259-
$jsonString = str_starts_with($line, 'data: ') ? substr($line, 6) : $line;
260-
$json = json_decode($jsonString, true);
261-
258+
$json = $this->decodeJsonOrSseLine($line);
262259
if (! is_array($json)) {
263260
continue;
264261
}
@@ -297,15 +294,8 @@ protected function process(Chunk $chunk, ?callable $listener): string
297294
}
298295

299296
$deltaType = $json['delta']['type'];
300-
301297
if ($deltaType === 'text_delta' && isset($json['delta']['text']) && is_string($json['delta']['text'])) {
302-
$block = $json['delta']['text'];
303-
}
304-
305-
if (! empty($block)) {
306-
if ($listener !== null) {
307-
$listener($block);
308-
}
298+
$this->appendStreamToken($block, $json['delta']['text'], $listener);
309299
}
310300
break;
311301

src/Agents/Adapters/Deepseek.php

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,21 @@ public function send(array $messages, ?callable $listener = null): Message
123123
}
124124

125125
$content = '';
126-
$response = $client->fetch(
127-
'https://api.deepseek.com/chat/completions',
128-
Client::METHOD_POST,
129-
$payload,
130-
[],
131-
function ($chunk) use (&$content, $listener) {
132-
/** @var Chunk $chunk */
133-
$content .= $this->process($chunk, $listener);
134-
}
135-
);
126+
$this->beginStreamProcessing();
127+
try {
128+
$response = $client->fetch(
129+
'https://api.deepseek.com/chat/completions',
130+
Client::METHOD_POST,
131+
$payload,
132+
[],
133+
function ($chunk) use (&$content, $listener) {
134+
/** @var Chunk $chunk */
135+
$content .= $this->process($chunk, $listener);
136+
}
137+
);
138+
} finally {
139+
$this->endStreamProcessing();
140+
}
136141

137142
if ($response->getStatusCode() >= 400) {
138143
throw new \Exception(
@@ -153,29 +158,15 @@ function ($chunk) use (&$content, $listener) {
153158
protected function process(Chunk $chunk, ?callable $listener): string
154159
{
155160
$block = '';
156-
$data = $chunk->getData();
157-
$lines = explode("\n", $data);
161+
[$data, $lines] = $this->prepareStreamLines($chunk);
158162

159163
$json = json_decode($data, true);
160164
if (is_array($json) && isset($json['error'])) {
161165
return $this->formatErrorMessage($json);
162166
}
163167

164168
foreach ($lines as $line) {
165-
if (empty(trim($line))) {
166-
continue;
167-
}
168-
169-
if (! str_starts_with($line, 'data: ')) {
170-
continue;
171-
}
172-
173-
$line = substr($line, 6);
174-
if ($line === '[DONE]') {
175-
continue;
176-
}
177-
178-
$json = json_decode($line, true);
169+
$json = $this->decodeSseJsonLine($line);
179170
if (! is_array($json)) {
180171
continue;
181172
}
@@ -184,13 +175,7 @@ protected function process(Chunk $chunk, ?callable $listener): string
184175
$firstChoice = isset($choices[0]) && is_array($choices[0]) ? $choices[0] : [];
185176
$delta = isset($firstChoice['delta']) && is_array($firstChoice['delta']) ? $firstChoice['delta'] : [];
186177
if (isset($delta['content']) && is_string($delta['content'])) {
187-
$deltaContent = $delta['content'];
188-
if (! empty($deltaContent)) {
189-
$block .= $deltaContent;
190-
if ($listener !== null) {
191-
$listener($deltaContent);
192-
}
193-
}
178+
$this->appendStreamToken($block, $delta['content'], $listener);
194179
}
195180

196181
if (isset($json['usage']) && is_array($json['usage'])) {

src/Agents/Adapters/Gemini.php

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,21 @@ public function send(array $messages, ?callable $listener = null): Message
131131
];
132132

133133
$content = '';
134-
$response = $client->fetch(
135-
$this->endpoint,
136-
Client::METHOD_POST,
137-
$payload,
138-
[],
139-
function ($chunk) use (&$content, $listener) {
140-
/** @var Chunk $chunk */
141-
$content .= $this->process($chunk, $listener);
142-
}
143-
);
134+
$this->beginStreamProcessing();
135+
try {
136+
$response = $client->fetch(
137+
$this->endpoint,
138+
Client::METHOD_POST,
139+
$payload,
140+
[],
141+
function ($chunk) use (&$content, $listener) {
142+
/** @var Chunk $chunk */
143+
$content .= $this->process($chunk, $listener);
144+
}
145+
);
146+
} finally {
147+
$this->endStreamProcessing();
148+
}
144149

145150
if ($response->getStatusCode() >= 400) {
146151
throw new \Exception(
@@ -163,29 +168,15 @@ function ($chunk) use (&$content, $listener) {
163168
protected function process(Chunk $chunk, ?callable $listener): string
164169
{
165170
$block = '';
166-
$data = $chunk->getData();
167-
$lines = explode("\n", $data);
171+
[$data, $lines] = $this->prepareStreamLines($chunk);
168172

169173
$json = json_decode($data, true);
170174
if (is_array($json) && isset($json['error'])) {
171175
return $this->formatErrorMessage($json);
172176
}
173177

174178
foreach ($lines as $line) {
175-
if (empty(trim($line))) {
176-
continue;
177-
}
178-
179-
if (! str_starts_with($line, 'data: ')) {
180-
continue;
181-
}
182-
183-
// Handle [DONE] message
184-
if (trim($line) === 'data: [DONE]') {
185-
continue;
186-
}
187-
188-
$json = json_decode(substr($line, 6), true);
179+
$json = $this->decodeSseJsonLine($line);
189180
if (! is_array($json)) {
190181
continue;
191182
}
@@ -197,11 +188,7 @@ protected function process(Chunk $chunk, ?callable $listener): string
197188
$parts = isset($content['parts']) && is_array($content['parts']) ? $content['parts'] : [];
198189
$firstPart = isset($parts[0]) && is_array($parts[0]) ? $parts[0] : [];
199190
if (isset($firstPart['text']) && is_string($firstPart['text'])) {
200-
$block = $firstPart['text'];
201-
202-
if (! empty($block) && $listener !== null) {
203-
$listener($block);
204-
}
191+
$this->appendStreamToken($block, $firstPart['text'], $listener);
205192
}
206193
}
207194

0 commit comments

Comments
 (0)