Skip to content

Commit 71a5faf

Browse files
committed
feat: Implement streaming response functionality with callable and generator support
1 parent 0f3bf23 commit 71a5faf

4 files changed

Lines changed: 358 additions & 1 deletion

File tree

src/Http/Adapter/Swoole/Response.php

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Utopia\Http\Adapter\Swoole;
44

55
use Swoole\Http\Response as SwooleResponse;
6+
use Swoole\Http\Server as SwooleServer;
67
use Utopia\Http\Response as UtopiaResponse;
78

89
class Response extends UtopiaResponse
@@ -14,6 +15,13 @@ class Response extends UtopiaResponse
1415
*/
1516
protected SwooleResponse $swoole;
1617

18+
/**
19+
* Swoole Server Object (needed for raw TCP streaming with Content-Length)
20+
*
21+
* @var SwooleServer|null
22+
*/
23+
protected ?SwooleServer $server = null;
24+
1725
/**
1826
* Response constructor.
1927
*/
@@ -23,6 +31,19 @@ public function __construct(SwooleResponse $response)
2331
parent::__construct(\microtime(true));
2432
}
2533

34+
/**
35+
* Set the Swoole server instance for raw TCP streaming.
36+
*
37+
* @param SwooleServer $server
38+
* @return static
39+
*/
40+
public function setServer(SwooleServer $server): static
41+
{
42+
$this->server = $server;
43+
44+
return $this;
45+
}
46+
2647
/**
2748
* Write
2849
*
@@ -45,6 +66,125 @@ public function end(?string $content = null): void
4566
$this->swoole->end($content);
4667
}
4768

69+
/**
70+
* Stream response
71+
*
72+
* Uses detach() + $server->send() for raw TCP streaming that preserves
73+
* Content-Length (so browsers show download progress). Falls back to
74+
* the base class implementation if no server instance is available.
75+
*
76+
* @param callable|\Generator $source Either a callable($offset, $length) or a Generator yielding string chunks
77+
* @param int $totalSize Total size of the content in bytes
78+
* @return void
79+
*/
80+
public function stream(callable|\Generator $source, int $totalSize): void
81+
{
82+
if ($this->sent) {
83+
return;
84+
}
85+
86+
$this->sent = true;
87+
88+
if ($this->server === null) {
89+
$this->sent = false;
90+
parent::stream($source, $totalSize);
91+
92+
return;
93+
}
94+
95+
if ($this->disablePayload) {
96+
$this->appendCookies();
97+
$this->appendHeaders();
98+
$this->swoole->end();
99+
$this->disablePayload();
100+
101+
return;
102+
}
103+
104+
// Build raw HTTP response headers for direct TCP send
105+
$this->addHeader('Content-Length', (string) $totalSize, override: true);
106+
$this->addHeader('Connection', 'close', override: true);
107+
$this->addHeader('X-Debug-Speed', (string) (microtime(true) - $this->startTime), override: true);
108+
109+
if (!empty($this->contentType)) {
110+
$this->addHeader('Content-Type', $this->contentType, override: true);
111+
}
112+
113+
$statusReason = $this->getStatusCodeReason($this->statusCode);
114+
$rawHeaders = "HTTP/1.1 {$this->statusCode} {$statusReason}\r\n";
115+
116+
foreach ($this->headers as $key => $value) {
117+
if (\is_array($value)) {
118+
foreach ($value as $v) {
119+
$rawHeaders .= "{$key}: {$v}\r\n";
120+
}
121+
} else {
122+
$rawHeaders .= "{$key}: {$value}\r\n";
123+
}
124+
}
125+
126+
foreach ($this->cookies as $cookie) {
127+
$cookieStr = \urlencode($cookie['name']) . '=' . \urlencode($cookie['value'] ?? '');
128+
if (!empty($cookie['expire'])) {
129+
$cookieStr .= '; Expires=' . \gmdate('D, d M Y H:i:s T', $cookie['expire']);
130+
}
131+
if (!empty($cookie['path'])) {
132+
$cookieStr .= '; Path=' . $cookie['path'];
133+
}
134+
if (!empty($cookie['domain'])) {
135+
$cookieStr .= '; Domain=' . $cookie['domain'];
136+
}
137+
if (!empty($cookie['secure'])) {
138+
$cookieStr .= '; Secure';
139+
}
140+
if (!empty($cookie['httponly'])) {
141+
$cookieStr .= '; HttpOnly';
142+
}
143+
if (!empty($cookie['samesite'])) {
144+
$cookieStr .= '; SameSite=' . $cookie['samesite'];
145+
}
146+
$rawHeaders .= "Set-Cookie: {$cookieStr}\r\n";
147+
}
148+
149+
$rawHeaders .= "\r\n";
150+
151+
// Detach from Swoole HTTP layer and send raw TCP
152+
$fd = $this->swoole->fd;
153+
$this->swoole->detach();
154+
155+
if ($this->server->send($fd, $rawHeaders) === false) {
156+
$this->disablePayload();
157+
158+
return;
159+
}
160+
161+
// Stream body chunks
162+
if ($source instanceof \Generator) {
163+
foreach ($source as $chunk) {
164+
if (!empty($chunk)) {
165+
$this->size += strlen($chunk);
166+
if ($this->server->send($fd, $chunk) === false) {
167+
break;
168+
}
169+
}
170+
}
171+
} else {
172+
$length = self::CHUNK_SIZE;
173+
for ($offset = 0; $offset < $totalSize; $offset += $length) {
174+
$chunk = $source($offset, min($length, $totalSize - $offset));
175+
if (!empty($chunk)) {
176+
$this->size += strlen($chunk);
177+
if ($this->server->send($fd, $chunk) === false) {
178+
break;
179+
}
180+
}
181+
}
182+
}
183+
184+
$this->server->close($fd);
185+
$this->disablePayload();
186+
}
187+
48188
/**
49189
* Get status code reason
50190
*

src/Http/Adapter/Swoole/Server.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ public function onRequest(callable $callback)
2626
Http::setResource('swooleRequest', fn () => $request);
2727
Http::setResource('swooleResponse', fn () => $response);
2828

29-
call_user_func($callback, new Request($request), new Response($response));
29+
$utopiaResponse = new Response($response);
30+
$utopiaResponse->setServer($this->server);
31+
call_user_func($callback, new Request($request), $utopiaResponse);
3032
});
3133
}
3234

src/Http/Response.php

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,57 @@ public function iframe(string $callback, array $data): void
991991
->send('<script type="text/javascript">window.parent.'.$callback.'('.\json_encode($data).');</script>');
992992
}
993993

994+
/**
995+
* Stream response
996+
*
997+
* Stream file content to the client using either a callable (pull-based, offset/length)
998+
* or a Generator/iterable (push-based, yielding chunks from a streaming source).
999+
*
1000+
* @param callable|\Generator $source Either a callable($offset, $length) or a Generator yielding string chunks
1001+
* @param int $totalSize Total size of the content in bytes
1002+
* @return void
1003+
*/
1004+
public function stream(callable|\Generator $source, int $totalSize): void
1005+
{
1006+
if ($this->sent) {
1007+
return;
1008+
}
1009+
1010+
$this->addHeader('Content-Length', (string) $totalSize, override: true);
1011+
$this->addHeader('X-Debug-Speed', (string) (microtime(true) - $this->startTime), override: true);
1012+
1013+
$this->appendCookies();
1014+
$this->appendHeaders();
1015+
1016+
if ($this->disablePayload) {
1017+
$this->end();
1018+
$this->sent = true;
1019+
return;
1020+
}
1021+
1022+
if ($source instanceof \Generator) {
1023+
foreach ($source as $chunk) {
1024+
if (!empty($chunk)) {
1025+
$this->size += strlen($chunk);
1026+
$this->write($chunk);
1027+
}
1028+
}
1029+
} else {
1030+
$length = self::CHUNK_SIZE;
1031+
for ($offset = 0; $offset < $totalSize; $offset += $length) {
1032+
$chunk = $source($offset, $length);
1033+
if (!empty($chunk)) {
1034+
$this->size += strlen($chunk);
1035+
$this->write($chunk);
1036+
}
1037+
}
1038+
}
1039+
1040+
$this->end();
1041+
$this->sent = true;
1042+
$this->disablePayload();
1043+
}
1044+
9941045
/**
9951046
* No Content
9961047
*

0 commit comments

Comments
 (0)