Skip to content

Commit b5d5a69

Browse files
authored
fix: peer disconnection (#296)
2 parents 787588b + 1058623 commit b5d5a69

11 files changed

Lines changed: 300 additions & 115 deletions

File tree

package.json

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"type": "module",
33
"name": "@btc-vision/opnet-node",
4-
"version": "1.0.4",
4+
"version": "1.0.5",
55
"description": "Bitcoin Smart Contracts Node",
66
"main": "build/index.js",
77
"scripts": {
@@ -40,7 +40,8 @@
4040
"typescript-eslint": {
4141
"typescript": "$typescript"
4242
},
43-
"jsonpath-plus": "^10.4.0"
43+
"jsonpath-plus": "^10.4.0",
44+
"interface-datastore": "^10.0.1"
4445
},
4546
"dependencies": {
4647
"@asyncapi/generator": "^3.2.1",
@@ -64,25 +65,25 @@
6465
"@chainsafe/libp2p-yamux": "^8.0.1",
6566
"@datastructures-js/priority-queue": "^6.3.5",
6667
"@eslint/js": "^10.0.1",
67-
"@libp2p/autonat-v2": "^2.0.18",
68-
"@libp2p/bootstrap": "^12.0.20",
69-
"@libp2p/crypto": "^5.1.17",
70-
"@libp2p/identify": "^4.1.3",
68+
"@libp2p/autonat-v2": "^2.0.19",
69+
"@libp2p/bootstrap": "^12.0.21",
70+
"@libp2p/crypto": "^5.1.18",
71+
"@libp2p/identify": "^4.1.4",
7172
"@libp2p/interface": "^3.2.2",
7273
"@libp2p/interface-transport": "^4.0.3",
73-
"@libp2p/kad-dht": "^16.2.4",
74-
"@libp2p/logger": "^6.2.6",
75-
"@libp2p/mdns": "^12.0.20",
76-
"@libp2p/peer-id": "^6.0.8",
77-
"@libp2p/peer-store": "^12.0.18",
78-
"@libp2p/ping": "^3.1.3",
79-
"@libp2p/tcp": "^11.0.18",
80-
"@libp2p/upnp-nat": "^4.0.18",
81-
"@libp2p/websockets": "^10.1.11",
74+
"@libp2p/kad-dht": "^16.2.5",
75+
"@libp2p/logger": "^6.2.7",
76+
"@libp2p/mdns": "^12.0.21",
77+
"@libp2p/peer-id": "^6.0.9",
78+
"@libp2p/peer-store": "^12.0.19",
79+
"@libp2p/ping": "^3.1.4",
80+
"@libp2p/tcp": "^11.0.19",
81+
"@libp2p/upnp-nat": "^4.0.19",
82+
"@libp2p/websockets": "^10.1.12",
8283
"@multiformats/multiaddr": "^13.0.1",
8384
"@noble/curves": "^2.2.0",
8485
"@noble/secp256k1": "^3.1.0",
85-
"@typescript/native-preview": "^7.0.0-dev.20260506.1",
86+
"@typescript/native-preview": "^7.0.0-dev.20260511.1",
8687
"asyncapi-validator": "^5.1.1",
8788
"bignumber.js": "^11.1.1",
8889
"bip174": "^3.0.0",
@@ -91,43 +92,43 @@
9192
"bytenode": "^1.5.7",
9293
"chalk": "^5.6.2",
9394
"cors": "^2.8.6",
94-
"datastore-level": "^12.0.4",
95+
"datastore-level": "^13.0.1",
9596
"eslint-plugin-openapi": "^0.0.4",
9697
"express": "^5.2.1",
9798
"figlet": "^1.11.0",
98-
"interface-datastore": "^9.0.3",
99-
"it-length-prefixed-stream": "^2.0.6",
100-
"libp2p": "^3.2.4",
99+
"interface-datastore": "^10.0.1",
100+
"it-length-prefixed-stream": "^3.0.0",
101+
"libp2p": "^3.3.0",
101102
"long": "^5.3.2",
102103
"lru-cache": "^11.3.6",
103104
"mongodb": "^7.2.0",
104-
"multiformats": "^13.4.2",
105+
"multiformats": "^14.0.0",
105106
"openapi-comment-parser": "^1.0.0",
106-
"protobufjs": "^8.0.3",
107+
"protobufjs": "^8.2.0",
107108
"sodium-native": "^5.1.0",
108109
"ssh2": "^1.17.0",
109110
"swagger-ui-express": "^5.0.1",
110111
"tiny-secp256k1": "^2.2.4",
111112
"toml": "^4.1.1",
112113
"typescript": "^6.0.3",
113-
"typescript-eslint": "^8.59.2",
114-
"uint8arraylist": "^2.4.9"
114+
"typescript-eslint": "^8.59.3",
115+
"uint8arraylist": "^3.0.2"
115116
},
116117
"devDependencies": {
117118
"@types/cors": "^2.8.19",
118-
"@types/node": "^25.6.0",
119+
"@types/node": "^25.7.0",
119120
"@types/semver": "^7.7.1",
120121
"@types/sodium-native": "^2.3.9",
121122
"@types/ssh2": "^1.15.5",
122123
"@types/swagger-ui-express": "^4.1.8",
123-
"@vitest/coverage-v8": "^4.1.5",
124+
"@vitest/coverage-v8": "^4.1.6",
124125
"eslint": "^10.3.0",
125126
"gulp": "^5.0.1",
126127
"gulp-cached": "^1.1.1",
127128
"gulp-clean": "^0.4.0",
128129
"gulp-eslint-new": "^2.6.2",
129130
"patch-package": "^8.0.1",
130131
"prettier": "^3.8.3",
131-
"vitest": "^4.1.5"
132+
"vitest": "^4.1.6"
132133
}
133134
}

