Skip to content

Commit d4def96

Browse files
authored
Implement TestKit result iteration stubs and SummarizedResult::list() (#301)
feat(bolt,testkit): TestKit result iteration, SummarizedResult::list(), PULL n=-1, and Bolt pull/disconnect fixes
1 parent c814ca0 commit d4def96

19 files changed

Lines changed: 814 additions & 51 deletions

src/Bolt/BoltConnection.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,12 @@ public function pull(?int $qid, ?int $fetchSize): array
337337
return $tbr;
338338
} catch (Throwable $e) {
339339
$this->restoreOriginalTimeout();
340-
// If we've received some records before the disconnect, return them so first next() succeeds and second next() fails.
340+
// If we've received some records before the disconnect, return them so first next() succeeds.
341+
// Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts).
342+
// Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator
343+
// would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done".
341344
if (!empty($tbr)) {
342-
$tbr[] = [];
345+
$tbr[] = ['has_more' => true];
343346

344347
/** @var non-empty-list<list> */
345348
return $tbr;

src/Bolt/BoltResult.php

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace Laudis\Neo4j\Bolt;
1515

16+
use function array_key_exists;
1617
use function array_splice;
1718

1819
use Bolt\error\BoltException;
@@ -23,10 +24,10 @@
2324
use Generator;
2425

2526
use function in_array;
27+
use function is_array;
2628

2729
use Iterator;
2830
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
29-
use RuntimeException;
3031
use Throwable;
3132

3233
/**
@@ -49,11 +50,37 @@ public function __construct(
4950
) {
5051
}
5152

53+
/**
54+
* Remaining server pulls use PULL n=-1 (TestKit Optimization:ResultListFetchAll / list()).
55+
*/
56+
private ?int $pullOverrideSize = null;
57+
58+
/**
59+
* True after at least one {@see fetchResults()} (network pull). Used so list() can reset a stale
60+
* cached generator before the first pull, but must not reset after next()+list() or rows replay.
61+
*/
62+
private bool $networkPullOccurred = false;
63+
64+
public function prepareForResultListFetchAll(): void
65+
{
66+
$this->pullOverrideSize = -1;
67+
// Drop cached generator only if no pull ran yet (e.g. valid()/getIt() touched before list()).
68+
// If next() already ran, resetting would restart iterator() and duplicate records on list().
69+
if ($this->it !== null && !$this->networkPullOccurred) {
70+
$this->it = null;
71+
}
72+
}
73+
5274
public function getFetchSize(): int
5375
{
5476
return $this->fetchSize;
5577
}
5678

79+
private function effectivePullSize(): int
80+
{
81+
return $this->pullOverrideSize ?? $this->fetchSize;
82+
}
83+
5784
private ?Generator $it = null;
5885

5986
/**
@@ -116,51 +143,45 @@ public function consume(): array
116143

117144
private function fetchResults(): void
118145
{
146+
$this->networkPullOccurred = true;
147+
119148
try {
120-
$meta = $this->connection->pull($this->qid, $this->fetchSize);
149+
$meta = $this->connection->pull($this->qid, $this->effectivePullSize());
121150
} catch (BoltConnectException|BoltException $e) {
122151
// Invalidate connection on socket/network errors so pool does not reuse it.
123152
// Rethrow as-is - Session retry logic inspects the actual exception via isConnectionError().
124153
$this->connection->invalidate();
125154
throw $e;
126155
}
127156
// Neo4jException and other Throwable propagate naturally - no invalidate needed for server errors
128-
129-
// Safety check: ensure pull response $meta is not empty (pull() is typed non-empty-list but we defend against empty)
130-
/** @psalm-suppress TypeDoesNotContainType */
131-
if (empty($meta)) {
132-
throw new RuntimeException('Empty response from server');
133-
}
157+
// $meta is non-empty: {@see BoltConnection::pull()} is contractually non-empty-list<list>.
134158

135159
/** @var list<list> $rows */
136160
$rows = array_splice($meta, 0, count($meta) - 1);
137161
$this->rows = $rows;
138162

139-
/** @var array{0: array} $meta */
140-
// Check if we have a valid summary
141-
/** @psalm-suppress RedundantConditionGivenDocblockType */
142-
if (count($meta) > 0 && is_array($meta[0])) {
143-
// If summary is empty array and we have no rows, it's a normal completion (no records)
144-
// If summary is empty array but we have rows, it's a partial pull from disconnect
145-
if (empty($meta[0]) && empty($rows)) {
146-
// Normal completion with no records - mark as complete
147-
$this->meta = [];
148-
} elseif (!empty($meta[0])) {
149-
// Valid summary with data
150-
if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) {
151-
$this->meta = $meta[0];
152-
}
153-
} else {
154-
// Empty summary but we have rows - partial result from disconnect
155-
// Set $this->meta to null so the next fetchResults() will try to pull again
156-
// This allows the first record to be consumed, and the next fetch will fail
157-
// which is the expected behavior for tests like exit_after_record
158-
$this->meta = null;
159-
}
160-
} else {
163+
$summarySlot = $meta[0] ?? null;
164+
if (!is_array($summarySlot)) {
161165
// No summary received (connection closed before summary)
162-
// Set $this->meta to null so the next fetchResults() will try to pull again
163166
$this->meta = null;
167+
168+
return;
169+
}
170+
171+
$summaryEmpty = $summarySlot === [];
172+
$hasDataRows = $rows !== [];
173+
174+
if ($summaryEmpty && !$hasDataRows) {
175+
// Normal completion with no records
176+
$this->meta = [];
177+
} elseif (!$summaryEmpty) {
178+
// Valid summary map (e.g. has_more, counters, db, …)
179+
if (!array_key_exists('has_more', $summarySlot) || $summarySlot['has_more'] === false) {
180+
$this->meta = $summarySlot;
181+
}
182+
} else {
183+
// Empty summary slot with data rows: Bolt SUCCESS {} after RECORDs — stream complete (no has_more keys).
184+
$this->meta = $summarySlot;
164185
}
165186
}
166187

src/Databags/SummarizedResult.php

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace Laudis\Neo4j\Databags;
1515

16+
use Closure;
1617
use Generator;
1718
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
1819
use Laudis\Neo4j\Types\CypherList;
@@ -33,17 +34,32 @@ final class SummarizedResult extends CypherList
3334
*/
3435
private array $keys;
3536

37+
/**
38+
* Bolt: before materializing all records, use PULL n=-1 for remaining pulls (Result.list()).
39+
*
40+
* @var (Closure():void)|null
41+
*/
42+
private readonly ?Closure $prepareListFetchAll;
43+
44+
/**
45+
* Keeps the Bolt result stream alive until this summarized result is consumed (avoids premature BoltResult::__destruct).
46+
*/
47+
private readonly ?object $boltResultRef;
48+
3649
/**
3750
* @psalm-mutation-free
3851
*
3952
* @param iterable<mixed, CypherMap<OGMTypes>>|callable():Generator<mixed, CypherMap<OGMTypes>> $iterable
4053
* @param list<string> $keys
54+
* @param (Closure():void)|null $prepareListFetchAll
4155
*/
42-
public function __construct(?ResultSummary &$summary, iterable|callable $iterable = [], array $keys = [])
56+
public function __construct(?ResultSummary &$summary, iterable|callable $iterable = [], array $keys = [], ?Closure $prepareListFetchAll = null, ?object $boltResultRef = null)
4357
{
4458
parent::__construct($iterable);
4559
$this->summary = &$summary;
4660
$this->keys = $keys;
61+
$this->prepareListFetchAll = $prepareListFetchAll;
62+
$this->boltResultRef = $boltResultRef;
4763
}
4864

4965
/**
@@ -85,4 +101,25 @@ public function keys(): array
85101
{
86102
return $this->keys;
87103
}
104+
105+
/**
106+
* Materialize all remaining records (Bolt: remaining pulls use fetch-all / PULL n=-1 when configured).
107+
*
108+
* Does not rewind: if the caller already consumed rows with next(), only unconsumed rows are returned.
109+
* (iterator_to_array() would call rewind() and duplicate those rows.)
110+
*
111+
* @return list<CypherMap<OGMTypes>>
112+
*/
113+
public function list(): array
114+
{
115+
$this->prepareListFetchAll?->__invoke();
116+
117+
$rows = [];
118+
while ($this->valid()) {
119+
$rows[] = $this->current();
120+
$this->next();
121+
}
122+
123+
return $rows;
124+
}
88125
}

