Skip to content

Commit 8a76b7a

Browse files
authored
Multi-Driver Failover when one of multiple hosts down (#282)
1 parent 490c2bc commit 8a76b7a

9 files changed

Lines changed: 1018 additions & 64 deletions

src/Client.php

Lines changed: 99 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
namespace Laudis\Neo4j;
1515

16+
use Bolt\error\ConnectException;
1617
use Laudis\Neo4j\Common\DriverSetupManager;
1718
use Laudis\Neo4j\Contracts\ClientInterface;
1819
use Laudis\Neo4j\Contracts\DriverInterface;
@@ -23,7 +24,7 @@
2324
use Laudis\Neo4j\Databags\Statement;
2425
use Laudis\Neo4j\Databags\SummarizedResult;
2526
use Laudis\Neo4j\Databags\TransactionConfiguration;
26-
use Laudis\Neo4j\Enum\AccessMode;
27+
use Laudis\Neo4j\Exception\ConnectionPoolException;
2728
use Laudis\Neo4j\Types\CypherList;
2829

2930
/**
@@ -100,22 +101,91 @@ private function getSession(?string $alias = null): SessionInterface
100101
return $this->boundSessions[$alias] = $this->startSession($alias, $this->defaultSessionConfiguration);
101102
}
102103

104+
/**
105+
* Executes an operation with automatic retry on alternative drivers when connection exceptions occur.
106+
*
107+
* @template T
108+
*
109+
* @param callable(SessionInterface): T $operation The operation to execute
110+
* @param string|null $alias The driver alias to use
111+
*
112+
* @throws ConnectionPoolException When all available drivers have been exhausted
113+
*
114+
* @return T The result of the operation
115+
*/
116+
private function executeWithRetry(callable $operation, ?string $alias = null, ?int $maxTries = 3)
117+
{
118+
$alias ??= $this->driverSetups->getDefaultAlias();
119+
$attemptedDrivers = [];
120+
$lastException = null;
121+
122+
$tries = $maxTries;
123+
124+
while ($tries > 0) {
125+
try {
126+
$driver = $this->driverSetups->getDriver($this->defaultSessionConfiguration, $alias);
127+
128+
$driverHash = spl_object_hash($driver);
129+
if (in_array($driverHash, $attemptedDrivers, true)) {
130+
throw $lastException ?? new ConnectionPoolException('No available drivers');
131+
}
132+
$attemptedDrivers[] = $driverHash;
133+
134+
$session = $driver->createSession($this->defaultSessionConfiguration);
135+
136+
return $operation($session);
137+
} catch (ConnectionPoolException|ConnectException $e) {
138+
$lastException = $e;
139+
$this->driverSetups->resetDriver($alias);
140+
141+
--$tries;
142+
}
143+
}
144+
145+
throw $lastException ?? new ConnectionPoolException('No available drivers');
146+
}
147+
103148
public function runStatements(iterable $statements, ?string $alias = null): CypherList
104149
{
105-
$runner = $this->getRunner($alias);
106-
if ($runner instanceof SessionInterface) {
107-
return $runner->runStatements($statements, $this->defaultTransactionConfiguration);
150+
$alias ??= $this->driverSetups->getDefaultAlias();
151+
152+
if (array_key_exists($alias, $this->boundTransactions)
153+
&& count($this->boundTransactions[$alias]) > 0) {
154+
$runner = $this->getRunner($alias);
155+
if ($runner instanceof TransactionInterface) {
156+
return $runner->runStatements($statements);
157+
}
158+
}
159+
160+
if (array_key_exists($alias, $this->boundSessions)) {
161+
$session = $this->boundSessions[$alias];
162+
163+
return $session->runStatements($statements, $this->defaultTransactionConfiguration);
108164
}
109165

110-
return $runner->runStatements($statements);
166+
return $this->executeWithRetry(
167+
function (SessionInterface $session) use ($statements) {
168+
return $session->runStatements($statements, $this->defaultTransactionConfiguration);
169+
},
170+
$alias
171+
);
111172
}
112173

113174
public function beginTransaction(?iterable $statements = null, ?string $alias = null, ?TransactionConfiguration $config = null): UnmanagedTransactionInterface
114175
{
115-
$session = $this->getSession($alias);
176+
$alias ??= $this->driverSetups->getDefaultAlias();
116177
$config = $this->getTsxConfig($config);
117178

118-
return $session->beginTransaction($statements, $config);
179+
if (array_key_exists($alias, $this->boundSessions)) {
180+
return $this->boundSessions[$alias]->beginTransaction($statements, $config);
181+
}
182+
183+
return $this->executeWithRetry(
184+
function (SessionInterface $session) use ($statements, $config) {
185+
return $session->beginTransaction($statements, $config);
186+
},
187+
$alias
188+
);
119189
}
120190

121191
public function getDriver(?string $alias): DriverInterface
@@ -130,27 +200,36 @@ private function startSession(?string $alias, SessionConfiguration $configuratio
130200

131201
public function writeTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
132202
{
133-
$accessMode = $this->defaultSessionConfiguration->getAccessMode();
134-
if ($accessMode === null || $accessMode === AccessMode::WRITE()) {
135-
$session = $this->getSession($alias);
136-
} else {
137-
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
138-
$session = $this->startSession($alias, $sessionConfig);
203+
$alias ??= $this->driverSetups->getDefaultAlias();
204+
$config = $this->getTsxConfig($config);
205+
206+
if (array_key_exists($alias, $this->boundSessions)) {
207+
return $this->boundSessions[$alias]->writeTransaction($tsxHandler, $config);
139208
}
140209

141-
return $session->writeTransaction($tsxHandler, $this->getTsxConfig($config));
210+
return $this->executeWithRetry(
211+
function (SessionInterface $session) use ($tsxHandler, $config) {
212+
return $session->writeTransaction($tsxHandler, $config);
213+
},
214+
$alias
215+
);
142216
}
143217

144218
public function readTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
145219
{
146-
if ($this->defaultSessionConfiguration->getAccessMode() === AccessMode::READ()) {
147-
$session = $this->getSession($alias);
148-
} else {
149-
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
150-
$session = $this->startSession($alias, $sessionConfig);
220+
$alias ??= $this->driverSetups->getDefaultAlias();
221+
$config = $this->getTsxConfig($config);
222+
223+
if (array_key_exists($alias, $this->boundSessions)) {
224+
return $this->boundSessions[$alias]->readTransaction($tsxHandler, $config);
151225
}
152226

153-
return $session->readTransaction($tsxHandler, $this->getTsxConfig($config));
227+
return $this->executeWithRetry(
228+
function (SessionInterface $session) use ($tsxHandler, $config) {
229+
return $session->readTransaction($tsxHandler, $config);
230+
},
231+
$alias
232+
);
154233
}
155234

156235
public function transaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)

src/Common/DriverSetupManager.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,16 @@ public function getDriver(SessionConfiguration $config, ?string $alias = null):
136136
throw new RuntimeException(sprintf('Cannot connect to any server on alias: %s with Uris: (\'%s\')', $alias, implode('\', ', array_unique($urisTried))));
137137
}
138138

139+
/**
140+
* Resets the driver for the given alias, so that it will be recreated on the next call to getDriver, verifying the connection again.
141+
*/
142+
public function resetDriver(?string $alias): void
143+
{
144+
$alias ??= $this->decideAlias($alias);
145+
146+
unset($this->drivers[$alias]);
147+
}
148+
139149
public function verifyConnectivity(SessionConfiguration $config, ?string $alias = null): bool
140150
{
141151
try {

src/Common/SysVSemaphore.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929

3030
class SysVSemaphore implements SemaphoreInterface
3131
{
32-
/**
33-
* @psalm-suppress UndefinedClass
34-
*/
3532
private function __construct(
3633
private readonly \SysvSemaphore $semaphore,
3734
) {

src/Neo4j/Neo4jDriver.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
use Laudis\Neo4j\Databags\ServerInfo;
3232
use Laudis\Neo4j\Databags\SessionConfiguration;
3333
use Laudis\Neo4j\Enum\AccessMode;
34+
use Laudis\Neo4j\Exception\ConnectionPoolException;
3435
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
3536
use Psr\Http\Message\UriInterface;
3637
use Psr\Log\LogLevel;
@@ -92,7 +93,7 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
9293
$config ??= SessionConfiguration::default();
9394
try {
9495
GeneratorHelper::getReturnFromGenerator($this->pool->acquire($config));
95-
} catch (ConnectException $e) {
96+
} catch (ConnectException|ConnectionPoolException $e) {
9697
$this->pool->getLogger()?->log(LogLevel::WARNING, 'Could not connect to server on URI '.$this->parsedUrl->__toString(), ['error' => $e]);
9798

9899
return false;

src/Neo4j/RoutingTable.php

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,49 +62,48 @@ public function getWithRole(?RoutingRoles $role = null): array
6262
}
6363

6464
/**
65-
* Remove a server from the routing table.
66-
*
67-
* @param string $serverAddress The address to remove
68-
*
69-
* @return RoutingTable A new routing table with the server removed
65+
* Whether the given address appears in this routing table.
7066
*/
71-
public function removeServer(string $serverAddress): RoutingTable
67+
public function hasServer(string $serverAddress): bool
7268
{
73-
/** @var list<array{addresses: list<string>, role: string}> $updatedServers */
74-
$updatedServers = [];
75-
7669
foreach ($this->servers as $server) {
77-
$updatedAddresses = array_filter(
78-
$server['addresses'],
79-
static fn (string $address): bool => $address !== $serverAddress
80-
);
81-
82-
if (!empty($updatedAddresses)) {
83-
$updatedServers[] = [
84-
'addresses' => array_values($updatedAddresses),
85-
'role' => $server['role'],
86-
];
70+
if (in_array($serverAddress, $server['addresses'], true)) {
71+
return true;
8772
}
8873
}
8974

90-
return new self($updatedServers, $this->ttl);
75+
return false;
9176
}
9277

9378
/**
94-
* Check if a server exists in the routing table.
95-
*
96-
* @param string $serverAddress The address to check
79+
* Returns a new table with every occurrence of the address removed from all roles.
80+
* If the address was not present, returns the same instance.
9781
*
98-
* @return bool True if the server exists, false otherwise
82+
* @psalm-mutation-free
9983
*/
100-
public function hasServer(string $serverAddress): bool
84+
public function removeServer(string $serverAddress): self
10185
{
86+
$changed = false;
87+
/** @var list<array{addresses: list<string>, role: string}> $newServers */
88+
$newServers = [];
10289
foreach ($this->servers as $server) {
103-
if (in_array($serverAddress, $server['addresses'], true)) {
104-
return true;
90+
$addresses = [];
91+
foreach ($server['addresses'] as $address) {
92+
if ($address === $serverAddress) {
93+
$changed = true;
94+
} else {
95+
$addresses[] = $address;
96+
}
97+
}
98+
if ($addresses !== []) {
99+
$newServers[] = ['addresses' => $addresses, 'role' => $server['role']];
105100
}
106101
}
107102

108-
return false;
103+
if (!$changed) {
104+
return $this;
105+
}
106+
107+
return new self($newServers, $this->ttl);
109108
}
110109
}

0 commit comments

Comments
 (0)