Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public function __construct(
private readonly TelemetryAPIEnum $telemetryApi,
private readonly ?ConnectionPoolInterface $pool = null,
bool $beginAlreadySent = false,
private readonly ?SessionBookmarkTracker $bookmarkTracker = null,
) {
$this->beginSent = $beginAlreadySent;
}
Expand Down Expand Up @@ -173,6 +174,10 @@ public function runStatement(Statement $statement): SummarizedResult
$this->connection->consumeResults();
}

if ($this->isInstantTransaction) {
$this->bookmarkTracker?->prepareForSend(true);
}

$this->ensureBeginSent();

try {
Expand All @@ -183,7 +188,7 @@ public function runStatement(Statement $statement): SummarizedResult
$this->tsxConfig->getTimeout(),
$this->isInstantTransaction ? $this->bookmarkHolder : null, // let the begin transaction pass the bookmarks if it is a managed transaction
null, // mode is never sent in RUN messages - it comes from session configuration
$this->tsxConfig->getMetaData(),
$this->isInstantTransaction ? $this->tsxConfig->getMetaData() : null,
$this->telemetryApi,
);
} catch (Throwable $e) {
Expand Down Expand Up @@ -253,6 +258,7 @@ private function ensureBeginSent(): void
return;
}
try {
$this->bookmarkTracker?->prepareForSend(true);
$this->connection->sendTelemetryIfNeeded($this->telemetryApi);
$this->connection->begin($this->database, $this->tsxConfig->getTimeout(), $this->bookmarkHolder, $this->tsxConfig->getMetaData());
$this->beginSent = true;
Expand Down
3 changes: 2 additions & 1 deletion src/Bolt/Messages/BoltCommitMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public function getResponse(): Response
$bookmark = $content['bookmark'] ?? '';

if (trim($bookmark) !== '') {
$this->bookmarks->setBookmark(new Bookmark([$bookmark]));
// Propagate the committed bookmark to the shared BookmarkManager, not only the session holder.
$this->bookmarks->setBookmarkFromServer(new Bookmark([$bookmark]));
}

$this->connection->protocol()->serverState = ServerState::READY;
Expand Down
45 changes: 28 additions & 17 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
use Laudis\Neo4j\NoOpBookmarkManager;
use Laudis\Neo4j\Types\CypherList;
use Psr\Log\LogLevel;
use Throwable;
Expand All @@ -46,23 +47,30 @@ final class Session implements SessionInterface

/** @var list<BoltConnection> */
private array $usedConnections = [];
/** @psalm-readonly */
private readonly BookmarkHolder $bookmarkHolder;
private readonly SessionBookmarkTracker $bookmarkTracker;
private readonly SessionConfiguration $sessionConfig;

/**
* @param ConnectionPool|Neo4jConnectionPool $pool
*
* @psalm-mutation-free
*/
public function __construct(
private readonly SessionConfiguration $config,
SessionConfiguration $config,
private readonly ConnectionPoolInterface $pool,
/**
* @psalm-readonly
*/
private readonly SummarizedResultFormatter $formatter,
) {
$bookmarkManager = $config->getBookmarkManager() ?? NoOpBookmarkManager::instance();
$this->bookmarkHolder = new BookmarkHolder(Bookmark::from($config->getBookmarks()));
$this->bookmarkTracker = new SessionBookmarkTracker(
$this->bookmarkHolder,
$bookmarkManager,
$config->getBookmarks(),
);
$this->bookmarkHolder->onServerBookmark($this->bookmarkTracker->handleNewBookmark(...));
$this->sessionConfig = $config->withRoutingBookmarks($this->bookmarkTracker->getRediscoveryBookmarkValues());
}

/**
Expand Down Expand Up @@ -117,7 +125,7 @@ static function (TransactionInterface $tx) use ($statement): SummarizedResult {
return $result;
},
$this->mergeTsxConfig(null),
$this->config,
$this->sessionConfig,
TelemetryAPIEnum::EXECUTE_QUERY,
);
}
Expand All @@ -130,7 +138,7 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration
return $this->retry(
$tsxHandler,
$config,
$this->config->withAccessMode(AccessMode::WRITE()),
$this->sessionConfig->withAccessMode(AccessMode::WRITE()),
TelemetryAPIEnum::TRANSACTION_FUNCTION
);
}
Expand All @@ -143,7 +151,7 @@ public function readTransaction(callable $tsxHandler, ?TransactionConfiguration
return $this->retry(
$tsxHandler,
$config,
$this->config->withAccessMode(AccessMode::READ()),
$this->sessionConfig->withAccessMode(AccessMode::READ()),
TelemetryAPIEnum::TRANSACTION_FUNCTION
);
}
Expand Down Expand Up @@ -186,7 +194,7 @@ private function retry(
$transaction = null;
if ($e->getTitle() === 'NotALeader' || $e->getNeo4jCode() === 'Neo.ClientError.Cluster.NotALeader') {
if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable($this->config);
$this->pool->clearRoutingTable($this->sessionConfig);
}
$error = $e;

Expand Down Expand Up @@ -284,7 +292,7 @@ private function executeStatementWithRetry(Statement $statement, TransactionConf

while ($retries < $maxRetries) {
try {
return $this->beginInstantTransaction($this->config, $config)->runStatement($statement);
return $this->beginInstantTransaction($this->sessionConfig, $config)->runStatement($statement);
} catch (Neo4jException $e) {
if (!$this->shouldClearRoutingTable($e)) {
throw $e;
Expand Down Expand Up @@ -332,7 +340,7 @@ public function beginTransaction(?iterable $statements = null, ?TransactionConfi
{
$this->getLogger()?->log(LogLevel::INFO, 'Beginning transaction', ['statements' => $statements, 'config' => $config]);
$config = $this->mergeTsxConfig($config);
$tsx = $this->startTransaction($config, $this->config, TelemetryAPIEnum::UNMANAGED_TRANSACTION);
$tsx = $this->startTransaction($config, $this->sessionConfig, TelemetryAPIEnum::UNMANAGED_TRANSACTION);

$tsx->runStatements($statements ?? []);

Expand All @@ -343,14 +351,14 @@ public function beginReadTransaction(?TransactionConfiguration $config = null):
{
$config = $this->mergeTsxConfig($config);

return $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ()), TelemetryAPIEnum::TRANSACTION_FUNCTION);
return $this->startTransaction($config, $this->sessionConfig->withAccessMode(AccessMode::READ()), TelemetryAPIEnum::TRANSACTION_FUNCTION);
}

public function beginWriteTransaction(?TransactionConfiguration $config = null): UnmanagedTransactionInterface
{
$config = $this->mergeTsxConfig($config);

return $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE()), TelemetryAPIEnum::TRANSACTION_FUNCTION);
return $this->startTransaction($config, $this->sessionConfig->withAccessMode(AccessMode::WRITE()), TelemetryAPIEnum::TRANSACTION_FUNCTION);
}

/**
Expand All @@ -367,16 +375,18 @@ private function beginInstantTransaction(
$pool = $this->pool;

return new BoltUnmanagedTransaction(
$this->config->getDatabase(),
$this->sessionConfig->getDatabase(),
$this->formatter,
$connection,
$this->config,
$this->sessionConfig,
$tsxConfig,
$this->bookmarkHolder,
new BoltMessageFactory($connection, $this->getLogger()),
true,
TelemetryAPIEnum::SESSION_RUN,
$pool,
false,
$this->bookmarkTracker,
);
}

Expand Down Expand Up @@ -419,17 +429,18 @@ private function startTransaction(TransactionConfiguration $config, SessionConfi
$pool = $this->pool;

return new BoltUnmanagedTransaction(
$this->config->getDatabase(),
$this->sessionConfig->getDatabase(),
$this->formatter,
$connection,
$this->config,
$this->sessionConfig,
$config,
$this->bookmarkHolder,
new BoltMessageFactory($connection, $this->getLogger()),
false,
$telemetryApi,
$pool,
false, // BEGIN sent on first run/commit/rollback
false,
$this->bookmarkTracker,
);
}

Expand Down
81 changes: 81 additions & 0 deletions src/Bolt/SessionBookmarkTracker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Bolt;

use Laudis\Neo4j\Contracts\BookmarkManagerInterface;
use Laudis\Neo4j\Databags\Bookmark;
use Laudis\Neo4j\Databags\BookmarkHolder;

/**
* Tracks bookmark manager state for a session, matching Java NetworkSession bookmark behaviour.
*/
final class SessionBookmarkTracker
{
/** @var list<Bookmark> */
private array $lastUsedBookmarks = [];

/** @var list<Bookmark> */
private array $lastReceivedBookmarks;

/**
* @param list<Bookmark> $sessionBookmarks
*/
public function __construct(
private readonly BookmarkHolder $bookmarkHolder,
private readonly BookmarkManagerInterface $bookmarkManager,
array $sessionBookmarks,
) {
$this->lastReceivedBookmarks = $sessionBookmarks;
$this->syncHolder(false);
}

public function prepareForSend(bool $updateLastUsed): void
{
$this->syncHolder($updateLastUsed);
}

public function handleNewBookmark(Bookmark $bookmark): void
{
if ($bookmark->isEmpty()) {
return;
}

$newBookmarks = [new Bookmark($bookmark->values())];
$this->lastReceivedBookmarks = $newBookmarks;
$this->bookmarkManager->updateBookmarks($this->lastUsedBookmarks, $newBookmarks);
}

/**
* @return list<string>
*/
public function getRediscoveryBookmarkValues(): array
{
return $this->determineBookmarks(false)->values();
}

private function syncHolder(bool $updateLastUsed): void
{
$this->bookmarkHolder->setBookmark($this->determineBookmarks($updateLastUsed));
}

private function determineBookmarks(bool $updateLastUsed): Bookmark
{
$bookmarks = $this->bookmarkManager->getBookmarks();
if ($updateLastUsed) {
$this->lastUsedBookmarks = $bookmarks;
}

return Bookmark::from([...$bookmarks, ...$this->lastReceivedBookmarks]);
}
}
36 changes: 36 additions & 0 deletions src/BookmarkManagers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j;

use Laudis\Neo4j\Contracts\BookmarkManagerInterface;
use Laudis\Neo4j\Databags\BookmarkManagerConfig;

/**
* Creates new instances of {@see BookmarkManagerInterface}.
*/
final class BookmarkManagers
{
private function __construct()
{
}

public static function defaultManager(BookmarkManagerConfig $config): BookmarkManagerInterface
{
return new Neo4jBookmarkManager(
$config->getInitialBookmarks(),
$config->getBookmarksConsumer(),
$config->getBookmarksSupplier(),
);
}
}
39 changes: 39 additions & 0 deletions src/Contracts/BookmarkManagerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Contracts;

use Laudis\Neo4j\Databags\Bookmark;

/**
* Keeps track of bookmarks and is used by the driver to ensure causal consistency between sessions and query executions.
*
* @see SessionConfiguration::withBookmarkManager()
*/
interface BookmarkManagerInterface
{
/**
* Updates bookmarks by deleting the given previous bookmarks and adding the new bookmarks.
*
* @param list<Bookmark> $previousBookmarks
* @param list<Bookmark> $newBookmarks
*/
public function updateBookmarks(array $previousBookmarks, array $newBookmarks): void;

/**
* Gets an immutable set of bookmarks.
*
* @return list<Bookmark>
*/
public function getBookmarks(): array;
}
Loading
Loading