src/Formatter/SummarizedResultFormatter.php

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,17 +194,29 @@ function (mixed $response) use ($connection, $statement, $runStart, $resultAvail
194194
);
195195
});
196196

197-
$formattedResult = $this->processBoltResult($meta, $result, $connection, $holder);
197+
$boltResult = $result;
198+
$formattedResult = $this->processBoltResult($meta, $boltResult, $connection, $holder);
198199

199-
/** @var SummarizedResult */
200-
$result = (new CypherList($formattedResult))->withCacheLimit($result->getFetchSize());
200+
$recordsList = (new CypherList($formattedResult))->withCacheLimit($this->clientSideCacheLimitFromBoltFetchSize($boltResult->getFetchSize()));
201201
// Safely get fields from metadata, defaulting to empty array if missing (indicates connection loss)
202202
$keys = [];
203203
if (array_key_exists('fields', $meta)) {
204204
$keys = $meta['fields'];
205205
}
206206

207-
return new SummarizedResult($summary, $result, $keys);
207+
$prepareListFetchAll = static function () use ($boltResult): void {
208+
$boltResult->prepareForResultListFetchAll();
209+
};
210+
211+
return new SummarizedResult($summary, $recordsList, $keys, $prepareListFetchAll, $boltResult);
212+
}
213+
214+
/**
215+
* Bolt fetch size -1 means one PULL with n=-1; client-side CypherList cache must stay non-negative.
216+
*/
217+
private function clientSideCacheLimitFromBoltFetchSize(int $boltFetchSize): int
218+
{
219+
return $boltFetchSize < 0 ? PHP_INT_MAX : $boltFetchSize;
208220
}
209221