patches/@libp2p+utils+7.1.0.patch

Lines changed: 0 additions & 49 deletions
This file was deleted.

src/src/blockchain-indexer/rpc/thread/RPCSubWorkerManager.ts

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,25 @@ export class RPCSubWorkerManager extends Logger {
4444
}
4545

4646
private requestToWorker(data: string): void {
47-
const worker = this.getWorker();
48-
worker.send(data);
47+
const total = this.workers.length;
48+
for (let attempt = 0; attempt < total; attempt++) {
49+
const worker = this.getWorker();
50+
if (!worker.connected || worker.killed || worker.exitCode !== null) continue;
51+
52+
try {
53+
worker.send(data);
54+
return;
55+
} catch (e) {
56+
const code = (e as NodeJS.ErrnoException).code;
57+
if (code === 'ERR_IPC_CHANNEL_CLOSED' || code === 'ERR_IPC_DISCONNECTED') {
58+
this.error(`RPC sub-worker pid=${worker.pid} send failed (${code}).`);
59+
continue;
60+
}
61+
throw e;
62+
}
63+
}
64+
65+
this.error('All RPC sub-workers are unavailable; dropping request.');
4966
}
5067

5168
private createTaskId(): string {
@@ -79,9 +96,22 @@ export class RPCSubWorkerManager extends Logger {
7996
});
8097

8198
worker.on('exit', (code: number) => {
82-
this.error(`Worker exited with code ${code}`);
99+
this.error(`Worker pid=${worker.pid} exited with code ${code}; respawning.`);
100+
this.replaceWorker(worker);
83101
});
84102

85103
return worker;
86104
}
105+
106+
private replaceWorker(dead: ChildProcess): void {
107+
const idx = this.workers.indexOf(dead);
108+
if (idx === -1) return;
109+
110+
try {
111+
const fresh = this.createWorker();
112+
this.workers[idx] = fresh;
113+
} catch (e) {
114+
this.error(`Failed to respawn RPC sub-worker: ${(e as Error).stack}`);
115+
}
116+
}
87117
}

