@@ -62,6 +62,7 @@ export class PipelinedWriter extends Writer {
6262 private canWrite = true ;
6363 // reusable buffer for coalescing
6464 private readonly coalesceBuf : Buffer ;
65+ private readonly drainListener : ( ) => void ;
6566
6667 constructor (
6768 private readonly socket : net . Socket ,
@@ -72,11 +73,13 @@ export class PipelinedWriter extends Writer {
7273 super ( ) ;
7374 this . coalesceBuf = Buffer . allocUnsafe ( threshold ) ;
7475
75- // write queued items on drain event
76- socket . on ( 'drain' , ( ) => {
76+ this . drainListener = ( ) => {
7777 this . canWrite = true ;
7878 this . schedule ( ) ;
79- } ) ;
79+ }
80+
81+ // write queued items on drain event
82+ socket . on ( 'drain' , this . drainListener ) ;
8083 }
8184
8285 write ( message : ClientMessage , resolver : DeferredPromise < void > ) : void {
@@ -94,6 +97,7 @@ export class PipelinedWriter extends Writer {
9497 }
9598 this . error = this . makeIOError ( error ) ;
9699 this . canWrite = false ;
100+ this . socket . off ( 'drain' , this . drainListener ) ;
97101 // If we pass an error to destroy, an unhandled error will be thrown because we don't handle the error event
98102 // So we don't pass anything to the socket. It is internal anyway.
99103 this . socket . destroy ( ) ;
@@ -214,6 +218,8 @@ export class DirectWriter extends Writer {
214218 }
215219
216220 close ( cause : Error ) : void {
221+ // Remove all listeners from this emitter
222+ this . removeAllListeners ( 'write' ) ;
217223 this . socket . destroy ( ) ;
218224 }
219225}
@@ -351,6 +357,8 @@ export class Connection {
351357 private readonly writer : Writer ;
352358 private readonly reader : ClientMessageReader ;
353359 private readonly fragmentedMessageHandler : FragmentedClientMessageHandler ;
360+ private dataListener : ( ( buffer : Buffer ) => void ) | null = null ;
361+ private writeListener : ( ( ) => void ) | null = null ;
354362
355363 constructor (
356364 private readonly connectionManager : ConnectionManager ,
@@ -374,9 +382,10 @@ export class Connection {
374382 this . connectedServerVersion = BuildInfo . UNKNOWN_VERSION_ID ;
375383 this . writer = enablePipelining ? new PipelinedWriter ( this . socket , pipeliningThreshold , this . incrementBytesWrittenFn )
376384 : new DirectWriter ( this . socket , this . incrementBytesWrittenFn ) ;
377- this . writer . on ( 'write' , ( ) => {
385+ this . writeListener = ( ) => {
378386 this . lastWriteTimeMillis = Date . now ( ) ;
379- } ) ;
387+ }
388+ this . writer . on ( 'write' , this . writeListener ) ;
380389 this . reader = new ClientMessageReader ( ) ;
381390 this . fragmentedMessageHandler = new FragmentedClientMessageHandler ( this . logger ) ;
382391 }
@@ -437,11 +446,28 @@ export class Connection {
437446
438447 this . logClose ( ) ;
439448
449+ // Remove socket listeners before closing
450+ this . removeListeners ( ) ;
451+
440452 this . writer . close ( this . closedCause ? this . closedCause : new Error ( reason ? reason : 'Connection closed' ) ) ;
441453
442454 this . connectionManager . onConnectionClose ( this ) ;
443455 }
444456
457+ /**
458+ * Removes all registered listeners from the socket and writer.
459+ */
460+ private removeListeners ( ) : void {
461+ if ( this . dataListener !== null ) {
462+ this . socket . off ( 'data' , this . dataListener ) ;
463+ this . dataListener = null ;
464+ }
465+ if ( this . writeListener !== null ) {
466+ this . writer . off ( 'write' , this . writeListener ) ;
467+ this . writeListener = null ;
468+ }
469+ }
470+
445471 isAlive ( ) : boolean {
446472 return this . closedTime === 0 ;
447473 }
@@ -483,7 +509,7 @@ export class Connection {
483509 * @param callback
484510 */
485511 registerResponseCallback ( callback : ClientMessageHandler ) : void {
486- this . socket . on ( 'data' , ( buffer : Buffer ) => {
512+ this . dataListener = ( buffer : Buffer ) => {
487513 this . lastReadTimeMillis = Date . now ( ) ;
488514 this . reader . append ( buffer ) ;
489515 let clientMessage = this . reader . read ( ) ;
@@ -496,14 +522,15 @@ export class Connection {
496522 clientMessage = this . reader . read ( ) ;
497523 }
498524 this . incrementBytesReadFn ( buffer . length ) ;
499- } ) ;
525+ }
526+ this . socket . on ( 'data' , this . dataListener ) ;
500527 }
501528
502529 setClusterUuid ( uuid : UUID ) : void {
503530 this . clusterUuid = uuid ;
504531 }
505532
506- getClusterUuid ( ) : UUID {
533+ getClusterUuid ( ) : UUID {
507534 return this . clusterUuid ;
508535 }
509536
0 commit comments