Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ foreach ($results as $result) {
}
```

`peek()` returns the next row without moving the cursor (or `null` if there is none). It can pull the next record from the server into the buffer if needed, but does not advance past it—unlike `next()` in a `foreach`, which always consumes. Repeated calls to `peek()` return the same value until you advance the iterator (for example with `next()` or by continuing a `foreach`).

Cypher values and types map to these php types and classes:

| Cypher | Php |
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"psr/http-factory": "^1.0",
"psr/http-client": "^1.0",
"php-http/message": "^1.0",
"stefanak-michal/bolt": "^7.1.4",
"stefanak-michal/bolt": "^7.4",
"symfony/polyfill-php80": "^1.2",
"psr/simple-cache": ">=2.0",
"ext-json": "*",
Expand Down
48 changes: 48 additions & 0 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ class BoltConnection implements ConnectionInterface

private ?float $originalTimeout = null;

/**
* When one PULL yields RECORD(s) then FAILURE, {@see assertNoFailure()} runs RESET on the FAILURE before we
* can finish yielding those rows. {@see pull()} therefore defers the {@see Neo4jException} to the next
* {@see BoltResult::fetchResults()} so buffered RECORDs are delivered first (TestKit pull_2_end_error.script).
*
* This is not peek-specific: any consumption path that pulls (including {@see CypherList::peek()} calling
* {@see Iterator::current()} on a lazy {@see BoltResult}) relies on the same ordering. Official Neo4j drivers
* treat peek at end-of-stream as non-throwing (null / absent record); for a FAILURE that arrives after RECORDs
* in the same PULL response, surfacing the error only after those rows matches the same stream semantics.
*/
private ?Neo4jException $deferredPullFailure = null;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because of peek? Can you confirm peek is @nothrow against other drivers too? Mention it in the comment here

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — updated the $deferredPullFailure docblock: it’s not peek-only (RECORD+FAILURE in one PULL); peek uses the same path via lazy current(); end-of-stream peek is non-throwing (null), matching official drivers.


/**
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
*/
Expand Down Expand Up @@ -217,6 +229,7 @@ public function consumeResults(): void
*/
public function reset(): void
{
$this->deferredPullFailure = null;
$message = $this->messageFactory->createResetMessage();
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
Expand Down Expand Up @@ -271,6 +284,7 @@ public function run(
?iterable $tsxMetadata,
): array {
$extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata);

$message = $this->messageFactory->createRunMessage($text, $parameters, $extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
Expand Down Expand Up @@ -337,6 +351,23 @@ public function pull(?int $qid, ?int $fetchSize): array
return $tbr;
} catch (Throwable $e) {
$this->restoreOriginalTimeout();
if ($e instanceof Neo4jException) {
// RECORD(s) then FAILURE in one PULL: RESET already ran in assertNoFailure — return rows and
// defer the exception to the next fetchResults() so the last record is yielded first and no
// extra PULL is sent (pull_2_end_error.script).
if (!empty($tbr)) {
$this->deferredPullFailure = $e;
$tbr[] = ['has_more' => true];

/** @var non-empty-list<list> */
return $tbr;
}
throw $e;
}
// If we've received some records before the disconnect, return them so first next() succeeds.
// Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts).
// Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator
// would stop cleanly instead of surfacing the disconnect. A synthetic has_more:true means "not done".
// If we've received some records before the disconnect, return them so first next() succeeds.
// Second next() must pull again and fail with a connection error (TestKit exit_after_record scripts).
// Do not append []: BoltResult treats trailing empty SUCCESS as stream completion, so the iterator
Expand All @@ -363,6 +394,7 @@ public function close(): void
// Other exceptions (Neo4jException, TypeError, etc.) should propagate.
try {
if ($this->isOpen()) {
$this->deferredPullFailure = null;
if ($this->isStreaming()) {
$this->discardUnconsumedResults();
}
Expand Down Expand Up @@ -390,11 +422,23 @@ public function close(): void
*/
public function invalidate(): void
{
$this->deferredPullFailure = null;
$this->subscribedResults = [];
$this->connection->disconnect();
unset($this->boltProtocol);
}

/**
* Consumes a FAILURE that was deferred from the previous {@see pull()} (RECORD(s) then FAILURE).
*/
public function takeDeferredPullFailure(): ?Neo4jException
{
$e = $this->deferredPullFailure;
$this->deferredPullFailure = null;

return $e;
}

private function buildRunExtra(?string $database, ?float $timeout, ?BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array
{
$extra = [];
Expand Down Expand Up @@ -431,6 +475,10 @@ private function buildResultExtra(?int $fetchSize, ?int $qid): array
}

if ($qid !== null && $qid >= 0) {
// Always send explicit qid (including 0). Omitting qid defaults to the "current" stream; with
// multiple concurrent RUN streams in a transaction, PULL must target the correct stream or Neo4j
// returns e.g. "No such statement: N" (Neo.ClientError.Request.InvalidFormat). The Bolt library's
// openStreams counter is not reliable for this across all server versions and message orderings.
$extra['qid'] = $qid;
}

Expand Down
7 changes: 7 additions & 0 deletions src/Bolt/BoltResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ private function fetchResults(): void
{
$this->networkPullOccurred = true;

$deferred = $this->connection->takeDeferredPullFailure();
if ($deferred !== null) {
throw $deferred;
}

$this->networkPullOccurred = true;

try {
$meta = $this->connection->pull($this->qid, $this->effectivePullSize());
} catch (BoltConnectException|BoltException $e) {
Expand Down
32 changes: 28 additions & 4 deletions src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Laudis\Neo4j\Bolt;

use Bolt\enum\ServerState;
use Laudis\Neo4j\Contracts\ConnectionPoolInterface;
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
use Laudis\Neo4j\Databags\BookmarkHolder;
Expand Down Expand Up @@ -86,6 +85,8 @@ public function commit(iterable $statements = []): CypherList

$this->ensureBeginSent();

$this->connection->consumeResults();

// Force the results to pull all the results.
// After a commit, the connection will be in the ready state, making it impossible to use PULL
$tbr = $this->runStatements($statements)->each(static function (CypherList $list) {
Expand All @@ -101,6 +102,11 @@ public function commit(iterable $statements = []): CypherList
public function rollback(): void
{
if ($this->isFinished()) {
if ($this->state === TransactionState::TERMINATED) {
// Run/pull already failed; connection may have been RESET — nothing to send.
return;
}

if ($this->state === TransactionState::COMMITTED) {
throw new TransactionException("Can't rollback a committed transaction.");
}
Expand All @@ -112,6 +118,14 @@ public function rollback(): void

$this->ensureBeginSent();

// FAILURE on PULL triggers RESET in {@see BoltConnection::assertNoFailure()}; server has no open tx.
// TestKit stubs (e.g. tx_error_on_pull.script) expect no ROLLBACK after that RESET.
if ($this->connection->getServerState() === 'READY') {
$this->state = TransactionState::ROLLED_BACK;

return;
}

$this->messageFactory->createRollbackMessage()->send();
$this->state = TransactionState::ROLLED_BACK;
}
Expand Down Expand Up @@ -146,8 +160,10 @@ public function runStatement(Statement $statement): SummarizedResult
$parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol());
$start = microtime(true);

$serverState = $this->connection->protocol()->serverState;
if ($serverState === ServerState::STREAMING) {
// Only drain an outstanding autocommit result (STREAMING). In an explicit transaction (TX_STREAMING)
// several RUN streams may be open; consumeResults() would preload other streams and reorder PULLs
// vs RUN (TestKit tx_pull_1_nested*, Neo4j parallel/nested tx tests).
if ($this->connection->getServerState() === 'STREAMING') {
$this->connection->consumeResults();
}

Expand Down Expand Up @@ -217,7 +233,15 @@ public function isFinished(): bool

private function ensureBeginSent(): void
{
if ($this->isInstantTransaction || $this->beginSent) {
if ($this->isInstantTransaction) {
return;
}
// FAILURE on PULL triggers RESET in BoltConnection — server is READY with no tx, but we may still
// have beginSent=true (e.g. execute_read retry). Must send BEGIN again before RUN.
if ($this->beginSent && $this->state === TransactionState::ACTIVE && $this->connection->getServerState() === 'READY') {
$this->beginSent = false;
}
if ($this->beginSent) {
return;
}
try {
Expand Down
11 changes: 3 additions & 8 deletions src/Bolt/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,10 @@ public function acquire(SessionConfiguration $config): Generator

public function release(ConnectionInterface $connection): void
{
// Return a permit only — keep the connection in {@see $activeConnections} so it stays
// pooled for reuse and is still closed by {@see close()}. Removing it here orphaned
// sockets (no GOODBYE on driver close), which breaks TestKit stubs after errors.
$this->semaphore->post();

foreach ($this->activeConnections as $i => $activeConnection) {
if ($connection === $activeConnection) {
array_splice($this->activeConnections, $i, 1);

return;
}
}
}

public function getLogger(): ?Neo4jLogger
Expand Down
3 changes: 1 addition & 2 deletions src/Bolt/ProtocolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ public function createProtocol(IConnection $connection): V4_4|V5|V5_1|V5_2|V5_3|
}

$bolt = new Bolt($connection);
// Offer protocol versions from newest to oldest (only 4.4 and above are supported)
$bolt->setProtocolVersions('5.4.4', 4.4);
$protocol = $bolt->build();

if (!($protocol instanceof V4_4 || $protocol instanceof V5 || $protocol instanceof V5_1 || $protocol instanceof V5_2 || $protocol instanceof V5_3 || $protocol instanceof V5_4)) {
throw new RuntimeException('Client only supports bolt version 4.4 to 5.4');
throw new RuntimeException('Client only supports Bolt protocol 4.4 through 5.4');
}

return $protocol;
Expand Down
2 changes: 1 addition & 1 deletion src/BoltFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use Laudis\Neo4j\Enum\SocketType;

/**
* Small wrapper around the bolt library to easily guarantee only bolt version 3 and up will be created and authenticated.
* Small wrapper around the bolt library to create and authenticate Bolt connections (protocol 4.4+).
*/
class BoltFactory
{
Expand Down
3 changes: 1 addition & 2 deletions src/Enum/ConnectionProtocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Laudis\Neo4j\Enum;

use Bolt\protocol\V3;
use Bolt\protocol\V4;
use Bolt\protocol\V4_1;
use Bolt\protocol\V4_2;
Expand Down Expand Up @@ -67,7 +66,7 @@ final class ConnectionProtocol extends TypedEnum implements JsonSerializable
*
* @psalm-suppress ImpureMethodCall
*/
public static function determineBoltVersion(V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self
public static function determineBoltVersion(V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $bolt): self
{
$version = self::resolve($bolt->getVersion());

Expand Down
6 changes: 3 additions & 3 deletions src/Formatter/Specialised/BoltOGMTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,13 @@ private function makeFromBoltPath(BoltPath $path): Path
foreach ($rels as $rel) {
$relationships[] = $this->makeFromBoltUnboundRelationship($rel);
}
/** @var list<int> $ids */
$ids = $path->ids;
/** @var list<int> $indices */
$indices = $path->indices;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this backwards compatible? What happens in a Neo4j 4.4 setting?
Another option is that it is already handled in the bolt library; if so, we need to ensure the minimum version enforces that the Path class has actual indices. Otherwise, this package will break for users.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s backwards compatible because our minimum dependency is stefanak-michal/bolt:^7.4, where Path has indices and ids is just a deprecated alias to the same array. So Neo4j 4.4 isn’t an issue here; the only risk would be someone running an older Bolt lib than our constraint. If you want extra safety, we can fall back to ids when indices isn’t present


return new Path(
new CypherList($nodes),
new CypherList($relationships),
new CypherList($ids),
new CypherList($indices),
);
}

Expand Down
18 changes: 18 additions & 0 deletions src/Types/CypherList.php
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,24 @@ public function key(): int
return $this->cacheKey();
}

/**
* Returns the next record without consuming it (the iterator position is unchanged).
*
* This may pull and buffer the next row from the server when the result is lazy, the same as
* {@see current()} on a non-empty result. When there is no next record, returns `null` without
* erroring; calling {@see current()} on an exhausted result is not safe.
*
* @return TValue|null
*/
public function peek()
{
if (!$this->valid()) {
return null;
}

return $this->current();
}

/**
* @return array<int, TValue>
*/
Expand Down
2 changes: 1 addition & 1 deletion testkit
2 changes: 1 addition & 1 deletion testkit-backend/features.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
// notified when the server reports a token expired.
'Feature:Auth:Managed' => false,
// The driver supports Bolt protocol version 3
'Feature:Bolt:3.0' => true,
'Feature:Bolt:3.0' => false,
// The driver supports Bolt protocol version 4.1
'Feature:Bolt:4.1' => true,
// The driver supports Bolt protocol version 4.2
Expand Down
14 changes: 10 additions & 4 deletions testkit-backend/src/Handlers/ResultList.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,28 @@ public function handle($request): TestkitResponseInterface

return new RecordListResponse($rows);
} catch (Neo4jException $e) {
$this->repository->removeRecords($request->getResultId());
$response = new DriverErrorResponse($request->getResultId(), $e);
// Keep error for RetryableNegative lookup by result id (execute_read tx_func tests).
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $e);
return $response;
} catch (BoltException $e) {
$this->repository->removeRecords($request->getResultId());
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
$wrapped = new Neo4jException([$neo4jError], $e);
$response = new DriverErrorResponse($request->getResultId(), $wrapped);
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $wrapped);
return $response;
} catch (Throwable $e) {
$this->repository->removeRecords($request->getResultId());
if ($this->isConnectionOrSocketError($e)) {
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
$wrapped = new Neo4jException([$neo4jError], $e);
$response = new DriverErrorResponse($request->getResultId(), $wrapped);
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $wrapped);
return $response;
}
throw $e;
}
Expand Down
15 changes: 9 additions & 6 deletions testkit-backend/src/Handlers/ResultNext.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,25 @@ public function handle($request): TestkitResponseInterface

return new RecordResponse($values);
} catch (Neo4jException $e) {
$this->repository->removeRecords($request->getResultId());
$response = new DriverErrorResponse($request->getResultId(), $e);
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $e);
return $response;
} catch (BoltException $e) {
$this->repository->removeRecords($request->getResultId());
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
$wrapped = new Neo4jException([$neo4jError], $e);
$response = new DriverErrorResponse($request->getResultId(), $wrapped);
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $wrapped);
return $response;
} catch (Throwable $e) {
$this->repository->removeRecords($request->getResultId());
if ($this->isConnectionOrSocketError($e)) {
$neo4jError = Neo4jError::fromMessageAndCode('Neo.ClientError.General.ConnectionError', $e->getMessage());
$wrapped = new Neo4jException([$neo4jError], $e);
$response = new DriverErrorResponse($request->getResultId(), $wrapped);
$this->repository->addRecords($request->getResultId(), $response);

return new DriverErrorResponse($request->getResultId(), $wrapped);
return $response;
}
throw $e;
}
Expand Down
Loading
Loading