From 860d9140f99ffcbfed59c2e54037bf45046e0c99 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Tue, 7 Apr 2026 18:15:49 +0530 Subject: [PATCH 01/12] temp commit --- src/Bolt/BoltConnection.php | 7 +- src/Bolt/BoltResult.php | 42 +++++-- src/Databags/SummarizedResult.php | 41 ++++++- src/Formatter/SummarizedResultFormatter.php | 22 +++- testkit-backend/src/Handlers/ResultList.php | 107 +++++++++++++++++ testkit-backend/src/Handlers/ResultNext.php | 10 +- testkit-backend/src/Handlers/ResultPeek.php | 104 +++++++++++++++++ testkit-backend/src/Handlers/ResultSingle.php | 20 +++- .../src/Handlers/ResultSingleOptional.php | 109 ++++++++++++++++++ testkit-backend/src/MainRepository.php | 64 +++++++++- testkit-backend/src/RequestFactory.php | 8 ++ .../src/Requests/ResultListRequest.php | 29 +++++ .../src/Requests/ResultPeekRequest.php | 29 +++++ .../Requests/ResultSingleOptionalRequest.php | 29 +++++ .../src/Responses/RecordListResponse.php | 52 +++++++++ .../src/Responses/RecordOptionalResponse.php | 53 +++++++++ .../src/Responses/RecordResponse.php | 11 +- testkit-backend/testkit.sh | 27 +++++ tests/Unit/SummarizedResultListTest.php | 46 ++++++++ 19 files changed, 782 insertions(+), 28 deletions(-) create mode 100644 testkit-backend/src/Handlers/ResultList.php create mode 100644 testkit-backend/src/Handlers/ResultPeek.php create mode 100644 testkit-backend/src/Handlers/ResultSingleOptional.php create mode 100644 testkit-backend/src/Requests/ResultListRequest.php create mode 100644 testkit-backend/src/Requests/ResultPeekRequest.php create mode 100644 testkit-backend/src/Requests/ResultSingleOptionalRequest.php create mode 100644 testkit-backend/src/Responses/RecordListResponse.php create mode 100644 testkit-backend/src/Responses/RecordOptionalResponse.php create mode 100644 tests/Unit/SummarizedResultListTest.php diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 5816c020..3f802e14 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -337,9 +337,12 @@ public function pull(?int $qid, ?int $fetchSize): array return $tbr; } catch (Throwable $e) { $this->restoreOriginalTimeout(); - // If we've received some records before the disconnect, return them so first next() succeeds and second next() fails. + // If we've received some records before the disconnect, return them so first next() succeeds. + // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). + // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator + // would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done". if (!empty($tbr)) { - $tbr[] = []; + $tbr[] = ['has_more' => true]; /** @var non-empty-list */ return $tbr; diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index 865c6724..b623945f 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -49,11 +49,37 @@ public function __construct( ) { } + /** + * Remaining server pulls use PULL n=-1 (TestKit Optimization:ResultListFetchAll / list()). + */ + private ?int $pullOverrideSize = null; + + /** + * True after at least one {@see fetchResults()} (network pull). Used so list() can reset a stale + * cached generator before the first pull, but must not reset after next()+list() or rows replay. + */ + private bool $networkPullOccurred = false; + + public function prepareForResultListFetchAll(): void + { + $this->pullOverrideSize = -1; + // Drop cached generator only if no pull ran yet (e.g. valid()/getIt() touched before list()). + // If next() already ran, resetting would restart iterator() and duplicate records on list(). + if ($this->it !== null && !$this->networkPullOccurred) { + $this->it = null; + } + } + public function getFetchSize(): int { return $this->fetchSize; } + private function effectivePullSize(): int + { + return $this->pullOverrideSize ?? $this->fetchSize; + } + private ?Generator $it = null; /** @@ -116,8 +142,10 @@ public function consume(): array private function fetchResults(): void { + $this->networkPullOccurred = true; + try { - $meta = $this->connection->pull($this->qid, $this->fetchSize); + $meta = $this->connection->pull($this->qid, $this->effectivePullSize()); } catch (BoltConnectException|BoltException $e) { // Invalidate connection on socket/network errors so pool does not reuse it. // Rethrow as-is - Session retry logic inspects the actual exception via isConnectionError(). @@ -141,7 +169,6 @@ private function fetchResults(): void /** @psalm-suppress RedundantConditionGivenDocblockType */ if (count($meta) > 0 && is_array($meta[0])) { // If summary is empty array and we have no rows, it's a normal completion (no records) - // If summary is empty array but we have rows, it's a partial pull from disconnect if (empty($meta[0]) && empty($rows)) { // Normal completion with no records - mark as complete $this->meta = []; @@ -150,12 +177,11 @@ private function fetchResults(): void if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) { $this->meta = $meta[0]; } - } else { - // Empty summary but we have rows - partial result from disconnect - // Set $this->meta to null so the next fetchResults() will try to pull again - // This allows the first record to be consumed, and the next fetch will fail - // which is the expected behavior for tests like exit_after_record - $this->meta = null; + } elseif (!empty($rows)) { + // SUCCESS {} after records: Bolt may deserialize as []; stream is complete (not has_more) + if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) { + $this->meta = $meta[0]; + } } } else { // No summary received (connection closed before summary) diff --git a/src/Databags/SummarizedResult.php b/src/Databags/SummarizedResult.php index e97c54c8..50fdee83 100644 --- a/src/Databags/SummarizedResult.php +++ b/src/Databags/SummarizedResult.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\Databags; +use Closure; use Generator; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; use Laudis\Neo4j\Types\CypherList; @@ -33,17 +34,34 @@ final class SummarizedResult extends CypherList */ private array $keys; + /** + * Bolt: before materializing all records, use PULL n=-1 for remaining pulls (Result.list()). + * + * @var (Closure():void)|null + */ + private readonly ?Closure $prepareListFetchAll; + + /** + * Keeps the Bolt result stream alive until this summarized result is consumed (avoids premature BoltResult::__destruct). + * + * @var object|null + */ + private readonly ?object $boltResultRef; + /** * @psalm-mutation-free * * @param iterable>|callable():Generator> $iterable * @param list $keys + * @param (Closure():void)|null $prepareListFetchAll */ - public function __construct(?ResultSummary &$summary, iterable|callable $iterable = [], array $keys = []) + public function __construct(?ResultSummary &$summary, iterable|callable $iterable = [], array $keys = [], ?Closure $prepareListFetchAll = null, ?object $boltResultRef = null) { parent::__construct($iterable); $this->summary = &$summary; $this->keys = $keys; + $this->prepareListFetchAll = $prepareListFetchAll; + $this->boltResultRef = $boltResultRef; } /** @@ -85,4 +103,25 @@ public function keys(): array { return $this->keys; } + + /** + * Materialize all remaining records (Bolt: remaining pulls use fetch-all / PULL n=-1 when configured). + * + * Does not rewind: if the caller already consumed rows with next(), only unconsumed rows are returned. + * (iterator_to_array() would call rewind() and duplicate those rows.) + * + * @return list> + */ + public function list(): array + { + $this->prepareListFetchAll?->__invoke(); + + $rows = []; + while ($this->valid()) { + $rows[] = $this->current(); + $this->next(); + } + + return $rows; + } } diff --git a/src/Formatter/SummarizedResultFormatter.php b/src/Formatter/SummarizedResultFormatter.php index a1f2440c..9712fd13 100644 --- a/src/Formatter/SummarizedResultFormatter.php +++ b/src/Formatter/SummarizedResultFormatter.php @@ -194,17 +194,29 @@ function (mixed $response) use ($connection, $statement, $runStart, $resultAvail ); }); - $formattedResult = $this->processBoltResult($meta, $result, $connection, $holder); + $boltResult = $result; + $formattedResult = $this->processBoltResult($meta, $boltResult, $connection, $holder); - /** @var SummarizedResult */ - $result = (new CypherList($formattedResult))->withCacheLimit($result->getFetchSize()); + $recordsList = (new CypherList($formattedResult))->withCacheLimit($this->clientSideCacheLimitFromBoltFetchSize($boltResult->getFetchSize())); // Safely get fields from metadata, defaulting to empty array if missing (indicates connection loss) $keys = []; if (array_key_exists('fields', $meta)) { $keys = $meta['fields']; } - return new SummarizedResult($summary, $result, $keys); + $prepareListFetchAll = static function () use ($boltResult): void { + $boltResult->prepareForResultListFetchAll(); + }; + + return new SummarizedResult($summary, $recordsList, $keys, $prepareListFetchAll, $boltResult); + } + + /** + * Bolt fetch size -1 means one PULL with n=-1; client-side CypherList cache must stay non-negative. + */ + private function clientSideCacheLimitFromBoltFetchSize(int $boltFetchSize): int + { + return $boltFetchSize < 0 ? PHP_INT_MAX : $boltFetchSize; } public function formatArgs(array $profiledPlanData): PlanArguments @@ -277,7 +289,7 @@ private function processBoltResult(array $meta, BoltResult $result, BoltConnecti foreach ($result as $row) { yield $this->formatRow($meta, $row); } - }))->withCacheLimit($result->getFetchSize()); + }))->withCacheLimit($this->clientSideCacheLimitFromBoltFetchSize($result->getFetchSize())); $connection->subscribeResult($tbr); $result->addFinishedCallback(function (array $response) use ($holder) { diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php new file mode 100644 index 00000000..c06a306c --- /dev/null +++ b/testkit-backend/src/Handlers/ResultList.php @@ -0,0 +1,107 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Handlers; + +use Bolt\error\BoltException; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Databags\SummarizedResult; +use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\MainRepository; +use Laudis\Neo4j\TestkitBackend\Requests\ResultListRequest; +use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; +use Laudis\Neo4j\TestkitBackend\Responses\RecordListResponse; +use Throwable; + +/** + * Materializes the full result (PULL / iterate) for TestKit Result.list(). + * + * @implements RequestHandlerInterface + */ +final class ResultList implements RequestHandlerInterface +{ + public function __construct( + private readonly MainRepository $repository, + ) { + } + + /** + * @param ResultListRequest $request + */ + public function handle($request): TestkitResponseInterface + { + try { + $result = $this->repository->getRecords($request->getResultId()); + if ($result instanceof TestkitResponseInterface) { + return $result; + } + + $rows = []; + if ($result instanceof SummarizedResult) { + $this->repository->drainPendingIteratorNexts($request->getResultId(), $result); + $iterable = $result->list(); + } else { + $iterable = $result; + } + foreach ($iterable as $row) { + $r = []; + foreach ($row as $value) { + $r[] = $value; + } + $rows[] = $r; + } + + $this->repository->removeRecords($request->getResultId()); + + return new RecordListResponse($rows); + } catch (Neo4jException $e) { + $this->repository->removeRecords($request->getResultId()); + + return new DriverErrorResponse($request->getResultId(), $e); + } catch (BoltException $e) { + $this->repository->removeRecords($request->getResultId()); + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } catch (Throwable $e) { + $this->repository->removeRecords($request->getResultId()); + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } + throw $e; + } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); + + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken'); + } +} diff --git a/testkit-backend/src/Handlers/ResultNext.php b/testkit-backend/src/Handlers/ResultNext.php index 226e8d13..d9b99258 100644 --- a/testkit-backend/src/Handlers/ResultNext.php +++ b/testkit-backend/src/Handlers/ResultNext.php @@ -50,11 +50,9 @@ public function handle($request): TestkitResponseInterface } $iterator = $this->repository->getIterator($request->getResultId()); - - // If we've already fetched the first record, advance to the next one - if ($this->repository->getIteratorFetchedFirst($request->getResultId()) === true) { - $iterator->next(); - } + // Defer Iterator::next() until here so the Bolt stream is not advanced (e.g. second PULL) + // until the client asks for the next record — required for disconnect stubs and Result.list(). + $this->repository->drainPendingIteratorNexts($request->getResultId(), $iterator); // Check if iterator is valid - this may trigger generator to start and fetch results // If the connection is closed, this will throw an exception which we catch below @@ -71,6 +69,8 @@ public function handle($request): TestkitResponseInterface $values[] = CypherObject::autoDetect($value); } + $this->repository->addPendingIteratorNext($request->getResultId()); + return new RecordResponse($values); } catch (Neo4jException $e) { $this->repository->removeRecords($request->getResultId()); diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php new file mode 100644 index 00000000..c68318c2 --- /dev/null +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -0,0 +1,104 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Handlers; + +use Bolt\error\BoltException; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\MainRepository; +use Laudis\Neo4j\TestkitBackend\Requests\ResultPeekRequest; +use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; +use Laudis\Neo4j\TestkitBackend\Responses\NullRecordResponse; +use Laudis\Neo4j\TestkitBackend\Responses\RecordResponse; +use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; +use Throwable; + +/** + * Peek at the next record without advancing the iterator position used by ResultNext. + * + * @implements RequestHandlerInterface + */ +final class ResultPeek implements RequestHandlerInterface +{ + public function __construct( + private readonly MainRepository $repository, + ) { + } + + /** + * @param ResultPeekRequest $request + */ + public function handle($request): TestkitResponseInterface + { + try { + $record = $this->repository->getRecords($request->getResultId()); + if ($record instanceof TestkitResponseInterface) { + return $record; + } + + $iterator = $this->repository->getIterator($request->getResultId()); + + if (!$iterator->valid()) { + return new NullRecordResponse(); + } + + $current = $iterator->current(); + + $values = []; + foreach ($current as $value) { + $values[] = CypherObject::autoDetect($value); + } + + return new RecordResponse($values); + } catch (Neo4jException $e) { + $this->repository->removeRecords($request->getResultId()); + + return new DriverErrorResponse($request->getResultId(), $e); + } catch (BoltException $e) { + $this->repository->removeRecords($request->getResultId()); + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } catch (Throwable $e) { + $this->repository->removeRecords($request->getResultId()); + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } + throw $e; + } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); + + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken'); + } +} diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index c78a52fc..6af2bcce 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -13,12 +13,15 @@ namespace Laudis\Neo4j\TestkitBackend\Handlers; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; use Laudis\Neo4j\TestkitBackend\MainRepository; use Laudis\Neo4j\TestkitBackend\Requests\ResultSingleRequest; -use Laudis\Neo4j\TestkitBackend\Responses\BackendErrorResponse; +use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\TestkitBackend\Responses\RecordResponse; +use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; /** * Request to expect and return exactly one record in the result stream. @@ -32,7 +35,7 @@ */ final class ResultSingle implements RequestHandlerInterface { - private function __construct( + public function __construct( private readonly MainRepository $repository, ) { } @@ -41,17 +44,24 @@ public function handle($request): TestkitResponseInterface { $record = $this->repository->getRecords($request->getResultId()); if ($record instanceof TestkitResponseInterface) { - return new BackendErrorResponse('Something went wrong with the result handling'); + $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); + + return new DriverErrorResponse($request->getResultId(), $err); } $count = $record->count(); if ($count !== 1) { - return new BackendErrorResponse(sprintf('Found exactly %s result rows, but expected just one.', $count)); + $err = new Neo4jException([Neo4jError::fromMessageAndCode( + 'Neo.ClientError.Statement.ResultNotSingle', + sprintf('Expected exactly one result row, found %d.', $count) + )]); + + return new DriverErrorResponse($request->getResultId(), $err); } $values = []; foreach ($record->getAsCypherMap(0) as $value) { - $values[] = $value; + $values[] = CypherObject::autoDetect($value); } return new RecordResponse($values); diff --git a/testkit-backend/src/Handlers/ResultSingleOptional.php b/testkit-backend/src/Handlers/ResultSingleOptional.php new file mode 100644 index 00000000..89ffa677 --- /dev/null +++ b/testkit-backend/src/Handlers/ResultSingleOptional.php @@ -0,0 +1,109 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Handlers; + +use Bolt\error\BoltException; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\MainRepository; +use Laudis\Neo4j\TestkitBackend\Requests\ResultSingleOptionalRequest; +use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; +use Laudis\Neo4j\TestkitBackend\Responses\RecordOptionalResponse; +use Throwable; + +/** + * @implements RequestHandlerInterface + */ +final class ResultSingleOptional implements RequestHandlerInterface +{ + public function __construct( + private readonly MainRepository $repository, + ) { + } + + /** + * @param ResultSingleOptionalRequest $request + */ + public function handle($request): TestkitResponseInterface + { + try { + $result = $this->repository->getRecords($request->getResultId()); + if ($result instanceof TestkitResponseInterface) { + return $result; + } + + $rows = []; + foreach ($result as $row) { + $r = []; + foreach ($row as $value) { + $r[] = $value; + } + $rows[] = $r; + } + + $this->repository->removeRecords($request->getResultId()); + + $n = count($rows); + if ($n === 0) { + return new RecordOptionalResponse(null, []); + } + if ($n === 1) { + return new RecordOptionalResponse($rows[0], []); + } + + return new RecordOptionalResponse( + $rows[0], + ['Expected a single record but found multiple records in the stream.'] + ); + } catch (Neo4jException $e) { + $this->repository->removeRecords($request->getResultId()); + + return new DriverErrorResponse($request->getResultId(), $e); + } catch (BoltException $e) { + $this->repository->removeRecords($request->getResultId()); + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } catch (Throwable $e) { + $this->repository->removeRecords($request->getResultId()); + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + + return new DriverErrorResponse($request->getResultId(), $wrapped); + } + throw $e; + } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); + + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken'); + } +} diff --git a/testkit-backend/src/MainRepository.php b/testkit-backend/src/MainRepository.php index af78e8db..ac83d4ff 100644 --- a/testkit-backend/src/MainRepository.php +++ b/testkit-backend/src/MainRepository.php @@ -43,6 +43,18 @@ final class MainRepository /** @var array */ private array $iteratorFetchedFirst; + /** @var array After ResultPeek advanced the iterator, ResultNext must not advance again. */ + private array $peekPrimed = []; + + /** + * Count of {@see Iterator::next()} calls owed before the next read: one per record already returned + * to TestKit without advancing the shared iterator (advancing immediately would run the next Bolt pull + * too early — e.g. disconnect tests expect the second pull on the second {@see ResultNext}, not after the first). + * + * @var array + */ + private array $pendingIteratorNextCount = []; + /** * @param array>>> $drivers * @param array>>> $sessions @@ -93,6 +105,55 @@ public function setIteratorFetchedFirst(Uuid $id, bool $value): void $this->iteratorFetchedFirst[$id->toRfc4122()] = $value; } + /** + * ResultPeek advanced the iterator; the following ResultNext must skip its leading {@see Iterator::next()}. + */ + public function setPeekPrimed(Uuid $id, bool $value): void + { + if ($value) { + $this->peekPrimed[$id->toRfc4122()] = true; + } else { + unset($this->peekPrimed[$id->toRfc4122()]); + } + } + + public function consumePeekPrimed(Uuid $id): bool + { + $key = $id->toRfc4122(); + if (!isset($this->peekPrimed[$key])) { + return false; + } + unset($this->peekPrimed[$key]); + + return true; + } + + /** + * After returning a record from {@see ResultNext}, the iterator must advance before the next read; + * defer that advance so the Bolt layer does not pull until the following ResultNext or Result.list(). + */ + public function addPendingIteratorNext(Uuid $id): void + { + $key = $id->toRfc4122(); + $this->pendingIteratorNextCount[$key] = ($this->pendingIteratorNextCount[$key] ?? 0) + 1; + } + + /** + * Applies deferred {@see Iterator::next()} calls (e.g. before the next ResultNext or before Result.list()). + */ + public function drainPendingIteratorNexts(Uuid $id, Iterator $iterator): void + { + $key = $id->toRfc4122(); + $n = $this->pendingIteratorNextCount[$key] ?? 0; + if ($n === 0) { + return; + } + unset($this->pendingIteratorNextCount[$key]); + for ($i = 0; $i < $n; ++$i) { + $iterator->next(); + } + } + /** * @return DriverInterface>> */ @@ -136,7 +197,8 @@ public function addRecords(Uuid $id, $result): void public function removeRecords(Uuid $id): void { - unset($this->records[$id->toRfc4122()]); + $key = $id->toRfc4122(); + unset($this->records[$key], $this->iteratorFetchedFirst[$key], $this->peekPrimed[$key], $this->pendingIteratorNextCount[$key]); } /** diff --git a/testkit-backend/src/RequestFactory.php b/testkit-backend/src/RequestFactory.php index fa992996..ae6875ac 100644 --- a/testkit-backend/src/RequestFactory.php +++ b/testkit-backend/src/RequestFactory.php @@ -27,7 +27,11 @@ use Laudis\Neo4j\TestkitBackend\Requests\NewSessionRequest; use Laudis\Neo4j\TestkitBackend\Requests\ResolverResolutionCompletedRequest; use Laudis\Neo4j\TestkitBackend\Requests\ResultConsumeRequest; +use Laudis\Neo4j\TestkitBackend\Requests\ResultListRequest; use Laudis\Neo4j\TestkitBackend\Requests\ResultNextRequest; +use Laudis\Neo4j\TestkitBackend\Requests\ResultPeekRequest; +use Laudis\Neo4j\TestkitBackend\Requests\ResultSingleOptionalRequest; +use Laudis\Neo4j\TestkitBackend\Requests\ResultSingleRequest; use Laudis\Neo4j\TestkitBackend\Requests\RetryableNegativeRequest; use Laudis\Neo4j\TestkitBackend\Requests\RetryablePositiveRequest; use Laudis\Neo4j\TestkitBackend\Requests\SessionBeginTransactionRequest; @@ -68,6 +72,10 @@ final class RequestFactory 'TransactionRollback' => TransactionRollbackRequest::class, 'TransactionClose' => TransactionCloseRequest::class, 'ResultNext' => ResultNextRequest::class, + 'ResultSingle' => ResultSingleRequest::class, + 'ResultList' => ResultListRequest::class, + 'ResultPeek' => ResultPeekRequest::class, + 'ResultSingleOptional' => ResultSingleOptionalRequest::class, 'ResultConsume' => ResultConsumeRequest::class, 'RetryablePositive' => RetryablePositiveRequest::class, 'RetryableNegative' => RetryableNegativeRequest::class, diff --git a/testkit-backend/src/Requests/ResultListRequest.php b/testkit-backend/src/Requests/ResultListRequest.php new file mode 100644 index 00000000..63cdb6ad --- /dev/null +++ b/testkit-backend/src/Requests/ResultListRequest.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Requests; + +use Symfony\Component\Uid\Uuid; + +final class ResultListRequest +{ + public function __construct( + private readonly Uuid $resultId, + ) { + } + + public function getResultId(): Uuid + { + return $this->resultId; + } +} diff --git a/testkit-backend/src/Requests/ResultPeekRequest.php b/testkit-backend/src/Requests/ResultPeekRequest.php new file mode 100644 index 00000000..026b3683 --- /dev/null +++ b/testkit-backend/src/Requests/ResultPeekRequest.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Requests; + +use Symfony\Component\Uid\Uuid; + +final class ResultPeekRequest +{ + public function __construct( + private readonly Uuid $resultId, + ) { + } + + public function getResultId(): Uuid + { + return $this->resultId; + } +} diff --git a/testkit-backend/src/Requests/ResultSingleOptionalRequest.php b/testkit-backend/src/Requests/ResultSingleOptionalRequest.php new file mode 100644 index 00000000..0e3e170e --- /dev/null +++ b/testkit-backend/src/Requests/ResultSingleOptionalRequest.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Requests; + +use Symfony\Component\Uid\Uuid; + +final class ResultSingleOptionalRequest +{ + public function __construct( + private readonly Uuid $resultId, + ) { + } + + public function getResultId(): Uuid + { + return $this->resultId; + } +} diff --git a/testkit-backend/src/Responses/RecordListResponse.php b/testkit-backend/src/Responses/RecordListResponse.php new file mode 100644 index 00000000..46beb0f4 --- /dev/null +++ b/testkit-backend/src/Responses/RecordListResponse.php @@ -0,0 +1,52 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Responses; + +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; + +/** + * Response to ResultList — full materialized record list. + * + * @psalm-import-type OGMTypes from \Laudis\Neo4j\Formatter\OGMFormatter + */ +final class RecordListResponse implements TestkitResponseInterface +{ + /** + * @param list> $records Each row is a list of cell values (OGM types). + */ + public function __construct( + private readonly array $records, + ) { + } + + public function jsonSerialize(): array + { + $encoded = []; + foreach ($this->records as $row) { + $values = []; + foreach ($row as $value) { + $values[] = CypherObject::autoDetect($value); + } + $encoded[] = ['values' => $values]; + } + + return [ + 'name' => 'RecordList', + 'data' => [ + 'records' => $encoded, + ], + ]; + } +} diff --git a/testkit-backend/src/Responses/RecordOptionalResponse.php b/testkit-backend/src/Responses/RecordOptionalResponse.php new file mode 100644 index 00000000..15389905 --- /dev/null +++ b/testkit-backend/src/Responses/RecordOptionalResponse.php @@ -0,0 +1,53 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Responses; + +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; + +/** + * Response to ResultSingleOptional. + */ +final class RecordOptionalResponse implements TestkitResponseInterface +{ + /** + * @param list|null $recordValues First record as flat list of cell values, or null + * @param list $warnings + */ + public function __construct( + private readonly ?array $recordValues, + private readonly array $warnings, + ) { + } + + public function jsonSerialize(): array + { + $record = null; + if ($this->recordValues !== null) { + $values = []; + foreach ($this->recordValues as $value) { + $values[] = CypherObject::autoDetect($value); + } + $record = ['values' => $values]; + } + + return [ + 'name' => 'RecordOptional', + 'data' => [ + 'record' => $record, + 'warnings' => $this->warnings, + ], + ]; + } +} diff --git a/testkit-backend/src/Responses/RecordResponse.php b/testkit-backend/src/Responses/RecordResponse.php index e9160dc6..5f5b2db5 100644 --- a/testkit-backend/src/Responses/RecordResponse.php +++ b/testkit-backend/src/Responses/RecordResponse.php @@ -14,6 +14,7 @@ namespace Laudis\Neo4j\TestkitBackend\Responses; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; /** * Represents a record from a result. @@ -35,10 +36,18 @@ public function __construct(iterable $values) public function jsonSerialize(): array { + $serializedValues = []; + foreach ($this->values as $v) { + if (!$v instanceof TestkitResponseInterface) { + $v = CypherObject::autoDetect($v); + } + $serializedValues[] = $v->jsonSerialize(); + } + return [ 'name' => 'Record', 'data' => [ - 'values' => $this->values, + 'values' => $serializedValues, ], ]; } diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 0f549afa..3ac85647 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -159,6 +159,33 @@ python3 -m unittest -v \ tests.stub.disconnects.test_disconnects.TestDisconnects.test_disconnect_on_tx_begin \ tests.stub.disconnects.test_disconnects.TestDisconnects.test_disconnect_session_on_tx_pull_after_record \ tests.stub.disconnects.test_disconnects.TestDisconnects.test_fail_on_reset \ +\ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_0_records \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_1_records \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_2_records \ + tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list \ +\ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_0_records \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_1_records \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_2_records \ +\ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_0_records \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_1_records \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_2_records \ +\ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_0_records \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_1_records \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_2_records \ +\ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_full_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_half_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_empty_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all \ +\ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ EXIT_CODE=$? diff --git a/tests/Unit/SummarizedResultListTest.php b/tests/Unit/SummarizedResultListTest.php new file mode 100644 index 00000000..136fdfbf --- /dev/null +++ b/tests/Unit/SummarizedResultListTest.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Tests\Unit; + +use Laudis\Neo4j\Databags\SummarizedResult; +use Laudis\Neo4j\Types\CypherList; +use Laudis\Neo4j\Types\CypherMap; +use PHPUnit\Framework\TestCase; + +/** + * Mirrors TestKit stub: next() then list() must return only remaining rows (no rewind / duplicate). + */ +final class SummarizedResultListTest extends TestCase +{ + public function testListAfterNextOmitsConsumedRow(): void + { + $summary = null; + $inner = (new CypherList(function (): iterable { + for ($i = 1; $i <= 5; ++$i) { + yield new CypherMap(['n' => $i]); + } + }))->withCacheLimit(2); + + $recordsList = (new CypherList($inner))->withCacheLimit(2); + $result = new SummarizedResult($summary, $recordsList, ['n'], null, null); + + self::assertTrue($result->valid()); + self::assertSame(1, $result->current()->get('n')); + $result->next(); + + $rows = $result->list(); + self::assertCount(4, $rows); + self::assertSame([2, 3, 4, 5], array_map(static fn (CypherMap $m): int => $m->get('n'), $rows)); + } +} From 4cd13e90d6f2277749657582efdce60db4f8b3c9 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 8 Apr 2026 11:24:50 +0530 Subject: [PATCH 02/12] feat(bolt,testkit): TestKit result iteration, SummarizedResult::list(), PULL n=-1, and Bolt pull/disconnect fixes --- src/Bolt/BoltResult.php | 51 +++++++++---------- src/Databags/SummarizedResult.php | 2 - testkit-backend/src/Handlers/ResultPeek.php | 4 +- testkit-backend/src/MainRepository.php | 2 +- .../src/Responses/RecordListResponse.php | 2 +- tests/Unit/SummarizedResultListTest.php | 18 ++++++- 6 files changed, 44 insertions(+), 35 deletions(-) diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index b623945f..ac4e5233 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\Bolt; +use function array_key_exists; use function array_splice; use Bolt\error\BoltException; @@ -23,10 +24,10 @@ use Generator; use function in_array; +use function is_array; use Iterator; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; -use RuntimeException; use Throwable; /** @@ -153,40 +154,34 @@ private function fetchResults(): void throw $e; } // Neo4jException and other Throwable propagate naturally - no invalidate needed for server errors - - // Safety check: ensure pull response $meta is not empty (pull() is typed non-empty-list but we defend against empty) - /** @psalm-suppress TypeDoesNotContainType */ - if (empty($meta)) { - throw new RuntimeException('Empty response from server'); - } + // $meta is non-empty: {@see BoltConnection::pull()} is contractually non-empty-list. /** @var list $rows */ $rows = array_splice($meta, 0, count($meta) - 1); $this->rows = $rows; - /** @var array{0: array} $meta */ - // Check if we have a valid summary - /** @psalm-suppress RedundantConditionGivenDocblockType */ - if (count($meta) > 0 && is_array($meta[0])) { - // If summary is empty array and we have no rows, it's a normal completion (no records) - if (empty($meta[0]) && empty($rows)) { - // Normal completion with no records - mark as complete - $this->meta = []; - } elseif (!empty($meta[0])) { - // Valid summary with data - if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) { - $this->meta = $meta[0]; - } - } elseif (!empty($rows)) { - // SUCCESS {} after records: Bolt may deserialize as []; stream is complete (not has_more) - if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) { - $this->meta = $meta[0]; - } - } - } else { + $summarySlot = $meta[0] ?? null; + if (!is_array($summarySlot)) { // No summary received (connection closed before summary) - // Set $this->meta to null so the next fetchResults() will try to pull again $this->meta = null; + + return; + } + + $summaryEmpty = $summarySlot === []; + $hasDataRows = $rows !== []; + + if ($summaryEmpty && !$hasDataRows) { + // Normal completion with no records + $this->meta = []; + } elseif (!$summaryEmpty) { + // Valid summary map (e.g. has_more, counters, db, …) + if (!array_key_exists('has_more', $summarySlot) || $summarySlot['has_more'] === false) { + $this->meta = $summarySlot; + } + } else { + // Empty summary slot with data rows: Bolt SUCCESS {} after RECORDs — stream complete (no has_more keys). + $this->meta = $summarySlot; } } diff --git a/src/Databags/SummarizedResult.php b/src/Databags/SummarizedResult.php index 50fdee83..a140f4a1 100644 --- a/src/Databags/SummarizedResult.php +++ b/src/Databags/SummarizedResult.php @@ -43,8 +43,6 @@ final class SummarizedResult extends CypherList /** * Keeps the Bolt result stream alive until this summarized result is consumed (avoids premature BoltResult::__destruct). - * - * @var object|null */ private readonly ?object $boltResultRef; diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index c68318c2..504a5247 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -27,7 +27,8 @@ use Throwable; /** - * Peek at the next record without advancing the iterator position used by ResultNext. + * Peek at the next record without consuming it (no {@see Iterator::next()} after the peek itself). + * Deferred advances from prior {@see ResultNext} must be applied first so peek sees the correct row. * * @implements RequestHandlerInterface */ @@ -50,6 +51,7 @@ public function handle($request): TestkitResponseInterface } $iterator = $this->repository->getIterator($request->getResultId()); + $this->repository->drainPendingIteratorNexts($request->getResultId(), $iterator); if (!$iterator->valid()) { return new NullRecordResponse(); diff --git a/testkit-backend/src/MainRepository.php b/testkit-backend/src/MainRepository.php index ac83d4ff..efb19ee6 100644 --- a/testkit-backend/src/MainRepository.php +++ b/testkit-backend/src/MainRepository.php @@ -120,7 +120,7 @@ public function setPeekPrimed(Uuid $id, bool $value): void public function consumePeekPrimed(Uuid $id): bool { $key = $id->toRfc4122(); - if (!isset($this->peekPrimed[$key])) { + if (!array_key_exists($key, $this->peekPrimed)) { return false; } unset($this->peekPrimed[$key]); diff --git a/testkit-backend/src/Responses/RecordListResponse.php b/testkit-backend/src/Responses/RecordListResponse.php index 46beb0f4..30df4b7e 100644 --- a/testkit-backend/src/Responses/RecordListResponse.php +++ b/testkit-backend/src/Responses/RecordListResponse.php @@ -24,7 +24,7 @@ final class RecordListResponse implements TestkitResponseInterface { /** - * @param list> $records Each row is a list of cell values (OGM types). + * @param list> $records each row is a list of cell values (OGM types) */ public function __construct( private readonly array $records, diff --git a/tests/Unit/SummarizedResultListTest.php b/tests/Unit/SummarizedResultListTest.php index 136fdfbf..f1418866 100644 --- a/tests/Unit/SummarizedResultListTest.php +++ b/tests/Unit/SummarizedResultListTest.php @@ -14,12 +14,15 @@ namespace Laudis\Neo4j\Tests\Unit; use Laudis\Neo4j\Databags\SummarizedResult; +use Laudis\Neo4j\Formatter\SummarizedResultFormatter; use Laudis\Neo4j\Types\CypherList; use Laudis\Neo4j\Types\CypherMap; use PHPUnit\Framework\TestCase; /** * Mirrors TestKit stub: next() then list() must return only remaining rows (no rewind / duplicate). + * + * @psalm-import-type OGMTypes from SummarizedResultFormatter */ final class SummarizedResultListTest extends TestCase { @@ -32,15 +35,26 @@ public function testListAfterNextOmitsConsumedRow(): void } }))->withCacheLimit(2); + /** @var CypherList> $recordsList */ $recordsList = (new CypherList($inner))->withCacheLimit(2); $result = new SummarizedResult($summary, $recordsList, ['n'], null, null); self::assertTrue($result->valid()); - self::assertSame(1, $result->current()->get('n')); + $first = $result->current()->get('n'); + self::assertIsInt($first); + self::assertSame(1, $first); $result->next(); $rows = $result->list(); self::assertCount(4, $rows); - self::assertSame([2, 3, 4, 5], array_map(static fn (CypherMap $m): int => $m->get('n'), $rows)); + + $nValues = []; + foreach ($rows as $row) { + self::assertInstanceOf(CypherMap::class, $row); + $n = $row->get('n'); + self::assertIsInt($n); + $nValues[] = $n; + } + self::assertSame([2, 3, 4, 5], $nValues); } } From 54e4c6614677efe6e8ebb81f027c96c7781f0f04 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 8 Apr 2026 17:29:06 +0530 Subject: [PATCH 03/12] Fixed iteration tests --- src/Bolt/BoltConnection.php | 86 ++++++++++++++++++- src/Bolt/BoltResult.php | 5 ++ src/Bolt/BoltUnmanagedTransaction.php | 29 ++++++- src/Bolt/ConnectionPool.php | 11 +-- src/Bolt/Messages/BoltDiscardMessage.php | 8 +- src/Bolt/Messages/BoltPullMessage.php | 10 ++- src/Bolt/ProtocolFactory.php | 11 +-- testkit-backend/src/Handlers/ResultList.php | 16 ++-- testkit-backend/src/Handlers/ResultNext.php | 15 ++-- testkit-backend/src/Handlers/ResultPeek.php | 18 ++-- testkit-backend/src/Handlers/ResultSingle.php | 80 +++++++++++++---- .../src/Handlers/ResultSingleOptional.php | 18 ++-- .../src/Handlers/RetryableNegative.php | 42 +++++---- testkit-backend/src/MainRepository.php | 15 +++- testkit-backend/src/Socket.php | 8 +- testkit-backend/testkit.sh | 38 +++++++- tests/Unit/BoltConnectionPoolTest.php | 4 +- 17 files changed, 321 insertions(+), 93 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 3f802e14..5db24057 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -17,6 +17,7 @@ use Bolt\enum\Signature; use Bolt\error\ConnectException as BoltConnectException; use Bolt\protocol\Response; +use Bolt\protocol\V3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -44,7 +45,7 @@ use WeakReference; /** - * @implements ConnectionInterface + * @implements ConnectionInterface * * @psalm-import-type BoltMeta from SummarizedResultFormatter */ @@ -72,7 +73,19 @@ class BoltConnection implements ConnectionInterface private ?float $originalTimeout = null; /** - * @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} + * Bolt v3 expects RUN and PULL_ALL to be sent before any SUCCESS is read (pipelined). After {@see run()} + * queues both, the first {@see pull()} must not send a second PULL_ALL. + */ + private bool $v3PullAllQueued = false; + + /** + * When one PULL yields RECORD(s) then FAILURE, {@see pull()} defers the {@see Neo4jException} to the next + * {@see BoltResult::fetchResults()} so records are delivered before the error (TestKit pull_2_end_error.script). + */ + private ?Neo4jException $deferredPullFailure = null; + + /** + * @return array{0: V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} */ public function getImplementation(): array { @@ -83,7 +96,7 @@ public function getImplementation(): array * @psalm-mutation-free */ public function __construct( - private V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, + private V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, private readonly Connection $connection, private readonly AuthenticateInterface $auth, private readonly string $userAgent, @@ -217,6 +230,8 @@ public function consumeResults(): void */ public function reset(): void { + $this->v3PullAllQueued = false; + $this->deferredPullFailure = null; $message = $this->messageFactory->createResetMessage(); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -271,6 +286,19 @@ public function run( ?iterable $tsxMetadata, ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata); + + if ($this->protocol() instanceof V3) { + $this->v3PullAllQueued = false; + $this->messageFactory->createRunMessage($text, $parameters, $extra)->send(); + $this->messageFactory->createPullMessage([])->send(); + $response = $this->protocol()->getResponse(); + $this->assertNoFailure($response); + $this->v3PullAllQueued = true; + + /** @var BoltMeta */ + return $response->content; + } + $message = $this->messageFactory->createRunMessage($text, $parameters, $extra); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -293,7 +321,7 @@ public function rollback(): void $this->assertNoFailure($response); } - public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function protocol(): V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 { if (!isset($this->boltProtocol)) { throw new Exception('Connection is closed'); @@ -337,6 +365,19 @@ public function pull(?int $qid, ?int $fetchSize): array return $tbr; } catch (Throwable $e) { $this->restoreOriginalTimeout(); + if ($e instanceof Neo4jException) { + // RECORD(s) then FAILURE in one PULL: RESET already ran in assertNoFailure — return rows and + // defer the exception to the next fetchResults() so the last record is yielded first and no + // extra PULL is sent (pull_2_end_error.script). + if (!empty($tbr)) { + $this->deferredPullFailure = $e; + $tbr[] = ['has_more' => true]; + + /** @var non-empty-list */ + return $tbr; + } + throw $e; + } // If we've received some records before the disconnect, return them so first next() succeeds. // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator @@ -363,6 +404,8 @@ public function close(): void // Other exceptions (Neo4jException, TypeError, etc.) should propagate. try { if ($this->isOpen()) { + $this->v3PullAllQueued = false; + $this->deferredPullFailure = null; if ($this->isStreaming()) { $this->discardUnconsumedResults(); } @@ -390,11 +433,38 @@ public function close(): void */ public function invalidate(): void { + $this->v3PullAllQueued = false; + $this->deferredPullFailure = null; $this->subscribedResults = []; $this->connection->disconnect(); unset($this->boltProtocol); } + /** + * @internal Used by {@see BoltPullMessage} for Bolt v3 pipelined RUN + PULL_ALL. + */ + public function consumeQueuedV3PullAll(): bool + { + if (!$this->boltProtocol instanceof V3 || !$this->v3PullAllQueued) { + return false; + } + + $this->v3PullAllQueued = false; + + return true; + } + + /** + * Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE). + */ + public function takeDeferredPullFailure(): ?Neo4jException + { + $e = $this->deferredPullFailure; + $this->deferredPullFailure = null; + + return $e; + } + private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array { $extra = []; @@ -420,6 +490,10 @@ private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHold } } + if (isset($this->boltProtocol) && $this->boltProtocol instanceof V3) { + unset($extra['db'], $extra['mode'], $extra['bookmarks']); + } + return $extra; } @@ -431,6 +505,10 @@ private function buildResultExtra(?int $fetchSize, ?int $qid): array } if ($qid !== null && $qid >= 0) { + // Always send explicit qid (including 0). Omitting qid defaults to the "current" stream; with + // multiple concurrent RUN streams in a transaction, PULL must target the correct stream or Neo4j + // returns e.g. "No such statement: N" (Neo.ClientError.Request.InvalidFormat). The Bolt library's + // openStreams counter is not reliable for this across all server versions and message orderings. $extra['qid'] = $qid; } diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index ac4e5233..1f2180c6 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -145,6 +145,11 @@ private function fetchResults(): void { $this->networkPullOccurred = true; + $deferred = $this->connection->takeDeferredPullFailure(); + if ($deferred !== null) { + throw $deferred; + } + try { $meta = $this->connection->pull($this->qid, $this->effectivePullSize()); } catch (BoltConnectException|BoltException $e) { diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index 8af372ce..aaec6d64 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt; -use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; @@ -86,6 +85,8 @@ public function commit(iterable $statements = []): CypherList $this->ensureBeginSent(); + $this->connection->consumeResults(); + // Force the results to pull all the results. // After a commit, the connection will be in the ready state, making it impossible to use PULL $tbr = $this->runStatements($statements)->each(static function (CypherList $list) { @@ -101,6 +102,11 @@ public function commit(iterable $statements = []): CypherList public function rollback(): void { if ($this->isFinished()) { + if ($this->state === TransactionState::TERMINATED) { + // Run/pull already failed; connection may have been RESET — nothing to send. + return; + } + if ($this->state === TransactionState::COMMITTED) { throw new TransactionException("Can't rollback a committed transaction."); } @@ -112,6 +118,14 @@ public function rollback(): void $this->ensureBeginSent(); + // FAILURE on PULL triggers RESET in {@see BoltConnection::assertNoFailure()}; server has no open tx. + // TestKit stubs (e.g. tx_error_on_pull.script) expect no ROLLBACK after that RESET. + if ($this->connection->getServerState() === 'READY') { + $this->state = TransactionState::ROLLED_BACK; + + return; + } + $this->messageFactory->createRollbackMessage()->send(); $this->state = TransactionState::ROLLED_BACK; } @@ -146,8 +160,7 @@ public function runStatement(Statement $statement): SummarizedResult $parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol()); $start = microtime(true); - $serverState = $this->connection->protocol()->serverState; - if ($serverState === ServerState::STREAMING) { + if ($this->connection->isStreaming()) { $this->connection->consumeResults(); } @@ -217,7 +230,15 @@ public function isFinished(): bool private function ensureBeginSent(): void { - if ($this->isInstantTransaction || $this->beginSent) { + if ($this->isInstantTransaction) { + return; + } + // FAILURE on PULL triggers RESET in BoltConnection — server is READY with no tx, but we may still + // have beginSent=true (e.g. execute_read retry). Must send BEGIN again before RUN. + if ($this->beginSent && $this->state === TransactionState::ACTIVE && $this->connection->getServerState() === 'READY') { + $this->beginSent = false; + } + if ($this->beginSent) { return; } try { diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 580927d1..1544cafb 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -112,15 +112,10 @@ public function acquire(SessionConfiguration $config): Generator public function release(ConnectionInterface $connection): void { + // Return a permit only — keep the connection in {@see $activeConnections} so it stays + // pooled for reuse and is still closed by {@see close()}. Removing it here orphaned + // sockets (no GOODBYE on driver close), which breaks TestKit stubs after errors. $this->semaphore->post(); - - foreach ($this->activeConnections as $i => $activeConnection) { - if ($connection === $activeConnection) { - array_splice($this->activeConnections, $i, 1); - - return; - } - } } public function getLogger(): ?Neo4jLogger diff --git a/src/Bolt/Messages/BoltDiscardMessage.php b/src/Bolt/Messages/BoltDiscardMessage.php index 74824f56..14d74bf4 100644 --- a/src/Bolt/Messages/BoltDiscardMessage.php +++ b/src/Bolt/Messages/BoltDiscardMessage.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\Bolt\Messages; +use Bolt\protocol\V3; use Laudis\Neo4j\Bolt\BoltConnection; use Laudis\Neo4j\Common\Neo4jLogger; use Laudis\Neo4j\Contracts\BoltMessage; @@ -31,7 +32,12 @@ public function __construct( public function send(): BoltDiscardMessage { $this->logger?->log(LogLevel::DEBUG, 'DISCARD', $this->extra); - $this->connection->protocol()->discard($this->extra); + $protocol = $this->connection->protocol(); + if ($protocol instanceof V3) { + $protocol->discardAll(); + } else { + $protocol->discard($this->extra); + } return $this; } diff --git a/src/Bolt/Messages/BoltPullMessage.php b/src/Bolt/Messages/BoltPullMessage.php index 42339e97..3ba7e02d 100644 --- a/src/Bolt/Messages/BoltPullMessage.php +++ b/src/Bolt/Messages/BoltPullMessage.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\Bolt\Messages; +use Bolt\protocol\V3; use Laudis\Neo4j\Bolt\BoltConnection; use Laudis\Neo4j\Common\Neo4jLogger; use Laudis\Neo4j\Contracts\BoltMessage; @@ -31,7 +32,14 @@ public function __construct( public function send(): BoltPullMessage { $this->logger?->log(LogLevel::DEBUG, 'PULL', $this->extra); - $this->connection->protocol()->pull($this->extra); + $protocol = $this->connection->protocol(); + if ($protocol instanceof V3) { + if (!$this->connection->consumeQueuedV3PullAll()) { + $protocol->pullAll(); + } + } else { + $protocol->pull($this->extra); + } return $this; } diff --git a/src/Bolt/ProtocolFactory.php b/src/Bolt/ProtocolFactory.php index 5c03c00e..0d3ef9d5 100644 --- a/src/Bolt/ProtocolFactory.php +++ b/src/Bolt/ProtocolFactory.php @@ -15,6 +15,7 @@ use Bolt\Bolt; use Bolt\connection\IConnection; +use Bolt\protocol\V3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -25,7 +26,7 @@ class ProtocolFactory { - public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function createProtocol(IConnection $connection): V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 { $boltOptoutEnv = getenv('BOLT_ANALYTICS_OPTOUT'); if ($boltOptoutEnv === false) { @@ -33,12 +34,12 @@ public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3| } $bolt = new Bolt($connection); - // Offer protocol versions from newest to oldest (only 4.4 and above are supported) - $bolt->setProtocolVersions('5.4.4', 4.4); + // Newest first; include 3.0 for legacy servers and TestKit stub (BOLT 3) + $bolt->setProtocolVersions('5.4.4', 4.4, '3.0'); $protocol = $bolt->build(); - if (!($protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { - throw new RuntimeException('Client only supports bolt version 4.4 to 5.4'); + if (!($protocol instanceof V3 || $protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { + throw new RuntimeException('Client only supports bolt version 3.0 to 5.4'); } return $protocol; diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php index c06a306c..d7265615 100644 --- a/testkit-backend/src/Handlers/ResultList.php +++ b/testkit-backend/src/Handlers/ResultList.php @@ -67,22 +67,26 @@ public function handle($request): TestkitResponseInterface return new RecordListResponse($rows); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultNext.php b/testkit-backend/src/Handlers/ResultNext.php index d9b99258..c3c98f39 100644 --- a/testkit-backend/src/Handlers/ResultNext.php +++ b/testkit-backend/src/Handlers/ResultNext.php @@ -73,22 +73,25 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index 504a5247..8ed8b3db 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -66,22 +66,25 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -101,6 +104,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index 6af2bcce..5152ec01 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\TestkitBackend\Handlers; +use Bolt\error\BoltException; use Laudis\Neo4j\Databags\Neo4jError; use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; @@ -22,6 +23,7 @@ use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\TestkitBackend\Responses\RecordResponse; use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; +use Throwable; /** * Request to expect and return exactly one record in the result stream. @@ -42,28 +44,72 @@ public function __construct( public function handle($request): TestkitResponseInterface { - $record = $this->repository->getRecords($request->getResultId()); - if ($record instanceof TestkitResponseInterface) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); + try { + $record = $this->repository->getRecords($request->getResultId()); + if ($record instanceof TestkitResponseInterface) { + return $record; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $count = $record->count(); + if ($count !== 1) { + $err = new Neo4jException([Neo4jError::fromMessageAndCode( + 'Neo.ClientError.Statement.ResultNotSingle', + sprintf('Expected exactly one result row, found %d.', $count) + )]); + $response = new DriverErrorResponse($request->getResultId(), $err); + $this->repository->addRecords($request->getResultId(), $response); - $count = $record->count(); - if ($count !== 1) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode( - 'Neo.ClientError.Statement.ResultNotSingle', - sprintf('Expected exactly one result row, found %d.', $count) - )]); + return $response; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $values = []; + foreach ($record->getAsCypherMap(0) as $value) { + $values[] = CypherObject::autoDetect($value); + } + + $this->repository->removeRecords($request->getResultId()); + + return new RecordResponse($values); + } catch (Neo4jException $e) { + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } catch (BoltException $e) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - $values = []; - foreach ($record->getAsCypherMap(0) as $value) { - $values[] = CypherObject::autoDetect($value); + return $response; + } catch (Throwable $e) { + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } + throw $e; } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); - return new RecordResponse($values); + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingleOptional.php b/testkit-backend/src/Handlers/ResultSingleOptional.php index 89ffa677..03b8c106 100644 --- a/testkit-backend/src/Handlers/ResultSingleOptional.php +++ b/testkit-backend/src/Handlers/ResultSingleOptional.php @@ -69,22 +69,25 @@ public function handle($request): TestkitResponseInterface ['Expected a single record but found multiple records in the stream.'] ); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -104,6 +107,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/RetryableNegative.php b/testkit-backend/src/Handlers/RetryableNegative.php index 9481be64..d9c787bd 100644 --- a/testkit-backend/src/Handlers/RetryableNegative.php +++ b/testkit-backend/src/Handlers/RetryableNegative.php @@ -57,35 +57,43 @@ public function handle($request): TestkitResponseInterface return new BackendErrorResponse('Transaction not found '.$transactionId->toRfc4122()); } - try { - $tsx->rollback(); - } catch (Throwable $e) { - // Best-effort rollback: connection may already be broken. Proceed with error response. - $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); - } - $errorId = $request->getErrorId(); + $resolvedException = null; if ($errorId !== '' && $errorId !== null) { try { $errorUuid = $errorId instanceof Uuid ? $errorId : Uuid::fromString($errorId); $errorResponse = $this->repository->getRecords($errorUuid); - if ($errorResponse instanceof DriverErrorResponse) { - $exception = $errorResponse->getException(); - if ($exception instanceof Neo4jException && $exception->getClassification() === 'TransientError') { - // If the original error was retryable, signal for retry - return new RetryableTryResponse($transactionId); - } - - // Otherwise, return the original error to the frontend - return new DriverErrorResponse($transactionId, $exception); + $resolvedException = $errorResponse->getException(); } } catch (Throwable $e) { - // Invalid errorId or record not found - fall through to generic FrontendError $this->logger->debug('Could not retrieve error for RetryableNegative', ['exception' => $e->getMessage()]); } } + // After FAILURE on PULL, BoltConnection RESETs before throwing Neo4jException — ROLLBACK is invalid on the wire. + $skipRollback = $resolvedException instanceof Neo4jException; + + if (!$skipRollback) { + try { + $tsx->rollback(); + } catch (Throwable $e) { + $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); + } + } + + if ($resolvedException instanceof Neo4jException) { + if ($resolvedException->getClassification() === 'TransientError') { + return new RetryableTryResponse($transactionId); + } + + return new DriverErrorResponse($transactionId, $resolvedException); + } + + if ($resolvedException !== null) { + return new DriverErrorResponse($transactionId, $resolvedException); + } + // If no specific error was provided or couldn't be retrieved, // client code caused the rollback (e.g. ApplicationCodeError) - return FrontendError return new FrontendErrorResponse('Client code caused transaction to be rolled back'); diff --git a/testkit-backend/src/MainRepository.php b/testkit-backend/src/MainRepository.php index efb19ee6..05ec7146 100644 --- a/testkit-backend/src/MainRepository.php +++ b/testkit-backend/src/MainRepository.php @@ -188,17 +188,26 @@ public function getSession(Uuid $id): SessionInterface */ public function addRecords(Uuid $id, $result): void { - $this->records[$id->toRfc4122()] = $result; + $key = $id->toRfc4122(); + $this->records[$key] = $result; if ($result instanceof SummarizedResult) { /** @var SummarizedResult> $result */ - $this->recordIterators[$id->toRfc4122()] = $result; + $this->recordIterators[$key] = $result; + } else { + unset($this->recordIterators[$key]); } } public function removeRecords(Uuid $id): void { $key = $id->toRfc4122(); - unset($this->records[$key], $this->iteratorFetchedFirst[$key], $this->peekPrimed[$key], $this->pendingIteratorNextCount[$key]); + unset( + $this->records[$key], + $this->recordIterators[$key], + $this->iteratorFetchedFirst[$key], + $this->peekPrimed[$key], + $this->pendingIteratorNextCount[$key] + ); } /** diff --git a/testkit-backend/src/Socket.php b/testkit-backend/src/Socket.php index a01133a4..683cf5bc 100644 --- a/testkit-backend/src/Socket.php +++ b/testkit-backend/src/Socket.php @@ -90,11 +90,15 @@ public static function fromAddressAndPort(string $address, int $port): self public function reset(): void { - if ($this->socket !== null && !stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR)) { - throw new RuntimeException(json_encode(error_get_last(), JSON_THROW_ON_ERROR)); + if ($this->socket === null) { + return; } + $sock = $this->socket; $this->socket = null; + if (is_resource($sock)) { + @stream_socket_shutdown($sock, STREAM_SHUT_RDWR); + } } public function readMessage(): ?string diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 3ac85647..5190fb99 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -163,29 +163,59 @@ python3 -m unittest -v \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_0_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_1_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_2_records \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_disconnect \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_run \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_func_run \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once_next_before_list \ \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_0_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_1_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_2_records \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_disconnect \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_run \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_0_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_1_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_2_records \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_disconnect \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_run \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_0_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_1_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_2_records \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_disconnect \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_run \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_func_run \ \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_full_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_half_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_empty_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_error \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all \ -\ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_v3 \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_discards_on_session_close \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested_using_list \ +\ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch_v3 \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_v3 \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested_using_list \ EXIT_CODE=$? diff --git a/tests/Unit/BoltConnectionPoolTest.php b/tests/Unit/BoltConnectionPoolTest.php index 987ce985..d4de3e53 100644 --- a/tests/Unit/BoltConnectionPoolTest.php +++ b/tests/Unit/BoltConnectionPoolTest.php @@ -122,7 +122,9 @@ public function testReleaseReference(): void $this->pool->release($connection); - static::assertEquals($refCount - 1, $this->refCount($connection)); + // Release returns the permit but keeps the connection in the pool for reuse and for + // {@see ConnectionPool::close()} — it must not drop the pool's reference. + static::assertEquals($refCount, $this->refCount($connection)); } /** From d6f23999b5c76d14905e2e33ee8d0caccd18674a Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 15 Apr 2026 12:37:23 +0530 Subject: [PATCH 04/12] fix(bolt): only consumeResults on STREAMING, not TX_STREAMING (nested/parallel tx) --- src/Bolt/BoltUnmanagedTransaction.php | 6 +++++- testkit | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index aaec6d64..5b3da7a3 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\Bolt; +use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; @@ -160,7 +161,10 @@ public function runStatement(Statement $statement): SummarizedResult $parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol()); $start = microtime(true); - if ($this->connection->isStreaming()) { + // Only drain an outstanding autocommit result (STREAMING). In an explicit transaction (TX_STREAMING) + // several RUN streams may be open; consumeResults() would preload other streams and reorder PULLs + // vs RUN (TestKit tx_pull_1_nested*, Neo4j parallel/nested tx tests). + if ($this->connection->getServerState() === ServerState::STREAMING->name) { $this->connection->consumeResults(); } diff --git a/testkit b/testkit index f9e7590b..5fb75922 160000 --- a/testkit +++ b/testkit @@ -1 +1 @@ -Subproject commit f9e7590b44ef983b320fae9adcd0c220b8e02962 +Subproject commit 5fb7592281079df7436f139dd2cbda996b5040d0 From 405904550658ea5651d2920bbe96a63079773f41 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 15 Apr 2026 14:33:48 +0530 Subject: [PATCH 05/12] Fixed psalm errors --- src/Bolt/BoltConnection.php | 2 +- src/Bolt/BoltUnmanagedTransaction.php | 3 +-- src/Neo4j/Neo4jConnectionPool.php | 4 ++++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 5db24057..77275989 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -441,7 +441,7 @@ public function invalidate(): void } /** - * @internal Used by {@see BoltPullMessage} for Bolt v3 pipelined RUN + PULL_ALL. + * @internal used by {@see BoltPullMessage} for Bolt v3 pipelined RUN + PULL_ALL */ public function consumeQueuedV3PullAll(): bool { diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index 5b3da7a3..80fe4198 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt; -use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; @@ -164,7 +163,7 @@ public function runStatement(Statement $statement): SummarizedResult // Only drain an outstanding autocommit result (STREAMING). In an explicit transaction (TX_STREAMING) // several RUN streams may be open; consumeResults() would preload other streams and reorder PULLs // vs RUN (TestKit tx_pull_1_nested*, Neo4j parallel/nested tx tests). - if ($this->connection->getServerState() === ServerState::STREAMING->name) { + if ($this->connection->getServerState() === 'STREAMING') { $this->connection->consumeResults(); } diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index edbc2916..5eeaa5f8 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -14,6 +14,7 @@ namespace Laudis\Neo4j\Neo4j; use Bolt\error\ConnectException; +use Bolt\protocol\V3; use function count; @@ -308,6 +309,9 @@ private function getNextServer(RoutingTable $table, ?AccessMode $mode): Uri private function routingTable(BoltConnection $connection, SessionConfiguration $config): RoutingTable { $bolt = $connection->protocol(); + if ($bolt instanceof V3) { + throw new RuntimeException('Neo4j routing requires Bolt protocol 4.4 or newer.'); + } $this->getLogger()?->log(LogLevel::DEBUG, 'ROUTE', ['db' => $config->getDatabase()]); /** @var array{rt: array{servers: list, role:string}>, ttl: int}} $route */ From b5a294b3793bb1f733a96f618b483e4f9c0a6d59 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Mon, 27 Apr 2026 14:43:11 +0530 Subject: [PATCH 06/12] Add CypherList::peek() for non-consuming next record; document in README --- README.md | 2 ++ src/Types/CypherList.php | 18 ++++++++++ tests/Unit/CypherListPeekTest.php | 59 +++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 tests/Unit/CypherListPeekTest.php diff --git a/README.md b/README.md index 363ce6f8..f89c3a3c 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,8 @@ foreach ($results as $result) { } ``` +`peek()` returns the next row without moving the cursor (or `null` if there is none). It can pull the next record from the server into the buffer if needed, but does not advance past it—unlike `next()` in a `foreach`, which always consumes. Repeated calls to `peek()` return the same value until you advance the iterator (for example with `next()` or by continuing a `foreach`). + Cypher values and types map to these php types and classes: | Cypher | Php | diff --git a/src/Types/CypherList.php b/src/Types/CypherList.php index 663e0ca6..3a2a0121 100644 --- a/src/Types/CypherList.php +++ b/src/Types/CypherList.php @@ -308,6 +308,24 @@ public function key(): int return $this->cacheKey(); } + /** + * Returns the next record without consuming it (the iterator position is unchanged). + * + * This may pull and buffer the next row from the server when the result is lazy, the same as + * {@see current()} on a non-empty result. When there is no next record, returns `null` without + * erroring; calling {@see current()} on an exhausted result is not safe. + * + * @return TValue|null + */ + public function peek() + { + if (!$this->valid()) { + return null; + } + + return $this->current(); + } + /** * @return array */ diff --git a/tests/Unit/CypherListPeekTest.php b/tests/Unit/CypherListPeekTest.php new file mode 100644 index 00000000..80cf9164 --- /dev/null +++ b/tests/Unit/CypherListPeekTest.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Tests\Unit; + +use Laudis\Neo4j\Types\CypherList; +use Laudis\Neo4j\Types\CypherMap; +use PHPUnit\Framework\TestCase; + +final class CypherListPeekTest extends TestCase +{ + public function testPeekReturnsNullOnEmptyResult(): void + { + $list = new CypherList([]); + + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } + + public function testPeekMatchesCurrentAndDoesNotAdvance(): void + { + $list = new CypherList([ + new CypherMap(['n' => 1]), + new CypherMap(['n' => 2]), + ]); + + $a = $list->peek(); + $b = $list->peek(); + $c = $list->current(); + + self::assertNotNull($a); + self::assertSame($a, $b); + self::assertSame($a, $c); + self::assertSame(1, $a->get('n')); + + $list->next(); + + self::assertSame(2, $list->peek()?->get('n')); + } + + public function testPeekReturnsNullAfterAllRowsConsumed(): void + { + $list = new CypherList([new CypherMap(['x' => true])]); + foreach ($list as $_) { + } + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } +} From b61ec931dae55673434bc8bd40f28a536fb19237 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Tue, 28 Apr 2026 10:22:47 +0530 Subject: [PATCH 07/12] fixed psalm errors --- tests/Unit/TypeCasterTest.php | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/Unit/TypeCasterTest.php b/tests/Unit/TypeCasterTest.php index 40616e10..a4b3de0a 100644 --- a/tests/Unit/TypeCasterTest.php +++ b/tests/Unit/TypeCasterTest.php @@ -30,9 +30,11 @@ final class TypeCasterTest extends TestCase * Complete coverage: every input type × every cast method. * When a cast isn't possible, expected is null (invalid case). * - * @return iterable + * @return Generator + * + * @psalm-suppress MixedReturnTypeCoercion */ - public static function provideCastMatrix(): iterable + public static function provideCastMatrix(): Generator { $stringable = new class implements Stringable { public function __toString(): string @@ -89,6 +91,7 @@ public function __toString(): string foreach ($inputs as $inputName => $inputValue) { foreach ($matrix as $method => $expectations) { + /** @var mixed $expected */ $expected = $expectations[$inputName] ?? null; $key = $inputName.'->'.$method; @@ -107,7 +110,12 @@ public function __toString(): string ]; if ($method === 'toClass') { - $row['class'] = $expectations['_class'][$inputName] ?? stdClass::class; + if (!isset($expectations['_class']) || !is_array($expectations['_class'])) { + throw new InvalidArgumentException('Malformed toClass expectations: missing _class map'); + } + /** @var array $classMap */ + $classMap = $expectations['_class']; + $row['class'] = $classMap[$inputName] ?? stdClass::class; } yield $key => $row; From 410955a359458511395c3c5cf8d0ce8049093772 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Tue, 28 Apr 2026 11:12:14 +0530 Subject: [PATCH 08/12] fixed code standards --- tests/Unit/TypeCasterTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Unit/TypeCasterTest.php b/tests/Unit/TypeCasterTest.php index a4b3de0a..29b62454 100644 --- a/tests/Unit/TypeCasterTest.php +++ b/tests/Unit/TypeCasterTest.php @@ -91,7 +91,7 @@ public function __toString(): string foreach ($inputs as $inputName => $inputValue) { foreach ($matrix as $method => $expectations) { - /** @var mixed $expected */ + /** @psalm-suppress MixedAssignment */ $expected = $expectations[$inputName] ?? null; $key = $inputName.'->'.$method; @@ -110,7 +110,7 @@ public function __toString(): string ]; if ($method === 'toClass') { - if (!isset($expectations['_class']) || !is_array($expectations['_class'])) { + if (!array_key_exists('_class', $expectations) || !is_array($expectations['_class'])) { throw new InvalidArgumentException('Malformed toClass expectations: missing _class map'); } /** @var array $classMap */ From 7483e488bb7139769e434640f6779dd8e3b6db44 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Mon, 4 May 2026 12:11:00 +0530 Subject: [PATCH 09/12] fix: use Bolt Path indices and require stefanak-michal/bolt ^7.4 --- composer.json | 2 +- src/Formatter/Specialised/BoltOGMTranslator.php | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/composer.json b/composer.json index 961b76fb..22970a2a 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,7 @@ "psr/http-factory": "^1.0", "psr/http-client": "^1.0", "php-http/message": "^1.0", - "stefanak-michal/bolt": "^7.1.4", + "stefanak-michal/bolt": "^7.4", "symfony/polyfill-php80": "^1.2", "psr/simple-cache": ">=2.0", "ext-json": "*", diff --git a/src/Formatter/Specialised/BoltOGMTranslator.php b/src/Formatter/Specialised/BoltOGMTranslator.php index 8959c90a..905e2284 100644 --- a/src/Formatter/Specialised/BoltOGMTranslator.php +++ b/src/Formatter/Specialised/BoltOGMTranslator.php @@ -298,13 +298,13 @@ private function makeFromBoltPath(BoltPath $path): Path foreach ($rels as $rel) { $relationships[] = $this->makeFromBoltUnboundRelationship($rel); } - /** @var list $ids */ - $ids = $path->ids; + /** @var list $indices */ + $indices = $path->indices; return new Path( new CypherList($nodes), new CypherList($relationships), - new CypherList($ids), + new CypherList($indices), ); } From 4c1c08a5fa42af11a9728cb975a847a7ed437dfc Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Mon, 4 May 2026 14:38:43 +0530 Subject: [PATCH 10/12] refactor(bolt): remove Bolt protocol v3 handling and related TestKit cases --- src/Bolt/BoltConnection.php | 48 ++---------------------- src/Bolt/Messages/BoltDiscardMessage.php | 8 +--- src/Bolt/Messages/BoltPullMessage.php | 10 +---- src/Bolt/ProtocolFactory.php | 10 ++--- src/BoltFactory.php | 2 +- src/Enum/ConnectionProtocol.php | 3 +- src/Neo4j/Neo4jConnectionPool.php | 4 -- testkit-backend/features.php | 2 +- testkit-backend/testkit.sh | 3 -- 9 files changed, 13 insertions(+), 77 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 77275989..49fcf6c7 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -17,7 +17,6 @@ use Bolt\enum\Signature; use Bolt\error\ConnectException as BoltConnectException; use Bolt\protocol\Response; -use Bolt\protocol\V3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -45,7 +44,7 @@ use WeakReference; /** - * @implements ConnectionInterface + * @implements ConnectionInterface * * @psalm-import-type BoltMeta from SummarizedResultFormatter */ @@ -72,12 +71,6 @@ class BoltConnection implements ConnectionInterface private ?float $originalTimeout = null; - /** - * Bolt v3 expects RUN and PULL_ALL to be sent before any SUCCESS is read (pipelined). After {@see run()} - * queues both, the first {@see pull()} must not send a second PULL_ALL. - */ - private bool $v3PullAllQueued = false; - /** * When one PULL yields RECORD(s) then FAILURE, {@see pull()} defers the {@see Neo4jException} to the next * {@see BoltResult::fetchResults()} so records are delivered before the error (TestKit pull_2_end_error.script). @@ -85,7 +78,7 @@ class BoltConnection implements ConnectionInterface private ?Neo4jException $deferredPullFailure = null; /** - * @return array{0: V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} + * @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} */ public function getImplementation(): array { @@ -96,7 +89,7 @@ public function getImplementation(): array * @psalm-mutation-free */ public function __construct( - private V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, + private V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, private readonly Connection $connection, private readonly AuthenticateInterface $auth, private readonly string $userAgent, @@ -230,7 +223,6 @@ public function consumeResults(): void */ public function reset(): void { - $this->v3PullAllQueued = false; $this->deferredPullFailure = null; $message = $this->messageFactory->createResetMessage(); $response = $message->send()->getResponse(); @@ -287,18 +279,6 @@ public function run( ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata); - if ($this->protocol() instanceof V3) { - $this->v3PullAllQueued = false; - $this->messageFactory->createRunMessage($text, $parameters, $extra)->send(); - $this->messageFactory->createPullMessage([])->send(); - $response = $this->protocol()->getResponse(); - $this->assertNoFailure($response); - $this->v3PullAllQueued = true; - - /** @var BoltMeta */ - return $response->content; - } - $message = $this->messageFactory->createRunMessage($text, $parameters, $extra); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -321,7 +301,7 @@ public function rollback(): void $this->assertNoFailure($response); } - public function protocol(): V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4 { if (!isset($this->boltProtocol)) { throw new Exception('Connection is closed'); @@ -404,7 +384,6 @@ public function close(): void // Other exceptions (Neo4jException, TypeError, etc.) should propagate. try { if ($this->isOpen()) { - $this->v3PullAllQueued = false; $this->deferredPullFailure = null; if ($this->isStreaming()) { $this->discardUnconsumedResults(); @@ -433,27 +412,12 @@ public function close(): void */ public function invalidate(): void { - $this->v3PullAllQueued = false; $this->deferredPullFailure = null; $this->subscribedResults = []; $this->connection->disconnect(); unset($this->boltProtocol); } - /** - * @internal used by {@see BoltPullMessage} for Bolt v3 pipelined RUN + PULL_ALL - */ - public function consumeQueuedV3PullAll(): bool - { - if (!$this->boltProtocol instanceof V3 || !$this->v3PullAllQueued) { - return false; - } - - $this->v3PullAllQueued = false; - - return true; - } - /** * Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE). */ @@ -490,10 +454,6 @@ private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHold } } - if (isset($this->boltProtocol) && $this->boltProtocol instanceof V3) { - unset($extra['db'], $extra['mode'], $extra['bookmarks']); - } - return $extra; } diff --git a/src/Bolt/Messages/BoltDiscardMessage.php b/src/Bolt/Messages/BoltDiscardMessage.php index 14d74bf4..74824f56 100644 --- a/src/Bolt/Messages/BoltDiscardMessage.php +++ b/src/Bolt/Messages/BoltDiscardMessage.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt\Messages; -use Bolt\protocol\V3; use Laudis\Neo4j\Bolt\BoltConnection; use Laudis\Neo4j\Common\Neo4jLogger; use Laudis\Neo4j\Contracts\BoltMessage; @@ -32,12 +31,7 @@ public function __construct( public function send(): BoltDiscardMessage { $this->logger?->log(LogLevel::DEBUG, 'DISCARD', $this->extra); - $protocol = $this->connection->protocol(); - if ($protocol instanceof V3) { - $protocol->discardAll(); - } else { - $protocol->discard($this->extra); - } + $this->connection->protocol()->discard($this->extra); return $this; } diff --git a/src/Bolt/Messages/BoltPullMessage.php b/src/Bolt/Messages/BoltPullMessage.php index 3ba7e02d..42339e97 100644 --- a/src/Bolt/Messages/BoltPullMessage.php +++ b/src/Bolt/Messages/BoltPullMessage.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt\Messages; -use Bolt\protocol\V3; use Laudis\Neo4j\Bolt\BoltConnection; use Laudis\Neo4j\Common\Neo4jLogger; use Laudis\Neo4j\Contracts\BoltMessage; @@ -32,14 +31,7 @@ public function __construct( public function send(): BoltPullMessage { $this->logger?->log(LogLevel::DEBUG, 'PULL', $this->extra); - $protocol = $this->connection->protocol(); - if ($protocol instanceof V3) { - if (!$this->connection->consumeQueuedV3PullAll()) { - $protocol->pullAll(); - } - } else { - $protocol->pull($this->extra); - } + $this->connection->protocol()->pull($this->extra); return $this; } diff --git a/src/Bolt/ProtocolFactory.php b/src/Bolt/ProtocolFactory.php index 0d3ef9d5..4f574c0d 100644 --- a/src/Bolt/ProtocolFactory.php +++ b/src/Bolt/ProtocolFactory.php @@ -15,7 +15,6 @@ use Bolt\Bolt; use Bolt\connection\IConnection; -use Bolt\protocol\V3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -26,7 +25,7 @@ class ProtocolFactory { - public function createProtocol(IConnection $connection): V3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3|V5_4 { $boltOptoutEnv = getenv('BOLT_ANALYTICS_OPTOUT'); if ($boltOptoutEnv === false) { @@ -34,12 +33,11 @@ public function createProtocol(IConnection $connection): V3|V4_4|V5|V5_1|V5_2|V5 } $bolt = new Bolt($connection); - // Newest first; include 3.0 for legacy servers and TestKit stub (BOLT 3) - $bolt->setProtocolVersions('5.4.4', 4.4, '3.0'); + $bolt->setProtocolVersions('5.4.4', 4.4); $protocol = $bolt->build(); - if (!($protocol instanceof V3 || $protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { - throw new RuntimeException('Client only supports bolt version 3.0 to 5.4'); + if (!($protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { + throw new RuntimeException('Client only supports Bolt protocol 4.4 through 5.4'); } return $protocol; diff --git a/src/BoltFactory.php b/src/BoltFactory.php index 204e8933..430f6216 100644 --- a/src/BoltFactory.php +++ b/src/BoltFactory.php @@ -30,7 +30,7 @@ use Laudis\Neo4j\Enum\SocketType; /** - * Small wrapper around the bolt library to easily guarantee only bolt version 3 and up will be created and authenticated. + * Small wrapper around the bolt library to create and authenticate Bolt connections (protocol 4.4+). */ class BoltFactory { diff --git a/src/Enum/ConnectionProtocol.php b/src/Enum/ConnectionProtocol.php index e5d68b0e..d816121d 100644 --- a/src/Enum/ConnectionProtocol.php +++ b/src/Enum/ConnectionProtocol.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Enum; -use Bolt\protocol\V3; use Bolt\protocol\V4; use Bolt\protocol\V4_1; use Bolt\protocol\V4_2; @@ -67,7 +66,7 @@ final class ConnectionProtocol extends TypedEnum implements JsonSerializable * * @psalm-suppress ImpureMethodCall */ - public static function determineBoltVersion(V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self + public static function determineBoltVersion(V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self { $version = self::resolve($bolt->getVersion()); diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index 5eeaa5f8..edbc2916 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -14,7 +14,6 @@ namespace Laudis\Neo4j\Neo4j; use Bolt\error\ConnectException; -use Bolt\protocol\V3; use function count; @@ -309,9 +308,6 @@ private function getNextServer(RoutingTable $table, ?AccessMode $mode): Uri private function routingTable(BoltConnection $connection, SessionConfiguration $config): RoutingTable { $bolt = $connection->protocol(); - if ($bolt instanceof V3) { - throw new RuntimeException('Neo4j routing requires Bolt protocol 4.4 or newer.'); - } $this->getLogger()?->log(LogLevel::DEBUG, 'ROUTE', ['db' => $config->getDatabase()]); /** @var array{rt: array{servers: list, role:string}>, ttl: int}} $route */ diff --git a/testkit-backend/features.php b/testkit-backend/features.php index d7405993..3df9bd4b 100644 --- a/testkit-backend/features.php +++ b/testkit-backend/features.php @@ -99,7 +99,7 @@ // notified when the server reports a token expired. 'Feature:Auth:Managed' => false, // The driver supports Bolt protocol version 3 - 'Feature:Bolt:3.0' => true, + 'Feature:Bolt:3.0' => false, // The driver supports Bolt protocol version 4.1 'Feature:Bolt:4.1' => true, // The driver supports Bolt protocol version 4.2 diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 5190fb99..5caa2995 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -204,7 +204,6 @@ python3 -m unittest -v \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_error \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_slow_connection \ - tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_v3 \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_discards_on_session_close \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested_using_list \ @@ -212,8 +211,6 @@ python3 -m unittest -v \ tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_slow_connection \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch_v3 \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_v3 \ tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested_using_list \ From a92256790df26547f757f058b842bd05e53e615f Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 6 May 2026 17:07:45 +0530 Subject: [PATCH 11/12] fixed merged conflicts and solved failed tests --- src/Bolt/BoltConnection.php | 10 ++++-- testkit-backend/src/Handlers/ResultList.php | 1 + testkit-backend/src/Handlers/ResultPeek.php | 1 + testkit-backend/src/Handlers/ResultSingle.php | 36 +++++++++---------- .../src/Handlers/ResultSingleOptional.php | 2 +- tests/Unit/TypeCasterTest.php | 10 ++---- 6 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index b31a20aa..2a7d77d8 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -72,8 +72,14 @@ class BoltConnection implements ConnectionInterface private ?float $originalTimeout = null; /** - * When one PULL yields RECORD(s) then FAILURE, {@see pull()} defers the {@see Neo4jException} to the next - * {@see BoltResult::fetchResults()} so records are delivered before the error (TestKit pull_2_end_error.script). + * When one PULL yields RECORD(s) then FAILURE, {@see assertNoFailure()} runs RESET on the FAILURE before we + * can finish yielding those rows. {@see pull()} therefore defers the {@see Neo4jException} to the next + * {@see BoltResult::fetchResults()} so buffered RECORDs are delivered first (TestKit pull_2_end_error.script). + * + * This is not peek-specific: any consumption path that pulls (including {@see CypherList::peek()} calling + * {@see Iterator::current()} on a lazy {@see BoltResult}) relies on the same ordering. Official Neo4j drivers + * treat peek at end-of-stream as non-throwing (null / absent record); for a FAILURE that arrives after RECORDs + * in the same PULL response, surfacing the error only after those rows matches the same stream semantics. */ private ?Neo4jException $deferredPullFailure = null; diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php index a9046b30..c2fc61dc 100644 --- a/testkit-backend/src/Handlers/ResultList.php +++ b/testkit-backend/src/Handlers/ResultList.php @@ -89,6 +89,7 @@ public function handle($request): TestkitResponseInterface $this->repository->addRecords($request->getResultId(), $response); return $response; + return new DriverErrorResponse($request->getResultId(), $wrapped); } throw $e; diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index 6c38054b..7b9c6f60 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -87,6 +87,7 @@ public function handle($request): TestkitResponseInterface $this->repository->addRecords($request->getResultId(), $response); return $response; + return new DriverErrorResponse($request->getResultId(), $wrapped); } throw $e; diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index 3993ef1f..4d054508 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -49,12 +49,12 @@ public function handle($request): TestkitResponseInterface if ($record instanceof TestkitResponseInterface) { return $record; } - $record = $this->repository->getRecords($request->getResultId()); - if ($record instanceof TestkitResponseInterface) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); + $record = $this->repository->getRecords($request->getResultId()); + if ($record instanceof TestkitResponseInterface) { + $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); - return new DriverErrorResponse($request->getResultId(), $err); - } + return new DriverErrorResponse($request->getResultId(), $err); + } $count = $record->count(); if ($count !== 1) { @@ -67,15 +67,15 @@ public function handle($request): TestkitResponseInterface return $response; } - $count = $record->count(); - if ($count !== 1) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode( - 'Neo.ClientError.Statement.ResultNotSingle', - sprintf('Expected exactly one result row, found %d.', $count) - )]); - - return new DriverErrorResponse($request->getResultId(), $err); - } + $count = $record->count(); + if ($count !== 1) { + $err = new Neo4jException([Neo4jError::fromMessageAndCode( + 'Neo.ClientError.Statement.ResultNotSingle', + sprintf('Expected exactly one result row, found %d.', $count) + )]); + + return new DriverErrorResponse($request->getResultId(), $err); + } $values = []; foreach ($record->getAsCypherMap(0) as $value) { @@ -83,10 +83,10 @@ public function handle($request): TestkitResponseInterface } $this->repository->removeRecords($request->getResultId()); - $values = []; - foreach ($record->getAsCypherMap(0) as $value) { - $values[] = CypherObject::autoDetect($value); - } + $values = []; + foreach ($record->getAsCypherMap(0) as $value) { + $values[] = CypherObject::autoDetect($value); + } return new RecordResponse($values); } catch (Neo4jException $e) { diff --git a/testkit-backend/src/Handlers/ResultSingleOptional.php b/testkit-backend/src/Handlers/ResultSingleOptional.php index 0cbafd9a..cd40f785 100644 --- a/testkit-backend/src/Handlers/ResultSingleOptional.php +++ b/testkit-backend/src/Handlers/ResultSingleOptional.php @@ -70,8 +70,8 @@ public function handle($request): TestkitResponseInterface ); } catch (Neo4jException $e) { $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). $this->repository->addRecords($request->getResultId(), $response); - $this->repository->removeRecords($request->getResultId()); return $response; } catch (BoltException $e) { diff --git a/tests/Unit/TypeCasterTest.php b/tests/Unit/TypeCasterTest.php index a41518db..d6f53e05 100644 --- a/tests/Unit/TypeCasterTest.php +++ b/tests/Unit/TypeCasterTest.php @@ -33,12 +33,11 @@ final class TypeCasterTest extends TestCase * Complete coverage: every input type × every cast method. * When a cast isn't possible, expected is null (invalid case). * - * @return Generator - * - * @psalm-suppress MixedReturnTypeCoercion - * Yields argument lists for {@see testCastMatrix} in order: input, method, expected, class (null unless toClass). + * Yields argument lists for {@see testCastMatrix}: input, method, expected, class (null unless toClass). * * @return Generator + * + * @psalm-suppress MixedReturnTypeCoercion */ public static function provideCastMatrix(): Generator { @@ -114,9 +113,6 @@ public function __toString(): string if (!array_key_exists('_class', $expectations) || !is_array($expectations['_class'])) { throw new InvalidArgumentException('Malformed toClass expectations: missing _class map'); } - /** @var array $classMap */ - $classMap = $expectations['_class']; - $row['class'] = $classMap[$inputName] ?? stdClass::class; $classForRow = self::toClassClassNameForInput($expectations, $inputName); } From 79bf155e0df1a69235ac8afa99f3ab6bc29ac40b Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 6 May 2026 17:44:31 +0530 Subject: [PATCH 12/12] fix(testkit-backend): repair ResultSingle and duplicate returns in ResultList/Peek --- testkit-backend/src/Handlers/ResultList.php | 2 -- testkit-backend/src/Handlers/ResultPeek.php | 2 -- testkit-backend/src/Handlers/ResultSingle.php | 19 ------------------- 3 files changed, 23 deletions(-) diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php index c2fc61dc..7d184eb0 100644 --- a/testkit-backend/src/Handlers/ResultList.php +++ b/testkit-backend/src/Handlers/ResultList.php @@ -89,8 +89,6 @@ public function handle($request): TestkitResponseInterface $this->repository->addRecords($request->getResultId(), $response); return $response; - - return new DriverErrorResponse($request->getResultId(), $wrapped); } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index 7b9c6f60..838a6051 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -87,8 +87,6 @@ public function handle($request): TestkitResponseInterface $this->repository->addRecords($request->getResultId(), $response); return $response; - - return new DriverErrorResponse($request->getResultId(), $wrapped); } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index 4d054508..5152ec01 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -49,12 +49,6 @@ public function handle($request): TestkitResponseInterface if ($record instanceof TestkitResponseInterface) { return $record; } - $record = $this->repository->getRecords($request->getResultId()); - if ($record instanceof TestkitResponseInterface) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); - - return new DriverErrorResponse($request->getResultId(), $err); - } $count = $record->count(); if ($count !== 1) { @@ -67,15 +61,6 @@ public function handle($request): TestkitResponseInterface return $response; } - $count = $record->count(); - if ($count !== 1) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode( - 'Neo.ClientError.Statement.ResultNotSingle', - sprintf('Expected exactly one result row, found %d.', $count) - )]); - - return new DriverErrorResponse($request->getResultId(), $err); - } $values = []; foreach ($record->getAsCypherMap(0) as $value) { @@ -83,10 +68,6 @@ public function handle($request): TestkitResponseInterface } $this->repository->removeRecords($request->getResultId()); - $values = []; - foreach ($record->getAsCypherMap(0) as $value) { - $values[] = CypherObject::autoDetect($value); - } return new RecordResponse($values); } catch (Neo4jException $e) {