210222
public function formatArgs(array $profiledPlanData): PlanArguments
@@ -277,7 +289,7 @@ private function processBoltResult(array $meta, BoltResult $result, BoltConnecti
277289
foreach ($result as $row) {
278290
yield $this->formatRow($meta, $row);
279291
}
280-
}))->withCacheLimit($result->getFetchSize());
292+
}))->withCacheLimit($this->clientSideCacheLimitFromBoltFetchSize($result->getFetchSize()));
281293

282294
$connection->subscribeResult($tbr);
283295
$result->addFinishedCallback(function (array $response) use ($holder) {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* This file is part of the Neo4j PHP Client and Driver package.
7+
*
8+
* (c) Nagels <https://nagels.tech>
9+
*
10+
* For the full copyright and license information, please view the LICENSE
11+
* file that was distributed with this source code.
12+
*/
13+
14+
namespace Laudis\Neo4j\TestkitBackend\Handlers;
15+
16+
use Bolt\error\BoltException;
17+
use Laudis\Neo4j\Databags\Neo4jError;
18+
use Laudis\Neo4j\Databags\SummarizedResult;
19+
use Laudis\Neo4j\Exception\Neo4jException;
20+
use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface;
21+
use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface;
22+
use Laudis\Neo4j\TestkitBackend\MainRepository;
23+
use Laudis\Neo4j\TestkitBackend\Requests\ResultListRequest;
24+
use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse;
25+
use Laudis\Neo4j\TestkitBackend\Responses\RecordListResponse;
26+
use Throwable;
27+
28+
/**
29+
* Materializes the full result (PULL / iterate) for TestKit Result.list().
30+
*
31+
* @implements RequestHandlerInterface<ResultListRequest>
32+
*/
33+
final class ResultList implements RequestHandlerInterface
34+
{
35+
public function __construct(
36+
private readonly MainRepository $repository,
37+
) {
38+
}
39+
40+
/**
41+
* @param ResultListRequest $request
42+
*/
43+
public function handle($request): TestkitResponseInterface
44+
{
45+
try {
46+
$result = $this->repository->getRecords($request->getResultId());
47+
if ($result instanceof TestkitResponseInterface) {
48+
return $result;
49+
}
50+
51+
$rows = [];
52+
if ($result instanceof SummarizedResult) {
53+
$this->repository->drainPendingIteratorNexts($request->getResultId(), $result);
54+
$iterable = $result->list();
55+
} else {
56+
$iterable = $result;
57+
}
58+
foreach ($iterable as $row) {
59+
$r = [];
60+
foreach ($row as $value) {
61+
$r[] = $value;
62+
}
63+
$rows[] = $r;
64+
}
65+
66+
$this->repository->removeRecords($request->getResultId());
67+
68+
return new RecordListResponse($rows);
69+
} catch (Neo4jException $e) {
70+
$this->repository->removeRecords($request->getResultId());
71+
72+
return new DriverErrorResponse($request->getResultId(), $e);
73+
} catch (BoltException $e) {
74+
$this->repository->removeRecords($request->getResultId());
75+
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
76+
$wrapped = new Neo4jException([$neo4jError], $e);
77+
78+
return new DriverErrorResponse($request->getResultId(), $wrapped);
79+
} catch (Throwable $e) {
80+
$this->repository->removeRecords($request->getResultId());
81+
if ($this->isConnectionOrSocketError($e)) {
82+
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
83+
$wrapped = new Neo4jException([$neo4jError], $e);
84+
85+
return new DriverErrorResponse($request->getResultId(), $wrapped);
86+
}
87+
throw $e;
88+
}
89+
}
90+
91+
private function isConnectionOrSocketError(Throwable $e): bool
92+
{
93+
$message = strtolower($e->getMessage());
94+
95+
return str_contains($message, 'broken pipe')
96+
|| str_contains($message, 'connection reset')
97+
|| str_contains($message, 'connection refused')
98+
|| str_contains($message, 'connection closed')
99+
|| str_contains($message, 'connection is closed')
100+
|| str_contains($message, 'interrupted system call')
101+
|| str_contains($message, 'i/o error')
102+
|| str_contains($message, 'network read incomplete')
103+
|| str_contains($message, 'network write incomplete')
104+
|| str_contains($message, 'socket')
105+
|| str_contains($message, 'broken');
106+
}
107+
}

testkit-backend/src/Handlers/ResultNext.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,9 @@ public function handle($request): TestkitResponseInterface
5050
}
5151

5252
$iterator = $this->repository->getIterator($request->getResultId());
53-
54-
// If we've already fetched the first record, advance to the next one
55-
if ($this->repository->getIteratorFetchedFirst($request->getResultId()) === true) {
56-
$iterator->next();
57-
}
53+
// Defer Iterator::next() until here so the Bolt stream is not advanced (e.g. second PULL)
54+
// until the client asks for the next record — required for disconnect stubs and Result.list().
55+
$this->repository->drainPendingIteratorNexts($request->getResultId(), $iterator);
5856

5957
// Check if iterator is valid - this may trigger generator to start and fetch results
6058
// If the connection is closed, this will throw an exception which we catch below
@@ -71,6 +69,8 @@ public function handle($request): TestkitResponseInterface
7169
$values[] = CypherObject::autoDetect($value);
7270
}
7371

72+
$this->repository->addPendingIteratorNext($request->getResultId());
73+
7474
return new RecordResponse($values);
7575
} catch (Neo4jException $e) {
7676
$this->repository->removeRecords($request->getResultId());

0 commit comments

Comments
 (0)