Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 99 additions & 20 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Laudis\Neo4j;

use Bolt\error\ConnectException;
use Laudis\Neo4j\Common\DriverSetupManager;
use Laudis\Neo4j\Contracts\ClientInterface;
use Laudis\Neo4j\Contracts\DriverInterface;
Expand All @@ -23,7 +24,7 @@
use Laudis\Neo4j\Databags\Statement;
use Laudis\Neo4j\Databags\SummarizedResult;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\ConnectionPoolException;
use Laudis\Neo4j\Types\CypherList;

/**
Expand Down Expand Up @@ -100,22 +101,91 @@ private function getSession(?string $alias = null): SessionInterface
return $this->boundSessions[$alias] = $this->startSession($alias, $this->defaultSessionConfiguration);
}

/**
* Executes an operation with automatic retry on alternative drivers when connection exceptions occur.
*
* @template T
*
* @param callable(SessionInterface): T $operation The operation to execute
* @param string|null $alias The driver alias to use
*
* @throws ConnectionPoolException When all available drivers have been exhausted
*
* @return T The result of the operation
*/
private function executeWithRetry(callable $operation, ?string $alias = null, ?int $maxTries = 3)
{
$alias ??= $this->driverSetups->getDefaultAlias();
$attemptedDrivers = [];
$lastException = null;

$tries = $maxTries;

while ($tries > 0) {
try {
$driver = $this->driverSetups->getDriver($this->defaultSessionConfiguration, $alias);
Comment thread
transistive marked this conversation as resolved.

$driverHash = spl_object_hash($driver);
if (in_array($driverHash, $attemptedDrivers, true)) {
throw $lastException ?? new ConnectionPoolException('No available drivers');
}
$attemptedDrivers[] = $driverHash;

$session = $driver->createSession($this->defaultSessionConfiguration);

return $operation($session);
} catch (ConnectionPoolException|ConnectException $e) {
$lastException = $e;
$this->driverSetups->resetDriver($alias);

--$tries;
}
}

throw $lastException ?? new ConnectionPoolException('No available drivers');
}

public function runStatements(iterable $statements, ?string $alias = null): CypherList
{
$runner = $this->getRunner($alias);
if ($runner instanceof SessionInterface) {
return $runner->runStatements($statements, $this->defaultTransactionConfiguration);
$alias ??= $this->driverSetups->getDefaultAlias();

if (array_key_exists($alias, $this->boundTransactions)
&& count($this->boundTransactions[$alias]) > 0) {
$runner = $this->getRunner($alias);
if ($runner instanceof TransactionInterface) {
return $runner->runStatements($statements);
}
}

if (array_key_exists($alias, $this->boundSessions)) {
$session = $this->boundSessions[$alias];

return $session->runStatements($statements, $this->defaultTransactionConfiguration);
}

return $runner->runStatements($statements);
return $this->executeWithRetry(
function (SessionInterface $session) use ($statements) {
return $session->runStatements($statements, $this->defaultTransactionConfiguration);
},
$alias
);
}

public function beginTransaction(?iterable $statements = null, ?string $alias = null, ?TransactionConfiguration $config = null): UnmanagedTransactionInterface
{
$session = $this->getSession($alias);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

return $session->beginTransaction($statements, $config);
if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->beginTransaction($statements, $config);
}

return $this->executeWithRetry(
function (SessionInterface $session) use ($statements, $config) {
return $session->beginTransaction($statements, $config);
},
$alias
);
}

