diff --git a/README.md b/README.md index 382fa715..04c34b1a 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,8 @@ foreach ($results as $result) { } ``` +`peek()` returns the next row without moving the cursor (or `null` if there is none). It can pull the next record from the server into the buffer if needed, but does not advance past it—unlike `next()` in a `foreach`, which always consumes. Repeated calls to `peek()` return the same value until you advance the iterator (for example with `next()` or by continuing a `foreach`). + Cypher values and types map to these php types and classes: | Cypher | Php | diff --git a/composer.json b/composer.json index 961b76fb..22970a2a 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,7 @@ "psr/http-factory": "^1.0", "psr/http-client": "^1.0", "php-http/message": "^1.0", - "stefanak-michal/bolt": "^7.1.4", + "stefanak-michal/bolt": "^7.4", "symfony/polyfill-php80": "^1.2", "psr/simple-cache": ">=2.0", "ext-json": "*", diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 3f802e14..2a7d77d8 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -71,6 +71,18 @@ class BoltConnection implements ConnectionInterface private ?float $originalTimeout = null; + /** + * When one PULL yields RECORD(s) then FAILURE, {@see assertNoFailure()} runs RESET on the FAILURE before we + * can finish yielding those rows. {@see pull()} therefore defers the {@see Neo4jException} to the next + * {@see BoltResult::fetchResults()} so buffered RECORDs are delivered first (TestKit pull_2_end_error.script). + * + * This is not peek-specific: any consumption path that pulls (including {@see CypherList::peek()} calling + * {@see Iterator::current()} on a lazy {@see BoltResult}) relies on the same ordering. Official Neo4j drivers + * treat peek at end-of-stream as non-throwing (null / absent record); for a FAILURE that arrives after RECORDs + * in the same PULL response, surfacing the error only after those rows matches the same stream semantics. + */ + private ?Neo4jException $deferredPullFailure = null; + /** * @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} */ @@ -217,6 +229,7 @@ public function consumeResults(): void */ public function reset(): void { + $this->deferredPullFailure = null; $message = $this->messageFactory->createResetMessage(); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -271,6 +284,7 @@ public function run( ?iterable $tsxMetadata, ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata); + $message = $this->messageFactory->createRunMessage($text, $parameters, $extra); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -337,6 +351,23 @@ public function pull(?int $qid, ?int $fetchSize): array return $tbr; } catch (Throwable $e) { $this->restoreOriginalTimeout(); + if ($e instanceof Neo4jException) { + // RECORD(s) then FAILURE in one PULL: RESET already ran in assertNoFailure — return rows and + // defer the exception to the next fetchResults() so the last record is yielded first and no + // extra PULL is sent (pull_2_end_error.script). + if (!empty($tbr)) { + $this->deferredPullFailure = $e; + $tbr[] = ['has_more' => true]; + + /** @var non-empty-list */ + return $tbr; + } + throw $e; + } + // If we've received some records before the disconnect, return them so first next() succeeds. + // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). + // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator + // would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done". // If we've received some records before the disconnect, return them so first next() succeeds. // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator @@ -363,6 +394,7 @@ public function close(): void // Other exceptions (Neo4jException, TypeError, etc.) should propagate. try { if ($this->isOpen()) { + $this->deferredPullFailure = null; if ($this->isStreaming()) { $this->discardUnconsumedResults(); } @@ -390,11 +422,23 @@ public function close(): void */ public function invalidate(): void { + $this->deferredPullFailure = null; $this->subscribedResults = []; $this->connection->disconnect(); unset($this->boltProtocol); } + /** + * Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE). + */ + public function takeDeferredPullFailure(): ?Neo4jException + { + $e = $this->deferredPullFailure; + $this->deferredPullFailure = null; + + return $e; + } + private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array { $extra = []; @@ -431,6 +475,10 @@ private function buildResultExtra(?int $fetchSize, ?int $qid): array } if ($qid !== null && $qid >= 0) { + // Always send explicit qid (including 0). Omitting qid defaults to the "current" stream; with + // multiple concurrent RUN streams in a transaction, PULL must target the correct stream or Neo4j + // returns e.g. "No such statement: N" (Neo.ClientError.Request.InvalidFormat). The Bolt library's + // openStreams counter is not reliable for this across all server versions and message orderings. $extra['qid'] = $qid; } diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index ac4e5233..783b28eb 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -145,6 +145,13 @@ private function fetchResults(): void { $this->networkPullOccurred = true; + $deferred = $this->connection->takeDeferredPullFailure(); + if ($deferred !== null) { + throw $deferred; + } + + $this->networkPullOccurred = true; + try { $meta = $this->connection->pull($this->qid, $this->effectivePullSize()); } catch (BoltConnectException|BoltException $e) { diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index 8af372ce..80fe4198 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt; -use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; @@ -86,6 +85,8 @@ public function commit(iterable $statements = []): CypherList $this->ensureBeginSent(); + $this->connection->consumeResults(); + // Force the results to pull all the results. // After a commit, the connection will be in the ready state, making it impossible to use PULL $tbr = $this->runStatements($statements)->each(static function (CypherList $list) { @@ -101,6 +102,11 @@ public function commit(iterable $statements = []): CypherList public function rollback(): void { if ($this->isFinished()) { + if ($this->state === TransactionState::TERMINATED) { + // Run/pull already failed; connection may have been RESET — nothing to send. + return; + } + if ($this->state === TransactionState::COMMITTED) { throw new TransactionException("Can't rollback a committed transaction."); } @@ -112,6 +118,14 @@ public function rollback(): void $this->ensureBeginSent(); + // FAILURE on PULL triggers RESET in {@see BoltConnection::assertNoFailure()}; server has no open tx. + // TestKit stubs (e.g. tx_error_on_pull.script) expect no ROLLBACK after that RESET. + if ($this->connection->getServerState() === 'READY') { + $this->state = TransactionState::ROLLED_BACK; + + return; + } + $this->messageFactory->createRollbackMessage()->send(); $this->state = TransactionState::ROLLED_BACK; } @@ -146,8 +160,10 @@ public function runStatement(Statement $statement): SummarizedResult $parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol()); $start = microtime(true); - $serverState = $this->connection->protocol()->serverState; - if ($serverState === ServerState::STREAMING) { + // Only drain an outstanding autocommit result (STREAMING). In an explicit transaction (TX_STREAMING) + // several RUN streams may be open; consumeResults() would preload other streams and reorder PULLs + // vs RUN (TestKit tx_pull_1_nested*, Neo4j parallel/nested tx tests). + if ($this->connection->getServerState() === 'STREAMING') { $this->connection->consumeResults(); } @@ -217,7 +233,15 @@ public function isFinished(): bool private function ensureBeginSent(): void { - if ($this->isInstantTransaction || $this->beginSent) { + if ($this->isInstantTransaction) { + return; + } + // FAILURE on PULL triggers RESET in BoltConnection — server is READY with no tx, but we may still + // have beginSent=true (e.g. execute_read retry). Must send BEGIN again before RUN. + if ($this->beginSent && $this->state === TransactionState::ACTIVE && $this->connection->getServerState() === 'READY') { + $this->beginSent = false; + } + if ($this->beginSent) { return; } try { diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 580927d1..1544cafb 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -112,15 +112,10 @@ public function acquire(SessionConfiguration $config): Generator public function release(ConnectionInterface $connection): void { + // Return a permit only — keep the connection in {@see $activeConnections} so it stays + // pooled for reuse and is still closed by {@see close()}. Removing it here orphaned + // sockets (no GOODBYE on driver close), which breaks TestKit stubs after errors. $this->semaphore->post(); - - foreach ($this->activeConnections as $i => $activeConnection) { - if ($connection === $activeConnection) { - array_splice($this->activeConnections, $i, 1); - - return; - } - } } public function getLogger(): ?Neo4jLogger diff --git a/src/Bolt/ProtocolFactory.php b/src/Bolt/ProtocolFactory.php index 5c03c00e..4f574c0d 100644 --- a/src/Bolt/ProtocolFactory.php +++ b/src/Bolt/ProtocolFactory.php @@ -33,12 +33,11 @@ public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3| } $bolt = new Bolt($connection); - // Offer protocol versions from newest to oldest (only 4.4 and above are supported) $bolt->setProtocolVersions('5.4.4', 4.4); $protocol = $bolt->build(); if (!($protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { - throw new RuntimeException('Client only supports bolt version 4.4 to 5.4'); + throw new RuntimeException('Client only supports Bolt protocol 4.4 through 5.4'); } return $protocol; diff --git a/src/BoltFactory.php b/src/BoltFactory.php index 204e8933..430f6216 100644 --- a/src/BoltFactory.php +++ b/src/BoltFactory.php @@ -30,7 +30,7 @@ use Laudis\Neo4j\Enum\SocketType; /** - * Small wrapper around the bolt library to easily guarantee only bolt version 3 and up will be created and authenticated. + * Small wrapper around the bolt library to create and authenticate Bolt connections (protocol 4.4+). */ class BoltFactory { diff --git a/src/Enum/ConnectionProtocol.php b/src/Enum/ConnectionProtocol.php index e5d68b0e..d816121d 100644 --- a/src/Enum/ConnectionProtocol.php +++ b/src/Enum/ConnectionProtocol.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Enum; -use Bolt\protocol\V3; use Bolt\protocol\V4; use Bolt\protocol\V4_1; use Bolt\protocol\V4_2; @@ -67,7 +66,7 @@ final class ConnectionProtocol extends TypedEnum implements JsonSerializable * * @psalm-suppress ImpureMethodCall */ - public static function determineBoltVersion(V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self + public static function determineBoltVersion(V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self { $version = self::resolve($bolt->getVersion()); diff --git a/src/Formatter/Specialised/BoltOGMTranslator.php b/src/Formatter/Specialised/BoltOGMTranslator.php index 8959c90a..905e2284 100644 --- a/src/Formatter/Specialised/BoltOGMTranslator.php +++ b/src/Formatter/Specialised/BoltOGMTranslator.php @@ -298,13 +298,13 @@ private function makeFromBoltPath(BoltPath $path): Path foreach ($rels as $rel) { $relationships[] = $this->makeFromBoltUnboundRelationship($rel); } - /** @var list $ids */ - $ids = $path->ids; + /** @var list $indices */ + $indices = $path->indices; return new Path( new CypherList($nodes), new CypherList($relationships), - new CypherList($ids), + new CypherList($indices), ); } diff --git a/src/Types/CypherList.php b/src/Types/CypherList.php index 663e0ca6..3a2a0121 100644 --- a/src/Types/CypherList.php +++ b/src/Types/CypherList.php @@ -308,6 +308,24 @@ public function key(): int return $this->cacheKey(); } + /** + * Returns the next record without consuming it (the iterator position is unchanged). + * + * This may pull and buffer the next row from the server when the result is lazy, the same as + * {@see current()} on a non-empty result. When there is no next record, returns `null` without + * erroring; calling {@see current()} on an exhausted result is not safe. + * + * @return TValue|null + */ + public function peek() + { + if (!$this->valid()) { + return null; + } + + return $this->current(); + } + /** * @return array */ diff --git a/testkit b/testkit index f9e7590b..5fb75922 160000 --- a/testkit +++ b/testkit @@ -1 +1 @@ -Subproject commit f9e7590b44ef983b320fae9adcd0c220b8e02962 +Subproject commit 5fb7592281079df7436f139dd2cbda996b5040d0 diff --git a/testkit-backend/features.php b/testkit-backend/features.php index d7405993..3df9bd4b 100644 --- a/testkit-backend/features.php +++ b/testkit-backend/features.php @@ -99,7 +99,7 @@ // notified when the server reports a token expired. 'Feature:Auth:Managed' => false, // The driver supports Bolt protocol version 3 - 'Feature:Bolt:3.0' => true, + 'Feature:Bolt:3.0' => false, // The driver supports Bolt protocol version 4.1 'Feature:Bolt:4.1' => true, // The driver supports Bolt protocol version 4.2 diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php index c06a306c..7d184eb0 100644 --- a/testkit-backend/src/Handlers/ResultList.php +++ b/testkit-backend/src/Handlers/ResultList.php @@ -67,22 +67,28 @@ public function handle($request): TestkitResponseInterface return new RecordListResponse($rows); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultNext.php b/testkit-backend/src/Handlers/ResultNext.php index d9b99258..c3c98f39 100644 --- a/testkit-backend/src/Handlers/ResultNext.php +++ b/testkit-backend/src/Handlers/ResultNext.php @@ -73,22 +73,25 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index 504a5247..838a6051 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -66,22 +66,27 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -101,6 +106,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index 6af2bcce..5152ec01 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\TestkitBackend\Handlers; +use Bolt\error\BoltException; use Laudis\Neo4j\Databags\Neo4jError; use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; @@ -22,6 +23,7 @@ use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\TestkitBackend\Responses\RecordResponse; use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; +use Throwable; /** * Request to expect and return exactly one record in the result stream. @@ -42,28 +44,72 @@ public function __construct( public function handle($request): TestkitResponseInterface { - $record = $this->repository->getRecords($request->getResultId()); - if ($record instanceof TestkitResponseInterface) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); + try { + $record = $this->repository->getRecords($request->getResultId()); + if ($record instanceof TestkitResponseInterface) { + return $record; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $count = $record->count(); + if ($count !== 1) { + $err = new Neo4jException([Neo4jError::fromMessageAndCode( + 'Neo.ClientError.Statement.ResultNotSingle', + sprintf('Expected exactly one result row, found %d.', $count) + )]); + $response = new DriverErrorResponse($request->getResultId(), $err); + $this->repository->addRecords($request->getResultId(), $response); - $count = $record->count(); - if ($count !== 1) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode( - 'Neo.ClientError.Statement.ResultNotSingle', - sprintf('Expected exactly one result row, found %d.', $count) - )]); + return $response; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $values = []; + foreach ($record->getAsCypherMap(0) as $value) { + $values[] = CypherObject::autoDetect($value); + } + + $this->repository->removeRecords($request->getResultId()); + + return new RecordResponse($values); + } catch (Neo4jException $e) { + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } catch (BoltException $e) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - $values = []; - foreach ($record->getAsCypherMap(0) as $value) { - $values[] = CypherObject::autoDetect($value); + return $response; + } catch (Throwable $e) { + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } + throw $e; } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); - return new RecordResponse($values); + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingleOptional.php b/testkit-backend/src/Handlers/ResultSingleOptional.php index 89ffa677..cd40f785 100644 --- a/testkit-backend/src/Handlers/ResultSingleOptional.php +++ b/testkit-backend/src/Handlers/ResultSingleOptional.php @@ -69,22 +69,28 @@ public function handle($request): TestkitResponseInterface ['Expected a single record but found multiple records in the stream.'] ); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -104,6 +110,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/RetryableNegative.php b/testkit-backend/src/Handlers/RetryableNegative.php index 9481be64..d9c787bd 100644 --- a/testkit-backend/src/Handlers/RetryableNegative.php +++ b/testkit-backend/src/Handlers/RetryableNegative.php @@ -57,35 +57,43 @@ public function handle($request): TestkitResponseInterface return new BackendErrorResponse('Transaction not found '.$transactionId->toRfc4122()); } - try { - $tsx->rollback(); - } catch (Throwable $e) { - // Best-effort rollback: connection may already be broken. Proceed with error response. - $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); - } - $errorId = $request->getErrorId(); + $resolvedException = null; if ($errorId !== '' && $errorId !== null) { try { $errorUuid = $errorId instanceof Uuid ? $errorId : Uuid::fromString($errorId); $errorResponse = $this->repository->getRecords($errorUuid); - if ($errorResponse instanceof DriverErrorResponse) { - $exception = $errorResponse->getException(); - if ($exception instanceof Neo4jException && $exception->getClassification() === 'TransientError') { - // If the original error was retryable, signal for retry - return new RetryableTryResponse($transactionId); - } - - // Otherwise, return the original error to the frontend - return new DriverErrorResponse($transactionId, $exception); + $resolvedException = $errorResponse->getException(); } } catch (Throwable $e) { - // Invalid errorId or record not found - fall through to generic FrontendError $this->logger->debug('Could not retrieve error for RetryableNegative', ['exception' => $e->getMessage()]); } } + // After FAILURE on PULL, BoltConnection RESETs before throwing Neo4jException — ROLLBACK is invalid on the wire. + $skipRollback = $resolvedException instanceof Neo4jException; + + if (!$skipRollback) { + try { + $tsx->rollback(); + } catch (Throwable $e) { + $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); + } + } + + if ($resolvedException instanceof Neo4jException) { + if ($resolvedException->getClassification() === 'TransientError') { + return new RetryableTryResponse($transactionId); + } + + return new DriverErrorResponse($transactionId, $resolvedException); + } + + if ($resolvedException !== null) { + return new DriverErrorResponse($transactionId, $resolvedException); + } + // If no specific error was provided or couldn't be retrieved, // client code caused the rollback (e.g. ApplicationCodeError) - return FrontendError return new FrontendErrorResponse('Client code caused transaction to be rolled back'); diff --git a/testkit-backend/src/MainRepository.php b/testkit-backend/src/MainRepository.php index efb19ee6..f7a76633 100644 --- a/testkit-backend/src/MainRepository.php +++ b/testkit-backend/src/MainRepository.php @@ -188,15 +188,26 @@ public function getSession(Uuid $id): SessionInterface */ public function addRecords(Uuid $id, $result): void { - $this->records[$id->toRfc4122()] = $result; + $key = $id->toRfc4122(); + $this->records[$key] = $result; if ($result instanceof SummarizedResult) { /** @var SummarizedResult> $result */ - $this->recordIterators[$id->toRfc4122()] = $result; + $this->recordIterators[$key] = $result; + } else { + unset($this->recordIterators[$key]); } } public function removeRecords(Uuid $id): void { + $key = $id->toRfc4122(); + unset( + $this->records[$key], + $this->recordIterators[$key], + $this->iteratorFetchedFirst[$key], + $this->peekPrimed[$key], + $this->pendingIteratorNextCount[$key] + ); $key = $id->toRfc4122(); unset($this->records[$key], $this->iteratorFetchedFirst[$key], $this->peekPrimed[$key], $this->pendingIteratorNextCount[$key]); } diff --git a/testkit-backend/src/Socket.php b/testkit-backend/src/Socket.php index a01133a4..683cf5bc 100644 --- a/testkit-backend/src/Socket.php +++ b/testkit-backend/src/Socket.php @@ -90,11 +90,15 @@ public static function fromAddressAndPort(string $address, int $port): self public function reset(): void { - if ($this->socket !== null && !stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR)) { - throw new RuntimeException(json_encode(error_get_last(), JSON_THROW_ON_ERROR)); + if ($this->socket === null) { + return; } + $sock = $this->socket; $this->socket = null; + if (is_resource($sock)) { + @stream_socket_shutdown($sock, STREAM_SHUT_RDWR); + } } public function readMessage(): ?string diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 3ac85647..5caa2995 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -163,29 +163,56 @@ python3 -m unittest -v \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_0_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_1_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_2_records \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_disconnect \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_run \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_func_run \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once_next_before_list \ \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_0_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_1_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_2_records \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_disconnect \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_run \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_0_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_1_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_2_records \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_disconnect \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_run \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_0_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_1_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_2_records \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_disconnect \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_run \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_func_run \ \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_full_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_half_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_empty_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_error \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all \ -\ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_discards_on_session_close \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested_using_list \ +\ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested_using_list \ EXIT_CODE=$? diff --git a/tests/Unit/BoltConnectionPoolTest.php b/tests/Unit/BoltConnectionPoolTest.php index 987ce985..d4de3e53 100644 --- a/tests/Unit/BoltConnectionPoolTest.php +++ b/tests/Unit/BoltConnectionPoolTest.php @@ -122,7 +122,9 @@ public function testReleaseReference(): void $this->pool->release($connection); - static::assertEquals($refCount - 1, $this->refCount($connection)); + // Release returns the permit but keeps the connection in the pool for reuse and for + // {@see ConnectionPool::close()} — it must not drop the pool's reference. + static::assertEquals($refCount, $this->refCount($connection)); } /** diff --git a/tests/Unit/CypherListPeekTest.php b/tests/Unit/CypherListPeekTest.php new file mode 100644 index 00000000..80cf9164 --- /dev/null +++ b/tests/Unit/CypherListPeekTest.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Tests\Unit; + +use Laudis\Neo4j\Types\CypherList; +use Laudis\Neo4j\Types\CypherMap; +use PHPUnit\Framework\TestCase; + +final class CypherListPeekTest extends TestCase +{ + public function testPeekReturnsNullOnEmptyResult(): void + { + $list = new CypherList([]); + + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } + + public function testPeekMatchesCurrentAndDoesNotAdvance(): void + { + $list = new CypherList([ + new CypherMap(['n' => 1]), + new CypherMap(['n' => 2]), + ]); + + $a = $list->peek(); + $b = $list->peek(); + $c = $list->current(); + + self::assertNotNull($a); + self::assertSame($a, $b); + self::assertSame($a, $c); + self::assertSame(1, $a->get('n')); + + $list->next(); + + self::assertSame(2, $list->peek()?->get('n')); + } + + public function testPeekReturnsNullAfterAllRowsConsumed(): void + { + $list = new CypherList([new CypherMap(['x' => true])]); + foreach ($list as $_) { + } + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } +} diff --git a/tests/Unit/TypeCasterTest.php b/tests/Unit/TypeCasterTest.php index 20a767eb..d6f53e05 100644 --- a/tests/Unit/TypeCasterTest.php +++ b/tests/Unit/TypeCasterTest.php @@ -33,9 +33,11 @@ final class TypeCasterTest extends TestCase * Complete coverage: every input type × every cast method. * When a cast isn't possible, expected is null (invalid case). * - * Yields argument lists for {@see testCastMatrix} in order: input, method, expected, class (null unless toClass). + * Yields argument lists for {@see testCastMatrix}: input, method, expected, class (null unless toClass). * * @return Generator + * + * @psalm-suppress MixedReturnTypeCoercion */ public static function provideCastMatrix(): Generator { @@ -94,7 +96,8 @@ public function __toString(): string foreach ($inputs as $inputName => $inputValue) { foreach ($matrix as $method => $expectations) { - $expected = self::expectedValueForInput($expectations, $inputName); + /** @psalm-suppress MixedAssignment */ + $expected = $expectations[$inputName] ?? null; $key = $inputName.'->'.$method; // Generator must be fresh per test (consumable once) @@ -107,6 +110,9 @@ public function __toString(): string $classForRow = null; if ($method === 'toClass') { + if (!array_key_exists('_class', $expectations) || !is_array($expectations['_class'])) { + throw new InvalidArgumentException('Malformed toClass expectations: missing _class map'); + } $classForRow = self::toClassClassNameForInput($expectations, $inputName); }