diff --git a/README.md b/README.md index 382fa715..04c34b1a 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,8 @@ foreach ($results as $result) { } ``` +`peek()` returns the next row without moving the cursor (or `null` if there is none). It can pull the next record from the server into the buffer if needed, but does not advance past it—unlike `next()` in a `foreach`, which always consumes. Repeated calls to `peek()` return the same value until you advance the iterator (for example with `next()` or by continuing a `foreach`). + Cypher values and types map to these php types and classes: | Cypher | Php | diff --git a/composer.json b/composer.json index 961b76fb..22970a2a 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,7 @@ "psr/http-factory": "^1.0", "psr/http-client": "^1.0", "php-http/message": "^1.0", - "stefanak-michal/bolt": "^7.1.4", + "stefanak-michal/bolt": "^7.4", "symfony/polyfill-php80": "^1.2", "psr/simple-cache": ">=2.0", "ext-json": "*", diff --git a/src/Authentication/BasicAuth.php b/src/Authentication/BasicAuth.php index 3cc71d83..0f2751e1 100644 --- a/src/Authentication/BasicAuth.php +++ b/src/Authentication/BasicAuth.php @@ -35,7 +35,7 @@ public function __construct( /** * @throws Exception * - * @return array{server: string, connection_id: string, hints: list} + * @return array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ public function authenticateBolt(BoltConnection $connection, string $userAgent): array { @@ -43,7 +43,7 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): $protocol = $connection->protocol(); if (method_exists($protocol, 'logon')) { - $helloMetadata = ['user_agent' => $userAgent]; + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]); $responseHello = $factory->createHelloMessage($helloMetadata)->send()->getResponse(); @@ -55,20 +55,20 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): $response = $factory->createLogonMessage($credentials)->send()->getResponse(); - /** @var array{server: string, connection_id: string, hints: list} */ + /** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return array_merge($responseHello->content, $response->content); } - $helloMetadata = [ + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, [ 'user_agent' => $userAgent, 'scheme' => 'basic', 'principal' => $this->username, 'credentials' => $this->password, - ]; + ]); $response = $factory->createHelloMessage($helloMetadata)->send()->getResponse(); - /** @var array{server: string, connection_id: string, hints: list} */ + /** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return $response->content; } diff --git a/src/Authentication/BoltHelloMetadata.php b/src/Authentication/BoltHelloMetadata.php new file mode 100644 index 00000000..417ed4ba --- /dev/null +++ b/src/Authentication/BoltHelloMetadata.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Authentication; + +use Laudis\Neo4j\Bolt\BoltConnection; + +/** + * Merges Bolt HELLO fields required for Neo4j 4.3+ temporal UTC patch negotiation. + */ +final class BoltHelloMetadata +{ + /** + * @param array $metadata + * + * @return array + */ + public static function withUtcPatchIfSupported(BoltConnection $connection, array $metadata): array + { + // Neo4j 4.3–4.4: optional UTC temporal patch. Bolt 5+ TestKit scripts expect HELLO without patch_bolt. + // Compare against "5" (not "5.0"): bolt library getVersion() returns "5" for class V5. + $v = $connection->protocol()->getVersion(); + if (version_compare($v, '4.3', '>=') && version_compare($v, '5', '<')) { + $metadata['patch_bolt'] = ['utc']; + } + + return $metadata; + } +} diff --git a/src/Authentication/KerberosAuth.php b/src/Authentication/KerberosAuth.php index 7b9f32a6..5040b7a6 100644 --- a/src/Authentication/KerberosAuth.php +++ b/src/Authentication/KerberosAuth.php @@ -37,7 +37,7 @@ public function __construct( /** * @throws Exception * - * @return array{server: string, connection_id: string, hints: list} + * @return array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ public function authenticateBolt(BoltConnection $connection, string $userAgent): array { @@ -45,7 +45,9 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): $this->logger?->log(LogLevel::DEBUG, 'HELLO', ['user_agent' => $userAgent]); - $factory->createHelloMessage(['user_agent' => $userAgent])->send()->getResponse(); + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]); + + $factory->createHelloMessage($helloMetadata)->send()->getResponse(); $this->logger?->log(LogLevel::DEBUG, 'LOGON', ['scheme' => 'kerberos', 'principal' => '']); @@ -56,7 +58,7 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): ])->send()->getResponse(); /** - * @var array{server: string, connection_id: string, hints: list} + * @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return $response->content; } diff --git a/src/Authentication/NoAuth.php b/src/Authentication/NoAuth.php index 82210aee..868b42bf 100644 --- a/src/Authentication/NoAuth.php +++ b/src/Authentication/NoAuth.php @@ -33,31 +33,31 @@ public function __construct( /** * @throws Exception * - * @return array{server: string, connection_id: string, hints: list} + * @return array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ public function authenticateBolt(BoltConnection $connection, string $userAgent): array { $factory = $this->createMessageFactory($connection); if ($connection->getProtocol()->compare(ConnectionProtocol::BOLT_V5_1()) >= 0) { - $helloMetadata = ['user_agent' => $userAgent]; + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]); $factory->createHelloMessage($helloMetadata)->send()->getResponse(); $response = $factory->createLogonMessage(['scheme' => 'none'])->send()->getResponse(); - /** @var array{server: string, connection_id: string, hints: list} */ + /** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return $response->content; } - $helloMetadata = [ + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, [ 'user_agent' => $userAgent, 'scheme' => 'none', - ]; + ]); $response = $factory->createHelloMessage($helloMetadata)->send()->getResponse(); - /** @var array{server: string, connection_id: string, hints: list} */ + /** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return $response->content; } diff --git a/src/Authentication/OpenIDConnectAuth.php b/src/Authentication/OpenIDConnectAuth.php index 947bbcd6..51cab591 100644 --- a/src/Authentication/OpenIDConnectAuth.php +++ b/src/Authentication/OpenIDConnectAuth.php @@ -43,7 +43,7 @@ public function authenticateHttp(RequestInterface $request, UriInterface $uri, s /** * @throws Exception * - * @return array{server: string, connection_id: string, hints: list} + * @return array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ public function authenticateBolt(BoltConnection $connection, string $userAgent): array { @@ -51,7 +51,9 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): $this->logger?->log(LogLevel::DEBUG, 'HELLO', ['user_agent' => $userAgent]); - $factory->createHelloMessage(['user_agent' => $userAgent])->send()->getResponse(); + $helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]); + + $factory->createHelloMessage($helloMetadata)->send()->getResponse(); $this->logger?->log(LogLevel::DEBUG, 'LOGON', ['scheme' => 'bearer']); @@ -61,7 +63,7 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent): ])->send()->getResponse(); /** - * @var array{server: string, connection_id: string, hints: list} + * @var array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ return $response->content; } diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 3f802e14..a1a9bd37 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -17,6 +17,8 @@ use Bolt\enum\Signature; use Bolt\error\ConnectException as BoltConnectException; use Bolt\protocol\Response; +use Bolt\protocol\V4_2; +use Bolt\protocol\V4_3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -44,7 +46,7 @@ use WeakReference; /** - * @implements ConnectionInterface + * @implements ConnectionInterface * * @psalm-import-type BoltMeta from SummarizedResultFormatter */ @@ -72,7 +74,21 @@ class BoltConnection implements ConnectionInterface private ?float $originalTimeout = null; /** - * @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} + * When one PULL yields RECORD(s) then FAILURE, {@see assertNoFailure()} runs RESET on the FAILURE before we + * can finish yielding those rows. {@see pull()} therefore defers the {@see Neo4jException} to the next + * {@see BoltResult::fetchResults()} so buffered RECORDs are delivered first (TestKit pull_2_end_error.script). + * + * This is not peek-specific: any consumption path that pulls (including {@see CypherList::peek()} calling + * {@see Iterator::current()} on a lazy {@see BoltResult}) relies on the same ordering. Official Neo4j drivers + * treat peek at end-of-stream as non-throwing (null / absent record); for a FAILURE that arrives after RECORDs + * in the same PULL response, surfacing the error only after those rows matches the same stream semantics. + */ + private ?Neo4jException $deferredPullFailure = null; + + private bool $boltUtcPatchNegotiated = false; + + /** + * @return array{0: V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection} */ public function getImplementation(): array { @@ -83,7 +99,7 @@ public function getImplementation(): array * @psalm-mutation-free */ public function __construct( - private V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, + private V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol, private readonly Connection $connection, private readonly AuthenticateInterface $auth, private readonly string $userAgent, @@ -132,6 +148,16 @@ public function getProtocol(): ConnectionProtocol return $this->config->getProtocol(); } + public function setBoltUtcPatchNegotiated(bool $negotiated): void + { + $this->boltUtcPatchNegotiated = $negotiated; + } + + public function isBoltUtcPatchNegotiated(): bool + { + return $this->boltUtcPatchNegotiated; + } + /** * @psalm-mutation-free */ @@ -217,6 +243,7 @@ public function consumeResults(): void */ public function reset(): void { + $this->deferredPullFailure = null; $message = $this->messageFactory->createResetMessage(); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -271,6 +298,7 @@ public function run( ?iterable $tsxMetadata, ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata); + $message = $this->messageFactory->createRunMessage($text, $parameters, $extra); $response = $message->send()->getResponse(); $this->assertNoFailure($response); @@ -293,7 +321,7 @@ public function rollback(): void $this->assertNoFailure($response); } - public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function protocol(): V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 { if (!isset($this->boltProtocol)) { throw new Exception('Connection is closed'); @@ -337,6 +365,23 @@ public function pull(?int $qid, ?int $fetchSize): array return $tbr; } catch (Throwable $e) { $this->restoreOriginalTimeout(); + if ($e instanceof Neo4jException) { + // RECORD(s) then FAILURE in one PULL: RESET already ran in assertNoFailure — return rows and + // defer the exception to the next fetchResults() so the last record is yielded first and no + // extra PULL is sent (pull_2_end_error.script). + if (!empty($tbr)) { + $this->deferredPullFailure = $e; + $tbr[] = ['has_more' => true]; + + /** @var non-empty-list */ + return $tbr; + } + throw $e; + } + // If we've received some records before the disconnect, return them so first next() succeeds. + // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). + // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator + // would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done". // If we've received some records before the disconnect, return them so first next() succeeds. // Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts). // Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator @@ -363,6 +408,7 @@ public function close(): void // Other exceptions (Neo4jException, TypeError, etc.) should propagate. try { if ($this->isOpen()) { + $this->deferredPullFailure = null; if ($this->isStreaming()) { $this->discardUnconsumedResults(); } @@ -390,11 +436,23 @@ public function close(): void */ public function invalidate(): void { + $this->deferredPullFailure = null; $this->subscribedResults = []; $this->connection->disconnect(); unset($this->boltProtocol); } + /** + * Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE). + */ + public function takeDeferredPullFailure(): ?Neo4jException + { + $e = $this->deferredPullFailure; + $this->deferredPullFailure = null; + + return $e; + } + private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array { $extra = []; @@ -431,6 +489,10 @@ private function buildResultExtra(?int $fetchSize, ?int $qid): array } if ($qid !== null && $qid >= 0) { + // Always send explicit qid (including 0). Omitting qid defaults to the "current" stream; with + // multiple concurrent RUN streams in a transaction, PULL must target the correct stream or Neo4j + // returns e.g. "No such statement: N" (Neo.ClientError.Request.InvalidFormat). The Bolt library's + // openStreams counter is not reliable for this across all server versions and message orderings. $extra['qid'] = $qid; } diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index ac4e5233..783b28eb 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -145,6 +145,13 @@ private function fetchResults(): void { $this->networkPullOccurred = true; + $deferred = $this->connection->takeDeferredPullFailure(); + if ($deferred !== null) { + throw $deferred; + } + + $this->networkPullOccurred = true; + try { $meta = $this->connection->pull($this->qid, $this->effectivePullSize()); } catch (BoltConnectException|BoltException $e) { diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index 8af372ce..644e38a5 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Bolt; -use Bolt\enum\ServerState; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\BookmarkHolder; @@ -86,7 +85,12 @@ public function commit(iterable $statements = []): CypherList $this->ensureBeginSent(); - // Force the results to pull all the results. + // Drain subscribed Bolt streams before COMMIT. Rows that fail OGM mapping are surfaced as + // RowDecodeFailure in the formatter so preload can advance without throwing (TestKit + // unknown-then-known temporal). + $this->connection->consumeResults(); + + // Run any extra statements, then commit. // After a commit, the connection will be in the ready state, making it impossible to use PULL $tbr = $this->runStatements($statements)->each(static function (CypherList $list) { $list->preload(); @@ -101,6 +105,11 @@ public function commit(iterable $statements = []): CypherList public function rollback(): void { if ($this->isFinished()) { + if ($this->state === TransactionState::TERMINATED) { + // Run/pull already failed; connection may have been RESET — nothing to send. + return; + } + if ($this->state === TransactionState::COMMITTED) { throw new TransactionException("Can't rollback a committed transaction."); } @@ -110,8 +119,24 @@ public function rollback(): void } } + // FAILURE on PULL triggers RESET in {@see BoltConnection::assertNoFailure()}; server has no open tx. + // Must run before {@see ensureBeginSent()}: otherwise we would send BEGIN then ROLLBACK (tx_error_on_pull). + if ($this->connection->getServerState() === 'READY') { + $this->beginSent = false; + $this->state = TransactionState::ROLLED_BACK; + + return; + } + $this->ensureBeginSent(); + if ($this->connection->getServerState() === 'READY') { + $this->beginSent = false; + $this->state = TransactionState::ROLLED_BACK; + + return; + } + $this->messageFactory->createRollbackMessage()->send(); $this->state = TransactionState::ROLLED_BACK; } @@ -143,11 +168,17 @@ public function run(string $statement, iterable $parameters = []): SummarizedRes */ public function runStatement(Statement $statement): SummarizedResult { - $parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol()); + $parameters = ParameterHelper::formatParameters( + $statement->getParameters(), + $this->connection->getProtocol(), + $this->connection->isBoltUtcPatchNegotiated() + ); $start = microtime(true); - $serverState = $this->connection->protocol()->serverState; - if ($serverState === ServerState::STREAMING) { + // Only drain an outstanding autocommit result (STREAMING). In an explicit transaction (TX_STREAMING) + // several RUN streams may be open; consumeResults() would preload other streams and reorder PULLs + // vs RUN (TestKit tx_pull_1_nested*, Neo4j parallel/nested tx tests). + if ($this->connection->getServerState() === 'STREAMING') { $this->connection->consumeResults(); } @@ -217,7 +248,15 @@ public function isFinished(): bool private function ensureBeginSent(): void { - if ($this->isInstantTransaction || $this->beginSent) { + if ($this->isInstantTransaction) { + return; + } + // FAILURE on PULL triggers RESET in BoltConnection — server is READY with no tx, but we may still + // have beginSent=true (e.g. execute_read retry). Must send BEGIN again before RUN. + if ($this->beginSent && $this->state === TransactionState::ACTIVE && $this->connection->getServerState() === 'READY') { + $this->beginSent = false; + } + if ($this->beginSent) { return; } try { diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 580927d1..1544cafb 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -112,15 +112,10 @@ public function acquire(SessionConfiguration $config): Generator public function release(ConnectionInterface $connection): void { + // Return a permit only — keep the connection in {@see $activeConnections} so it stays + // pooled for reuse and is still closed by {@see close()}. Removing it here orphaned + // sockets (no GOODBYE on driver close), which breaks TestKit stubs after errors. $this->semaphore->post(); - - foreach ($this->activeConnections as $i => $activeConnection) { - if ($connection === $activeConnection) { - array_splice($this->activeConnections, $i, 1); - - return; - } - } } public function getLogger(): ?Neo4jLogger diff --git a/src/Bolt/ProtocolFactory.php b/src/Bolt/ProtocolFactory.php index 5c03c00e..6ccabd85 100644 --- a/src/Bolt/ProtocolFactory.php +++ b/src/Bolt/ProtocolFactory.php @@ -15,6 +15,8 @@ use Bolt\Bolt; use Bolt\connection\IConnection; +use Bolt\protocol\V4_2; +use Bolt\protocol\V4_3; use Bolt\protocol\V4_4; use Bolt\protocol\V5; use Bolt\protocol\V5_1; @@ -25,7 +27,7 @@ class ProtocolFactory { - public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3|V5_4 + public function createProtocol(IConnection $connection): V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 { $boltOptoutEnv = getenv('BOLT_ANALYTICS_OPTOUT'); if ($boltOptoutEnv === false) { @@ -33,12 +35,12 @@ public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3| } $bolt = new Bolt($connection); - // Offer protocol versions from newest to oldest (only 4.4 and above are supported) - $bolt->setProtocolVersions('5.4.4', 4.4); + // Four Bolt version suggestions (library maximum); include 4.2/4.3 for TestKit stubs and older servers. + $bolt->setProtocolVersions('5.4.4', '4.4.4', '4.3.3', '4.2.2'); $protocol = $bolt->build(); - if (!($protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { - throw new RuntimeException('Client only supports bolt version 4.4 to 5.4'); + if (!($protocol instanceof V4_2 || $protocol instanceof V4_3 || $protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) { + throw new RuntimeException('Client only supports Bolt protocol 4.2 through 5.4'); } return $protocol; diff --git a/src/BoltFactory.php b/src/BoltFactory.php index 204e8933..42fb017d 100644 --- a/src/BoltFactory.php +++ b/src/BoltFactory.php @@ -30,7 +30,7 @@ use Laudis\Neo4j\Enum\SocketType; /** - * Small wrapper around the bolt library to easily guarantee only bolt version 3 and up will be created and authenticated. + * Small wrapper around the bolt library to create and authenticate Bolt connections (protocol 4.2+). */ class BoltFactory { @@ -81,6 +81,11 @@ public function createConnection(ConnectionRequestData $data, SessionConfigurati $response = $data->getAuth()->authenticateBolt($connection, $data->getUserAgent()); + $patchBolt = $response['patch_bolt'] ?? null; + $connection->setBoltUtcPatchNegotiated( + is_array($patchBolt) && in_array('utc', $patchBolt, true) + ); + $config->setServerAgent($response['server'] ?? ''); // Timeout precedence: 1) driver config, 2) server hint, 3) default 30s. diff --git a/src/Contracts/AuthenticateInterface.php b/src/Contracts/AuthenticateInterface.php index 0e3d4433..b8c3fa7e 100644 --- a/src/Contracts/AuthenticateInterface.php +++ b/src/Contracts/AuthenticateInterface.php @@ -21,7 +21,7 @@ interface AuthenticateInterface /** * Authenticates a Bolt connection with the provided configuration Uri and userAgent. * - * @return array{server: string, connection_id: string, hints: list} + * @return array{server: string, connection_id: string, hints: list, patch_bolt?: list} */ public function authenticateBolt(BoltConnection $connection, string $userAgent): array; diff --git a/src/Databags/SummarizedResult.php b/src/Databags/SummarizedResult.php index a140f4a1..aae3433c 100644 --- a/src/Databags/SummarizedResult.php +++ b/src/Databags/SummarizedResult.php @@ -15,16 +15,18 @@ use Closure; use Generator; +use Laudis\Neo4j\Formatter\RowDecodeFailure; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; use Laudis\Neo4j\Types\CypherList; use Laudis\Neo4j\Types\CypherMap; +use OutOfBoundsException; /** * A result containing the values and the summary. * * @psalm-import-type OGMTypes from SummarizedResultFormatter * - * @extends CypherList> + * @extends CypherList|RowDecodeFailure> */ final class SummarizedResult extends CypherList { @@ -49,9 +51,9 @@ final class SummarizedResult extends CypherList /** * @psalm-mutation-free * - * @param iterable>|callable():Generator> $iterable - * @param list $keys - * @param (Closure():void)|null $prepareListFetchAll + * @param iterable|RowDecodeFailure>|callable():Generator|RowDecodeFailure> $iterable + * @param list $keys + * @param (Closure():void)|null $prepareListFetchAll */ public function __construct(?ResultSummary &$summary, iterable|callable $iterable = [], array $keys = [], ?Closure $prepareListFetchAll = null, ?object $boltResultRef = null) { @@ -80,7 +82,7 @@ public function getSummary(): ResultSummary */ public function getResults(): CypherList { - return new CypherList($this); + return new CypherList(array_values($this->toArray())); } /** @@ -122,4 +124,101 @@ public function list(): array return $rows; } + + public function current(): CypherMap + { + $value = parent::current(); + + if ($value instanceof RowDecodeFailure) { + throw $value->exception; + } + + return $value; + } + + /** + * @internal testKit backend: same as {@see parent::current()} but keeps {@see RowDecodeFailure} rows + * so the client can map them to {@see \Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse} + * without losing the underlying Bolt row cursor + */ + public function currentAllowingDecodeFailures(): mixed + { + return parent::current(); + } + + public function peek(): ?CypherMap + { + if (!$this->valid()) { + return null; + } + + return $this->current(); + } + + public function first(): CypherMap + { + foreach ($this as $value) { + return $value; + } + + throw new OutOfBoundsException('Cannot grab first element of an empty list'); + } + + public function last(): CypherMap + { + $rows = array_values($this->toArray()); + if ($rows === []) { + throw new OutOfBoundsException('Cannot grab last element of an empty list'); + } + + return $rows[count($rows) - 1]; + } + + /** + * @param callable(CypherMap, int):void $callable + * + * @return $this + */ + public function each(callable $callable): self + { + foreach ($this as $key => $value) { + $callable($value, $key); + } + + return $this; + } + + /** + * @return array> + */ + public function toArray(): array + { + $this->preload(); + + /** @var array> $out */ + $out = []; + foreach ($this->cache as $i => $value) { + if ($value instanceof RowDecodeFailure) { + throw $value->exception; + } + $out[$i] = $value; + } + + return $out; + } + + public function offsetGet(mixed $offset): CypherMap + { + $value = parent::offsetGet($offset); + if ($value instanceof RowDecodeFailure) { + throw $value->exception; + } + + return $value; + } + + public function get(int $key): CypherMap + { + return $this->offsetGet($key); + } } diff --git a/src/Enum/ConnectionProtocol.php b/src/Enum/ConnectionProtocol.php index e5d68b0e..d816121d 100644 --- a/src/Enum/ConnectionProtocol.php +++ b/src/Enum/ConnectionProtocol.php @@ -13,7 +13,6 @@ namespace Laudis\Neo4j\Enum; -use Bolt\protocol\V3; use Bolt\protocol\V4; use Bolt\protocol\V4_1; use Bolt\protocol\V4_2; @@ -67,7 +66,7 @@ final class ConnectionProtocol extends TypedEnum implements JsonSerializable * * @psalm-suppress ImpureMethodCall */ - public static function determineBoltVersion(V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self + public static function determineBoltVersion(V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self { $version = self::resolve($bolt->getVersion()); diff --git a/src/Formatter/RowDecodeFailure.php b/src/Formatter/RowDecodeFailure.php new file mode 100644 index 00000000..f25b64dd --- /dev/null +++ b/src/Formatter/RowDecodeFailure.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Formatter; + +use Laudis\Neo4j\Exception\Neo4jException; + +/** + * @internal Placeholder row when a RECORD cannot be mapped to OGM (e.g. unknown IANA zone id in a temporal). + * The underlying {@see \Laudis\Neo4j\Bolt\BoltResult} cursor is still advanced so further rows can be read. + */ +final class RowDecodeFailure +{ + public function __construct( + public readonly Neo4jException $exception, + ) { + } +} diff --git a/src/Formatter/Specialised/BoltOGMTranslator.php b/src/Formatter/Specialised/BoltOGMTranslator.php index 8959c90a..a351a1aa 100644 --- a/src/Formatter/Specialised/BoltOGMTranslator.php +++ b/src/Formatter/Specialised/BoltOGMTranslator.php @@ -26,8 +26,13 @@ use Bolt\protocol\v1\structures\Relationship as BoltRelationship; use Bolt\protocol\v1\structures\Time as BoltTime; use Bolt\protocol\v1\structures\UnboundRelationship as BoltUnboundRelationship; +use Bolt\protocol\v5\structures\DateTimeZoneId as BoltV5DateTimeZoneId; use Bolt\protocol\v6\structures\Vector as BoltVector; +use DateTimeImmutable; +use DateTimeZone; +use Laudis\Neo4j\Databags\Neo4jError; use Laudis\Neo4j\Enum\VectorTypeMarker; +use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; use Laudis\Neo4j\Types\Abstract3DPoint; use Laudis\Neo4j\Types\AbstractPoint; @@ -49,6 +54,7 @@ use Laudis\Neo4j\Types\Vector; use Laudis\Neo4j\Types\WGS843DPoint; use Laudis\Neo4j\Types\WGS84Point; +use Throwable; use UnexpectedValueException; /** @@ -138,8 +144,35 @@ private function makeFromBoltLocalDateTime(BoltLocalDateTime $time): LocalDateTi private function makeBoltTimezoneIdentifier(BoltDateTimeZoneId $time): DateTimeZoneId { - /** @var non-empty-string $tzId */ $tzId = $time->tz_id; + if ($tzId === '') { + $msg = 'Time zone ID must not be empty'; + throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.TypeError', $msg)]); + } + + try { + $tz = new DateTimeZone($tzId); + } catch (Throwable $e) { + $msg = sprintf('Unknown time zone ID: %s', $time->tz_id); + throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.TypeError', $msg)], $e); + } + + if ($time instanceof BoltV5DateTimeZoneId) { + $micros = intdiv($time->nanoseconds, 1000); + $utc = (new DateTimeImmutable('@'.$time->seconds))->modify(sprintf('%+d microseconds', $micros)); + if ($utc === false) { + throw new UnexpectedValueException('Expected DateTimeImmutable'); + } + /** @psalm-suppress ImpureMethodCall */ + $inZone = $utc->setTimezone($tz); + + /** @psalm-suppress ImpureMethodCall */ + return new DateTimeZoneId( + DateTimeZoneId::encodeBoltCivilSecondsForInstant($inZone, $tz), + $time->nanoseconds, + $tzId + ); + } return new DateTimeZoneId($time->seconds, $time->nanoseconds, $tzId); } @@ -298,13 +331,13 @@ private function makeFromBoltPath(BoltPath $path): Path foreach ($rels as $rel) { $relationships[] = $this->makeFromBoltUnboundRelationship($rel); } - /** @var list $ids */ - $ids = $path->ids; + /** @var list $indices */ + $indices = $path->indices; return new Path( new CypherList($nodes), new CypherList($relationships), - new CypherList($ids), + new CypherList($indices), ); } diff --git a/src/Formatter/SummarizedResultFormatter.php b/src/Formatter/SummarizedResultFormatter.php index 9712fd13..89d84069 100644 --- a/src/Formatter/SummarizedResultFormatter.php +++ b/src/Formatter/SummarizedResultFormatter.php @@ -32,6 +32,7 @@ use Laudis\Neo4j\Databags\SummarizedResult; use Laudis\Neo4j\Databags\SummaryCounters; use Laudis\Neo4j\Enum\QueryTypeEnum; +use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\Formatter\Specialised\BoltOGMTranslator; use Laudis\Neo4j\Types\Cartesian3DPoint; use Laudis\Neo4j\Types\CartesianPoint; @@ -281,13 +282,19 @@ private function formatProfiledPlan(array $profiledPlanData): ProfiledQueryPlan /** * @param BoltMeta $meta * - * @return CypherList> + * @return CypherList|RowDecodeFailure> */ private function processBoltResult(array $meta, BoltResult $result, BoltConnection $connection, BookmarkHolder $holder): CypherList { $tbr = (new CypherList(function () use ($result, $meta) { - foreach ($result as $row) { - yield $this->formatRow($meta, $row); + while ($result->valid()) { + $row = $result->current(); + try { + yield $this->formatRow($meta, $row); + } catch (Neo4jException $e) { + yield new RowDecodeFailure($e); + } + $result->next(); } }))->withCacheLimit($this->clientSideCacheLimitFromBoltFetchSize($result->getFetchSize())); diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index edbc2916..058e81db 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -14,6 +14,8 @@ namespace Laudis\Neo4j\Neo4j; use Bolt\error\ConnectException; +use Bolt\protocol\V4_2; +use Bolt\protocol\V4_3; use function count; @@ -171,7 +173,8 @@ public function acquire(SessionConfiguration $config): Generator } $this->cache->set($key, $table, $table->getTtl()); - // TODO: release probably logs off the connection, it is not preferable + // Return permit; keep the connection pooled for reuse (Optimization:ConnectionReuse). TestKit + // routing stubs expect no router until driver/session teardown sends GOODBYE. $pool->release($connection); break; @@ -310,8 +313,18 @@ private function routingTable(BoltConnection $connection, SessionConfiguration $ $bolt = $connection->protocol(); $this->getLogger()?->log(LogLevel::DEBUG, 'ROUTE', ['db' => $config->getDatabase()]); + + if ($bolt instanceof V4_2) { + throw new RuntimeException('Neo4j routing requires Bolt protocol 4.3 or newer (ROUTE message).'); + } + + $db = $config->getDatabase(); + $routeMessage = $bolt instanceof V4_3 + ? $bolt->route([], [], $db) + : $bolt->route([], [], ['db' => $db]); + /** @var array{rt: array{servers: list, role:string}>, ttl: int}} $route */ - $route = $bolt->route([], [], ['db' => $config->getDatabase()]) + $route = $routeMessage ->getResponse() ->content; diff --git a/src/ParameterHelper.php b/src/ParameterHelper.php index 8f232680..30078b53 100644 --- a/src/ParameterHelper.php +++ b/src/ParameterHelper.php @@ -14,12 +14,16 @@ namespace Laudis\Neo4j; use Bolt\protocol\IStructure; +use Bolt\protocol\v1\structures\DateTime as BoltV1DateTime; use Bolt\protocol\v1\structures\DateTimeZoneId; use Bolt\protocol\v1\structures\Duration; +use Bolt\protocol\v5\structures\DateTime as BoltV5DateTime; +use Bolt\protocol\v5\structures\DateTimeZoneId as BoltV5DateTimeZoneId; use function count; use DateInterval; +use DateTimeImmutable; use DateTimeInterface; use function get_debug_type; @@ -82,13 +86,15 @@ public static function asMap(iterable $iterable): CypherMap public static function asParameter( mixed $value, ConnectionProtocol $protocol, + bool $boltUtcPatchNegotiated = false, ): iterable|int|float|bool|string|stdClass|IStructure|null { return self::passThroughBoltStructure($value) ?? self::cypherMapToStdClass($value) ?? self::emptySequenceToArray($value) ?? + self::ogmDateTimeZoneIdToBolt($value, $protocol, $boltUtcPatchNegotiated) ?? self::convertBoltConvertibles($value) ?? - self::convertTemporalTypes($value, $protocol) ?? - self::filledIterableToArray($value, $protocol) ?? + self::convertTemporalTypes($value, $protocol, $boltUtcPatchNegotiated) ?? + self::filledIterableToArray($value, $protocol, $boltUtcPatchNegotiated) ?? self::stringAbleToString($value) ?? self::filterInvalidType($value); } @@ -159,21 +165,36 @@ private static function cypherMapToStdClass(mixed $value): ?stdClass return null; } - private static function filledIterableToArray(mixed $value, ConnectionProtocol $protocol): ?array + private static function filledIterableToArray(mixed $value, ConnectionProtocol $protocol, bool $boltUtcPatchNegotiated = false): ?array { if (is_iterable($value)) { - return self::iterableToArray($value, $protocol); + return self::iterableToArray($value, $protocol, $boltUtcPatchNegotiated); } return null; } + /** + * When the server negotiated {@code patch_bolt: ["utc"]} on Bolt 4.3/4.4, temporal values use the same PackStream + * structures as Bolt 5+ (Tv2 / 0x49) instead of legacy T / 0x46. + */ + private static function useBoltV5TemporalOnWire(ConnectionProtocol $protocol, bool $boltUtcPatchNegotiated): bool + { + if ($protocol->compare(ConnectionProtocol::BOLT_V44()) > 0) { + return true; + } + + return $boltUtcPatchNegotiated + && $protocol->compare(ConnectionProtocol::BOLT_V43()) >= 0 + && $protocol->compare(ConnectionProtocol::BOLT_V5()) < 0; + } + /** * @param iterable $parameters * * @return CypherMap */ - public static function formatParameters(iterable $parameters, ConnectionProtocol $connection): CypherMap + public static function formatParameters(iterable $parameters, ConnectionProtocol $connection, bool $boltUtcPatchNegotiated = false): CypherMap { /** @var array $tbr */ $tbr = []; @@ -186,13 +207,13 @@ public static function formatParameters(iterable $parameters, ConnectionProtocol $msg = 'The parameters must have an integer or string as key values, '.gettype($key).' received.'; throw new InvalidArgumentException($msg); } - $tbr[(string) $key] = self::asParameter($value, $connection); + $tbr[(string) $key] = self::asParameter($value, $connection, $boltUtcPatchNegotiated); } return new CypherMap($tbr); } - private static function iterableToArray(iterable $value, ConnectionProtocol $protocol): array + private static function iterableToArray(iterable $value, ConnectionProtocol $protocol, bool $boltUtcPatchNegotiated = false): array { $tbr = []; /** @@ -201,7 +222,7 @@ private static function iterableToArray(iterable $value, ConnectionProtocol $pro */ foreach ($value as $key => $val) { if (is_int($key) || is_string($key)) { - $tbr[$key] = self::asParameter($val, $protocol); + $tbr[$key] = self::asParameter($val, $protocol, $boltUtcPatchNegotiated); } else { $msg = 'Iterable parameters must have an integer or string as key values, '.gettype( $key @@ -222,22 +243,49 @@ private static function convertBoltConvertibles(mixed $value): ?IStructure return null; } - private static function convertTemporalTypes(mixed $value, ConnectionProtocol $protocol): ?IStructure + /** + * {@see Types\DateTimeZoneId} stores Bolt ≤4.4 "local epoch" seconds; Bolt 5+ wire uses UTC epoch. + */ + private static function ogmDateTimeZoneIdToBolt(mixed $value, ConnectionProtocol $protocol, bool $boltUtcPatchNegotiated = false): ?IStructure + { + if (!$value instanceof Types\DateTimeZoneId) { + return null; + } + + $instant = $value->toDateTime(); + $nanos = $value->getNanoseconds(); + $tzName = $value->getTimezoneIdentifier(); + + return self::useBoltV5TemporalOnWire($protocol, $boltUtcPatchNegotiated) + ? new BoltV5DateTimeZoneId($instant->getTimestamp(), $nanos, $tzName) + : new DateTimeZoneId($value->getSeconds(), $nanos, $tzName); + } + + private static function convertTemporalTypes(mixed $value, ConnectionProtocol $protocol, bool $boltUtcPatchNegotiated = false): ?IStructure { if ($value instanceof DateTimeInterface) { - if ($protocol->compare(ConnectionProtocol::BOLT_V44()) > 0) { - return new \Bolt\protocol\v5\structures\DateTimeZoneId( - $value->getTimestamp(), - ((int) $value->format('u')) * 1000, - $value->getTimezone()->getName() - ); + $immutable = $value instanceof DateTimeImmutable + ? $value + : DateTimeImmutable::createFromMutable($value); + $nanos = ((int) $immutable->format('u')) * 1000; + $offsetSec = $immutable->getOffset(); + $tzName = $immutable->getTimezone()->getName(); + + if (self::isNamedIanaStyleTimezoneId($tzName)) { + // Bolt ≤4.4: civil seconds (TestKit simple_jolt). Bolt 5+: UTC Unix epoch on the wire. + // With patch_bolt utc on 4.3/4.4, use the same v5 wire encoding as Neo4j 5+ (Tv2). + return self::useBoltV5TemporalOnWire($protocol, $boltUtcPatchNegotiated) + ? new BoltV5DateTimeZoneId($immutable->getTimestamp(), $nanos, $tzName) + : new DateTimeZoneId( + Types\DateTimeZoneId::encodeBoltCivilSecondsForInstant($immutable, $immutable->getTimezone()), + $nanos, + $tzName + ); } - return new DateTimeZoneId( - $value->getTimestamp(), - ((int) $value->format('u')) * 1000, - $value->getTimezone()->getName() - ); + return self::useBoltV5TemporalOnWire($protocol, $boltUtcPatchNegotiated) + ? new BoltV5DateTime($immutable->getTimestamp(), $nanos, $offsetSec) + : new BoltV1DateTime($immutable->getTimestamp() + $offsetSec, $nanos, $offsetSec); } if ($value instanceof DateInterval) { @@ -251,4 +299,13 @@ private static function convertTemporalTypes(mixed $value, ConnectionProtocol $p return null; } + + private static function isNamedIanaStyleTimezoneId(string $name): bool + { + if ($name === '' || $name === 'UTC' || $name === 'GMT' || strtoupper($name) === 'Z') { + return false; + } + + return preg_match('/^[+-](?:\d{2}:\d{2}|\d{4})$/', $name) !== 1; + } } diff --git a/src/Types/CypherList.php b/src/Types/CypherList.php index 663e0ca6..3a2a0121 100644 --- a/src/Types/CypherList.php +++ b/src/Types/CypherList.php @@ -308,6 +308,24 @@ public function key(): int return $this->cacheKey(); } + /** + * Returns the next record without consuming it (the iterator position is unchanged). + * + * This may pull and buffer the next row from the server when the result is lazy, the same as + * {@see current()} on a non-empty result. When there is no next record, returns `null` without + * erroring; calling {@see current()} on an exhausted result is not safe. + * + * @return TValue|null + */ + public function peek() + { + if (!$this->valid()) { + return null; + } + + return $this->current(); + } + /** * @return array */ diff --git a/src/Types/DateTime.php b/src/Types/DateTime.php index 6d565c0c..4eb91b20 100644 --- a/src/Types/DateTime.php +++ b/src/Types/DateTime.php @@ -17,6 +17,9 @@ use DateTimeImmutable; use DateTimeZone; use Exception; + +use function intdiv; + use Laudis\Neo4j\Contracts\BoltConvertibleInterface; use function sprintf; @@ -79,8 +82,8 @@ public function toDateTime(): DateTimeImmutable { $dateTime = new DateTimeImmutable(sprintf('@%s', $this->getSeconds())); $dateTime = $dateTime->modify(sprintf('+%s microseconds', $this->nanoseconds / 1000)); - /** @psalm-suppress PossiblyFalseReference */ - $dateTime = $dateTime->setTimezone(new DateTimeZone(sprintf("%+'05d", $this->getTimeZoneOffsetSeconds() / 3600 * 100))); + /** @psalm-suppress ImpureMethodCall, ArgumentTypeCoercion */ + $dateTime = $dateTime->setTimezone(new DateTimeZone(self::fixedOffsetTimezoneIdFromSeconds($this->getTimeZoneOffsetSeconds()))); if ($this->legacy) { /** @@ -95,6 +98,25 @@ public function toDateTime(): DateTimeImmutable return $dateTime; } + /** + * PHP {@see DateTimeZone} accepts fixed offsets like {@code +00:30}; the old {@code sprintf("%+'05d", …)} + * encoding only worked for whole-hour offsets and broke civil time for e.g. +30 minutes. + */ + private static function fixedOffsetTimezoneIdFromSeconds(int $offsetSeconds): string + { + $sign = $offsetSeconds >= 0 ? '+' : '-'; + $abs = abs($offsetSeconds); + $h = intdiv($abs, 3600); + $remainder = $abs % 3600; + $m = intdiv($remainder, 60); + $s = $remainder % 60; + if ($s !== 0) { + return sprintf('%s%02d:%02d:%02d', $sign, $h, $m, $s); + } + + return sprintf('%s%02d:%02d', $sign, $h, $m); + } + /** * @return array{seconds: int, nanoseconds: int, tzOffsetSeconds: int} */ diff --git a/src/Types/DateTimeZoneId.php b/src/Types/DateTimeZoneId.php index 1ebf88a1..95430387 100644 --- a/src/Types/DateTimeZoneId.php +++ b/src/Types/DateTimeZoneId.php @@ -15,6 +15,7 @@ use Bolt\protocol\IStructure; use DateTimeImmutable; +use DateTimeInterface; use DateTimeZone; use Exception; use Laudis\Neo4j\Contracts\BoltConvertibleInterface; @@ -24,7 +25,8 @@ use UnexpectedValueException; /** - * A date represented by seconds and nanoseconds since unix epoch, enriched with a timezone identifier. + * Bolt {@code DateTimeZoneId} (≤4.4) first field: SI seconds from {@code 1970-01-01T00:00:00} as UTC-naive + * to the zone wall clock (same rule as TestKit simple_jolt {@code JoltDateTime.seconds_nanoseconds}). * * @psalm-immutable * @@ -45,7 +47,7 @@ public function __construct( } /** - * Returns the amount of seconds since unix epoch. + * Bolt {@code DateTimeZoneId} “civil” seconds field (see class docblock). */ public function getSeconds(): int { @@ -72,17 +74,54 @@ public function getTimezoneIdentifier(): string * Casts to an immutable date time. * * @throws Exception + * + * @psalm-suppress ImpureMethodCall */ public function toDateTime(): DateTimeImmutable { - $dateTimeImmutable = (new DateTimeImmutable(sprintf('@%s', $this->getSeconds()))) - ->modify(sprintf('+%s microseconds', $this->nanoseconds / 1000)); + return self::toDateTimeFromBoltCivilSeconds($this->seconds, $this->nanoseconds, $this->tzId); + } + + /** + * Encodes a zoned instant into Bolt ≤4.4 {@code DateTimeZoneId} first integer (TestKit/simple_jolt compatible). + */ + public static function encodeBoltCivilSecondsForInstant(DateTimeInterface $instant, DateTimeZone $tz): int + { + $immutable = $instant instanceof DateTimeImmutable + ? $instant->setTimezone($tz) + : DateTimeImmutable::createFromMutable($instant)->setTimezone($tz); + $wall = $immutable->format('Y-m-d H:i:s'); + $naiveWallAsUtc = DateTimeImmutable::createFromFormat('!Y-m-d H:i:s', $wall, new DateTimeZone('UTC')); + if ($naiveWallAsUtc === false) { + throw new UnexpectedValueException('Expected DateTimeImmutable'); + } + $naiveEpoch = new DateTimeImmutable('1970-01-01 00:00:00', new DateTimeZone('UTC')); + + return $naiveWallAsUtc->getTimestamp() - $naiveEpoch->getTimestamp(); + } - if ($dateTimeImmutable === false) { + /** + * @param non-empty-string $tzId + */ + public static function toDateTimeFromBoltCivilSeconds(int $seconds, int $nanoseconds, string $tzId): DateTimeImmutable + { + $tz = new DateTimeZone($tzId); + $naiveUtc = (new DateTimeImmutable('1970-01-01 00:00:00', new DateTimeZone('UTC'))) + ->modify(sprintf('%+d seconds', $seconds)); + if ($naiveUtc === false) { + throw new UnexpectedValueException('Expected DateTimeImmutable'); + } + $wall = $naiveUtc->format('Y-m-d H:i:s'); + $instant = DateTimeImmutable::createFromFormat('!Y-m-d H:i:s', $wall, $tz); + if ($instant === false) { + throw new UnexpectedValueException('Expected DateTimeImmutable'); + } + $withNanos = $instant->modify(sprintf('%+d microseconds', intdiv($nanoseconds, 1000))); + if ($withNanos === false) { throw new UnexpectedValueException('Expected DateTimeImmutable'); } - return $dateTimeImmutable->setTimezone(new DateTimeZone($this->tzId)); + return $withNanos; } /** diff --git a/testkit b/testkit index f9e7590b..5fb75922 160000 --- a/testkit +++ b/testkit @@ -1 +1 @@ -Subproject commit f9e7590b44ef983b320fae9adcd0c220b8e02962 +Subproject commit 5fb7592281079df7436f139dd2cbda996b5040d0 diff --git a/testkit-backend/features.php b/testkit-backend/features.php index d7405993..3df9bd4b 100644 --- a/testkit-backend/features.php +++ b/testkit-backend/features.php @@ -99,7 +99,7 @@ // notified when the server reports a token expired. 'Feature:Auth:Managed' => false, // The driver supports Bolt protocol version 3 - 'Feature:Bolt:3.0' => true, + 'Feature:Bolt:3.0' => false, // The driver supports Bolt protocol version 4.1 'Feature:Bolt:4.1' => true, // The driver supports Bolt protocol version 4.2 diff --git a/testkit-backend/src/Handlers/AbstractRunner.php b/testkit-backend/src/Handlers/AbstractRunner.php index a648b595..d385fad5 100644 --- a/testkit-backend/src/Handlers/AbstractRunner.php +++ b/testkit-backend/src/Handlers/AbstractRunner.php @@ -14,6 +14,7 @@ namespace Laudis\Neo4j\TestkitBackend\Handlers; use Bolt\error\ConnectException as BoltConnectException; +use DateTimeImmutable; use Exception; use Laudis\Neo4j\Contracts\SessionInterface; use Laudis\Neo4j\Contracts\TransactionInterface; @@ -24,12 +25,12 @@ use Laudis\Neo4j\Exception\TransactionException; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; use Laudis\Neo4j\TestkitBackend\MainRepository; +use Laudis\Neo4j\TestkitBackend\NutkitValueDecoder; use Laudis\Neo4j\TestkitBackend\Requests\SessionRunRequest; use Laudis\Neo4j\TestkitBackend\Requests\TransactionRunRequest; use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\TestkitBackend\Responses\ResultResponse; use Laudis\Neo4j\Types\AbstractCypherObject; -use Laudis\Neo4j\Types\CypherList; use Laudis\Neo4j\Types\CypherMap; use Psr\Log\LoggerInterface; use Symfony\Component\Uid\Uuid; @@ -61,7 +62,7 @@ public function handle($request): ResultResponse|DriverErrorResponse $params = []; if ($request->getParams() !== null) { foreach ($request->getParams() as $key => $value) { - $params[$key] = self::decodeToValue($value); + $params[$key] = NutkitValueDecoder::decode($value); } } @@ -70,7 +71,7 @@ public function handle($request): ResultResponse|DriverErrorResponse $actualMeta = []; if ($metaData !== null) { foreach ($metaData as $key => $meta) { - $actualMeta[$key] = self::decodeToValue($meta); + $actualMeta[$key] = NutkitValueDecoder::decode($meta); } } $config = TransactionConfiguration::default()->withMetadata($actualMeta)->withTimeout($request->getTimeout()); @@ -122,44 +123,13 @@ public function handle($request): ResultResponse|DriverErrorResponse } /** - * @param array{name: string, data: array{value: iterable|scalar|null}} $param + * @param array{name: string, data: array} $param * - * @return scalar|AbstractCypherObject|iterable|null + * @return scalar|AbstractCypherObject|DateTimeImmutable|iterable|null */ public static function decodeToValue(array $param) { - $value = $param['data']['value']; - if (is_iterable($value)) { - if ($param['name'] === 'CypherMap') { - /** @psalm-suppress MixedArgumentTypeCoercion */ - $map = []; - /** - * @var numeric $k - * @var mixed $v - */ - foreach ($value as $k => $v) { - /** @psalm-suppress MixedArgument */ - $map[(string) $k] = self::decodeToValue($v); - } - - return new CypherMap($map); - } - - if ($param['name'] === 'CypherList') { - $list = []; - /** - * @var mixed $v - */ - foreach ($value as $v) { - /** @psalm-suppress MixedArgument */ - $list[] = self::decodeToValue($v); - } - - return new CypherList($list); - } - } - - return $value; + return NutkitValueDecoder::decode($param); } /** diff --git a/testkit-backend/src/Handlers/ResultList.php b/testkit-backend/src/Handlers/ResultList.php index c06a306c..7d184eb0 100644 --- a/testkit-backend/src/Handlers/ResultList.php +++ b/testkit-backend/src/Handlers/ResultList.php @@ -67,22 +67,28 @@ public function handle($request): TestkitResponseInterface return new RecordListResponse($rows); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultNext.php b/testkit-backend/src/Handlers/ResultNext.php index d9b99258..e1419296 100644 --- a/testkit-backend/src/Handlers/ResultNext.php +++ b/testkit-backend/src/Handlers/ResultNext.php @@ -15,7 +15,9 @@ use Bolt\error\BoltException; use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Databags\SummarizedResult; use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\Formatter\RowDecodeFailure; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; use Laudis\Neo4j\TestkitBackend\MainRepository; @@ -61,9 +63,21 @@ public function handle($request): TestkitResponseInterface } // Get the current record - $current = $iterator->current(); + $current = $iterator instanceof SummarizedResult + ? $iterator->currentAllowingDecodeFailures() + : $iterator->current(); $this->repository->setIteratorFetchedFirst($request->getResultId(), true); + if ($current instanceof RowDecodeFailure) { + // Do not replace the SummarizedResult in the repository: the stream must stay iterable so + // a following ResultNext can read later rows (e.g. unknown-then-known temporal stub). + $response = new DriverErrorResponse($request->getResultId(), $current->exception); + $this->repository->rememberResultDriverError($request->getResultId(), $response); + $this->repository->addPendingIteratorNext($request->getResultId()); + + return $response; + } + $values = []; foreach ($current as $value) { $values[] = CypherObject::autoDetect($value); @@ -73,22 +87,25 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { - $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { - $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } diff --git a/testkit-backend/src/Handlers/ResultPeek.php b/testkit-backend/src/Handlers/ResultPeek.php index 504a5247..39fd6d41 100644 --- a/testkit-backend/src/Handlers/ResultPeek.php +++ b/testkit-backend/src/Handlers/ResultPeek.php @@ -15,7 +15,9 @@ use Bolt\error\BoltException; use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Databags\SummarizedResult; use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\Formatter\RowDecodeFailure; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; use Laudis\Neo4j\TestkitBackend\MainRepository; @@ -57,7 +59,17 @@ public function handle($request): TestkitResponseInterface return new NullRecordResponse(); } - $current = $iterator->current(); + $current = $iterator instanceof SummarizedResult + ? $iterator->currentAllowingDecodeFailures() + : $iterator->current(); + + if ($current instanceof RowDecodeFailure) { + // Same as ResultNext: keep SummarizedResult so peek/next can continue the stream. + $response = new DriverErrorResponse($request->getResultId(), $current->exception); + $this->repository->rememberResultDriverError($request->getResultId(), $response); + + return $response; + } $values = []; foreach ($current as $value) { @@ -66,22 +78,27 @@ public function handle($request): TestkitResponseInterface return new RecordResponse($values); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -101,6 +118,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingle.php b/testkit-backend/src/Handlers/ResultSingle.php index 6af2bcce..5152ec01 100644 --- a/testkit-backend/src/Handlers/ResultSingle.php +++ b/testkit-backend/src/Handlers/ResultSingle.php @@ -13,6 +13,7 @@ namespace Laudis\Neo4j\TestkitBackend\Handlers; +use Bolt\error\BoltException; use Laudis\Neo4j\Databags\Neo4jError; use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; @@ -22,6 +23,7 @@ use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\TestkitBackend\Responses\RecordResponse; use Laudis\Neo4j\TestkitBackend\Responses\Types\CypherObject; +use Throwable; /** * Request to expect and return exactly one record in the result stream. @@ -42,28 +44,72 @@ public function __construct( public function handle($request): TestkitResponseInterface { - $record = $this->repository->getRecords($request->getResultId()); - if ($record instanceof TestkitResponseInterface) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Statement.ResultNotSingle', 'Something went wrong with the result handling')]); + try { + $record = $this->repository->getRecords($request->getResultId()); + if ($record instanceof TestkitResponseInterface) { + return $record; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $count = $record->count(); + if ($count !== 1) { + $err = new Neo4jException([Neo4jError::fromMessageAndCode( + 'Neo.ClientError.Statement.ResultNotSingle', + sprintf('Expected exactly one result row, found %d.', $count) + )]); + $response = new DriverErrorResponse($request->getResultId(), $err); + $this->repository->addRecords($request->getResultId(), $response); - $count = $record->count(); - if ($count !== 1) { - $err = new Neo4jException([Neo4jError::fromMessageAndCode( - 'Neo.ClientError.Statement.ResultNotSingle', - sprintf('Expected exactly one result row, found %d.', $count) - )]); + return $response; + } - return new DriverErrorResponse($request->getResultId(), $err); - } + $values = []; + foreach ($record->getAsCypherMap(0) as $value) { + $values[] = CypherObject::autoDetect($value); + } + + $this->repository->removeRecords($request->getResultId()); + + return new RecordResponse($values); + } catch (Neo4jException $e) { + $response = new DriverErrorResponse($request->getResultId(), $e); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } catch (BoltException $e) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - $values = []; - foreach ($record->getAsCypherMap(0) as $value) { - $values[] = CypherObject::autoDetect($value); + return $response; + } catch (Throwable $e) { + if ($this->isConnectionOrSocketError($e)) { + $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); + $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); + + return $response; + } + throw $e; } + } + + private function isConnectionOrSocketError(Throwable $e): bool + { + $message = strtolower($e->getMessage()); - return new RecordResponse($values); + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'connection closed') + || str_contains($message, 'connection is closed') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error') + || str_contains($message, 'network read incomplete') + || str_contains($message, 'network write incomplete') + || str_contains($message, 'socket') + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/ResultSingleOptional.php b/testkit-backend/src/Handlers/ResultSingleOptional.php index 89ffa677..cd40f785 100644 --- a/testkit-backend/src/Handlers/ResultSingleOptional.php +++ b/testkit-backend/src/Handlers/ResultSingleOptional.php @@ -69,22 +69,28 @@ public function handle($request): TestkitResponseInterface ['Expected a single record but found multiple records in the stream.'] ); } catch (Neo4jException $e) { - $this->repository->removeRecords($request->getResultId()); + $response = new DriverErrorResponse($request->getResultId(), $e); + // Keep error for RetryableNegative lookup by result id (execute_read tx_func tests). + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $e); + return $response; } catch (BoltException $e) { $this->repository->removeRecords($request->getResultId()); $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } catch (Throwable $e) { $this->repository->removeRecords($request->getResultId()); if ($this->isConnectionOrSocketError($e)) { $neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage()); $wrapped = new Neo4jException([$neo4jError], $e); + $response = new DriverErrorResponse($request->getResultId(), $wrapped); + $this->repository->addRecords($request->getResultId(), $response); - return new DriverErrorResponse($request->getResultId(), $wrapped); + return $response; } throw $e; } @@ -104,6 +110,7 @@ private function isConnectionOrSocketError(Throwable $e): bool || str_contains($message, 'network read incomplete') || str_contains($message, 'network write incomplete') || str_contains($message, 'socket') - || str_contains($message, 'broken'); + || str_contains($message, 'broken') + || str_contains($message, 'already been closed'); } } diff --git a/testkit-backend/src/Handlers/RetryableNegative.php b/testkit-backend/src/Handlers/RetryableNegative.php index 9481be64..fe8bfb4e 100644 --- a/testkit-backend/src/Handlers/RetryableNegative.php +++ b/testkit-backend/src/Handlers/RetryableNegative.php @@ -57,35 +57,51 @@ public function handle($request): TestkitResponseInterface return new BackendErrorResponse('Transaction not found '.$transactionId->toRfc4122()); } - try { - $tsx->rollback(); - } catch (Throwable $e) { - // Best-effort rollback: connection may already be broken. Proceed with error response. - $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); - } - $errorId = $request->getErrorId(); + $resolvedException = null; if ($errorId !== '' && $errorId !== null) { try { $errorUuid = $errorId instanceof Uuid ? $errorId : Uuid::fromString($errorId); $errorResponse = $this->repository->getRecords($errorUuid); - if ($errorResponse instanceof DriverErrorResponse) { - $exception = $errorResponse->getException(); - if ($exception instanceof Neo4jException && $exception->getClassification() === 'TransientError') { - // If the original error was retryable, signal for retry - return new RetryableTryResponse($transactionId); + $resolvedException = $errorResponse->getException(); + } else { + $buffered = $this->repository->peekResultDriverError($errorUuid); + if ($buffered instanceof DriverErrorResponse) { + $resolvedException = $buffered->getException(); } - - // Otherwise, return the original error to the frontend - return new DriverErrorResponse($transactionId, $exception); } } catch (Throwable $e) { - // Invalid errorId or record not found - fall through to generic FrontendError $this->logger->debug('Could not retrieve error for RetryableNegative', ['exception' => $e->getMessage()]); } } + // Transient errors after PULL (e.g. tx_error_on_pull.script): connection is RESET to READY and the + // same transaction object is retried — do not ROLLBACK/BEGIN here; the next RUN will send BEGIN. + // Client errors (e.g. unknown zone) need a real ROLLBACK on the wire. + $skipRollback = $resolvedException instanceof Neo4jException + && $resolvedException->getClassification() === 'TransientError'; + + if (!$skipRollback) { + try { + $tsx->rollback(); + } catch (Throwable $e) { + $this->logger->debug('Rollback failed during RetryableNegative', ['exception' => $e->getMessage()]); + } + } + + if ($resolvedException instanceof Neo4jException) { + if ($resolvedException->getClassification() === 'TransientError') { + return new RetryableTryResponse($transactionId); + } + + return new DriverErrorResponse($transactionId, $resolvedException); + } + + if ($resolvedException !== null) { + return new DriverErrorResponse($transactionId, $resolvedException); + } + // If no specific error was provided or couldn't be retrieved, // client code caused the rollback (e.g. ApplicationCodeError) - return FrontendError return new FrontendErrorResponse('Client code caused transaction to be rolled back'); diff --git a/testkit-backend/src/MainRepository.php b/testkit-backend/src/MainRepository.php index efb19ee6..bdc36b2f 100644 --- a/testkit-backend/src/MainRepository.php +++ b/testkit-backend/src/MainRepository.php @@ -19,6 +19,7 @@ use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; use Laudis\Neo4j\Databags\SummarizedResult; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\TestkitBackend\Responses\DriverErrorResponse; use Laudis\Neo4j\Types\CypherMap; use Symfony\Component\Uid\Uuid; @@ -55,6 +56,14 @@ final class MainRepository */ private array $pendingIteratorNextCount = []; + /** + * {@see DriverErrorResponse} for a result id when the live {@see SummarizedResult} must stay in + * {@see $records} (RowDecodeFailure path) so {@see RetryableNegative} can still resolve {@code errorId}. + * + * @var array + */ + private array $resultDriverErrors = []; + /** * @param array>>> $drivers * @param array>>> $sessions @@ -188,17 +197,37 @@ public function getSession(Uuid $id): SessionInterface */ public function addRecords(Uuid $id, $result): void { - $this->records[$id->toRfc4122()] = $result; + $key = $id->toRfc4122(); + $this->records[$key] = $result; if ($result instanceof SummarizedResult) { /** @var SummarizedResult> $result */ - $this->recordIterators[$id->toRfc4122()] = $result; + $this->recordIterators[$key] = $result; + } else { + unset($this->recordIterators[$key]); } } public function removeRecords(Uuid $id): void { $key = $id->toRfc4122(); - unset($this->records[$key], $this->iteratorFetchedFirst[$key], $this->peekPrimed[$key], $this->pendingIteratorNextCount[$key]); + unset( + $this->records[$key], + $this->recordIterators[$key], + $this->iteratorFetchedFirst[$key], + $this->peekPrimed[$key], + $this->pendingIteratorNextCount[$key], + $this->resultDriverErrors[$key] + ); + } + + public function rememberResultDriverError(Uuid $id, DriverErrorResponse $response): void + { + $this->resultDriverErrors[$id->toRfc4122()] = $response; + } + + public function peekResultDriverError(Uuid $id): ?DriverErrorResponse + { + return $this->resultDriverErrors[$id->toRfc4122()] ?? null; } /** diff --git a/testkit-backend/src/NutkitValueDecoder.php b/testkit-backend/src/NutkitValueDecoder.php new file mode 100644 index 00000000..d2698206 --- /dev/null +++ b/testkit-backend/src/NutkitValueDecoder.php @@ -0,0 +1,149 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend; + +use DateTimeImmutable; +use DateTimeZone; +use InvalidArgumentException; +use Laudis\Neo4j\Types\CypherList; +use Laudis\Neo4j\Types\CypherMap; + +/** + * Decodes nutkit JSON parameter payloads ({@see \nutkit\backend\Encoder}) into PHP values for the driver. + */ +final class NutkitValueDecoder +{ + /** + * @param array{name?: string, data?: array} $param + */ + public static function decode(array $param): mixed + { + $name = $param['name'] ?? ''; + $data = $param['data'] ?? []; + + return match ($name) { + 'CypherNull' => null, + 'CypherDateTime' => self::decodeCypherDateTime($data), + 'CypherMap' => self::decodeCypherMap($data), + 'CypherList' => self::decodeCypherList($data), + default => self::decodeScalarWrapper($name, $data), + }; + } + + /** + * @param array $data + */ + private static function decodeScalarWrapper(string $name, array $data): mixed + { + if (!array_key_exists('value', $data)) { + throw new InvalidArgumentException('Unsupported nutkit type or missing data.value for: '.$name); + } + + return $data['value']; + } + + /** + * @param array $data + */ + private static function decodeCypherMap(array $data): CypherMap + { + $raw = $data['value'] ?? []; + if (!is_iterable($raw)) { + throw new InvalidArgumentException('CypherMap.value must be iterable'); + } + $map = []; + foreach ($raw as $k => $v) { + $map[(string) $k] = is_array($v) && isset($v['name'], $v['data']) + ? self::decode($v) + : $v; + } + + return new CypherMap($map); + } + + /** + * @param array $data + */ + private static function decodeCypherList(array $data): CypherList + { + $raw = $data['value'] ?? []; + if (!is_iterable($raw)) { + throw new InvalidArgumentException('CypherList.value must be iterable'); + } + $list = []; + foreach ($raw as $v) { + $list[] = is_array($v) && isset($v['name'], $v['data']) + ? self::decode($v) + : $v; + } + + return new CypherList($list); + } + + /** + * @param array $data + */ + private static function decodeCypherDateTime(array $data): DateTimeImmutable + { + $y = (int) ($data['year'] ?? 0); + $m = (int) ($data['month'] ?? 0); + $d = (int) ($data['day'] ?? 0); + $h = (int) ($data['hour'] ?? 0); + $i = (int) ($data['minute'] ?? 0); + $s = (int) ($data['second'] ?? 0); + $ns = (int) ($data['nanosecond'] ?? 0); + $utcOffsetS = array_key_exists('utc_offset_s', $data) && $data['utc_offset_s'] !== null + ? (int) $data['utc_offset_s'] + : null; + $timezoneId = $data['timezone_id'] ?? null; + if ($timezoneId === '') { + $timezoneId = null; + } + + $micro = intdiv($ns, 1000); + $base = sprintf('%04d-%02d-%02d %02d:%02d:%02d', $y, $m, $d, $h, $i, $s); + + if (is_string($timezoneId)) { + $tz = new DateTimeZone($timezoneId); + $dt = DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $base, $tz); + if ($dt === false) { + throw new InvalidArgumentException('Invalid CypherDateTime (timezone_id)'); + } + } elseif ($utcOffsetS !== null) { + $tz = self::timezoneFromOffsetSeconds($utcOffsetS); + $dt = DateTimeImmutable::createFromFormat('Y-m-d H:i:s', $base, $tz); + if ($dt === false) { + throw new InvalidArgumentException('Invalid CypherDateTime (offset)'); + } + } else { + $dt = new DateTimeImmutable($base, new DateTimeZone('UTC')); + } + + if ($micro > 0) { + $dt = $dt->modify(sprintf('+%d microseconds', $micro)); + } + + return $dt; + } + + private static function timezoneFromOffsetSeconds(int $offsetSec): DateTimeZone + { + $sign = $offsetSec >= 0 ? '+' : '-'; + $abs = abs($offsetSec); + $hours = intdiv($abs, 3600); + $mins = intdiv($abs % 3600, 60); + + return new DateTimeZone(sprintf('%s%02d:%02d', $sign, $hours, $mins)); + } +} diff --git a/testkit-backend/src/Responses/Types/CypherObject.php b/testkit-backend/src/Responses/Types/CypherObject.php index 46a94117..e987dead 100644 --- a/testkit-backend/src/Responses/Types/CypherObject.php +++ b/testkit-backend/src/Responses/Types/CypherObject.php @@ -13,11 +13,15 @@ namespace Laudis\Neo4j\TestkitBackend\Responses\Types; +use DateTimeInterface; + use function get_debug_type; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; use Laudis\Neo4j\Types\CypherList; use Laudis\Neo4j\Types\CypherMap; +use Laudis\Neo4j\Types\DateTime as Neo4jDateTime; +use Laudis\Neo4j\Types\DateTimeZoneId as Neo4jDateTimeZoneId; use Laudis\Neo4j\Types\Node; use Laudis\Neo4j\Types\Path; use Laudis\Neo4j\Types\Relationship; @@ -93,6 +97,19 @@ public static function autoDetect($value): TestkitResponseInterface } $tbr = new CypherObject('Vector', new CypherList($list)); break; + case Neo4jDateTime::class: + /** @var Neo4jDateTime $value */ + $tbr = NutkitFlatCypherValue::cypherDateTimeFromNeo4jDateTime($value); + break; + case Neo4jDateTimeZoneId::class: + /** @var Neo4jDateTimeZoneId $value */ + $tbr = NutkitFlatCypherValue::cypherDateTimeFromNeo4jDateTimeZoneId($value); + break; + case 'DateTimeImmutable': + case 'DateTime': + /** @var DateTimeInterface $value */ + $tbr = NutkitFlatCypherValue::cypherDateTimeFromDateTimeInterface($value); + break; case 'int': $tbr = new CypherObject('CypherInt', $value); break; diff --git a/testkit-backend/src/Responses/Types/NutkitFlatCypherValue.php b/testkit-backend/src/Responses/Types/NutkitFlatCypherValue.php new file mode 100644 index 00000000..b2d6055e --- /dev/null +++ b/testkit-backend/src/Responses/Types/NutkitFlatCypherValue.php @@ -0,0 +1,86 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\TestkitBackend\Responses\Types; + +use DateTimeInterface; +use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; +use Laudis\Neo4j\Types\DateTime as Neo4jDateTime; +use Laudis\Neo4j\Types\DateTimeZoneId as Neo4jDateTimeZoneId; + +/** + * Nutkit {@see \nutkit\backend\Encoder} expects several Cypher types with flat {@code data} fields + * (not wrapped in {@code data.value}); this response mirrors that shape for JSON serialization. + */ +final class NutkitFlatCypherValue implements TestkitResponseInterface +{ + /** + * @param array $data + */ + public function __construct( + private string $name, + private array $data, + ) { + } + + public static function cypherDateTimeFromDateTimeInterface(DateTimeInterface $dt): self + { + return new self('CypherDateTime', self::dateTimeToNutkitFields($dt)); + } + + public static function cypherDateTimeFromNeo4jDateTime(Neo4jDateTime $dt): self + { + return self::cypherDateTimeFromDateTimeInterface($dt->toDateTime()); + } + + public static function cypherDateTimeFromNeo4jDateTimeZoneId(Neo4jDateTimeZoneId $dt): self + { + return self::cypherDateTimeFromDateTimeInterface($dt->toDateTime()); + } + + /** + * @return array + */ + private static function dateTimeToNutkitFields(DateTimeInterface $dt): array + { + $nanosecond = (int) $dt->format('u') * 1000; + $tzName = $dt->getTimezone()->getName(); + $utcOffsetS = $dt->getOffset(); + $timezoneId = null; + if (!in_array($tzName, ['UTC', 'Z', 'GMT'], true) + && !str_starts_with($tzName, '+') + && !str_starts_with($tzName, '-')) { + $timezoneId = $tzName; + } + + return [ + 'year' => (int) $dt->format('Y'), + 'month' => (int) $dt->format('n'), + 'day' => (int) $dt->format('j'), + 'hour' => (int) $dt->format('G'), + 'minute' => (int) $dt->format('i'), + 'second' => (int) $dt->format('s'), + 'nanosecond' => $nanosecond, + 'utc_offset_s' => $utcOffsetS, + 'timezone_id' => $timezoneId, + ]; + } + + public function jsonSerialize(): array + { + return [ + 'name' => $this->name, + 'data' => $this->data, + ]; + } +} diff --git a/testkit-backend/src/Socket.php b/testkit-backend/src/Socket.php index a01133a4..683cf5bc 100644 --- a/testkit-backend/src/Socket.php +++ b/testkit-backend/src/Socket.php @@ -90,11 +90,15 @@ public static function fromAddressAndPort(string $address, int $port): self public function reset(): void { - if ($this->socket !== null && !stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR)) { - throw new RuntimeException(json_encode(error_get_last(), JSON_THROW_ON_ERROR)); + if ($this->socket === null) { + return; } + $sock = $this->socket; $this->socket = null; + if (is_resource($sock)) { + @stream_socket_shutdown($sock, STREAM_SHUT_RDWR); + } } public function readMessage(): ?string diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 3ac85647..252ed962 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -163,29 +163,78 @@ python3 -m unittest -v \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_0_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_1_records \ tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_2_records \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_disconnect \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_run \ + tests.stub.iteration.test_result_list.TestResultList.test_result_list_with_failure_tx_func_run \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once \ tests.stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_run_result_list_pulls_all_records_at_once_next_before_list \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once \ + tests.stub.iteration.test_result_list.TestResultList.test_tx_func_result_list_pulls_all_records_at_once_next_before_list \ \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_0_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_1_records \ tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_2_records \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_disconnect \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_run \ + tests.stub.iteration.test_result_single.TestResultSingle.test_result_single_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_0_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_1_records \ tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_2_records \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_disconnect \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_run \ + tests.stub.iteration.test_result_optional_single.TestResultSingleOptional.test_result_single_optional_with_failure_tx_func_run \ \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_0_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_1_records \ tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_2_records \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_disconnect \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_run \ + tests.stub.iteration.test_result_peek.TestResultPeek.test_result_peek_with_failure_tx_func_run \ \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_full_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_half_batch \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_empty_batch \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_error \ tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all \ -\ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ - tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_discards_on_session_close \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested \ + tests.stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested_using_list \ +\ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_batch \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_all_slow_connection \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested \ + tests.stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested_using_list \ +\ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x2.test_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x2.test_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x3.test_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x3.test_date_time_with_patch \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x3.test_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x3.test_zoned_date_time_with_patch \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_date_time_with_patch \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_zoned_date_time_with_patch \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_unknown_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_unknown_zoned_date_time_patched \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_unknown_then_known_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV4x4.test_unknown_then_known_zoned_date_time_patched \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV5x0.test_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV5x0.test_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV5x0.test_unknown_zoned_date_time \ + tests.stub.datatypes.test_temporal_types.TestTemporalTypesV5x0.test_unknown_then_known_zoned_date_time \ + tests.stub.datatypes.test_unsupported_type.TestUnsupportedTypes.test_unsupported_type \ + tests.stub.datatypes.test_unsupported_type.TestUnsupportedTypes.test_unsupported_type_in_list \ + tests.stub.datatypes.test_vector_types.TestVectorTypes.test_vector \ EXIT_CODE=$? diff --git a/tests/Integration/SummarizedResultFormatterTest.php b/tests/Integration/SummarizedResultFormatterTest.php index 16e85cbe..32dd1d91 100644 --- a/tests/Integration/SummarizedResultFormatterTest.php +++ b/tests/Integration/SummarizedResultFormatterTest.php @@ -17,6 +17,7 @@ use DateInterval; use DateTimeImmutable; +use DateTimeZone; use function dump; @@ -113,11 +114,16 @@ public function testDateTime(): void $this->markTestSkipped('http does not support datetime conversion'); } - $dt = new DateTimeImmutable(); + // Neo4j may return either Bolt DateTime (offset) → Types\DateTime or Bolt DateTimeZoneId → Types\DateTimeZoneId + // depending on server/version and how the instant is canonicalised on the wire. + $dt = new DateTimeImmutable('2021-06-15T14:30:00', new DateTimeZone('Europe/Paris')); $ls = $this->getSession()->run('RETURN $x AS x', ['x' => $dt])->first()->get('x'); - $this->assertInstanceOf(DateTimeZoneId::class, $ls); - $this->assertEquals($dt, $ls->toDateTime()); + self::assertTrue( + $ls instanceof DateTimeZoneId || $ls instanceof DateTime, + sprintf('Expected DateTimeZoneId or DateTime, got %s', get_debug_type($ls)) + ); + self::assertEquals($dt, $ls->toDateTime()); } public function testNull(): void diff --git a/tests/Unit/BoltConnectionPoolTest.php b/tests/Unit/BoltConnectionPoolTest.php index 987ce985..d4de3e53 100644 --- a/tests/Unit/BoltConnectionPoolTest.php +++ b/tests/Unit/BoltConnectionPoolTest.php @@ -122,7 +122,9 @@ public function testReleaseReference(): void $this->pool->release($connection); - static::assertEquals($refCount - 1, $this->refCount($connection)); + // Release returns the permit but keeps the connection in the pool for reuse and for + // {@see ConnectionPool::close()} — it must not drop the pool's reference. + static::assertEquals($refCount, $this->refCount($connection)); } /** diff --git a/tests/Unit/CypherListPeekTest.php b/tests/Unit/CypherListPeekTest.php new file mode 100644 index 00000000..80cf9164 --- /dev/null +++ b/tests/Unit/CypherListPeekTest.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Laudis\Neo4j\Tests\Unit; + +use Laudis\Neo4j\Types\CypherList; +use Laudis\Neo4j\Types\CypherMap; +use PHPUnit\Framework\TestCase; + +final class CypherListPeekTest extends TestCase +{ + public function testPeekReturnsNullOnEmptyResult(): void + { + $list = new CypherList([]); + + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } + + public function testPeekMatchesCurrentAndDoesNotAdvance(): void + { + $list = new CypherList([ + new CypherMap(['n' => 1]), + new CypherMap(['n' => 2]), + ]); + + $a = $list->peek(); + $b = $list->peek(); + $c = $list->current(); + + self::assertNotNull($a); + self::assertSame($a, $b); + self::assertSame($a, $c); + self::assertSame(1, $a->get('n')); + + $list->next(); + + self::assertSame(2, $list->peek()?->get('n')); + } + + public function testPeekReturnsNullAfterAllRowsConsumed(): void + { + $list = new CypherList([new CypherMap(['x' => true])]); + foreach ($list as $_) { + } + self::assertFalse($list->valid()); + self::assertNull($list->peek()); + } +} diff --git a/tests/Unit/ParameterHelperTest.php b/tests/Unit/ParameterHelperTest.php index 2a18b8e1..51c6204b 100644 --- a/tests/Unit/ParameterHelperTest.php +++ b/tests/Unit/ParameterHelperTest.php @@ -13,7 +13,9 @@ namespace Laudis\Neo4j\Tests\Unit; +use Bolt\protocol\v1\structures\DateTime as BoltV1DateTime; use Bolt\protocol\v1\structures\DateTimeZoneId; +use Bolt\protocol\v5\structures\DateTime as BoltV5DateTimeStruct; use DateTime; use DateTimeZone; use InvalidArgumentException; @@ -175,4 +177,20 @@ public function testDateTime5(): void self::assertInstanceOf(\Bolt\protocol\v5\structures\DateTimeZoneId::class, $date); } + + public function testOffsetDateTimeWithBoltUtcPatchOn44UsesV5Structure(): void + { + $dt = new DateTime('2022-06-07 11:52:05', new DateTimeZone('+02:00')); + $p = ParameterHelper::asParameter($dt, ConnectionProtocol::BOLT_V44(), true); + + self::assertInstanceOf(BoltV5DateTimeStruct::class, $p); + } + + public function testOffsetDateTimeWithoutPatchOn44UsesLegacyStructure(): void + { + $dt = new DateTime('2022-06-07 11:52:05', new DateTimeZone('+02:00')); + $p = ParameterHelper::asParameter($dt, ConnectionProtocol::BOLT_V44(), false); + + self::assertInstanceOf(BoltV1DateTime::class, $p); + } } diff --git a/tests/Unit/TypeCasterTest.php b/tests/Unit/TypeCasterTest.php index 20a767eb..d6f53e05 100644 --- a/tests/Unit/TypeCasterTest.php +++ b/tests/Unit/TypeCasterTest.php @@ -33,9 +33,11 @@ final class TypeCasterTest extends TestCase * Complete coverage: every input type × every cast method. * When a cast isn't possible, expected is null (invalid case). * - * Yields argument lists for {@see testCastMatrix} in order: input, method, expected, class (null unless toClass). + * Yields argument lists for {@see testCastMatrix}: input, method, expected, class (null unless toClass). * * @return Generator + * + * @psalm-suppress MixedReturnTypeCoercion */ public static function provideCastMatrix(): Generator { @@ -94,7 +96,8 @@ public function __toString(): string foreach ($inputs as $inputName => $inputValue) { foreach ($matrix as $method => $expectations) { - $expected = self::expectedValueForInput($expectations, $inputName); + /** @psalm-suppress MixedAssignment */ + $expected = $expectations[$inputName] ?? null; $key = $inputName.'->'.$method; // Generator must be fresh per test (consumable once) @@ -107,6 +110,9 @@ public function __toString(): string $classForRow = null; if ($method === 'toClass') { + if (!array_key_exists('_class', $expectations) || !is_array($expectations['_class'])) { + throw new InvalidArgumentException('Malformed toClass expectations: missing _class map'); + } $classForRow = self::toClassClassNameForInput($expectations, $inputName); }