Skip to content

Commit 63eccc3

Browse files
committed
fix(mesh-store): track and destroy all TCP sockets on shutdown
Track all accepted data server and coordinator server sockets in Sets, and the coordinator client-side socket, so shutdown() can destroy every live socket. Previously only peerConnections were destroyed, leaving unidentified data connections and coordinator sockets alive and preventing the Node.js event loop from exiting. Destroy sockets on all error paths (connectToCoordinator, connectToPeerData) so failed connections don't leak. Clear the connectToCoordinator timer on success to prevent timer handle leaks. Unref all root handles (dataServer, coordinatorServer, coordinatorSocket) immediately after init() so they function normally for I/O but don't keep the event loop alive — allowing pi to exit cleanly in print mode. Unref every tracked socket before destroying it in shutdown() and handoverCoordinator() to ensure libuv releases the handle promptly. Fix handoverCoordinator() to always clean up coordinator state (even with no successor) and destroy coordinator server sockets.
1 parent 8d8d7e1 commit 63eccc3

1 file changed

Lines changed: 65 additions & 11 deletions

File tree

src/core/mesh-store.ts

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ export class MeshStore implements CommsStore {
149149
string,
150150
{ socket: net.Socket; buffer: MessageBuffer }
151151
>();
152+
/** All sockets accepted by the data server — destroyed on shutdown. */
153+
private dataServerSockets = new Set<net.Socket>();
154+
/** All sockets accepted by the coordinator server — destroyed on shutdown. */
155+
private coordinatorServerSockets = new Set<net.Socket>();
156+
/** Socket connected to the coordinator (client side) — destroyed on shutdown. */
157+
private coordinatorSocket: net.Socket | undefined;
152158
private peerInfo = new Map<string, PeerInfo>();
153159
private staleCheckTimer: ReturnType<typeof setInterval> | undefined;
154160

@@ -169,6 +175,12 @@ export class MeshStore implements CommsStore {
169175
async init(): Promise<void> {
170176
await this.startDataServer();
171177
await this.tryJoinMesh();
178+
// Unref all root handles so the event loop can exit when pi shuts down.
179+
// Sockets/servers still function normally for I/O but don't keep the
180+
// process alive. shutdown() will destroy them explicitly.
181+
this.dataServer?.unref();
182+
this.coordinatorServer?.unref();
183+
this.coordinatorSocket?.unref();
172184
}
173185

174186
private startDataServer(): Promise<void> {
@@ -200,6 +212,7 @@ export class MeshStore implements CommsStore {
200212
const socket = net.createConnection(
201213
{ port: this.coordinatorPort, host: COORDINATOR_HOST },
202214
() => {
215+
this.coordinatorSocket = socket;
203216
const intro: MeshMessage = {
204217
method: "introduce",
205218
peerId: this.peerId,
@@ -220,6 +233,7 @@ export class MeshStore implements CommsStore {
220233
socket.on("error", () => {
221234
/* ignore late errors */
222235
});
236+
clearTimeout(timer);
223237
resolve();
224238
},
225239
);
@@ -231,6 +245,7 @@ export class MeshStore implements CommsStore {
231245

232246
socket.on("error", (err) => {
233247
clearTimeout(timer);
248+
socket.destroy();
234249
reject(err);
235250
});
236251
});
@@ -287,6 +302,9 @@ export class MeshStore implements CommsStore {
287302
// -----------------------------------------------------------------------
288303

289304
private handleCoordinatorConnection(socket: net.Socket): void {
305+
this.coordinatorServerSockets.add(socket);
306+
socket.on("close", () => this.coordinatorServerSockets.delete(socket));
307+
290308
const buffer = new MessageBuffer();
291309
socket.on("data", (data) => {
292310
const items = buffer.append(data.toString());
@@ -329,6 +347,9 @@ export class MeshStore implements CommsStore {
329347
// -----------------------------------------------------------------------
330348

331349
private handleDataConnection(socket: net.Socket): void {
350+
this.dataServerSockets.add(socket);
351+
socket.on("close", () => this.dataServerSockets.delete(socket));
352+
332353
const buffer = new MessageBuffer();
333354
let remotePeerId: string | undefined;
334355

@@ -481,6 +502,7 @@ export class MeshStore implements CommsStore {
481502
socket.on("close", () => this.peerConnections.delete(peer.id));
482503
socket.on("error", () => {
483504
this.peerConnections.delete(peer.id);
505+
socket.destroy();
484506
resolve();
485507
});
486508
});
@@ -1189,20 +1211,27 @@ export class MeshStore implements CommsStore {
11891211
}
11901212
}
11911213

1192-
if (!successor) return;
1193-
1194-
const peer = this.peerConnections.get(successor.id);
1195-
if (peer) {
1196-
const msg: MeshMessage = {
1197-
method: "become_coordinator",
1198-
peerList: [...this.peerInfo.values()].filter(
1199-
(p) => p.id !== this.peerId,
1200-
),
1201-
};
1202-
await writeAsync(peer.socket, encode(msg));
1214+
if (successor) {
1215+
const peer = this.peerConnections.get(successor.id);
1216+
if (peer) {
1217+
const msg: MeshMessage = {
1218+
method: "become_coordinator",
1219+
peerList: [...this.peerInfo.values()].filter(
1220+
(p) => p.id !== this.peerId,
1221+
),
1222+
};
1223+
await writeAsync(peer.socket, encode(msg));
1224+
}
12031225
}
12041226

1227+
// Always clean up coordinator state, even with no successor
12051228
this.stopStaleCheck();
1229+
for (const socket of this.coordinatorServerSockets) {
1230+
socket.unref();
1231+
socket.destroy();
1232+
}
1233+
this.coordinatorServerSockets.clear();
1234+
this.coordinatorServer?.unref();
12061235
this.coordinatorServer?.close();
12071236
this.coordinatorServer = undefined;
12081237
this.isCoordinator = false;
@@ -1224,14 +1253,39 @@ export class MeshStore implements CommsStore {
12241253

12251254
await this.handoverCoordinator();
12261255

1256+
// Destroy the coordinator client socket (not tracked in peerConnections)
1257+
this.coordinatorSocket?.unref();
1258+
this.coordinatorSocket?.destroy();
1259+
this.coordinatorSocket = undefined;
1260+
1261+
// Destroy all identified peer connections
12271262
for (const [, peer] of this.peerConnections) {
1263+
peer.socket.unref();
12281264
peer.socket.destroy();
12291265
}
12301266
this.peerConnections.clear();
12311267

1268+
// Destroy all data server accepted sockets (including unidentified)
1269+
for (const socket of this.dataServerSockets) {
1270+
socket.unref();
1271+
socket.destroy();
1272+
}
1273+
this.dataServerSockets.clear();
1274+
1275+
// Destroy all coordinator server accepted sockets
1276+
for (const socket of this.coordinatorServerSockets) {
1277+
socket.unref();
1278+
socket.destroy();
1279+
}
1280+
this.coordinatorServerSockets.clear();
1281+
12321282
this.stopStaleCheck();
1283+
1284+
// Close servers — stop accepting new connections
1285+
this.dataServer?.unref();
12331286
this.dataServer?.close();
12341287
this.dataServer = undefined;
1288+
this.coordinatorServer?.unref();
12351289
this.coordinatorServer?.close();
12361290
this.coordinatorServer = undefined;
12371291
}

0 commit comments

Comments
 (0)