Skip to content

Commit 490c2bc

Browse files
authored
Fix disconnect and transaction timeout handling for TestKit + transaction optimisations
Fix disconnect and timeout handling; align with TestKit - Properly handle server disconnects and transaction timeouts, surfacing correct errors - Defer BEGIN until first run/commit/rollback to match expected behavior - Distinguish server errors vs connection errors in pull(): - server errors are rethrown - connection errors may return partial results - Ensure transaction timeouts are not swallowed - Simplify error handling and centralize connection invalidation
1 parent fa86072 commit 490c2bc

24 files changed

Lines changed: 479 additions & 59 deletions

src/Bolt/BoltConnection.php

Lines changed: 87 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
use Laudis\Neo4j\Contracts\ConnectionInterface;
3131
use Laudis\Neo4j\Databags\BookmarkHolder;
3232
use Laudis\Neo4j\Databags\DatabaseInfo;
33+
use Laudis\Neo4j\Databags\DriverConfiguration;
3334
use Laudis\Neo4j\Databags\Neo4jError;
3435
use Laudis\Neo4j\Enum\AccessMode;
3536
use Laudis\Neo4j\Enum\ConnectionProtocol;
@@ -66,6 +67,10 @@ class BoltConnection implements ConnectionInterface
6667
*/
6768
private array $subscribedResults = [];
6869

70+
private ?float $recvTimeoutHint = null;
71+
72+
private ?float $originalTimeout = null;
73+
6974
/**
7075
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
7176
*/
@@ -85,6 +90,7 @@ public function __construct(
8590
/** @psalm-readonly */
8691
private readonly ConnectionConfiguration $config,
8792
private readonly ?Neo4jLogger $logger,
93+
private readonly float $defaultRecvTimeout = DriverConfiguration::DEFAULT_SOCKET_TIMEOUT,
8894
) {
8995
$this->messageFactory = new BoltMessageFactory($this, $this->logger);
9096
}
@@ -171,7 +177,17 @@ public function isStreaming(): bool
171177

172178
public function setTimeout(float $timeout): void
173179
{
174-
$this->connection->setTimeout($timeout);
180+
// Only set timeout if connection is still open
181+
// This prevents errors when trying to set timeout on a closed socket
182+
// Connection::setTimeout swallows errors on closed connections (cleanup scenario)
183+
if ($this->isOpen()) {
184+
$this->connection->setTimeout($timeout);
185+
}
186+
}
187+
188+
public function getTimeout(): float
189+
{
190+
return $this->connection->getTimeout();
175191
}
176192

177193
public function consumeResults(): void
@@ -300,13 +316,36 @@ public function pull(?int $qid, ?int $fetchSize): array
300316
$tbr = [];
301317
$message = $this->messageFactory->createPullMessage($extra);
302318

303-
foreach ($message->send()->getResponses() as $response) {
304-
$this->assertNoFailure($response);
305-
$tbr[] = $response->content;
306-
}
319+
try {
320+
// Apply timeout before iterating to ensure disconnects are detected
321+
$this->applyRecvTimeoutTemporarily();
322+
323+
// If no timeout hint is set, apply a default timeout to prevent hanging on disconnect.
324+
if ($this->originalTimeout === null && $this->recvTimeoutHint === null) {
325+
$this->originalTimeout = $this->connection->getTimeout();
326+
$this->connection->setTimeout($this->defaultRecvTimeout);
327+
}
307328

308-
/** @var non-empty-list<list> */
309-
return $tbr;
329+
foreach ($message->send()->getResponses() as $response) {
330+
$this->assertNoFailure($response);
331+
$tbr[] = $response->content;
332+
}
333+
334+
$this->restoreOriginalTimeout();
335+
336+
/** @var non-empty-list<list> */
337+
return $tbr;
338+
} catch (Throwable $e) {
339+
$this->restoreOriginalTimeout();
340+
// If we've received some records before the disconnect, return them so first next() succeeds and second next() fails.
341+
if (!empty($tbr)) {
342+
$tbr[] = [];
343+
344+
/** @var non-empty-list<list> */
345+
return $tbr;
346+
}
347+
throw $e;
348+
}
310349
}
311350