src/src/poc/PoC.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export class PoC extends Logger {
5252
): Promise<ThreadData> {
5353
switch (m.type) {
5454
case MessageType.BLOCK_PROCESSED: {
55-
return await this.onBlockProcessed(m as BlockProcessedMessage);
55+
return this.onBlockProcessed(m as BlockProcessedMessage);
5656
}
5757
case MessageType.RPC_METHOD: {
5858
return await this.handleRPCMessage(m as RPCMessage<BitcoinRPCThreadMessageType>);
@@ -107,11 +107,8 @@ export class PoC extends Logger {
107107
}
108108

109109
private async onBlockProcessed(m: BlockProcessedMessage): Promise<ThreadData> {
110-
// Wait for previous block to finish so height + proof are always in order.
111-
// Use catch so a failed broadcast doesn't permanently jam the lock.
112110
await this.blockProcessedLock.catch(() => {});
113111

114-
// Broadcast height to ALL witness instances
115112
this.blockProcessedLock = this.sendMessageToAllThreads(ThreadTypes.WITNESS, {
116113
type: MessageType.WITNESS_HEIGHT_UPDATE,
117114
data: { blockNumber: m.data.blockNumber },
@@ -123,15 +120,13 @@ export class PoC extends Logger {
123120
this.error(`Failed to broadcast height update: ${(e as Error).stack}`);
124121
}
125122

126-
// Round-robin proof generation to ONE witness instance
127123
void this.sendMessageToThread(ThreadTypes.WITNESS, {
128124
type: MessageType.WITNESS_BLOCK_PROCESSED,
129125
data: m.data,
130126
}).catch((e: unknown) => {
131127
this.error(`Failed to dispatch WITNESS_BLOCK_PROCESSED: ${(e as Error).stack}`);
132128
});
133129

134-
// Update consensus height on this thread
135130
this.p2p.updateConsensusHeight(m.data.blockNumber);
136131

137132
return {};

src/src/poc/networking/P2PManager.ts

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import { tcp } from '@libp2p/tcp';
2323
import { uPnPNAT } from '@libp2p/upnp-nat';
2424
import { multiaddr, Multiaddr } from '@multiformats/multiaddr';
2525
import figlet, { FontName } from 'figlet';
26-
import type { Datastore } from 'interface-datastore';
2726
import { createLibp2p, ServiceFactoryMap } from 'libp2p';
2827
import { BtcIndexerConfig } from '../../config/BtcIndexerConfig.js';
2928
import { DBManagerInstance } from '../../db/DBManager.js';
@@ -87,6 +86,7 @@ import {
8786
isPrivateOrLoopbackAddress,
8887
} from './AddressExtractor.js';
8988
import { Components } from 'libp2p/src/components.js';
89+
import { Datastore } from 'interface-datastore';
9090

9191
if (Config.P2P.ENABLE_P2P_LOGGING) {
9292
enable('libp2p:*');
@@ -539,29 +539,99 @@ export class P2PManager extends Logger {
539539
}
540540

541541
private async broadcastMempoolTransaction(transaction: ITransactionPacket): Promise<number> {
542+
const PEER_BROADCAST_TIMEOUT_MS = 8_000;
543+
542544
const broadcastPromises: Promise<void>[] = [];
543-
for (const peer of this.peers.values()) {
545+
for (const [peerIdStr, peer] of this.peers.entries()) {
544546
if (!peer.isAuthenticated) continue;
545547

546-
broadcastPromises.push(peer.broadcastMempoolTransaction(transaction));
548+
broadcastPromises.push(
549+
this.runPeerOp(
550+
peerIdStr,
551+
PEER_BROADCAST_TIMEOUT_MS,
552+
'Mempool broadcast',
553+
peer.broadcastMempoolTransaction(transaction),
554+
),
555+
);
547556
}
548557

549558
await Promise.safeAll(broadcastPromises);
550559

551560
return broadcastPromises.length;
552561
}
553562

563+
/**
564+
* Wrap a per-peer fan-out op with a hard timeout and full error isolation.
565+
* Every fan-out path through libp2p (mempool gossip, witness broadcast,
566+
* witness sync request) shares the same hazard: a single slow / dead /
567+
* misbehaving peer would otherwise pin `Promise.safeAll` until the upstream
568+
* 240s thread timeout fires and starts cascading. Always resolves; never
569+
* rejects, so callers can treat the batch as best-effort.
570+
*/
571+
private runPeerOp(
572+
peerIdStr: string,
573+
timeoutMs: number,
574+
opName: string,
575+
op: Promise<unknown>,
576+
): Promise<void> {
577+
return new Promise<void>((resolve) => {
578+
let settled = false;
579+
const timer = setTimeout(() => {
580+
if (settled) return;
581+
settled = true;
582+
583+
if (Config.DEBUG_LEVEL >= DebugLevel.DEBUG) {
584+
this.warn(
585+
`${opName} to peer ${peerIdStr} timed out after ${timeoutMs}ms; skipping.`,
586+
);
587+
}
588+
589+
resolve();
590+
}, timeoutMs);
591+
592+
op.then(
593+
() => {
594+
if (settled) return;
595+
settled = true;
596+
clearTimeout(timer);
597+
resolve();
598+
},
599+
(err: unknown) => {
600+
if (settled) return;
601+
settled = true;
602+
clearTimeout(timer);
603+
604+
if (Config.DEV_MODE) {
605+
const details = err instanceof Error ? err.message : String(err);
606+
this.warn(`${opName} to peer ${peerIdStr} failed: ${details}`);
607+
}
608+
609+
resolve();
610+
},
611+
);
612+
});
613+
}
614+
554615
private async requestBlockWitnessesFromPeer(blockNumber: bigint): Promise<void> {
616+
const PEER_WITNESS_REQUEST_TIMEOUT_MS = 8_000;
617+
555618
const promises: Promise<void>[] = [];
556-
for (const [_peerId, peer] of this.peers) {
619+
for (const [peerIdStr, peer] of this.peers.entries()) {
557620
if (!peer.isAuthenticated) continue;
558621

559622
// We skip asking proofs to light nodes, this is in TODO.
560623
// TODO: Handle correct proof validations for light nodes.
561624
const peerMode = peer.peerMode();
562625
if (peerMode === undefined || peerMode === OPNetIndexerMode.LIGHT) continue;
563626

564-
promises.push(peer.requestBlockWitnessesFromPeer(blockNumber));
627+
promises.push(
628+
this.runPeerOp(
629+
peerIdStr,
630+
PEER_WITNESS_REQUEST_TIMEOUT_MS,
631+
'Witness sync request',
632+
peer.requestBlockWitnessesFromPeer(blockNumber),
633+
),
634+
);
565635
}
566636

567637
await Promise.safeAll(promises);
@@ -599,12 +669,19 @@ export class P2PManager extends Logger {
599669
return;
600670
}
601671

602-
// send to all peers
672+
const PEER_WITNESS_TIMEOUT_MS = 8_000;
603673
const promises: Promise<void>[] = [];
604-
for (const [_peerId, peer] of this.peers) {
674+
for (const [peerIdStr, peer] of this.peers.entries()) {
605675
if (!peer.isAuthenticated) continue;
606676

607-
promises.push(peer.sendFromServer(generatedWitness));
677+
promises.push(
678+
this.runPeerOp(
679+
peerIdStr,
680+
PEER_WITNESS_TIMEOUT_MS,
681+
'Witness broadcast',
682+
peer.sendFromServer(generatedWitness),
683+
),
684+
);
608685
}
609686

610687
await Promise.safeAll(promises);

0 commit comments

Comments
 (0)