Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ foreach ($results as $result) {
}
```

`peek()` returns the next row without moving the cursor (or `null` if there is none). It can pull the next record from the server into the buffer if needed, but does not advance past it—unlike `next()` in a `foreach`, which always consumes. Repeated calls to `peek()` return the same value until you advance the iterator (for example with `next()` or by continuing a `foreach`).

Cypher values and types map to these php types and classes:

| Cypher | Php |
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"psr/http-factory": "^1.0",
"psr/http-client": "^1.0",
"php-http/message": "^1.0",
"stefanak-michal/bolt": "^7.1.4",
"stefanak-michal/bolt": "^7.4",
"symfony/polyfill-php80": "^1.2",
"psr/simple-cache": ">=2.0",
"ext-json": "*",
Expand Down
12 changes: 6 additions & 6 deletions src/Authentication/BasicAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public function __construct(
/**
* @throws Exception
*
* @return array{server: string, connection_id: string, hints: list}
* @return array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
public function authenticateBolt(BoltConnection $connection, string $userAgent): array
{
$factory = $this->createMessageFactory($connection);

$protocol = $connection->protocol();
if (method_exists($protocol, 'logon')) {
$helloMetadata = ['user_agent' => $userAgent];
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]);

$responseHello = $factory->createHelloMessage($helloMetadata)->send()->getResponse();

Expand All @@ -55,20 +55,20 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent):

$response = $factory->createLogonMessage($credentials)->send()->getResponse();

/** @var array{server: string, connection_id: string, hints: list} */
/** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>} */
return array_merge($responseHello->content, $response->content);
}

$helloMetadata = [
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, [
'user_agent' => $userAgent,
'scheme' => 'basic',
'principal' => $this->username,
'credentials' => $this->password,
];
]);

$response = $factory->createHelloMessage($helloMetadata)->send()->getResponse();

/** @var array{server: string, connection_id: string, hints: list} */
/** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>} */
return $response->content;
}

Expand Down
39 changes: 39 additions & 0 deletions src/Authentication/BoltHelloMetadata.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Authentication;

use Laudis\Neo4j\Bolt\BoltConnection;

/**
* Merges Bolt HELLO fields required for Neo4j 4.3+ temporal UTC patch negotiation.
*/
final class BoltHelloMetadata
{
/**
* @param array<string, mixed> $metadata
*
* @return array<string, mixed>
*/
public static function withUtcPatchIfSupported(BoltConnection $connection, array $metadata): array
{
// Neo4j 4.3–4.4: optional UTC temporal patch. Bolt 5+ TestKit scripts expect HELLO without patch_bolt.
// Compare against "5" (not "5.0"): bolt library getVersion() returns "5" for class V5.
$v = $connection->protocol()->getVersion();
if (version_compare($v, '4.3', '>=') && version_compare($v, '5', '<')) {
$metadata['patch_bolt'] = ['utc'];
}

return $metadata;
}
}
8 changes: 5 additions & 3 deletions src/Authentication/KerberosAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ public function __construct(
/**
* @throws Exception
*
* @return array{server: string, connection_id: string, hints: list}
* @return array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
public function authenticateBolt(BoltConnection $connection, string $userAgent): array
{
$factory = $this->createMessageFactory($connection);

$this->logger?->log(LogLevel::DEBUG, 'HELLO', ['user_agent' => $userAgent]);

$factory->createHelloMessage(['user_agent' => $userAgent])->send()->getResponse();
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]);

$factory->createHelloMessage($helloMetadata)->send()->getResponse();

$this->logger?->log(LogLevel::DEBUG, 'LOGON', ['scheme' => 'kerberos', 'principal' => '']);

Expand All @@ -56,7 +58,7 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent):
])->send()->getResponse();

/**
* @var array{server: string, connection_id: string, hints: list}
* @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
return $response->content;
}
Expand Down
12 changes: 6 additions & 6 deletions src/Authentication/NoAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,31 @@ public function __construct(
/**
* @throws Exception
*
* @return array{server: string, connection_id: string, hints: list}
* @return array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
public function authenticateBolt(BoltConnection $connection, string $userAgent): array
{
$factory = $this->createMessageFactory($connection);

if ($connection->getProtocol()->compare(ConnectionProtocol::BOLT_V5_1()) >= 0) {
$helloMetadata = ['user_agent' => $userAgent];
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]);

$factory->createHelloMessage($helloMetadata)->send()->getResponse();

$response = $factory->createLogonMessage(['scheme' => 'none'])->send()->getResponse();

/** @var array{server: string, connection_id: string, hints: list} */
/** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>} */
return $response->content;
}

$helloMetadata = [
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, [
'user_agent' => $userAgent,
'scheme' => 'none',
];
]);

$response = $factory->createHelloMessage($helloMetadata)->send()->getResponse();

/** @var array{server: string, connection_id: string, hints: list} */
/** @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>} */
return $response->content;
}

Expand Down
8 changes: 5 additions & 3 deletions src/Authentication/OpenIDConnectAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ public function authenticateHttp(RequestInterface $request, UriInterface $uri, s
/**
* @throws Exception
*
* @return array{server: string, connection_id: string, hints: list}
* @return array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
public function authenticateBolt(BoltConnection $connection, string $userAgent): array
{
$factory = $this->createMessageFactory($connection);

$this->logger?->log(LogLevel::DEBUG, 'HELLO', ['user_agent' => $userAgent]);

$factory->createHelloMessage(['user_agent' => $userAgent])->send()->getResponse();
$helloMetadata = BoltHelloMetadata::withUtcPatchIfSupported($connection, ['user_agent' => $userAgent]);

$factory->createHelloMessage($helloMetadata)->send()->getResponse();

$this->logger?->log(LogLevel::DEBUG, 'LOGON', ['scheme' => 'bearer']);

Expand All @@ -61,7 +63,7 @@ public function authenticateBolt(BoltConnection $connection, string $userAgent):
])->send()->getResponse();

/**
* @var array{server: string, connection_id: string, hints: list}
* @var array{server: string, connection_id: string, hints: list, patch_bolt?: list<string>}
*/
return $response->content;
}
Expand Down
70 changes: 66 additions & 4 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Bolt\enum\Signature;
use Bolt\error\ConnectException as BoltConnectException;
use Bolt\protocol\Response;
use Bolt\protocol\V4_2;
use Bolt\protocol\V4_3;
use Bolt\protocol\V4_4;
use Bolt\protocol\V5;
use Bolt\protocol\V5_1;
Expand Down Expand Up @@ -44,7 +46,7 @@
use WeakReference;

/**
* @implements ConnectionInterface<array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}>
* @implements ConnectionInterface<array{0: V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}>
*
* @psalm-import-type BoltMeta from SummarizedResultFormatter
*/
Expand Down Expand Up @@ -72,7 +74,21 @@ class BoltConnection implements ConnectionInterface
private ?float $originalTimeout = null;

/**
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
* When one PULL yields RECORD(s) then FAILURE, {@see assertNoFailure()} runs RESET on the FAILURE before we
* can finish yielding those rows. {@see pull()} therefore defers the {@see Neo4jException} to the next
* {@see BoltResult::fetchResults()} so buffered RECORDs are delivered first (TestKit pull_2_end_error.script).
*
* This is not peek-specific: any consumption path that pulls (including {@see CypherList::peek()} calling
* {@see Iterator::current()} on a lazy {@see BoltResult}) relies on the same ordering. Official Neo4j drivers
* treat peek at end-of-stream as non-throwing (null / absent record); for a FAILURE that arrives after RECORDs
* in the same PULL response, surfacing the error only after those rows matches the same stream semantics.
*/
private ?Neo4jException $deferredPullFailure = null;

private bool $boltUtcPatchNegotiated = false;

/**
* @return array{0: V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
*/
public function getImplementation(): array
{
Expand All @@ -83,7 +99,7 @@ public function getImplementation(): array
* @psalm-mutation-free
*/
public function __construct(
private V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol,
private V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4|null $boltProtocol,
private readonly Connection $connection,
private readonly AuthenticateInterface $auth,
private readonly string $userAgent,
Expand Down Expand Up @@ -132,6 +148,16 @@ public function getProtocol(): ConnectionProtocol
return $this->config->getProtocol();
}

public function setBoltUtcPatchNegotiated(bool $negotiated): void
{
$this->boltUtcPatchNegotiated = $negotiated;
}

public function isBoltUtcPatchNegotiated(): bool
{
return $this->boltUtcPatchNegotiated;
}

/**
* @psalm-mutation-free
*/
Expand Down Expand Up @@ -217,6 +243,7 @@ public function consumeResults(): void
*/
public function reset(): void
{
$this->deferredPullFailure = null;
$message = $this->messageFactory->createResetMessage();
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
Expand Down Expand Up @@ -271,6 +298,7 @@ public function run(
?iterable $tsxMetadata,
): array {
$extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata);

$message = $this->messageFactory->createRunMessage($text, $parameters, $extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
Expand All @@ -293,7 +321,7 @@ public function rollback(): void
$this->assertNoFailure($response);
}

public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4
public function protocol(): V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4
{
if (!isset($this->boltProtocol)) {
throw new Exception('Connection is closed');
Expand Down Expand Up @@ -337,6 +365,23 @@ public function pull(?int $qid, ?int $fetchSize): array
return $tbr;
} catch (Throwable $e) {
$this->restoreOriginalTimeout();
if ($e instanceof Neo4jException) {
// RECORD(s) then FAILURE in one PULL: RESET already ran in assertNoFailure — return rows and
// defer the exception to the next fetchResults() so the last record is yielded first and no
// extra PULL is sent (pull_2_end_error.script).
if (!empty($tbr)) {
$this->deferredPullFailure = $e;
$tbr[] = ['has_more' => true];

/** @var non-empty-list<list> */
return $tbr;
}
throw $e;
}
// If we've received some records before the disconnect, return them so first next() succeeds.
// Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts).
// Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator
// would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done".
// If we've received some records before the disconnect, return them so first next() succeeds.
// Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts).
// Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator
Expand All @@ -363,6 +408,7 @@ public function close(): void
// Other exceptions (Neo4jException, TypeError, etc.) should propagate.
try {
if ($this->isOpen()) {
$this->deferredPullFailure = null;
if ($this->isStreaming()) {
$this->discardUnconsumedResults();
}
Expand Down Expand Up @@ -390,11 +436,23 @@ public function close(): void
*/
public function invalidate(): void
{
$this->deferredPullFailure = null;
$this->subscribedResults = [];
$this->connection->disconnect();
unset($this->boltProtocol);
}

/**
* Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE).
*/
public function takeDeferredPullFailure(): ?Neo4jException
{
$e = $this->deferredPullFailure;
$this->deferredPullFailure = null;

return $e;
}

private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array
{
$extra = [];
Expand Down Expand Up @@ -431,6 +489,10 @@ private function buildResultExtra(?int $fetchSize, ?int $qid): array
}

if ($qid !== null && $qid >= 0) {
// Always send explicit qid (including 0). Omitting qid defaults to the "current" stream; with
// multiple concurrent RUN streams in a transaction, PULL must target the correct stream or Neo4j
// returns e.g. "No such statement: N" (Neo.ClientError.Request.InvalidFormat). The Bolt library's
// openStreams counter is not reliable for this across all server versions and message orderings.
$extra['qid'] = $qid;
}

Expand Down
7 changes: 7 additions & 0 deletions src/Bolt/BoltResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ private function fetchResults(): void
{
$this->networkPullOccurred = true;

$deferred = $this->connection->takeDeferredPullFailure();
if ($deferred !== null) {
throw $deferred;
}

$this->networkPullOccurred = true;

try {
$meta = $this->connection->pull($this->qid, $this->effectivePullSize());
} catch (BoltConnectException|BoltException $e) {
Expand Down
Loading
Loading