312351
public function __destruct()
@@ -466,4 +505,45 @@ public function discardUnconsumedResults(): void
466505
$this->subscribedResults = [];
467506
}
468507
}
508+
509+
public function setRecvTimeoutHint(?float $timeout): void
510+
{
511+
$this->recvTimeoutHint = $timeout;
512+
}
513+
514+
public function getRecvTimeoutHint(): ?float
515+
{
516+
return $this->recvTimeoutHint;
517+
}
518+
519+
public function applyRecvTimeoutTemporarily(): void
520+
{
521+
if ($this->recvTimeoutHint !== null && $this->originalTimeout === null) {
522+
$this->originalTimeout = $this->connection->getTimeout();
523+
$this->connection->setTimeout($this->recvTimeoutHint);
524+
}
525+
}
526+
527+
public function restoreOriginalTimeout(): void
528+
{
529+
if ($this->originalTimeout !== null) {
530+
$this->connection->setTimeout($this->originalTimeout);
531+
$this->originalTimeout = null;
532+
}
533+
}
534+
535+
public function getOriginalTimeout(): ?float
536+
{
537+
return $this->originalTimeout;
538+
}
539+
540+
public function getDefaultRecvTimeout(): float
541+
{
542+
return $this->defaultRecvTimeout;
543+
}
544+
545+
public function setOriginalTimeout(?float $timeout): void
546+
{
547+
$this->originalTimeout = $timeout;
548+
}
469549
}

src/Bolt/BoltResult.php

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use function array_splice;
1717

18+
use Bolt\error\BoltException;
1819
use Bolt\error\ConnectException as BoltConnectException;
1920

2021
use function count;
@@ -25,6 +26,8 @@
2526

2627
use Iterator;
2728
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
29+
use RuntimeException;
30+
use Throwable;
2831

2932
/**
3033
* @psalm-import-type BoltCypherStats from SummarizedResultFormatter
@@ -85,10 +88,20 @@ public function iterator(): Generator
8588
yield $i => $row;
8689
++$i;
8790
}
91+
// If meta is set to empty array (normal completion with no records), exit immediately
92+
if ($this->meta === []) {
93+
break;
94+
}
8895
}
8996

90-
foreach ($this->finishedCallbacks as $finishedCallback) {
91-
$finishedCallback($this->meta);
97+
$meta = $this->meta;
98+
// Finished callbacks are callable(array): void and read summary keys (e.g. db, bookmark); passing null
99+
// would error at runtime. We only run them when we have a real completion summary—if meta is still null
100+
// (e.g. partial pull / more fetches pending), there is nothing to hand to the callback yet.
101+
if ($meta !== null) {
102+
foreach ($this->finishedCallbacks as $finishedCallback) {
103+
$finishedCallback($meta);
104+
}
92105
}
93106
}
94107

@@ -105,20 +118,49 @@ private function fetchResults(): void
105118
{
106119
try {
107120
$meta = $this->connection->pull($this->qid, $this->fetchSize);
108-
} catch (BoltConnectException $e) {
121+
} catch (BoltConnectException|BoltException $e) {
109122
// Invalidate connection on socket/network errors so pool does not reuse it.
110123
// Rethrow as-is - Session retry logic inspects the actual exception via isConnectionError().
111124
$this->connection->invalidate();
112125
throw $e;
113126
}
127+
// Neo4jException and other Throwable propagate naturally - no invalidate needed for server errors
128+
129+
// Safety check: ensure pull response $meta is not empty (pull() is typed non-empty-list but we defend against empty)
130+
/** @psalm-suppress TypeDoesNotContainType */
131+
if (empty($meta)) {
132+
throw new RuntimeException('Empty response from server');
133+
}
114134

115135
/** @var list<list> $rows */
116136
$rows = array_splice($meta, 0, count($meta) - 1);
117137
$this->rows = $rows;
118138

119139
/** @var array{0: array} $meta */
120-
if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) {
121-
$this->meta = $meta[0];
140+
// Check if we have a valid summary
141+
/** @psalm-suppress RedundantConditionGivenDocblockType */
142+
if (count($meta) > 0 && is_array($meta[0])) {
143+
// If summary is empty array and we have no rows, it's a normal completion (no records)
144+
// If summary is empty array but we have rows, it's a partial pull from disconnect
145+
if (empty($meta[0]) && empty($rows)) {
146+
// Normal completion with no records - mark as complete
147+
$this->meta = [];
148+
} elseif (!empty($meta[0])) {
149+
// Valid summary with data
150+
if (!array_key_exists('has_more', $meta[0]) || $meta[0]['has_more'] === false) {
151+
$this->meta = $meta[0];
152+
}
153+
} else {
154+
// Empty summary but we have rows - partial result from disconnect
155+
// Set $this->meta to null so the next fetchResults() will try to pull again
156+
// This allows the first record to be consumed, and the next fetch will fail
157+
// which is the expected behavior for tests like exit_after_record
158+
$this->meta = null;
159+
}
160+
} else {
161+
// No summary received (connection closed before summary)
162+
// Set $this->meta to null so the next fetchResults() will try to pull again
163+
$this->meta = null;
122164
}
123165
}
124166

