Skip to content

Commit 0ece5d6

Browse files
committed
feat: robust http streaming
1 parent 65382fa commit 0ece5d6

3 files changed

Lines changed: 334 additions & 148 deletions

File tree

src/KubernetesCluster.php

Lines changed: 13 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class KubernetesCluster
132132
use Traits\Cluster\ChecksClusterVersion;
133133
use Traits\Cluster\LoadsFromKubeConfig;
134134
use Traits\Cluster\MakesHttpCalls;
135+
use Traits\Cluster\MakesWatchCalls;
135136
use Traits\Cluster\MakesWebsocketCalls;
136137

137138
/**
@@ -176,8 +177,8 @@ public function runOperation(Operation|string $operation, string $path, string|n
176177
}
177178

178179
return match ($operation) {
179-
Operation::WATCH => $this->watchPath($path, $payload, $query),
180-
Operation::WATCH_LOGS => $this->watchLogsPath($path, $payload, $query),
180+
Operation::WATCH => $this->runWatchOperation($this->watchPath($path, $payload, $query)),
181+
Operation::WATCH_LOGS => $this->runWatchOperation($this->watchLogsPath($path, $payload, $query)),
181182
Operation::EXEC => $this->execPath($path, $query),
182183
Operation::ATTACH => $this->attachPath($path, $payload, $query),
183184
Operation::APPLY => $this->applyPath($path, $payload, $query),
@@ -188,156 +189,22 @@ public function runOperation(Operation|string $operation, string $path, string|n
188189
}
189190

190191
/**
191-
* Watch for the current resource or a resource list.
192+
* Run a watch operation and return the result.
192193
*
193-
* @return mixed|null
194+
* @param array{0: \React\EventLoop\LoopInterface, 1: \React\Promise\PromiseInterface} $loopAndPromise
194195
*/
195-
protected function watchPath(string $path, Closure $callback, array $query = ['pretty' => 1]): mixed
196+
protected function runWatchOperation(array $loopAndPromise): mixed
196197
{
197-
$resourceClass = $this->resourceClass;
198-
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));
198+
[$loop, $promise] = $loopAndPromise;
199199

200-
if ($sock === false) {
201-
return null;
202-
}
203-
204-
// Set stream to non-blocking mode to allow timeout handling
205-
stream_set_blocking($sock, false);
206-
207-
// Calculate overall timeout: server timeout + buffer for network/processing
208-
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
209-
$endTime = time() + $timeout;
210-
211-
$buffer = '';
212-
213-
while (time() < $endTime) {
214-
// Try to read data (non-blocking)
215-
$chunk = fread($sock, 8192);
216-
217-
if ($chunk === false) {
218-
// Error occurred
219-
fclose($sock);
220-
221-
return null;
222-
}
223-
224-
if ($chunk === '') {
225-
// No data available, check if stream ended
226-
if (feof($sock)) {
227-
break;
228-
}
229-
230-
// No data yet, sleep briefly and continue
231-
usleep(100000); // 100ms
232-
233-
continue;
234-
}
235-
236-
// Append chunk to buffer
237-
$buffer .= $chunk;
238-
239-
// Process complete lines from buffer
240-
while (($pos = strpos($buffer, "\n")) !== false) {
241-
$line = substr($buffer, 0, $pos);
242-
$buffer = substr($buffer, $pos + 1);
243-
244-
if (trim($line) === '') {
245-
continue;
246-
}
247-
248-
$data = @json_decode($line, true);
249-
250-
if (! $data || ! isset($data['type'], $data['object'])) {
251-
continue;
252-
}
253-
254-
['type' => $type, 'object' => $attributes] = $data;
255-
256-
$call = call_user_func(
257-
$callback,
258-
$type,
259-
new $resourceClass($this, $attributes)
260-
);
261-
262-
if (! is_null($call)) {
263-
fclose($sock);
264-
265-
return $call;
266-
}
267-
}
268-
}
269-
270-
fclose($sock);
271-
272-
return null;
273-
}
274-
275-
/**
276-
* Watch for the logs for the resource.
277-
*
278-
* @return mixed|null
279-
*/
280-
protected function watchLogsPath(string $path, Closure $callback, array $query = ['pretty' => 1]): mixed
281-
{
282-
$sock = $this->createSocketConnection($this->getCallableUrl($path, $query));
283-
284-
if ($sock === false) {
285-
return null;
286-
}
287-
288-
// Set stream to non-blocking mode to allow timeout handling
289-
stream_set_blocking($sock, false);
290-
291-
// Calculate overall timeout: server timeout + buffer for network/processing
292-
$timeout = ($query['timeoutSeconds'] ?? 30) + 5;
293-
$endTime = time() + $timeout;
294-
295-
$buffer = '';
296-
297-
while (time() < $endTime) {
298-
// Try to read data (non-blocking)
299-
$chunk = fread($sock, 8192);
300-
301-
if ($chunk === false) {
302-
// Error occurred
303-
fclose($sock);
304-
305-
return null;
306-
}
307-
308-
if ($chunk === '') {
309-
// No data available, check if stream ended
310-
if (feof($sock)) {
311-
break;
312-
}
313-
314-
// No data yet, sleep briefly and continue
315-
usleep(100000); // 100ms
316-
317-
continue;
318-
}
319-
320-
// Append chunk to buffer
321-
$buffer .= $chunk;
322-
323-
// Process complete lines from buffer
324-
while (($pos = strpos($buffer, "\n")) !== false) {
325-
$line = substr($buffer, 0, $pos);
326-
$buffer = substr($buffer, $pos + 1);
327-
328-
$call = call_user_func($callback, $line."\n");
329-
330-
if (! is_null($call)) {
331-
fclose($sock);
332-
333-
return $call;
334-
}
335-
}
336-
}
200+
$result = null;
201+
$promise->then(function ($value) use (&$result) {
202+
$result = $value;
203+
});
337204

338-
fclose($sock);
205+
$loop->run();
339206

340-
return null;
207+
return $result;
341208
}
342209

343210
/**

0 commit comments

Comments
 (0)