public function getDriver(?string $alias): DriverInterface
Expand All @@ -130,27 +200,36 @@ private function startSession(?string $alias, SessionConfiguration $configuratio

public function writeTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
{
$accessMode = $this->defaultSessionConfiguration->getAccessMode();
if ($accessMode === null || $accessMode === AccessMode::WRITE()) {
$session = $this->getSession($alias);
} else {
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
$session = $this->startSession($alias, $sessionConfig);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->writeTransaction($tsxHandler, $config);
}

return $session->writeTransaction($tsxHandler, $this->getTsxConfig($config));
return $this->executeWithRetry(
function (SessionInterface $session) use ($tsxHandler, $config) {
return $session->writeTransaction($tsxHandler, $config);
},
$alias
);
}

public function readTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
{
if ($this->defaultSessionConfiguration->getAccessMode() === AccessMode::READ()) {
$session = $this->getSession($alias);
} else {
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
$session = $this->startSession($alias, $sessionConfig);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->readTransaction($tsxHandler, $config);
}

return $session->readTransaction($tsxHandler, $this->getTsxConfig($config));
return $this->executeWithRetry(
function (SessionInterface $session) use ($tsxHandler, $config) {
return $session->readTransaction($tsxHandler, $config);
},
$alias
);
}

public function transaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
Expand Down
10 changes: 10 additions & 0 deletions src/Common/DriverSetupManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ public function getDriver(SessionConfiguration $config, ?string $alias = null):
throw new RuntimeException(sprintf('Cannot connect to any server on alias: %s with Uris: (\'%s\')', $alias, implode('\', ', array_unique($urisTried))));
}

/**
* Resets the driver for the given alias, so that it will be recreated on the next call to getDriver, verifying the connection again.
*/
public function resetDriver(?string $alias): void
{
$alias ??= $this->decideAlias($alias);

unset($this->drivers[$alias]);
}

public function verifyConnectivity(SessionConfiguration $config, ?string $alias = null): bool
{
try {
Expand Down
3 changes: 0 additions & 3 deletions src/Common/SysVSemaphore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@

class SysVSemaphore implements SemaphoreInterface
{
/**
* @psalm-suppress UndefinedClass
*/
private function __construct(
private readonly \SysvSemaphore $semaphore,
) {
Expand Down
3 changes: 2 additions & 1 deletion src/Neo4j/Neo4jDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\ConnectionPoolException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -92,7 +93,7 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
$config ??= SessionConfiguration::default();
try {
GeneratorHelper::getReturnFromGenerator($this->pool->acquire($config));
} catch (ConnectException $e) {
} catch (ConnectException|ConnectionPoolException $e) {
$this->pool->getLogger()?->log(LogLevel::WARNING, 'Could not connect to server on URI '.$this->parsedUrl->__toString(), ['error' => $e]);

return false;
Expand Down
55 changes: 27 additions & 28 deletions src/Neo4j/RoutingTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,49 +62,48 @@ public function getWithRole(?RoutingRoles $role = null): array
}

/**
* Remove a server from the routing table.
*
* @param string $serverAddress The address to remove
*
* @return RoutingTable A new routing table with the server removed
* Whether the given address appears in this routing table.
*/
public function removeServer(string $serverAddress): RoutingTable
public function hasServer(string $serverAddress): bool
{
/** @var list<array{addresses: list<string>, role: string}> $updatedServers */
$updatedServers = [];

foreach ($this->servers as $server) {
$updatedAddresses = array_filter(
$server['addresses'],
static fn (string $address): bool => $address !== $serverAddress
);

if (!empty($updatedAddresses)) {
$updatedServers[] = [
'addresses' => array_values($updatedAddresses),
'role' => $server['role'],
];
if (in_array($serverAddress, $server['addresses'], true)) {
return true;
}
}

return new self($updatedServers, $this->ttl);
return false;
}

/**
* Check if a server exists in the routing table.
*
* @param string $serverAddress The address to check
* Returns a new table with every occurrence of the address removed from all roles.
* If the address was not present, returns the same instance.
*
* @return bool True if the server exists, false otherwise
* @psalm-mutation-free
*/
public function hasServer(string $serverAddress): bool
public function removeServer(string $serverAddress): self
{
$changed = false;
/** @var list<array{addresses: list<string>, role: string}> $newServers */
$newServers = [];
foreach ($this->servers as $server) {
if (in_array($serverAddress, $server['addresses'], true)) {
return true;
$addresses = [];
foreach ($server['addresses'] as $address) {
if ($address === $serverAddress) {
$changed = true;
} else {
$addresses[] = $address;
}
}
if ($addresses !== []) {
$newServers[] = ['addresses' => $addresses, 'role' => $server['role']];
}
}

return false;
if (!$changed) {
return $this;
}

return new self($newServers, $this->ttl);
}
}
Loading
Loading