@@ -166,10 +208,9 @@ public function discard(): void
166208
{
167209
try {
168210
$this->connection->discard($this->qid === -1 ? null : $this->qid);
169-
} catch (BoltConnectException $e) {
211+
} catch (BoltConnectException|BoltException $e) {
170212
// Connection already broken if DISCARD fails. Invalidate to prevent pool from reusing it.
171213
// Don't rethrow: this is called from __destruct() where exceptions don't propagate properly.
172-
// Connection will be detected as broken on next operation when pool tries to reuse it.
173214
$this->connection->invalidate();
174215
}
175216
}

src/Bolt/BoltUnmanagedTransaction.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
final class BoltUnmanagedTransaction implements UnmanagedTransactionInterface
4040
{
4141
private TransactionState $state = TransactionState::ACTIVE;
42+
private bool $beginSent = false;
4243

4344
public function __construct(
4445
/** @psalm-readonly */
@@ -55,7 +56,9 @@ public function __construct(
5556
private readonly BoltMessageFactory $messageFactory,
5657
private readonly bool $isInstantTransaction,
5758
private readonly ?ConnectionPoolInterface $pool = null,
59+
bool $beginAlreadySent = false,
5860
) {
61+
$this->beginSent = $beginAlreadySent;
5962
}
6063

6164
/**
@@ -81,6 +84,8 @@ public function commit(iterable $statements = []): CypherList
8184
}
8285
}
8386

87+
$this->ensureBeginSent();
88+
8489
// Force the results to pull all the results.
8590
// After a commit, the connection will be in the ready state, making it impossible to use PULL
8691
$tbr = $this->runStatements($statements)->each(static function (CypherList $list) {
@@ -105,6 +110,8 @@ public function rollback(): void
105110
}
106111
}
107112

113+
$this->ensureBeginSent();
114+
108115
$this->messageFactory->createRollbackMessage()->send();
109116
$this->state = TransactionState::ROLLED_BACK;
110117
}
@@ -144,6 +151,8 @@ public function runStatement(Statement $statement): SummarizedResult
144151
$this->connection->consumeResults();
145152
}
146153

154+
$this->ensureBeginSent();
155+
147156
try {
148157
$meta = $this->connection->run(
149158
$statement->getText(),
@@ -205,4 +214,21 @@ public function isFinished(): bool
205214
{
206215
return $this->state != TransactionState::ACTIVE;
207216
}
217+
218+
private function ensureBeginSent(): void
219+
{
220+
if ($this->isInstantTransaction || $this->beginSent) {
221+
return;
222+
}
223+
try {
224+
$this->connection->begin($this->database, $this->tsxConfig->getTimeout(), $this->bookmarkHolder, $this->tsxConfig->getMetaData());
225+
$this->beginSent = true;
226+
} catch (Throwable $e) {
227+
$this->state = TransactionState::TERMINATED;
228+
if ($this->pool !== null) {
229+
$this->pool->release($this->connection);
230+
}
231+
throw $e;
232+
}
233+
}
208234
}

src/Bolt/Connection.php

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
namespace Laudis\Neo4j\Bolt;
1515

1616
use Bolt\connection\IConnection;
17+
use Bolt\error\ConnectException as BoltConnectException;
18+
use Psr\Log\LoggerInterface;
19+
use Throwable;
1720

1821
class Connection
1922
{
@@ -23,6 +26,7 @@ class Connection
2326
public function __construct(
2427
private readonly IConnection $connection,
2528
private readonly string $ssl,
29+
private readonly ?LoggerInterface $logger = null,
2630
) {
2731
}
2832

@@ -38,7 +42,14 @@ public function write(string $buffer): void
3842

3943
public function read(int $length = 2048): string
4044
{
41-
return $this->connection->read($length);
45+
$data = $this->connection->read($length);
46+
47+
// Detect EOF - empty read from blocking socket indicates connection closed
48+
if ($data === '' && $length > 0) {
49+
throw new BoltConnectException('Connection closed by remote host');
50+
}
51+
52+
return $data;
4253
}
4354

4455
public function disconnect(): void
@@ -63,7 +74,14 @@ public function getTimeout(): float
6374

6475
public function setTimeout(float $timeout): void
6576
{
66-
$this->connection->setTimeout($timeout);
77+
try {
78+
$this->connection->setTimeout($timeout);
79+
} catch (Throwable $e) {
80+
$this->logger?->warning('Failed to set socket timeout on connection', [
81+
'timeout' => $timeout,
82+
'exception' => $e->getMessage(),
83+
]);
84+
}
6785
}
6886

6987
/**

src/Bolt/ConnectionPool.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public static function create(
5959
$uri,
6060
$auth,
6161
$conf->getUserAgent(),
62-
$conf->getSslConfiguration()
62+
$conf->getSslConfiguration(),
63+
$conf->getSocketTimeoutSecondsExplicit()
6364
),
6465
$conf->getLogger(),
6566
$conf->getAcquireConnectionTimeout()

0 commit comments

Comments
 (0)